]> granicus.if.org Git - postgresql/blob - src/backend/commands/indexcmds.c
53971fc7258851ef352844fb4666eee4b7062292
[postgresql] / src / backend / commands / indexcmds.c
1 /*-------------------------------------------------------------------------
2  *
3  * indexcmds.c
4  *        POSTGRES define and remove index code.
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/commands/indexcmds.c
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/amapi.h"
19 #include "access/heapam.h"
20 #include "access/htup_details.h"
21 #include "access/reloptions.h"
22 #include "access/sysattr.h"
23 #include "access/tableam.h"
24 #include "access/xact.h"
25 #include "catalog/catalog.h"
26 #include "catalog/index.h"
27 #include "catalog/indexing.h"
28 #include "catalog/pg_am.h"
29 #include "catalog/pg_constraint.h"
30 #include "catalog/pg_inherits.h"
31 #include "catalog/pg_opclass.h"
32 #include "catalog/pg_opfamily.h"
33 #include "catalog/pg_tablespace.h"
34 #include "catalog/pg_type.h"
35 #include "commands/comment.h"
36 #include "commands/dbcommands.h"
37 #include "commands/defrem.h"
38 #include "commands/event_trigger.h"
39 #include "commands/tablecmds.h"
40 #include "commands/tablespace.h"
41 #include "mb/pg_wchar.h"
42 #include "miscadmin.h"
43 #include "nodes/makefuncs.h"
44 #include "nodes/nodeFuncs.h"
45 #include "optimizer/optimizer.h"
46 #include "parser/parse_coerce.h"
47 #include "parser/parse_func.h"
48 #include "parser/parse_oper.h"
49 #include "partitioning/partdesc.h"
50 #include "rewrite/rewriteManip.h"
51 #include "storage/lmgr.h"
52 #include "storage/proc.h"
53 #include "storage/procarray.h"
54 #include "utils/acl.h"
55 #include "utils/builtins.h"
56 #include "utils/fmgroids.h"
57 #include "utils/inval.h"
58 #include "utils/lsyscache.h"
59 #include "utils/memutils.h"
60 #include "utils/partcache.h"
61 #include "utils/pg_rusage.h"
62 #include "utils/regproc.h"
63 #include "utils/snapmgr.h"
64 #include "utils/syscache.h"
65
66
67 /* non-export function prototypes */
68 static void CheckPredicate(Expr *predicate);
69 static void ComputeIndexAttrs(IndexInfo *indexInfo,
70                                   Oid *typeOidP,
71                                   Oid *collationOidP,
72                                   Oid *classOidP,
73                                   int16 *colOptionP,
74                                   List *attList,
75                                   List *exclusionOpNames,
76                                   Oid relId,
77                                   const char *accessMethodName, Oid accessMethodId,
78                                   bool amcanorder,
79                                   bool isconstraint);
80 static char *ChooseIndexName(const char *tabname, Oid namespaceId,
81                                 List *colnames, List *exclusionOpNames,
82                                 bool primary, bool isconstraint);
83 static char *ChooseIndexNameAddition(List *colnames);
84 static List *ChooseIndexColumnNames(List *indexElems);
85 static void RangeVarCallbackForReindexIndex(const RangeVar *relation,
86                                                                 Oid relId, Oid oldRelId, void *arg);
87 static bool ReindexRelationConcurrently(Oid relationOid, int options);
88 static void ReindexPartitionedIndex(Relation parentIdx);
89
90 /*
91  * CheckIndexCompatible
92  *              Determine whether an existing index definition is compatible with a
93  *              prospective index definition, such that the existing index storage
94  *              could become the storage of the new index, avoiding a rebuild.
95  *
96  * 'heapRelation': the relation the index would apply to.
97  * 'accessMethodName': name of the AM to use.
98  * 'attributeList': a list of IndexElem specifying columns and expressions
99  *              to index on.
100  * 'exclusionOpNames': list of names of exclusion-constraint operators,
101  *              or NIL if not an exclusion constraint.
102  *
103  * This is tailored to the needs of ALTER TABLE ALTER TYPE, which recreates
104  * any indexes that depended on a changing column from their pg_get_indexdef
105  * or pg_get_constraintdef definitions.  We omit some of the sanity checks of
106  * DefineIndex.  We assume that the old and new indexes have the same number
107  * of columns and that if one has an expression column or predicate, both do.
108  * Errors arising from the attribute list still apply.
109  *
110  * Most column type changes that can skip a table rewrite do not invalidate
111  * indexes.  We acknowledge this when all operator classes, collations and
112  * exclusion operators match.  Though we could further permit intra-opfamily
113  * changes for btree and hash indexes, that adds subtle complexity with no
114  * concrete benefit for core types. Note, that INCLUDE columns aren't
115  * checked by this function, for them it's enough that table rewrite is
116  * skipped.
117  *
118  * When a comparison or exclusion operator has a polymorphic input type, the
119  * actual input types must also match.  This defends against the possibility
120  * that operators could vary behavior in response to get_fn_expr_argtype().
121  * At present, this hazard is theoretical: check_exclusion_constraint() and
122  * all core index access methods decline to set fn_expr for such calls.
123  *
124  * We do not yet implement a test to verify compatibility of expression
125  * columns or predicates, so assume any such index is incompatible.
126  */
127 bool
128 CheckIndexCompatible(Oid oldId,
129                                          const char *accessMethodName,
130                                          List *attributeList,
131                                          List *exclusionOpNames)
132 {
133         bool            isconstraint;
134         Oid                *typeObjectId;
135         Oid                *collationObjectId;
136         Oid                *classObjectId;
137         Oid                     accessMethodId;
138         Oid                     relationId;
139         HeapTuple       tuple;
140         Form_pg_index indexForm;
141         Form_pg_am      accessMethodForm;
142         IndexAmRoutine *amRoutine;
143         bool            amcanorder;
144         int16      *coloptions;
145         IndexInfo  *indexInfo;
146         int                     numberOfAttributes;
147         int                     old_natts;
148         bool            isnull;
149         bool            ret = true;
150         oidvector  *old_indclass;
151         oidvector  *old_indcollation;
152         Relation        irel;
153         int                     i;
154         Datum           d;
155
156         /* Caller should already have the relation locked in some way. */
157         relationId = IndexGetRelation(oldId, false);
158
159         /*
160          * We can pretend isconstraint = false unconditionally.  It only serves to
161          * decide the text of an error message that should never happen for us.
162          */
163         isconstraint = false;
164
165         numberOfAttributes = list_length(attributeList);
166         Assert(numberOfAttributes > 0);
167         Assert(numberOfAttributes <= INDEX_MAX_KEYS);
168
169         /* look up the access method */
170         tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
171         if (!HeapTupleIsValid(tuple))
172                 ereport(ERROR,
173                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
174                                  errmsg("access method \"%s\" does not exist",
175                                                 accessMethodName)));
176         accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
177         accessMethodId = accessMethodForm->oid;
178         amRoutine = GetIndexAmRoutine(accessMethodForm->amhandler);
179         ReleaseSysCache(tuple);
180
181         amcanorder = amRoutine->amcanorder;
182
183         /*
184          * Compute the operator classes, collations, and exclusion operators for
185          * the new index, so we can test whether it's compatible with the existing
186          * one.  Note that ComputeIndexAttrs might fail here, but that's OK:
187          * DefineIndex would have called this function with the same arguments
188          * later on, and it would have failed then anyway.  Our attributeList
189          * contains only key attributes, thus we're filling ii_NumIndexAttrs and
190          * ii_NumIndexKeyAttrs with same value.
191          */
192         indexInfo = makeNode(IndexInfo);
193         indexInfo->ii_NumIndexAttrs = numberOfAttributes;
194         indexInfo->ii_NumIndexKeyAttrs = numberOfAttributes;
195         indexInfo->ii_Expressions = NIL;
196         indexInfo->ii_ExpressionsState = NIL;
197         indexInfo->ii_PredicateState = NULL;
198         indexInfo->ii_ExclusionOps = NULL;
199         indexInfo->ii_ExclusionProcs = NULL;
200         indexInfo->ii_ExclusionStrats = NULL;
201         indexInfo->ii_Am = accessMethodId;
202         indexInfo->ii_AmCache = NULL;
203         indexInfo->ii_Context = CurrentMemoryContext;
204         typeObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
205         collationObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
206         classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
207         coloptions = (int16 *) palloc(numberOfAttributes * sizeof(int16));
208         ComputeIndexAttrs(indexInfo,
209                                           typeObjectId, collationObjectId, classObjectId,
210                                           coloptions, attributeList,
211                                           exclusionOpNames, relationId,
212                                           accessMethodName, accessMethodId,
213                                           amcanorder, isconstraint);
214
215
216         /* Get the soon-obsolete pg_index tuple. */
217         tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(oldId));
218         if (!HeapTupleIsValid(tuple))
219                 elog(ERROR, "cache lookup failed for index %u", oldId);
220         indexForm = (Form_pg_index) GETSTRUCT(tuple);
221
222         /*
223          * We don't assess expressions or predicates; assume incompatibility.
224          * Also, if the index is invalid for any reason, treat it as incompatible.
225          */
226         if (!(heap_attisnull(tuple, Anum_pg_index_indpred, NULL) &&
227                   heap_attisnull(tuple, Anum_pg_index_indexprs, NULL) &&
228                   indexForm->indisvalid))
229         {
230                 ReleaseSysCache(tuple);
231                 return false;
232         }
233
234         /* Any change in operator class or collation breaks compatibility. */
235         old_natts = indexForm->indnkeyatts;
236         Assert(old_natts == numberOfAttributes);
237
238         d = SysCacheGetAttr(INDEXRELID, tuple, Anum_pg_index_indcollation, &isnull);
239         Assert(!isnull);
240         old_indcollation = (oidvector *) DatumGetPointer(d);
241
242         d = SysCacheGetAttr(INDEXRELID, tuple, Anum_pg_index_indclass, &isnull);
243         Assert(!isnull);
244         old_indclass = (oidvector *) DatumGetPointer(d);
245
246         ret = (memcmp(old_indclass->values, classObjectId,
247                                   old_natts * sizeof(Oid)) == 0 &&
248                    memcmp(old_indcollation->values, collationObjectId,
249                                   old_natts * sizeof(Oid)) == 0);
250
251         ReleaseSysCache(tuple);
252
253         if (!ret)
254                 return false;
255
256         /* For polymorphic opcintype, column type changes break compatibility. */
257         irel = index_open(oldId, AccessShareLock);      /* caller probably has a lock */
258         for (i = 0; i < old_natts; i++)
259         {
260                 if (IsPolymorphicType(get_opclass_input_type(classObjectId[i])) &&
261                         TupleDescAttr(irel->rd_att, i)->atttypid != typeObjectId[i])
262                 {
263                         ret = false;
264                         break;
265                 }
266         }
267
268         /* Any change in exclusion operator selections breaks compatibility. */
269         if (ret && indexInfo->ii_ExclusionOps != NULL)
270         {
271                 Oid                *old_operators,
272                                    *old_procs;
273                 uint16     *old_strats;
274
275                 RelationGetExclusionInfo(irel, &old_operators, &old_procs, &old_strats);
276                 ret = memcmp(old_operators, indexInfo->ii_ExclusionOps,
277                                          old_natts * sizeof(Oid)) == 0;
278
279                 /* Require an exact input type match for polymorphic operators. */
280                 if (ret)
281                 {
282                         for (i = 0; i < old_natts && ret; i++)
283                         {
284                                 Oid                     left,
285                                                         right;
286
287                                 op_input_types(indexInfo->ii_ExclusionOps[i], &left, &right);
288                                 if ((IsPolymorphicType(left) || IsPolymorphicType(right)) &&
289                                         TupleDescAttr(irel->rd_att, i)->atttypid != typeObjectId[i])
290                                 {
291                                         ret = false;
292                                         break;
293                                 }
294                         }
295                 }
296         }
297
298         index_close(irel, NoLock);
299         return ret;
300 }
301
302
303 /*
304  * WaitForOlderSnapshots
305  *
306  * Wait for transactions that might have an older snapshot than the given xmin
307  * limit, because it might not contain tuples deleted just before it has
308  * been taken. Obtain a list of VXIDs of such transactions, and wait for them
309  * individually. This is used when building an index concurrently.
310  *
311  * We can exclude any running transactions that have xmin > the xmin given;
312  * their oldest snapshot must be newer than our xmin limit.
313  * We can also exclude any transactions that have xmin = zero, since they
314  * evidently have no live snapshot at all (and any one they might be in
315  * process of taking is certainly newer than ours).  Transactions in other
316  * DBs can be ignored too, since they'll never even be able to see the
317  * index being worked on.
318  *
319  * We can also exclude autovacuum processes and processes running manual
320  * lazy VACUUMs, because they won't be fazed by missing index entries
321  * either.  (Manual ANALYZEs, however, can't be excluded because they
322  * might be within transactions that are going to do arbitrary operations
323  * later.)
324  *
325  * Also, GetCurrentVirtualXIDs never reports our own vxid, so we need not
326  * check for that.
327  *
328  * If a process goes idle-in-transaction with xmin zero, we do not need to
329  * wait for it anymore, per the above argument.  We do not have the
330  * infrastructure right now to stop waiting if that happens, but we can at
331  * least avoid the folly of waiting when it is idle at the time we would
332  * begin to wait.  We do this by repeatedly rechecking the output of
333  * GetCurrentVirtualXIDs.  If, during any iteration, a particular vxid
334  * doesn't show up in the output, we know we can forget about it.
335  */
336 static void
337 WaitForOlderSnapshots(TransactionId limitXmin)
338 {
339         int                     n_old_snapshots;
340         int                     i;
341         VirtualTransactionId *old_snapshots;
342
343         old_snapshots = GetCurrentVirtualXIDs(limitXmin, true, false,
344                                                                                   PROC_IS_AUTOVACUUM | PROC_IN_VACUUM,
345                                                                                   &n_old_snapshots);
346
347         for (i = 0; i < n_old_snapshots; i++)
348         {
349                 if (!VirtualTransactionIdIsValid(old_snapshots[i]))
350                         continue;                       /* found uninteresting in previous cycle */
351
352                 if (i > 0)
353                 {
354                         /* see if anything's changed ... */
355                         VirtualTransactionId *newer_snapshots;
356                         int                     n_newer_snapshots;
357                         int                     j;
358                         int                     k;
359
360                         newer_snapshots = GetCurrentVirtualXIDs(limitXmin,
361                                                                                                         true, false,
362                                                                                                         PROC_IS_AUTOVACUUM | PROC_IN_VACUUM,
363                                                                                                         &n_newer_snapshots);
364                         for (j = i; j < n_old_snapshots; j++)
365                         {
366                                 if (!VirtualTransactionIdIsValid(old_snapshots[j]))
367                                         continue;       /* found uninteresting in previous cycle */
368                                 for (k = 0; k < n_newer_snapshots; k++)
369                                 {
370                                         if (VirtualTransactionIdEquals(old_snapshots[j],
371                                                                                                    newer_snapshots[k]))
372                                                 break;
373                                 }
374                                 if (k >= n_newer_snapshots) /* not there anymore */
375                                         SetInvalidVirtualTransactionId(old_snapshots[j]);
376                         }
377                         pfree(newer_snapshots);
378                 }
379
380                 if (VirtualTransactionIdIsValid(old_snapshots[i]))
381                         VirtualXactLock(old_snapshots[i], true);
382         }
383 }
384
385
386 /*
387  * DefineIndex
388  *              Creates a new index.
389  *
390  * 'relationId': the OID of the heap relation on which the index is to be
391  *              created
392  * 'stmt': IndexStmt describing the properties of the new index.
393  * 'indexRelationId': normally InvalidOid, but during bootstrap can be
394  *              nonzero to specify a preselected OID for the index.
395  * 'parentIndexId': the OID of the parent index; InvalidOid if not the child
396  *              of a partitioned index.
397  * 'parentConstraintId': the OID of the parent constraint; InvalidOid if not
398  *              the child of a constraint (only used when recursing)
399  * 'is_alter_table': this is due to an ALTER rather than a CREATE operation.
400  * 'check_rights': check for CREATE rights in namespace and tablespace.  (This
401  *              should be true except when ALTER is deleting/recreating an index.)
402  * 'check_not_in_use': check for table not already in use in current session.
403  *              This should be true unless caller is holding the table open, in which
404  *              case the caller had better have checked it earlier.
405  * 'skip_build': make the catalog entries but don't create the index files
406  * 'quiet': suppress the NOTICE chatter ordinarily provided for constraints.
407  *
408  * Returns the object address of the created index.
409  */
410 ObjectAddress
411 DefineIndex(Oid relationId,
412                         IndexStmt *stmt,
413                         Oid indexRelationId,
414                         Oid parentIndexId,
415                         Oid parentConstraintId,
416                         bool is_alter_table,
417                         bool check_rights,
418                         bool check_not_in_use,
419                         bool skip_build,
420                         bool quiet)
421 {
422         char       *indexRelationName;
423         char       *accessMethodName;
424         Oid                *typeObjectId;
425         Oid                *collationObjectId;
426         Oid                *classObjectId;
427         Oid                     accessMethodId;
428         Oid                     namespaceId;
429         Oid                     tablespaceId;
430         Oid                     createdConstraintId = InvalidOid;
431         List       *indexColNames;
432         List       *allIndexParams;
433         Relation        rel;
434         HeapTuple       tuple;
435         Form_pg_am      accessMethodForm;
436         IndexAmRoutine *amRoutine;
437         bool            amcanorder;
438         amoptions_function amoptions;
439         bool            partitioned;
440         Datum           reloptions;
441         int16      *coloptions;
442         IndexInfo  *indexInfo;
443         bits16          flags;
444         bits16          constr_flags;
445         int                     numberOfAttributes;
446         int                     numberOfKeyAttributes;
447         TransactionId limitXmin;
448         ObjectAddress address;
449         LockRelId       heaprelid;
450         LOCKTAG         heaplocktag;
451         LOCKMODE        lockmode;
452         Snapshot        snapshot;
453         int                     i;
454
455         /*
456          * count key attributes in index
457          */
458         numberOfKeyAttributes = list_length(stmt->indexParams);
459
460         /*
461          * Calculate the new list of index columns including both key columns and
462          * INCLUDE columns.  Later we can determine which of these are key
463          * columns, and which are just part of the INCLUDE list by checking the
464          * list position.  A list item in a position less than ii_NumIndexKeyAttrs
465          * is part of the key columns, and anything equal to and over is part of
466          * the INCLUDE columns.
467          */
468         allIndexParams = list_concat(list_copy(stmt->indexParams),
469                                                                  list_copy(stmt->indexIncludingParams));
470         numberOfAttributes = list_length(allIndexParams);
471
472         if (numberOfAttributes <= 0)
473                 ereport(ERROR,
474                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
475                                  errmsg("must specify at least one column")));
476         if (numberOfAttributes > INDEX_MAX_KEYS)
477                 ereport(ERROR,
478                                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
479                                  errmsg("cannot use more than %d columns in an index",
480                                                 INDEX_MAX_KEYS)));
481
482         /*
483          * Only SELECT ... FOR UPDATE/SHARE are allowed while doing a standard
484          * index build; but for concurrent builds we allow INSERT/UPDATE/DELETE
485          * (but not VACUUM).
486          *
487          * NB: Caller is responsible for making sure that relationId refers to the
488          * relation on which the index should be built; except in bootstrap mode,
489          * this will typically require the caller to have already locked the
490          * relation.  To avoid lock upgrade hazards, that lock should be at least
491          * as strong as the one we take here.
492          *
493          * NB: If the lock strength here ever changes, code that is run by
494          * parallel workers under the control of certain particular ambuild
495          * functions will need to be updated, too.
496          */
497         lockmode = stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock;
498         rel = table_open(relationId, lockmode);
499
500         namespaceId = RelationGetNamespace(rel);
501
502         /* Ensure that it makes sense to index this kind of relation */
503         switch (rel->rd_rel->relkind)
504         {
505                 case RELKIND_RELATION:
506                 case RELKIND_MATVIEW:
507                 case RELKIND_PARTITIONED_TABLE:
508                         /* OK */
509                         break;
510                 case RELKIND_FOREIGN_TABLE:
511
512                         /*
513                          * Custom error message for FOREIGN TABLE since the term is close
514                          * to a regular table and can confuse the user.
515                          */
516                         ereport(ERROR,
517                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
518                                          errmsg("cannot create index on foreign table \"%s\"",
519                                                         RelationGetRelationName(rel))));
520                         break;
521                 default:
522                         ereport(ERROR,
523                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
524                                          errmsg("\"%s\" is not a table or materialized view",
525                                                         RelationGetRelationName(rel))));
526                         break;
527         }
528
529         /*
530          * Establish behavior for partitioned tables, and verify sanity of
531          * parameters.
532          *
533          * We do not build an actual index in this case; we only create a few
534          * catalog entries.  The actual indexes are built by recursing for each
535          * partition.
536          */
537         partitioned = rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE;
538         if (partitioned)
539         {
540                 if (stmt->concurrent)
541                         ereport(ERROR,
542                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
543                                          errmsg("cannot create index on partitioned table \"%s\" concurrently",
544                                                         RelationGetRelationName(rel))));
545                 if (stmt->excludeOpNames)
546                         ereport(ERROR,
547                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
548                                          errmsg("cannot create exclusion constraints on partitioned table \"%s\"",
549                                                         RelationGetRelationName(rel))));
550         }
551
552         /*
553          * Don't try to CREATE INDEX on temp tables of other backends.
554          */
555         if (RELATION_IS_OTHER_TEMP(rel))
556                 ereport(ERROR,
557                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
558                                  errmsg("cannot create indexes on temporary tables of other sessions")));
559
560         /*
561          * Unless our caller vouches for having checked this already, insist that
562          * the table not be in use by our own session, either.  Otherwise we might
563          * fail to make entries in the new index (for instance, if an INSERT or
564          * UPDATE is in progress and has already made its list of target indexes).
565          */
566         if (check_not_in_use)
567                 CheckTableNotInUse(rel, "CREATE INDEX");
568
569         /*
570          * Verify we (still) have CREATE rights in the rel's namespace.
571          * (Presumably we did when the rel was created, but maybe not anymore.)
572          * Skip check if caller doesn't want it.  Also skip check if
573          * bootstrapping, since permissions machinery may not be working yet.
574          */
575         if (check_rights && !IsBootstrapProcessingMode())
576         {
577                 AclResult       aclresult;
578
579                 aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
580                                                                                   ACL_CREATE);
581                 if (aclresult != ACLCHECK_OK)
582                         aclcheck_error(aclresult, OBJECT_SCHEMA,
583                                                    get_namespace_name(namespaceId));
584         }
585
586         /*
587          * Select tablespace to use.  If not specified, use default tablespace
588          * (which may in turn default to database's default).
589          */
590         if (stmt->tableSpace)
591         {
592                 tablespaceId = get_tablespace_oid(stmt->tableSpace, false);
593         }
594         else
595         {
596                 tablespaceId = GetDefaultTablespace(rel->rd_rel->relpersistence);
597                 /* note InvalidOid is OK in this case */
598         }
599
600         /* Check tablespace permissions */
601         if (check_rights &&
602                 OidIsValid(tablespaceId) && tablespaceId != MyDatabaseTableSpace)
603         {
604                 AclResult       aclresult;
605
606                 aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
607                                                                                    ACL_CREATE);
608                 if (aclresult != ACLCHECK_OK)
609                         aclcheck_error(aclresult, OBJECT_TABLESPACE,
610                                                    get_tablespace_name(tablespaceId));
611         }
612
613         /*
614          * Force shared indexes into the pg_global tablespace.  This is a bit of a
615          * hack but seems simpler than marking them in the BKI commands.  On the
616          * other hand, if it's not shared, don't allow it to be placed there.
617          */
618         if (rel->rd_rel->relisshared)
619                 tablespaceId = GLOBALTABLESPACE_OID;
620         else if (tablespaceId == GLOBALTABLESPACE_OID)
621                 ereport(ERROR,
622                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
623                                  errmsg("only shared relations can be placed in pg_global tablespace")));
624
625         /*
626          * Choose the index column names.
627          */
628         indexColNames = ChooseIndexColumnNames(allIndexParams);
629
630         /*
631          * Select name for index if caller didn't specify
632          */
633         indexRelationName = stmt->idxname;
634         if (indexRelationName == NULL)
635                 indexRelationName = ChooseIndexName(RelationGetRelationName(rel),
636                                                                                         namespaceId,
637                                                                                         indexColNames,
638                                                                                         stmt->excludeOpNames,
639                                                                                         stmt->primary,
640                                                                                         stmt->isconstraint);
641
642         /*
643          * look up the access method, verify it can handle the requested features
644          */
645         accessMethodName = stmt->accessMethod;
646         tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
647         if (!HeapTupleIsValid(tuple))
648         {
649                 /*
650                  * Hack to provide more-or-less-transparent updating of old RTREE
651                  * indexes to GiST: if RTREE is requested and not found, use GIST.
652                  */
653                 if (strcmp(accessMethodName, "rtree") == 0)
654                 {
655                         ereport(NOTICE,
656                                         (errmsg("substituting access method \"gist\" for obsolete method \"rtree\"")));
657                         accessMethodName = "gist";
658                         tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
659                 }
660
661                 if (!HeapTupleIsValid(tuple))
662                         ereport(ERROR,
663                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
664                                          errmsg("access method \"%s\" does not exist",
665                                                         accessMethodName)));
666         }
667         accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
668         accessMethodId = accessMethodForm->oid;
669         amRoutine = GetIndexAmRoutine(accessMethodForm->amhandler);
670
671         if (stmt->unique && !amRoutine->amcanunique)
672                 ereport(ERROR,
673                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
674                                  errmsg("access method \"%s\" does not support unique indexes",
675                                                 accessMethodName)));
676         if (stmt->indexIncludingParams != NIL && !amRoutine->amcaninclude)
677                 ereport(ERROR,
678                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
679                                  errmsg("access method \"%s\" does not support included columns",
680                                                 accessMethodName)));
681         if (numberOfAttributes > 1 && !amRoutine->amcanmulticol)
682                 ereport(ERROR,
683                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
684                                  errmsg("access method \"%s\" does not support multicolumn indexes",
685                                                 accessMethodName)));
686         if (stmt->excludeOpNames && amRoutine->amgettuple == NULL)
687                 ereport(ERROR,
688                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
689                                  errmsg("access method \"%s\" does not support exclusion constraints",
690                                                 accessMethodName)));
691
692         amcanorder = amRoutine->amcanorder;
693         amoptions = amRoutine->amoptions;
694
695         pfree(amRoutine);
696         ReleaseSysCache(tuple);
697
698         /*
699          * Validate predicate, if given
700          */
701         if (stmt->whereClause)
702                 CheckPredicate((Expr *) stmt->whereClause);
703
704         /*
705          * Parse AM-specific options, convert to text array form, validate.
706          */
707         reloptions = transformRelOptions((Datum) 0, stmt->options,
708                                                                          NULL, NULL, false, false);
709
710         (void) index_reloptions(amoptions, reloptions, true);
711
712         /*
713          * Prepare arguments for index_create, primarily an IndexInfo structure.
714          * Note that ii_Predicate must be in implicit-AND format.
715          */
716         indexInfo = makeNode(IndexInfo);
717         indexInfo->ii_NumIndexAttrs = numberOfAttributes;
718         indexInfo->ii_NumIndexKeyAttrs = numberOfKeyAttributes;
719         indexInfo->ii_Expressions = NIL;        /* for now */
720         indexInfo->ii_ExpressionsState = NIL;
721         indexInfo->ii_Predicate = make_ands_implicit((Expr *) stmt->whereClause);
722         indexInfo->ii_PredicateState = NULL;
723         indexInfo->ii_ExclusionOps = NULL;
724         indexInfo->ii_ExclusionProcs = NULL;
725         indexInfo->ii_ExclusionStrats = NULL;
726         indexInfo->ii_Unique = stmt->unique;
727         /* In a concurrent build, mark it not-ready-for-inserts */
728         indexInfo->ii_ReadyForInserts = !stmt->concurrent;
729         indexInfo->ii_Concurrent = stmt->concurrent;
730         indexInfo->ii_BrokenHotChain = false;
731         indexInfo->ii_ParallelWorkers = 0;
732         indexInfo->ii_Am = accessMethodId;
733         indexInfo->ii_AmCache = NULL;
734         indexInfo->ii_Context = CurrentMemoryContext;
735
736         typeObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
737         collationObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
738         classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
739         coloptions = (int16 *) palloc(numberOfAttributes * sizeof(int16));
740         ComputeIndexAttrs(indexInfo,
741                                           typeObjectId, collationObjectId, classObjectId,
742                                           coloptions, allIndexParams,
743                                           stmt->excludeOpNames, relationId,
744                                           accessMethodName, accessMethodId,
745                                           amcanorder, stmt->isconstraint);
746
747         /*
748          * Extra checks when creating a PRIMARY KEY index.
749          */
750         if (stmt->primary)
751                 index_check_primary_key(rel, indexInfo, is_alter_table, stmt);
752
753         /*
754          * If this table is partitioned and we're creating a unique index or a
755          * primary key, make sure that the indexed columns are part of the
756          * partition key.  Otherwise it would be possible to violate uniqueness by
757          * putting values that ought to be unique in different partitions.
758          *
759          * We could lift this limitation if we had global indexes, but those have
760          * their own problems, so this is a useful feature combination.
761          */
762         if (partitioned && (stmt->unique || stmt->primary))
763         {
764                 PartitionKey key = rel->rd_partkey;
765                 int                     i;
766
767                 /*
768                  * A partitioned table can have unique indexes, as long as all the
769                  * columns in the partition key appear in the unique key.  A
770                  * partition-local index can enforce global uniqueness iff the PK
771                  * value completely determines the partition that a row is in.
772                  *
773                  * Thus, verify that all the columns in the partition key appear in
774                  * the unique key definition.
775                  */
776                 for (i = 0; i < key->partnatts; i++)
777                 {
778                         bool            found = false;
779                         int                     j;
780                         const char *constraint_type;
781
782                         if (stmt->primary)
783                                 constraint_type = "PRIMARY KEY";
784                         else if (stmt->unique)
785                                 constraint_type = "UNIQUE";
786                         else if (stmt->excludeOpNames != NIL)
787                                 constraint_type = "EXCLUDE";
788                         else
789                         {
790                                 elog(ERROR, "unknown constraint type");
791                                 constraint_type = NULL; /* keep compiler quiet */
792                         }
793
794                         /*
795                          * It may be possible to support UNIQUE constraints when partition
796                          * keys are expressions, but is it worth it?  Give up for now.
797                          */
798                         if (key->partattrs[i] == 0)
799                                 ereport(ERROR,
800                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
801                                                  errmsg("unsupported %s constraint with partition key definition",
802                                                                 constraint_type),
803                                                  errdetail("%s constraints cannot be used when partition keys include expressions.",
804                                                                    constraint_type)));
805
806                         for (j = 0; j < indexInfo->ii_NumIndexKeyAttrs; j++)
807                         {
808                                 if (key->partattrs[i] == indexInfo->ii_IndexAttrNumbers[j])
809                                 {
810                                         found = true;
811                                         break;
812                                 }
813                         }
814                         if (!found)
815                         {
816                                 Form_pg_attribute att;
817
818                                 att = TupleDescAttr(RelationGetDescr(rel), key->partattrs[i] - 1);
819                                 ereport(ERROR,
820                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
821                                                  errmsg("insufficient columns in %s constraint definition",
822                                                                 constraint_type),
823                                                  errdetail("%s constraint on table \"%s\" lacks column \"%s\" which is part of the partition key.",
824                                                                    constraint_type, RelationGetRelationName(rel),
825                                                                    NameStr(att->attname))));
826                         }
827                 }
828         }
829
830
831         /*
832          * We disallow indexes on system columns.  They would not necessarily get
833          * updated correctly, and they don't seem useful anyway.
834          */
835         for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
836         {
837                 AttrNumber      attno = indexInfo->ii_IndexAttrNumbers[i];
838
839                 if (attno < 0)
840                         ereport(ERROR,
841                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
842                                          errmsg("index creation on system columns is not supported")));
843         }
844
845         /*
846          * Also check for system columns used in expressions or predicates.
847          */
848         if (indexInfo->ii_Expressions || indexInfo->ii_Predicate)
849         {
850                 Bitmapset  *indexattrs = NULL;
851
852                 pull_varattnos((Node *) indexInfo->ii_Expressions, 1, &indexattrs);
853                 pull_varattnos((Node *) indexInfo->ii_Predicate, 1, &indexattrs);
854
855                 for (i = FirstLowInvalidHeapAttributeNumber + 1; i < 0; i++)
856                 {
857                         if (bms_is_member(i - FirstLowInvalidHeapAttributeNumber,
858                                                           indexattrs))
859                                 ereport(ERROR,
860                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
861                                                  errmsg("index creation on system columns is not supported")));
862                 }
863         }
864
865         /*
866          * Report index creation if appropriate (delay this till after most of the
867          * error checks)
868          */
869         if (stmt->isconstraint && !quiet)
870         {
871                 const char *constraint_type;
872
873                 if (stmt->primary)
874                         constraint_type = "PRIMARY KEY";
875                 else if (stmt->unique)
876                         constraint_type = "UNIQUE";
877                 else if (stmt->excludeOpNames != NIL)
878                         constraint_type = "EXCLUDE";
879                 else
880                 {
881                         elog(ERROR, "unknown constraint type");
882                         constraint_type = NULL; /* keep compiler quiet */
883                 }
884
885                 ereport(DEBUG1,
886                                 (errmsg("%s %s will create implicit index \"%s\" for table \"%s\"",
887                                                 is_alter_table ? "ALTER TABLE / ADD" : "CREATE TABLE /",
888                                                 constraint_type,
889                                                 indexRelationName, RelationGetRelationName(rel))));
890         }
891
892         /*
893          * A valid stmt->oldNode implies that we already have a built form of the
894          * index.  The caller should also decline any index build.
895          */
896         Assert(!OidIsValid(stmt->oldNode) || (skip_build && !stmt->concurrent));
897
898         /*
899          * Make the catalog entries for the index, including constraints. This
900          * step also actually builds the index, except if caller requested not to
901          * or in concurrent mode, in which case it'll be done later, or doing a
902          * partitioned index (because those don't have storage).
903          */
904         flags = constr_flags = 0;
905         if (stmt->isconstraint)
906                 flags |= INDEX_CREATE_ADD_CONSTRAINT;
907         if (skip_build || stmt->concurrent || partitioned)
908                 flags |= INDEX_CREATE_SKIP_BUILD;
909         if (stmt->if_not_exists)
910                 flags |= INDEX_CREATE_IF_NOT_EXISTS;
911         if (stmt->concurrent)
912                 flags |= INDEX_CREATE_CONCURRENT;
913         if (partitioned)
914                 flags |= INDEX_CREATE_PARTITIONED;
915         if (stmt->primary)
916                 flags |= INDEX_CREATE_IS_PRIMARY;
917
918         /*
919          * If the table is partitioned, and recursion was declined but partitions
920          * exist, mark the index as invalid.
921          */
922         if (partitioned && stmt->relation && !stmt->relation->inh)
923         {
924                 PartitionDesc   pd = RelationGetPartitionDesc(rel);
925
926                 if (pd->nparts != 0)
927                         flags |= INDEX_CREATE_INVALID;
928         }
929
930         if (stmt->deferrable)
931                 constr_flags |= INDEX_CONSTR_CREATE_DEFERRABLE;
932         if (stmt->initdeferred)
933                 constr_flags |= INDEX_CONSTR_CREATE_INIT_DEFERRED;
934
935         indexRelationId =
936                 index_create(rel, indexRelationName, indexRelationId, parentIndexId,
937                                          parentConstraintId,
938                                          stmt->oldNode, indexInfo, indexColNames,
939                                          accessMethodId, tablespaceId,
940                                          collationObjectId, classObjectId,
941                                          coloptions, reloptions,
942                                          flags, constr_flags,
943                                          allowSystemTableMods, !check_rights,
944                                          &createdConstraintId);
945
946         ObjectAddressSet(address, RelationRelationId, indexRelationId);
947
948         if (!OidIsValid(indexRelationId))
949         {
950                 table_close(rel, NoLock);
951                 return address;
952         }
953
954         /* Add any requested comment */
955         if (stmt->idxcomment != NULL)
956                 CreateComments(indexRelationId, RelationRelationId, 0,
957                                            stmt->idxcomment);
958
959         if (partitioned)
960         {
961                 /*
962                  * Unless caller specified to skip this step (via ONLY), process each
963                  * partition to make sure they all contain a corresponding index.
964                  *
965                  * If we're called internally (no stmt->relation), recurse always.
966                  */
967                 if (!stmt->relation || stmt->relation->inh)
968                 {
969                         PartitionDesc partdesc = RelationGetPartitionDesc(rel);
970                         int                     nparts = partdesc->nparts;
971                         Oid                *part_oids = palloc(sizeof(Oid) * nparts);
972                         bool            invalidate_parent = false;
973                         TupleDesc       parentDesc;
974                         Oid                *opfamOids;
975
976                         memcpy(part_oids, partdesc->oids, sizeof(Oid) * nparts);
977
978                         parentDesc = CreateTupleDescCopy(RelationGetDescr(rel));
979                         opfamOids = palloc(sizeof(Oid) * numberOfKeyAttributes);
980                         for (i = 0; i < numberOfKeyAttributes; i++)
981                                 opfamOids[i] = get_opclass_family(classObjectId[i]);
982
983                         table_close(rel, NoLock);
984
985                         /*
986                          * For each partition, scan all existing indexes; if one matches
987                          * our index definition and is not already attached to some other
988                          * parent index, attach it to the one we just created.
989                          *
990                          * If none matches, build a new index by calling ourselves
991                          * recursively with the same options (except for the index name).
992                          */
993                         for (i = 0; i < nparts; i++)
994                         {
995                                 Oid                     childRelid = part_oids[i];
996                                 Relation        childrel;
997                                 List       *childidxs;
998                                 ListCell   *cell;
999                                 AttrNumber *attmap;
1000                                 bool            found = false;
1001                                 int                     maplen;
1002
1003                                 childrel = table_open(childRelid, lockmode);
1004                                 childidxs = RelationGetIndexList(childrel);
1005                                 attmap =
1006                                         convert_tuples_by_name_map(RelationGetDescr(childrel),
1007                                                                                            parentDesc,
1008                                                                                            gettext_noop("could not convert row type"));
1009                                 maplen = parentDesc->natts;
1010
1011                                 foreach(cell, childidxs)
1012                                 {
1013                                         Oid                     cldidxid = lfirst_oid(cell);
1014                                         Relation        cldidx;
1015                                         IndexInfo  *cldIdxInfo;
1016
1017                                         /* this index is already partition of another one */
1018                                         if (has_superclass(cldidxid))
1019                                                 continue;
1020
1021                                         cldidx = index_open(cldidxid, lockmode);
1022                                         cldIdxInfo = BuildIndexInfo(cldidx);
1023                                         if (CompareIndexInfo(cldIdxInfo, indexInfo,
1024                                                                                  cldidx->rd_indcollation,
1025                                                                                  collationObjectId,
1026                                                                                  cldidx->rd_opfamily,
1027                                                                                  opfamOids,
1028                                                                                  attmap, maplen))
1029                                         {
1030                                                 Oid                     cldConstrOid = InvalidOid;
1031
1032                                                 /*
1033                                                  * Found a match.
1034                                                  *
1035                                                  * If this index is being created in the parent
1036                                                  * because of a constraint, then the child needs to
1037                                                  * have a constraint also, so look for one.  If there
1038                                                  * is no such constraint, this index is no good, so
1039                                                  * keep looking.
1040                                                  */
1041                                                 if (createdConstraintId != InvalidOid)
1042                                                 {
1043                                                         cldConstrOid =
1044                                                                 get_relation_idx_constraint_oid(childRelid,
1045                                                                                                                                 cldidxid);
1046                                                         if (cldConstrOid == InvalidOid)
1047                                                         {
1048                                                                 index_close(cldidx, lockmode);
1049                                                                 continue;
1050                                                         }
1051                                                 }
1052
1053                                                 /* Attach index to parent and we're done. */
1054                                                 IndexSetParentIndex(cldidx, indexRelationId);
1055                                                 if (createdConstraintId != InvalidOid)
1056                                                         ConstraintSetParentConstraint(cldConstrOid,
1057                                                                                                                   createdConstraintId,
1058                                                                                                                   childRelid);
1059
1060                                                 if (!cldidx->rd_index->indisvalid)
1061                                                         invalidate_parent = true;
1062
1063                                                 found = true;
1064                                                 /* keep lock till commit */
1065                                                 index_close(cldidx, NoLock);
1066                                                 break;
1067                                         }
1068
1069                                         index_close(cldidx, lockmode);
1070                                 }
1071
1072                                 list_free(childidxs);
1073                                 table_close(childrel, NoLock);
1074
1075                                 /*
1076                                  * If no matching index was found, create our own.
1077                                  */
1078                                 if (!found)
1079                                 {
1080                                         IndexStmt  *childStmt = copyObject(stmt);
1081                                         bool            found_whole_row;
1082                                         ListCell   *lc;
1083
1084                                         /*
1085                                          * Adjust any Vars (both in expressions and in the index's
1086                                          * WHERE clause) to match the partition's column numbering
1087                                          * in case it's different from the parent's.
1088                                          */
1089                                         foreach(lc, childStmt->indexParams)
1090                                         {
1091                                                 IndexElem  *ielem = lfirst(lc);
1092
1093                                                 /*
1094                                                  * If the index parameter is an expression, we must
1095                                                  * translate it to contain child Vars.
1096                                                  */
1097                                                 if (ielem->expr)
1098                                                 {
1099                                                         ielem->expr =
1100                                                                 map_variable_attnos((Node *) ielem->expr,
1101                                                                                                         1, 0, attmap, maplen,
1102                                                                                                         InvalidOid,
1103                                                                                                         &found_whole_row);
1104                                                         if (found_whole_row)
1105                                                                 elog(ERROR, "cannot convert whole-row table reference");
1106                                                 }
1107                                         }
1108                                         childStmt->whereClause =
1109                                                 map_variable_attnos(stmt->whereClause, 1, 0,
1110                                                                                         attmap, maplen,
1111                                                                                         InvalidOid, &found_whole_row);
1112                                         if (found_whole_row)
1113                                                 elog(ERROR, "cannot convert whole-row table reference");
1114
1115                                         childStmt->idxname = NULL;
1116                                         childStmt->relation = NULL;
1117                                         DefineIndex(childRelid, childStmt,
1118                                                                 InvalidOid, /* no predefined OID */
1119                                                                 indexRelationId,        /* this is our child */
1120                                                                 createdConstraintId,
1121                                                                 is_alter_table, check_rights, check_not_in_use,
1122                                                                 skip_build, quiet);
1123                                 }
1124
1125                                 pfree(attmap);
1126                         }
1127
1128                         /*
1129                          * The pg_index row we inserted for this index was marked
1130                          * indisvalid=true.  But if we attached an existing index that is
1131                          * invalid, this is incorrect, so update our row to invalid too.
1132                          */
1133                         if (invalidate_parent)
1134                         {
1135                                 Relation        pg_index = table_open(IndexRelationId, RowExclusiveLock);
1136                                 HeapTuple       tup,
1137                                                         newtup;
1138
1139                                 tup = SearchSysCache1(INDEXRELID,
1140                                                                           ObjectIdGetDatum(indexRelationId));
1141                                 if (!tup)
1142                                         elog(ERROR, "cache lookup failed for index %u",
1143                                                  indexRelationId);
1144                                 newtup = heap_copytuple(tup);
1145                                 ((Form_pg_index) GETSTRUCT(newtup))->indisvalid = false;
1146                                 CatalogTupleUpdate(pg_index, &tup->t_self, newtup);
1147                                 ReleaseSysCache(tup);
1148                                 table_close(pg_index, RowExclusiveLock);
1149                                 heap_freetuple(newtup);
1150                         }
1151                 }
1152                 else
1153                         table_close(rel, NoLock);
1154
1155                 /*
1156                  * Indexes on partitioned tables are not themselves built, so we're
1157                  * done here.
1158                  */
1159                 return address;
1160         }
1161
1162         if (!stmt->concurrent)
1163         {
1164                 /* Close the heap and we're done, in the non-concurrent case */
1165                 table_close(rel, NoLock);
1166                 return address;
1167         }
1168
1169         /* save lockrelid and locktag for below, then close rel */
1170         heaprelid = rel->rd_lockInfo.lockRelId;
1171         SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId);
1172         table_close(rel, NoLock);
1173
1174         /*
1175          * For a concurrent build, it's important to make the catalog entries
1176          * visible to other transactions before we start to build the index. That
1177          * will prevent them from making incompatible HOT updates.  The new index
1178          * will be marked not indisready and not indisvalid, so that no one else
1179          * tries to either insert into it or use it for queries.
1180          *
1181          * We must commit our current transaction so that the index becomes
1182          * visible; then start another.  Note that all the data structures we just
1183          * built are lost in the commit.  The only data we keep past here are the
1184          * relation IDs.
1185          *
1186          * Before committing, get a session-level lock on the table, to ensure
1187          * that neither it nor the index can be dropped before we finish. This
1188          * cannot block, even if someone else is waiting for access, because we
1189          * already have the same lock within our transaction.
1190          *
1191          * Note: we don't currently bother with a session lock on the index,
1192          * because there are no operations that could change its state while we
1193          * hold lock on the parent table.  This might need to change later.
1194          */
1195         LockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
1196
1197         PopActiveSnapshot();
1198         CommitTransactionCommand();
1199         StartTransactionCommand();
1200
1201         /*
1202          * Phase 2 of concurrent index build (see comments for validate_index()
1203          * for an overview of how this works)
1204          *
1205          * Now we must wait until no running transaction could have the table open
1206          * with the old list of indexes.  Use ShareLock to consider running
1207          * transactions that hold locks that permit writing to the table.  Note we
1208          * do not need to worry about xacts that open the table for writing after
1209          * this point; they will see the new index when they open it.
1210          *
1211          * Note: the reason we use actual lock acquisition here, rather than just
1212          * checking the ProcArray and sleeping, is that deadlock is possible if
1213          * one of the transactions in question is blocked trying to acquire an
1214          * exclusive lock on our table.  The lock code will detect deadlock and
1215          * error out properly.
1216          */
1217         WaitForLockers(heaplocktag, ShareLock);
1218
1219         /*
1220          * At this moment we are sure that there are no transactions with the
1221          * table open for write that don't have this new index in their list of
1222          * indexes.  We have waited out all the existing transactions and any new
1223          * transaction will have the new index in its list, but the index is still
1224          * marked as "not-ready-for-inserts".  The index is consulted while
1225          * deciding HOT-safety though.  This arrangement ensures that no new HOT
1226          * chains can be created where the new tuple and the old tuple in the
1227          * chain have different index keys.
1228          *
1229          * We now take a new snapshot, and build the index using all tuples that
1230          * are visible in this snapshot.  We can be sure that any HOT updates to
1231          * these tuples will be compatible with the index, since any updates made
1232          * by transactions that didn't know about the index are now committed or
1233          * rolled back.  Thus, each visible tuple is either the end of its
1234          * HOT-chain or the extension of the chain is HOT-safe for this index.
1235          */
1236
1237         /* Set ActiveSnapshot since functions in the indexes may need it */
1238         PushActiveSnapshot(GetTransactionSnapshot());
1239
1240         /* Perform concurrent build of index */
1241         index_concurrently_build(relationId, indexRelationId);
1242
1243         /* we can do away with our snapshot */
1244         PopActiveSnapshot();
1245
1246         /*
1247          * Commit this transaction to make the indisready update visible.
1248          */
1249         CommitTransactionCommand();
1250         StartTransactionCommand();
1251
1252         /*
1253          * Phase 3 of concurrent index build
1254          *
1255          * We once again wait until no transaction can have the table open with
1256          * the index marked as read-only for updates.
1257          */
1258         WaitForLockers(heaplocktag, ShareLock);
1259
1260         /*
1261          * Now take the "reference snapshot" that will be used by validate_index()
1262          * to filter candidate tuples.  Beware!  There might still be snapshots in
1263          * use that treat some transaction as in-progress that our reference
1264          * snapshot treats as committed.  If such a recently-committed transaction
1265          * deleted tuples in the table, we will not include them in the index; yet
1266          * those transactions which see the deleting one as still-in-progress will
1267          * expect such tuples to be there once we mark the index as valid.
1268          *
1269          * We solve this by waiting for all endangered transactions to exit before
1270          * we mark the index as valid.
1271          *
1272          * We also set ActiveSnapshot to this snap, since functions in indexes may
1273          * need a snapshot.
1274          */
1275         snapshot = RegisterSnapshot(GetTransactionSnapshot());
1276         PushActiveSnapshot(snapshot);
1277
1278         /*
1279          * Scan the index and the heap, insert any missing index entries.
1280          */
1281         validate_index(relationId, indexRelationId, snapshot);
1282
1283         /*
1284          * Drop the reference snapshot.  We must do this before waiting out other
1285          * snapshot holders, else we will deadlock against other processes also
1286          * doing CREATE INDEX CONCURRENTLY, which would see our snapshot as one
1287          * they must wait for.  But first, save the snapshot's xmin to use as
1288          * limitXmin for GetCurrentVirtualXIDs().
1289          */
1290         limitXmin = snapshot->xmin;
1291
1292         PopActiveSnapshot();
1293         UnregisterSnapshot(snapshot);
1294
1295         /*
1296          * The snapshot subsystem could still contain registered snapshots that
1297          * are holding back our process's advertised xmin; in particular, if
1298          * default_transaction_isolation = serializable, there is a transaction
1299          * snapshot that is still active.  The CatalogSnapshot is likewise a
1300          * hazard.  To ensure no deadlocks, we must commit and start yet another
1301          * transaction, and do our wait before any snapshot has been taken in it.
1302          */
1303         CommitTransactionCommand();
1304         StartTransactionCommand();
1305
1306         /* We should now definitely not be advertising any xmin. */
1307         Assert(MyPgXact->xmin == InvalidTransactionId);
1308
1309         /*
1310          * The index is now valid in the sense that it contains all currently
1311          * interesting tuples.  But since it might not contain tuples deleted just
1312          * before the reference snap was taken, we have to wait out any
1313          * transactions that might have older snapshots.
1314          */
1315         WaitForOlderSnapshots(limitXmin);
1316
1317         /*
1318          * Index can now be marked valid -- update its pg_index entry
1319          */
1320         index_set_state_flags(indexRelationId, INDEX_CREATE_SET_VALID);
1321
1322         /*
1323          * The pg_index update will cause backends (including this one) to update
1324          * relcache entries for the index itself, but we should also send a
1325          * relcache inval on the parent table to force replanning of cached plans.
1326          * Otherwise existing sessions might fail to use the new index where it
1327          * would be useful.  (Note that our earlier commits did not create reasons
1328          * to replan; so relcache flush on the index itself was sufficient.)
1329          */
1330         CacheInvalidateRelcacheByRelid(heaprelid.relId);
1331
1332         /*
1333          * Last thing to do is release the session-level lock on the parent table.
1334          */
1335         UnlockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
1336
1337         return address;
1338 }
1339
1340
1341 /*
1342  * CheckMutability
1343  *              Test whether given expression is mutable
1344  */
1345 static bool
1346 CheckMutability(Expr *expr)
1347 {
1348         /*
1349          * First run the expression through the planner.  This has a couple of
1350          * important consequences.  First, function default arguments will get
1351          * inserted, which may affect volatility (consider "default now()").
1352          * Second, inline-able functions will get inlined, which may allow us to
1353          * conclude that the function is really less volatile than it's marked. As
1354          * an example, polymorphic functions must be marked with the most volatile
1355          * behavior that they have for any input type, but once we inline the
1356          * function we may be able to conclude that it's not so volatile for the
1357          * particular input type we're dealing with.
1358          *
1359          * We assume here that expression_planner() won't scribble on its input.
1360          */
1361         expr = expression_planner(expr);
1362
1363         /* Now we can search for non-immutable functions */
1364         return contain_mutable_functions((Node *) expr);
1365 }
1366
1367
1368 /*
1369  * CheckPredicate
1370  *              Checks that the given partial-index predicate is valid.
1371  *
1372  * This used to also constrain the form of the predicate to forms that
1373  * indxpath.c could do something with.  However, that seems overly
1374  * restrictive.  One useful application of partial indexes is to apply
1375  * a UNIQUE constraint across a subset of a table, and in that scenario
1376  * any evaluable predicate will work.  So accept any predicate here
1377  * (except ones requiring a plan), and let indxpath.c fend for itself.
1378  */
1379 static void
1380 CheckPredicate(Expr *predicate)
1381 {
1382         /*
1383          * transformExpr() should have already rejected subqueries, aggregates,
1384          * and window functions, based on the EXPR_KIND_ for a predicate.
1385          */
1386
1387         /*
1388          * A predicate using mutable functions is probably wrong, for the same
1389          * reasons that we don't allow an index expression to use one.
1390          */
1391         if (CheckMutability(predicate))
1392                 ereport(ERROR,
1393                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1394                                  errmsg("functions in index predicate must be marked IMMUTABLE")));
1395 }
1396
1397 /*
1398  * Compute per-index-column information, including indexed column numbers
1399  * or index expressions, opclasses, and indoptions. Note, all output vectors
1400  * should be allocated for all columns, including "including" ones.
1401  */
1402 static void
1403 ComputeIndexAttrs(IndexInfo *indexInfo,
1404                                   Oid *typeOidP,
1405                                   Oid *collationOidP,
1406                                   Oid *classOidP,
1407                                   int16 *colOptionP,
1408                                   List *attList,        /* list of IndexElem's */
1409                                   List *exclusionOpNames,
1410                                   Oid relId,
1411                                   const char *accessMethodName,
1412                                   Oid accessMethodId,
1413                                   bool amcanorder,
1414                                   bool isconstraint)
1415 {
1416         ListCell   *nextExclOp;
1417         ListCell   *lc;
1418         int                     attn;
1419         int                     nkeycols = indexInfo->ii_NumIndexKeyAttrs;
1420
1421         /* Allocate space for exclusion operator info, if needed */
1422         if (exclusionOpNames)
1423         {
1424                 Assert(list_length(exclusionOpNames) == nkeycols);
1425                 indexInfo->ii_ExclusionOps = (Oid *) palloc(sizeof(Oid) * nkeycols);
1426                 indexInfo->ii_ExclusionProcs = (Oid *) palloc(sizeof(Oid) * nkeycols);
1427                 indexInfo->ii_ExclusionStrats = (uint16 *) palloc(sizeof(uint16) * nkeycols);
1428                 nextExclOp = list_head(exclusionOpNames);
1429         }
1430         else
1431                 nextExclOp = NULL;
1432
1433         /*
1434          * process attributeList
1435          */
1436         attn = 0;
1437         foreach(lc, attList)
1438         {
1439                 IndexElem  *attribute = (IndexElem *) lfirst(lc);
1440                 Oid                     atttype;
1441                 Oid                     attcollation;
1442
1443                 /*
1444                  * Process the column-or-expression to be indexed.
1445                  */
1446                 if (attribute->name != NULL)
1447                 {
1448                         /* Simple index attribute */
1449                         HeapTuple       atttuple;
1450                         Form_pg_attribute attform;
1451
1452                         Assert(attribute->expr == NULL);
1453                         atttuple = SearchSysCacheAttName(relId, attribute->name);
1454                         if (!HeapTupleIsValid(atttuple))
1455                         {
1456                                 /* difference in error message spellings is historical */
1457                                 if (isconstraint)
1458                                         ereport(ERROR,
1459                                                         (errcode(ERRCODE_UNDEFINED_COLUMN),
1460                                                          errmsg("column \"%s\" named in key does not exist",
1461                                                                         attribute->name)));
1462                                 else
1463                                         ereport(ERROR,
1464                                                         (errcode(ERRCODE_UNDEFINED_COLUMN),
1465                                                          errmsg("column \"%s\" does not exist",
1466                                                                         attribute->name)));
1467                         }
1468                         attform = (Form_pg_attribute) GETSTRUCT(atttuple);
1469                         indexInfo->ii_IndexAttrNumbers[attn] = attform->attnum;
1470                         atttype = attform->atttypid;
1471                         attcollation = attform->attcollation;
1472                         ReleaseSysCache(atttuple);
1473                 }
1474                 else
1475                 {
1476                         /* Index expression */
1477                         Node       *expr = attribute->expr;
1478
1479                         Assert(expr != NULL);
1480
1481                         if (attn >= nkeycols)
1482                                 ereport(ERROR,
1483                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1484                                                  errmsg("expressions are not supported in included columns")));
1485                         atttype = exprType(expr);
1486                         attcollation = exprCollation(expr);
1487
1488                         /*
1489                          * Strip any top-level COLLATE clause.  This ensures that we treat
1490                          * "x COLLATE y" and "(x COLLATE y)" alike.
1491                          */
1492                         while (IsA(expr, CollateExpr))
1493                                 expr = (Node *) ((CollateExpr *) expr)->arg;
1494
1495                         if (IsA(expr, Var) &&
1496                                 ((Var *) expr)->varattno != InvalidAttrNumber)
1497                         {
1498                                 /*
1499                                  * User wrote "(column)" or "(column COLLATE something)".
1500                                  * Treat it like simple attribute anyway.
1501                                  */
1502                                 indexInfo->ii_IndexAttrNumbers[attn] = ((Var *) expr)->varattno;
1503                         }
1504                         else
1505                         {
1506                                 indexInfo->ii_IndexAttrNumbers[attn] = 0;       /* marks expression */
1507                                 indexInfo->ii_Expressions = lappend(indexInfo->ii_Expressions,
1508                                                                                                         expr);
1509
1510                                 /*
1511                                  * transformExpr() should have already rejected subqueries,
1512                                  * aggregates, and window functions, based on the EXPR_KIND_
1513                                  * for an index expression.
1514                                  */
1515
1516                                 /*
1517                                  * An expression using mutable functions is probably wrong,
1518                                  * since if you aren't going to get the same result for the
1519                                  * same data every time, it's not clear what the index entries
1520                                  * mean at all.
1521                                  */
1522                                 if (CheckMutability((Expr *) expr))
1523                                         ereport(ERROR,
1524                                                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1525                                                          errmsg("functions in index expression must be marked IMMUTABLE")));
1526                         }
1527                 }
1528
1529                 typeOidP[attn] = atttype;
1530
1531                 /*
1532                  * Included columns have no collation, no opclass and no ordering
1533                  * options.
1534                  */
1535                 if (attn >= nkeycols)
1536                 {
1537                         if (attribute->collation)
1538                                 ereport(ERROR,
1539                                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1540                                                  errmsg("including column does not support a collation")));
1541                         if (attribute->opclass)
1542                                 ereport(ERROR,
1543                                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1544                                                  errmsg("including column does not support an operator class")));
1545                         if (attribute->ordering != SORTBY_DEFAULT)
1546                                 ereport(ERROR,
1547                                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1548                                                  errmsg("including column does not support ASC/DESC options")));
1549                         if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
1550                                 ereport(ERROR,
1551                                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1552                                                  errmsg("including column does not support NULLS FIRST/LAST options")));
1553
1554                         classOidP[attn] = InvalidOid;
1555                         colOptionP[attn] = 0;
1556                         collationOidP[attn] = InvalidOid;
1557                         attn++;
1558
1559                         continue;
1560                 }
1561
1562                 /*
1563                  * Apply collation override if any
1564                  */
1565                 if (attribute->collation)
1566                         attcollation = get_collation_oid(attribute->collation, false);
1567
1568                 /*
1569                  * Check we have a collation iff it's a collatable type.  The only
1570                  * expected failures here are (1) COLLATE applied to a noncollatable
1571                  * type, or (2) index expression had an unresolved collation.  But we
1572                  * might as well code this to be a complete consistency check.
1573                  */
1574                 if (type_is_collatable(atttype))
1575                 {
1576                         if (!OidIsValid(attcollation))
1577                                 ereport(ERROR,
1578                                                 (errcode(ERRCODE_INDETERMINATE_COLLATION),
1579                                                  errmsg("could not determine which collation to use for index expression"),
1580                                                  errhint("Use the COLLATE clause to set the collation explicitly.")));
1581                 }
1582                 else
1583                 {
1584                         if (OidIsValid(attcollation))
1585                                 ereport(ERROR,
1586                                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
1587                                                  errmsg("collations are not supported by type %s",
1588                                                                 format_type_be(atttype))));
1589                 }
1590
1591                 collationOidP[attn] = attcollation;
1592
1593                 /*
1594                  * Identify the opclass to use.
1595                  */
1596                 classOidP[attn] = ResolveOpClass(attribute->opclass,
1597                                                                                  atttype,
1598                                                                                  accessMethodName,
1599                                                                                  accessMethodId);
1600
1601                 /*
1602                  * Identify the exclusion operator, if any.
1603                  */
1604                 if (nextExclOp)
1605                 {
1606                         List       *opname = (List *) lfirst(nextExclOp);
1607                         Oid                     opid;
1608                         Oid                     opfamily;
1609                         int                     strat;
1610
1611                         /*
1612                          * Find the operator --- it must accept the column datatype
1613                          * without runtime coercion (but binary compatibility is OK)
1614                          */
1615                         opid = compatible_oper_opid(opname, atttype, atttype, false);
1616
1617                         /*
1618                          * Only allow commutative operators to be used in exclusion
1619                          * constraints. If X conflicts with Y, but Y does not conflict
1620                          * with X, bad things will happen.
1621                          */
1622                         if (get_commutator(opid) != opid)
1623                                 ereport(ERROR,
1624                                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1625                                                  errmsg("operator %s is not commutative",
1626                                                                 format_operator(opid)),
1627                                                  errdetail("Only commutative operators can be used in exclusion constraints.")));
1628
1629                         /*
1630                          * Operator must be a member of the right opfamily, too
1631                          */
1632                         opfamily = get_opclass_family(classOidP[attn]);
1633                         strat = get_op_opfamily_strategy(opid, opfamily);
1634                         if (strat == 0)
1635                         {
1636                                 HeapTuple       opftuple;
1637                                 Form_pg_opfamily opfform;
1638
1639                                 /*
1640                                  * attribute->opclass might not explicitly name the opfamily,
1641                                  * so fetch the name of the selected opfamily for use in the
1642                                  * error message.
1643                                  */
1644                                 opftuple = SearchSysCache1(OPFAMILYOID,
1645                                                                                    ObjectIdGetDatum(opfamily));
1646                                 if (!HeapTupleIsValid(opftuple))
1647                                         elog(ERROR, "cache lookup failed for opfamily %u",
1648                                                  opfamily);
1649                                 opfform = (Form_pg_opfamily) GETSTRUCT(opftuple);
1650
1651                                 ereport(ERROR,
1652                                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1653                                                  errmsg("operator %s is not a member of operator family \"%s\"",
1654                                                                 format_operator(opid),
1655                                                                 NameStr(opfform->opfname)),
1656                                                  errdetail("The exclusion operator must be related to the index operator class for the constraint.")));
1657                         }
1658
1659                         indexInfo->ii_ExclusionOps[attn] = opid;
1660                         indexInfo->ii_ExclusionProcs[attn] = get_opcode(opid);
1661                         indexInfo->ii_ExclusionStrats[attn] = strat;
1662                         nextExclOp = lnext(nextExclOp);
1663                 }
1664
1665                 /*
1666                  * Set up the per-column options (indoption field).  For now, this is
1667                  * zero for any un-ordered index, while ordered indexes have DESC and
1668                  * NULLS FIRST/LAST options.
1669                  */
1670                 colOptionP[attn] = 0;
1671                 if (amcanorder)
1672                 {
1673                         /* default ordering is ASC */
1674                         if (attribute->ordering == SORTBY_DESC)
1675                                 colOptionP[attn] |= INDOPTION_DESC;
1676                         /* default null ordering is LAST for ASC, FIRST for DESC */
1677                         if (attribute->nulls_ordering == SORTBY_NULLS_DEFAULT)
1678                         {
1679                                 if (attribute->ordering == SORTBY_DESC)
1680                                         colOptionP[attn] |= INDOPTION_NULLS_FIRST;
1681                         }
1682                         else if (attribute->nulls_ordering == SORTBY_NULLS_FIRST)
1683                                 colOptionP[attn] |= INDOPTION_NULLS_FIRST;
1684                 }
1685                 else
1686                 {
1687                         /* index AM does not support ordering */
1688                         if (attribute->ordering != SORTBY_DEFAULT)
1689                                 ereport(ERROR,
1690                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1691                                                  errmsg("access method \"%s\" does not support ASC/DESC options",
1692                                                                 accessMethodName)));
1693                         if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
1694                                 ereport(ERROR,
1695                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1696                                                  errmsg("access method \"%s\" does not support NULLS FIRST/LAST options",
1697                                                                 accessMethodName)));
1698                 }
1699
1700                 attn++;
1701         }
1702 }
1703
1704 /*
1705  * Resolve possibly-defaulted operator class specification
1706  *
1707  * Note: This is used to resolve operator class specification in index and
1708  * partition key definitions.
1709  */
1710 Oid
1711 ResolveOpClass(List *opclass, Oid attrType,
1712                            const char *accessMethodName, Oid accessMethodId)
1713 {
1714         char       *schemaname;
1715         char       *opcname;
1716         HeapTuple       tuple;
1717         Form_pg_opclass opform;
1718         Oid                     opClassId,
1719                                 opInputType;
1720
1721         /*
1722          * Release 7.0 removed network_ops, timespan_ops, and datetime_ops, so we
1723          * ignore those opclass names so the default *_ops is used.  This can be
1724          * removed in some later release.  bjm 2000/02/07
1725          *
1726          * Release 7.1 removes lztext_ops, so suppress that too for a while.  tgl
1727          * 2000/07/30
1728          *
1729          * Release 7.2 renames timestamp_ops to timestamptz_ops, so suppress that
1730          * too for awhile.  I'm starting to think we need a better approach. tgl
1731          * 2000/10/01
1732          *
1733          * Release 8.0 removes bigbox_ops (which was dead code for a long while
1734          * anyway).  tgl 2003/11/11
1735          */
1736         if (list_length(opclass) == 1)
1737         {
1738                 char       *claname = strVal(linitial(opclass));
1739
1740                 if (strcmp(claname, "network_ops") == 0 ||
1741                         strcmp(claname, "timespan_ops") == 0 ||
1742                         strcmp(claname, "datetime_ops") == 0 ||
1743                         strcmp(claname, "lztext_ops") == 0 ||
1744                         strcmp(claname, "timestamp_ops") == 0 ||
1745                         strcmp(claname, "bigbox_ops") == 0)
1746                         opclass = NIL;
1747         }
1748
1749         if (opclass == NIL)
1750         {
1751                 /* no operator class specified, so find the default */
1752                 opClassId = GetDefaultOpClass(attrType, accessMethodId);
1753                 if (!OidIsValid(opClassId))
1754                         ereport(ERROR,
1755                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
1756                                          errmsg("data type %s has no default operator class for access method \"%s\"",
1757                                                         format_type_be(attrType), accessMethodName),
1758                                          errhint("You must specify an operator class for the index or define a default operator class for the data type.")));
1759                 return opClassId;
1760         }
1761
1762         /*
1763          * Specific opclass name given, so look up the opclass.
1764          */
1765
1766         /* deconstruct the name list */
1767         DeconstructQualifiedName(opclass, &schemaname, &opcname);
1768
1769         if (schemaname)
1770         {
1771                 /* Look in specific schema only */
1772                 Oid                     namespaceId;
1773
1774                 namespaceId = LookupExplicitNamespace(schemaname, false);
1775                 tuple = SearchSysCache3(CLAAMNAMENSP,
1776                                                                 ObjectIdGetDatum(accessMethodId),
1777                                                                 PointerGetDatum(opcname),
1778                                                                 ObjectIdGetDatum(namespaceId));
1779         }
1780         else
1781         {
1782                 /* Unqualified opclass name, so search the search path */
1783                 opClassId = OpclassnameGetOpcid(accessMethodId, opcname);
1784                 if (!OidIsValid(opClassId))
1785                         ereport(ERROR,
1786                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
1787                                          errmsg("operator class \"%s\" does not exist for access method \"%s\"",
1788                                                         opcname, accessMethodName)));
1789                 tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(opClassId));
1790         }
1791
1792         if (!HeapTupleIsValid(tuple))
1793                 ereport(ERROR,
1794                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
1795                                  errmsg("operator class \"%s\" does not exist for access method \"%s\"",
1796                                                 NameListToString(opclass), accessMethodName)));
1797
1798         /*
1799          * Verify that the index operator class accepts this datatype.  Note we
1800          * will accept binary compatibility.
1801          */
1802         opform = (Form_pg_opclass) GETSTRUCT(tuple);
1803         opClassId = opform->oid;
1804         opInputType = opform->opcintype;
1805
1806         if (!IsBinaryCoercible(attrType, opInputType))
1807                 ereport(ERROR,
1808                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
1809                                  errmsg("operator class \"%s\" does not accept data type %s",
1810                                                 NameListToString(opclass), format_type_be(attrType))));
1811
1812         ReleaseSysCache(tuple);
1813
1814         return opClassId;
1815 }
1816
1817 /*
1818  * GetDefaultOpClass
1819  *
1820  * Given the OIDs of a datatype and an access method, find the default
1821  * operator class, if any.  Returns InvalidOid if there is none.
1822  */
1823 Oid
1824 GetDefaultOpClass(Oid type_id, Oid am_id)
1825 {
1826         Oid                     result = InvalidOid;
1827         int                     nexact = 0;
1828         int                     ncompatible = 0;
1829         int                     ncompatiblepreferred = 0;
1830         Relation        rel;
1831         ScanKeyData skey[1];
1832         SysScanDesc scan;
1833         HeapTuple       tup;
1834         TYPCATEGORY tcategory;
1835
1836         /* If it's a domain, look at the base type instead */
1837         type_id = getBaseType(type_id);
1838
1839         tcategory = TypeCategory(type_id);
1840
1841         /*
1842          * We scan through all the opclasses available for the access method,
1843          * looking for one that is marked default and matches the target type
1844          * (either exactly or binary-compatibly, but prefer an exact match).
1845          *
1846          * We could find more than one binary-compatible match.  If just one is
1847          * for a preferred type, use that one; otherwise we fail, forcing the user
1848          * to specify which one he wants.  (The preferred-type special case is a
1849          * kluge for varchar: it's binary-compatible to both text and bpchar, so
1850          * we need a tiebreaker.)  If we find more than one exact match, then
1851          * someone put bogus entries in pg_opclass.
1852          */
1853         rel = table_open(OperatorClassRelationId, AccessShareLock);
1854
1855         ScanKeyInit(&skey[0],
1856                                 Anum_pg_opclass_opcmethod,
1857                                 BTEqualStrategyNumber, F_OIDEQ,
1858                                 ObjectIdGetDatum(am_id));
1859
1860         scan = systable_beginscan(rel, OpclassAmNameNspIndexId, true,
1861                                                           NULL, 1, skey);
1862
1863         while (HeapTupleIsValid(tup = systable_getnext(scan)))
1864         {
1865                 Form_pg_opclass opclass = (Form_pg_opclass) GETSTRUCT(tup);
1866
1867                 /* ignore altogether if not a default opclass */
1868                 if (!opclass->opcdefault)
1869                         continue;
1870                 if (opclass->opcintype == type_id)
1871                 {
1872                         nexact++;
1873                         result = opclass->oid;
1874                 }
1875                 else if (nexact == 0 &&
1876                                  IsBinaryCoercible(type_id, opclass->opcintype))
1877                 {
1878                         if (IsPreferredType(tcategory, opclass->opcintype))
1879                         {
1880                                 ncompatiblepreferred++;
1881                                 result = opclass->oid;
1882                         }
1883                         else if (ncompatiblepreferred == 0)
1884                         {
1885                                 ncompatible++;
1886                                 result = opclass->oid;
1887                         }
1888                 }
1889         }
1890
1891         systable_endscan(scan);
1892
1893         table_close(rel, AccessShareLock);
1894
1895         /* raise error if pg_opclass contains inconsistent data */
1896         if (nexact > 1)
1897                 ereport(ERROR,
1898                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
1899                                  errmsg("there are multiple default operator classes for data type %s",
1900                                                 format_type_be(type_id))));
1901
1902         if (nexact == 1 ||
1903                 ncompatiblepreferred == 1 ||
1904                 (ncompatiblepreferred == 0 && ncompatible == 1))
1905                 return result;
1906
1907         return InvalidOid;
1908 }
1909
1910 /*
1911  *      makeObjectName()
1912  *
1913  *      Create a name for an implicitly created index, sequence, constraint,
1914  *      extended statistics, etc.
1915  *
1916  *      The parameters are typically: the original table name, the original field
1917  *      name, and a "type" string (such as "seq" or "pkey").    The field name
1918  *      and/or type can be NULL if not relevant.
1919  *
1920  *      The result is a palloc'd string.
1921  *
1922  *      The basic result we want is "name1_name2_label", omitting "_name2" or
1923  *      "_label" when those parameters are NULL.  However, we must generate
1924  *      a name with less than NAMEDATALEN characters!  So, we truncate one or
1925  *      both names if necessary to make a short-enough string.  The label part
1926  *      is never truncated (so it had better be reasonably short).
1927  *
1928  *      The caller is responsible for checking uniqueness of the generated
1929  *      name and retrying as needed; retrying will be done by altering the
1930  *      "label" string (which is why we never truncate that part).
1931  */
1932 char *
1933 makeObjectName(const char *name1, const char *name2, const char *label)
1934 {
1935         char       *name;
1936         int                     overhead = 0;   /* chars needed for label and underscores */
1937         int                     availchars;             /* chars available for name(s) */
1938         int                     name1chars;             /* chars allocated to name1 */
1939         int                     name2chars;             /* chars allocated to name2 */
1940         int                     ndx;
1941
1942         name1chars = strlen(name1);
1943         if (name2)
1944         {
1945                 name2chars = strlen(name2);
1946                 overhead++;                             /* allow for separating underscore */
1947         }
1948         else
1949                 name2chars = 0;
1950         if (label)
1951                 overhead += strlen(label) + 1;
1952
1953         availchars = NAMEDATALEN - 1 - overhead;
1954         Assert(availchars > 0);         /* else caller chose a bad label */
1955
1956         /*
1957          * If we must truncate,  preferentially truncate the longer name. This
1958          * logic could be expressed without a loop, but it's simple and obvious as
1959          * a loop.
1960          */
1961         while (name1chars + name2chars > availchars)
1962         {
1963                 if (name1chars > name2chars)
1964                         name1chars--;
1965                 else
1966                         name2chars--;
1967         }
1968
1969         name1chars = pg_mbcliplen(name1, name1chars, name1chars);
1970         if (name2)
1971                 name2chars = pg_mbcliplen(name2, name2chars, name2chars);
1972
1973         /* Now construct the string using the chosen lengths */
1974         name = palloc(name1chars + name2chars + overhead + 1);
1975         memcpy(name, name1, name1chars);
1976         ndx = name1chars;
1977         if (name2)
1978         {
1979                 name[ndx++] = '_';
1980                 memcpy(name + ndx, name2, name2chars);
1981                 ndx += name2chars;
1982         }
1983         if (label)
1984         {
1985                 name[ndx++] = '_';
1986                 strcpy(name + ndx, label);
1987         }
1988         else
1989                 name[ndx] = '\0';
1990
1991         return name;
1992 }
1993
1994 /*
1995  * Select a nonconflicting name for a new relation.  This is ordinarily
1996  * used to choose index names (which is why it's here) but it can also
1997  * be used for sequences, or any autogenerated relation kind.
1998  *
1999  * name1, name2, and label are used the same way as for makeObjectName(),
2000  * except that the label can't be NULL; digits will be appended to the label
2001  * if needed to create a name that is unique within the specified namespace.
2002  *
2003  * If isconstraint is true, we also avoid choosing a name matching any
2004  * existing constraint in the same namespace.  (This is stricter than what
2005  * Postgres itself requires, but the SQL standard says that constraint names
2006  * should be unique within schemas, so we follow that for autogenerated
2007  * constraint names.)
2008  *
2009  * Note: it is theoretically possible to get a collision anyway, if someone
2010  * else chooses the same name concurrently.  This is fairly unlikely to be
2011  * a problem in practice, especially if one is holding an exclusive lock on
2012  * the relation identified by name1.  However, if choosing multiple names
2013  * within a single command, you'd better create the new object and do
2014  * CommandCounterIncrement before choosing the next one!
2015  *
2016  * Returns a palloc'd string.
2017  */
2018 char *
2019 ChooseRelationName(const char *name1, const char *name2,
2020                                    const char *label, Oid namespaceid,
2021                                    bool isconstraint)
2022 {
2023         int                     pass = 0;
2024         char       *relname = NULL;
2025         char            modlabel[NAMEDATALEN];
2026
2027         /* try the unmodified label first */
2028         StrNCpy(modlabel, label, sizeof(modlabel));
2029
2030         for (;;)
2031         {
2032                 relname = makeObjectName(name1, name2, modlabel);
2033
2034                 if (!OidIsValid(get_relname_relid(relname, namespaceid)))
2035                 {
2036                         if (!isconstraint ||
2037                                 !ConstraintNameExists(relname, namespaceid))
2038                                 break;
2039                 }
2040
2041                 /* found a conflict, so try a new name component */
2042                 pfree(relname);
2043                 snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
2044         }
2045
2046         return relname;
2047 }
2048
2049 /*
2050  * Select the name to be used for an index.
2051  *
2052  * The argument list is pretty ad-hoc :-(
2053  */
2054 static char *
2055 ChooseIndexName(const char *tabname, Oid namespaceId,
2056                                 List *colnames, List *exclusionOpNames,
2057                                 bool primary, bool isconstraint)
2058 {
2059         char       *indexname;
2060
2061         if (primary)
2062         {
2063                 /* the primary key's name does not depend on the specific column(s) */
2064                 indexname = ChooseRelationName(tabname,
2065                                                                            NULL,
2066                                                                            "pkey",
2067                                                                            namespaceId,
2068                                                                            true);
2069         }
2070         else if (exclusionOpNames != NIL)
2071         {
2072                 indexname = ChooseRelationName(tabname,
2073                                                                            ChooseIndexNameAddition(colnames),
2074                                                                            "excl",
2075                                                                            namespaceId,
2076                                                                            true);
2077         }
2078         else if (isconstraint)
2079         {
2080                 indexname = ChooseRelationName(tabname,
2081                                                                            ChooseIndexNameAddition(colnames),
2082                                                                            "key",
2083                                                                            namespaceId,
2084                                                                            true);
2085         }
2086         else
2087         {
2088                 indexname = ChooseRelationName(tabname,
2089                                                                            ChooseIndexNameAddition(colnames),
2090                                                                            "idx",
2091                                                                            namespaceId,
2092                                                                            false);
2093         }
2094
2095         return indexname;
2096 }
2097
2098 /*
2099  * Generate "name2" for a new index given the list of column names for it
2100  * (as produced by ChooseIndexColumnNames).  This will be passed to
2101  * ChooseRelationName along with the parent table name and a suitable label.
2102  *
2103  * We know that less than NAMEDATALEN characters will actually be used,
2104  * so we can truncate the result once we've generated that many.
2105  *
2106  * XXX See also ChooseForeignKeyConstraintNameAddition and
2107  * ChooseExtendedStatisticNameAddition.
2108  */
2109 static char *
2110 ChooseIndexNameAddition(List *colnames)
2111 {
2112         char            buf[NAMEDATALEN * 2];
2113         int                     buflen = 0;
2114         ListCell   *lc;
2115
2116         buf[0] = '\0';
2117         foreach(lc, colnames)
2118         {
2119                 const char *name = (const char *) lfirst(lc);
2120
2121                 if (buflen > 0)
2122                         buf[buflen++] = '_';    /* insert _ between names */
2123
2124                 /*
2125                  * At this point we have buflen <= NAMEDATALEN.  name should be less
2126                  * than NAMEDATALEN already, but use strlcpy for paranoia.
2127                  */
2128                 strlcpy(buf + buflen, name, NAMEDATALEN);
2129                 buflen += strlen(buf + buflen);
2130                 if (buflen >= NAMEDATALEN)
2131                         break;
2132         }
2133         return pstrdup(buf);
2134 }
2135
2136 /*
2137  * Select the actual names to be used for the columns of an index, given the
2138  * list of IndexElems for the columns.  This is mostly about ensuring the
2139  * names are unique so we don't get a conflicting-attribute-names error.
2140  *
2141  * Returns a List of plain strings (char *, not String nodes).
2142  */
2143 static List *
2144 ChooseIndexColumnNames(List *indexElems)
2145 {
2146         List       *result = NIL;
2147         ListCell   *lc;
2148
2149         foreach(lc, indexElems)
2150         {
2151                 IndexElem  *ielem = (IndexElem *) lfirst(lc);
2152                 const char *origname;
2153                 const char *curname;
2154                 int                     i;
2155                 char            buf[NAMEDATALEN];
2156
2157                 /* Get the preliminary name from the IndexElem */
2158                 if (ielem->indexcolname)
2159                         origname = ielem->indexcolname; /* caller-specified name */
2160                 else if (ielem->name)
2161                         origname = ielem->name; /* simple column reference */
2162                 else
2163                         origname = "expr";      /* default name for expression */
2164
2165                 /* If it conflicts with any previous column, tweak it */
2166                 curname = origname;
2167                 for (i = 1;; i++)
2168                 {
2169                         ListCell   *lc2;
2170                         char            nbuf[32];
2171                         int                     nlen;
2172
2173                         foreach(lc2, result)
2174                         {
2175                                 if (strcmp(curname, (char *) lfirst(lc2)) == 0)
2176                                         break;
2177                         }
2178                         if (lc2 == NULL)
2179                                 break;                  /* found nonconflicting name */
2180
2181                         sprintf(nbuf, "%d", i);
2182
2183                         /* Ensure generated names are shorter than NAMEDATALEN */
2184                         nlen = pg_mbcliplen(origname, strlen(origname),
2185                                                                 NAMEDATALEN - 1 - strlen(nbuf));
2186                         memcpy(buf, origname, nlen);
2187                         strcpy(buf + nlen, nbuf);
2188                         curname = buf;
2189                 }
2190
2191                 /* And attach to the result list */
2192                 result = lappend(result, pstrdup(curname));
2193         }
2194         return result;
2195 }
2196
2197 /*
2198  * ReindexIndex
2199  *              Recreate a specific index.
2200  */
2201 void
2202 ReindexIndex(RangeVar *indexRelation, int options, bool concurrent)
2203 {
2204         Oid                     indOid;
2205         Oid                     heapOid = InvalidOid;
2206         Relation        irel;
2207         char            persistence;
2208
2209         /*
2210          * Find and lock index, and check permissions on table; use callback to
2211          * obtain lock on table first, to avoid deadlock hazard.  The lock level
2212          * used here must match the index lock obtained in reindex_index().
2213          */
2214         indOid = RangeVarGetRelidExtended(indexRelation,
2215                                                                           concurrent ? ShareUpdateExclusiveLock : AccessExclusiveLock,
2216                                                                           0,
2217                                                                           RangeVarCallbackForReindexIndex,
2218                                                                           (void *) &heapOid);
2219
2220         /*
2221          * Obtain the current persistence of the existing index.  We already hold
2222          * lock on the index.
2223          */
2224         irel = index_open(indOid, NoLock);
2225
2226         if (irel->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
2227         {
2228                 ReindexPartitionedIndex(irel);
2229                 return;
2230         }
2231
2232         persistence = irel->rd_rel->relpersistence;
2233         index_close(irel, NoLock);
2234
2235         if (concurrent)
2236                 ReindexRelationConcurrently(indOid, options);
2237         else
2238                 reindex_index(indOid, false, persistence, options);
2239 }
2240
2241 /*
2242  * Check permissions on table before acquiring relation lock; also lock
2243  * the heap before the RangeVarGetRelidExtended takes the index lock, to avoid
2244  * deadlocks.
2245  */
2246 static void
2247 RangeVarCallbackForReindexIndex(const RangeVar *relation,
2248                                                                 Oid relId, Oid oldRelId, void *arg)
2249 {
2250         char            relkind;
2251         Oid                *heapOid = (Oid *) arg;
2252
2253         /*
2254          * If we previously locked some other index's heap, and the name we're
2255          * looking up no longer refers to that relation, release the now-useless
2256          * lock.
2257          */
2258         if (relId != oldRelId && OidIsValid(oldRelId))
2259         {
2260                 /* lock level here should match reindex_index() heap lock */
2261                 UnlockRelationOid(*heapOid, ShareLock);
2262                 *heapOid = InvalidOid;
2263         }
2264
2265         /* If the relation does not exist, there's nothing more to do. */
2266         if (!OidIsValid(relId))
2267                 return;
2268
2269         /*
2270          * If the relation does exist, check whether it's an index.  But note that
2271          * the relation might have been dropped between the time we did the name
2272          * lookup and now.  In that case, there's nothing to do.
2273          */
2274         relkind = get_rel_relkind(relId);
2275         if (!relkind)
2276                 return;
2277         if (relkind != RELKIND_INDEX &&
2278                 relkind != RELKIND_PARTITIONED_INDEX)
2279                 ereport(ERROR,
2280                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2281                                  errmsg("\"%s\" is not an index", relation->relname)));
2282
2283         /* Check permissions */
2284         if (!pg_class_ownercheck(relId, GetUserId()))
2285                 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX, relation->relname);
2286
2287         /* Lock heap before index to avoid deadlock. */
2288         if (relId != oldRelId)
2289         {
2290                 /*
2291                  * Lock level here should match reindex_index() heap lock. If the OID
2292                  * isn't valid, it means the index as concurrently dropped, which is
2293                  * not a problem for us; just return normally.
2294                  */
2295                 *heapOid = IndexGetRelation(relId, true);
2296                 if (OidIsValid(*heapOid))
2297                         LockRelationOid(*heapOid, ShareLock);
2298         }
2299 }
2300
2301 /*
2302  * ReindexTable
2303  *              Recreate all indexes of a table (and of its toast table, if any)
2304  */
2305 Oid
2306 ReindexTable(RangeVar *relation, int options, bool concurrent)
2307 {
2308         Oid                     heapOid;
2309         bool            result;
2310
2311         /* The lock level used here should match reindex_relation(). */
2312         heapOid = RangeVarGetRelidExtended(relation,
2313                                                                            concurrent ? ShareUpdateExclusiveLock : ShareLock,
2314                                                                            0,
2315                                                                            RangeVarCallbackOwnsTable, NULL);
2316
2317         if (concurrent)
2318                 result = ReindexRelationConcurrently(heapOid, options);
2319         else
2320                 result = reindex_relation(heapOid,
2321                                                                   REINDEX_REL_PROCESS_TOAST |
2322                                                                   REINDEX_REL_CHECK_CONSTRAINTS,
2323                                                                   options);
2324
2325         if (!result)
2326                 ereport(NOTICE,
2327                                 (errmsg("table \"%s\" has no indexes",
2328                                                 relation->relname)));
2329
2330         return heapOid;
2331 }
2332
2333 /*
2334  * ReindexMultipleTables
2335  *              Recreate indexes of tables selected by objectName/objectKind.
2336  *
2337  * To reduce the probability of deadlocks, each table is reindexed in a
2338  * separate transaction, so we can release the lock on it right away.
2339  * That means this must not be called within a user transaction block!
2340  */
2341 void
2342 ReindexMultipleTables(const char *objectName, ReindexObjectType objectKind,
2343                                           int options, bool concurrent)
2344 {
2345         Oid                     objectOid;
2346         Relation        relationRelation;
2347         TableScanDesc scan;
2348         ScanKeyData scan_keys[1];
2349         HeapTuple       tuple;
2350         MemoryContext private_context;
2351         MemoryContext old;
2352         List       *relids = NIL;
2353         ListCell   *l;
2354         int                     num_keys;
2355         bool            concurrent_warning = false;
2356
2357         AssertArg(objectName);
2358         Assert(objectKind == REINDEX_OBJECT_SCHEMA ||
2359                    objectKind == REINDEX_OBJECT_SYSTEM ||
2360                    objectKind == REINDEX_OBJECT_DATABASE);
2361
2362         if (objectKind == REINDEX_OBJECT_SYSTEM && concurrent)
2363                 ereport(ERROR,
2364                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2365                                  errmsg("concurrent reindex of system catalogs is not supported")));
2366
2367         /*
2368          * Get OID of object to reindex, being the database currently being used
2369          * by session for a database or for system catalogs, or the schema defined
2370          * by caller. At the same time do permission checks that need different
2371          * processing depending on the object type.
2372          */
2373         if (objectKind == REINDEX_OBJECT_SCHEMA)
2374         {
2375                 objectOid = get_namespace_oid(objectName, false);
2376
2377                 if (!pg_namespace_ownercheck(objectOid, GetUserId()))
2378                         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA,
2379                                                    objectName);
2380         }
2381         else
2382         {
2383                 objectOid = MyDatabaseId;
2384
2385                 if (strcmp(objectName, get_database_name(objectOid)) != 0)
2386                         ereport(ERROR,
2387                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2388                                          errmsg("can only reindex the currently open database")));
2389                 if (!pg_database_ownercheck(objectOid, GetUserId()))
2390                         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2391                                                    objectName);
2392         }
2393
2394         /*
2395          * Create a memory context that will survive forced transaction commits we
2396          * do below.  Since it is a child of PortalContext, it will go away
2397          * eventually even if we suffer an error; there's no need for special
2398          * abort cleanup logic.
2399          */
2400         private_context = AllocSetContextCreate(PortalContext,
2401                                                                                         "ReindexMultipleTables",
2402                                                                                         ALLOCSET_SMALL_SIZES);
2403
2404         /*
2405          * Define the search keys to find the objects to reindex. For a schema, we
2406          * select target relations using relnamespace, something not necessary for
2407          * a database-wide operation.
2408          */
2409         if (objectKind == REINDEX_OBJECT_SCHEMA)
2410         {
2411                 num_keys = 1;
2412                 ScanKeyInit(&scan_keys[0],
2413                                         Anum_pg_class_relnamespace,
2414                                         BTEqualStrategyNumber, F_OIDEQ,
2415                                         ObjectIdGetDatum(objectOid));
2416         }
2417         else
2418                 num_keys = 0;
2419
2420         /*
2421          * Scan pg_class to build a list of the relations we need to reindex.
2422          *
2423          * We only consider plain relations and materialized views here (toast
2424          * rels will be processed indirectly by reindex_relation).
2425          */
2426         relationRelation = table_open(RelationRelationId, AccessShareLock);
2427         scan = table_beginscan_catalog(relationRelation, num_keys, scan_keys);
2428         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2429         {
2430                 Form_pg_class classtuple = (Form_pg_class) GETSTRUCT(tuple);
2431                 Oid                     relid = classtuple->oid;
2432
2433                 /*
2434                  * Only regular tables and matviews can have indexes, so ignore any
2435                  * other kind of relation.
2436                  *
2437                  * It is tempting to also consider partitioned tables here, but that
2438                  * has the problem that if the children are in the same schema, they
2439                  * would be processed twice.  Maybe we could have a separate list of
2440                  * partitioned tables, and expand that afterwards into relids,
2441                  * ignoring any duplicates.
2442                  */
2443                 if (classtuple->relkind != RELKIND_RELATION &&
2444                         classtuple->relkind != RELKIND_MATVIEW)
2445                         continue;
2446
2447                 /* Skip temp tables of other backends; we can't reindex them at all */
2448                 if (classtuple->relpersistence == RELPERSISTENCE_TEMP &&
2449                         !isTempNamespace(classtuple->relnamespace))
2450                         continue;
2451
2452                 /* Check user/system classification, and optionally skip */
2453                 if (objectKind == REINDEX_OBJECT_SYSTEM &&
2454                         !IsSystemClass(relid, classtuple))
2455                         continue;
2456
2457                 /*
2458                  * The table can be reindexed if the user is superuser, the table
2459                  * owner, or the database/schema owner (but in the latter case, only
2460                  * if it's not a shared relation).  pg_class_ownercheck includes the
2461                  * superuser case, and depending on objectKind we already know that
2462                  * the user has permission to run REINDEX on this database or schema
2463                  * per the permission checks at the beginning of this routine.
2464                  */
2465                 if (classtuple->relisshared &&
2466                         !pg_class_ownercheck(relid, GetUserId()))
2467                         continue;
2468
2469                 /*
2470                  * Skip system tables that index_create() would reject to index
2471                  * concurrently.  XXX We need the additional check for
2472                  * FirstNormalObjectId to skip information_schema tables, because
2473                  * IsCatalogClass() here does not cover information_schema, but the
2474                  * check in index_create() will error on the TOAST tables of
2475                  * information_schema tables.
2476                  */
2477                 if (concurrent &&
2478                         (IsCatalogClass(relid, classtuple) || relid < FirstNormalObjectId))
2479                 {
2480                         if (!concurrent_warning)
2481                                 ereport(WARNING,
2482                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2483                                                  errmsg("concurrent reindex is not supported for catalog relations, skipping all")));
2484                         concurrent_warning = true;
2485                         continue;
2486                 }
2487
2488                 /* Save the list of relation OIDs in private context */
2489                 old = MemoryContextSwitchTo(private_context);
2490
2491                 /*
2492                  * We always want to reindex pg_class first if it's selected to be
2493                  * reindexed.  This ensures that if there is any corruption in
2494                  * pg_class' indexes, they will be fixed before we process any other
2495                  * tables.  This is critical because reindexing itself will try to
2496                  * update pg_class.
2497                  */
2498                 if (relid == RelationRelationId)
2499                         relids = lcons_oid(relid, relids);
2500                 else
2501                         relids = lappend_oid(relids, relid);
2502
2503                 MemoryContextSwitchTo(old);
2504         }
2505         table_endscan(scan);
2506         table_close(relationRelation, AccessShareLock);
2507
2508         /* Now reindex each rel in a separate transaction */
2509         PopActiveSnapshot();
2510         CommitTransactionCommand();
2511         foreach(l, relids)
2512         {
2513                 Oid                     relid = lfirst_oid(l);
2514                 bool            result;
2515
2516                 StartTransactionCommand();
2517                 /* functions in indexes may want a snapshot set */
2518                 PushActiveSnapshot(GetTransactionSnapshot());
2519
2520                 if (concurrent)
2521                 {
2522                         result = ReindexRelationConcurrently(relid, options);
2523                         /* ReindexRelationConcurrently() does the verbose output */
2524                 }
2525                 else
2526                 {
2527                         result = reindex_relation(relid,
2528                                                                           REINDEX_REL_PROCESS_TOAST |
2529                                                                           REINDEX_REL_CHECK_CONSTRAINTS,
2530                                                                           options);
2531
2532                         if (result && (options & REINDEXOPT_VERBOSE))
2533                                 ereport(INFO,
2534                                                 (errmsg("table \"%s.%s\" was reindexed",
2535                                                                 get_namespace_name(get_rel_namespace(relid)),
2536                                                                 get_rel_name(relid))));
2537
2538                         PopActiveSnapshot();
2539                 }
2540
2541                 CommitTransactionCommand();
2542         }
2543         StartTransactionCommand();
2544
2545         MemoryContextDelete(private_context);
2546 }
2547
2548
2549 /*
2550  * ReindexRelationConcurrently - process REINDEX CONCURRENTLY for given
2551  * relation OID
2552  *
2553  * The relation can be either an index or a table.  If it is a table, all its
2554  * valid indexes will be rebuilt, including its associated toast table
2555  * indexes.  If it is an index, this index itself will be rebuilt.
2556  *
2557  * The locks taken on parent tables and involved indexes are kept until the
2558  * transaction is committed, at which point a session lock is taken on each
2559  * relation.  Both of these protect against concurrent schema changes.
2560  */
2561 static bool
2562 ReindexRelationConcurrently(Oid relationOid, int options)
2563 {
2564         List       *heapRelationIds = NIL;
2565         List       *indexIds = NIL;
2566         List       *newIndexIds = NIL;
2567         List       *relationLocks = NIL;
2568         List       *lockTags = NIL;
2569         ListCell   *lc,
2570                            *lc2;
2571         MemoryContext private_context;
2572         MemoryContext oldcontext;
2573         char            relkind;
2574         char       *relationName = NULL;
2575         char       *relationNamespace = NULL;
2576         PGRUsage        ru0;
2577
2578         /*
2579          * Create a memory context that will survive forced transaction commits we
2580          * do below.  Since it is a child of PortalContext, it will go away
2581          * eventually even if we suffer an error; there's no need for special
2582          * abort cleanup logic.
2583          */
2584         private_context = AllocSetContextCreate(PortalContext,
2585                                                                                         "ReindexConcurrent",
2586                                                                                         ALLOCSET_SMALL_SIZES);
2587
2588         if (options & REINDEXOPT_VERBOSE)
2589         {
2590                 /* Save data needed by REINDEX VERBOSE in private context */
2591                 oldcontext = MemoryContextSwitchTo(private_context);
2592
2593                 relationName = get_rel_name(relationOid);
2594                 relationNamespace = get_namespace_name(get_rel_namespace(relationOid));
2595
2596                 pg_rusage_init(&ru0);
2597
2598                 MemoryContextSwitchTo(oldcontext);
2599         }
2600
2601         relkind = get_rel_relkind(relationOid);
2602
2603         /*
2604          * Extract the list of indexes that are going to be rebuilt based on the
2605          * list of relation Oids given by caller.
2606          */
2607         switch (relkind)
2608         {
2609                 case RELKIND_RELATION:
2610                 case RELKIND_MATVIEW:
2611                 case RELKIND_TOASTVALUE:
2612                         {
2613                                 /*
2614                                  * In the case of a relation, find all its indexes including
2615                                  * toast indexes.
2616                                  */
2617                                 Relation        heapRelation;
2618
2619                                 /* Save the list of relation OIDs in private context */
2620                                 oldcontext = MemoryContextSwitchTo(private_context);
2621
2622                                 /* Track this relation for session locks */
2623                                 heapRelationIds = lappend_oid(heapRelationIds, relationOid);
2624
2625                                 MemoryContextSwitchTo(oldcontext);
2626
2627                                 /* Open relation to get its indexes */
2628                                 heapRelation = table_open(relationOid, ShareUpdateExclusiveLock);
2629
2630                                 /* Add all the valid indexes of relation to list */
2631                                 foreach(lc, RelationGetIndexList(heapRelation))
2632                                 {
2633                                         Oid                     cellOid = lfirst_oid(lc);
2634                                         Relation        indexRelation = index_open(cellOid,
2635                                                                                                                    ShareUpdateExclusiveLock);
2636
2637                                         if (!indexRelation->rd_index->indisvalid)
2638                                                 ereport(WARNING,
2639                                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2640                                                                  errmsg("cannot reindex concurrently invalid index \"%s.%s\", skipping",
2641                                                                                 get_namespace_name(get_rel_namespace(cellOid)),
2642                                                                                 get_rel_name(cellOid))));
2643                                         else if (indexRelation->rd_index->indisexclusion)
2644                                                 ereport(WARNING,
2645                                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2646                                                                  errmsg("cannot reindex concurrently exclusion constraint index \"%s.%s\", skipping",
2647                                                                                 get_namespace_name(get_rel_namespace(cellOid)),
2648                                                                                 get_rel_name(cellOid))));
2649                                         else
2650                                         {
2651                                                 /* Save the list of relation OIDs in private context */
2652                                                 oldcontext = MemoryContextSwitchTo(private_context);
2653
2654                                                 indexIds = lappend_oid(indexIds, cellOid);
2655
2656                                                 MemoryContextSwitchTo(oldcontext);
2657                                         }
2658
2659                                         index_close(indexRelation, NoLock);
2660                                 }
2661
2662                                 /* Also add the toast indexes */
2663                                 if (OidIsValid(heapRelation->rd_rel->reltoastrelid))
2664                                 {
2665                                         Oid                     toastOid = heapRelation->rd_rel->reltoastrelid;
2666                                         Relation        toastRelation = table_open(toastOid,
2667                                                                                                                    ShareUpdateExclusiveLock);
2668
2669                                         /* Save the list of relation OIDs in private context */
2670                                         oldcontext = MemoryContextSwitchTo(private_context);
2671
2672                                         /* Track this relation for session locks */
2673                                         heapRelationIds = lappend_oid(heapRelationIds, toastOid);
2674
2675                                         MemoryContextSwitchTo(oldcontext);
2676
2677                                         foreach(lc2, RelationGetIndexList(toastRelation))
2678                                         {
2679                                                 Oid                     cellOid = lfirst_oid(lc2);
2680                                                 Relation        indexRelation = index_open(cellOid,
2681                                                                                                                            ShareUpdateExclusiveLock);
2682
2683                                                 if (!indexRelation->rd_index->indisvalid)
2684                                                         ereport(WARNING,
2685                                                                         (errcode(ERRCODE_INDEX_CORRUPTED),
2686                                                                          errmsg("cannot reindex concurrently invalid index \"%s.%s\", skipping",
2687                                                                                         get_namespace_name(get_rel_namespace(cellOid)),
2688                                                                                         get_rel_name(cellOid))));
2689                                                 else
2690                                                 {
2691                                                         /*
2692                                                          * Save the list of relation OIDs in private
2693                                                          * context
2694                                                          */
2695                                                         oldcontext = MemoryContextSwitchTo(private_context);
2696
2697                                                         indexIds = lappend_oid(indexIds, cellOid);
2698
2699                                                         MemoryContextSwitchTo(oldcontext);
2700                                                 }
2701
2702                                                 index_close(indexRelation, NoLock);
2703                                         }
2704
2705                                         table_close(toastRelation, NoLock);
2706                                 }
2707
2708                                 table_close(heapRelation, NoLock);
2709                                 break;
2710                         }
2711                 case RELKIND_INDEX:
2712                         {
2713                                 /*
2714                                  * For an index simply add its Oid to list. Invalid indexes
2715                                  * cannot be included in list.
2716                                  */
2717                                 Relation        indexRelation = index_open(relationOid, ShareUpdateExclusiveLock);
2718                                 Oid                     heapId = IndexGetRelation(relationOid, false);
2719
2720                                 /* A shared relation cannot be reindexed concurrently */
2721                                 if (IsSharedRelation(heapId))
2722                                         ereport(ERROR,
2723                                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2724                                                          errmsg("concurrent reindex is not supported for shared relations")));
2725
2726                                 /* A system catalog cannot be reindexed concurrently */
2727                                 if (IsSystemNamespace(get_rel_namespace(heapId)))
2728                                         ereport(ERROR,
2729                                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2730                                                          errmsg("concurrent reindex is not supported for catalog relations")));
2731
2732                                 /* Save the list of relation OIDs in private context */
2733                                 oldcontext = MemoryContextSwitchTo(private_context);
2734
2735                                 /* Track the heap relation of this index for session locks */
2736                                 heapRelationIds = list_make1_oid(heapId);
2737
2738                                 MemoryContextSwitchTo(oldcontext);
2739
2740                                 if (!indexRelation->rd_index->indisvalid)
2741                                         ereport(WARNING,
2742                                                         (errcode(ERRCODE_INDEX_CORRUPTED),
2743                                                          errmsg("cannot reindex concurrently invalid index \"%s.%s\", skipping",
2744                                                                         get_namespace_name(get_rel_namespace(relationOid)),
2745                                                                         get_rel_name(relationOid))));
2746                                 else
2747                                 {
2748                                         /* Save the list of relation OIDs in private context */
2749                                         oldcontext = MemoryContextSwitchTo(private_context);
2750
2751                                         indexIds = lappend_oid(indexIds, relationOid);
2752
2753                                         MemoryContextSwitchTo(oldcontext);
2754                                 }
2755
2756                                 index_close(indexRelation, NoLock);
2757                                 break;
2758                         }
2759                 case RELKIND_PARTITIONED_TABLE:
2760                         /* see reindex_relation() */
2761                         ereport(WARNING,
2762                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2763                                          errmsg("REINDEX of partitioned tables is not yet implemented, skipping \"%s\"",
2764                                                         get_rel_name(relationOid))));
2765                         return false;
2766                 default:
2767                         /* Return error if type of relation is not supported */
2768                         ereport(ERROR,
2769                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2770                                          errmsg("cannot reindex concurrently this type of relation")));
2771                         break;
2772         }
2773
2774         /* Definitely no indexes, so leave */
2775         if (indexIds == NIL)
2776         {
2777                 PopActiveSnapshot();
2778                 return false;
2779         }
2780
2781         Assert(heapRelationIds != NIL);
2782
2783         /*-----
2784          * Now we have all the indexes we want to process in indexIds.
2785          *
2786          * The phases now are:
2787          *
2788          * 1. create new indexes in the catalog
2789          * 2. build new indexes
2790          * 3. let new indexes catch up with tuples inserted in the meantime
2791          * 4. swap index names
2792          * 5. mark old indexes as dead
2793          * 6. drop old indexes
2794          *
2795          * We process each phase for all indexes before moving to the next phase,
2796          * for efficiency.
2797          */
2798
2799         /*
2800          * Phase 1 of REINDEX CONCURRENTLY
2801          *
2802          * Create a new index with the same properties as the old one, but it is
2803          * only registered in catalogs and will be built later.  Then get session
2804          * locks on all involved tables.  See analogous code in DefineIndex() for
2805          * more detailed comments.
2806          */
2807
2808         foreach(lc, indexIds)
2809         {
2810                 char       *concurrentName;
2811                 Oid                     indexId = lfirst_oid(lc);
2812                 Oid                     newIndexId;
2813                 Relation        indexRel;
2814                 Relation        heapRel;
2815                 Relation        newIndexRel;
2816                 LockRelId  *lockrelid;
2817
2818                 indexRel = index_open(indexId, ShareUpdateExclusiveLock);
2819                 heapRel = table_open(indexRel->rd_index->indrelid,
2820                                                          ShareUpdateExclusiveLock);
2821
2822                 /* Choose a temporary relation name for the new index */
2823                 concurrentName = ChooseRelationName(get_rel_name(indexId),
2824                                                                                         NULL,
2825                                                                                         "ccnew",
2826                                                                                         get_rel_namespace(indexRel->rd_index->indrelid),
2827                                                                                         false);
2828
2829                 /* Create new index definition based on given index */
2830                 newIndexId = index_concurrently_create_copy(heapRel,
2831                                                                                                         indexId,
2832                                                                                                         concurrentName);
2833
2834                 /* Now open the relation of the new index, a lock is also needed on it */
2835                 newIndexRel = index_open(indexId, ShareUpdateExclusiveLock);
2836
2837                 /*
2838                  * Save the list of OIDs and locks in private context
2839                  */
2840                 oldcontext = MemoryContextSwitchTo(private_context);
2841
2842                 newIndexIds = lappend_oid(newIndexIds, newIndexId);
2843
2844                 /*
2845                  * Save lockrelid to protect each relation from drop then close
2846                  * relations. The lockrelid on parent relation is not taken here to
2847                  * avoid multiple locks taken on the same relation, instead we rely on
2848                  * parentRelationIds built earlier.
2849                  */
2850                 lockrelid = palloc(sizeof(*lockrelid));
2851                 *lockrelid = indexRel->rd_lockInfo.lockRelId;
2852                 relationLocks = lappend(relationLocks, lockrelid);
2853                 lockrelid = palloc(sizeof(*lockrelid));
2854                 *lockrelid = newIndexRel->rd_lockInfo.lockRelId;
2855                 relationLocks = lappend(relationLocks, lockrelid);
2856
2857                 MemoryContextSwitchTo(oldcontext);
2858
2859                 index_close(indexRel, NoLock);
2860                 index_close(newIndexRel, NoLock);
2861                 table_close(heapRel, NoLock);
2862         }
2863
2864         /*
2865          * Save the heap lock for following visibility checks with other backends
2866          * might conflict with this session.
2867          */
2868         foreach(lc, heapRelationIds)
2869         {
2870                 Relation        heapRelation = table_open(lfirst_oid(lc), ShareUpdateExclusiveLock);
2871                 LockRelId  *lockrelid;
2872                 LOCKTAG    *heaplocktag;
2873
2874                 /* Save the list of locks in private context */
2875                 oldcontext = MemoryContextSwitchTo(private_context);
2876
2877                 /* Add lockrelid of heap relation to the list of locked relations */
2878                 lockrelid = palloc(sizeof(*lockrelid));
2879                 *lockrelid = heapRelation->rd_lockInfo.lockRelId;
2880                 relationLocks = lappend(relationLocks, lockrelid);
2881
2882                 heaplocktag = (LOCKTAG *) palloc(sizeof(LOCKTAG));
2883
2884                 /* Save the LOCKTAG for this parent relation for the wait phase */
2885                 SET_LOCKTAG_RELATION(*heaplocktag, lockrelid->dbId, lockrelid->relId);
2886                 lockTags = lappend(lockTags, heaplocktag);
2887
2888                 MemoryContextSwitchTo(oldcontext);
2889
2890                 /* Close heap relation */
2891                 table_close(heapRelation, NoLock);
2892         }
2893
2894         /* Get a session-level lock on each table. */
2895         foreach(lc, relationLocks)
2896         {
2897                 LockRelId   *lockrelid = (LockRelId *) lfirst(lc);
2898
2899                 LockRelationIdForSession(lockrelid, ShareUpdateExclusiveLock);
2900         }
2901
2902         PopActiveSnapshot();
2903         CommitTransactionCommand();
2904         StartTransactionCommand();
2905
2906         /*
2907          * Phase 2 of REINDEX CONCURRENTLY
2908          *
2909          * Build the new indexes in a separate transaction for each index to avoid
2910          * having open transactions for an unnecessary long time.  But before
2911          * doing that, wait until no running transactions could have the table of
2912          * the index open with the old list of indexes.  See "phase 2" in
2913          * DefineIndex() for more details.
2914          */
2915
2916         WaitForLockersMultiple(lockTags, ShareLock);
2917         CommitTransactionCommand();
2918
2919         forboth(lc, indexIds, lc2, newIndexIds)
2920         {
2921                 Relation        indexRel;
2922                 Oid                     oldIndexId = lfirst_oid(lc);
2923                 Oid                     newIndexId = lfirst_oid(lc2);
2924                 Oid                     heapId;
2925
2926                 CHECK_FOR_INTERRUPTS();
2927
2928                 /* Start new transaction for this index's concurrent build */
2929                 StartTransactionCommand();
2930
2931                 /* Set ActiveSnapshot since functions in the indexes may need it */
2932                 PushActiveSnapshot(GetTransactionSnapshot());
2933
2934                 /*
2935                  * Index relation has been closed by previous commit, so reopen it to
2936                  * get its information.
2937                  */
2938                 indexRel = index_open(oldIndexId, ShareUpdateExclusiveLock);
2939                 heapId = indexRel->rd_index->indrelid;
2940                 index_close(indexRel, NoLock);
2941
2942                 /* Perform concurrent build of new index */
2943                 index_concurrently_build(heapId, newIndexId);
2944
2945                 PopActiveSnapshot();
2946                 CommitTransactionCommand();
2947         }
2948         StartTransactionCommand();
2949
2950         /*
2951          * Phase 3 of REINDEX CONCURRENTLY
2952          *
2953          * During this phase the old indexes catch up with any new tuples that
2954          * were created during the previous phase.  See "phase 3" in DefineIndex()
2955          * for more details.
2956          */
2957
2958         WaitForLockersMultiple(lockTags, ShareLock);
2959         CommitTransactionCommand();
2960
2961         foreach(lc, newIndexIds)
2962         {
2963                 Oid                     newIndexId = lfirst_oid(lc);
2964                 Oid                     heapId;
2965                 TransactionId limitXmin;
2966                 Snapshot        snapshot;
2967
2968                 CHECK_FOR_INTERRUPTS();
2969
2970                 StartTransactionCommand();
2971
2972                 heapId = IndexGetRelation(newIndexId, false);
2973
2974                 /*
2975                  * Take the "reference snapshot" that will be used by validate_index()
2976                  * to filter candidate tuples.
2977                  */
2978                 snapshot = RegisterSnapshot(GetTransactionSnapshot());
2979                 PushActiveSnapshot(snapshot);
2980
2981                 validate_index(heapId, newIndexId, snapshot);
2982
2983                 /*
2984                  * We can now do away with our active snapshot, we still need to save
2985                  * the xmin limit to wait for older snapshots.
2986                  */
2987                 limitXmin = snapshot->xmin;
2988
2989                 PopActiveSnapshot();
2990                 UnregisterSnapshot(snapshot);
2991
2992                 /*
2993                  * To ensure no deadlocks, we must commit and start yet another
2994                  * transaction, and do our wait before any snapshot has been taken in
2995                  * it.
2996                  */
2997                 CommitTransactionCommand();
2998                 StartTransactionCommand();
2999
3000                 /*
3001                  * The index is now valid in the sense that it contains all currently
3002                  * interesting tuples.  But since it might not contain tuples deleted just
3003                  * before the reference snap was taken, we have to wait out any
3004                  * transactions that might have older snapshots.
3005                  */
3006                 WaitForOlderSnapshots(limitXmin);
3007
3008                 CommitTransactionCommand();
3009         }
3010
3011         /*
3012          * Phase 4 of REINDEX CONCURRENTLY
3013          *
3014          * Now that the new indexes have been validated, swap each new index with
3015          * its corresponding old index.
3016          *
3017          * We mark the new indexes as valid and the old indexes as not valid at
3018          * the same time to make sure we only get constraint violations from the
3019          * indexes with the correct names.
3020          */
3021
3022         StartTransactionCommand();
3023
3024         forboth(lc, indexIds, lc2, newIndexIds)
3025         {
3026                 char       *oldName;
3027                 Oid                     oldIndexId = lfirst_oid(lc);
3028                 Oid                     newIndexId = lfirst_oid(lc2);
3029                 Oid                     heapId;
3030
3031                 CHECK_FOR_INTERRUPTS();
3032
3033                 heapId = IndexGetRelation(oldIndexId, false);
3034
3035                 /* Choose a relation name for old index */
3036                 oldName = ChooseRelationName(get_rel_name(oldIndexId),
3037                                                                          NULL,
3038                                                                          "ccold",
3039                                                                          get_rel_namespace(heapId),
3040                                                                          false);
3041
3042                 /*
3043                  * Swap old index with the new one.  This also marks the new one as
3044                  * valid and the old one as not valid.
3045                  */
3046                 index_concurrently_swap(newIndexId, oldIndexId, oldName);
3047
3048                 /*
3049                  * Invalidate the relcache for the table, so that after this commit
3050                  * all sessions will refresh any cached plans that might reference the
3051                  * index.
3052                  */
3053                 CacheInvalidateRelcacheByRelid(heapId);
3054
3055                 /*
3056                  * CCI here so that subsequent iterations see the oldName in the
3057                  * catalog and can choose a nonconflicting name for their oldName.
3058                  * Otherwise, this could lead to conflicts if a table has two indexes
3059                  * whose names are equal for the first NAMEDATALEN-minus-a-few
3060                  * characters.
3061                  */
3062                 CommandCounterIncrement();
3063         }
3064
3065         /* Commit this transaction and make index swaps visible */
3066         CommitTransactionCommand();
3067         StartTransactionCommand();
3068
3069         /*
3070          * Phase 5 of REINDEX CONCURRENTLY
3071          *
3072          * Mark the old indexes as dead.  First we must wait until no running
3073          * transaction could be using the index for a query.  See also
3074          * index_drop() for more details.
3075          */
3076
3077         WaitForLockersMultiple(lockTags, AccessExclusiveLock);
3078
3079         foreach(lc, indexIds)
3080         {
3081                 Oid                     oldIndexId = lfirst_oid(lc);
3082                 Oid                     heapId;
3083
3084                 CHECK_FOR_INTERRUPTS();
3085                 heapId = IndexGetRelation(oldIndexId, false);
3086                 index_concurrently_set_dead(heapId, oldIndexId);
3087         }
3088
3089         /* Commit this transaction to make the updates visible. */
3090         CommitTransactionCommand();
3091         StartTransactionCommand();
3092
3093         /*
3094          * Phase 6 of REINDEX CONCURRENTLY
3095          *
3096          * Drop the old indexes.
3097          */
3098
3099         WaitForLockersMultiple(lockTags, AccessExclusiveLock);
3100
3101         PushActiveSnapshot(GetTransactionSnapshot());
3102
3103         {
3104                 ObjectAddresses *objects = new_object_addresses();
3105
3106                 foreach(lc, indexIds)
3107                 {
3108                         Oid                     oldIndexId = lfirst_oid(lc);
3109                         ObjectAddress object;
3110
3111                         object.classId = RelationRelationId;
3112                         object.objectId = oldIndexId;
3113                         object.objectSubId = 0;
3114
3115                         add_exact_object_address(&object, objects);
3116                 }
3117
3118                 /*
3119                  * Use PERFORM_DELETION_CONCURRENT_LOCK so that index_drop() uses the
3120                  * right lock level.
3121                  */
3122                 performMultipleDeletions(objects, DROP_RESTRICT,
3123                                                                  PERFORM_DELETION_CONCURRENT_LOCK | PERFORM_DELETION_INTERNAL);
3124         }
3125
3126         PopActiveSnapshot();
3127         CommitTransactionCommand();
3128
3129         /*
3130          * Finally, release the session-level lock on the table.
3131          */
3132         foreach(lc, relationLocks)
3133         {
3134                 LockRelId   *lockrelid = (LockRelId *) lfirst(lc);
3135
3136                 UnlockRelationIdForSession(lockrelid, ShareUpdateExclusiveLock);
3137         }
3138
3139         /* Start a new transaction to finish process properly */
3140         StartTransactionCommand();
3141
3142         /* Log what we did */
3143         if (options & REINDEXOPT_VERBOSE)
3144         {
3145                 if (relkind == RELKIND_INDEX)
3146                         ereport(INFO,
3147                                         (errmsg("index \"%s.%s\" was reindexed",
3148                                                         relationNamespace, relationName),
3149                                          errdetail("%s.",
3150                                                            pg_rusage_show(&ru0))));
3151                 else
3152                 {
3153                         foreach(lc, newIndexIds)
3154                         {
3155                                 Oid                     indOid = lfirst_oid(lc);
3156
3157                                 ereport(INFO,
3158                                                 (errmsg("index \"%s.%s\" was reindexed",
3159                                                                 get_namespace_name(get_rel_namespace(indOid)),
3160                                                                 get_rel_name(indOid))));
3161                                 /* Don't show rusage here, since it's not per index. */
3162                         }
3163
3164                         ereport(INFO,
3165                                         (errmsg("table \"%s.%s\" was reindexed",
3166                                                         relationNamespace, relationName),
3167                                          errdetail("%s.",
3168                                                            pg_rusage_show(&ru0))));
3169                 }
3170         }
3171
3172         MemoryContextDelete(private_context);
3173
3174         return true;
3175 }
3176
3177 /*
3178  *      ReindexPartitionedIndex
3179  *              Reindex each child of the given partitioned index.
3180  *
3181  * Not yet implemented.
3182  */
3183 static void
3184 ReindexPartitionedIndex(Relation parentIdx)
3185 {
3186         ereport(ERROR,
3187                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3188                          errmsg("REINDEX is not yet implemented for partitioned indexes")));
3189 }
3190
3191 /*
3192  * Insert or delete an appropriate pg_inherits tuple to make the given index
3193  * be a partition of the indicated parent index.
3194  *
3195  * This also corrects the pg_depend information for the affected index.
3196  */
3197 void
3198 IndexSetParentIndex(Relation partitionIdx, Oid parentOid)
3199 {
3200         Relation        pg_inherits;
3201         ScanKeyData key[2];
3202         SysScanDesc scan;
3203         Oid                     partRelid = RelationGetRelid(partitionIdx);
3204         HeapTuple       tuple;
3205         bool            fix_dependencies;
3206
3207         /* Make sure this is an index */
3208         Assert(partitionIdx->rd_rel->relkind == RELKIND_INDEX ||
3209                    partitionIdx->rd_rel->relkind == RELKIND_PARTITIONED_INDEX);
3210
3211         /*
3212          * Scan pg_inherits for rows linking our index to some parent.
3213          */
3214         pg_inherits = relation_open(InheritsRelationId, RowExclusiveLock);
3215         ScanKeyInit(&key[0],
3216                                 Anum_pg_inherits_inhrelid,
3217                                 BTEqualStrategyNumber, F_OIDEQ,
3218                                 ObjectIdGetDatum(partRelid));
3219         ScanKeyInit(&key[1],
3220                                 Anum_pg_inherits_inhseqno,
3221                                 BTEqualStrategyNumber, F_INT4EQ,
3222                                 Int32GetDatum(1));
3223         scan = systable_beginscan(pg_inherits, InheritsRelidSeqnoIndexId, true,
3224                                                           NULL, 2, key);
3225         tuple = systable_getnext(scan);
3226
3227         if (!HeapTupleIsValid(tuple))
3228         {
3229                 if (parentOid == InvalidOid)
3230                 {
3231                         /*
3232                          * No pg_inherits row, and no parent wanted: nothing to do in this
3233                          * case.
3234                          */
3235                         fix_dependencies = false;
3236                 }
3237                 else
3238                 {
3239                         Datum           values[Natts_pg_inherits];
3240                         bool            isnull[Natts_pg_inherits];
3241
3242                         /*
3243                          * No pg_inherits row exists, and we want a parent for this index,
3244                          * so insert it.
3245                          */
3246                         values[Anum_pg_inherits_inhrelid - 1] = ObjectIdGetDatum(partRelid);
3247                         values[Anum_pg_inherits_inhparent - 1] =
3248                                 ObjectIdGetDatum(parentOid);
3249                         values[Anum_pg_inherits_inhseqno - 1] = Int32GetDatum(1);
3250                         memset(isnull, false, sizeof(isnull));
3251
3252                         tuple = heap_form_tuple(RelationGetDescr(pg_inherits),
3253                                                                         values, isnull);
3254                         CatalogTupleInsert(pg_inherits, tuple);
3255
3256                         fix_dependencies = true;
3257                 }
3258         }
3259         else
3260         {
3261                 Form_pg_inherits inhForm = (Form_pg_inherits) GETSTRUCT(tuple);
3262
3263                 if (parentOid == InvalidOid)
3264                 {
3265                         /*
3266                          * There exists a pg_inherits row, which we want to clear; do so.
3267                          */
3268                         CatalogTupleDelete(pg_inherits, &tuple->t_self);
3269                         fix_dependencies = true;
3270                 }
3271                 else
3272                 {
3273                         /*
3274                          * A pg_inherits row exists.  If it's the same we want, then we're
3275                          * good; if it differs, that amounts to a corrupt catalog and
3276                          * should not happen.
3277                          */
3278                         if (inhForm->inhparent != parentOid)
3279                         {
3280                                 /* unexpected: we should not get called in this case */
3281                                 elog(ERROR, "bogus pg_inherit row: inhrelid %u inhparent %u",
3282                                          inhForm->inhrelid, inhForm->inhparent);
3283                         }
3284
3285                         /* already in the right state */
3286                         fix_dependencies = false;
3287                 }
3288         }
3289
3290         /* done with pg_inherits */
3291         systable_endscan(scan);
3292         relation_close(pg_inherits, RowExclusiveLock);
3293
3294         /* set relhassubclass if an index partition has been added to the parent */
3295         if (OidIsValid(parentOid))
3296                 SetRelationHasSubclass(parentOid, true);
3297
3298         if (fix_dependencies)
3299         {
3300                 /*
3301                  * Insert/delete pg_depend rows.  If setting a parent, add PARTITION
3302                  * dependencies on the parent index and the table; if removing a
3303                  * parent, delete PARTITION dependencies.
3304                  */
3305                 if (OidIsValid(parentOid))
3306                 {
3307                         ObjectAddress partIdx;
3308                         ObjectAddress parentIdx;
3309                         ObjectAddress partitionTbl;
3310
3311                         ObjectAddressSet(partIdx, RelationRelationId, partRelid);
3312                         ObjectAddressSet(parentIdx, RelationRelationId, parentOid);
3313                         ObjectAddressSet(partitionTbl, RelationRelationId,
3314                                                          partitionIdx->rd_index->indrelid);
3315                         recordDependencyOn(&partIdx, &parentIdx,
3316                                                            DEPENDENCY_PARTITION_PRI);
3317                         recordDependencyOn(&partIdx, &partitionTbl,
3318                                                            DEPENDENCY_PARTITION_SEC);
3319                 }
3320                 else
3321                 {
3322                         deleteDependencyRecordsForClass(RelationRelationId, partRelid,
3323                                                                                         RelationRelationId,
3324                                                                                         DEPENDENCY_PARTITION_PRI);
3325                         deleteDependencyRecordsForClass(RelationRelationId, partRelid,
3326                                                                                         RelationRelationId,
3327                                                                                         DEPENDENCY_PARTITION_SEC);
3328                 }
3329
3330                 /* make our updates visible */
3331                 CommandCounterIncrement();
3332         }
3333 }