]> granicus.if.org Git - postgresql/blob - src/backend/utils/cache/relcache.c
Fix typos in comments.
[postgresql] / src / backend / utils / cache / relcache.c
1 /*-------------------------------------------------------------------------
2  *
3  * relcache.c
4  *        POSTGRES relation descriptor cache code
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/utils/cache/relcache.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * INTERFACE ROUTINES
17  *              RelationCacheInitialize                 - initialize relcache (to empty)
18  *              RelationCacheInitializePhase2   - initialize shared-catalog entries
19  *              RelationCacheInitializePhase3   - finish initializing relcache
20  *              RelationIdGetRelation                   - get a reldesc by relation id
21  *              RelationClose                                   - close an open relation
22  *
23  * NOTES
24  *              The following code contains many undocumented hacks.  Please be
25  *              careful....
26  */
27 #include "postgres.h"
28
29 #include <sys/file.h>
30 #include <fcntl.h>
31 #include <unistd.h>
32
33 #include "access/htup_details.h"
34 #include "access/multixact.h"
35 #include "access/nbtree.h"
36 #include "access/reloptions.h"
37 #include "access/sysattr.h"
38 #include "access/xact.h"
39 #include "access/xlog.h"
40 #include "catalog/catalog.h"
41 #include "catalog/index.h"
42 #include "catalog/indexing.h"
43 #include "catalog/namespace.h"
44 #include "catalog/partition.h"
45 #include "catalog/pg_am.h"
46 #include "catalog/pg_amproc.h"
47 #include "catalog/pg_attrdef.h"
48 #include "catalog/pg_authid.h"
49 #include "catalog/pg_auth_members.h"
50 #include "catalog/pg_constraint.h"
51 #include "catalog/pg_database.h"
52 #include "catalog/pg_namespace.h"
53 #include "catalog/pg_opclass.h"
54 #include "catalog/pg_partitioned_table.h"
55 #include "catalog/pg_proc.h"
56 #include "catalog/pg_publication.h"
57 #include "catalog/pg_rewrite.h"
58 #include "catalog/pg_shseclabel.h"
59 #include "catalog/pg_subscription.h"
60 #include "catalog/pg_tablespace.h"
61 #include "catalog/pg_trigger.h"
62 #include "catalog/pg_type.h"
63 #include "catalog/schemapg.h"
64 #include "catalog/storage.h"
65 #include "commands/policy.h"
66 #include "commands/trigger.h"
67 #include "miscadmin.h"
68 #include "nodes/nodeFuncs.h"
69 #include "optimizer/clauses.h"
70 #include "optimizer/prep.h"
71 #include "optimizer/var.h"
72 #include "rewrite/rewriteDefine.h"
73 #include "rewrite/rowsecurity.h"
74 #include "storage/lmgr.h"
75 #include "storage/smgr.h"
76 #include "utils/array.h"
77 #include "utils/builtins.h"
78 #include "utils/fmgroids.h"
79 #include "utils/inval.h"
80 #include "utils/lsyscache.h"
81 #include "utils/memutils.h"
82 #include "utils/relmapper.h"
83 #include "utils/resowner_private.h"
84 #include "utils/snapmgr.h"
85 #include "utils/syscache.h"
86 #include "utils/tqual.h"
87
88
89 /*
90  *              name of relcache init file(s), used to speed up backend startup
91  */
92 #define RELCACHE_INIT_FILENAME  "pg_internal.init"
93
94 #define RELCACHE_INIT_FILEMAGIC         0x573266        /* version ID value */
95
96 /*
97  *              hardcoded tuple descriptors, contents generated by genbki.pl
98  */
99 static const FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
100 static const FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
101 static const FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
102 static const FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
103 static const FormData_pg_attribute Desc_pg_database[Natts_pg_database] = {Schema_pg_database};
104 static const FormData_pg_attribute Desc_pg_authid[Natts_pg_authid] = {Schema_pg_authid};
105 static const FormData_pg_attribute Desc_pg_auth_members[Natts_pg_auth_members] = {Schema_pg_auth_members};
106 static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
107 static const FormData_pg_attribute Desc_pg_shseclabel[Natts_pg_shseclabel] = {Schema_pg_shseclabel};
108 static const FormData_pg_attribute Desc_pg_subscription[Natts_pg_subscription] = {Schema_pg_subscription};
109
110 /*
111  *              Hash tables that index the relation cache
112  *
113  *              We used to index the cache by both name and OID, but now there
114  *              is only an index by OID.
115  */
/* One entry of the RelationIdCache hashtable, mapping a relation OID to its cached descriptor. */
116 typedef struct relidcacheent
117 {
118         Oid                     reloid;         /* lookup key: OID of the relation */
119         Relation        reldesc;        /* the relcache entry stored for that OID */
120 } RelIdCacheEnt;
121
122 static HTAB *RelationIdCache;
123
124 /*
125  * This flag is false until we have prepared the critical relcache entries
126  * that are needed to do indexscans on the tables read by relcache building.
127  */
128 bool            criticalRelcachesBuilt = false;
129
130 /*
131  * This flag is false until we have prepared the critical relcache entries
132  * for shared catalogs (which are the tables needed for login).
133  */
134 bool            criticalSharedRelcachesBuilt = false;
135
136 /*
137  * This counter counts relcache inval events received since backend startup
138  * (but only for rels that are actually in cache).  Presently, we use it only
139  * to detect whether data about to be written by write_relcache_init_file()
140  * might already be obsolete.
141  */
142 static long relcacheInvalsReceived = 0L;
143
144 /*
145  * eoxact_list[] stores the OIDs of relations that (might) need AtEOXact
146  * cleanup work.  This list intentionally has limited size; if it overflows,
147  * we fall back to scanning the whole hashtable.  There is no value in a very
148  * large list because (1) at some point, a hash_seq_search scan is faster than
149  * retail lookups, and (2) the value of this is to reduce EOXact work for
150  * short transactions, which can't have dirtied all that many tables anyway.
151  * EOXactListAdd() does not bother to prevent duplicate list entries, so the
152  * cleanup processing must be idempotent.
153  */
154 #define MAX_EOXACT_LIST 32
155 static Oid      eoxact_list[MAX_EOXACT_LIST];
156 static int      eoxact_list_len = 0;
157 static bool eoxact_list_overflowed = false;
158
/*
 * EOXactListAdd
 *		Append rel's OID to eoxact_list[] if there is still room; otherwise
 *		just raise the overflow flag.  No attempt is made to avoid duplicate
 *		entries.
 */
#define EOXactListAdd(rel) \
	do { \
		if (eoxact_list_len >= MAX_EOXACT_LIST) \
			eoxact_list_overflowed = true; \
		else \
			eoxact_list[eoxact_list_len++] = (rel)->rd_id; \
	} while (0)
166
167 /*
168  * EOXactTupleDescArray stores TupleDescs that (might) need AtEOXact
169  * cleanup work.  The array expands as needed; there is no hashtable because
170  * we don't need to access individual items except at EOXact.
171  */
172 static TupleDesc *EOXactTupleDescArray;
173 static int      NextEOXactTupleDescNum = 0;
174 static int      EOXactTupleDescArrayLen = 0;
175
176 /*
177  *              macros to manipulate the lookup hashtable
178  */
/*
 * RelationCacheInsert
 *		Enter RELATION into RelationIdCache under its rd_id.
 *
 * If an entry for that OID already exists, replace_allowed is asserted and
 * the entry is repointed at RELATION.  The displaced descriptor is destroyed
 * when its reference count is zero; otherwise it is left alone and, outside
 * bootstrap mode, a WARNING about the leak is emitted.
 */
#define RelationCacheInsert(RELATION, replace_allowed)	\
do { \
	bool		_already_present; \
	RelIdCacheEnt *_slot; \
	_slot = (RelIdCacheEnt *) hash_search(RelationIdCache, \
										  (void *) &((RELATION)->rd_id), \
										  HASH_ENTER, &_already_present); \
	if (!_already_present) \
		_slot->reldesc = (RELATION); \
	else \
	{ \
		/* see comments in RelationBuildDesc and RelationBuildLocalRelation */ \
		Relation _old_rel = _slot->reldesc; \
		Assert(replace_allowed); \
		_slot->reldesc = (RELATION); \
		if (RelationHasReferenceCountZero(_old_rel)) \
			RelationDestroyRelation(_old_rel, false); \
		else if (!IsBootstrapProcessingMode()) \
			elog(WARNING, "leaking still-referenced relcache entry for \"%s\"", \
				 RelationGetRelationName(_old_rel)); \
	} \
} while(0)
200
/*
 * RelationIdCacheLookup
 *		Look up ID in RelationIdCache and store the matching descriptor into
 *		RELATION, or NULL when there is no entry for that OID.
 */
#define RelationIdCacheLookup(ID, RELATION) \
do { \
	RelIdCacheEnt *_hit; \
	_hit = (RelIdCacheEnt *) hash_search(RelationIdCache, \
										 (void *) &(ID), \
										 HASH_FIND, NULL); \
	RELATION = (_hit != NULL) ? _hit->reldesc : NULL; \
} while(0)
212
/*
 * RelationCacheDelete
 *		Remove RELATION's entry from RelationIdCache.  A WARNING is emitted
 *		if no entry existed for its OID.
 */
#define RelationCacheDelete(RELATION) \
do { \
	if (hash_search(RelationIdCache, \
					(void *) &((RELATION)->rd_id), \
					HASH_REMOVE, NULL) == NULL) \
		elog(WARNING, "failed to delete relcache entry for OID %u", \
			 (RELATION)->rd_id); \
} while(0)
223
224
225 /*
226  * Special cache for opclass-related information
227  *
228  * Note: only default support procs get cached, ie, those with
229  * lefttype = righttype = opcintype.
230  */
231 typedef struct opclasscacheent
232 {
233         Oid                     opclassoid;             /* lookup key: OID of opclass */
234         bool            valid;                  /* set TRUE after successful fill-in */
235         StrategyNumber numSupport;      /* max # of support procs (from pg_am) */
236         Oid                     opcfamily;              /* OID of opclass's family */
237         Oid                     opcintype;              /* OID of opclass's declared input type */
238         RegProcedure *supportProcs; /* OIDs of support procedures */
239 } OpClassCacheEnt;
240
241 static HTAB *OpClassCache = NULL;
242
243
244 /* non-export function prototypes */
245
246 static void RelationDestroyRelation(Relation relation, bool remember_tupdesc);
247 static void RelationClearRelation(Relation relation, bool rebuild);
248
249 static void RelationReloadIndexInfo(Relation relation);
250 static void RelationFlushRelation(Relation relation);
251 static void RememberToFreeTupleDescAtEOX(TupleDesc td);
252 static void AtEOXact_cleanup(Relation relation, bool isCommit);
253 static void AtEOSubXact_cleanup(Relation relation, bool isCommit,
254                                         SubTransactionId mySubid, SubTransactionId parentSubid);
255 static bool load_relcache_init_file(bool shared);
256 static void write_relcache_init_file(bool shared);
257 static void write_item(const void *data, Size len, FILE *fp);
258
259 static void formrdesc(const char *relationName, Oid relationReltype,
260                   bool isshared, bool hasoids,
261                   int natts, const FormData_pg_attribute *attrs);
262
263 static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic);
264 static Relation AllocateRelationDesc(Form_pg_class relp);
265 static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
266 static void RelationBuildTupleDesc(Relation relation);
267 static void RelationBuildPartitionKey(Relation relation);
268 static PartitionKey copy_partition_key(PartitionKey fromkey);
269 static Relation RelationBuildDesc(Oid targetRelId, bool insertIt);
270 static void RelationInitPhysicalAddr(Relation relation);
271 static void load_critical_index(Oid indexoid, Oid heapoid);
272 static TupleDesc GetPgClassDescriptor(void);
273 static TupleDesc GetPgIndexDescriptor(void);
274 static void AttrDefaultFetch(Relation relation);
275 static void CheckConstraintFetch(Relation relation);
276 static int      CheckConstraintCmp(const void *a, const void *b);
277 static List *insert_ordered_oid(List *list, Oid datum);
278 static void InitIndexAmRoutine(Relation relation);
279 static void IndexSupportInitialize(oidvector *indclass,
280                                            RegProcedure *indexSupport,
281                                            Oid *opFamily,
282                                            Oid *opcInType,
283                                            StrategyNumber maxSupportNumber,
284                                            AttrNumber maxAttributeNumber);
285 static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
286                                   StrategyNumber numSupport);
287 static void RelationCacheInitFileRemoveInDir(const char *tblspcpath);
288 static void unlink_initfile(const char *initfilename);
289 static bool equalPartitionDescs(PartitionKey key, PartitionDesc partdesc1,
290                                         PartitionDesc partdesc2);
291
292
293 /*
294  *              ScanPgRelation
295  *
296  *              This is used by RelationBuildDesc to find a pg_class
297  *              tuple matching targetRelId.  The caller must hold at least
298  *              AccessShareLock on the target relid to prevent concurrent-update
299  *              scenarios; it isn't guaranteed that all scans used to build the
300  *              relcache entry will use the same snapshot.  If, for example,
301  *              an attribute were to be added after scanning pg_class and before
302  *              scanning pg_attribute, relnatts wouldn't match.
303  *
304  *              NB: the returned tuple has been copied into palloc'd storage
305  *              and must eventually be freed with heap_freetuple.
306  */
307 static HeapTuple
308 ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic)
309 {
310         HeapTuple       pg_class_tuple;
311         Relation        pg_class_desc;
312         SysScanDesc pg_class_scan;
313         ScanKeyData key[1];
314         Snapshot        snapshot;
315
316         /*
317          * If something goes wrong during backend startup, we might find ourselves
318          * trying to read pg_class before we've selected a database.  That ain't
319          * gonna work, so bail out with a useful error message.  If this happens,
320          * it probably means a relcache entry that needs to be nailed isn't.
321          */
322         if (!OidIsValid(MyDatabaseId))
323                 elog(FATAL, "cannot read pg_class without having selected a database");
324
325         /*
326          * form a scan key
327          */
328         ScanKeyInit(&key[0],
329                                 ObjectIdAttributeNumber,
330                                 BTEqualStrategyNumber, F_OIDEQ,
331                                 ObjectIdGetDatum(targetRelId));
332
333         /*
334          * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
335          * built the critical relcache entries (this includes initdb and startup
336          * without a pg_internal.init file).  The caller can also force a heap
337          * scan by setting indexOK == false.
338          */
339         pg_class_desc = heap_open(RelationRelationId, AccessShareLock);
340
341         /*
342          * The caller might need a tuple that's newer than the one the historic
343          * snapshot; currently the only case requiring to do so is looking up the
344          * relfilenode of non mapped system relations during decoding.
345          */
346         if (force_non_historic)
347                 snapshot = GetNonHistoricCatalogSnapshot(RelationRelationId);
348         else
349                 snapshot = GetCatalogSnapshot(RelationRelationId);
350
351         pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
352                                                                            indexOK && criticalRelcachesBuilt,
353                                                                            snapshot,
354                                                                            1, key);
355
356         pg_class_tuple = systable_getnext(pg_class_scan);
357
358         /*
359          * Must copy tuple before releasing buffer.
360          */
361         if (HeapTupleIsValid(pg_class_tuple))
362                 pg_class_tuple = heap_copytuple(pg_class_tuple);
363
364         /* all done */
365         systable_endscan(pg_class_scan);
366         heap_close(pg_class_desc, AccessShareLock);
367
368         return pg_class_tuple;
369 }
370
371 /*
372  *              AllocateRelationDesc
373  *
374  *              This is used to allocate memory for a new relation descriptor
375  *              and initialize the rd_rel field from the given pg_class tuple.
376  */
377 static Relation
378 AllocateRelationDesc(Form_pg_class relp)
379 {
380         Relation        relation;
381         MemoryContext oldcxt;
382         Form_pg_class relationForm;
383
384         /* Relcache entries must live in CacheMemoryContext */
385         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
386
387         /*
388          * allocate and zero space for new relation descriptor
389          */
390         relation = (Relation) palloc0(sizeof(RelationData));
391
392         /* make sure relation is marked as having no open file yet */
393         relation->rd_smgr = NULL;
394
395         /*
396          * Copy the relation tuple form
397          *
398          * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE. The
399          * variable-length fields (relacl, reloptions) are NOT stored in the
400          * relcache --- there'd be little point in it, since we don't copy the
401          * tuple's nulls bitmap and hence wouldn't know if the values are valid.
402          * Bottom line is that relacl *cannot* be retrieved from the relcache. Get
403          * it from the syscache if you need it.  The same goes for the original
404          * form of reloptions (however, we do store the parsed form of reloptions
405          * in rd_options).
406          */
407         relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
408
409         memcpy(relationForm, relp, CLASS_TUPLE_SIZE);
410
411         /* initialize relation tuple form */
412         relation->rd_rel = relationForm;
413
414         /* and allocate attribute tuple form storage */
415         relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts,
416                                                                                            relationForm->relhasoids);
417         /* which we mark as a reference-counted tupdesc */
418         relation->rd_att->tdrefcount = 1;
419
420         MemoryContextSwitchTo(oldcxt);
421
422         return relation;
423 }
424
425 /*
426  * RelationParseRelOptions
427  *              Convert pg_class.reloptions into pre-parsed rd_options
428  *
429  * tuple is the real pg_class tuple (not rd_rel!) for relation
430  *
431  * Note: rd_rel and (if an index) rd_amroutine must be valid already
432  */
433 static void
434 RelationParseRelOptions(Relation relation, HeapTuple tuple)
435 {
436         bytea      *options;
437
438         relation->rd_options = NULL;
439
440         /* Fall out if relkind should not have options */
441         switch (relation->rd_rel->relkind)
442         {
443                 case RELKIND_RELATION:
444                 case RELKIND_TOASTVALUE:
445                 case RELKIND_INDEX:
446                 case RELKIND_VIEW:
447                 case RELKIND_MATVIEW:
448                 case RELKIND_PARTITIONED_TABLE:
449                         break;
450                 default:
451                         return;
452         }
453
454         /*
455          * Fetch reloptions from tuple; have to use a hardwired descriptor because
456          * we might not have any other for pg_class yet (consider executing this
457          * code for pg_class itself)
458          */
459         options = extractRelOptions(tuple,
460                                                                 GetPgClassDescriptor(),
461                                                                 relation->rd_rel->relkind == RELKIND_INDEX ?
462                                                                 relation->rd_amroutine->amoptions : NULL);
463
464         /*
465          * Copy parsed data into CacheMemoryContext.  To guard against the
466          * possibility of leaks in the reloptions code, we want to do the actual
467          * parsing in the caller's memory context and copy the results into
468          * CacheMemoryContext after the fact.
469          */
470         if (options)
471         {
472                 relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
473                                                                                                   VARSIZE(options));
474                 memcpy(relation->rd_options, options, VARSIZE(options));
475                 pfree(options);
476         }
477 }
478
479 /*
480  *              RelationBuildTupleDesc
481  *
482  *              Form the relation's tuple descriptor from information in
483  *              the pg_attribute, pg_attrdef & pg_constraint system catalogs.
484  */
485 static void
486 RelationBuildTupleDesc(Relation relation)
487 {
488         HeapTuple       pg_attribute_tuple;
489         Relation        pg_attribute_desc;
490         SysScanDesc pg_attribute_scan;
491         ScanKeyData skey[2];
492         int                     need;
493         TupleConstr *constr;
494         AttrDefault *attrdef = NULL;
495         int                     ndef = 0;
496
497         /* copy some fields from pg_class row to rd_att */
498         relation->rd_att->tdtypeid = relation->rd_rel->reltype;
499         relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
500         relation->rd_att->tdhasoid = relation->rd_rel->relhasoids;
501
502         constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
503                                                                                                 sizeof(TupleConstr));
504         constr->has_not_null = false;
505
506         /*
507          * Form a scan key that selects only user attributes (attnum > 0).
508          * (Eliminating system attribute rows at the index level is lots faster
509          * than fetching them.)
510          */
511         ScanKeyInit(&skey[0],
512                                 Anum_pg_attribute_attrelid,
513                                 BTEqualStrategyNumber, F_OIDEQ,
514                                 ObjectIdGetDatum(RelationGetRelid(relation)));
515         ScanKeyInit(&skey[1],
516                                 Anum_pg_attribute_attnum,
517                                 BTGreaterStrategyNumber, F_INT2GT,
518                                 Int16GetDatum(0));
519
520         /*
521          * Open pg_attribute and begin a scan.  Force heap scan if we haven't yet
522          * built the critical relcache entries (this includes initdb and startup
523          * without a pg_internal.init file).
524          */
525         pg_attribute_desc = heap_open(AttributeRelationId, AccessShareLock);
526         pg_attribute_scan = systable_beginscan(pg_attribute_desc,
527                                                                                    AttributeRelidNumIndexId,
528                                                                                    criticalRelcachesBuilt,
529                                                                                    NULL,
530                                                                                    2, skey);
531
532         /*
533          * add attribute data to relation->rd_att
534          */
535         need = relation->rd_rel->relnatts;
536
537         while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
538         {
539                 Form_pg_attribute attp;
540
541                 attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
542
543                 if (attp->attnum <= 0 ||
544                         attp->attnum > relation->rd_rel->relnatts)
545                         elog(ERROR, "invalid attribute number %d for %s",
546                                  attp->attnum, RelationGetRelationName(relation));
547
548                 memcpy(relation->rd_att->attrs[attp->attnum - 1],
549                            attp,
550                            ATTRIBUTE_FIXED_PART_SIZE);
551
552                 /* Update constraint/default info */
553                 if (attp->attnotnull)
554                         constr->has_not_null = true;
555
556                 if (attp->atthasdef)
557                 {
558                         if (attrdef == NULL)
559                                 attrdef = (AttrDefault *)
560                                         MemoryContextAllocZero(CacheMemoryContext,
561                                                                                    relation->rd_rel->relnatts *
562                                                                                    sizeof(AttrDefault));
563                         attrdef[ndef].adnum = attp->attnum;
564                         attrdef[ndef].adbin = NULL;
565                         ndef++;
566                 }
567                 need--;
568                 if (need == 0)
569                         break;
570         }
571
572         /*
573          * end the scan and close the attribute relation
574          */
575         systable_endscan(pg_attribute_scan);
576         heap_close(pg_attribute_desc, AccessShareLock);
577
578         if (need != 0)
579                 elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
580                          need, RelationGetRelid(relation));
581
582         /*
583          * The attcacheoff values we read from pg_attribute should all be -1
584          * ("unknown").  Verify this if assert checking is on.  They will be
585          * computed when and if needed during tuple access.
586          */
587 #ifdef USE_ASSERT_CHECKING
588         {
589                 int                     i;
590
591                 for (i = 0; i < relation->rd_rel->relnatts; i++)
592                         Assert(relation->rd_att->attrs[i]->attcacheoff == -1);
593         }
594 #endif
595
596         /*
597          * However, we can easily set the attcacheoff value for the first
598          * attribute: it must be zero.  This eliminates the need for special cases
599          * for attnum=1 that used to exist in fastgetattr() and index_getattr().
600          */
601         if (relation->rd_rel->relnatts > 0)
602                 relation->rd_att->attrs[0]->attcacheoff = 0;
603
604         /*
605          * Set up constraint/default info
606          */
607         if (constr->has_not_null || ndef > 0 || relation->rd_rel->relchecks)
608         {
609                 relation->rd_att->constr = constr;
610
611                 if (ndef > 0)                   /* DEFAULTs */
612                 {
613                         if (ndef < relation->rd_rel->relnatts)
614                                 constr->defval = (AttrDefault *)
615                                         repalloc(attrdef, ndef * sizeof(AttrDefault));
616                         else
617                                 constr->defval = attrdef;
618                         constr->num_defval = ndef;
619                         AttrDefaultFetch(relation);
620                 }
621                 else
622                         constr->num_defval = 0;
623
624                 if (relation->rd_rel->relchecks > 0)    /* CHECKs */
625                 {
626                         constr->num_check = relation->rd_rel->relchecks;
627                         constr->check = (ConstrCheck *)
628                                 MemoryContextAllocZero(CacheMemoryContext,
629                                                                         constr->num_check * sizeof(ConstrCheck));
630                         CheckConstraintFetch(relation);
631                 }
632                 else
633                         constr->num_check = 0;
634         }
635         else
636         {
637                 pfree(constr);
638                 relation->rd_att->constr = NULL;
639         }
640 }
641
642 /*
643  *              RelationBuildRuleLock
644  *
645  *              Form the relation's rewrite rules from information in
646  *              the pg_rewrite system catalog.
647  *
648  * Note: The rule parsetrees are potentially very complex node structures.
649  * To allow these trees to be freed when the relcache entry is flushed,
650  * we make a private memory context to hold the RuleLock information for
651  * each relcache entry that has associated rules.  The context is used
652  * just for rule info, not for any other subsidiary data of the relcache
653  * entry, because that keeps the update logic in RelationClearRelation()
654  * manageable.  The other subsidiary data structures are simple enough
655  * to be easy to free explicitly, anyway.
656  */
657 static void
658 RelationBuildRuleLock(Relation relation)
659 {
660         MemoryContext rulescxt;
661         MemoryContext oldcxt;
662         HeapTuple       rewrite_tuple;
663         Relation        rewrite_desc;
664         TupleDesc       rewrite_tupdesc;
665         SysScanDesc rewrite_scan;
666         ScanKeyData key;
667         RuleLock   *rulelock;
668         int                     numlocks;
669         RewriteRule **rules;
670         int                     maxlocks;
671
672         /*
673          * Make the private context.  Assume it'll not contain much data.
674          */
675         rulescxt = AllocSetContextCreate(CacheMemoryContext,
676                                                                          RelationGetRelationName(relation),
677                                                                          ALLOCSET_SMALL_SIZES);
678         relation->rd_rulescxt = rulescxt;
679
680         /*
681          * allocate an array to hold the rewrite rules (the array is extended if
682          * necessary)
683          */
684         maxlocks = 4;
685         rules = (RewriteRule **)
686                 MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
687         numlocks = 0;
688
689         /*
690          * form a scan key
691          */
692         ScanKeyInit(&key,
693                                 Anum_pg_rewrite_ev_class,
694                                 BTEqualStrategyNumber, F_OIDEQ,
695                                 ObjectIdGetDatum(RelationGetRelid(relation)));
696
697         /*
698          * open pg_rewrite and begin a scan
699          *
700          * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
701          * be reading the rules in name order, except possibly during
702          * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
703          * ensures that rules will be fired in name order.
704          */
705         rewrite_desc = heap_open(RewriteRelationId, AccessShareLock);
706         rewrite_tupdesc = RelationGetDescr(rewrite_desc);
707         rewrite_scan = systable_beginscan(rewrite_desc,
708                                                                           RewriteRelRulenameIndexId,
709                                                                           true, NULL,
710                                                                           1, &key);
711
712         while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
713         {
714                 Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
715                 bool            isnull;
716                 Datum           rule_datum;
717                 char       *rule_str;
718                 RewriteRule *rule;
719
720                 rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
721                                                                                                   sizeof(RewriteRule));
722
723                 rule->ruleId = HeapTupleGetOid(rewrite_tuple);
724
725                 rule->event = rewrite_form->ev_type - '0';
726                 rule->enabled = rewrite_form->ev_enabled;
727                 rule->isInstead = rewrite_form->is_instead;
728
729                 /*
730                  * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
731                  * rule strings are often large enough to be toasted.  To avoid
732                  * leaking memory in the caller's context, do the detoasting here so
733                  * we can free the detoasted version.
734                  */
735                 rule_datum = heap_getattr(rewrite_tuple,
736                                                                   Anum_pg_rewrite_ev_action,
737                                                                   rewrite_tupdesc,
738                                                                   &isnull);
739                 Assert(!isnull);
740                 rule_str = TextDatumGetCString(rule_datum);
741                 oldcxt = MemoryContextSwitchTo(rulescxt);
742                 rule->actions = (List *) stringToNode(rule_str);
743                 MemoryContextSwitchTo(oldcxt);
744                 pfree(rule_str);
745
746                 rule_datum = heap_getattr(rewrite_tuple,
747                                                                   Anum_pg_rewrite_ev_qual,
748                                                                   rewrite_tupdesc,
749                                                                   &isnull);
750                 Assert(!isnull);
751                 rule_str = TextDatumGetCString(rule_datum);
752                 oldcxt = MemoryContextSwitchTo(rulescxt);
753                 rule->qual = (Node *) stringToNode(rule_str);
754                 MemoryContextSwitchTo(oldcxt);
755                 pfree(rule_str);
756
757                 /*
758                  * We want the rule's table references to be checked as though by the
759                  * table owner, not the user referencing the rule.  Therefore, scan
760                  * through the rule's actions and set the checkAsUser field on all
761                  * rtable entries.  We have to look at the qual as well, in case it
762                  * contains sublinks.
763                  *
764                  * The reason for doing this when the rule is loaded, rather than when
765                  * it is stored, is that otherwise ALTER TABLE OWNER would have to
766                  * grovel through stored rules to update checkAsUser fields. Scanning
767                  * the rule tree during load is relatively cheap (compared to
768                  * constructing it in the first place), so we do it here.
769                  */
770                 setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
771                 setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);
772
773                 if (numlocks >= maxlocks)
774                 {
775                         maxlocks *= 2;
776                         rules = (RewriteRule **)
777                                 repalloc(rules, sizeof(RewriteRule *) * maxlocks);
778                 }
779                 rules[numlocks++] = rule;
780         }
781
782         /*
783          * end the scan and close the rewrite relation (pg_rewrite)
784          */
785         systable_endscan(rewrite_scan);
786         heap_close(rewrite_desc, AccessShareLock);
787
788         /*
789          * there might not be any rules (if relhasrules is out-of-date)
790          */
791         if (numlocks == 0)
792         {
793                 relation->rd_rules = NULL;
794                 relation->rd_rulescxt = NULL;
795                 MemoryContextDelete(rulescxt);
796                 return;
797         }
798
799         /*
800          * form a RuleLock and insert into relation
801          */
802         rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
803         rulelock->numLocks = numlocks;
804         rulelock->rules = rules;
805
806         relation->rd_rules = rulelock;
807 }
808
809 /*
810  * RelationBuildPartitionKey
811  *              Build and attach to relcache partition key data of relation
812  *
813  * Partitioning key data is stored in CacheMemoryContext to ensure it survives
814  * as long as the relcache.  To avoid leaking memory in that context in case
815  * of an error partway through this function, we build the structure in the
816  * working context (which must be short-lived) and copy the completed
817  * structure into the cache memory.
818  *
819  * Also, since the structure being created here is sufficiently complex, we
820  * make a private child context of CacheMemoryContext for each relation that
821  * has associated partition key information.  That means no complicated logic
822  * to free individual elements whenever the relcache entry is flushed - just
823  * delete the context.
824  */
825 static void
826 RelationBuildPartitionKey(Relation relation)
827 {
828         Form_pg_partitioned_table form;
829         HeapTuple       tuple;
830         bool            isnull;
831         int                     i;
832         PartitionKey key;
833         AttrNumber *attrs;
834         oidvector  *opclass;
835         oidvector  *collation;
836         ListCell   *partexprs_item;
837         Datum           datum;
838         MemoryContext partkeycxt,
839                                 oldcxt;
840
841         tuple = SearchSysCache1(PARTRELID,
842                                                         ObjectIdGetDatum(RelationGetRelid(relation)));
843
844         /*
845          * The following happens when we have created our pg_class entry but not
846          * the pg_partitioned_table entry yet.
847          */
848         if (!HeapTupleIsValid(tuple))
849                 return;
850
851         key = (PartitionKey) palloc0(sizeof(PartitionKeyData));
852
853         /* Fixed-length attributes */
854         form = (Form_pg_partitioned_table) GETSTRUCT(tuple);
855         key->strategy = form->partstrat;
856         key->partnatts = form->partnatts;
857
858         /*
859          * We can rely on the first variable-length attribute being mapped to the
860          * relevant field of the catalog's C struct, because all previous
861          * attributes are non-nullable and fixed-length.
862          */
863         attrs = form->partattrs.values;
864
865         /* But use the hard way to retrieve further variable-length attributes */
866         /* Operator class */
867         datum = SysCacheGetAttr(PARTRELID, tuple,
868                                                         Anum_pg_partitioned_table_partclass, &isnull);
869         Assert(!isnull);
870         opclass = (oidvector *) DatumGetPointer(datum);
871
872         /* Collation */
873         datum = SysCacheGetAttr(PARTRELID, tuple,
874                                                         Anum_pg_partitioned_table_partcollation, &isnull);
875         Assert(!isnull);
876         collation = (oidvector *) DatumGetPointer(datum);
877
878         /* Expressions */
879         datum = SysCacheGetAttr(PARTRELID, tuple,
880                                                         Anum_pg_partitioned_table_partexprs, &isnull);
881         if (!isnull)
882         {
883                 char       *exprString;
884                 Node       *expr;
885
886                 exprString = TextDatumGetCString(datum);
887                 expr = stringToNode(exprString);
888                 pfree(exprString);
889
890                 /*
891                  * Run the expressions through const-simplification since the planner
892                  * will be comparing them to similarly-processed qual clause operands,
893                  * and may fail to detect valid matches without this step.  We don't
894                  * need to bother with canonicalize_qual() though, because partition
895                  * expressions are not full-fledged qualification clauses.
896                  */
897                 expr = eval_const_expressions(NULL, (Node *) expr);
898
899                 /* May as well fix opfuncids too */
900                 fix_opfuncids((Node *) expr);
901                 key->partexprs = (List *) expr;
902         }
903
904         key->partattrs = (AttrNumber *) palloc0(key->partnatts * sizeof(AttrNumber));
905         key->partopfamily = (Oid *) palloc0(key->partnatts * sizeof(Oid));
906         key->partopcintype = (Oid *) palloc0(key->partnatts * sizeof(Oid));
907         key->partsupfunc = (FmgrInfo *) palloc0(key->partnatts * sizeof(FmgrInfo));
908
909         key->partcollation = (Oid *) palloc0(key->partnatts * sizeof(Oid));
910
911         /* Gather type and collation info as well */
912         key->parttypid = (Oid *) palloc0(key->partnatts * sizeof(Oid));
913         key->parttypmod = (int32 *) palloc0(key->partnatts * sizeof(int32));
914         key->parttyplen = (int16 *) palloc0(key->partnatts * sizeof(int16));
915         key->parttypbyval = (bool *) palloc0(key->partnatts * sizeof(bool));
916         key->parttypalign = (char *) palloc0(key->partnatts * sizeof(char));
917         key->parttypcoll = (Oid *) palloc0(key->partnatts * sizeof(Oid));
918
919         /* Copy partattrs and fill other per-attribute info */
920         memcpy(key->partattrs, attrs, key->partnatts * sizeof(int16));
921         partexprs_item = list_head(key->partexprs);
922         for (i = 0; i < key->partnatts; i++)
923         {
924                 AttrNumber      attno = key->partattrs[i];
925                 HeapTuple       opclasstup;
926                 Form_pg_opclass opclassform;
927                 Oid                     funcid;
928
929                 /* Collect opfamily information */
930                 opclasstup = SearchSysCache1(CLAOID,
931                                                                          ObjectIdGetDatum(opclass->values[i]));
932                 if (!HeapTupleIsValid(opclasstup))
933                         elog(ERROR, "cache lookup failed for opclass %u", opclass->values[i]);
934
935                 opclassform = (Form_pg_opclass) GETSTRUCT(opclasstup);
936                 key->partopfamily[i] = opclassform->opcfamily;
937                 key->partopcintype[i] = opclassform->opcintype;
938
939                 /*
940                  * A btree support function covers the cases of list and range methods
941                  * currently supported.
942                  */
943                 funcid = get_opfamily_proc(opclassform->opcfamily,
944                                                                    opclassform->opcintype,
945                                                                    opclassform->opcintype,
946                                                                    BTORDER_PROC);
947
948                 fmgr_info(funcid, &key->partsupfunc[i]);
949
950                 /* Collation */
951                 key->partcollation[i] = collation->values[i];
952
953                 /* Collect type information */
954                 if (attno != 0)
955                 {
956                         key->parttypid[i] = relation->rd_att->attrs[attno - 1]->atttypid;
957                         key->parttypmod[i] = relation->rd_att->attrs[attno - 1]->atttypmod;
958                         key->parttypcoll[i] = relation->rd_att->attrs[attno - 1]->attcollation;
959                 }
960                 else
961                 {
962                         key->parttypid[i] = exprType(lfirst(partexprs_item));
963                         key->parttypmod[i] = exprTypmod(lfirst(partexprs_item));
964                         key->parttypcoll[i] = exprCollation(lfirst(partexprs_item));
965                 }
966                 get_typlenbyvalalign(key->parttypid[i],
967                                                          &key->parttyplen[i],
968                                                          &key->parttypbyval[i],
969                                                          &key->parttypalign[i]);
970
971                 ReleaseSysCache(opclasstup);
972         }
973
974         ReleaseSysCache(tuple);
975
976         /* Success --- now copy to the cache memory */
977         partkeycxt = AllocSetContextCreate(CacheMemoryContext,
978                                                                            RelationGetRelationName(relation),
979                                                                            ALLOCSET_SMALL_SIZES);
980         relation->rd_partkeycxt = partkeycxt;
981         oldcxt = MemoryContextSwitchTo(relation->rd_partkeycxt);
982         relation->rd_partkey = copy_partition_key(key);
983         MemoryContextSwitchTo(oldcxt);
984 }
985
986 /*
987  * copy_partition_key
988  *
989  * The copy is allocated in the current memory context.
990  */
991 static PartitionKey
992 copy_partition_key(PartitionKey fromkey)
993 {
994         PartitionKey newkey;
995         int                     n;
996
997         newkey = (PartitionKey) palloc(sizeof(PartitionKeyData));
998
999         newkey->strategy = fromkey->strategy;
1000         newkey->partnatts = n = fromkey->partnatts;
1001
1002         newkey->partattrs = (AttrNumber *) palloc(n * sizeof(AttrNumber));
1003         memcpy(newkey->partattrs, fromkey->partattrs, n * sizeof(AttrNumber));
1004
1005         newkey->partexprs = copyObject(fromkey->partexprs);
1006
1007         newkey->partopfamily = (Oid *) palloc(n * sizeof(Oid));
1008         memcpy(newkey->partopfamily, fromkey->partopfamily, n * sizeof(Oid));
1009
1010         newkey->partopcintype = (Oid *) palloc(n * sizeof(Oid));
1011         memcpy(newkey->partopcintype, fromkey->partopcintype, n * sizeof(Oid));
1012
1013         newkey->partsupfunc = (FmgrInfo *) palloc(n * sizeof(FmgrInfo));
1014         memcpy(newkey->partsupfunc, fromkey->partsupfunc, n * sizeof(FmgrInfo));
1015
1016         newkey->partcollation = (Oid *) palloc(n * sizeof(Oid));
1017         memcpy(newkey->partcollation, fromkey->partcollation, n * sizeof(Oid));
1018
1019         newkey->parttypid = (Oid *) palloc(n * sizeof(Oid));
1020         memcpy(newkey->parttypid, fromkey->parttypid, n * sizeof(Oid));
1021
1022         newkey->parttypmod = (int32 *) palloc(n * sizeof(int32));
1023         memcpy(newkey->parttypmod, fromkey->parttypmod, n * sizeof(int32));
1024
1025         newkey->parttyplen = (int16 *) palloc(n * sizeof(int16));
1026         memcpy(newkey->parttyplen, fromkey->parttyplen, n * sizeof(int16));
1027
1028         newkey->parttypbyval = (bool *) palloc(n * sizeof(bool));
1029         memcpy(newkey->parttypbyval, fromkey->parttypbyval, n * sizeof(bool));
1030
1031         newkey->parttypalign = (char *) palloc(n * sizeof(bool));
1032         memcpy(newkey->parttypalign, fromkey->parttypalign, n * sizeof(char));
1033
1034         newkey->parttypcoll = (Oid *) palloc(n * sizeof(Oid));
1035         memcpy(newkey->parttypcoll, fromkey->parttypcoll, n * sizeof(Oid));
1036
1037         return newkey;
1038 }
1039
1040 /*
1041  *              equalRuleLocks
1042  *
1043  *              Determine whether two RuleLocks are equivalent
1044  *
1045  *              Probably this should be in the rules code someplace...
1046  */
1047 static bool
1048 equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
1049 {
1050         int                     i;
1051
1052         /*
1053          * As of 7.3 we assume the rule ordering is repeatable, because
1054          * RelationBuildRuleLock should read 'em in a consistent order.  So just
1055          * compare corresponding slots.
1056          */
1057         if (rlock1 != NULL)
1058         {
1059                 if (rlock2 == NULL)
1060                         return false;
1061                 if (rlock1->numLocks != rlock2->numLocks)
1062                         return false;
1063                 for (i = 0; i < rlock1->numLocks; i++)
1064                 {
1065                         RewriteRule *rule1 = rlock1->rules[i];
1066                         RewriteRule *rule2 = rlock2->rules[i];
1067
1068                         if (rule1->ruleId != rule2->ruleId)
1069                                 return false;
1070                         if (rule1->event != rule2->event)
1071                                 return false;
1072                         if (rule1->enabled != rule2->enabled)
1073                                 return false;
1074                         if (rule1->isInstead != rule2->isInstead)
1075                                 return false;
1076                         if (!equal(rule1->qual, rule2->qual))
1077                                 return false;
1078                         if (!equal(rule1->actions, rule2->actions))
1079                                 return false;
1080                 }
1081         }
1082         else if (rlock2 != NULL)
1083                 return false;
1084         return true;
1085 }
1086
1087 /*
1088  *              equalPolicy
1089  *
1090  *              Determine whether two policies are equivalent
1091  */
1092 static bool
1093 equalPolicy(RowSecurityPolicy *policy1, RowSecurityPolicy *policy2)
1094 {
1095         int                     i;
1096         Oid                *r1,
1097                            *r2;
1098
1099         if (policy1 != NULL)
1100         {
1101                 if (policy2 == NULL)
1102                         return false;
1103
1104                 if (policy1->polcmd != policy2->polcmd)
1105                         return false;
1106                 if (policy1->hassublinks != policy2->hassublinks)
1107                         return false;
1108                 if (strcmp(policy1->policy_name, policy2->policy_name) != 0)
1109                         return false;
1110                 if (ARR_DIMS(policy1->roles)[0] != ARR_DIMS(policy2->roles)[0])
1111                         return false;
1112
1113                 r1 = (Oid *) ARR_DATA_PTR(policy1->roles);
1114                 r2 = (Oid *) ARR_DATA_PTR(policy2->roles);
1115
1116                 for (i = 0; i < ARR_DIMS(policy1->roles)[0]; i++)
1117                 {
1118                         if (r1[i] != r2[i])
1119                                 return false;
1120                 }
1121
1122                 if (!equal(policy1->qual, policy2->qual))
1123                         return false;
1124                 if (!equal(policy1->with_check_qual, policy2->with_check_qual))
1125                         return false;
1126         }
1127         else if (policy2 != NULL)
1128                 return false;
1129
1130         return true;
1131 }
1132
1133 /*
1134  *              equalRSDesc
1135  *
1136  *              Determine whether two RowSecurityDesc's are equivalent
1137  */
1138 static bool
1139 equalRSDesc(RowSecurityDesc *rsdesc1, RowSecurityDesc *rsdesc2)
1140 {
1141         ListCell   *lc,
1142                            *rc;
1143
1144         if (rsdesc1 == NULL && rsdesc2 == NULL)
1145                 return true;
1146
1147         if ((rsdesc1 != NULL && rsdesc2 == NULL) ||
1148                 (rsdesc1 == NULL && rsdesc2 != NULL))
1149                 return false;
1150
1151         if (list_length(rsdesc1->policies) != list_length(rsdesc2->policies))
1152                 return false;
1153
1154         /* RelationBuildRowSecurity should build policies in order */
1155         forboth(lc, rsdesc1->policies, rc, rsdesc2->policies)
1156         {
1157                 RowSecurityPolicy *l = (RowSecurityPolicy *) lfirst(lc);
1158                 RowSecurityPolicy *r = (RowSecurityPolicy *) lfirst(rc);
1159
1160                 if (!equalPolicy(l, r))
1161                         return false;
1162         }
1163
1164         return true;
1165 }
1166
1167 /*
1168  * equalPartitionDescs
1169  *              Compare two partition descriptors for logical equality
1170  */
1171 static bool
1172 equalPartitionDescs(PartitionKey key, PartitionDesc partdesc1,
1173                                         PartitionDesc partdesc2)
1174 {
1175         int                     i;
1176
1177         if (partdesc1 != NULL)
1178         {
1179                 if (partdesc2 == NULL)
1180                         return false;
1181                 if (partdesc1->nparts != partdesc2->nparts)
1182                         return false;
1183
1184                 Assert(key != NULL || partdesc1->nparts == 0);
1185
1186                 /*
1187                  * Same oids? If the partitioning structure did not change, that is,
1188                  * no partitions were added or removed to the relation, the oids array
1189                  * should still match element-by-element.
1190                  */
1191                 for (i = 0; i < partdesc1->nparts; i++)
1192                 {
1193                         if (partdesc1->oids[i] != partdesc2->oids[i])
1194                                 return false;
1195                 }
1196
1197                 /*
1198                  * Now compare partition bound collections.  The logic to iterate over
1199                  * the collections is private to partition.c.
1200                  */
1201                 if (partdesc1->boundinfo != NULL)
1202                 {
1203                         if (partdesc2->boundinfo == NULL)
1204                                 return false;
1205
1206                         if (!partition_bounds_equal(key, partdesc1->boundinfo,
1207                                                                                 partdesc2->boundinfo))
1208                                 return false;
1209                 }
1210                 else if (partdesc2->boundinfo != NULL)
1211                         return false;
1212         }
1213         else if (partdesc2 != NULL)
1214                 return false;
1215
1216         return true;
1217 }
1218
1219 /*
1220  *              RelationBuildDesc
1221  *
1222  *              Build a relation descriptor.  The caller must hold at least
1223  *              AccessShareLock on the target relid.
1224  *
1225  *              The new descriptor is inserted into the hash table if insertIt is true.
1226  *
1227  *              Returns NULL if no pg_class row could be found for the given relid
1228  *              (suggesting we are trying to access a just-deleted relation).
1229  *              Any other error is reported via elog.
1230  */
1231 static Relation
1232 RelationBuildDesc(Oid targetRelId, bool insertIt)
1233 {
1234         Relation        relation;
1235         Oid                     relid;
1236         HeapTuple       pg_class_tuple;
1237         Form_pg_class relp;
1238
1239         /*
1240          * find the tuple in pg_class corresponding to the given relation id
1241          */
1242         pg_class_tuple = ScanPgRelation(targetRelId, true, false);
1243
1244         /*
1245          * if no such tuple exists, return NULL
1246          */
1247         if (!HeapTupleIsValid(pg_class_tuple))
1248                 return NULL;
1249
1250         /*
1251          * get information from the pg_class_tuple
1252          */
1253         relid = HeapTupleGetOid(pg_class_tuple);
1254         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
1255         Assert(relid == targetRelId);
1256
1257         /*
1258          * allocate storage for the relation descriptor, and copy pg_class_tuple
1259          * to relation->rd_rel.
1260          */
1261         relation = AllocateRelationDesc(relp);
1262
1263         /*
1264          * initialize the relation's relation id (relation->rd_id)
1265          */
1266         RelationGetRelid(relation) = relid;
1267
1268         /*
1269          * normal relations are not nailed into the cache; nor can a pre-existing
1270          * relation be new.  It could be temp though.  (Actually, it could be new
1271          * too, but it's okay to forget that fact if forced to flush the entry.)
1272          */
1273         relation->rd_refcnt = 0;
1274         relation->rd_isnailed = false;
1275         relation->rd_createSubid = InvalidSubTransactionId;
1276         relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
1277         switch (relation->rd_rel->relpersistence)
1278         {
1279                 case RELPERSISTENCE_UNLOGGED:
1280                 case RELPERSISTENCE_PERMANENT:
1281                         relation->rd_backend = InvalidBackendId;
1282                         relation->rd_islocaltemp = false;
1283                         break;
1284                 case RELPERSISTENCE_TEMP:
1285                         if (isTempOrTempToastNamespace(relation->rd_rel->relnamespace))
1286                         {
1287                                 relation->rd_backend = BackendIdForTempRelations();
1288                                 relation->rd_islocaltemp = true;
1289                         }
1290                         else
1291                         {
1292                                 /*
1293                                  * If it's a temp table, but not one of ours, we have to use
1294                                  * the slow, grotty method to figure out the owning backend.
1295                                  *
1296                                  * Note: it's possible that rd_backend gets set to MyBackendId
1297                                  * here, in case we are looking at a pg_class entry left over
1298                                  * from a crashed backend that coincidentally had the same
1299                                  * BackendId we're using.  We should *not* consider such a
1300                                  * table to be "ours"; this is why we need the separate
1301                                  * rd_islocaltemp flag.  The pg_class entry will get flushed
1302                                  * if/when we clean out the corresponding temp table namespace
1303                                  * in preparation for using it.
1304                                  */
1305                                 relation->rd_backend =
1306                                         GetTempNamespaceBackendId(relation->rd_rel->relnamespace);
1307                                 Assert(relation->rd_backend != InvalidBackendId);
1308                                 relation->rd_islocaltemp = false;
1309                         }
1310                         break;
1311                 default:
1312                         elog(ERROR, "invalid relpersistence: %c",
1313                                  relation->rd_rel->relpersistence);
1314                         break;
1315         }
1316
1317         /*
1318          * initialize the tuple descriptor (relation->rd_att).
1319          */
1320         RelationBuildTupleDesc(relation);
1321
1322         /*
1323          * Fetch rules and triggers that affect this relation
1324          */
1325         if (relation->rd_rel->relhasrules)
1326                 RelationBuildRuleLock(relation);
1327         else
1328         {
1329                 relation->rd_rules = NULL;
1330                 relation->rd_rulescxt = NULL;
1331         }
1332
1333         if (relation->rd_rel->relhastriggers)
1334                 RelationBuildTriggers(relation);
1335         else
1336                 relation->trigdesc = NULL;
1337
1338         if (relation->rd_rel->relrowsecurity)
1339                 RelationBuildRowSecurity(relation);
1340         else
1341                 relation->rd_rsdesc = NULL;
1342
1343         /* foreign key data is not loaded till asked for */
1344         relation->rd_fkeylist = NIL;
1345         relation->rd_fkeyvalid = false;
1346
1347         /* if a partitioned table, initialize key and partition descriptor info */
1348         if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1349         {
1350                 RelationBuildPartitionKey(relation);
1351                 RelationBuildPartitionDesc(relation);
1352         }
1353         else
1354         {
1355                 relation->rd_partkeycxt = NULL;
1356                 relation->rd_partkey = NULL;
1357                 relation->rd_partdesc = NULL;
1358                 relation->rd_pdcxt = NULL;
1359         }
1360
1361         /*
1362          * if it's an index, initialize index-related information
1363          */
1364         if (OidIsValid(relation->rd_rel->relam))
1365                 RelationInitIndexAccessInfo(relation);
1366
1367         /* extract reloptions if any */
1368         RelationParseRelOptions(relation, pg_class_tuple);
1369
1370         /*
1371          * initialize the relation lock manager information
1372          */
1373         RelationInitLockInfo(relation);         /* see lmgr.c */
1374
1375         /*
1376          * initialize physical addressing information for the relation
1377          */
1378         RelationInitPhysicalAddr(relation);
1379
1380         /* make sure relation is marked as having no open file yet */
1381         relation->rd_smgr = NULL;
1382
1383         /*
1384          * now we can free the memory allocated for pg_class_tuple
1385          */
1386         heap_freetuple(pg_class_tuple);
1387
1388         /*
1389          * Insert newly created relation into relcache hash table, if requested.
1390          *
1391          * There is one scenario in which we might find a hashtable entry already
1392          * present, even though our caller failed to find it: if the relation is a
1393          * system catalog or index that's used during relcache load, we might have
1394          * recursively created the same relcache entry during the preceding steps.
1395          * So allow RelationCacheInsert to delete any already-present relcache
1396          * entry for the same OID.  The already-present entry should have refcount
1397          * zero (else somebody forgot to close it); in the event that it doesn't,
1398          * we'll elog a WARNING and leak the already-present entry.
1399          */
1400         if (insertIt)
1401                 RelationCacheInsert(relation, true);
1402
1403         /* It's fully valid */
1404         relation->rd_isvalid = true;
1405
1406         return relation;
1407 }
1408
1409 /*
1410  * Initialize the physical addressing info (RelFileNode) for a relcache entry
1411  *
1412  * Note: at the physical level, relations in the pg_global tablespace must
1413  * be treated as shared, even if relisshared isn't set.  Hence we do not
1414  * look at relisshared here.
1415  */
1416 static void
1417 RelationInitPhysicalAddr(Relation relation)
1418 {
1419         if (relation->rd_rel->reltablespace)
1420                 relation->rd_node.spcNode = relation->rd_rel->reltablespace;
1421         else
1422                 relation->rd_node.spcNode = MyDatabaseTableSpace;
1423         if (relation->rd_node.spcNode == GLOBALTABLESPACE_OID)
1424                 relation->rd_node.dbNode = InvalidOid;
1425         else
1426                 relation->rd_node.dbNode = MyDatabaseId;
1427
1428         if (relation->rd_rel->relfilenode)
1429         {
1430                 /*
1431                  * Even if we are using a decoding snapshot that doesn't represent the
1432                  * current state of the catalog we need to make sure the filenode
1433                  * points to the current file since the older file will be gone (or
1434                  * truncated). The new file will still contain older rows so lookups
1435                  * in them will work correctly. This wouldn't work correctly if
1436                  * rewrites were allowed to change the schema in an incompatible way,
1437                  * but those are prevented both on catalog tables and on user tables
1438                  * declared as additional catalog tables.
1439                  */
1440                 if (HistoricSnapshotActive()
1441                         && RelationIsAccessibleInLogicalDecoding(relation)
1442                         && IsTransactionState())
1443                 {
1444                         HeapTuple       phys_tuple;
1445                         Form_pg_class physrel;
1446
1447                         phys_tuple = ScanPgRelation(RelationGetRelid(relation),
1448                                                            RelationGetRelid(relation) != ClassOidIndexId,
1449                                                                                 true);
1450                         if (!HeapTupleIsValid(phys_tuple))
1451                                 elog(ERROR, "could not find pg_class entry for %u",
1452                                          RelationGetRelid(relation));
1453                         physrel = (Form_pg_class) GETSTRUCT(phys_tuple);
1454
1455                         relation->rd_rel->reltablespace = physrel->reltablespace;
1456                         relation->rd_rel->relfilenode = physrel->relfilenode;
1457                         heap_freetuple(phys_tuple);
1458                 }
1459
1460                 relation->rd_node.relNode = relation->rd_rel->relfilenode;
1461         }
1462         else
1463         {
1464                 /* Consult the relation mapper */
1465                 relation->rd_node.relNode =
1466                         RelationMapOidToFilenode(relation->rd_id,
1467                                                                          relation->rd_rel->relisshared);
1468                 if (!OidIsValid(relation->rd_node.relNode))
1469                         elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
1470                                  RelationGetRelationName(relation), relation->rd_id);
1471         }
1472 }
1473
1474 /*
1475  * Fill in the IndexAmRoutine for an index relation.
1476  *
1477  * relation's rd_amhandler and rd_indexcxt must be valid already.
1478  */
1479 static void
1480 InitIndexAmRoutine(Relation relation)
1481 {
1482         IndexAmRoutine *cached,
1483                            *tmp;
1484
1485         /*
1486          * Call the amhandler in current, short-lived memory context, just in case
1487          * it leaks anything (it probably won't, but let's be paranoid).
1488          */
1489         tmp = GetIndexAmRoutine(relation->rd_amhandler);
1490
1491         /* OK, now transfer the data into relation's rd_indexcxt. */
1492         cached = (IndexAmRoutine *) MemoryContextAlloc(relation->rd_indexcxt,
1493                                                                                                    sizeof(IndexAmRoutine));
1494         memcpy(cached, tmp, sizeof(IndexAmRoutine));
1495         relation->rd_amroutine = cached;
1496
1497         pfree(tmp);
1498 }
1499
1500 /*
1501  * Initialize index-access-method support data for an index relation
1502  */
1503 void
1504 RelationInitIndexAccessInfo(Relation relation)
1505 {
1506         HeapTuple       tuple;
1507         Form_pg_am      aform;
1508         Datum           indcollDatum;
1509         Datum           indclassDatum;
1510         Datum           indoptionDatum;
1511         bool            isnull;
1512         oidvector  *indcoll;
1513         oidvector  *indclass;
1514         int2vector *indoption;
1515         MemoryContext indexcxt;
1516         MemoryContext oldcontext;
1517         int                     natts;
1518         uint16          amsupport;
1519
1520         /*
1521          * Make a copy of the pg_index entry for the index.  Since pg_index
1522          * contains variable-length and possibly-null fields, we have to do this
1523          * honestly rather than just treating it as a Form_pg_index struct.
1524          */
1525         tuple = SearchSysCache1(INDEXRELID,
1526                                                         ObjectIdGetDatum(RelationGetRelid(relation)));
1527         if (!HeapTupleIsValid(tuple))
1528                 elog(ERROR, "cache lookup failed for index %u",
1529                          RelationGetRelid(relation));
1530         oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
1531         relation->rd_indextuple = heap_copytuple(tuple);
1532         relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
1533         MemoryContextSwitchTo(oldcontext);
1534         ReleaseSysCache(tuple);
1535
1536         /*
1537          * Look up the index's access method, save the OID of its handler function
1538          */
1539         tuple = SearchSysCache1(AMOID, ObjectIdGetDatum(relation->rd_rel->relam));
1540         if (!HeapTupleIsValid(tuple))
1541                 elog(ERROR, "cache lookup failed for access method %u",
1542                          relation->rd_rel->relam);
1543         aform = (Form_pg_am) GETSTRUCT(tuple);
1544         relation->rd_amhandler = aform->amhandler;
1545         ReleaseSysCache(tuple);
1546
1547         natts = relation->rd_rel->relnatts;
1548         if (natts != relation->rd_index->indnatts)
1549                 elog(ERROR, "relnatts disagrees with indnatts for index %u",
1550                          RelationGetRelid(relation));
1551
1552         /*
1553          * Make the private context to hold index access info.  The reason we need
1554          * a context, and not just a couple of pallocs, is so that we won't leak
1555          * any subsidiary info attached to fmgr lookup records.
1556          */
1557         indexcxt = AllocSetContextCreate(CacheMemoryContext,
1558                                                                          RelationGetRelationName(relation),
1559                                                                          ALLOCSET_SMALL_SIZES);
1560         relation->rd_indexcxt = indexcxt;
1561
1562         /*
1563          * Now we can fetch the index AM's API struct
1564          */
1565         InitIndexAmRoutine(relation);
1566
1567         /*
1568          * Allocate arrays to hold data
1569          */
1570         relation->rd_opfamily = (Oid *)
1571                 MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
1572         relation->rd_opcintype = (Oid *)
1573                 MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
1574
1575         amsupport = relation->rd_amroutine->amsupport;
1576         if (amsupport > 0)
1577         {
1578                 int                     nsupport = natts * amsupport;
1579
1580                 relation->rd_support = (RegProcedure *)
1581                         MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
1582                 relation->rd_supportinfo = (FmgrInfo *)
1583                         MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
1584         }
1585         else
1586         {
1587                 relation->rd_support = NULL;
1588                 relation->rd_supportinfo = NULL;
1589         }
1590
1591         relation->rd_indcollation = (Oid *)
1592                 MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
1593
1594         relation->rd_indoption = (int16 *)
1595                 MemoryContextAllocZero(indexcxt, natts * sizeof(int16));
1596
1597         /*
1598          * indcollation cannot be referenced directly through the C struct,
1599          * because it comes after the variable-width indkey field.  Must extract
1600          * the datum the hard way...
1601          */
1602         indcollDatum = fastgetattr(relation->rd_indextuple,
1603                                                            Anum_pg_index_indcollation,
1604                                                            GetPgIndexDescriptor(),
1605                                                            &isnull);
1606         Assert(!isnull);
1607         indcoll = (oidvector *) DatumGetPointer(indcollDatum);
1608         memcpy(relation->rd_indcollation, indcoll->values, natts * sizeof(Oid));
1609
1610         /*
1611          * indclass cannot be referenced directly through the C struct, because it
1612          * comes after the variable-width indkey field.  Must extract the datum
1613          * the hard way...
1614          */
1615         indclassDatum = fastgetattr(relation->rd_indextuple,
1616                                                                 Anum_pg_index_indclass,
1617                                                                 GetPgIndexDescriptor(),
1618                                                                 &isnull);
1619         Assert(!isnull);
1620         indclass = (oidvector *) DatumGetPointer(indclassDatum);
1621
1622         /*
1623          * Fill the support procedure OID array, as well as the info about
1624          * opfamilies and opclass input types.  (aminfo and supportinfo are left
1625          * as zeroes, and are filled on-the-fly when used)
1626          */
1627         IndexSupportInitialize(indclass, relation->rd_support,
1628                                                    relation->rd_opfamily, relation->rd_opcintype,
1629                                                    amsupport, natts);
1630
1631         /*
1632          * Similarly extract indoption and copy it to the cache entry
1633          */
1634         indoptionDatum = fastgetattr(relation->rd_indextuple,
1635                                                                  Anum_pg_index_indoption,
1636                                                                  GetPgIndexDescriptor(),
1637                                                                  &isnull);
1638         Assert(!isnull);
1639         indoption = (int2vector *) DatumGetPointer(indoptionDatum);
1640         memcpy(relation->rd_indoption, indoption->values, natts * sizeof(int16));
1641
1642         /*
1643          * expressions, predicate, exclusion caches will be filled later
1644          */
1645         relation->rd_indexprs = NIL;
1646         relation->rd_indpred = NIL;
1647         relation->rd_exclops = NULL;
1648         relation->rd_exclprocs = NULL;
1649         relation->rd_exclstrats = NULL;
1650         relation->rd_amcache = NULL;
1651 }
1652
1653 /*
1654  * IndexSupportInitialize
1655  *              Initializes an index's cached opclass information,
1656  *              given the index's pg_index.indclass entry.
1657  *
1658  * Data is returned into *indexSupport, *opFamily, and *opcInType,
1659  * which are arrays allocated by the caller.
1660  *
1661  * The caller also passes maxSupportNumber and maxAttributeNumber, since these
1662  * indicate the size of the arrays it has allocated --- but in practice these
1663  * numbers must always match those obtainable from the system catalog entries
1664  * for the index and access method.
1665  */
1666 static void
1667 IndexSupportInitialize(oidvector *indclass,
1668                                            RegProcedure *indexSupport,
1669                                            Oid *opFamily,
1670                                            Oid *opcInType,
1671                                            StrategyNumber maxSupportNumber,
1672                                            AttrNumber maxAttributeNumber)
1673 {
1674         int                     attIndex;
1675
1676         for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
1677         {
1678                 OpClassCacheEnt *opcentry;
1679
1680                 if (!OidIsValid(indclass->values[attIndex]))
1681                         elog(ERROR, "bogus pg_index tuple");
1682
1683                 /* look up the info for this opclass, using a cache */
1684                 opcentry = LookupOpclassInfo(indclass->values[attIndex],
1685                                                                          maxSupportNumber);
1686
1687                 /* copy cached data into relcache entry */
1688                 opFamily[attIndex] = opcentry->opcfamily;
1689                 opcInType[attIndex] = opcentry->opcintype;
1690                 if (maxSupportNumber > 0)
1691                         memcpy(&indexSupport[attIndex * maxSupportNumber],
1692                                    opcentry->supportProcs,
1693                                    maxSupportNumber * sizeof(RegProcedure));
1694         }
1695 }
1696
1697 /*
1698  * LookupOpclassInfo
1699  *
1700  * This routine maintains a per-opclass cache of the information needed
1701  * by IndexSupportInitialize().  This is more efficient than relying on
1702  * the catalog cache, because we can load all the info about a particular
1703  * opclass in a single indexscan of pg_amproc.
1704  *
1705  * The information from pg_am about expected range of support function
1706  * numbers is passed in, rather than being looked up, mainly because the
1707  * caller will have it already.
1708  *
1709  * Note there is no provision for flushing the cache.  This is OK at the
1710  * moment because there is no way to ALTER any interesting properties of an
1711  * existing opclass --- all you can do is drop it, which will result in
1712  * a useless but harmless dead entry in the cache.  To support altering
1713  * opclass membership (not the same as opfamily membership!), we'd need to
1714  * be able to flush this cache as well as the contents of relcache entries
1715  * for indexes.
1716  */
1717 static OpClassCacheEnt *
1718 LookupOpclassInfo(Oid operatorClassOid,
1719                                   StrategyNumber numSupport)
1720 {
1721         OpClassCacheEnt *opcentry;
1722         bool            found;
1723         Relation        rel;
1724         SysScanDesc scan;
1725         ScanKeyData skey[3];
1726         HeapTuple       htup;
1727         bool            indexOK;
1728
1729         if (OpClassCache == NULL)
1730         {
1731                 /* First time through: initialize the opclass cache */
1732                 HASHCTL         ctl;
1733
1734                 MemSet(&ctl, 0, sizeof(ctl));
1735                 ctl.keysize = sizeof(Oid);
1736                 ctl.entrysize = sizeof(OpClassCacheEnt);
1737                 OpClassCache = hash_create("Operator class cache", 64,
1738                                                                    &ctl, HASH_ELEM | HASH_BLOBS);
1739
1740                 /* Also make sure CacheMemoryContext exists */
1741                 if (!CacheMemoryContext)
1742                         CreateCacheMemoryContext();
1743         }
1744
1745         opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
1746                                                                                            (void *) &operatorClassOid,
1747                                                                                            HASH_ENTER, &found);
1748
1749         if (!found)
1750         {
1751                 /* Need to allocate memory for new entry */
1752                 opcentry->valid = false;        /* until known OK */
1753                 opcentry->numSupport = numSupport;
1754
1755                 if (numSupport > 0)
1756                         opcentry->supportProcs = (RegProcedure *)
1757                                 MemoryContextAllocZero(CacheMemoryContext,
1758                                                                            numSupport * sizeof(RegProcedure));
1759                 else
1760                         opcentry->supportProcs = NULL;
1761         }
1762         else
1763         {
1764                 Assert(numSupport == opcentry->numSupport);
1765         }
1766
1767         /*
1768          * When testing for cache-flush hazards, we intentionally disable the
1769          * operator class cache and force reloading of the info on each call. This
1770          * is helpful because we want to test the case where a cache flush occurs
1771          * while we are loading the info, and it's very hard to provoke that if
1772          * this happens only once per opclass per backend.
1773          */
1774 #if defined(CLOBBER_CACHE_ALWAYS)
1775         opcentry->valid = false;
1776 #endif
1777
1778         if (opcentry->valid)
1779                 return opcentry;
1780
1781         /*
1782          * Need to fill in new entry.
1783          *
1784          * To avoid infinite recursion during startup, force heap scans if we're
1785          * looking up info for the opclasses used by the indexes we would like to
1786          * reference here.
1787          */
1788         indexOK = criticalRelcachesBuilt ||
1789                 (operatorClassOid != OID_BTREE_OPS_OID &&
1790                  operatorClassOid != INT2_BTREE_OPS_OID);
1791
1792         /*
1793          * We have to fetch the pg_opclass row to determine its opfamily and
1794          * opcintype, which are needed to look up related operators and functions.
1795          * It'd be convenient to use the syscache here, but that probably doesn't
1796          * work while bootstrapping.
1797          */
1798         ScanKeyInit(&skey[0],
1799                                 ObjectIdAttributeNumber,
1800                                 BTEqualStrategyNumber, F_OIDEQ,
1801                                 ObjectIdGetDatum(operatorClassOid));
1802         rel = heap_open(OperatorClassRelationId, AccessShareLock);
1803         scan = systable_beginscan(rel, OpclassOidIndexId, indexOK,
1804                                                           NULL, 1, skey);
1805
1806         if (HeapTupleIsValid(htup = systable_getnext(scan)))
1807         {
1808                 Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(htup);
1809
1810                 opcentry->opcfamily = opclassform->opcfamily;
1811                 opcentry->opcintype = opclassform->opcintype;
1812         }
1813         else
1814                 elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);
1815
1816         systable_endscan(scan);
1817         heap_close(rel, AccessShareLock);
1818
1819         /*
1820          * Scan pg_amproc to obtain support procs for the opclass.  We only fetch
1821          * the default ones (those with lefttype = righttype = opcintype).
1822          */
1823         if (numSupport > 0)
1824         {
1825                 ScanKeyInit(&skey[0],
1826                                         Anum_pg_amproc_amprocfamily,
1827                                         BTEqualStrategyNumber, F_OIDEQ,
1828                                         ObjectIdGetDatum(opcentry->opcfamily));
1829                 ScanKeyInit(&skey[1],
1830                                         Anum_pg_amproc_amproclefttype,
1831                                         BTEqualStrategyNumber, F_OIDEQ,
1832                                         ObjectIdGetDatum(opcentry->opcintype));
1833                 ScanKeyInit(&skey[2],
1834                                         Anum_pg_amproc_amprocrighttype,
1835                                         BTEqualStrategyNumber, F_OIDEQ,
1836                                         ObjectIdGetDatum(opcentry->opcintype));
1837                 rel = heap_open(AccessMethodProcedureRelationId, AccessShareLock);
1838                 scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
1839                                                                   NULL, 3, skey);
1840
1841                 while (HeapTupleIsValid(htup = systable_getnext(scan)))
1842                 {
1843                         Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);
1844
1845                         if (amprocform->amprocnum <= 0 ||
1846                                 (StrategyNumber) amprocform->amprocnum > numSupport)
1847                                 elog(ERROR, "invalid amproc number %d for opclass %u",
1848                                          amprocform->amprocnum, operatorClassOid);
1849
1850                         opcentry->supportProcs[amprocform->amprocnum - 1] =
1851                                 amprocform->amproc;
1852                 }
1853
1854                 systable_endscan(scan);
1855                 heap_close(rel, AccessShareLock);
1856         }
1857
1858         opcentry->valid = true;
1859         return opcentry;
1860 }
1861
1862
1863 /*
1864  *              formrdesc
1865  *
1866  *              This is a special cut-down version of RelationBuildDesc(),
1867  *              used while initializing the relcache.
1868  *              The relation descriptor is built just from the supplied parameters,
1869  *              without actually looking at any system table entries.  We cheat
1870  *              quite a lot since we only need to work for a few basic system
1871  *              catalogs.
1872  *
1873  * formrdesc is currently used for: pg_database, pg_authid, pg_auth_members,
1874  * pg_shseclabel, pg_class, pg_attribute, pg_proc, and pg_type
1875  * (see RelationCacheInitializePhase2/3).
1876  *
1877  * Note that these catalogs can't have constraints (except attnotnull),
1878  * default values, rules, or triggers, since we don't cope with any of that.
1879  * (Well, actually, this only matters for properties that need to be valid
1880  * during bootstrap or before RelationCacheInitializePhase3 runs, and none of
1881  * these properties matter then...)
1882  *
1883  * NOTE: we assume we are already switched into CacheMemoryContext.
1884  */
1885 static void
1886 formrdesc(const char *relationName, Oid relationReltype,
1887                   bool isshared, bool hasoids,
1888                   int natts, const FormData_pg_attribute *attrs)
1889 {
1890         Relation        relation;
1891         int                     i;
1892         bool            has_not_null;
1893
1894         /*
1895          * allocate new relation desc, clear all fields of reldesc
1896          */
1897         relation = (Relation) palloc0(sizeof(RelationData));
1898
1899         /* make sure relation is marked as having no open file yet */
1900         relation->rd_smgr = NULL;
1901
1902         /*
1903          * initialize reference count: 1 because it is nailed in cache
1904          */
1905         relation->rd_refcnt = 1;
1906
1907         /*
1908          * all entries built with this routine are nailed-in-cache; none are for
1909          * new or temp relations.
1910          */
1911         relation->rd_isnailed = true;
1912         relation->rd_createSubid = InvalidSubTransactionId;
1913         relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
1914         relation->rd_backend = InvalidBackendId;
1915         relation->rd_islocaltemp = false;
1916
1917         /*
1918          * initialize relation tuple form
1919          *
1920          * The data we insert here is pretty incomplete/bogus, but it'll serve to
1921          * get us launched.  RelationCacheInitializePhase3() will read the real
1922          * data from pg_class and replace what we've done here.  Note in
1923          * particular that relowner is left as zero; this cues
1924          * RelationCacheInitializePhase3 that the real data isn't there yet.
1925          */
1926         relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
1927
1928         namestrcpy(&relation->rd_rel->relname, relationName);
1929         relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
1930         relation->rd_rel->reltype = relationReltype;
1931
1932         /*
1933          * It's important to distinguish between shared and non-shared relations,
1934          * even at bootstrap time, to make sure we know where they are stored.
1935          */
1936         relation->rd_rel->relisshared = isshared;
1937         if (isshared)
1938                 relation->rd_rel->reltablespace = GLOBALTABLESPACE_OID;
1939
1940         /* formrdesc is used only for permanent relations */
1941         relation->rd_rel->relpersistence = RELPERSISTENCE_PERMANENT;
1942
1943         /* ... and they're always populated, too */
1944         relation->rd_rel->relispopulated = true;
1945
1946         relation->rd_rel->relreplident = REPLICA_IDENTITY_NOTHING;
1947         relation->rd_rel->relpages = 0;
1948         relation->rd_rel->reltuples = 0;
1949         relation->rd_rel->relallvisible = 0;
1950         relation->rd_rel->relkind = RELKIND_RELATION;
1951         relation->rd_rel->relhasoids = hasoids;
1952         relation->rd_rel->relnatts = (int16) natts;
1953
1954         /*
1955          * initialize attribute tuple form
1956          *
1957          * Unlike the case with the relation tuple, this data had better be right
1958          * because it will never be replaced.  The data comes from
1959          * src/include/catalog/ headers via genbki.pl.
1960          */
1961         relation->rd_att = CreateTemplateTupleDesc(natts, hasoids);
1962         relation->rd_att->tdrefcount = 1;       /* mark as refcounted */
1963
1964         relation->rd_att->tdtypeid = relationReltype;
1965         relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
1966
1967         /*
1968          * initialize tuple desc info
1969          */
1970         has_not_null = false;
1971         for (i = 0; i < natts; i++)
1972         {
1973                 memcpy(relation->rd_att->attrs[i],
1974                            &attrs[i],
1975                            ATTRIBUTE_FIXED_PART_SIZE);
1976                 has_not_null |= attrs[i].attnotnull;
1977                 /* make sure attcacheoff is valid */
1978                 relation->rd_att->attrs[i]->attcacheoff = -1;
1979         }
1980
1981         /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
1982         relation->rd_att->attrs[0]->attcacheoff = 0;
1983
1984         /* mark not-null status */
1985         if (has_not_null)
1986         {
1987                 TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
1988
1989                 constr->has_not_null = true;
1990                 relation->rd_att->constr = constr;
1991         }
1992
1993         /*
1994          * initialize relation id from info in att array (my, this is ugly)
1995          */
1996         RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
1997
1998         /*
1999          * All relations made with formrdesc are mapped.  This is necessarily so
2000          * because there is no other way to know what filenode they currently
2001          * have.  In bootstrap mode, add them to the initial relation mapper data,
2002          * specifying that the initial filenode is the same as the OID.
2003          */
2004         relation->rd_rel->relfilenode = InvalidOid;
2005         if (IsBootstrapProcessingMode())
2006                 RelationMapUpdateMap(RelationGetRelid(relation),
2007                                                          RelationGetRelid(relation),
2008                                                          isshared, true);
2009
2010         /*
2011          * initialize the relation lock manager information
2012          */
2013         RelationInitLockInfo(relation);         /* see lmgr.c */
2014
2015         /*
2016          * initialize physical addressing information for the relation
2017          */
2018         RelationInitPhysicalAddr(relation);
2019
2020         /*
2021          * initialize the rel-has-index flag, using hardwired knowledge
2022          */
2023         if (IsBootstrapProcessingMode())
2024         {
2025                 /* In bootstrap mode, we have no indexes */
2026                 relation->rd_rel->relhasindex = false;
2027         }
2028         else
2029         {
2030                 /* Otherwise, all the rels formrdesc is used for have indexes */
2031                 relation->rd_rel->relhasindex = true;
2032         }
2033
2034         /*
2035          * add new reldesc to relcache
2036          */
2037         RelationCacheInsert(relation, false);
2038
2039         /* It's fully valid */
2040         relation->rd_isvalid = true;
2041 }
2042
2043
2044 /* ----------------------------------------------------------------
2045  *                               Relation Descriptor Lookup Interface
2046  * ----------------------------------------------------------------
2047  */
2048
2049 /*
2050  *              RelationIdGetRelation
2051  *
2052  *              Lookup a reldesc by OID; make one if not already in cache.
2053  *
2054  *              Returns NULL if no pg_class row could be found for the given relid
2055  *              (suggesting we are trying to access a just-deleted relation).
2056  *              Any other error is reported via elog.
2057  *
2058  *              NB: caller should already have at least AccessShareLock on the
2059  *              relation ID, else there are nasty race conditions.
2060  *
2061  *              NB: relation ref count is incremented, or set to 1 if new entry.
2062  *              Caller should eventually decrement count.  (Usually,
2063  *              that happens by calling RelationClose().)
2064  */
2065 Relation
2066 RelationIdGetRelation(Oid relationId)
2067 {
2068         Relation        rd;
2069
2070         /* Make sure we're in an xact, even if this ends up being a cache hit */
2071         Assert(IsTransactionState());
2072
2073         /*
2074          * first try to find reldesc in the cache
2075          */
2076         RelationIdCacheLookup(relationId, rd);
2077
2078         if (RelationIsValid(rd))
2079         {
2080                 RelationIncrementReferenceCount(rd);
2081                 /* revalidate cache entry if necessary */
2082                 if (!rd->rd_isvalid)
2083                 {
2084                         /*
2085                          * Indexes only have a limited number of possible schema changes,
2086                          * and we don't want to use the full-blown procedure because it's
2087                          * a headache for indexes that reload itself depends on.
2088                          */
2089                         if (rd->rd_rel->relkind == RELKIND_INDEX)
2090                                 RelationReloadIndexInfo(rd);
2091                         else
2092                                 RelationClearRelation(rd, true);
2093                         Assert(rd->rd_isvalid);
2094                 }
2095                 return rd;
2096         }
2097
2098         /*
2099          * no reldesc in the cache, so have RelationBuildDesc() build one and add
2100          * it.
2101          */
2102         rd = RelationBuildDesc(relationId, true);
2103         if (RelationIsValid(rd))
2104                 RelationIncrementReferenceCount(rd);
2105         return rd;
2106 }
2107
2108 /* ----------------------------------------------------------------
2109  *                              cache invalidation support routines
2110  * ----------------------------------------------------------------
2111  */
2112
2113 /*
2114  * RelationIncrementReferenceCount
2115  *              Increments relation reference count.
2116  *
2117  * Note: bootstrap mode has its own weird ideas about relation refcount
2118  * behavior; we ought to fix it someday, but for now, just disable
2119  * reference count ownership tracking in bootstrap mode.
2120  */
2121 void
2122 RelationIncrementReferenceCount(Relation rel)
2123 {
2124         ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
2125         rel->rd_refcnt += 1;
2126         if (!IsBootstrapProcessingMode())
2127                 ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
2128 }
2129
2130 /*
2131  * RelationDecrementReferenceCount
2132  *              Decrements relation reference count.
2133  */
2134 void
2135 RelationDecrementReferenceCount(Relation rel)
2136 {
2137         Assert(rel->rd_refcnt > 0);
2138         rel->rd_refcnt -= 1;
2139         if (!IsBootstrapProcessingMode())
2140                 ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
2141 }
2142
2143 /*
2144  * RelationClose - close an open relation
2145  *
2146  *      Actually, we just decrement the refcount.
2147  *
2148  *      NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
2149  *      will be freed as soon as their refcount goes to zero.  In combination
2150  *      with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
2151  *      to catch references to already-released relcache entries.  It slows
2152  *      things down quite a bit, however.
2153  */
2154 void
2155 RelationClose(Relation relation)
2156 {
2157         /* Note: no locking manipulations needed */
2158         RelationDecrementReferenceCount(relation);
2159
2160 #ifdef RELCACHE_FORCE_RELEASE
2161         if (RelationHasReferenceCountZero(relation) &&
2162                 relation->rd_createSubid == InvalidSubTransactionId &&
2163                 relation->rd_newRelfilenodeSubid == InvalidSubTransactionId)
2164                 RelationClearRelation(relation, false);
2165 #endif
2166 }
2167
2168 /*
2169  * RelationReloadIndexInfo - reload minimal information for an open index
2170  *
2171  *      This function is used only for indexes.  A relcache inval on an index
2172  *      can mean that its pg_class or pg_index row changed.  There are only
2173  *      very limited changes that are allowed to an existing index's schema,
2174  *      so we can update the relcache entry without a complete rebuild; which
2175  *      is fortunate because we can't rebuild an index entry that is "nailed"
2176  *      and/or in active use.  We support full replacement of the pg_class row,
2177  *      as well as updates of a few simple fields of the pg_index row.
2178  *
2179  *      We can't necessarily reread the catalog rows right away; we might be
2180  *      in a failed transaction when we receive the SI notification.  If so,
2181  *      RelationClearRelation just marks the entry as invalid by setting
2182  *      rd_isvalid to false.  This routine is called to fix the entry when it
2183  *      is next needed.
2184  *
2185  *      We assume that at the time we are called, we have at least AccessShareLock
2186  *      on the target index.  (Note: in the calls from RelationClearRelation,
2187  *      this is legitimate because we know the rel has positive refcount.)
2188  *
2189  *      If the target index is an index on pg_class or pg_index, we'd better have
2190  *      previously gotten at least AccessShareLock on its underlying catalog,
2191  *      else we are at risk of deadlock against someone trying to exclusive-lock
2192  *      the heap and index in that order.  This is ensured in current usage by
2193  *      only applying this to indexes being opened or having positive refcount.
2194  */
2195 static void
2196 RelationReloadIndexInfo(Relation relation)
2197 {
2198         bool            indexOK;
2199         HeapTuple       pg_class_tuple;
2200         Form_pg_class relp;
2201
2202         /* Should be called only for invalidated indexes */
2203         Assert(relation->rd_rel->relkind == RELKIND_INDEX &&
2204                    !relation->rd_isvalid);
2205
2206         /* Ensure it's closed at smgr level */
2207         RelationCloseSmgr(relation);
2208
2209         /* Must free any AM cached data upon relcache flush */
2210         if (relation->rd_amcache)
2211                 pfree(relation->rd_amcache);
2212         relation->rd_amcache = NULL;
2213
2214         /*
2215          * If it's a shared index, we might be called before backend startup has
2216          * finished selecting a database, in which case we have no way to read
2217          * pg_class yet.  However, a shared index can never have any significant
2218          * schema updates, so it's okay to ignore the invalidation signal.  Just
2219          * mark it valid and return without doing anything more.
2220          */
2221         if (relation->rd_rel->relisshared && !criticalRelcachesBuilt)
2222         {
2223                 relation->rd_isvalid = true;
2224                 return;
2225         }
2226
2227         /*
2228          * Read the pg_class row
2229          *
2230          * Don't try to use an indexscan of pg_class_oid_index to reload the info
2231          * for pg_class_oid_index ...
2232          */
2233         indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
2234         pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK, false);
2235         if (!HeapTupleIsValid(pg_class_tuple))
2236                 elog(ERROR, "could not find pg_class tuple for index %u",
2237                          RelationGetRelid(relation));
2238         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
2239         memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
2240         /* Reload reloptions in case they changed */
2241         if (relation->rd_options)
2242                 pfree(relation->rd_options);
2243         RelationParseRelOptions(relation, pg_class_tuple);
2244         /* done with pg_class tuple */
2245         heap_freetuple(pg_class_tuple);
2246         /* We must recalculate physical address in case it changed */
2247         RelationInitPhysicalAddr(relation);
2248
2249         /*
2250          * For a non-system index, there are fields of the pg_index row that are
2251          * allowed to change, so re-read that row and update the relcache entry.
2252          * Most of the info derived from pg_index (such as support function lookup
2253          * info) cannot change, and indeed the whole point of this routine is to
2254          * update the relcache entry without clobbering that data; so wholesale
2255          * replacement is not appropriate.
2256          */
2257         if (!IsSystemRelation(relation))
2258         {
2259                 HeapTuple       tuple;
2260                 Form_pg_index index;
2261
2262                 tuple = SearchSysCache1(INDEXRELID,
2263                                                                 ObjectIdGetDatum(RelationGetRelid(relation)));
2264                 if (!HeapTupleIsValid(tuple))
2265                         elog(ERROR, "cache lookup failed for index %u",
2266                                  RelationGetRelid(relation));
2267                 index = (Form_pg_index) GETSTRUCT(tuple);
2268
2269                 /*
2270                  * Basically, let's just copy all the bool fields.  There are one or
2271                  * two of these that can't actually change in the current code, but
2272                  * it's not worth it to track exactly which ones they are.  None of
2273                  * the array fields are allowed to change, though.
2274                  */
2275                 relation->rd_index->indisunique = index->indisunique;
2276                 relation->rd_index->indisprimary = index->indisprimary;
2277                 relation->rd_index->indisexclusion = index->indisexclusion;
2278                 relation->rd_index->indimmediate = index->indimmediate;
2279                 relation->rd_index->indisclustered = index->indisclustered;
2280                 relation->rd_index->indisvalid = index->indisvalid;
2281                 relation->rd_index->indcheckxmin = index->indcheckxmin;
2282                 relation->rd_index->indisready = index->indisready;
2283                 relation->rd_index->indislive = index->indislive;
2284
2285                 /* Copy xmin too, as that is needed to make sense of indcheckxmin */
2286                 HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
2287                                                            HeapTupleHeaderGetXmin(tuple->t_data));
2288
2289                 ReleaseSysCache(tuple);
2290         }
2291
2292         /* Okay, now it's valid again */
2293         relation->rd_isvalid = true;
2294 }
2295
2296 /*
2297  * RelationDestroyRelation
2298  *
2299  *      Physically delete a relation cache entry and all subsidiary data.
2300  *      Caller must already have unhooked the entry from the hash table.
2301  */
2302 static void
2303 RelationDestroyRelation(Relation relation, bool remember_tupdesc)
2304 {
2305         Assert(RelationHasReferenceCountZero(relation));
2306
2307         /*
2308          * Make sure smgr and lower levels close the relation's files, if they
2309          * weren't closed already.  (This was probably done by caller, but let's
2310          * just be real sure.)
2311          */
2312         RelationCloseSmgr(relation);
2313
2314         /*
2315          * Free all the subsidiary data structures of the relcache entry, then the
2316          * entry itself.
2317          */
2318         if (relation->rd_rel)
2319                 pfree(relation->rd_rel);
2320         /* can't use DecrTupleDescRefCount here */
2321         Assert(relation->rd_att->tdrefcount > 0);
2322         if (--relation->rd_att->tdrefcount == 0)
2323         {
2324                 /*
2325                  * If we Rebuilt a relcache entry during a transaction then its
2326                  * possible we did that because the TupDesc changed as the result of
2327                  * an ALTER TABLE that ran at less than AccessExclusiveLock. It's
2328                  * possible someone copied that TupDesc, in which case the copy would
2329                  * point to free'd memory. So if we rebuild an entry we keep the
2330                  * TupDesc around until end of transaction, to be safe.
2331                  */
2332                 if (remember_tupdesc)
2333                         RememberToFreeTupleDescAtEOX(relation->rd_att);
2334                 else
2335                         FreeTupleDesc(relation->rd_att);
2336         }
2337         FreeTriggerDesc(relation->trigdesc);
2338         list_free_deep(relation->rd_fkeylist);
2339         list_free(relation->rd_indexlist);
2340         bms_free(relation->rd_indexattr);
2341         bms_free(relation->rd_keyattr);
2342         bms_free(relation->rd_pkattr);
2343         bms_free(relation->rd_idattr);
2344         if (relation->rd_pubactions)
2345                 pfree(relation->rd_pubactions);
2346         if (relation->rd_options)
2347                 pfree(relation->rd_options);
2348         if (relation->rd_indextuple)
2349                 pfree(relation->rd_indextuple);
2350         if (relation->rd_indexcxt)
2351                 MemoryContextDelete(relation->rd_indexcxt);
2352         if (relation->rd_rulescxt)
2353                 MemoryContextDelete(relation->rd_rulescxt);
2354         if (relation->rd_rsdesc)
2355                 MemoryContextDelete(relation->rd_rsdesc->rscxt);
2356         if (relation->rd_partkeycxt)
2357                 MemoryContextDelete(relation->rd_partkeycxt);
2358         if (relation->rd_pdcxt)
2359                 MemoryContextDelete(relation->rd_pdcxt);
2360         if (relation->rd_partcheck)
2361                 pfree(relation->rd_partcheck);
2362         if (relation->rd_fdwroutine)
2363                 pfree(relation->rd_fdwroutine);
2364         pfree(relation);
2365 }
2366
2367 /*
2368  * RelationClearRelation
2369  *
2370  *       Physically blow away a relation cache entry, or reset it and rebuild
2371  *       it from scratch (that is, from catalog entries).  The latter path is
2372  *       used when we are notified of a change to an open relation (one with
2373  *       refcount > 0).
2374  *
2375  *       NB: when rebuilding, we'd better hold some lock on the relation,
2376  *       else the catalog data we need to read could be changing under us.
2377  *       Also, a rel to be rebuilt had better have refcnt > 0.  This is because
2378  *       an sinval reset could happen while we're accessing the catalogs, and
2379  *       the rel would get blown away underneath us by RelationCacheInvalidate
2380  *       if it has zero refcnt.
2381  *
2382  *       The "rebuild" parameter is redundant in current usage because it has
2383  *       to match the relation's refcnt status, but we keep it as a crosscheck
2384  *       that we're doing what the caller expects.
2385  */
2386 static void
2387 RelationClearRelation(Relation relation, bool rebuild)
2388 {
2389         /*
2390          * As per notes above, a rel to be rebuilt MUST have refcnt > 0; while of
2391          * course it would be an equally bad idea to blow away one with nonzero
2392          * refcnt, since that would leave someone somewhere with a dangling
2393          * pointer.  All callers are expected to have verified that this holds.
2394          */
2395         Assert(rebuild ?
2396                    !RelationHasReferenceCountZero(relation) :
2397                    RelationHasReferenceCountZero(relation));
2398
2399         /*
2400          * Make sure smgr and lower levels close the relation's files, if they
2401          * weren't closed already.  If the relation is not getting deleted, the
2402          * next smgr access should reopen the files automatically.  This ensures
2403          * that the low-level file access state is updated after, say, a vacuum
2404          * truncation.
2405          */
2406         RelationCloseSmgr(relation);
2407
2408         /*
2409          * Never, never ever blow away a nailed-in system relation, because we'd
2410          * be unable to recover.  However, we must redo RelationInitPhysicalAddr
2411          * in case it is a mapped relation whose mapping changed.
2412          *
2413          * If it's a nailed-but-not-mapped index, then we need to re-read the
2414          * pg_class row to see if its relfilenode changed. We do that immediately
2415          * if we're inside a valid transaction and the relation is open (not
2416          * counting the nailed refcount).  Otherwise just mark the entry as
2417          * possibly invalid, and it'll be fixed when next opened.
2418          */
2419         if (relation->rd_isnailed)
2420         {
2421                 RelationInitPhysicalAddr(relation);
2422
2423                 if (relation->rd_rel->relkind == RELKIND_INDEX)
2424                 {
2425                         relation->rd_isvalid = false;           /* needs to be revalidated */
2426                         if (relation->rd_refcnt > 1 && IsTransactionState())
2427                                 RelationReloadIndexInfo(relation);
2428                 }
2429                 return;
2430         }
2431
2432         /*
2433          * Even non-system indexes should not be blown away if they are open and
2434          * have valid index support information.  This avoids problems with active
2435          * use of the index support information.  As with nailed indexes, we
2436          * re-read the pg_class row to handle possible physical relocation of the
2437          * index, and we check for pg_index updates too.
2438          */
2439         if (relation->rd_rel->relkind == RELKIND_INDEX &&
2440                 relation->rd_refcnt > 0 &&
2441                 relation->rd_indexcxt != NULL)
2442         {
2443                 relation->rd_isvalid = false;   /* needs to be revalidated */
2444                 if (IsTransactionState())
2445                         RelationReloadIndexInfo(relation);
2446                 return;
2447         }
2448
2449         /* Mark it invalid until we've finished rebuild */
2450         relation->rd_isvalid = false;
2451
2452         /*
2453          * If we're really done with the relcache entry, blow it away. But if
2454          * someone is still using it, reconstruct the whole deal without moving
2455          * the physical RelationData record (so that the someone's pointer is
2456          * still valid).
2457          */
2458         if (!rebuild)
2459         {
2460                 /* Remove it from the hash table */
2461                 RelationCacheDelete(relation);
2462
2463                 /* And release storage */
2464                 RelationDestroyRelation(relation, false);
2465         }
2466         else if (!IsTransactionState())
2467         {
2468                 /*
2469                  * If we're not inside a valid transaction, we can't do any catalog
2470                  * access so it's not possible to rebuild yet.  Just exit, leaving
2471                  * rd_isvalid = false so that the rebuild will occur when the entry is
2472                  * next opened.
2473                  *
2474                  * Note: it's possible that we come here during subtransaction abort,
2475                  * and the reason for wanting to rebuild is that the rel is open in
2476                  * the outer transaction.  In that case it might seem unsafe to not
2477                  * rebuild immediately, since whatever code has the rel already open
2478                  * will keep on using the relcache entry as-is.  However, in such a
2479                  * case the outer transaction should be holding a lock that's
2480                  * sufficient to prevent any significant change in the rel's schema,
2481                  * so the existing entry contents should be good enough for its
2482                  * purposes; at worst we might be behind on statistics updates or the
2483                  * like.  (See also CheckTableNotInUse() and its callers.)      These same
2484                  * remarks also apply to the cases above where we exit without having
2485                  * done RelationReloadIndexInfo() yet.
2486                  */
2487                 return;
2488         }
2489         else
2490         {
2491                 /*
2492                  * Our strategy for rebuilding an open relcache entry is to build a
2493                  * new entry from scratch, swap its contents with the old entry, and
2494                  * finally delete the new entry (along with any infrastructure swapped
2495                  * over from the old entry).  This is to avoid trouble in case an
2496                  * error causes us to lose control partway through.  The old entry
2497                  * will still be marked !rd_isvalid, so we'll try to rebuild it again
2498                  * on next access.  Meanwhile it's not any less valid than it was
2499                  * before, so any code that might expect to continue accessing it
2500                  * isn't hurt by the rebuild failure.  (Consider for example a
2501                  * subtransaction that ALTERs a table and then gets canceled partway
2502                  * through the cache entry rebuild.  The outer transaction should
2503                  * still see the not-modified cache entry as valid.)  The worst
2504                  * consequence of an error is leaking the necessarily-unreferenced new
2505                  * entry, and this shouldn't happen often enough for that to be a big
2506                  * problem.
2507                  *
2508                  * When rebuilding an open relcache entry, we must preserve ref count,
2509                  * rd_createSubid/rd_newRelfilenodeSubid, and rd_toastoid state.  Also
2510                  * attempt to preserve the pg_class entry (rd_rel), tupledesc,
2511                  * rewrite-rule, partition key, and partition descriptor substructures
2512                  * in place, because various places assume that these structures won't
2513                  * move while they are working with an open relcache entry.  (Note:
2514                  * the refcount mechanism for tupledescs might someday allow us to
2515                  * remove this hack for the tupledesc.)
2516                  *
2517                  * Note that this process does not touch CurrentResourceOwner; which
2518                  * is good because whatever ref counts the entry may have do not
2519                  * necessarily belong to that resource owner.
2520                  */
2521                 Relation        newrel;
2522                 Oid                     save_relid = RelationGetRelid(relation);
2523                 bool            keep_tupdesc;
2524                 bool            keep_rules;
2525                 bool            keep_policies;
2526                 bool            keep_partkey;
2527                 bool            keep_partdesc;
2528
2529                 /* Build temporary entry, but don't link it into hashtable */
2530                 newrel = RelationBuildDesc(save_relid, false);
2531                 if (newrel == NULL)
2532                 {
2533                         /*
2534                          * We can validly get here, if we're using a historic snapshot in
2535                          * which a relation, accessed from outside logical decoding, is
2536                          * still invisible. In that case it's fine to just mark the
2537                          * relation as invalid and return - it'll fully get reloaded by
2538                          * the cache reset at the end of logical decoding (or at the next
2539                          * access).  During normal processing we don't want to ignore this
2540                          * case as it shouldn't happen there, as explained below.
2541                          */
2542                         if (HistoricSnapshotActive())
2543                                 return;
2544
2545                         /*
2546                          * This shouldn't happen as dropping a relation is intended to be
2547                          * impossible if still referenced (c.f. CheckTableNotInUse()). But
2548                          * if we get here anyway, we can't just delete the relcache entry,
2549                          * as it possibly could get accessed later (as e.g. the error
2550                          * might get trapped and handled via a subtransaction rollback).
2551                          */
2552                         elog(ERROR, "relation %u deleted while still in use", save_relid);
2553                 }
2554
2555                 keep_tupdesc = equalTupleDescs(relation->rd_att, newrel->rd_att);
2556                 keep_rules = equalRuleLocks(relation->rd_rules, newrel->rd_rules);
2557                 keep_policies = equalRSDesc(relation->rd_rsdesc, newrel->rd_rsdesc);
2558                 keep_partkey = (relation->rd_partkey != NULL);
2559                 keep_partdesc = equalPartitionDescs(relation->rd_partkey,
2560                                                                                         relation->rd_partdesc,
2561                                                                                         newrel->rd_partdesc);
2562
2563                 /*
2564                  * Perform swapping of the relcache entry contents.  Within this
2565                  * process the old entry is momentarily invalid, so there *must* be no
2566                  * possibility of CHECK_FOR_INTERRUPTS within this sequence. Do it in
2567                  * all-in-line code for safety.
2568                  *
2569                  * Since the vast majority of fields should be swapped, our method is
2570                  * to swap the whole structures and then re-swap those few fields we
2571                  * didn't want swapped.
2572                  */
2573 #define SWAPFIELD(fldtype, fldname) \
2574                 do { \
2575                         fldtype _tmp = newrel->fldname; \
2576                         newrel->fldname = relation->fldname; \
2577                         relation->fldname = _tmp; \
2578                 } while (0)
2579
2580                 /* swap all Relation struct fields */
2581                 {
2582                         RelationData tmpstruct;
2583
2584                         memcpy(&tmpstruct, newrel, sizeof(RelationData));
2585                         memcpy(newrel, relation, sizeof(RelationData));
2586                         memcpy(relation, &tmpstruct, sizeof(RelationData));
2587                 }
2588
2589                 /* rd_smgr must not be swapped, due to back-links from smgr level */
2590                 SWAPFIELD(SMgrRelation, rd_smgr);
2591                 /* rd_refcnt must be preserved */
2592                 SWAPFIELD(int, rd_refcnt);
2593                 /* isnailed shouldn't change */
2594                 Assert(newrel->rd_isnailed == relation->rd_isnailed);
2595                 /* creation sub-XIDs must be preserved */
2596                 SWAPFIELD(SubTransactionId, rd_createSubid);
2597                 SWAPFIELD(SubTransactionId, rd_newRelfilenodeSubid);
2598                 /* un-swap rd_rel pointers, swap contents instead */
2599                 SWAPFIELD(Form_pg_class, rd_rel);
2600                 /* ... but actually, we don't have to update newrel->rd_rel */
2601                 memcpy(relation->rd_rel, newrel->rd_rel, CLASS_TUPLE_SIZE);
2602                 /* preserve old tupledesc and rules if no logical change */
2603                 if (keep_tupdesc)
2604                         SWAPFIELD(TupleDesc, rd_att);
2605                 if (keep_rules)
2606                 {
2607                         SWAPFIELD(RuleLock *, rd_rules);
2608                         SWAPFIELD(MemoryContext, rd_rulescxt);
2609                 }
2610                 if (keep_policies)
2611                         SWAPFIELD(RowSecurityDesc *, rd_rsdesc);
2612                 /* toast OID override must be preserved */
2613                 SWAPFIELD(Oid, rd_toastoid);
2614                 /* pgstat_info must be preserved */
2615                 SWAPFIELD(struct PgStat_TableStatus *, pgstat_info);
2616                 /* partition key must be preserved, if we have one */
2617                 if (keep_partkey)
2618                 {
2619                         SWAPFIELD(PartitionKey, rd_partkey);
2620                         SWAPFIELD(MemoryContext, rd_partkeycxt);
2621                 }
2622                 /* preserve old partdesc if no logical change */
2623                 if (keep_partdesc)
2624                 {
2625                         SWAPFIELD(PartitionDesc, rd_partdesc);
2626                         SWAPFIELD(MemoryContext, rd_pdcxt);
2627                 }
2628
2629 #undef SWAPFIELD
2630
2631                 /* And now we can throw away the temporary entry */
2632                 RelationDestroyRelation(newrel, !keep_tupdesc);
2633         }
2634 }
2635
2636 /*
2637  * RelationFlushRelation
2638  *
2639  *       Rebuild the relation if it is open (refcount > 0), else blow it away.
2640  *       This is used when we receive a cache invalidation event for the rel.
2641  */
2642 static void
2643 RelationFlushRelation(Relation relation)
2644 {
2645         if (relation->rd_createSubid != InvalidSubTransactionId ||
2646                 relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
2647         {
2648                 /*
2649                  * New relcache entries are always rebuilt, not flushed; else we'd
2650                  * forget the "new" status of the relation, which is a useful
2651                  * optimization to have.  Ditto for the new-relfilenode status.
2652                  *
2653                  * The rel could have zero refcnt here, so temporarily increment the
2654                  * refcnt to ensure it's safe to rebuild it.  We can assume that the
2655                  * current transaction has some lock on the rel already.
2656                  */
2657                 RelationIncrementReferenceCount(relation);
2658                 RelationClearRelation(relation, true);
2659                 RelationDecrementReferenceCount(relation);
2660         }
2661         else
2662         {
2663                 /*
2664                  * Pre-existing rels can be dropped from the relcache if not open.
2665                  */
2666                 bool            rebuild = !RelationHasReferenceCountZero(relation);
2667
2668                 RelationClearRelation(relation, rebuild);
2669         }
2670 }
2671
2672 /*
2673  * RelationForgetRelation - unconditionally remove a relcache entry
2674  *
2675  *                 External interface for destroying a relcache entry when we
2676  *                 drop the relation.
2677  */
2678 void
2679 RelationForgetRelation(Oid rid)
2680 {
2681         Relation        relation;
2682
2683         RelationIdCacheLookup(rid, relation);
2684
2685         if (!PointerIsValid(relation))
2686                 return;                                 /* not in cache, nothing to do */
2687
2688         if (!RelationHasReferenceCountZero(relation))
2689                 elog(ERROR, "relation %u is still open", rid);
2690
2691         /* Unconditionally destroy the relcache entry */
2692         RelationClearRelation(relation, false);
2693 }
2694
2695 /*
2696  *              RelationCacheInvalidateEntry
2697  *
2698  *              This routine is invoked for SI cache flush messages.
2699  *
2700  * Any relcache entry matching the relid must be flushed.  (Note: caller has
2701  * already determined that the relid belongs to our database or is a shared
2702  * relation.)
2703  *
2704  * We used to skip local relations, on the grounds that they could
2705  * not be targets of cross-backend SI update messages; but it seems
2706  * safer to process them, so that our *own* SI update messages will
2707  * have the same effects during CommandCounterIncrement for both
2708  * local and nonlocal relations.
2709  */
2710 void
2711 RelationCacheInvalidateEntry(Oid relationId)
2712 {
2713         Relation        relation;
2714
2715         RelationIdCacheLookup(relationId, relation);
2716
2717         if (PointerIsValid(relation))
2718         {
2719                 relcacheInvalsReceived++;
2720                 RelationFlushRelation(relation);
2721         }
2722 }
2723
2724 /*
2725  * RelationCacheInvalidate
2726  *       Blow away cached relation descriptors that have zero reference counts,
2727  *       and rebuild those with positive reference counts.  Also reset the smgr
2728  *       relation cache and re-read relation mapping data.
2729  *
2730  *       This is currently used only to recover from SI message buffer overflow,
2731  *       so we do not touch new-in-transaction relations; they cannot be targets
2732  *       of cross-backend SI updates (and our own updates now go through a
2733  *       separate linked list that isn't limited by the SI message buffer size).
2734  *       Likewise, we need not discard new-relfilenode-in-transaction hints,
2735  *       since any invalidation of those would be a local event.
2736  *
2737  *       We do this in two phases: the first pass deletes deletable items, and
2738  *       the second one rebuilds the rebuildable items.  This is essential for
2739  *       safety, because hash_seq_search only copes with concurrent deletion of
2740  *       the element it is currently visiting.  If a second SI overflow were to
2741  *       occur while we are walking the table, resulting in recursive entry to
2742  *       this routine, we could crash because the inner invocation blows away
2743  *       the entry next to be visited by the outer scan.  But this way is OK,
2744  *       because (a) during the first pass we won't process any more SI messages,
2745  *       so hash_seq_search will complete safely; (b) during the second pass we
2746  *       only hold onto pointers to nondeletable entries.
2747  *
2748  *       The two-phase approach also makes it easy to update relfilenodes for
2749  *       mapped relations before we do anything else, and to ensure that the
2750  *       second pass processes nailed-in-cache items before other nondeletable
2751  *       items.  This should ensure that system catalogs are up to date before
2752  *       we attempt to use them to reload information about other open relations.
2753  */
2754 void
2755 RelationCacheInvalidate(void)
2756 {
2757         HASH_SEQ_STATUS status;
2758         RelIdCacheEnt *idhentry;
2759         Relation        relation;
2760         List       *rebuildFirstList = NIL;
2761         List       *rebuildList = NIL;
2762         ListCell   *l;
2763
2764         /*
2765          * Reload relation mapping data before starting to reconstruct cache.
2766          */
2767         RelationMapInvalidateAll();
2768
2769         /* Phase 1 */
2770         hash_seq_init(&status, RelationIdCache);
2771
2772         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2773         {
2774                 relation = idhentry->reldesc;
2775
2776                 /* Must close all smgr references to avoid leaving dangling ptrs */
2777                 RelationCloseSmgr(relation);
2778
2779                 /*
2780                  * Ignore new relations; no other backend will manipulate them before
2781                  * we commit.  Likewise, before replacing a relation's relfilenode, we
2782                  * shall have acquired AccessExclusiveLock and drained any applicable
2783                  * pending invalidations.
2784                  */
2785                 if (relation->rd_createSubid != InvalidSubTransactionId ||
2786                         relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
2787                         continue;
2788
2789                 relcacheInvalsReceived++;
2790
2791                 if (RelationHasReferenceCountZero(relation))
2792                 {
2793                         /* Delete this entry immediately */
2794                         Assert(!relation->rd_isnailed);
2795                         RelationClearRelation(relation, false);
2796                 }
2797                 else
2798                 {
2799                         /*
2800                          * If it's a mapped relation, immediately update its rd_node in
2801                          * case its relfilenode changed.  We must do this during phase 1
2802                          * in case the relation is consulted during rebuild of other
2803                          * relcache entries in phase 2.  It's safe since consulting the
2804                          * map doesn't involve any access to relcache entries.
2805                          */
2806                         if (RelationIsMapped(relation))
2807                                 RelationInitPhysicalAddr(relation);
2808
2809                         /*
2810                          * Add this entry to list of stuff to rebuild in second pass.
2811                          * pg_class goes to the front of rebuildFirstList while
2812                          * pg_class_oid_index goes to the back of rebuildFirstList, so
2813                          * they are done first and second respectively.  Other nailed
2814                          * relations go to the front of rebuildList, so they'll be done
2815                          * next in no particular order; and everything else goes to the
2816                          * back of rebuildList.
2817                          */
2818                         if (RelationGetRelid(relation) == RelationRelationId)
2819                                 rebuildFirstList = lcons(relation, rebuildFirstList);
2820                         else if (RelationGetRelid(relation) == ClassOidIndexId)
2821                                 rebuildFirstList = lappend(rebuildFirstList, relation);
2822                         else if (relation->rd_isnailed)
2823                                 rebuildList = lcons(relation, rebuildList);
2824                         else
2825                                 rebuildList = lappend(rebuildList, relation);
2826                 }
2827         }
2828
2829         /*
2830          * Now zap any remaining smgr cache entries.  This must happen before we
2831          * start to rebuild entries, since that may involve catalog fetches which
2832          * will re-open catalog files.
2833          */
2834         smgrcloseall();
2835
2836         /* Phase 2: rebuild the items found to need rebuild in phase 1 */
2837         foreach(l, rebuildFirstList)
2838         {
2839                 relation = (Relation) lfirst(l);
2840                 RelationClearRelation(relation, true);
2841         }
2842         list_free(rebuildFirstList);
2843         foreach(l, rebuildList)
2844         {
2845                 relation = (Relation) lfirst(l);
2846                 RelationClearRelation(relation, true);
2847         }
2848         list_free(rebuildList);
2849 }
2850
2851 /*
2852  * RelationCloseSmgrByOid - close a relcache entry's smgr link
2853  *
2854  * Needed in some cases where we are changing a relation's physical mapping.
2855  * The link will be automatically reopened on next use.
2856  */
2857 void
2858 RelationCloseSmgrByOid(Oid relationId)
2859 {
2860         Relation        relation;
2861
2862         RelationIdCacheLookup(relationId, relation);
2863
2864         if (!PointerIsValid(relation))
2865                 return;                                 /* not in cache, nothing to do */
2866
2867         RelationCloseSmgr(relation);
2868 }
2869
2870 static void
2871 RememberToFreeTupleDescAtEOX(TupleDesc td)
2872 {
2873         if (EOXactTupleDescArray == NULL)
2874         {
2875                 MemoryContext oldcxt;
2876
2877                 oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2878
2879                 EOXactTupleDescArray = (TupleDesc *) palloc(16 * sizeof(TupleDesc));
2880                 EOXactTupleDescArrayLen = 16;
2881                 NextEOXactTupleDescNum = 0;
2882                 MemoryContextSwitchTo(oldcxt);
2883         }
2884         else if (NextEOXactTupleDescNum >= EOXactTupleDescArrayLen)
2885         {
2886                 int32           newlen = EOXactTupleDescArrayLen * 2;
2887
2888                 Assert(EOXactTupleDescArrayLen > 0);
2889
2890                 EOXactTupleDescArray = (TupleDesc *) repalloc(EOXactTupleDescArray,
2891                                                                                                  newlen * sizeof(TupleDesc));
2892                 EOXactTupleDescArrayLen = newlen;
2893         }
2894
2895         EOXactTupleDescArray[NextEOXactTupleDescNum++] = td;
2896 }
2897
2898 /*
2899  * AtEOXact_RelationCache
2900  *
2901  *      Clean up the relcache at main-transaction commit or abort.
2902  *
2903  * Note: this must be called *before* processing invalidation messages.
2904  * In the case of abort, we don't want to try to rebuild any invalidated
2905  * cache entries (since we can't safely do database accesses).  Therefore
2906  * we must reset refcnts before handling pending invalidations.
2907  *
2908  * As of PostgreSQL 8.1, relcache refcnts should get released by the
2909  * ResourceOwner mechanism.  This routine just does a debugging
2910  * cross-check that no pins remain.  However, we also need to do special
2911  * cleanup when the current transaction created any relations or made use
2912  * of forced index lists.
2913  */
2914 void
2915 AtEOXact_RelationCache(bool isCommit)
2916 {
2917         HASH_SEQ_STATUS status;
2918         RelIdCacheEnt *idhentry;
2919         int                     i;
2920
2921         /*
2922          * Unless the eoxact_list[] overflowed, we only need to examine the rels
2923          * listed in it.  Otherwise fall back on a hash_seq_search scan.
2924          *
2925          * For simplicity, eoxact_list[] entries are not deleted till end of
2926          * top-level transaction, even though we could remove them at
2927          * subtransaction end in some cases, or remove relations from the list if
2928          * they are cleared for other reasons.  Therefore we should expect the
2929          * case that list entries are not found in the hashtable; if not, there's
2930          * nothing to do for them.
2931          */
2932         if (eoxact_list_overflowed)
2933         {
2934                 hash_seq_init(&status, RelationIdCache);
2935                 while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2936                 {
2937                         AtEOXact_cleanup(idhentry->reldesc, isCommit);
2938                 }
2939         }
2940         else
2941         {
2942                 for (i = 0; i < eoxact_list_len; i++)
2943                 {
2944                         idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache,
2945                                                                                                          (void *) &eoxact_list[i],
2946                                                                                                          HASH_FIND,
2947                                                                                                          NULL);
2948                         if (idhentry != NULL)
2949                                 AtEOXact_cleanup(idhentry->reldesc, isCommit);
2950                 }
2951         }
2952
2953         if (EOXactTupleDescArrayLen > 0)
2954         {
2955                 Assert(EOXactTupleDescArray != NULL);
2956                 for (i = 0; i < NextEOXactTupleDescNum; i++)
2957                         FreeTupleDesc(EOXactTupleDescArray[i]);
2958                 pfree(EOXactTupleDescArray);
2959                 EOXactTupleDescArray = NULL;
2960         }
2961
2962         /* Now we're out of the transaction and can clear the lists */
2963         eoxact_list_len = 0;
2964         eoxact_list_overflowed = false;
2965         NextEOXactTupleDescNum = 0;
2966         EOXactTupleDescArrayLen = 0;
2967 }
2968
2969 /*
2970  * AtEOXact_cleanup
2971  *
2972  *      Clean up a single rel at main-transaction commit or abort
2973  *
2974  * NB: this processing must be idempotent, because EOXactListAdd() doesn't
2975  * bother to prevent duplicate entries in eoxact_list[].
2976  */
2977 static void
2978 AtEOXact_cleanup(Relation relation, bool isCommit)
2979 {
2980         /*
2981          * The relcache entry's ref count should be back to its normal
2982          * not-in-a-transaction state: 0 unless it's nailed in cache.
2983          *
2984          * In bootstrap mode, this is NOT true, so don't check it --- the
2985          * bootstrap code expects relations to stay open across start/commit
2986          * transaction calls.  (That seems bogus, but it's not worth fixing.)
2987          *
2988          * Note: ideally this check would be applied to every relcache entry, not
2989          * just those that have eoxact work to do.  But it's not worth forcing a
2990          * scan of the whole relcache just for this.  (Moreover, doing so would
2991          * mean that assert-enabled testing never tests the hash_search code path
2992          * above, which seems a bad idea.)
2993          */
2994 #ifdef USE_ASSERT_CHECKING
2995         if (!IsBootstrapProcessingMode())
2996         {
2997                 int                     expected_refcnt;
2998
2999                 expected_refcnt = relation->rd_isnailed ? 1 : 0;
3000                 Assert(relation->rd_refcnt == expected_refcnt);
3001         }
3002 #endif
3003
3004         /*
3005          * Is it a relation created in the current transaction?
3006          *
3007          * During commit, reset the flag to zero, since we are now out of the
3008          * creating transaction.  During abort, simply delete the relcache entry
3009          * --- it isn't interesting any longer.  (NOTE: if we have forgotten the
3010          * new-ness of a new relation due to a forced cache flush, the entry will
3011          * get deleted anyway by shared-cache-inval processing of the aborted
3012          * pg_class insertion.)
3013          */
3014         if (relation->rd_createSubid != InvalidSubTransactionId)
3015         {
3016                 if (isCommit)
3017                         relation->rd_createSubid = InvalidSubTransactionId;
3018                 else if (RelationHasReferenceCountZero(relation))
3019                 {
3020                         RelationClearRelation(relation, false);
3021                         return;
3022                 }
3023                 else
3024                 {
3025                         /*
3026                          * Hmm, somewhere there's a (leaked?) reference to the relation.
3027                          * We daren't remove the entry for fear of dereferencing a
3028                          * dangling pointer later.  Bleat, and mark it as not belonging to
3029                          * the current transaction.  Hopefully it'll get cleaned up
3030                          * eventually.  This must be just a WARNING to avoid
3031                          * error-during-error-recovery loops.
3032                          */
3033                         relation->rd_createSubid = InvalidSubTransactionId;
3034                         elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
3035                                  RelationGetRelationName(relation));
3036                 }
3037         }
3038
3039         /*
3040          * Likewise, reset the hint about the relfilenode being new.
3041          */
3042         relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3043
3044         /*
3045          * Flush any temporary index list.
3046          */
3047         if (relation->rd_indexvalid == 2)
3048         {
3049                 list_free(relation->rd_indexlist);
3050                 relation->rd_indexlist = NIL;
3051                 relation->rd_oidindex = InvalidOid;
3052                 relation->rd_pkindex = InvalidOid;
3053                 relation->rd_replidindex = InvalidOid;
3054                 relation->rd_indexvalid = 0;
3055         }
3056 }
3057
3058 /*
3059  * AtEOSubXact_RelationCache
3060  *
3061  *      Clean up the relcache at sub-transaction commit or abort.
3062  *
3063  * Note: this must be called *before* processing invalidation messages.
3064  */
3065 void
3066 AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
3067                                                   SubTransactionId parentSubid)
3068 {
3069         HASH_SEQ_STATUS status;
3070         RelIdCacheEnt *idhentry;
3071         int                     i;
3072
3073         /*
3074          * Unless the eoxact_list[] overflowed, we only need to examine the rels
3075          * listed in it.  Otherwise fall back on a hash_seq_search scan.  Same
3076          * logic as in AtEOXact_RelationCache.
3077          */
3078         if (eoxact_list_overflowed)
3079         {
3080                 hash_seq_init(&status, RelationIdCache);
3081                 while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3082                 {
3083                         AtEOSubXact_cleanup(idhentry->reldesc, isCommit,
3084                                                                 mySubid, parentSubid);
3085                 }
3086         }
3087         else
3088         {
3089                 for (i = 0; i < eoxact_list_len; i++)
3090                 {
3091                         idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache,
3092                                                                                                          (void *) &eoxact_list[i],
3093                                                                                                          HASH_FIND,
3094                                                                                                          NULL);
3095                         if (idhentry != NULL)
3096                                 AtEOSubXact_cleanup(idhentry->reldesc, isCommit,
3097                                                                         mySubid, parentSubid);
3098                 }
3099         }
3100
3101         /* Don't reset the list; we still need more cleanup later */
3102 }
3103
3104 /*
3105  * AtEOSubXact_cleanup
3106  *
3107  *      Clean up a single rel at subtransaction commit or abort
3108  *
3109  * NB: this processing must be idempotent, because EOXactListAdd() doesn't
3110  * bother to prevent duplicate entries in eoxact_list[].
3111  */
3112 static void
3113 AtEOSubXact_cleanup(Relation relation, bool isCommit,
3114                                         SubTransactionId mySubid, SubTransactionId parentSubid)
3115 {
3116         /*
3117          * Is it a relation created in the current subtransaction?
3118          *
3119          * During subcommit, mark it as belonging to the parent, instead. During
3120          * subabort, simply delete the relcache entry.
3121          */
3122         if (relation->rd_createSubid == mySubid)
3123         {
3124                 if (isCommit)
3125                         relation->rd_createSubid = parentSubid;
3126                 else if (RelationHasReferenceCountZero(relation))
3127                 {
3128                         RelationClearRelation(relation, false);
3129                         return;
3130                 }
3131                 else
3132                 {
3133                         /*
3134                          * Hmm, somewhere there's a (leaked?) reference to the relation.
3135                          * We daren't remove the entry for fear of dereferencing a
3136                          * dangling pointer later.  Bleat, and transfer it to the parent
3137                          * subtransaction so we can try again later.  This must be just a
3138                          * WARNING to avoid error-during-error-recovery loops.
3139                          */
3140                         relation->rd_createSubid = parentSubid;
3141                         elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
3142                                  RelationGetRelationName(relation));
3143                 }
3144         }
3145
3146         /*
3147          * Likewise, update or drop any new-relfilenode-in-subtransaction hint.
3148          */
3149         if (relation->rd_newRelfilenodeSubid == mySubid)
3150         {
3151                 if (isCommit)
3152                         relation->rd_newRelfilenodeSubid = parentSubid;
3153                 else
3154                         relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3155         }
3156
3157         /*
3158          * Flush any temporary index list.
3159          */
3160         if (relation->rd_indexvalid == 2)
3161         {
3162                 list_free(relation->rd_indexlist);
3163                 relation->rd_indexlist = NIL;
3164                 relation->rd_oidindex = InvalidOid;
3165                 relation->rd_pkindex = InvalidOid;
3166                 relation->rd_replidindex = InvalidOid;
3167                 relation->rd_indexvalid = 0;
3168         }
3169 }
3170
3171
3172 /*
3173  *              RelationBuildLocalRelation
3174  *                      Build a relcache entry for an about-to-be-created relation,
3175  *                      and enter it into the relcache.
3176  */
3177 Relation
3178 RelationBuildLocalRelation(const char *relname,
3179                                                    Oid relnamespace,
3180                                                    TupleDesc tupDesc,
3181                                                    Oid relid,
3182                                                    Oid relfilenode,
3183                                                    Oid reltablespace,
3184                                                    bool shared_relation,
3185                                                    bool mapped_relation,
3186                                                    char relpersistence,
3187                                                    char relkind)
3188 {
3189         Relation        rel;
3190         MemoryContext oldcxt;
3191         int                     natts = tupDesc->natts;
3192         int                     i;
3193         bool            has_not_null;
3194         bool            nailit;
3195
3196         AssertArg(natts >= 0);
3197
3198         /*
3199          * check for creation of a rel that must be nailed in cache.
3200          *
3201          * XXX this list had better match the relations specially handled in
3202          * RelationCacheInitializePhase2/3.
3203          */
3204         switch (relid)
3205         {
3206                 case DatabaseRelationId:
3207                 case AuthIdRelationId:
3208                 case AuthMemRelationId:
3209                 case RelationRelationId:
3210                 case AttributeRelationId:
3211                 case ProcedureRelationId:
3212                 case TypeRelationId:
3213                         nailit = true;
3214                         break;
3215                 default:
3216                         nailit = false;
3217                         break;
3218         }
3219
3220         /*
3221          * check that hardwired list of shared rels matches what's in the
3222          * bootstrap .bki file.  If you get a failure here during initdb, you
3223          * probably need to fix IsSharedRelation() to match whatever you've done
3224          * to the set of shared relations.
3225          */
3226         if (shared_relation != IsSharedRelation(relid))
3227                 elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
3228                          relname, relid);
3229
3230         /* Shared relations had better be mapped, too */
3231         Assert(mapped_relation || !shared_relation);
3232
3233         /*
3234          * switch to the cache context to create the relcache entry.
3235          */
3236         if (!CacheMemoryContext)
3237                 CreateCacheMemoryContext();
3238
3239         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3240
3241         /*
3242          * allocate a new relation descriptor and fill in basic state fields.
3243          */
3244         rel = (Relation) palloc0(sizeof(RelationData));
3245
3246         /* make sure relation is marked as having no open file yet */
3247         rel->rd_smgr = NULL;
3248
3249         /* mark it nailed if appropriate */
3250         rel->rd_isnailed = nailit;
3251
3252         rel->rd_refcnt = nailit ? 1 : 0;
3253
3254         /* it's being created in this transaction */
3255         rel->rd_createSubid = GetCurrentSubTransactionId();
3256         rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3257
3258         /*
3259          * create a new tuple descriptor from the one passed in.  We do this
3260          * partly to copy it into the cache context, and partly because the new
3261          * relation can't have any defaults or constraints yet; they have to be
3262          * added in later steps, because they require additions to multiple system
3263          * catalogs.  We can copy attnotnull constraints here, however.
3264          */
3265         rel->rd_att = CreateTupleDescCopy(tupDesc);
3266         rel->rd_att->tdrefcount = 1;    /* mark as refcounted */
3267         has_not_null = false;
3268         for (i = 0; i < natts; i++)
3269         {
3270                 rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull;
3271                 has_not_null |= tupDesc->attrs[i]->attnotnull;
3272         }
3273
3274         if (has_not_null)
3275         {
3276                 TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
3277
3278                 constr->has_not_null = true;
3279                 rel->rd_att->constr = constr;
3280         }
3281
3282         /*
3283          * initialize relation tuple form (caller may add/override data later)
3284          */
3285         rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
3286
3287         namestrcpy(&rel->rd_rel->relname, relname);
3288         rel->rd_rel->relnamespace = relnamespace;
3289
3290         rel->rd_rel->relkind = relkind;
3291         rel->rd_rel->relhasoids = rel->rd_att->tdhasoid;
3292         rel->rd_rel->relnatts = natts;
3293         rel->rd_rel->reltype = InvalidOid;
3294         /* needed when bootstrapping: */
3295         rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;
3296
3297         /* set up persistence and relcache fields dependent on it */
3298         rel->rd_rel->relpersistence = relpersistence;
3299         switch (relpersistence)
3300         {
3301                 case RELPERSISTENCE_UNLOGGED:
3302                 case RELPERSISTENCE_PERMANENT:
3303                         rel->rd_backend = InvalidBackendId;
3304                         rel->rd_islocaltemp = false;
3305                         break;
3306                 case RELPERSISTENCE_TEMP:
3307                         Assert(isTempOrTempToastNamespace(relnamespace));
3308                         rel->rd_backend = BackendIdForTempRelations();
3309                         rel->rd_islocaltemp = true;
3310                         break;
3311                 default:
3312                         elog(ERROR, "invalid relpersistence: %c", relpersistence);
3313                         break;
3314         }
3315
3316         /* if it's a materialized view, it's not populated initially */
3317         if (relkind == RELKIND_MATVIEW)
3318                 rel->rd_rel->relispopulated = false;
3319         else
3320                 rel->rd_rel->relispopulated = true;
3321
3322         /* system relations and non-table objects don't have one */
3323         if (!IsSystemNamespace(relnamespace) &&
3324                 (relkind == RELKIND_RELATION ||
3325                  relkind == RELKIND_MATVIEW ||
3326                  relkind == RELKIND_PARTITIONED_TABLE))
3327                 rel->rd_rel->relreplident = REPLICA_IDENTITY_DEFAULT;
3328         else
3329                 rel->rd_rel->relreplident = REPLICA_IDENTITY_NOTHING;
3330
3331         /*
3332          * Insert relation physical and logical identifiers (OIDs) into the right
3333          * places.  For a mapped relation, we set relfilenode to zero and rely on
3334          * RelationInitPhysicalAddr to consult the map.
3335          */
3336         rel->rd_rel->relisshared = shared_relation;
3337
3338         RelationGetRelid(rel) = relid;
3339
3340         for (i = 0; i < natts; i++)
3341                 rel->rd_att->attrs[i]->attrelid = relid;
3342
3343         rel->rd_rel->reltablespace = reltablespace;
3344
3345         if (mapped_relation)
3346         {
3347                 rel->rd_rel->relfilenode = InvalidOid;
3348                 /* Add it to the active mapping information */
3349                 RelationMapUpdateMap(relid, relfilenode, shared_relation, true);
3350         }
3351         else
3352                 rel->rd_rel->relfilenode = relfilenode;
3353
3354         RelationInitLockInfo(rel);      /* see lmgr.c */
3355
3356         RelationInitPhysicalAddr(rel);
3357
3358         /*
3359          * Okay to insert into the relcache hash table.
3360          *
3361          * Ordinarily, there should certainly not be an existing hash entry for
3362          * the same OID; but during bootstrap, when we create a "real" relcache
3363          * entry for one of the bootstrap relations, we'll be overwriting the
3364          * phony one created with formrdesc.  So allow that to happen for nailed
3365          * rels.
3366          */
3367         RelationCacheInsert(rel, nailit);
3368
3369         /*
3370          * Flag relation as needing eoxact cleanup (to clear rd_createSubid). We
3371          * can't do this before storing relid in it.
3372          */
3373         EOXactListAdd(rel);
3374
3375         /*
3376          * done building relcache entry.
3377          */
3378         MemoryContextSwitchTo(oldcxt);
3379
3380         /* It's fully valid */
3381         rel->rd_isvalid = true;
3382
3383         /*
3384          * Caller expects us to pin the returned entry.
3385          */
3386         RelationIncrementReferenceCount(rel);
3387
3388         return rel;
3389 }
3390
3391
3392 /*
3393  * RelationSetNewRelfilenode
3394  *
3395  * Assign a new relfilenode (physical file name) to the relation.
3396  *
3397  * This allows a full rewrite of the relation to be done with transactional
3398  * safety (since the filenode assignment can be rolled back).  Note however
3399  * that there is no simple way to access the relation's old data for the
3400  * remainder of the current transaction.  This limits the usefulness to cases
3401  * such as TRUNCATE or rebuilding an index from scratch.
3402  *
3403  * Caller must already hold exclusive lock on the relation.
3404  *
3405  * The relation is marked with relfrozenxid = freezeXid (InvalidTransactionId
3406  * must be passed for indexes and sequences).  This should be a lower bound on
3407  * the XIDs that will be put into the new relation contents.
3408  *
3409  * The new filenode's persistence is set to the given value.  This is useful
3410  * for the cases that are changing the relation's persistence; other callers
3411  * need to pass the original relpersistence value.
3412  */
3413 void
3414 RelationSetNewRelfilenode(Relation relation, char persistence,
3415                                                   TransactionId freezeXid, MultiXactId minmulti)
3416 {
3417         Oid                     newrelfilenode;
3418         RelFileNodeBackend newrnode;
3419         Relation        pg_class;
3420         HeapTuple       tuple;
3421         Form_pg_class classform;
3422
3423         /* Indexes, sequences must have Invalid frozenxid; other rels must not */
3424         Assert((relation->rd_rel->relkind == RELKIND_INDEX ||
3425                         relation->rd_rel->relkind == RELKIND_SEQUENCE) ?
3426                    freezeXid == InvalidTransactionId :
3427                    TransactionIdIsNormal(freezeXid));
3428         Assert(TransactionIdIsNormal(freezeXid) == MultiXactIdIsValid(minmulti));
3429
3430         /* Allocate a new relfilenode */
3431         newrelfilenode = GetNewRelFileNode(relation->rd_rel->reltablespace, NULL,
3432                                                                            persistence);
3433
3434         /*
3435          * Get a writable copy of the pg_class tuple for the given relation.
3436          */
3437         pg_class = heap_open(RelationRelationId, RowExclusiveLock);
3438
3439         tuple = SearchSysCacheCopy1(RELOID,
3440                                                                 ObjectIdGetDatum(RelationGetRelid(relation)));
3441         if (!HeapTupleIsValid(tuple))
3442                 elog(ERROR, "could not find tuple for relation %u",
3443                          RelationGetRelid(relation));
3444         classform = (Form_pg_class) GETSTRUCT(tuple);
3445
3446         /*
3447          * Create storage for the main fork of the new relfilenode.
3448          *
3449          * NOTE: any conflict in relfilenode value will be caught here, if
3450          * GetNewRelFileNode messes up for any reason.
3451          */
3452         newrnode.node = relation->rd_node;
3453         newrnode.node.relNode = newrelfilenode;
3454         newrnode.backend = relation->rd_backend;
3455         RelationCreateStorage(newrnode.node, persistence);
3456         smgrclosenode(newrnode);
3457
3458         /*
3459          * Schedule unlinking of the old storage at transaction commit.
3460          */
3461         RelationDropStorage(relation);
3462
3463         /*
3464          * Now update the pg_class row.  However, if we're dealing with a mapped
3465          * index, pg_class.relfilenode doesn't change; instead we have to send the
3466          * update to the relation mapper.
3467          */
3468         if (RelationIsMapped(relation))
3469                 RelationMapUpdateMap(RelationGetRelid(relation),
3470                                                          newrelfilenode,
3471                                                          relation->rd_rel->relisshared,
3472                                                          false);
3473         else
3474                 classform->relfilenode = newrelfilenode;
3475
3476         /* These changes are safe even for a mapped relation */
3477         if (relation->rd_rel->relkind != RELKIND_SEQUENCE)
3478         {
3479                 classform->relpages = 0;        /* it's empty until further notice */
3480                 classform->reltuples = 0;
3481                 classform->relallvisible = 0;
3482         }
3483         classform->relfrozenxid = freezeXid;
3484         classform->relminmxid = minmulti;
3485         classform->relpersistence = persistence;
3486
3487         CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
3488
3489         heap_freetuple(tuple);
3490
3491         heap_close(pg_class, RowExclusiveLock);
3492
3493         /*
3494          * Make the pg_class row change visible, as well as the relation map
3495          * change if any.  This will cause the relcache entry to get updated, too.
3496          */
3497         CommandCounterIncrement();
3498
3499         /*
3500          * Mark the rel as having been given a new relfilenode in the current
3501          * (sub) transaction.  This is a hint that can be used to optimize later
3502          * operations on the rel in the same transaction.
3503          */
3504         relation->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
3505
3506         /* Flag relation as needing eoxact cleanup (to remove the hint) */
3507         EOXactListAdd(relation);
3508 }
3509
3510
3511 /*
3512  *              RelationCacheInitialize
3513  *
3514  *              This initializes the relation descriptor cache.  At the time
3515  *              that this is invoked, we can't do database access yet (mainly
3516  *              because the transaction subsystem is not up); all we are doing
3517  *              is making an empty cache hashtable.  This must be done before
3518  *              starting the initialization transaction, because otherwise
3519  *              AtEOXact_RelationCache would crash if that transaction aborts
3520  *              before we can get the relcache set up.
3521  */
3522
3523 #define INITRELCACHESIZE                400
3524
3525 void
3526 RelationCacheInitialize(void)
3527 {
3528         HASHCTL         ctl;
3529
3530         /*
3531          * make sure cache memory context exists
3532          */
3533         if (!CacheMemoryContext)
3534                 CreateCacheMemoryContext();
3535
3536         /*
3537          * create hashtable that indexes the relcache
3538          */
3539         MemSet(&ctl, 0, sizeof(ctl));
3540         ctl.keysize = sizeof(Oid);
3541         ctl.entrysize = sizeof(RelIdCacheEnt);
3542         RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
3543                                                                   &ctl, HASH_ELEM | HASH_BLOBS);
3544
3545         /*
3546          * relation mapper needs to be initialized too
3547          */
3548         RelationMapInitialize();
3549 }
3550
3551 /*
3552  *              RelationCacheInitializePhase2
3553  *
3554  *              This is called to prepare for access to shared catalogs during startup.
3555  *              We must at least set up nailed reldescs for pg_database, pg_authid,
3556  *              pg_auth_members, and pg_shseclabel. Ideally we'd like to have reldescs
3557  *              for their indexes, too.  We attempt to load this information from the
3558  *              shared relcache init file.  If that's missing or broken, just make
3559  *              phony entries for the catalogs themselves.
3560  *              RelationCacheInitializePhase3 will clean up as needed.
3561  */
3562 void
3563 RelationCacheInitializePhase2(void)
3564 {
3565         MemoryContext oldcxt;
3566
3567         /*
3568          * relation mapper needs initialized too
3569          */
3570         RelationMapInitializePhase2();
3571
3572         /*
3573          * In bootstrap mode, the shared catalogs aren't there yet anyway, so do
3574          * nothing.
3575          */
3576         if (IsBootstrapProcessingMode())
3577                 return;
3578
3579         /*
3580          * switch to cache memory context
3581          */
3582         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3583
3584         /*
3585          * Try to load the shared relcache cache file.  If unsuccessful, bootstrap
3586          * the cache with pre-made descriptors for the critical shared catalogs.
3587          */
3588         if (!load_relcache_init_file(true))
3589         {
3590                 formrdesc("pg_database", DatabaseRelation_Rowtype_Id, true,
3591                                   true, Natts_pg_database, Desc_pg_database);
3592                 formrdesc("pg_authid", AuthIdRelation_Rowtype_Id, true,
3593                                   true, Natts_pg_authid, Desc_pg_authid);
3594                 formrdesc("pg_auth_members", AuthMemRelation_Rowtype_Id, true,
3595                                   false, Natts_pg_auth_members, Desc_pg_auth_members);
3596                 formrdesc("pg_shseclabel", SharedSecLabelRelation_Rowtype_Id, true,
3597                                   false, Natts_pg_shseclabel, Desc_pg_shseclabel);
3598                 formrdesc("pg_subscription", SubscriptionRelation_Rowtype_Id, true,
3599                                   true, Natts_pg_subscription, Desc_pg_subscription);
3600
3601 #define NUM_CRITICAL_SHARED_RELS        5       /* fix if you change list above */
3602         }
3603
3604         MemoryContextSwitchTo(oldcxt);
3605 }
3606
3607 /*
3608  *              RelationCacheInitializePhase3
3609  *
3610  *              This is called as soon as the catcache and transaction system
3611  *              are functional and we have determined MyDatabaseId.  At this point
3612  *              we can actually read data from the database's system catalogs.
3613  *              We first try to read pre-computed relcache entries from the local
3614  *              relcache init file.  If that's missing or broken, make phony entries
3615  *              for the minimum set of nailed-in-cache relations.  Then (unless
3616  *              bootstrapping) make sure we have entries for the critical system
3617  *              indexes.  Once we've done all this, we have enough infrastructure to
3618  *              open any system catalog or use any catcache.  The last step is to
3619  *              rewrite the cache files if needed.
3620  */
3621 void
3622 RelationCacheInitializePhase3(void)
3623 {
3624         HASH_SEQ_STATUS status;
3625         RelIdCacheEnt *idhentry;
3626         MemoryContext oldcxt;
3627         bool            needNewCacheFile = !criticalSharedRelcachesBuilt;
3628
3629         /*
3630          * relation mapper needs initialized too
3631          */
3632         RelationMapInitializePhase3();
3633
3634         /*
3635          * switch to cache memory context
3636          */
3637         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3638
3639         /*
3640          * Try to load the local relcache cache file.  If unsuccessful, bootstrap
3641          * the cache with pre-made descriptors for the critical "nailed-in" system
3642          * catalogs.
3643          */
3644         if (IsBootstrapProcessingMode() ||
3645                 !load_relcache_init_file(false))
3646         {
3647                 needNewCacheFile = true;
3648
3649                 formrdesc("pg_class", RelationRelation_Rowtype_Id, false,
3650                                   true, Natts_pg_class, Desc_pg_class);
3651                 formrdesc("pg_attribute", AttributeRelation_Rowtype_Id, false,
3652                                   false, Natts_pg_attribute, Desc_pg_attribute);
3653                 formrdesc("pg_proc", ProcedureRelation_Rowtype_Id, false,
3654                                   true, Natts_pg_proc, Desc_pg_proc);
3655                 formrdesc("pg_type", TypeRelation_Rowtype_Id, false,
3656                                   true, Natts_pg_type, Desc_pg_type);
3657
3658 #define NUM_CRITICAL_LOCAL_RELS 4               /* fix if you change list above */
3659         }
3660
3661         MemoryContextSwitchTo(oldcxt);
3662
3663         /* In bootstrap mode, the faked-up formrdesc info is all we'll have */
3664         if (IsBootstrapProcessingMode())
3665                 return;
3666
3667         /*
3668          * If we didn't get the critical system indexes loaded into relcache, do
3669          * so now.  These are critical because the catcache and/or opclass cache
3670          * depend on them for fetches done during relcache load.  Thus, we have an
3671          * infinite-recursion problem.  We can break the recursion by doing
3672          * heapscans instead of indexscans at certain key spots. To avoid hobbling
3673          * performance, we only want to do that until we have the critical indexes
3674          * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
3675          * decide whether to do heapscan or indexscan at the key spots, and we set
3676          * it true after we've loaded the critical indexes.
3677          *
3678          * The critical indexes are marked as "nailed in cache", partly to make it
3679          * easy for load_relcache_init_file to count them, but mainly because we
3680          * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
3681          * true.  (NOTE: perhaps it would be possible to reload them by
3682          * temporarily setting criticalRelcachesBuilt to false again.  For now,
3683          * though, we just nail 'em in.)
3684          *
3685          * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
3686          * in the same way as the others, because the critical catalogs don't
3687          * (currently) have any rules or triggers, and so these indexes can be
3688          * rebuilt without inducing recursion.  However they are used during
3689          * relcache load when a rel does have rules or triggers, so we choose to
3690          * nail them for performance reasons.
3691          */
3692         if (!criticalRelcachesBuilt)
3693         {
3694                 load_critical_index(ClassOidIndexId,
3695                                                         RelationRelationId);
3696                 load_critical_index(AttributeRelidNumIndexId,
3697                                                         AttributeRelationId);
3698                 load_critical_index(IndexRelidIndexId,
3699                                                         IndexRelationId);
3700                 load_critical_index(OpclassOidIndexId,
3701                                                         OperatorClassRelationId);
3702                 load_critical_index(AccessMethodProcedureIndexId,
3703                                                         AccessMethodProcedureRelationId);
3704                 load_critical_index(RewriteRelRulenameIndexId,
3705                                                         RewriteRelationId);
3706                 load_critical_index(TriggerRelidNameIndexId,
3707                                                         TriggerRelationId);
3708
3709 #define NUM_CRITICAL_LOCAL_INDEXES      7       /* fix if you change list above */
3710
3711                 criticalRelcachesBuilt = true;
3712         }
3713
3714         /*
3715          * Process critical shared indexes too.
3716          *
3717          * DatabaseNameIndexId isn't critical for relcache loading, but rather for
3718          * initial lookup of MyDatabaseId, without which we'll never find any
3719          * non-shared catalogs at all.  Autovacuum calls InitPostgres with a
3720          * database OID, so it instead depends on DatabaseOidIndexId.  We also
3721          * need to nail up some indexes on pg_authid and pg_auth_members for use
3722          * during client authentication.  SharedSecLabelObjectIndexId isn't
3723          * critical for the core system, but authentication hooks might be
3724          * interested in it.
3725          */
3726         if (!criticalSharedRelcachesBuilt)
3727         {
3728                 load_critical_index(DatabaseNameIndexId,
3729                                                         DatabaseRelationId);
3730                 load_critical_index(DatabaseOidIndexId,
3731                                                         DatabaseRelationId);
3732                 load_critical_index(AuthIdRolnameIndexId,
3733                                                         AuthIdRelationId);
3734                 load_critical_index(AuthIdOidIndexId,
3735                                                         AuthIdRelationId);
3736                 load_critical_index(AuthMemMemRoleIndexId,
3737                                                         AuthMemRelationId);
3738                 load_critical_index(SharedSecLabelObjectIndexId,
3739                                                         SharedSecLabelRelationId);
3740
3741 #define NUM_CRITICAL_SHARED_INDEXES 6   /* fix if you change list above */
3742
3743                 criticalSharedRelcachesBuilt = true;
3744         }
3745
3746         /*
3747          * Now, scan all the relcache entries and update anything that might be
3748          * wrong in the results from formrdesc or the relcache cache file. If we
3749          * faked up relcache entries using formrdesc, then read the real pg_class
3750          * rows and replace the fake entries with them. Also, if any of the
3751          * relcache entries have rules, triggers, or security policies, load that
3752          * info the hard way since it isn't recorded in the cache file.
3753          *
3754          * Whenever we access the catalogs to read data, there is a possibility of
3755          * a shared-inval cache flush causing relcache entries to be removed.
3756          * Since hash_seq_search only guarantees to still work after the *current*
3757          * entry is removed, it's unsafe to continue the hashtable scan afterward.
3758          * We handle this by restarting the scan from scratch after each access.
3759          * This is theoretically O(N^2), but the number of entries that actually
3760          * need to be fixed is small enough that it doesn't matter.
3761          */
3762         hash_seq_init(&status, RelationIdCache);
3763
3764         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3765         {
3766                 Relation        relation = idhentry->reldesc;
3767                 bool            restart = false;
3768
3769                 /*
3770                  * Make sure *this* entry doesn't get flushed while we work with it.
3771                  */
3772                 RelationIncrementReferenceCount(relation);
3773
3774                 /*
3775                  * If it's a faked-up entry, read the real pg_class tuple.
3776                  */
3777                 if (relation->rd_rel->relowner == InvalidOid)
3778                 {
3779                         HeapTuple       htup;
3780                         Form_pg_class relp;
3781
3782                         htup = SearchSysCache1(RELOID,
3783                                                            ObjectIdGetDatum(RelationGetRelid(relation)));
3784                         if (!HeapTupleIsValid(htup))
3785                                 elog(FATAL, "cache lookup failed for relation %u",
3786                                          RelationGetRelid(relation));
3787                         relp = (Form_pg_class) GETSTRUCT(htup);
3788
3789                         /*
3790                          * Copy tuple to relation->rd_rel. (See notes in
3791                          * AllocateRelationDesc())
3792                          */
3793                         memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
3794
3795                         /* Update rd_options while we have the tuple */
3796                         if (relation->rd_options)
3797                                 pfree(relation->rd_options);
3798                         RelationParseRelOptions(relation, htup);
3799
3800                         /*
3801                          * Check the values in rd_att were set up correctly.  (We cannot
3802                          * just copy them over now: formrdesc must have set up the rd_att
3803                          * data correctly to start with, because it may already have been
3804                          * copied into one or more catcache entries.)
3805                          */
3806                         Assert(relation->rd_att->tdtypeid == relp->reltype);
3807                         Assert(relation->rd_att->tdtypmod == -1);
3808                         Assert(relation->rd_att->tdhasoid == relp->relhasoids);
3809
3810                         ReleaseSysCache(htup);
3811
3812                         /* relowner had better be OK now, else we'll loop forever */
3813                         if (relation->rd_rel->relowner == InvalidOid)
3814                                 elog(ERROR, "invalid relowner in pg_class entry for \"%s\"",
3815                                          RelationGetRelationName(relation));
3816
3817                         restart = true;
3818                 }
3819
3820                 /*
3821                  * Fix data that isn't saved in relcache cache file.
3822                  *
3823                  * relhasrules or relhastriggers could possibly be wrong or out of
3824                  * date.  If we don't actually find any rules or triggers, clear the
3825                  * local copy of the flag so that we don't get into an infinite loop
3826                  * here.  We don't make any attempt to fix the pg_class entry, though.
3827                  */
3828                 if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
3829                 {
3830                         RelationBuildRuleLock(relation);
3831                         if (relation->rd_rules == NULL)
3832                                 relation->rd_rel->relhasrules = false;
3833                         restart = true;
3834                 }
3835                 if (relation->rd_rel->relhastriggers && relation->trigdesc == NULL)
3836                 {
3837                         RelationBuildTriggers(relation);
3838                         if (relation->trigdesc == NULL)
3839                                 relation->rd_rel->relhastriggers = false;
3840                         restart = true;
3841                 }
3842
3843                 /*
3844                  * Re-load the row security policies if the relation has them, since
3845                  * they are not preserved in the cache.  Note that we can never NOT
3846                  * have a policy while relrowsecurity is true,
3847                  * RelationBuildRowSecurity will create a single default-deny policy
3848                  * if there is no policy defined in pg_policy.
3849                  */
3850                 if (relation->rd_rel->relrowsecurity && relation->rd_rsdesc == NULL)
3851                 {
3852                         RelationBuildRowSecurity(relation);
3853
3854                         Assert(relation->rd_rsdesc != NULL);
3855                         restart = true;
3856                 }
3857
3858                 /*
3859                  * Reload partition key and descriptor for a partitioned table.
3860                  */
3861                 if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3862                 {
3863                         RelationBuildPartitionKey(relation);
3864                         Assert(relation->rd_partkey != NULL);
3865
3866                         RelationBuildPartitionDesc(relation);
3867                         Assert(relation->rd_partdesc != NULL);
3868
3869                         restart = true;
3870                 }
3871
3872                 /* Release hold on the relation */
3873                 RelationDecrementReferenceCount(relation);
3874
3875                 /* Now, restart the hashtable scan if needed */
3876                 if (restart)
3877                 {
3878                         hash_seq_term(&status);
3879                         hash_seq_init(&status, RelationIdCache);
3880                 }
3881         }
3882
3883         /*
3884          * Lastly, write out new relcache cache files if needed.  We don't bother
3885          * to distinguish cases where only one of the two needs an update.
3886          */
3887         if (needNewCacheFile)
3888         {
3889                 /*
3890                  * Force all the catcaches to finish initializing and thereby open the
3891                  * catalogs and indexes they use.  This will preload the relcache with
3892                  * entries for all the most important system catalogs and indexes, so
3893                  * that the init files will be most useful for future backends.
3894                  */
3895                 InitCatalogCachePhase2();
3896
3897                 /* now write the files */
3898                 write_relcache_init_file(true);
3899                 write_relcache_init_file(false);
3900         }
3901 }
3902
3903 /*
3904  * Load one critical system index into the relcache
3905  *
3906  * indexoid is the OID of the target index, heapoid is the OID of the catalog
3907  * it belongs to.
3908  */
3909 static void
3910 load_critical_index(Oid indexoid, Oid heapoid)
3911 {
3912         Relation        ird;
3913
3914         /*
3915          * We must lock the underlying catalog before locking the index to avoid
3916          * deadlock, since RelationBuildDesc might well need to read the catalog,
3917          * and if anyone else is exclusive-locking this catalog and index they'll
3918          * be doing it in that order.
3919          */
3920         LockRelationOid(heapoid, AccessShareLock);
3921         LockRelationOid(indexoid, AccessShareLock);
3922         ird = RelationBuildDesc(indexoid, true);
3923         if (ird == NULL)
3924                 elog(PANIC, "could not open critical system index %u", indexoid);
3925         ird->rd_isnailed = true;
3926         ird->rd_refcnt = 1;
3927         UnlockRelationOid(indexoid, AccessShareLock);
3928         UnlockRelationOid(heapoid, AccessShareLock);
3929 }
3930
3931 /*
3932  * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
3933  * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
3934  *
3935  * We need this kluge because we have to be able to access non-fixed-width
3936  * fields of pg_class and pg_index before we have the standard catalog caches
3937  * available.  We use predefined data that's set up in just the same way as
3938  * the bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
3939  * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
3940  * does it have a TupleConstr field.  But it's good enough for the purpose of
3941  * extracting fields.
3942  */
3943 static TupleDesc
3944 BuildHardcodedDescriptor(int natts, const FormData_pg_attribute *attrs,
3945                                                  bool hasoids)
3946 {
3947         TupleDesc       result;
3948         MemoryContext oldcxt;
3949         int                     i;
3950
3951         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3952
3953         result = CreateTemplateTupleDesc(natts, hasoids);
3954         result->tdtypeid = RECORDOID;           /* not right, but we don't care */
3955         result->tdtypmod = -1;
3956
3957         for (i = 0; i < natts; i++)
3958         {
3959                 memcpy(result->attrs[i], &attrs[i], ATTRIBUTE_FIXED_PART_SIZE);
3960                 /* make sure attcacheoff is valid */
3961                 result->attrs[i]->attcacheoff = -1;
3962         }
3963
3964         /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
3965         result->attrs[0]->attcacheoff = 0;
3966
3967         /* Note: we don't bother to set up a TupleConstr entry */
3968
3969         MemoryContextSwitchTo(oldcxt);
3970
3971         return result;
3972 }
3973
3974 static TupleDesc
3975 GetPgClassDescriptor(void)
3976 {
3977         static TupleDesc pgclassdesc = NULL;
3978
3979         /* Already done? */
3980         if (pgclassdesc == NULL)
3981                 pgclassdesc = BuildHardcodedDescriptor(Natts_pg_class,
3982                                                                                            Desc_pg_class,
3983                                                                                            true);
3984
3985         return pgclassdesc;
3986 }
3987
3988 static TupleDesc
3989 GetPgIndexDescriptor(void)
3990 {
3991         static TupleDesc pgindexdesc = NULL;
3992
3993         /* Already done? */
3994         if (pgindexdesc == NULL)
3995                 pgindexdesc = BuildHardcodedDescriptor(Natts_pg_index,
3996                                                                                            Desc_pg_index,
3997                                                                                            false);
3998
3999         return pgindexdesc;
4000 }
4001
4002 /*
4003  * Load any default attribute value definitions for the relation.
4004  */
4005 static void
4006 AttrDefaultFetch(Relation relation)
4007 {
4008         AttrDefault *attrdef = relation->rd_att->constr->defval;
4009         int                     ndef = relation->rd_att->constr->num_defval;
4010         Relation        adrel;
4011         SysScanDesc adscan;
4012         ScanKeyData skey;
4013         HeapTuple       htup;
4014         Datum           val;
4015         bool            isnull;
4016         int                     found;
4017         int                     i;
4018
4019         ScanKeyInit(&skey,
4020                                 Anum_pg_attrdef_adrelid,
4021                                 BTEqualStrategyNumber, F_OIDEQ,
4022                                 ObjectIdGetDatum(RelationGetRelid(relation)));
4023
4024         adrel = heap_open(AttrDefaultRelationId, AccessShareLock);
4025         adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
4026                                                                 NULL, 1, &skey);
4027         found = 0;
4028
4029         while (HeapTupleIsValid(htup = systable_getnext(adscan)))
4030         {
4031                 Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
4032
4033                 for (i = 0; i < ndef; i++)
4034                 {
4035                         if (adform->adnum != attrdef[i].adnum)
4036                                 continue;
4037                         if (attrdef[i].adbin != NULL)
4038                                 elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
4039                                 NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
4040                                          RelationGetRelationName(relation));
4041                         else
4042                                 found++;
4043
4044                         val = fastgetattr(htup,
4045                                                           Anum_pg_attrdef_adbin,
4046                                                           adrel->rd_att, &isnull);
4047                         if (isnull)
4048                                 elog(WARNING, "null adbin for attr %s of rel %s",
4049                                 NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
4050                                          RelationGetRelationName(relation));
4051                         else
4052                         {
4053                                 /* detoast and convert to cstring in caller's context */
4054                                 char       *s = TextDatumGetCString(val);
4055
4056                                 attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext, s);
4057                                 pfree(s);
4058                         }
4059                         break;
4060                 }
4061
4062                 if (i >= ndef)
4063                         elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
4064                                  adform->adnum, RelationGetRelationName(relation));
4065         }
4066
4067         systable_endscan(adscan);
4068         heap_close(adrel, AccessShareLock);
4069
4070         if (found != ndef)
4071                 elog(WARNING, "%d attrdef record(s) missing for rel %s",
4072                          ndef - found, RelationGetRelationName(relation));
4073 }
4074
4075 /*
4076  * Load any check constraints for the relation.
4077  */
4078 static void
4079 CheckConstraintFetch(Relation relation)
4080 {
4081         ConstrCheck *check = relation->rd_att->constr->check;
4082         int                     ncheck = relation->rd_att->constr->num_check;
4083         Relation        conrel;
4084         SysScanDesc conscan;
4085         ScanKeyData skey[1];
4086         HeapTuple       htup;
4087         int                     found = 0;
4088
4089         ScanKeyInit(&skey[0],
4090                                 Anum_pg_constraint_conrelid,
4091                                 BTEqualStrategyNumber, F_OIDEQ,
4092                                 ObjectIdGetDatum(RelationGetRelid(relation)));
4093
4094         conrel = heap_open(ConstraintRelationId, AccessShareLock);
4095         conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
4096                                                                  NULL, 1, skey);
4097
4098         while (HeapTupleIsValid(htup = systable_getnext(conscan)))
4099         {
4100                 Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
4101                 Datum           val;
4102                 bool            isnull;
4103                 char       *s;
4104
4105                 /* We want check constraints only */
4106                 if (conform->contype != CONSTRAINT_CHECK)
4107                         continue;
4108
4109                 if (found >= ncheck)
4110                         elog(ERROR, "unexpected constraint record found for rel %s",
4111                                  RelationGetRelationName(relation));
4112
4113                 check[found].ccvalid = conform->convalidated;
4114                 check[found].ccnoinherit = conform->connoinherit;
4115                 check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
4116                                                                                                   NameStr(conform->conname));
4117
4118                 /* Grab and test conbin is actually set */
4119                 val = fastgetattr(htup,
4120                                                   Anum_pg_constraint_conbin,
4121                                                   conrel->rd_att, &isnull);
4122                 if (isnull)
4123                         elog(ERROR, "null conbin for rel %s",
4124                                  RelationGetRelationName(relation));
4125
4126                 /* detoast and convert to cstring in caller's context */
4127                 s = TextDatumGetCString(val);
4128                 check[found].ccbin = MemoryContextStrdup(CacheMemoryContext, s);
4129                 pfree(s);
4130
4131                 found++;
4132         }
4133
4134         systable_endscan(conscan);
4135         heap_close(conrel, AccessShareLock);
4136
4137         if (found != ncheck)
4138                 elog(ERROR, "%d constraint record(s) missing for rel %s",
4139                          ncheck - found, RelationGetRelationName(relation));
4140
4141         /* Sort the records so that CHECKs are applied in a deterministic order */
4142         if (ncheck > 1)
4143                 qsort(check, ncheck, sizeof(ConstrCheck), CheckConstraintCmp);
4144 }
4145
4146 /*
4147  * qsort comparator to sort ConstrCheck entries by name
4148  */
4149 static int
4150 CheckConstraintCmp(const void *a, const void *b)
4151 {
4152         const ConstrCheck *ca = (const ConstrCheck *) a;
4153         const ConstrCheck *cb = (const ConstrCheck *) b;
4154
4155         return strcmp(ca->ccname, cb->ccname);
4156 }
4157
4158 /*
4159  * RelationGetFKeyList -- get a list of foreign key info for the relation
4160  *
4161  * Returns a list of ForeignKeyCacheInfo structs, one per FK constraining
4162  * the given relation.  This data is a direct copy of relevant fields from
4163  * pg_constraint.  The list items are in no particular order.
4164  *
4165  * CAUTION: the returned list is part of the relcache's data, and could
4166  * vanish in a relcache entry reset.  Callers must inspect or copy it
4167  * before doing anything that might trigger a cache flush, such as
4168  * system catalog accesses.  copyObject() can be used if desired.
4169  * (We define it this way because current callers want to filter and
4170  * modify the list entries anyway, so copying would be a waste of time.)
4171  */
4172 List *
4173 RelationGetFKeyList(Relation relation)
4174 {
4175         List       *result;
4176         Relation        conrel;
4177         SysScanDesc conscan;
4178         ScanKeyData skey;
4179         HeapTuple       htup;
4180         List       *oldlist;
4181         MemoryContext oldcxt;
4182
4183         /* Quick exit if we already computed the list. */
4184         if (relation->rd_fkeyvalid)
4185                 return relation->rd_fkeylist;
4186
4187         /* Fast path: if it doesn't have any triggers, it can't have FKs */
4188         if (!relation->rd_rel->relhastriggers)
4189                 return NIL;
4190
4191         /*
4192          * We build the list we intend to return (in the caller's context) while
4193          * doing the scan.  After successfully completing the scan, we copy that
4194          * list into the relcache entry.  This avoids cache-context memory leakage
4195          * if we get some sort of error partway through.
4196          */
4197         result = NIL;
4198
4199         /* Prepare to scan pg_constraint for entries having conrelid = this rel. */
4200         ScanKeyInit(&skey,
4201                                 Anum_pg_constraint_conrelid,
4202                                 BTEqualStrategyNumber, F_OIDEQ,
4203                                 ObjectIdGetDatum(RelationGetRelid(relation)));
4204
4205         conrel = heap_open(ConstraintRelationId, AccessShareLock);
4206         conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
4207                                                                  NULL, 1, &skey);
4208
4209         while (HeapTupleIsValid(htup = systable_getnext(conscan)))
4210         {
4211                 Form_pg_constraint constraint = (Form_pg_constraint) GETSTRUCT(htup);
4212                 ForeignKeyCacheInfo *info;
4213                 Datum           adatum;
4214                 bool            isnull;
4215                 ArrayType  *arr;
4216                 int                     nelem;
4217
4218                 /* consider only foreign keys */
4219                 if (constraint->contype != CONSTRAINT_FOREIGN)
4220                         continue;
4221
4222                 info = makeNode(ForeignKeyCacheInfo);
4223                 info->conrelid = constraint->conrelid;
4224                 info->confrelid = constraint->confrelid;
4225
4226                 /* Extract data from conkey field */
4227                 adatum = fastgetattr(htup, Anum_pg_constraint_conkey,
4228                                                          conrel->rd_att, &isnull);
4229                 if (isnull)
4230                         elog(ERROR, "null conkey for rel %s",
4231                                  RelationGetRelationName(relation));
4232
4233                 arr = DatumGetArrayTypeP(adatum);               /* ensure not toasted */
4234                 nelem = ARR_DIMS(arr)[0];
4235                 if (ARR_NDIM(arr) != 1 ||
4236                         nelem < 1 ||
4237                         nelem > INDEX_MAX_KEYS ||
4238                         ARR_HASNULL(arr) ||
4239                         ARR_ELEMTYPE(arr) != INT2OID)
4240                         elog(ERROR, "conkey is not a 1-D smallint array");
4241
4242                 info->nkeys = nelem;
4243                 memcpy(info->conkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber));
4244
4245                 /* Likewise for confkey */
4246                 adatum = fastgetattr(htup, Anum_pg_constraint_confkey,
4247                                                          conrel->rd_att, &isnull);
4248                 if (isnull)
4249                         elog(ERROR, "null confkey for rel %s",
4250                                  RelationGetRelationName(relation));
4251
4252                 arr = DatumGetArrayTypeP(adatum);               /* ensure not toasted */
4253                 nelem = ARR_DIMS(arr)[0];
4254                 if (ARR_NDIM(arr) != 1 ||
4255                         nelem != info->nkeys ||
4256                         ARR_HASNULL(arr) ||
4257                         ARR_ELEMTYPE(arr) != INT2OID)
4258                         elog(ERROR, "confkey is not a 1-D smallint array");
4259
4260                 memcpy(info->confkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber));
4261
4262                 /* Likewise for conpfeqop */
4263                 adatum = fastgetattr(htup, Anum_pg_constraint_conpfeqop,
4264                                                          conrel->rd_att, &isnull);
4265                 if (isnull)
4266                         elog(ERROR, "null conpfeqop for rel %s",
4267                                  RelationGetRelationName(relation));
4268
4269                 arr = DatumGetArrayTypeP(adatum);               /* ensure not toasted */
4270                 nelem = ARR_DIMS(arr)[0];
4271                 if (ARR_NDIM(arr) != 1 ||
4272                         nelem != info->nkeys ||
4273                         ARR_HASNULL(arr) ||
4274                         ARR_ELEMTYPE(arr) != OIDOID)
4275                         elog(ERROR, "conpfeqop is not a 1-D OID array");
4276
4277                 memcpy(info->conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
4278
4279                 /* Add FK's node to the result list */
4280                 result = lappend(result, info);
4281         }
4282
4283         systable_endscan(conscan);
4284         heap_close(conrel, AccessShareLock);
4285
4286         /* Now save a copy of the completed list in the relcache entry. */
4287         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4288         oldlist = relation->rd_fkeylist;
4289         relation->rd_fkeylist = copyObject(result);
4290         relation->rd_fkeyvalid = true;
4291         MemoryContextSwitchTo(oldcxt);
4292
4293         /* Don't leak the old list, if there is one */
4294         list_free_deep(oldlist);
4295
4296         return result;
4297 }
4298
4299 /*
4300  * RelationGetIndexList -- get a list of OIDs of indexes on this relation
4301  *
4302  * The index list is created only if someone requests it.  We scan pg_index
4303  * to find relevant indexes, and add the list to the relcache entry so that
4304  * we won't have to compute it again.  Note that shared cache inval of a
4305  * relcache entry will delete the old list and set rd_indexvalid to 0,
4306  * so that we must recompute the index list on next request.  This handles
4307  * creation or deletion of an index.
4308  *
4309  * Indexes that are marked not IndexIsLive are omitted from the returned list.
4310  * Such indexes are expected to be dropped momentarily, and should not be
4311  * touched at all by any caller of this function.
4312  *
4313  * The returned list is guaranteed to be sorted in order by OID.  This is
4314  * needed by the executor, since for index types that we obtain exclusive
4315  * locks on when updating the index, all backends must lock the indexes in
4316  * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
4317  * consistent ordering would do, but ordering by OID is easy.
4318  *
4319  * Since shared cache inval causes the relcache's copy of the list to go away,
4320  * we return a copy of the list palloc'd in the caller's context.  The caller
4321  * may list_free() the returned list after scanning it. This is necessary
4322  * since the caller will typically be doing syscache lookups on the relevant
4323  * indexes, and syscache lookup could cause SI messages to be processed!
4324  *
4325  * We also update rd_oidindex, which this module treats as effectively part
4326  * of the index list.  rd_oidindex is valid when rd_indexvalid isn't zero;
4327  * it is the pg_class OID of a unique index on OID when the relation has one,
4328  * and InvalidOid if there is no such index.
4329  *
4330  * In exactly the same way, we update rd_replidindex, which is the pg_class
4331  * OID of an index to be used as the relation's replication identity index,
4332  * or InvalidOid if there is no such index.
4333  */
4334 List *
4335 RelationGetIndexList(Relation relation)
4336 {
4337         Relation        indrel;
4338         SysScanDesc indscan;
4339         ScanKeyData skey;
4340         HeapTuple       htup;
4341         List       *result;
4342         List       *oldlist;
4343         char            replident = relation->rd_rel->relreplident;
4344         Oid                     oidIndex = InvalidOid;
4345         Oid                     pkeyIndex = InvalidOid;
4346         Oid                     candidateIndex = InvalidOid;
4347         MemoryContext oldcxt;
4348
4349         /* Quick exit if we already computed the list. */
4350         if (relation->rd_indexvalid != 0)
4351                 return list_copy(relation->rd_indexlist);
4352
4353         /*
4354          * We build the list we intend to return (in the caller's context) while
4355          * doing the scan.  After successfully completing the scan, we copy that
4356          * list into the relcache entry.  This avoids cache-context memory leakage
4357          * if we get some sort of error partway through.
4358          */
4359         result = NIL;
4360         oidIndex = InvalidOid;
4361
4362         /* Prepare to scan pg_index for entries having indrelid = this rel. */
4363         ScanKeyInit(&skey,
4364                                 Anum_pg_index_indrelid,
4365                                 BTEqualStrategyNumber, F_OIDEQ,
4366                                 ObjectIdGetDatum(RelationGetRelid(relation)));
4367
4368         indrel = heap_open(IndexRelationId, AccessShareLock);
4369         indscan = systable_beginscan(indrel, IndexIndrelidIndexId, true,
4370                                                                  NULL, 1, &skey);
4371
4372         while (HeapTupleIsValid(htup = systable_getnext(indscan)))
4373         {
4374                 Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);
4375                 Datum           indclassDatum;
4376                 oidvector  *indclass;
4377                 bool            isnull;
4378
4379                 /*
4380                  * Ignore any indexes that are currently being dropped.  This will
4381                  * prevent them from being searched, inserted into, or considered in
4382                  * HOT-safety decisions.  It's unsafe to touch such an index at all
4383                  * since its catalog entries could disappear at any instant.
4384                  */
4385                 if (!IndexIsLive(index))
4386                         continue;
4387
4388                 /* Add index's OID to result list in the proper order */
4389                 result = insert_ordered_oid(result, index->indexrelid);
4390
4391                 /*
4392                  * indclass cannot be referenced directly through the C struct,
4393                  * because it comes after the variable-width indkey field.  Must
4394                  * extract the datum the hard way...
4395                  */
4396                 indclassDatum = heap_getattr(htup,
4397                                                                          Anum_pg_index_indclass,
4398                                                                          GetPgIndexDescriptor(),
4399                                                                          &isnull);
4400                 Assert(!isnull);
4401                 indclass = (oidvector *) DatumGetPointer(indclassDatum);
4402
4403                 /*
4404                  * Invalid, non-unique, non-immediate or predicate indexes aren't
4405                  * interesting for either oid indexes or replication identity indexes,
4406                  * so don't check them.
4407                  */
4408                 if (!IndexIsValid(index) || !index->indisunique ||
4409                         !index->indimmediate ||
4410                         !heap_attisnull(htup, Anum_pg_index_indpred))
4411                         continue;
4412
4413                 /* Check to see if is a usable btree index on OID */
4414                 if (index->indnatts == 1 &&
4415                         index->indkey.values[0] == ObjectIdAttributeNumber &&
4416                         indclass->values[0] == OID_BTREE_OPS_OID)
4417                         oidIndex = index->indexrelid;
4418
4419                 /* remember primary key index if any */
4420                 if (index->indisprimary)
4421                         pkeyIndex = index->indexrelid;
4422
4423                 /* remember explicitly chosen replica index */
4424                 if (index->indisreplident)
4425                         candidateIndex = index->indexrelid;
4426         }
4427
4428         systable_endscan(indscan);
4429
4430         heap_close(indrel, AccessShareLock);
4431
4432         /* Now save a copy of the completed list in the relcache entry. */
4433         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4434         oldlist = relation->rd_indexlist;
4435         relation->rd_indexlist = list_copy(result);
4436         relation->rd_oidindex = oidIndex;
4437         relation->rd_pkindex = pkeyIndex;
4438         if (replident == REPLICA_IDENTITY_DEFAULT && OidIsValid(pkeyIndex))
4439                 relation->rd_replidindex = pkeyIndex;
4440         else if (replident == REPLICA_IDENTITY_INDEX && OidIsValid(candidateIndex))
4441                 relation->rd_replidindex = candidateIndex;
4442         else
4443                 relation->rd_replidindex = InvalidOid;
4444         relation->rd_indexvalid = 1;
4445         MemoryContextSwitchTo(oldcxt);
4446
4447         /* Don't leak the old list, if there is one */
4448         list_free(oldlist);
4449
4450         return result;
4451 }
4452
4453 /*
4454  * insert_ordered_oid
4455  *              Insert a new Oid into a sorted list of Oids, preserving ordering
4456  *
4457  * Building the ordered list this way is O(N^2), but with a pretty small
4458  * constant, so for the number of entries we expect it will probably be
4459  * faster than trying to apply qsort().  Most tables don't have very many
4460  * indexes...
4461  */
4462 static List *
4463 insert_ordered_oid(List *list, Oid datum)
4464 {
4465         ListCell   *prev;
4466
4467         /* Does the datum belong at the front? */
4468         if (list == NIL || datum < linitial_oid(list))
4469                 return lcons_oid(datum, list);
4470         /* No, so find the entry it belongs after */
4471         prev = list_head(list);
4472         for (;;)
4473         {
4474                 ListCell   *curr = lnext(prev);
4475
4476                 if (curr == NULL || datum < lfirst_oid(curr))
4477                         break;                          /* it belongs after 'prev', before 'curr' */
4478
4479                 prev = curr;
4480         }
4481         /* Insert datum into list after 'prev' */
4482         lappend_cell_oid(list, prev, datum);
4483         return list;
4484 }
4485
4486 /*
4487  * RelationSetIndexList -- externally force the index list contents
4488  *
4489  * This is used to temporarily override what we think the set of valid
4490  * indexes is (including the presence or absence of an OID index).
4491  * The forcing will be valid only until transaction commit or abort.
4492  *
4493  * This should only be applied to nailed relations, because in a non-nailed
4494  * relation the hacked index list could be lost at any time due to SI
4495  * messages.  In practice it is only used on pg_class (see REINDEX).
4496  *
4497  * It is up to the caller to make sure the given list is correctly ordered.
4498  *
4499  * We deliberately do not change rd_indexattr here: even when operating
4500  * with a temporary partial index list, HOT-update decisions must be made
4501  * correctly with respect to the full index set.  It is up to the caller
4502  * to ensure that a correct rd_indexattr set has been cached before first
4503  * calling RelationSetIndexList; else a subsequent inquiry might cause a
4504  * wrong rd_indexattr set to get computed and cached.  Likewise, we do not
4505  * touch rd_keyattr, rd_pkattr or rd_idattr.
4506  */
4507 void
4508 RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex)
4509 {
4510         MemoryContext oldcxt;
4511
4512         Assert(relation->rd_isnailed);
4513         /* Copy the list into the cache context (could fail for lack of mem) */
4514         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4515         indexIds = list_copy(indexIds);
4516         MemoryContextSwitchTo(oldcxt);
4517         /* Okay to replace old list */
4518         list_free(relation->rd_indexlist);
4519         relation->rd_indexlist = indexIds;
4520         relation->rd_oidindex = oidIndex;
4521         /*
4522          * For the moment, assume the target rel hasn't got a pk or replica
4523          * index. We'll load them on demand in the API that wraps access to them.
4524          */
4525         relation->rd_pkindex = InvalidOid;
4526         relation->rd_replidindex = InvalidOid;
4527         relation->rd_indexvalid = 2;    /* mark list as forced */
4528         /* Flag relation as needing eoxact cleanup (to reset the list) */
4529         EOXactListAdd(relation);
4530 }
4531
4532 /*
4533  * RelationGetOidIndex -- get the pg_class OID of the relation's OID index
4534  *
4535  * Returns InvalidOid if there is no such index.
4536  */
4537 Oid
4538 RelationGetOidIndex(Relation relation)
4539 {
4540         List       *ilist;
4541
4542         /*
4543          * If relation doesn't have OIDs at all, caller is probably confused. (We
4544          * could just silently return InvalidOid, but it seems better to throw an
4545          * assertion.)
4546          */
4547         Assert(relation->rd_rel->relhasoids);
4548
4549         if (relation->rd_indexvalid == 0)
4550         {
4551                 /* RelationGetIndexList does the heavy lifting. */
4552                 ilist = RelationGetIndexList(relation);
4553                 list_free(ilist);
4554                 Assert(relation->rd_indexvalid != 0);
4555         }
4556
4557         return relation->rd_oidindex;
4558 }
4559
4560 /*
4561  * RelationGetPrimaryKeyIndex -- get OID of the relation's primary key index
4562  *
4563  * Returns InvalidOid if there is no such index.
4564  */
4565 Oid
4566 RelationGetPrimaryKeyIndex(Relation relation)
4567 {
4568         List       *ilist;
4569
4570         if (relation->rd_indexvalid == 0)
4571         {
4572                 /* RelationGetIndexList does the heavy lifting. */
4573                 ilist = RelationGetIndexList(relation);
4574                 list_free(ilist);
4575                 Assert(relation->rd_indexvalid != 0);
4576         }
4577
4578         return relation->rd_pkindex;
4579 }
4580
4581 /*
4582  * RelationGetReplicaIndex -- get OID of the relation's replica identity index
4583  *
4584  * Returns InvalidOid if there is no such index.
4585  */
4586 Oid
4587 RelationGetReplicaIndex(Relation relation)
4588 {
4589         List       *ilist;
4590
4591         if (relation->rd_indexvalid == 0)
4592         {
4593                 /* RelationGetIndexList does the heavy lifting. */
4594                 ilist = RelationGetIndexList(relation);
4595                 list_free(ilist);
4596                 Assert(relation->rd_indexvalid != 0);
4597         }
4598
4599         return relation->rd_replidindex;
4600 }
4601
4602 /*
4603  * RelationGetIndexExpressions -- get the index expressions for an index
4604  *
4605  * We cache the result of transforming pg_index.indexprs into a node tree.
4606  * If the rel is not an index or has no expressional columns, we return NIL.
4607  * Otherwise, the returned tree is copied into the caller's memory context.
4608  * (We don't want to return a pointer to the relcache copy, since it could
4609  * disappear due to relcache invalidation.)
4610  */
4611 List *
4612 RelationGetIndexExpressions(Relation relation)
4613 {
4614         List       *result;
4615         Datum           exprsDatum;
4616         bool            isnull;
4617         char       *exprsString;
4618         MemoryContext oldcxt;
4619
4620         /* Quick exit if we already computed the result. */
4621         if (relation->rd_indexprs)
4622                 return (List *) copyObject(relation->rd_indexprs);
4623
4624         /* Quick exit if there is nothing to do. */
4625         if (relation->rd_indextuple == NULL ||
4626                 heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs))
4627                 return NIL;
4628
4629         /*
4630          * We build the tree we intend to return in the caller's context. After
4631          * successfully completing the work, we copy it into the relcache entry.
4632          * This avoids problems if we get some sort of error partway through.
4633          */
4634         exprsDatum = heap_getattr(relation->rd_indextuple,
4635                                                           Anum_pg_index_indexprs,
4636                                                           GetPgIndexDescriptor(),
4637                                                           &isnull);
4638         Assert(!isnull);
4639         exprsString = TextDatumGetCString(exprsDatum);
4640         result = (List *) stringToNode(exprsString);
4641         pfree(exprsString);
4642
4643         /*
4644          * Run the expressions through eval_const_expressions. This is not just an
4645          * optimization, but is necessary, because the planner will be comparing
4646          * them to similarly-processed qual clauses, and may fail to detect valid
4647          * matches without this.  We don't bother with canonicalize_qual, however.
4648          */
4649         result = (List *) eval_const_expressions(NULL, (Node *) result);
4650
4651         result = (List *) canonicalize_qual((Expr *) result);
4652
4653         /* May as well fix opfuncids too */
4654         fix_opfuncids((Node *) result);
4655
4656         /* Now save a copy of the completed tree in the relcache entry. */
4657         oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
4658         relation->rd_indexprs = (List *) copyObject(result);
4659         MemoryContextSwitchTo(oldcxt);
4660
4661         return result;
4662 }
4663
4664 /*
4665  * RelationGetIndexPredicate -- get the index predicate for an index
4666  *
4667  * We cache the result of transforming pg_index.indpred into an implicit-AND
4668  * node tree (suitable for ExecQual).
4669  * If the rel is not an index or has no predicate, we return NIL.
4670  * Otherwise, the returned tree is copied into the caller's memory context.
4671  * (We don't want to return a pointer to the relcache copy, since it could
4672  * disappear due to relcache invalidation.)
4673  */
4674 List *
4675 RelationGetIndexPredicate(Relation relation)
4676 {
4677         List       *result;
4678         Datum           predDatum;
4679         bool            isnull;
4680         char       *predString;
4681         MemoryContext oldcxt;
4682
4683         /* Quick exit if we already computed the result. */
4684         if (relation->rd_indpred)
4685                 return (List *) copyObject(relation->rd_indpred);
4686
4687         /* Quick exit if there is nothing to do. */
4688         if (relation->rd_indextuple == NULL ||
4689                 heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred))
4690                 return NIL;
4691
4692         /*
4693          * We build the tree we intend to return in the caller's context. After
4694          * successfully completing the work, we copy it into the relcache entry.
4695          * This avoids problems if we get some sort of error partway through.
4696          */
4697         predDatum = heap_getattr(relation->rd_indextuple,
4698                                                          Anum_pg_index_indpred,
4699                                                          GetPgIndexDescriptor(),
4700                                                          &isnull);
4701         Assert(!isnull);
4702         predString = TextDatumGetCString(predDatum);
4703         result = (List *) stringToNode(predString);
4704         pfree(predString);
4705
4706         /*
4707          * Run the expression through const-simplification and canonicalization.
4708          * This is not just an optimization, but is necessary, because the planner
4709          * will be comparing it to similarly-processed qual clauses, and may fail
4710          * to detect valid matches without this.  This must match the processing
4711          * done to qual clauses in preprocess_expression()!  (We can skip the
4712          * stuff involving subqueries, however, since we don't allow any in index
4713          * predicates.)
4714          */
4715         result = (List *) eval_const_expressions(NULL, (Node *) result);
4716
4717         result = (List *) canonicalize_qual((Expr *) result);
4718
4719         /* Also convert to implicit-AND format */
4720         result = make_ands_implicit((Expr *) result);
4721
4722         /* May as well fix opfuncids too */
4723         fix_opfuncids((Node *) result);
4724
4725         /* Now save a copy of the completed tree in the relcache entry. */
4726         oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
4727         relation->rd_indpred = (List *) copyObject(result);
4728         MemoryContextSwitchTo(oldcxt);
4729
4730         return result;
4731 }
4732
4733 /*
4734  * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
4735  *
4736  * The result has a bit set for each attribute used anywhere in the index
4737  * definitions of all the indexes on this relation.  (This includes not only
4738  * simple index keys, but attributes used in expressions and partial-index
4739  * predicates.)
4740  *
4741  * Depending on attrKind, a bitmap covering the attnums for all index columns,
4742  * for all potential foreign key columns, or for all columns in the configured
4743  * replica identity index is returned.
4744  *
4745  * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
4746  * we can include system attributes (e.g., OID) in the bitmap representation.
4747  *
4748  * Caller had better hold at least RowExclusiveLock on the target relation
4749  * to ensure that it has a stable set of indexes.  This also makes it safe
4750  * (deadlock-free) for us to take locks on the relation's indexes.
4751  *
4752  * The returned result is palloc'd in the caller's memory context and should
4753  * be bms_free'd when not needed anymore.
4754  */
4755 Bitmapset *
4756 RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
4757 {
4758         Bitmapset  *indexattrs;         /* indexed columns */
4759         Bitmapset  *uindexattrs;        /* columns in unique indexes */
4760         Bitmapset  *pkindexattrs;       /* columns in the primary index */
4761         Bitmapset  *idindexattrs;       /* columns in the replica identity */
4762         List       *indexoidlist;
4763         Oid                     relpkindex;
4764         Oid                     relreplindex;
4765         ListCell   *l;
4766         MemoryContext oldcxt;
4767
4768         /* Quick exit if we already computed the result. */
4769         if (relation->rd_indexattr != NULL)
4770         {
4771                 switch (attrKind)
4772                 {
4773                         case INDEX_ATTR_BITMAP_ALL:
4774                                 return bms_copy(relation->rd_indexattr);
4775                         case INDEX_ATTR_BITMAP_KEY:
4776                                 return bms_copy(relation->rd_keyattr);
4777                         case INDEX_ATTR_BITMAP_PRIMARY_KEY:
4778                                 return bms_copy(relation->rd_pkattr);
4779                         case INDEX_ATTR_BITMAP_IDENTITY_KEY:
4780                                 return bms_copy(relation->rd_idattr);
4781                         default:
4782                                 elog(ERROR, "unknown attrKind %u", attrKind);
4783                 }
4784         }
4785
4786         /* Fast path if definitely no indexes */
4787         if (!RelationGetForm(relation)->relhasindex)
4788                 return NULL;
4789
4790         /*
4791          * Get cached list of index OIDs
4792          */
4793         indexoidlist = RelationGetIndexList(relation);
4794
4795         /* Fall out if no indexes (but relhasindex was set) */
4796         if (indexoidlist == NIL)
4797                 return NULL;
4798
4799         /*
4800          * Copy the rd_pkindex and rd_replidindex value computed by
4801          * RelationGetIndexList before proceeding.  This is needed because a
4802          * relcache flush could occur inside index_open below, resetting the
4803          * fields managed by RelationGetIndexList. (The values we're computing
4804          * will still be valid, assuming that caller has a sufficient lock on
4805          * the relation.)
4806          */
4807         relpkindex = relation->rd_pkindex;
4808         relreplindex = relation->rd_replidindex;
4809
4810         /*
4811          * For each index, add referenced attributes to indexattrs.
4812          *
4813          * Note: we consider all indexes returned by RelationGetIndexList, even if
4814          * they are not indisready or indisvalid.  This is important because an
4815          * index for which CREATE INDEX CONCURRENTLY has just started must be
4816          * included in HOT-safety decisions (see README.HOT).  If a DROP INDEX
4817          * CONCURRENTLY is far enough along that we should ignore the index, it
4818          * won't be returned at all by RelationGetIndexList.
4819          */
4820         indexattrs = NULL;
4821         uindexattrs = NULL;
4822         pkindexattrs = NULL;
4823         idindexattrs = NULL;
4824         foreach(l, indexoidlist)
4825         {
4826                 Oid                     indexOid = lfirst_oid(l);
4827                 Relation        indexDesc;
4828                 IndexInfo  *indexInfo;
4829                 int                     i;
4830                 bool            isKey;          /* candidate key */
4831                 bool            isPK;           /* primary key */
4832                 bool            isIDKey;        /* replica identity index */
4833
4834                 indexDesc = index_open(indexOid, AccessShareLock);
4835
4836                 /* Extract index key information from the index's pg_index row */
4837                 indexInfo = BuildIndexInfo(indexDesc);
4838
4839                 /* Can this index be referenced by a foreign key? */
4840                 isKey = indexInfo->ii_Unique &&
4841                         indexInfo->ii_Expressions == NIL &&
4842                         indexInfo->ii_Predicate == NIL;
4843
4844                 /* Is this a primary key? */
4845                 isPK = (indexOid == relpkindex);
4846
4847                 /* Is this index the configured (or default) replica identity? */
4848                 isIDKey = (indexOid == relreplindex);
4849
4850                 /* Collect simple attribute references */
4851                 for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
4852                 {
4853                         int                     attrnum = indexInfo->ii_KeyAttrNumbers[i];
4854
4855                         if (attrnum != 0)
4856                         {
4857                                 indexattrs = bms_add_member(indexattrs,
4858                                                            attrnum - FirstLowInvalidHeapAttributeNumber);
4859
4860                                 if (isKey)
4861                                         uindexattrs = bms_add_member(uindexattrs,
4862                                                            attrnum - FirstLowInvalidHeapAttributeNumber);
4863
4864                                 if (isPK)
4865                                         pkindexattrs = bms_add_member(pkindexattrs,
4866                                                            attrnum - FirstLowInvalidHeapAttributeNumber);
4867
4868                                 if (isIDKey)
4869                                         idindexattrs = bms_add_member(idindexattrs,
4870                                                            attrnum - FirstLowInvalidHeapAttributeNumber);
4871                         }
4872                 }
4873
4874                 /* Collect all attributes used in expressions, too */
4875                 pull_varattnos((Node *) indexInfo->ii_Expressions, 1, &indexattrs);
4876
4877                 /* Collect all attributes in the index predicate, too */
4878                 pull_varattnos((Node *) indexInfo->ii_Predicate, 1, &indexattrs);
4879
4880                 index_close(indexDesc, AccessShareLock);
4881         }
4882
4883         list_free(indexoidlist);
4884
4885         /* Don't leak the old values of these bitmaps, if any */
4886         bms_free(relation->rd_indexattr);
4887         relation->rd_indexattr = NULL;
4888         bms_free(relation->rd_keyattr);
4889         relation->rd_keyattr = NULL;
4890         bms_free(relation->rd_pkattr);
4891         relation->rd_pkattr = NULL;
4892         bms_free(relation->rd_idattr);
4893         relation->rd_idattr = NULL;
4894
4895         /*
4896          * Now save copies of the bitmaps in the relcache entry.  We intentionally
4897          * set rd_indexattr last, because that's the one that signals validity of
4898          * the values; if we run out of memory before making that copy, we won't
4899          * leave the relcache entry looking like the other ones are valid but
4900          * empty.
4901          */
4902         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4903         relation->rd_keyattr = bms_copy(uindexattrs);
4904         relation->rd_pkattr = bms_copy(pkindexattrs);
4905         relation->rd_idattr = bms_copy(idindexattrs);
4906         relation->rd_indexattr = bms_copy(indexattrs);
4907         MemoryContextSwitchTo(oldcxt);
4908
4909         /* We return our original working copy for caller to play with */
4910         switch (attrKind)
4911         {
4912                 case INDEX_ATTR_BITMAP_ALL:
4913                         return indexattrs;
4914                 case INDEX_ATTR_BITMAP_KEY:
4915                         return uindexattrs;
4916                 case INDEX_ATTR_BITMAP_PRIMARY_KEY:
4917                         return bms_copy(relation->rd_pkattr);
4918                 case INDEX_ATTR_BITMAP_IDENTITY_KEY:
4919                         return idindexattrs;
4920                 default:
4921                         elog(ERROR, "unknown attrKind %u", attrKind);
4922                         return NULL;
4923         }
4924 }
4925
4926 /*
4927  * RelationGetExclusionInfo -- get info about index's exclusion constraint
4928  *
4929  * This should be called only for an index that is known to have an
4930  * associated exclusion constraint.  It returns arrays (palloc'd in caller's
4931  * context) of the exclusion operator OIDs, their underlying functions'
4932  * OIDs, and their strategy numbers in the index's opclasses.  We cache
4933  * all this information since it requires a fair amount of work to get.
4934  */
4935 void
4936 RelationGetExclusionInfo(Relation indexRelation,
4937                                                  Oid **operators,
4938                                                  Oid **procs,
4939                                                  uint16 **strategies)
4940 {
4941         int                     ncols = indexRelation->rd_rel->relnatts;
4942         Oid                *ops;
4943         Oid                *funcs;
4944         uint16     *strats;
4945         Relation        conrel;
4946         SysScanDesc conscan;
4947         ScanKeyData skey[1];
4948         HeapTuple       htup;
4949         bool            found;
4950         MemoryContext oldcxt;
4951         int                     i;
4952
4953         /* Allocate result space in caller context */
4954         *operators = ops = (Oid *) palloc(sizeof(Oid) * ncols);
4955         *procs = funcs = (Oid *) palloc(sizeof(Oid) * ncols);
4956         *strategies = strats = (uint16 *) palloc(sizeof(uint16) * ncols);
4957
4958         /* Quick exit if we have the data cached already */
4959         if (indexRelation->rd_exclstrats != NULL)
4960         {
4961                 memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * ncols);
4962                 memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * ncols);
4963                 memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * ncols);
4964                 return;
4965         }
4966
4967         /*
4968          * Search pg_constraint for the constraint associated with the index. To
4969          * make this not too painfully slow, we use the index on conrelid; that
4970          * will hold the parent relation's OID not the index's own OID.
4971          */
4972         ScanKeyInit(&skey[0],
4973                                 Anum_pg_constraint_conrelid,
4974                                 BTEqualStrategyNumber, F_OIDEQ,
4975                                 ObjectIdGetDatum(indexRelation->rd_index->indrelid));
4976
4977         conrel = heap_open(ConstraintRelationId, AccessShareLock);
4978         conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
4979                                                                  NULL, 1, skey);
4980         found = false;
4981
4982         while (HeapTupleIsValid(htup = systable_getnext(conscan)))
4983         {
4984                 Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
4985                 Datum           val;
4986                 bool            isnull;
4987                 ArrayType  *arr;
4988                 int                     nelem;
4989
4990                 /* We want the exclusion constraint owning the index */
4991                 if (conform->contype != CONSTRAINT_EXCLUSION ||
4992                         conform->conindid != RelationGetRelid(indexRelation))
4993                         continue;
4994
4995                 /* There should be only one */
4996                 if (found)
4997                         elog(ERROR, "unexpected exclusion constraint record found for rel %s",
4998                                  RelationGetRelationName(indexRelation));
4999                 found = true;
5000
5001                 /* Extract the operator OIDS from conexclop */
5002                 val = fastgetattr(htup,
5003                                                   Anum_pg_constraint_conexclop,
5004                                                   conrel->rd_att, &isnull);
5005                 if (isnull)
5006                         elog(ERROR, "null conexclop for rel %s",
5007                                  RelationGetRelationName(indexRelation));
5008
5009                 arr = DatumGetArrayTypeP(val);  /* ensure not toasted */
5010                 nelem = ARR_DIMS(arr)[0];
5011                 if (ARR_NDIM(arr) != 1 ||
5012                         nelem != ncols ||
5013                         ARR_HASNULL(arr) ||
5014                         ARR_ELEMTYPE(arr) != OIDOID)
5015                         elog(ERROR, "conexclop is not a 1-D Oid array");
5016
5017                 memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * ncols);
5018         }
5019
5020         systable_endscan(conscan);
5021         heap_close(conrel, AccessShareLock);
5022
5023         if (!found)
5024                 elog(ERROR, "exclusion constraint record missing for rel %s",
5025                          RelationGetRelationName(indexRelation));
5026
5027         /* We need the func OIDs and strategy numbers too */
5028         for (i = 0; i < ncols; i++)
5029         {
5030                 funcs[i] = get_opcode(ops[i]);
5031                 strats[i] = get_op_opfamily_strategy(ops[i],
5032                                                                                          indexRelation->rd_opfamily[i]);
5033                 /* shouldn't fail, since it was checked at index creation */
5034                 if (strats[i] == InvalidStrategy)
5035                         elog(ERROR, "could not find strategy for operator %u in family %u",
5036                                  ops[i], indexRelation->rd_opfamily[i]);
5037         }
5038
5039         /* Save a copy of the results in the relcache entry. */
5040         oldcxt = MemoryContextSwitchTo(indexRelation->rd_indexcxt);
5041         indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * ncols);
5042         indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * ncols);
5043         indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * ncols);
5044         memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * ncols);
5045         memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * ncols);
5046         memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * ncols);
5047         MemoryContextSwitchTo(oldcxt);
5048 }
5049
5050 /*
5051  * Get publication actions for the given relation.
5052  */
5053 struct PublicationActions *
5054 GetRelationPublicationActions(Relation relation)
5055 {
5056         List       *puboids;
5057         ListCell   *lc;
5058         MemoryContext           oldcxt;
5059         PublicationActions *pubactions = palloc0(sizeof(PublicationActions));
5060
5061         if (relation->rd_pubactions)
5062                 return memcpy(pubactions, relation->rd_pubactions,
5063                                           sizeof(PublicationActions));
5064
5065         /* Fetch the publication membership info. */
5066         puboids = GetRelationPublications(RelationGetRelid(relation));
5067         puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());
5068
5069         foreach(lc, puboids)
5070         {
5071                 Oid                     pubid = lfirst_oid(lc);
5072                 HeapTuple       tup;
5073                 Form_pg_publication pubform;
5074
5075                 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
5076
5077                 if (!HeapTupleIsValid(tup))
5078                         elog(ERROR, "cache lookup failed for publication %u", pubid);
5079
5080                 pubform = (Form_pg_publication) GETSTRUCT(tup);
5081
5082                 pubactions->pubinsert |= pubform->pubinsert;
5083                 pubactions->pubupdate |= pubform->pubupdate;
5084                 pubactions->pubdelete |= pubform->pubdelete;
5085
5086                 ReleaseSysCache(tup);
5087
5088                 /*
5089                  * If we know everything is replicated, there is no point to check
5090                  * for other publications.
5091                  */
5092                 if (pubactions->pubinsert && pubactions->pubupdate &&
5093                         pubactions->pubdelete)
5094                         break;
5095         }
5096
5097         if (relation->rd_pubactions)
5098         {
5099                 pfree(relation->rd_pubactions);
5100                 relation->rd_pubactions = NULL;
5101         }
5102
5103         /* Now save copy of the actions in the relcache entry. */
5104         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
5105         relation->rd_pubactions = palloc(sizeof(PublicationActions));
5106         memcpy(relation->rd_pubactions, pubactions, sizeof(PublicationActions));
5107         MemoryContextSwitchTo(oldcxt);
5108
5109         return pubactions;
5110 }
5111
5112 /*
5113  * Routines to support ereport() reports of relation-related errors
5114  *
5115  * These could have been put into elog.c, but it seems like a module layering
5116  * violation to have elog.c calling relcache or syscache stuff --- and we
5117  * definitely don't want elog.h including rel.h.  So we put them here.
5118  */
5119
5120 /*
5121  * errtable --- stores schema_name and table_name of a table
5122  * within the current errordata.
5123  */
5124 int
5125 errtable(Relation rel)
5126 {
5127         err_generic_string(PG_DIAG_SCHEMA_NAME,
5128                                            get_namespace_name(RelationGetNamespace(rel)));
5129         err_generic_string(PG_DIAG_TABLE_NAME, RelationGetRelationName(rel));
5130
5131         return 0;                                       /* return value does not matter */
5132 }
5133
5134 /*
5135  * errtablecol --- stores schema_name, table_name and column_name
5136  * of a table column within the current errordata.
5137  *
5138  * The column is specified by attribute number --- for most callers, this is
5139  * easier and less error-prone than getting the column name for themselves.
5140  */
5141 int
5142 errtablecol(Relation rel, int attnum)
5143 {
5144         TupleDesc       reldesc = RelationGetDescr(rel);
5145         const char *colname;
5146
5147         /* Use reldesc if it's a user attribute, else consult the catalogs */
5148         if (attnum > 0 && attnum <= reldesc->natts)
5149                 colname = NameStr(reldesc->attrs[attnum - 1]->attname);
5150         else
5151                 colname = get_relid_attribute_name(RelationGetRelid(rel), attnum);
5152
5153         return errtablecolname(rel, colname);
5154 }
5155
5156 /*
5157  * errtablecolname --- stores schema_name, table_name and column_name
5158  * of a table column within the current errordata, where the column name is
5159  * given directly rather than extracted from the relation's catalog data.
5160  *
5161  * Don't use this directly unless errtablecol() is inconvenient for some
5162  * reason.  This might possibly be needed during intermediate states in ALTER
5163  * TABLE, for instance.
5164  */
5165 int
5166 errtablecolname(Relation rel, const char *colname)
5167 {
5168         errtable(rel);
5169         err_generic_string(PG_DIAG_COLUMN_NAME, colname);
5170
5171         return 0;                                       /* return value does not matter */
5172 }
5173
5174 /*
5175  * errtableconstraint --- stores schema_name, table_name and constraint_name
5176  * of a table-related constraint within the current errordata.
5177  */
5178 int
5179 errtableconstraint(Relation rel, const char *conname)
5180 {
5181         errtable(rel);
5182         err_generic_string(PG_DIAG_CONSTRAINT_NAME, conname);
5183
5184         return 0;                                       /* return value does not matter */
5185 }
5186
5187
5188 /*
5189  *      load_relcache_init_file, write_relcache_init_file
5190  *
5191  *              In late 1992, we started regularly having databases with more than
5192  *              a thousand classes in them.  With this number of classes, it became
5193  *              critical to do indexed lookups on the system catalogs.
5194  *
5195  *              Bootstrapping these lookups is very hard.  We want to be able to
5196  *              use an index on pg_attribute, for example, but in order to do so,
5197  *              we must have read pg_attribute for the attributes in the index,
5198  *              which implies that we need to use the index.
5199  *
5200  *              In order to get around the problem, we do the following:
5201  *
5202  *                 +  When the database system is initialized (at initdb time), we
5203  *                        don't use indexes.  We do sequential scans.
5204  *
5205  *                 +  When the backend is started up in normal mode, we load an image
5206  *                        of the appropriate relation descriptors, in internal format,
5207  *                        from an initialization file in the data/base/... directory.
5208  *
5209  *                 +  If the initialization file isn't there, then we create the
5210  *                        relation descriptors using sequential scans and write 'em to
5211  *                        the initialization file for use by subsequent backends.
5212  *
5213  *              As of Postgres 9.0, there is one local initialization file in each
5214  *              database, plus one shared initialization file for shared catalogs.
5215  *
5216  *              We could dispense with the initialization files and just build the
5217  *              critical reldescs the hard way on every backend startup, but that
5218  *              slows down backend startup noticeably.
5219  *
5220  *              We can in fact go further, and save more relcache entries than
5221  *              just the ones that are absolutely critical; this allows us to speed
5222  *              up backend startup by not having to build such entries the hard way.
5223  *              Presently, all the catalog and index entries that are referred to
5224  *              by catcaches are stored in the initialization files.
5225  *
5226  *              The same mechanism that detects when catcache and relcache entries
5227  *              need to be invalidated (due to catalog updates) also arranges to
5228  *              unlink the initialization files when the contents may be out of date.
5229  *              The files will then be rebuilt during the next backend startup.
5230  */
5231
5232 /*
5233  * load_relcache_init_file -- attempt to load cache from the shared
5234  * or local cache init file
5235  *
5236  * If successful, return TRUE and set criticalRelcachesBuilt or
5237  * criticalSharedRelcachesBuilt to true.
5238  * If not successful, return FALSE.
5239  *
5240  * NOTE: we assume we are already switched into CacheMemoryContext.
5241  */
5242 static bool
5243 load_relcache_init_file(bool shared)
5244 {
5245         FILE       *fp;
5246         char            initfilename[MAXPGPATH];
5247         Relation   *rels;
5248         int                     relno,
5249                                 num_rels,
5250                                 max_rels,
5251                                 nailed_rels,
5252                                 nailed_indexes,
5253                                 magic;
5254         int                     i;
5255
5256         if (shared)
5257                 snprintf(initfilename, sizeof(initfilename), "global/%s",
5258                                  RELCACHE_INIT_FILENAME);
5259         else
5260                 snprintf(initfilename, sizeof(initfilename), "%s/%s",
5261                                  DatabasePath, RELCACHE_INIT_FILENAME);
5262
5263         fp = AllocateFile(initfilename, PG_BINARY_R);
5264         if (fp == NULL)
5265                 return false;
5266
5267         /*
5268          * Read the index relcache entries from the file.  Note we will not enter
5269          * any of them into the cache if the read fails partway through; this
5270          * helps to guard against broken init files.
5271          */
5272         max_rels = 100;
5273         rels = (Relation *) palloc(max_rels * sizeof(Relation));
5274         num_rels = 0;
5275         nailed_rels = nailed_indexes = 0;
5276
5277         /* check for correct magic number (compatible version) */
5278         if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
5279                 goto read_failed;
5280         if (magic != RELCACHE_INIT_FILEMAGIC)
5281                 goto read_failed;
5282
5283         for (relno = 0;; relno++)
5284         {
5285                 Size            len;
5286                 size_t          nread;
5287                 Relation        rel;
5288                 Form_pg_class relform;
5289                 bool            has_not_null;
5290
5291                 /* first read the relation descriptor length */
5292                 nread = fread(&len, 1, sizeof(len), fp);
5293                 if (nread != sizeof(len))
5294                 {
5295                         if (nread == 0)
5296                                 break;                  /* end of file */
5297                         goto read_failed;
5298                 }
5299
5300                 /* safety check for incompatible relcache layout */
5301                 if (len != sizeof(RelationData))
5302                         goto read_failed;
5303
5304                 /* allocate another relcache header */
5305                 if (num_rels >= max_rels)
5306                 {
5307                         max_rels *= 2;
5308                         rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
5309                 }
5310
5311                 rel = rels[num_rels++] = (Relation) palloc(len);
5312
5313                 /* then, read the Relation structure */
5314                 if (fread(rel, 1, len, fp) != len)
5315                         goto read_failed;
5316
5317                 /* next read the relation tuple form */
5318                 if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5319                         goto read_failed;
5320
5321                 relform = (Form_pg_class) palloc(len);
5322                 if (fread(relform, 1, len, fp) != len)
5323                         goto read_failed;
5324
5325                 rel->rd_rel = relform;
5326
5327                 /* initialize attribute tuple forms */
5328                 rel->rd_att = CreateTemplateTupleDesc(relform->relnatts,
5329                                                                                           relform->relhasoids);
5330                 rel->rd_att->tdrefcount = 1;    /* mark as refcounted */
5331
5332                 rel->rd_att->tdtypeid = relform->reltype;
5333                 rel->rd_att->tdtypmod = -1;             /* unnecessary, but... */
5334
5335                 /* next read all the attribute tuple form data entries */
5336                 has_not_null = false;
5337                 for (i = 0; i < relform->relnatts; i++)
5338                 {
5339                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5340                                 goto read_failed;
5341                         if (len != ATTRIBUTE_FIXED_PART_SIZE)
5342                                 goto read_failed;
5343                         if (fread(rel->rd_att->attrs[i], 1, len, fp) != len)
5344                                 goto read_failed;
5345
5346                         has_not_null |= rel->rd_att->attrs[i]->attnotnull;
5347                 }
5348
5349                 /* next read the access method specific field */
5350                 if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5351                         goto read_failed;
5352                 if (len > 0)
5353                 {
5354                         rel->rd_options = palloc(len);
5355                         if (fread(rel->rd_options, 1, len, fp) != len)
5356                                 goto read_failed;
5357                         if (len != VARSIZE(rel->rd_options))
5358                                 goto read_failed;               /* sanity check */
5359                 }
5360                 else
5361                 {
5362                         rel->rd_options = NULL;
5363                 }
5364
5365                 /* mark not-null status */
5366                 if (has_not_null)
5367                 {
5368                         TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
5369
5370                         constr->has_not_null = true;
5371                         rel->rd_att->constr = constr;
5372                 }
5373
5374                 /* If it's an index, there's more to do */
5375                 if (rel->rd_rel->relkind == RELKIND_INDEX)
5376                 {
5377                         MemoryContext indexcxt;
5378                         Oid                *opfamily;
5379                         Oid                *opcintype;
5380                         RegProcedure *support;
5381                         int                     nsupport;
5382                         int16      *indoption;
5383                         Oid                *indcollation;
5384
5385                         /* Count nailed indexes to ensure we have 'em all */
5386                         if (rel->rd_isnailed)
5387                                 nailed_indexes++;
5388
5389                         /* next, read the pg_index tuple */
5390                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5391                                 goto read_failed;
5392
5393                         rel->rd_indextuple = (HeapTuple) palloc(len);
5394                         if (fread(rel->rd_indextuple, 1, len, fp) != len)
5395                                 goto read_failed;
5396
5397                         /* Fix up internal pointers in the tuple -- see heap_copytuple */
5398                         rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
5399                         rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);
5400
5401                         /*
5402                          * prepare index info context --- parameters should match
5403                          * RelationInitIndexAccessInfo
5404                          */
5405                         indexcxt = AllocSetContextCreate(CacheMemoryContext,
5406                                                                                          RelationGetRelationName(rel),
5407                                                                                          ALLOCSET_SMALL_SIZES);
5408                         rel->rd_indexcxt = indexcxt;
5409
5410                         /*
5411                          * Now we can fetch the index AM's API struct.  (We can't store
5412                          * that in the init file, since it contains function pointers that
5413                          * might vary across server executions.  Fortunately, it should be
5414                          * safe to call the amhandler even while bootstrapping indexes.)
5415                          */
5416                         InitIndexAmRoutine(rel);
5417
5418                         /* next, read the vector of opfamily OIDs */
5419                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5420                                 goto read_failed;
5421
5422                         opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
5423                         if (fread(opfamily, 1, len, fp) != len)
5424                                 goto read_failed;
5425
5426                         rel->rd_opfamily = opfamily;
5427
5428                         /* next, read the vector of opcintype OIDs */
5429                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5430                                 goto read_failed;
5431
5432                         opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
5433                         if (fread(opcintype, 1, len, fp) != len)
5434                                 goto read_failed;
5435
5436                         rel->rd_opcintype = opcintype;
5437
5438                         /* next, read the vector of support procedure OIDs */
5439                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5440                                 goto read_failed;
5441                         support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
5442                         if (fread(support, 1, len, fp) != len)
5443                                 goto read_failed;
5444
5445                         rel->rd_support = support;
5446
5447                         /* next, read the vector of collation OIDs */
5448                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5449                                 goto read_failed;
5450
5451                         indcollation = (Oid *) MemoryContextAlloc(indexcxt, len);
5452                         if (fread(indcollation, 1, len, fp) != len)
5453                                 goto read_failed;
5454
5455                         rel->rd_indcollation = indcollation;
5456
5457                         /* finally, read the vector of indoption values */
5458                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5459                                 goto read_failed;
5460
5461                         indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
5462                         if (fread(indoption, 1, len, fp) != len)
5463                                 goto read_failed;
5464
5465                         rel->rd_indoption = indoption;
5466
5467                         /* set up zeroed fmgr-info vector */
5468                         nsupport = relform->relnatts * rel->rd_amroutine->amsupport;
5469                         rel->rd_supportinfo = (FmgrInfo *)
5470                                 MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
5471                 }
5472                 else
5473                 {
5474                         /* Count nailed rels to ensure we have 'em all */
5475                         if (rel->rd_isnailed)
5476                                 nailed_rels++;
5477
5478                         Assert(rel->rd_index == NULL);
5479                         Assert(rel->rd_indextuple == NULL);
5480                         Assert(rel->rd_indexcxt == NULL);
5481                         Assert(rel->rd_amroutine == NULL);
5482                         Assert(rel->rd_opfamily == NULL);
5483                         Assert(rel->rd_opcintype == NULL);
5484                         Assert(rel->rd_support == NULL);
5485                         Assert(rel->rd_supportinfo == NULL);
5486                         Assert(rel->rd_indoption == NULL);
5487                         Assert(rel->rd_indcollation == NULL);
5488                 }
5489
5490                 /*
5491                  * Rules and triggers are not saved (mainly because the internal
5492                  * format is complex and subject to change).  They must be rebuilt if
5493                  * needed by RelationCacheInitializePhase3.  This is not expected to
5494                  * be a big performance hit since few system catalogs have such. Ditto
5495                  * for RLS policy data, index expressions, predicates, exclusion info,
5496                  * and FDW info.
5497                  */
5498                 rel->rd_rules = NULL;
5499                 rel->rd_rulescxt = NULL;
5500                 rel->trigdesc = NULL;
5501                 rel->rd_rsdesc = NULL;
5502                 rel->rd_partkeycxt = NULL;
5503                 rel->rd_partkey = NULL;
5504                 rel->rd_partdesc = NULL;
5505                 rel->rd_partcheck = NIL;
5506                 rel->rd_indexprs = NIL;
5507                 rel->rd_indpred = NIL;
5508                 rel->rd_exclops = NULL;
5509                 rel->rd_exclprocs = NULL;
5510                 rel->rd_exclstrats = NULL;
5511                 rel->rd_fdwroutine = NULL;
5512
5513                 /*
5514                  * Reset transient-state fields in the relcache entry
5515                  */
5516                 rel->rd_smgr = NULL;
5517                 if (rel->rd_isnailed)
5518                         rel->rd_refcnt = 1;
5519                 else
5520                         rel->rd_refcnt = 0;
5521                 rel->rd_indexvalid = 0;
5522                 rel->rd_fkeylist = NIL;
5523                 rel->rd_fkeyvalid = false;
5524                 rel->rd_indexlist = NIL;
5525                 rel->rd_oidindex = InvalidOid;
5526                 rel->rd_pkindex = InvalidOid;
5527                 rel->rd_replidindex = InvalidOid;
5528                 rel->rd_indexattr = NULL;
5529                 rel->rd_keyattr = NULL;
5530                 rel->rd_pkattr = NULL;
5531                 rel->rd_idattr = NULL;
5532                 rel->rd_pubactions = NULL;
5533                 rel->rd_createSubid = InvalidSubTransactionId;
5534                 rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
5535                 rel->rd_amcache = NULL;
5536                 MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
5537
5538                 /*
5539                  * Recompute lock and physical addressing info.  This is needed in
5540                  * case the pg_internal.init file was copied from some other database
5541                  * by CREATE DATABASE.
5542                  */
5543                 RelationInitLockInfo(rel);
5544                 RelationInitPhysicalAddr(rel);
5545         }
5546
5547         /*
5548          * We reached the end of the init file without apparent problem.  Did we
5549          * get the right number of nailed items?  This is a useful crosscheck in
5550          * case the set of critical rels or indexes changes.  However, that should
5551          * not happen in a normally-running system, so let's bleat if it does.
5552          *
5553          * For the shared init file, we're called before client authentication is
5554          * done, which means that elog(WARNING) will go only to the postmaster
5555          * log, where it's easily missed.  To ensure that developers notice bad
5556          * values of NUM_CRITICAL_SHARED_RELS/NUM_CRITICAL_SHARED_INDEXES, we put
5557          * an Assert(false) there.
5558          */
5559         if (shared)
5560         {
5561                 if (nailed_rels != NUM_CRITICAL_SHARED_RELS ||
5562                         nailed_indexes != NUM_CRITICAL_SHARED_INDEXES)
5563                 {
5564                         elog(WARNING, "found %d nailed shared rels and %d nailed shared indexes in init file, but expected %d and %d respectively",
5565                                  nailed_rels, nailed_indexes,
5566                                  NUM_CRITICAL_SHARED_RELS, NUM_CRITICAL_SHARED_INDEXES);
5567                         /* Make sure we get developers' attention about this */
5568                         Assert(false);
5569                         /* In production builds, recover by bootstrapping the relcache */
5570                         goto read_failed;
5571                 }
5572         }
5573         else
5574         {
5575                 if (nailed_rels != NUM_CRITICAL_LOCAL_RELS ||
5576                         nailed_indexes != NUM_CRITICAL_LOCAL_INDEXES)
5577                 {
5578                         elog(WARNING, "found %d nailed rels and %d nailed indexes in init file, but expected %d and %d respectively",
5579                                  nailed_rels, nailed_indexes,
5580                                  NUM_CRITICAL_LOCAL_RELS, NUM_CRITICAL_LOCAL_INDEXES);
5581                         /* We don't need an Assert() in this case */
5582                         goto read_failed;
5583                 }
5584         }
5585
5586         /*
5587          * OK, all appears well.
5588          *
5589          * Now insert all the new relcache entries into the cache.
5590          */
5591         for (relno = 0; relno < num_rels; relno++)
5592         {
5593                 RelationCacheInsert(rels[relno], false);
5594         }
5595
5596         pfree(rels);
5597         FreeFile(fp);
5598
5599         if (shared)
5600                 criticalSharedRelcachesBuilt = true;
5601         else
5602                 criticalRelcachesBuilt = true;
5603         return true;
5604
5605         /*
5606          * init file is broken, so do it the hard way.  We don't bother trying to
5607          * free the clutter we just allocated; it's not in the relcache so it
5608          * won't hurt.
5609          */
5610 read_failed:
5611         pfree(rels);
5612         FreeFile(fp);
5613
5614         return false;
5615 }
5616
5617 /*
5618  * Write out a new initialization file with the current contents
5619  * of the relcache (either shared rels or local rels, as indicated).
5620  */
5621 static void
5622 write_relcache_init_file(bool shared)
5623 {
5624         FILE       *fp;
5625         char            tempfilename[MAXPGPATH];
5626         char            finalfilename[MAXPGPATH];
5627         int                     magic;
5628         HASH_SEQ_STATUS status;
5629         RelIdCacheEnt *idhentry;
5630         int                     i;
5631
5632         /*
5633          * If we have already received any relcache inval events, there's no
5634          * chance of succeeding so we may as well skip the whole thing.
5635          */
5636         if (relcacheInvalsReceived != 0L)
5637                 return;
5638
5639         /*
5640          * We must write a temporary file and rename it into place. Otherwise,
5641          * another backend starting at about the same time might crash trying to
5642          * read the partially-complete file.
5643          */
5644         if (shared)
5645         {
5646                 snprintf(tempfilename, sizeof(tempfilename), "global/%s.%d",
5647                                  RELCACHE_INIT_FILENAME, MyProcPid);
5648                 snprintf(finalfilename, sizeof(finalfilename), "global/%s",
5649                                  RELCACHE_INIT_FILENAME);
5650         }
5651         else
5652         {
5653                 snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
5654                                  DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
5655                 snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
5656                                  DatabasePath, RELCACHE_INIT_FILENAME);
5657         }
5658
5659         unlink(tempfilename);           /* in case it exists w/wrong permissions */
5660
5661         fp = AllocateFile(tempfilename, PG_BINARY_W);
5662         if (fp == NULL)
5663         {
5664                 /*
5665                  * We used to consider this a fatal error, but we might as well
5666                  * continue with backend startup ...
5667                  */
5668                 ereport(WARNING,
5669                                 (errcode_for_file_access(),
5670                                  errmsg("could not create relation-cache initialization file \"%s\": %m",
5671                                                 tempfilename),
5672                           errdetail("Continuing anyway, but there's something wrong.")));
5673                 return;
5674         }
5675
5676         /*
5677          * Write a magic number to serve as a file version identifier.  We can
5678          * change the magic number whenever the relcache layout changes.
5679          */
5680         magic = RELCACHE_INIT_FILEMAGIC;
5681         if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
5682                 elog(FATAL, "could not write init file");
5683
5684         /*
5685          * Write all the appropriate reldescs (in no particular order).
5686          */
5687         hash_seq_init(&status, RelationIdCache);
5688
5689         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
5690         {
5691                 Relation        rel = idhentry->reldesc;
5692                 Form_pg_class relform = rel->rd_rel;
5693
5694                 /* ignore if not correct group */
5695                 if (relform->relisshared != shared)
5696                         continue;
5697
5698                 /*
5699                  * Ignore if not supposed to be in init file.  We can allow any shared
5700                  * relation that's been loaded so far to be in the shared init file,
5701                  * but unshared relations must be ones that should be in the local
5702                  * file per RelationIdIsInInitFile.  (Note: if you want to change the
5703                  * criterion for rels to be kept in the init file, see also inval.c.
5704                  * The reason for filtering here is to be sure that we don't put
5705                  * anything into the local init file for which a relcache inval would
5706                  * not cause invalidation of that init file.)
5707                  */
5708                 if (!shared && !RelationIdIsInInitFile(RelationGetRelid(rel)))
5709                 {
5710                         /* Nailed rels had better get stored. */
5711                         Assert(!rel->rd_isnailed);
5712                         continue;
5713                 }
5714
5715                 /* first write the relcache entry proper */
5716                 write_item(rel, sizeof(RelationData), fp);
5717
5718                 /* next write the relation tuple form */
5719                 write_item(relform, CLASS_TUPLE_SIZE, fp);
5720
5721                 /* next, do all the attribute tuple form data entries */
5722                 for (i = 0; i < relform->relnatts; i++)
5723                 {
5724                         write_item(rel->rd_att->attrs[i], ATTRIBUTE_FIXED_PART_SIZE, fp);
5725                 }
5726
5727                 /* next, do the access method specific field */
5728                 write_item(rel->rd_options,
5729                                    (rel->rd_options ? VARSIZE(rel->rd_options) : 0),
5730                                    fp);
5731
5732                 /* If it's an index, there's more to do */
5733                 if (rel->rd_rel->relkind == RELKIND_INDEX)
5734                 {
5735                         /* write the pg_index tuple */
5736                         /* we assume this was created by heap_copytuple! */
5737                         write_item(rel->rd_indextuple,
5738                                            HEAPTUPLESIZE + rel->rd_indextuple->t_len,
5739                                            fp);
5740
5741                         /* next, write the vector of opfamily OIDs */
5742                         write_item(rel->rd_opfamily,
5743                                            relform->relnatts * sizeof(Oid),
5744                                            fp);
5745
5746                         /* next, write the vector of opcintype OIDs */
5747                         write_item(rel->rd_opcintype,
5748                                            relform->relnatts * sizeof(Oid),
5749                                            fp);
5750
5751                         /* next, write the vector of support procedure OIDs */
5752                         write_item(rel->rd_support,
5753                                            relform->relnatts * (rel->rd_amroutine->amsupport * sizeof(RegProcedure)),
5754                                            fp);
5755
5756                         /* next, write the vector of collation OIDs */
5757                         write_item(rel->rd_indcollation,
5758                                            relform->relnatts * sizeof(Oid),
5759                                            fp);
5760
5761                         /* finally, write the vector of indoption values */
5762                         write_item(rel->rd_indoption,
5763                                            relform->relnatts * sizeof(int16),
5764                                            fp);
5765                 }
5766         }
5767
5768         if (FreeFile(fp))
5769                 elog(FATAL, "could not write init file");
5770
5771         /*
5772          * Now we have to check whether the data we've so painstakingly
5773          * accumulated is already obsolete due to someone else's just-committed
5774          * catalog changes.  If so, we just delete the temp file and leave it to
5775          * the next backend to try again.  (Our own relcache entries will be
5776          * updated by SI message processing, but we can't be sure whether what we
5777          * wrote out was up-to-date.)
5778          *
5779          * This mustn't run concurrently with the code that unlinks an init file
5780          * and sends SI messages, so grab a serialization lock for the duration.
5781          */
5782         LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
5783
5784         /* Make sure we have seen all incoming SI messages */
5785         AcceptInvalidationMessages();
5786
5787         /*
5788          * If we have received any SI relcache invals since backend start, assume
5789          * we may have written out-of-date data.
5790          */
5791         if (relcacheInvalsReceived == 0L)
5792         {
5793                 /*
5794                  * OK, rename the temp file to its final name, deleting any
5795                  * previously-existing init file.
5796                  *
5797                  * Note: a failure here is possible under Cygwin, if some other
5798                  * backend is holding open an unlinked-but-not-yet-gone init file. So
5799                  * treat this as a noncritical failure; just remove the useless temp
5800                  * file on failure.
5801                  */
5802                 if (rename(tempfilename, finalfilename) < 0)
5803                         unlink(tempfilename);
5804         }
5805         else
5806         {
5807                 /* Delete the already-obsolete temp file */
5808                 unlink(tempfilename);
5809         }
5810
5811         LWLockRelease(RelCacheInitLock);
5812 }
5813
5814 /* write a chunk of data preceded by its length */
5815 static void
5816 write_item(const void *data, Size len, FILE *fp)
5817 {
5818         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
5819                 elog(FATAL, "could not write init file");
5820         if (fwrite(data, 1, len, fp) != len)
5821                 elog(FATAL, "could not write init file");
5822 }
5823
5824 /*
5825  * Determine whether a given relation (identified by OID) is one of the ones
5826  * we should store in the local relcache init file.
5827  *
5828  * We must cache all nailed rels, and for efficiency we should cache every rel
5829  * that supports a syscache.  The former set is almost but not quite a subset
5830  * of the latter.  Currently, we must special-case TriggerRelidNameIndexId,
5831  * which RelationCacheInitializePhase3 chooses to nail for efficiency reasons,
5832  * but which does not support any syscache.
5833  *
5834  * Note: this function is currently never called for shared rels.  If it were,
5835  * we'd probably also need a special case for DatabaseNameIndexId, which is
5836  * critical but does not support a syscache.
5837  */
5838 bool
5839 RelationIdIsInInitFile(Oid relationId)
5840 {
5841         if (relationId == TriggerRelidNameIndexId)
5842         {
5843                 /* If this Assert fails, we don't need this special case anymore. */
5844                 Assert(!RelationSupportsSysCache(relationId));
5845                 return true;
5846         }
5847         return RelationSupportsSysCache(relationId);
5848 }
5849
5850 /*
5851  * Tells whether any index for the relation is unlogged.
5852  *
5853  * Any index using the hash AM is implicitly unlogged.
5854  *
5855  * Note: There doesn't seem to be any way to have an unlogged index attached
5856  * to a permanent table except to create a hash index, but it seems best to
5857  * keep this general so that it returns sensible results even when they seem
5858  * obvious (like for an unlogged table) and to handle possible future unlogged
5859  * indexes on permanent tables.
5860  */
5861 bool
5862 RelationHasUnloggedIndex(Relation rel)
5863 {
5864         List       *indexoidlist;
5865         ListCell   *indexoidscan;
5866         bool            result = false;
5867
5868         indexoidlist = RelationGetIndexList(rel);
5869
5870         foreach(indexoidscan, indexoidlist)
5871         {
5872                 Oid                     indexoid = lfirst_oid(indexoidscan);
5873                 HeapTuple       tp;
5874                 Form_pg_class reltup;
5875
5876                 tp = SearchSysCache1(RELOID, ObjectIdGetDatum(indexoid));
5877                 if (!HeapTupleIsValid(tp))
5878                         elog(ERROR, "cache lookup failed for relation %u", indexoid);
5879                 reltup = (Form_pg_class) GETSTRUCT(tp);
5880
5881                 if (reltup->relpersistence == RELPERSISTENCE_UNLOGGED
5882                         || reltup->relam == HASH_AM_OID)
5883                         result = true;
5884
5885                 ReleaseSysCache(tp);
5886
5887                 if (result == true)
5888                         break;
5889         }
5890
5891         list_free(indexoidlist);
5892
5893         return result;
5894 }
5895
5896 /*
5897  * Invalidate (remove) the init file during commit of a transaction that
5898  * changed one or more of the relation cache entries that are kept in the
5899  * local init file.
5900  *
5901  * To be safe against concurrent inspection or rewriting of the init file,
5902  * we must take RelCacheInitLock, then remove the old init file, then send
5903  * the SI messages that include relcache inval for such relations, and then
5904  * release RelCacheInitLock.  This serializes the whole affair against
5905  * write_relcache_init_file, so that we can be sure that any other process
5906  * that's concurrently trying to create a new init file won't move an
5907  * already-stale version into place after we unlink.  Also, because we unlink
5908  * before sending the SI messages, a backend that's currently starting cannot
5909  * read the now-obsolete init file and then miss the SI messages that will
5910  * force it to update its relcache entries.  (This works because the backend
5911  * startup sequence gets into the sinval array before trying to load the init
5912  * file.)
5913  *
5914  * We take the lock and do the unlink in RelationCacheInitFilePreInvalidate,
5915  * then release the lock in RelationCacheInitFilePostInvalidate.  Caller must
5916  * send any pending SI messages between those calls.
5917  *
5918  * Notice this deals only with the local init file, not the shared init file.
5919  * The reason is that there can never be a "significant" change to the
5920  * relcache entry of a shared relation; the most that could happen is
5921  * updates of noncritical fields such as relpages/reltuples.  So, while
5922  * it's worth updating the shared init file from time to time, it can never
5923  * be invalid enough to make it necessary to remove it.
5924  */
5925 void
5926 RelationCacheInitFilePreInvalidate(void)
5927 {
5928         char            initfilename[MAXPGPATH];
5929
5930         snprintf(initfilename, sizeof(initfilename), "%s/%s",
5931                          DatabasePath, RELCACHE_INIT_FILENAME);
5932
5933         LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
5934
5935         if (unlink(initfilename) < 0)
5936         {
5937                 /*
5938                  * The file might not be there if no backend has been started since
5939                  * the last removal.  But complain about failures other than ENOENT.
5940                  * Fortunately, it's not too late to abort the transaction if we can't
5941                  * get rid of the would-be-obsolete init file.
5942                  */
5943                 if (errno != ENOENT)
5944                         ereport(ERROR,
5945                                         (errcode_for_file_access(),
5946                                          errmsg("could not remove cache file \"%s\": %m",
5947                                                         initfilename)));
5948         }
5949 }
5950
5951 void
5952 RelationCacheInitFilePostInvalidate(void)
5953 {
5954         LWLockRelease(RelCacheInitLock);
5955 }
5956
5957 /*
5958  * Remove the init files during postmaster startup.
5959  *
5960  * We used to keep the init files across restarts, but that is unsafe in PITR
5961  * scenarios, and even in simple crash-recovery cases there are windows for
5962  * the init files to become out-of-sync with the database.  So now we just
5963  * remove them during startup and expect the first backend launch to rebuild
5964  * them.  Of course, this has to happen in each database of the cluster.
5965  */
5966 void
5967 RelationCacheInitFileRemove(void)
5968 {
5969         const char *tblspcdir = "pg_tblspc";
5970         DIR                *dir;
5971         struct dirent *de;
5972         char            path[MAXPGPATH];
5973
5974         /*
5975          * We zap the shared cache file too.  In theory it can't get out of sync
5976          * enough to be a problem, but in data-corruption cases, who knows ...
5977          */
5978         snprintf(path, sizeof(path), "global/%s",
5979                          RELCACHE_INIT_FILENAME);
5980         unlink_initfile(path);
5981
5982         /* Scan everything in the default tablespace */
5983         RelationCacheInitFileRemoveInDir("base");
5984
5985         /* Scan the tablespace link directory to find non-default tablespaces */
5986         dir = AllocateDir(tblspcdir);
5987         if (dir == NULL)
5988         {
5989                 elog(LOG, "could not open tablespace link directory \"%s\": %m",
5990                          tblspcdir);
5991                 return;
5992         }
5993
5994         while ((de = ReadDir(dir, tblspcdir)) != NULL)
5995         {
5996                 if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
5997                 {
5998                         /* Scan the tablespace dir for per-database dirs */
5999                         snprintf(path, sizeof(path), "%s/%s/%s",
6000                                          tblspcdir, de->d_name, TABLESPACE_VERSION_DIRECTORY);
6001                         RelationCacheInitFileRemoveInDir(path);
6002                 }
6003         }
6004
6005         FreeDir(dir);
6006 }
6007
6008 /* Process one per-tablespace directory for RelationCacheInitFileRemove */
6009 static void
6010 RelationCacheInitFileRemoveInDir(const char *tblspcpath)
6011 {
6012         DIR                *dir;
6013         struct dirent *de;
6014         char            initfilename[MAXPGPATH];
6015
6016         /* Scan the tablespace directory to find per-database directories */
6017         dir = AllocateDir(tblspcpath);
6018         if (dir == NULL)
6019         {
6020                 elog(LOG, "could not open tablespace directory \"%s\": %m",
6021                          tblspcpath);
6022                 return;
6023         }
6024
6025         while ((de = ReadDir(dir, tblspcpath)) != NULL)
6026         {
6027                 if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
6028                 {
6029                         /* Try to remove the init file in each database */
6030                         snprintf(initfilename, sizeof(initfilename), "%s/%s/%s",
6031                                          tblspcpath, de->d_name, RELCACHE_INIT_FILENAME);
6032                         unlink_initfile(initfilename);
6033                 }
6034         }
6035
6036         FreeDir(dir);
6037 }
6038
6039 static void
6040 unlink_initfile(const char *initfilename)
6041 {
6042         if (unlink(initfilename) < 0)
6043         {
6044                 /* It might not be there, but log any error other than ENOENT */
6045                 if (errno != ENOENT)
6046                         elog(LOG, "could not remove cache file \"%s\": %m", initfilename);
6047         }
6048 }