]> granicus.if.org Git - postgresql/blob - src/backend/utils/cache/relcache.c
bcf4f104cf65f1bc9cf948cd69faef779f019a01
[postgresql] / src / backend / utils / cache / relcache.c
1 /*-------------------------------------------------------------------------
2  *
3  * relcache.c
4  *        POSTGRES relation descriptor cache code
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/utils/cache/relcache.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * INTERFACE ROUTINES
17  *              RelationCacheInitialize                 - initialize relcache (to empty)
18  *              RelationCacheInitializePhase2   - initialize shared-catalog entries
19  *              RelationCacheInitializePhase3   - finish initializing relcache
20  *              RelationIdGetRelation                   - get a reldesc by relation id
21  *              RelationClose                                   - close an open relation
22  *
23  * NOTES
24  *              The following code contains many undocumented hacks.  Please be
25  *              careful....
26  */
27 #include "postgres.h"
28
29 #include <sys/file.h>
30 #include <fcntl.h>
31 #include <unistd.h>
32
33 #include "access/hash.h"
34 #include "access/htup_details.h"
35 #include "access/multixact.h"
36 #include "access/nbtree.h"
37 #include "access/reloptions.h"
38 #include "access/sysattr.h"
39 #include "access/table.h"
40 #include "access/tupdesc_details.h"
41 #include "access/xact.h"
42 #include "access/xlog.h"
43 #include "catalog/catalog.h"
44 #include "catalog/index.h"
45 #include "catalog/indexing.h"
46 #include "catalog/namespace.h"
47 #include "catalog/partition.h"
48 #include "catalog/pg_am.h"
49 #include "catalog/pg_amproc.h"
50 #include "catalog/pg_attrdef.h"
51 #include "catalog/pg_authid.h"
52 #include "catalog/pg_auth_members.h"
53 #include "catalog/pg_constraint.h"
54 #include "catalog/pg_database.h"
55 #include "catalog/pg_namespace.h"
56 #include "catalog/pg_opclass.h"
57 #include "catalog/pg_partitioned_table.h"
58 #include "catalog/pg_proc.h"
59 #include "catalog/pg_publication.h"
60 #include "catalog/pg_rewrite.h"
61 #include "catalog/pg_shseclabel.h"
62 #include "catalog/pg_statistic_ext.h"
63 #include "catalog/pg_subscription.h"
64 #include "catalog/pg_tablespace.h"
65 #include "catalog/pg_trigger.h"
66 #include "catalog/pg_type.h"
67 #include "catalog/schemapg.h"
68 #include "catalog/storage.h"
69 #include "commands/policy.h"
70 #include "commands/trigger.h"
71 #include "miscadmin.h"
72 #include "nodes/nodeFuncs.h"
73 #include "optimizer/clauses.h"
74 #include "optimizer/prep.h"
75 #include "optimizer/var.h"
76 #include "partitioning/partbounds.h"
77 #include "rewrite/rewriteDefine.h"
78 #include "rewrite/rowsecurity.h"
79 #include "storage/lmgr.h"
80 #include "storage/smgr.h"
81 #include "utils/array.h"
82 #include "utils/builtins.h"
83 #include "utils/datum.h"
84 #include "utils/fmgroids.h"
85 #include "utils/inval.h"
86 #include "utils/lsyscache.h"
87 #include "utils/memutils.h"
88 #include "utils/partcache.h"
89 #include "utils/relmapper.h"
90 #include "utils/resowner_private.h"
91 #include "utils/snapmgr.h"
92 #include "utils/syscache.h"
93
94
95 #define RELCACHE_INIT_FILEMAGIC         0x573266        /* version ID value */
96
97 /*
98  *              hardcoded tuple descriptors, contents generated by genbki.pl
99  */
100 static const FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
101 static const FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
102 static const FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
103 static const FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
104 static const FormData_pg_attribute Desc_pg_database[Natts_pg_database] = {Schema_pg_database};
105 static const FormData_pg_attribute Desc_pg_authid[Natts_pg_authid] = {Schema_pg_authid};
106 static const FormData_pg_attribute Desc_pg_auth_members[Natts_pg_auth_members] = {Schema_pg_auth_members};
107 static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
108 static const FormData_pg_attribute Desc_pg_shseclabel[Natts_pg_shseclabel] = {Schema_pg_shseclabel};
109 static const FormData_pg_attribute Desc_pg_subscription[Natts_pg_subscription] = {Schema_pg_subscription};
110
111 /*
112  *              Hash tables that index the relation cache
113  *
114  *              We used to index the cache by both name and OID, but now there
115  *              is only an index by OID.
116  */
117 typedef struct relidcacheent
118 {
119         Oid                     reloid;
120         Relation        reldesc;
121 } RelIdCacheEnt;
122
123 static HTAB *RelationIdCache;
124
125 /*
126  * This flag is false until we have prepared the critical relcache entries
127  * that are needed to do indexscans on the tables read by relcache building.
128  */
129 bool            criticalRelcachesBuilt = false;
130
131 /*
132  * This flag is false until we have prepared the critical relcache entries
133  * for shared catalogs (which are the tables needed for login).
134  */
135 bool            criticalSharedRelcachesBuilt = false;
136
137 /*
138  * This counter counts relcache inval events received since backend startup
139  * (but only for rels that are actually in cache).  Presently, we use it only
140  * to detect whether data about to be written by write_relcache_init_file()
141  * might already be obsolete.
142  */
143 static long relcacheInvalsReceived = 0L;
144
145 /*
146  * eoxact_list[] stores the OIDs of relations that (might) need AtEOXact
147  * cleanup work.  This list intentionally has limited size; if it overflows,
148  * we fall back to scanning the whole hashtable.  There is no value in a very
149  * large list because (1) at some point, a hash_seq_search scan is faster than
150  * retail lookups, and (2) the value of this is to reduce EOXact work for
151  * short transactions, which can't have dirtied all that many tables anyway.
152  * EOXactListAdd() does not bother to prevent duplicate list entries, so the
153  * cleanup processing must be idempotent.
154  */
155 #define MAX_EOXACT_LIST 32
156 static Oid      eoxact_list[MAX_EOXACT_LIST];
157 static int      eoxact_list_len = 0;
158 static bool eoxact_list_overflowed = false;
159
160 #define EOXactListAdd(rel) \
161         do { \
162                 if (eoxact_list_len < MAX_EOXACT_LIST) \
163                         eoxact_list[eoxact_list_len++] = (rel)->rd_id; \
164                 else \
165                         eoxact_list_overflowed = true; \
166         } while (0)
167
168 /*
169  * EOXactTupleDescArray stores TupleDescs that (might) need AtEOXact
170  * cleanup work.  The array expands as needed; there is no hashtable because
171  * we don't need to access individual items except at EOXact.
172  */
173 static TupleDesc *EOXactTupleDescArray;
174 static int      NextEOXactTupleDescNum = 0;
175 static int      EOXactTupleDescArrayLen = 0;
176
177 /*
178  *              macros to manipulate the lookup hashtable
179  */
180 #define RelationCacheInsert(RELATION, replace_allowed)  \
181 do { \
182         RelIdCacheEnt *hentry; bool found; \
183         hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
184                                                                                    (void *) &((RELATION)->rd_id), \
185                                                                                    HASH_ENTER, &found); \
186         if (found) \
187         { \
188                 /* see comments in RelationBuildDesc and RelationBuildLocalRelation */ \
189                 Relation _old_rel = hentry->reldesc; \
190                 Assert(replace_allowed); \
191                 hentry->reldesc = (RELATION); \
192                 if (RelationHasReferenceCountZero(_old_rel)) \
193                         RelationDestroyRelation(_old_rel, false); \
194                 else if (!IsBootstrapProcessingMode()) \
195                         elog(WARNING, "leaking still-referenced relcache entry for \"%s\"", \
196                                  RelationGetRelationName(_old_rel)); \
197         } \
198         else \
199                 hentry->reldesc = (RELATION); \
200 } while(0)
201
202 #define RelationIdCacheLookup(ID, RELATION) \
203 do { \
204         RelIdCacheEnt *hentry; \
205         hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
206                                                                                    (void *) &(ID), \
207                                                                                    HASH_FIND, NULL); \
208         if (hentry) \
209                 RELATION = hentry->reldesc; \
210         else \
211                 RELATION = NULL; \
212 } while(0)
213
214 #define RelationCacheDelete(RELATION) \
215 do { \
216         RelIdCacheEnt *hentry; \
217         hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
218                                                                                    (void *) &((RELATION)->rd_id), \
219                                                                                    HASH_REMOVE, NULL); \
220         if (hentry == NULL) \
221                 elog(WARNING, "failed to delete relcache entry for OID %u", \
222                          (RELATION)->rd_id); \
223 } while(0)
224
225
226 /*
227  * Special cache for opclass-related information
228  *
229  * Note: only default support procs get cached, ie, those with
230  * lefttype = righttype = opcintype.
231  */
232 typedef struct opclasscacheent
233 {
234         Oid                     opclassoid;             /* lookup key: OID of opclass */
235         bool            valid;                  /* set true after successful fill-in */
236         StrategyNumber numSupport;      /* max # of support procs (from pg_am) */
237         Oid                     opcfamily;              /* OID of opclass's family */
238         Oid                     opcintype;              /* OID of opclass's declared input type */
239         RegProcedure *supportProcs; /* OIDs of support procedures */
240 } OpClassCacheEnt;
241
242 static HTAB *OpClassCache = NULL;
243
244
245 /* non-export function prototypes */
246
247 static void RelationDestroyRelation(Relation relation, bool remember_tupdesc);
248 static void RelationClearRelation(Relation relation, bool rebuild);
249
250 static void RelationReloadIndexInfo(Relation relation);
251 static void RelationReloadNailed(Relation relation);
252 static void RelationFlushRelation(Relation relation);
253 static void RememberToFreeTupleDescAtEOX(TupleDesc td);
254 static void AtEOXact_cleanup(Relation relation, bool isCommit);
255 static void AtEOSubXact_cleanup(Relation relation, bool isCommit,
256                                         SubTransactionId mySubid, SubTransactionId parentSubid);
257 static bool load_relcache_init_file(bool shared);
258 static void write_relcache_init_file(bool shared);
259 static void write_item(const void *data, Size len, FILE *fp);
260
261 static void formrdesc(const char *relationName, Oid relationReltype,
262                   bool isshared, int natts, const FormData_pg_attribute *attrs);
263
264 static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic);
265 static Relation AllocateRelationDesc(Form_pg_class relp);
266 static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
267 static void RelationBuildTupleDesc(Relation relation);
268 static Relation RelationBuildDesc(Oid targetRelId, bool insertIt);
269 static void RelationInitPhysicalAddr(Relation relation);
270 static void load_critical_index(Oid indexoid, Oid heapoid);
271 static TupleDesc GetPgClassDescriptor(void);
272 static TupleDesc GetPgIndexDescriptor(void);
273 static void AttrDefaultFetch(Relation relation);
274 static void CheckConstraintFetch(Relation relation);
275 static int      CheckConstraintCmp(const void *a, const void *b);
276 static List *insert_ordered_oid(List *list, Oid datum);
277 static void InitIndexAmRoutine(Relation relation);
278 static void IndexSupportInitialize(oidvector *indclass,
279                                            RegProcedure *indexSupport,
280                                            Oid *opFamily,
281                                            Oid *opcInType,
282                                            StrategyNumber maxSupportNumber,
283                                            AttrNumber maxAttributeNumber);
284 static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
285                                   StrategyNumber numSupport);
286 static void RelationCacheInitFileRemoveInDir(const char *tblspcpath);
287 static void unlink_initfile(const char *initfilename, int elevel);
288 static bool equalPartitionDescs(PartitionKey key, PartitionDesc partdesc1,
289                                         PartitionDesc partdesc2);
290
291
292 /*
293  *              ScanPgRelation
294  *
295  *              This is used by RelationBuildDesc to find a pg_class
296  *              tuple matching targetRelId.  The caller must hold at least
297  *              AccessShareLock on the target relid to prevent concurrent-update
298  *              scenarios; it isn't guaranteed that all scans used to build the
299  *              relcache entry will use the same snapshot.  If, for example,
300  *              an attribute were to be added after scanning pg_class and before
301  *              scanning pg_attribute, relnatts wouldn't match.
302  *
303  *              NB: the returned tuple has been copied into palloc'd storage
304  *              and must eventually be freed with heap_freetuple.
305  */
306 static HeapTuple
307 ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic)
308 {
309         HeapTuple       pg_class_tuple;
310         Relation        pg_class_desc;
311         SysScanDesc pg_class_scan;
312         ScanKeyData key[1];
313         Snapshot        snapshot;
314
315         /*
316          * If something goes wrong during backend startup, we might find ourselves
317          * trying to read pg_class before we've selected a database.  That ain't
318          * gonna work, so bail out with a useful error message.  If this happens,
319          * it probably means a relcache entry that needs to be nailed isn't.
320          */
321         if (!OidIsValid(MyDatabaseId))
322                 elog(FATAL, "cannot read pg_class without having selected a database");
323
324         /*
325          * form a scan key
326          */
327         ScanKeyInit(&key[0],
328                                 Anum_pg_class_oid,
329                                 BTEqualStrategyNumber, F_OIDEQ,
330                                 ObjectIdGetDatum(targetRelId));
331
332         /*
333          * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
334          * built the critical relcache entries (this includes initdb and startup
335          * without a pg_internal.init file).  The caller can also force a heap
336          * scan by setting indexOK == false.
337          */
338         pg_class_desc = table_open(RelationRelationId, AccessShareLock);
339
340         /*
341          * The caller might need a tuple that's newer than the one the historic
342          * snapshot; currently the only case requiring to do so is looking up the
343          * relfilenode of non mapped system relations during decoding.
344          */
345         if (force_non_historic)
346                 snapshot = GetNonHistoricCatalogSnapshot(RelationRelationId);
347         else
348                 snapshot = GetCatalogSnapshot(RelationRelationId);
349
350         pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
351                                                                            indexOK && criticalRelcachesBuilt,
352                                                                            snapshot,
353                                                                            1, key);
354
355         pg_class_tuple = systable_getnext(pg_class_scan);
356
357         /*
358          * Must copy tuple before releasing buffer.
359          */
360         if (HeapTupleIsValid(pg_class_tuple))
361                 pg_class_tuple = heap_copytuple(pg_class_tuple);
362
363         /* all done */
364         systable_endscan(pg_class_scan);
365         table_close(pg_class_desc, AccessShareLock);
366
367         return pg_class_tuple;
368 }
369
370 /*
371  *              AllocateRelationDesc
372  *
373  *              This is used to allocate memory for a new relation descriptor
374  *              and initialize the rd_rel field from the given pg_class tuple.
375  */
376 static Relation
377 AllocateRelationDesc(Form_pg_class relp)
378 {
379         Relation        relation;
380         MemoryContext oldcxt;
381         Form_pg_class relationForm;
382
383         /* Relcache entries must live in CacheMemoryContext */
384         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
385
386         /*
387          * allocate and zero space for new relation descriptor
388          */
389         relation = (Relation) palloc0(sizeof(RelationData));
390
391         /* make sure relation is marked as having no open file yet */
392         relation->rd_smgr = NULL;
393
394         /*
395          * Copy the relation tuple form
396          *
397          * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE. The
398          * variable-length fields (relacl, reloptions) are NOT stored in the
399          * relcache --- there'd be little point in it, since we don't copy the
400          * tuple's nulls bitmap and hence wouldn't know if the values are valid.
401          * Bottom line is that relacl *cannot* be retrieved from the relcache. Get
402          * it from the syscache if you need it.  The same goes for the original
403          * form of reloptions (however, we do store the parsed form of reloptions
404          * in rd_options).
405          */
406         relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
407
408         memcpy(relationForm, relp, CLASS_TUPLE_SIZE);
409
410         /* initialize relation tuple form */
411         relation->rd_rel = relationForm;
412
413         /* and allocate attribute tuple form storage */
414         relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts);
415         /* which we mark as a reference-counted tupdesc */
416         relation->rd_att->tdrefcount = 1;
417
418         MemoryContextSwitchTo(oldcxt);
419
420         return relation;
421 }
422
423 /*
424  * RelationParseRelOptions
425  *              Convert pg_class.reloptions into pre-parsed rd_options
426  *
427  * tuple is the real pg_class tuple (not rd_rel!) for relation
428  *
429  * Note: rd_rel and (if an index) rd_indam must be valid already
430  */
431 static void
432 RelationParseRelOptions(Relation relation, HeapTuple tuple)
433 {
434         bytea      *options;
435         amoptions_function amoptsfn;
436
437         relation->rd_options = NULL;
438
439         /*
440          * Look up any AM-specific parse function; fall out if relkind should not
441          * have options.
442          */
443         switch (relation->rd_rel->relkind)
444         {
445                 case RELKIND_RELATION:
446                 case RELKIND_TOASTVALUE:
447                 case RELKIND_VIEW:
448                 case RELKIND_MATVIEW:
449                 case RELKIND_PARTITIONED_TABLE:
450                         amoptsfn = NULL;
451                         break;
452                 case RELKIND_INDEX:
453                 case RELKIND_PARTITIONED_INDEX:
454                         amoptsfn = relation->rd_indam->amoptions;
455                         break;
456                 default:
457                         return;
458         }
459
460         /*
461          * Fetch reloptions from tuple; have to use a hardwired descriptor because
462          * we might not have any other for pg_class yet (consider executing this
463          * code for pg_class itself)
464          */
465         options = extractRelOptions(tuple, GetPgClassDescriptor(), amoptsfn);
466
467         /*
468          * Copy parsed data into CacheMemoryContext.  To guard against the
469          * possibility of leaks in the reloptions code, we want to do the actual
470          * parsing in the caller's memory context and copy the results into
471          * CacheMemoryContext after the fact.
472          */
473         if (options)
474         {
475                 relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
476                                                                                                   VARSIZE(options));
477                 memcpy(relation->rd_options, options, VARSIZE(options));
478                 pfree(options);
479         }
480 }
481
482 /*
483  *              RelationBuildTupleDesc
484  *
485  *              Form the relation's tuple descriptor from information in
486  *              the pg_attribute, pg_attrdef & pg_constraint system catalogs.
487  */
488 static void
489 RelationBuildTupleDesc(Relation relation)
490 {
491         HeapTuple       pg_attribute_tuple;
492         Relation        pg_attribute_desc;
493         SysScanDesc pg_attribute_scan;
494         ScanKeyData skey[2];
495         int                     need;
496         TupleConstr *constr;
497         AttrDefault *attrdef = NULL;
498         AttrMissing *attrmiss = NULL;
499         int                     ndef = 0;
500
501         /* copy some fields from pg_class row to rd_att */
502         relation->rd_att->tdtypeid = relation->rd_rel->reltype;
503         relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
504
505         constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
506                                                                                                 sizeof(TupleConstr));
507         constr->has_not_null = false;
508
509         /*
510          * Form a scan key that selects only user attributes (attnum > 0).
511          * (Eliminating system attribute rows at the index level is lots faster
512          * than fetching them.)
513          */
514         ScanKeyInit(&skey[0],
515                                 Anum_pg_attribute_attrelid,
516                                 BTEqualStrategyNumber, F_OIDEQ,
517                                 ObjectIdGetDatum(RelationGetRelid(relation)));
518         ScanKeyInit(&skey[1],
519                                 Anum_pg_attribute_attnum,
520                                 BTGreaterStrategyNumber, F_INT2GT,
521                                 Int16GetDatum(0));
522
523         /*
524          * Open pg_attribute and begin a scan.  Force heap scan if we haven't yet
525          * built the critical relcache entries (this includes initdb and startup
526          * without a pg_internal.init file).
527          */
528         pg_attribute_desc = table_open(AttributeRelationId, AccessShareLock);
529         pg_attribute_scan = systable_beginscan(pg_attribute_desc,
530                                                                                    AttributeRelidNumIndexId,
531                                                                                    criticalRelcachesBuilt,
532                                                                                    NULL,
533                                                                                    2, skey);
534
535         /*
536          * add attribute data to relation->rd_att
537          */
538         need = RelationGetNumberOfAttributes(relation);
539
540         while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
541         {
542                 Form_pg_attribute attp;
543                 int                     attnum;
544
545                 attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
546
547                 attnum = attp->attnum;
548                 if (attnum <= 0 || attnum > RelationGetNumberOfAttributes(relation))
549                         elog(ERROR, "invalid attribute number %d for %s",
550                                  attp->attnum, RelationGetRelationName(relation));
551
552
553                 memcpy(TupleDescAttr(relation->rd_att, attnum - 1),
554                            attp,
555                            ATTRIBUTE_FIXED_PART_SIZE);
556
557                 /* Update constraint/default info */
558                 if (attp->attnotnull)
559                         constr->has_not_null = true;
560
561                 /* If the column has a default, fill it into the attrdef array */
562                 if (attp->atthasdef)
563                 {
564                         if (attrdef == NULL)
565                                 attrdef = (AttrDefault *)
566                                         MemoryContextAllocZero(CacheMemoryContext,
567                                                                                    RelationGetNumberOfAttributes(relation) *
568                                                                                    sizeof(AttrDefault));
569                         attrdef[ndef].adnum = attnum;
570                         attrdef[ndef].adbin = NULL;
571
572                         ndef++;
573                 }
574
575                 /* Likewise for a missing value */
576                 if (attp->atthasmissing)
577                 {
578                         Datum           missingval;
579                         bool            missingNull;
580
581                         /* Do we have a missing value? */
582                         missingval = heap_getattr(pg_attribute_tuple,
583                                                                           Anum_pg_attribute_attmissingval,
584                                                                           pg_attribute_desc->rd_att,
585                                                                           &missingNull);
586                         if (!missingNull)
587                         {
588                                 /* Yes, fetch from the array */
589                                 MemoryContext oldcxt;
590                                 bool            is_null;
591                                 int                     one = 1;
592                                 Datum           missval;
593
594                                 if (attrmiss == NULL)
595                                         attrmiss = (AttrMissing *)
596                                                 MemoryContextAllocZero(CacheMemoryContext,
597                                                                                            relation->rd_rel->relnatts *
598                                                                                            sizeof(AttrMissing));
599
600                                 missval = array_get_element(missingval,
601                                                                                         1,
602                                                                                         &one,
603                                                                                         -1,
604                                                                                         attp->attlen,
605                                                                                         attp->attbyval,
606                                                                                         attp->attalign,
607                                                                                         &is_null);
608                                 Assert(!is_null);
609                                 if (attp->attbyval)
610                                 {
611                                         /* for copy by val just copy the datum direct */
612                                         attrmiss[attnum - 1].am_value = missval;
613                                 }
614                                 else
615                                 {
616                                         /* otherwise copy in the correct context */
617                                         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
618                                         attrmiss[attnum - 1].am_value = datumCopy(missval,
619                                                                                                                           attp->attbyval,
620                                                                                                                           attp->attlen);
621                                         MemoryContextSwitchTo(oldcxt);
622                                 }
623                                 attrmiss[attnum - 1].am_present = true;
624                         }
625                 }
626                 need--;
627                 if (need == 0)
628                         break;
629         }
630
631         /*
632          * end the scan and close the attribute relation
633          */
634         systable_endscan(pg_attribute_scan);
635         table_close(pg_attribute_desc, AccessShareLock);
636
637         if (need != 0)
638                 elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
639                          need, RelationGetRelid(relation));
640
641         /*
642          * The attcacheoff values we read from pg_attribute should all be -1
643          * ("unknown").  Verify this if assert checking is on.  They will be
644          * computed when and if needed during tuple access.
645          */
646 #ifdef USE_ASSERT_CHECKING
647         {
648                 int                     i;
649
650                 for (i = 0; i < RelationGetNumberOfAttributes(relation); i++)
651                         Assert(TupleDescAttr(relation->rd_att, i)->attcacheoff == -1);
652         }
653 #endif
654
655         /*
656          * However, we can easily set the attcacheoff value for the first
657          * attribute: it must be zero.  This eliminates the need for special cases
658          * for attnum=1 that used to exist in fastgetattr() and index_getattr().
659          */
660         if (RelationGetNumberOfAttributes(relation) > 0)
661                 TupleDescAttr(relation->rd_att, 0)->attcacheoff = 0;
662
663         /*
664          * Set up constraint/default info
665          */
666         if (constr->has_not_null || ndef > 0 ||
667                 attrmiss || relation->rd_rel->relchecks)
668         {
669                 relation->rd_att->constr = constr;
670
671                 if (ndef > 0)                   /* DEFAULTs */
672                 {
673                         if (ndef < RelationGetNumberOfAttributes(relation))
674                                 constr->defval = (AttrDefault *)
675                                         repalloc(attrdef, ndef * sizeof(AttrDefault));
676                         else
677                                 constr->defval = attrdef;
678                         constr->num_defval = ndef;
679                         AttrDefaultFetch(relation);
680                 }
681                 else
682                         constr->num_defval = 0;
683
684                 constr->missing = attrmiss;
685
686                 if (relation->rd_rel->relchecks > 0)    /* CHECKs */
687                 {
688                         constr->num_check = relation->rd_rel->relchecks;
689                         constr->check = (ConstrCheck *)
690                                 MemoryContextAllocZero(CacheMemoryContext,
691                                                                            constr->num_check * sizeof(ConstrCheck));
692                         CheckConstraintFetch(relation);
693                 }
694                 else
695                         constr->num_check = 0;
696         }
697         else
698         {
699                 pfree(constr);
700                 relation->rd_att->constr = NULL;
701         }
702 }
703
704 /*
705  *              RelationBuildRuleLock
706  *
707  *              Form the relation's rewrite rules from information in
708  *              the pg_rewrite system catalog.
709  *
710  * Note: The rule parsetrees are potentially very complex node structures.
711  * To allow these trees to be freed when the relcache entry is flushed,
712  * we make a private memory context to hold the RuleLock information for
713  * each relcache entry that has associated rules.  The context is used
714  * just for rule info, not for any other subsidiary data of the relcache
715  * entry, because that keeps the update logic in RelationClearRelation()
716  * manageable.  The other subsidiary data structures are simple enough
717  * to be easy to free explicitly, anyway.
718  */
719 static void
720 RelationBuildRuleLock(Relation relation)
721 {
722         MemoryContext rulescxt;
723         MemoryContext oldcxt;
724         HeapTuple       rewrite_tuple;
725         Relation        rewrite_desc;
726         TupleDesc       rewrite_tupdesc;
727         SysScanDesc rewrite_scan;
728         ScanKeyData key;
729         RuleLock   *rulelock;
730         int                     numlocks;
731         RewriteRule **rules;
732         int                     maxlocks;
733
734         /*
735          * Make the private context.  Assume it'll not contain much data.
736          */
737         rulescxt = AllocSetContextCreate(CacheMemoryContext,
738                                                                          "relation rules",
739                                                                          ALLOCSET_SMALL_SIZES);
740         relation->rd_rulescxt = rulescxt;
741         MemoryContextCopyAndSetIdentifier(rulescxt,
742                                                                           RelationGetRelationName(relation));
743
744         /*
745          * allocate an array to hold the rewrite rules (the array is extended if
746          * necessary)
747          */
748         maxlocks = 4;
749         rules = (RewriteRule **)
750                 MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
751         numlocks = 0;
752
753         /*
754          * form a scan key
755          */
756         ScanKeyInit(&key,
757                                 Anum_pg_rewrite_ev_class,
758                                 BTEqualStrategyNumber, F_OIDEQ,
759                                 ObjectIdGetDatum(RelationGetRelid(relation)));
760
761         /*
762          * open pg_rewrite and begin a scan
763          *
764          * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
765          * be reading the rules in name order, except possibly during
766          * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
767          * ensures that rules will be fired in name order.
768          */
769         rewrite_desc = table_open(RewriteRelationId, AccessShareLock);
770         rewrite_tupdesc = RelationGetDescr(rewrite_desc);
771         rewrite_scan = systable_beginscan(rewrite_desc,
772                                                                           RewriteRelRulenameIndexId,
773                                                                           true, NULL,
774                                                                           1, &key);
775
776         while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
777         {
778                 Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
779                 bool            isnull;
780                 Datum           rule_datum;
781                 char       *rule_str;
782                 RewriteRule *rule;
783
784                 rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
785                                                                                                   sizeof(RewriteRule));
786
787                 rule->ruleId = rewrite_form->oid;
788
789                 rule->event = rewrite_form->ev_type - '0';
790                 rule->enabled = rewrite_form->ev_enabled;
791                 rule->isInstead = rewrite_form->is_instead;
792
793                 /*
794                  * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
795                  * rule strings are often large enough to be toasted.  To avoid
796                  * leaking memory in the caller's context, do the detoasting here so
797                  * we can free the detoasted version.
798                  */
799                 rule_datum = heap_getattr(rewrite_tuple,
800                                                                   Anum_pg_rewrite_ev_action,
801                                                                   rewrite_tupdesc,
802                                                                   &isnull);
803                 Assert(!isnull);
804                 rule_str = TextDatumGetCString(rule_datum);
805                 oldcxt = MemoryContextSwitchTo(rulescxt);
806                 rule->actions = (List *) stringToNode(rule_str);
807                 MemoryContextSwitchTo(oldcxt);
808                 pfree(rule_str);
809
810                 rule_datum = heap_getattr(rewrite_tuple,
811                                                                   Anum_pg_rewrite_ev_qual,
812                                                                   rewrite_tupdesc,
813                                                                   &isnull);
814                 Assert(!isnull);
815                 rule_str = TextDatumGetCString(rule_datum);
816                 oldcxt = MemoryContextSwitchTo(rulescxt);
817                 rule->qual = (Node *) stringToNode(rule_str);
818                 MemoryContextSwitchTo(oldcxt);
819                 pfree(rule_str);
820
821                 /*
822                  * We want the rule's table references to be checked as though by the
823                  * table owner, not the user referencing the rule.  Therefore, scan
824                  * through the rule's actions and set the checkAsUser field on all
825                  * rtable entries.  We have to look at the qual as well, in case it
826                  * contains sublinks.
827                  *
828                  * The reason for doing this when the rule is loaded, rather than when
829                  * it is stored, is that otherwise ALTER TABLE OWNER would have to
830                  * grovel through stored rules to update checkAsUser fields. Scanning
831                  * the rule tree during load is relatively cheap (compared to
832                  * constructing it in the first place), so we do it here.
833                  */
834                 setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
835                 setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);
836
837                 if (numlocks >= maxlocks)
838                 {
839                         maxlocks *= 2;
840                         rules = (RewriteRule **)
841                                 repalloc(rules, sizeof(RewriteRule *) * maxlocks);
842                 }
843                 rules[numlocks++] = rule;
844         }
845
846         /*
847          * end the scan and close the attribute relation
848          */
849         systable_endscan(rewrite_scan);
850         table_close(rewrite_desc, AccessShareLock);
851
852         /*
853          * there might not be any rules (if relhasrules is out-of-date)
854          */
855         if (numlocks == 0)
856         {
857                 relation->rd_rules = NULL;
858                 relation->rd_rulescxt = NULL;
859                 MemoryContextDelete(rulescxt);
860                 return;
861         }
862
863         /*
864          * form a RuleLock and insert into relation
865          */
866         rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
867         rulelock->numLocks = numlocks;
868         rulelock->rules = rules;
869
870         relation->rd_rules = rulelock;
871 }
872
873 /*
874  *              equalRuleLocks
875  *
876  *              Determine whether two RuleLocks are equivalent
877  *
878  *              Probably this should be in the rules code someplace...
879  */
880 static bool
881 equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
882 {
883         int                     i;
884
885         /*
886          * As of 7.3 we assume the rule ordering is repeatable, because
887          * RelationBuildRuleLock should read 'em in a consistent order.  So just
888          * compare corresponding slots.
889          */
890         if (rlock1 != NULL)
891         {
892                 if (rlock2 == NULL)
893                         return false;
894                 if (rlock1->numLocks != rlock2->numLocks)
895                         return false;
896                 for (i = 0; i < rlock1->numLocks; i++)
897                 {
898                         RewriteRule *rule1 = rlock1->rules[i];
899                         RewriteRule *rule2 = rlock2->rules[i];
900
901                         if (rule1->ruleId != rule2->ruleId)
902                                 return false;
903                         if (rule1->event != rule2->event)
904                                 return false;
905                         if (rule1->enabled != rule2->enabled)
906                                 return false;
907                         if (rule1->isInstead != rule2->isInstead)
908                                 return false;
909                         if (!equal(rule1->qual, rule2->qual))
910                                 return false;
911                         if (!equal(rule1->actions, rule2->actions))
912                                 return false;
913                 }
914         }
915         else if (rlock2 != NULL)
916                 return false;
917         return true;
918 }
919
920 /*
921  *              equalPolicy
922  *
923  *              Determine whether two policies are equivalent
924  */
925 static bool
926 equalPolicy(RowSecurityPolicy *policy1, RowSecurityPolicy *policy2)
927 {
928         int                     i;
929         Oid                *r1,
930                            *r2;
931
932         if (policy1 != NULL)
933         {
934                 if (policy2 == NULL)
935                         return false;
936
937                 if (policy1->polcmd != policy2->polcmd)
938                         return false;
939                 if (policy1->hassublinks != policy2->hassublinks)
940                         return false;
941                 if (strcmp(policy1->policy_name, policy2->policy_name) != 0)
942                         return false;
943                 if (ARR_DIMS(policy1->roles)[0] != ARR_DIMS(policy2->roles)[0])
944                         return false;
945
946                 r1 = (Oid *) ARR_DATA_PTR(policy1->roles);
947                 r2 = (Oid *) ARR_DATA_PTR(policy2->roles);
948
949                 for (i = 0; i < ARR_DIMS(policy1->roles)[0]; i++)
950                 {
951                         if (r1[i] != r2[i])
952                                 return false;
953                 }
954
955                 if (!equal(policy1->qual, policy2->qual))
956                         return false;
957                 if (!equal(policy1->with_check_qual, policy2->with_check_qual))
958                         return false;
959         }
960         else if (policy2 != NULL)
961                 return false;
962
963         return true;
964 }
965
966 /*
967  *              equalRSDesc
968  *
969  *              Determine whether two RowSecurityDesc's are equivalent
970  */
971 static bool
972 equalRSDesc(RowSecurityDesc *rsdesc1, RowSecurityDesc *rsdesc2)
973 {
974         ListCell   *lc,
975                            *rc;
976
977         if (rsdesc1 == NULL && rsdesc2 == NULL)
978                 return true;
979
980         if ((rsdesc1 != NULL && rsdesc2 == NULL) ||
981                 (rsdesc1 == NULL && rsdesc2 != NULL))
982                 return false;
983
984         if (list_length(rsdesc1->policies) != list_length(rsdesc2->policies))
985                 return false;
986
987         /* RelationBuildRowSecurity should build policies in order */
988         forboth(lc, rsdesc1->policies, rc, rsdesc2->policies)
989         {
990                 RowSecurityPolicy *l = (RowSecurityPolicy *) lfirst(lc);
991                 RowSecurityPolicy *r = (RowSecurityPolicy *) lfirst(rc);
992
993                 if (!equalPolicy(l, r))
994                         return false;
995         }
996
997         return true;
998 }
999
1000 /*
1001  * equalPartitionDescs
1002  *              Compare two partition descriptors for logical equality
1003  */
1004 static bool
1005 equalPartitionDescs(PartitionKey key, PartitionDesc partdesc1,
1006                                         PartitionDesc partdesc2)
1007 {
1008         int                     i;
1009
1010         if (partdesc1 != NULL)
1011         {
1012                 if (partdesc2 == NULL)
1013                         return false;
1014                 if (partdesc1->nparts != partdesc2->nparts)
1015                         return false;
1016
1017                 Assert(key != NULL || partdesc1->nparts == 0);
1018
1019                 /*
1020                  * Same oids? If the partitioning structure did not change, that is,
1021                  * no partitions were added or removed to the relation, the oids array
1022                  * should still match element-by-element.
1023                  */
1024                 for (i = 0; i < partdesc1->nparts; i++)
1025                 {
1026                         if (partdesc1->oids[i] != partdesc2->oids[i])
1027                                 return false;
1028                 }
1029
1030                 /*
1031                  * Now compare partition bound collections.  The logic to iterate over
1032                  * the collections is private to partition.c.
1033                  */
1034                 if (partdesc1->boundinfo != NULL)
1035                 {
1036                         if (partdesc2->boundinfo == NULL)
1037                                 return false;
1038
1039                         if (!partition_bounds_equal(key->partnatts, key->parttyplen,
1040                                                                                 key->parttypbyval,
1041                                                                                 partdesc1->boundinfo,
1042                                                                                 partdesc2->boundinfo))
1043                                 return false;
1044                 }
1045                 else if (partdesc2->boundinfo != NULL)
1046                         return false;
1047         }
1048         else if (partdesc2 != NULL)
1049                 return false;
1050
1051         return true;
1052 }
1053
1054 /*
1055  *              RelationBuildDesc
1056  *
1057  *              Build a relation descriptor.  The caller must hold at least
1058  *              AccessShareLock on the target relid.
1059  *
1060  *              The new descriptor is inserted into the hash table if insertIt is true.
1061  *
1062  *              Returns NULL if no pg_class row could be found for the given relid
1063  *              (suggesting we are trying to access a just-deleted relation).
1064  *              Any other error is reported via elog.
1065  */
1066 static Relation
1067 RelationBuildDesc(Oid targetRelId, bool insertIt)
1068 {
1069         Relation        relation;
1070         Oid                     relid;
1071         HeapTuple       pg_class_tuple;
1072         Form_pg_class relp;
1073
1074         /*
1075          * find the tuple in pg_class corresponding to the given relation id
1076          */
1077         pg_class_tuple = ScanPgRelation(targetRelId, true, false);
1078
1079         /*
1080          * if no such tuple exists, return NULL
1081          */
1082         if (!HeapTupleIsValid(pg_class_tuple))
1083                 return NULL;
1084
1085         /*
1086          * get information from the pg_class_tuple
1087          */
1088         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
1089         relid = relp->oid;
1090         Assert(relid == targetRelId);
1091
1092         /*
1093          * allocate storage for the relation descriptor, and copy pg_class_tuple
1094          * to relation->rd_rel.
1095          */
1096         relation = AllocateRelationDesc(relp);
1097
1098         /*
1099          * initialize the relation's relation id (relation->rd_id)
1100          */
1101         RelationGetRelid(relation) = relid;
1102
1103         /*
1104          * normal relations are not nailed into the cache; nor can a pre-existing
1105          * relation be new.  It could be temp though.  (Actually, it could be new
1106          * too, but it's okay to forget that fact if forced to flush the entry.)
1107          */
1108         relation->rd_refcnt = 0;
1109         relation->rd_isnailed = false;
1110         relation->rd_createSubid = InvalidSubTransactionId;
1111         relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
1112         switch (relation->rd_rel->relpersistence)
1113         {
1114                 case RELPERSISTENCE_UNLOGGED:
1115                 case RELPERSISTENCE_PERMANENT:
1116                         relation->rd_backend = InvalidBackendId;
1117                         relation->rd_islocaltemp = false;
1118                         break;
1119                 case RELPERSISTENCE_TEMP:
1120                         if (isTempOrTempToastNamespace(relation->rd_rel->relnamespace))
1121                         {
1122                                 relation->rd_backend = BackendIdForTempRelations();
1123                                 relation->rd_islocaltemp = true;
1124                         }
1125                         else
1126                         {
1127                                 /*
1128                                  * If it's a temp table, but not one of ours, we have to use
1129                                  * the slow, grotty method to figure out the owning backend.
1130                                  *
1131                                  * Note: it's possible that rd_backend gets set to MyBackendId
1132                                  * here, in case we are looking at a pg_class entry left over
1133                                  * from a crashed backend that coincidentally had the same
1134                                  * BackendId we're using.  We should *not* consider such a
1135                                  * table to be "ours"; this is why we need the separate
1136                                  * rd_islocaltemp flag.  The pg_class entry will get flushed
1137                                  * if/when we clean out the corresponding temp table namespace
1138                                  * in preparation for using it.
1139                                  */
1140                                 relation->rd_backend =
1141                                         GetTempNamespaceBackendId(relation->rd_rel->relnamespace);
1142                                 Assert(relation->rd_backend != InvalidBackendId);
1143                                 relation->rd_islocaltemp = false;
1144                         }
1145                         break;
1146                 default:
1147                         elog(ERROR, "invalid relpersistence: %c",
1148                                  relation->rd_rel->relpersistence);
1149                         break;
1150         }
1151
1152         /*
1153          * initialize the tuple descriptor (relation->rd_att).
1154          */
1155         RelationBuildTupleDesc(relation);
1156
1157         /*
1158          * Fetch rules and triggers that affect this relation
1159          */
1160         if (relation->rd_rel->relhasrules)
1161                 RelationBuildRuleLock(relation);
1162         else
1163         {
1164                 relation->rd_rules = NULL;
1165                 relation->rd_rulescxt = NULL;
1166         }
1167
1168         if (relation->rd_rel->relhastriggers)
1169                 RelationBuildTriggers(relation);
1170         else
1171                 relation->trigdesc = NULL;
1172
1173         if (relation->rd_rel->relrowsecurity)
1174                 RelationBuildRowSecurity(relation);
1175         else
1176                 relation->rd_rsdesc = NULL;
1177
1178         /* foreign key data is not loaded till asked for */
1179         relation->rd_fkeylist = NIL;
1180         relation->rd_fkeyvalid = false;
1181
1182         /* if a partitioned table, initialize key and partition descriptor info */
1183         if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1184         {
1185                 RelationBuildPartitionKey(relation);
1186                 RelationBuildPartitionDesc(relation);
1187         }
1188         else
1189         {
1190                 relation->rd_partkeycxt = NULL;
1191                 relation->rd_partkey = NULL;
1192                 relation->rd_partdesc = NULL;
1193                 relation->rd_pdcxt = NULL;
1194         }
1195
1196         /*
1197          * if it's an index, initialize index-related information
1198          */
1199         if (OidIsValid(relation->rd_rel->relam))
1200                 RelationInitIndexAccessInfo(relation);
1201
1202         /* extract reloptions if any */
1203         RelationParseRelOptions(relation, pg_class_tuple);
1204
1205         /*
1206          * initialize the relation lock manager information
1207          */
1208         RelationInitLockInfo(relation); /* see lmgr.c */
1209
1210         /*
1211          * initialize physical addressing information for the relation
1212          */
1213         RelationInitPhysicalAddr(relation);
1214
1215         /* make sure relation is marked as having no open file yet */
1216         relation->rd_smgr = NULL;
1217
1218         /*
1219          * now we can free the memory allocated for pg_class_tuple
1220          */
1221         heap_freetuple(pg_class_tuple);
1222
1223         /*
1224          * Insert newly created relation into relcache hash table, if requested.
1225          *
1226          * There is one scenario in which we might find a hashtable entry already
1227          * present, even though our caller failed to find it: if the relation is a
1228          * system catalog or index that's used during relcache load, we might have
1229          * recursively created the same relcache entry during the preceding steps.
1230          * So allow RelationCacheInsert to delete any already-present relcache
1231          * entry for the same OID.  The already-present entry should have refcount
1232          * zero (else somebody forgot to close it); in the event that it doesn't,
1233          * we'll elog a WARNING and leak the already-present entry.
1234          */
1235         if (insertIt)
1236                 RelationCacheInsert(relation, true);
1237
1238         /* It's fully valid */
1239         relation->rd_isvalid = true;
1240
1241         return relation;
1242 }
1243
1244 /*
1245  * Initialize the physical addressing info (RelFileNode) for a relcache entry
1246  *
1247  * Note: at the physical level, relations in the pg_global tablespace must
1248  * be treated as shared, even if relisshared isn't set.  Hence we do not
1249  * look at relisshared here.
1250  */
1251 static void
1252 RelationInitPhysicalAddr(Relation relation)
1253 {
1254         /* these relations kinds never have storage */
1255         if (!RELKIND_HAS_STORAGE(relation->rd_rel->relkind))
1256                 return;
1257
1258         if (relation->rd_rel->reltablespace)
1259                 relation->rd_node.spcNode = relation->rd_rel->reltablespace;
1260         else
1261                 relation->rd_node.spcNode = MyDatabaseTableSpace;
1262         if (relation->rd_node.spcNode == GLOBALTABLESPACE_OID)
1263                 relation->rd_node.dbNode = InvalidOid;
1264         else
1265                 relation->rd_node.dbNode = MyDatabaseId;
1266
1267         if (relation->rd_rel->relfilenode)
1268         {
1269                 /*
1270                  * Even if we are using a decoding snapshot that doesn't represent the
1271                  * current state of the catalog we need to make sure the filenode
1272                  * points to the current file since the older file will be gone (or
1273                  * truncated). The new file will still contain older rows so lookups
1274                  * in them will work correctly. This wouldn't work correctly if
1275                  * rewrites were allowed to change the schema in an incompatible way,
1276                  * but those are prevented both on catalog tables and on user tables
1277                  * declared as additional catalog tables.
1278                  */
1279                 if (HistoricSnapshotActive()
1280                         && RelationIsAccessibleInLogicalDecoding(relation)
1281                         && IsTransactionState())
1282                 {
1283                         HeapTuple       phys_tuple;
1284                         Form_pg_class physrel;
1285
1286                         phys_tuple = ScanPgRelation(RelationGetRelid(relation),
1287                                                                                 RelationGetRelid(relation) != ClassOidIndexId,
1288                                                                                 true);
1289                         if (!HeapTupleIsValid(phys_tuple))
1290                                 elog(ERROR, "could not find pg_class entry for %u",
1291                                          RelationGetRelid(relation));
1292                         physrel = (Form_pg_class) GETSTRUCT(phys_tuple);
1293
1294                         relation->rd_rel->reltablespace = physrel->reltablespace;
1295                         relation->rd_rel->relfilenode = physrel->relfilenode;
1296                         heap_freetuple(phys_tuple);
1297                 }
1298
1299                 relation->rd_node.relNode = relation->rd_rel->relfilenode;
1300         }
1301         else
1302         {
1303                 /* Consult the relation mapper */
1304                 relation->rd_node.relNode =
1305                         RelationMapOidToFilenode(relation->rd_id,
1306                                                                          relation->rd_rel->relisshared);
1307                 if (!OidIsValid(relation->rd_node.relNode))
1308                         elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
1309                                  RelationGetRelationName(relation), relation->rd_id);
1310         }
1311 }
1312
1313 /*
1314  * Fill in the IndexAmRoutine for an index relation.
1315  *
1316  * relation's rd_amhandler and rd_indexcxt must be valid already.
1317  */
1318 static void
1319 InitIndexAmRoutine(Relation relation)
1320 {
1321         IndexAmRoutine *cached,
1322                            *tmp;
1323
1324         /*
1325          * Call the amhandler in current, short-lived memory context, just in case
1326          * it leaks anything (it probably won't, but let's be paranoid).
1327          */
1328         tmp = GetIndexAmRoutine(relation->rd_amhandler);
1329
1330         /* OK, now transfer the data into relation's rd_indexcxt. */
1331         cached = (IndexAmRoutine *) MemoryContextAlloc(relation->rd_indexcxt,
1332                                                                                                    sizeof(IndexAmRoutine));
1333         memcpy(cached, tmp, sizeof(IndexAmRoutine));
1334         relation->rd_indam = cached;
1335
1336         pfree(tmp);
1337 }
1338
1339 /*
1340  * Initialize index-access-method support data for an index relation
1341  */
1342 void
1343 RelationInitIndexAccessInfo(Relation relation)
1344 {
1345         HeapTuple       tuple;
1346         Form_pg_am      aform;
1347         Datum           indcollDatum;
1348         Datum           indclassDatum;
1349         Datum           indoptionDatum;
1350         bool            isnull;
1351         oidvector  *indcoll;
1352         oidvector  *indclass;
1353         int2vector *indoption;
1354         MemoryContext indexcxt;
1355         MemoryContext oldcontext;
1356         int                     indnatts;
1357         int                     indnkeyatts;
1358         uint16          amsupport;
1359
1360         /*
1361          * Make a copy of the pg_index entry for the index.  Since pg_index
1362          * contains variable-length and possibly-null fields, we have to do this
1363          * honestly rather than just treating it as a Form_pg_index struct.
1364          */
1365         tuple = SearchSysCache1(INDEXRELID,
1366                                                         ObjectIdGetDatum(RelationGetRelid(relation)));
1367         if (!HeapTupleIsValid(tuple))
1368                 elog(ERROR, "cache lookup failed for index %u",
1369                          RelationGetRelid(relation));
1370         oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
1371         relation->rd_indextuple = heap_copytuple(tuple);
1372         relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
1373         MemoryContextSwitchTo(oldcontext);
1374         ReleaseSysCache(tuple);
1375
1376         /*
1377          * Look up the index's access method, save the OID of its handler function
1378          */
1379         tuple = SearchSysCache1(AMOID, ObjectIdGetDatum(relation->rd_rel->relam));
1380         if (!HeapTupleIsValid(tuple))
1381                 elog(ERROR, "cache lookup failed for access method %u",
1382                          relation->rd_rel->relam);
1383         aform = (Form_pg_am) GETSTRUCT(tuple);
1384         relation->rd_amhandler = aform->amhandler;
1385         ReleaseSysCache(tuple);
1386
1387         indnatts = RelationGetNumberOfAttributes(relation);
1388         if (indnatts != IndexRelationGetNumberOfAttributes(relation))
1389                 elog(ERROR, "relnatts disagrees with indnatts for index %u",
1390                          RelationGetRelid(relation));
1391         indnkeyatts = IndexRelationGetNumberOfKeyAttributes(relation);
1392
1393         /*
1394          * Make the private context to hold index access info.  The reason we need
1395          * a context, and not just a couple of pallocs, is so that we won't leak
1396          * any subsidiary info attached to fmgr lookup records.
1397          */
1398         indexcxt = AllocSetContextCreate(CacheMemoryContext,
1399                                                                          "index info",
1400                                                                          ALLOCSET_SMALL_SIZES);
1401         relation->rd_indexcxt = indexcxt;
1402         MemoryContextCopyAndSetIdentifier(indexcxt,
1403                                                                           RelationGetRelationName(relation));
1404
1405         /*
1406          * Now we can fetch the index AM's API struct
1407          */
1408         InitIndexAmRoutine(relation);
1409
1410         /*
1411          * Allocate arrays to hold data. Opclasses are not used for included
1412          * columns, so allocate them for indnkeyatts only.
1413          */
1414         relation->rd_opfamily = (Oid *)
1415                 MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
1416         relation->rd_opcintype = (Oid *)
1417                 MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
1418
1419         amsupport = relation->rd_indam->amsupport;
1420         if (amsupport > 0)
1421         {
1422                 int                     nsupport = indnatts * amsupport;
1423
1424                 relation->rd_support = (RegProcedure *)
1425                         MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
1426                 relation->rd_supportinfo = (FmgrInfo *)
1427                         MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
1428         }
1429         else
1430         {
1431                 relation->rd_support = NULL;
1432                 relation->rd_supportinfo = NULL;
1433         }
1434
1435         relation->rd_indcollation = (Oid *)
1436                 MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
1437
1438         relation->rd_indoption = (int16 *)
1439                 MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(int16));
1440
1441         /*
1442          * indcollation cannot be referenced directly through the C struct,
1443          * because it comes after the variable-width indkey field.  Must extract
1444          * the datum the hard way...
1445          */
1446         indcollDatum = fastgetattr(relation->rd_indextuple,
1447                                                            Anum_pg_index_indcollation,
1448                                                            GetPgIndexDescriptor(),
1449                                                            &isnull);
1450         Assert(!isnull);
1451         indcoll = (oidvector *) DatumGetPointer(indcollDatum);
1452         memcpy(relation->rd_indcollation, indcoll->values, indnkeyatts * sizeof(Oid));
1453
1454         /*
1455          * indclass cannot be referenced directly through the C struct, because it
1456          * comes after the variable-width indkey field.  Must extract the datum
1457          * the hard way...
1458          */
1459         indclassDatum = fastgetattr(relation->rd_indextuple,
1460                                                                 Anum_pg_index_indclass,
1461                                                                 GetPgIndexDescriptor(),
1462                                                                 &isnull);
1463         Assert(!isnull);
1464         indclass = (oidvector *) DatumGetPointer(indclassDatum);
1465
1466         /*
1467          * Fill the support procedure OID array, as well as the info about
1468          * opfamilies and opclass input types.  (aminfo and supportinfo are left
1469          * as zeroes, and are filled on-the-fly when used)
1470          */
1471         IndexSupportInitialize(indclass, relation->rd_support,
1472                                                    relation->rd_opfamily, relation->rd_opcintype,
1473                                                    amsupport, indnkeyatts);
1474
1475         /*
1476          * Similarly extract indoption and copy it to the cache entry
1477          */
1478         indoptionDatum = fastgetattr(relation->rd_indextuple,
1479                                                                  Anum_pg_index_indoption,
1480                                                                  GetPgIndexDescriptor(),
1481                                                                  &isnull);
1482         Assert(!isnull);
1483         indoption = (int2vector *) DatumGetPointer(indoptionDatum);
1484         memcpy(relation->rd_indoption, indoption->values, indnkeyatts * sizeof(int16));
1485
1486         /*
1487          * expressions, predicate, exclusion caches will be filled later
1488          */
1489         relation->rd_indexprs = NIL;
1490         relation->rd_indpred = NIL;
1491         relation->rd_exclops = NULL;
1492         relation->rd_exclprocs = NULL;
1493         relation->rd_exclstrats = NULL;
1494         relation->rd_amcache = NULL;
1495 }
1496
1497 /*
1498  * IndexSupportInitialize
1499  *              Initializes an index's cached opclass information,
1500  *              given the index's pg_index.indclass entry.
1501  *
1502  * Data is returned into *indexSupport, *opFamily, and *opcInType,
1503  * which are arrays allocated by the caller.
1504  *
1505  * The caller also passes maxSupportNumber and maxAttributeNumber, since these
1506  * indicate the size of the arrays it has allocated --- but in practice these
1507  * numbers must always match those obtainable from the system catalog entries
1508  * for the index and access method.
1509  */
1510 static void
1511 IndexSupportInitialize(oidvector *indclass,
1512                                            RegProcedure *indexSupport,
1513                                            Oid *opFamily,
1514                                            Oid *opcInType,
1515                                            StrategyNumber maxSupportNumber,
1516                                            AttrNumber maxAttributeNumber)
1517 {
1518         int                     attIndex;
1519
1520         for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
1521         {
1522                 OpClassCacheEnt *opcentry;
1523
1524                 if (!OidIsValid(indclass->values[attIndex]))
1525                         elog(ERROR, "bogus pg_index tuple");
1526
1527                 /* look up the info for this opclass, using a cache */
1528                 opcentry = LookupOpclassInfo(indclass->values[attIndex],
1529                                                                          maxSupportNumber);
1530
1531                 /* copy cached data into relcache entry */
1532                 opFamily[attIndex] = opcentry->opcfamily;
1533                 opcInType[attIndex] = opcentry->opcintype;
1534                 if (maxSupportNumber > 0)
1535                         memcpy(&indexSupport[attIndex * maxSupportNumber],
1536                                    opcentry->supportProcs,
1537                                    maxSupportNumber * sizeof(RegProcedure));
1538         }
1539 }
1540
1541 /*
1542  * LookupOpclassInfo
1543  *
1544  * This routine maintains a per-opclass cache of the information needed
1545  * by IndexSupportInitialize().  This is more efficient than relying on
1546  * the catalog cache, because we can load all the info about a particular
1547  * opclass in a single indexscan of pg_amproc.
1548  *
1549  * The information from pg_am about expected range of support function
1550  * numbers is passed in, rather than being looked up, mainly because the
1551  * caller will have it already.
1552  *
1553  * Note there is no provision for flushing the cache.  This is OK at the
1554  * moment because there is no way to ALTER any interesting properties of an
1555  * existing opclass --- all you can do is drop it, which will result in
1556  * a useless but harmless dead entry in the cache.  To support altering
1557  * opclass membership (not the same as opfamily membership!), we'd need to
1558  * be able to flush this cache as well as the contents of relcache entries
1559  * for indexes.
1560  */
1561 static OpClassCacheEnt *
1562 LookupOpclassInfo(Oid operatorClassOid,
1563                                   StrategyNumber numSupport)
1564 {
1565         OpClassCacheEnt *opcentry;
1566         bool            found;
1567         Relation        rel;
1568         SysScanDesc scan;
1569         ScanKeyData skey[3];
1570         HeapTuple       htup;
1571         bool            indexOK;
1572
1573         if (OpClassCache == NULL)
1574         {
1575                 /* First time through: initialize the opclass cache */
1576                 HASHCTL         ctl;
1577
1578                 MemSet(&ctl, 0, sizeof(ctl));
1579                 ctl.keysize = sizeof(Oid);
1580                 ctl.entrysize = sizeof(OpClassCacheEnt);
1581                 OpClassCache = hash_create("Operator class cache", 64,
1582                                                                    &ctl, HASH_ELEM | HASH_BLOBS);
1583
1584                 /* Also make sure CacheMemoryContext exists */
1585                 if (!CacheMemoryContext)
1586                         CreateCacheMemoryContext();
1587         }
1588
1589         opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
1590                                                                                            (void *) &operatorClassOid,
1591                                                                                            HASH_ENTER, &found);
1592
1593         if (!found)
1594         {
1595                 /* Need to allocate memory for new entry */
1596                 opcentry->valid = false;        /* until known OK */
1597                 opcentry->numSupport = numSupport;
1598
1599                 if (numSupport > 0)
1600                         opcentry->supportProcs = (RegProcedure *)
1601                                 MemoryContextAllocZero(CacheMemoryContext,
1602                                                                            numSupport * sizeof(RegProcedure));
1603                 else
1604                         opcentry->supportProcs = NULL;
1605         }
1606         else
1607         {
1608                 Assert(numSupport == opcentry->numSupport);
1609         }
1610
1611         /*
1612          * When testing for cache-flush hazards, we intentionally disable the
1613          * operator class cache and force reloading of the info on each call. This
1614          * is helpful because we want to test the case where a cache flush occurs
1615          * while we are loading the info, and it's very hard to provoke that if
1616          * this happens only once per opclass per backend.
1617          */
1618 #if defined(CLOBBER_CACHE_ALWAYS)
1619         opcentry->valid = false;
1620 #endif
1621
1622         if (opcentry->valid)
1623                 return opcentry;
1624
1625         /*
1626          * Need to fill in new entry.
1627          *
1628          * To avoid infinite recursion during startup, force heap scans if we're
1629          * looking up info for the opclasses used by the indexes we would like to
1630          * reference here.
1631          */
1632         indexOK = criticalRelcachesBuilt ||
1633                 (operatorClassOid != OID_BTREE_OPS_OID &&
1634                  operatorClassOid != INT2_BTREE_OPS_OID);
1635
1636         /*
1637          * We have to fetch the pg_opclass row to determine its opfamily and
1638          * opcintype, which are needed to look up related operators and functions.
1639          * It'd be convenient to use the syscache here, but that probably doesn't
1640          * work while bootstrapping.
1641          */
1642         ScanKeyInit(&skey[0],
1643                                 Anum_pg_opclass_oid,
1644                                 BTEqualStrategyNumber, F_OIDEQ,
1645                                 ObjectIdGetDatum(operatorClassOid));
1646         rel = table_open(OperatorClassRelationId, AccessShareLock);
1647         scan = systable_beginscan(rel, OpclassOidIndexId, indexOK,
1648                                                           NULL, 1, skey);
1649
1650         if (HeapTupleIsValid(htup = systable_getnext(scan)))
1651         {
1652                 Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(htup);
1653
1654                 opcentry->opcfamily = opclassform->opcfamily;
1655                 opcentry->opcintype = opclassform->opcintype;
1656         }
1657         else
1658                 elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);
1659
1660         systable_endscan(scan);
1661         table_close(rel, AccessShareLock);
1662
1663         /*
1664          * Scan pg_amproc to obtain support procs for the opclass.  We only fetch
1665          * the default ones (those with lefttype = righttype = opcintype).
1666          */
1667         if (numSupport > 0)
1668         {
1669                 ScanKeyInit(&skey[0],
1670                                         Anum_pg_amproc_amprocfamily,
1671                                         BTEqualStrategyNumber, F_OIDEQ,
1672                                         ObjectIdGetDatum(opcentry->opcfamily));
1673                 ScanKeyInit(&skey[1],
1674                                         Anum_pg_amproc_amproclefttype,
1675                                         BTEqualStrategyNumber, F_OIDEQ,
1676                                         ObjectIdGetDatum(opcentry->opcintype));
1677                 ScanKeyInit(&skey[2],
1678                                         Anum_pg_amproc_amprocrighttype,
1679                                         BTEqualStrategyNumber, F_OIDEQ,
1680                                         ObjectIdGetDatum(opcentry->opcintype));
1681                 rel = table_open(AccessMethodProcedureRelationId, AccessShareLock);
1682                 scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
1683                                                                   NULL, 3, skey);
1684
1685                 while (HeapTupleIsValid(htup = systable_getnext(scan)))
1686                 {
1687                         Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);
1688
1689                         if (amprocform->amprocnum <= 0 ||
1690                                 (StrategyNumber) amprocform->amprocnum > numSupport)
1691                                 elog(ERROR, "invalid amproc number %d for opclass %u",
1692                                          amprocform->amprocnum, operatorClassOid);
1693
1694                         opcentry->supportProcs[amprocform->amprocnum - 1] =
1695                                 amprocform->amproc;
1696                 }
1697
1698                 systable_endscan(scan);
1699                 table_close(rel, AccessShareLock);
1700         }
1701
1702         opcentry->valid = true;
1703         return opcentry;
1704 }
1705
1706
1707 /*
1708  *              formrdesc
1709  *
1710  *              This is a special cut-down version of RelationBuildDesc(),
1711  *              used while initializing the relcache.
1712  *              The relation descriptor is built just from the supplied parameters,
1713  *              without actually looking at any system table entries.  We cheat
1714  *              quite a lot since we only need to work for a few basic system
1715  *              catalogs.
1716  *
1717  * The catalogs this is used for can't have constraints (except attnotnull),
1718  * default values, rules, or triggers, since we don't cope with any of that.
1719  * (Well, actually, this only matters for properties that need to be valid
1720  * during bootstrap or before RelationCacheInitializePhase3 runs, and none of
1721  * these properties matter then...)
1722  *
1723  * NOTE: we assume we are already switched into CacheMemoryContext.
1724  */
1725 static void
1726 formrdesc(const char *relationName, Oid relationReltype,
1727                   bool isshared,
1728                   int natts, const FormData_pg_attribute *attrs)
1729 {
1730         Relation        relation;
1731         int                     i;
1732         bool            has_not_null;
1733
1734         /*
1735          * allocate new relation desc, clear all fields of reldesc
1736          */
1737         relation = (Relation) palloc0(sizeof(RelationData));
1738
1739         /* make sure relation is marked as having no open file yet */
1740         relation->rd_smgr = NULL;
1741
1742         /*
1743          * initialize reference count: 1 because it is nailed in cache
1744          */
1745         relation->rd_refcnt = 1;
1746
1747         /*
1748          * all entries built with this routine are nailed-in-cache; none are for
1749          * new or temp relations.
1750          */
1751         relation->rd_isnailed = true;
1752         relation->rd_createSubid = InvalidSubTransactionId;
1753         relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
1754         relation->rd_backend = InvalidBackendId;
1755         relation->rd_islocaltemp = false;
1756
1757         /*
1758          * initialize relation tuple form
1759          *
1760          * The data we insert here is pretty incomplete/bogus, but it'll serve to
1761          * get us launched.  RelationCacheInitializePhase3() will read the real
1762          * data from pg_class and replace what we've done here.  Note in
1763          * particular that relowner is left as zero; this cues
1764          * RelationCacheInitializePhase3 that the real data isn't there yet.
1765          */
1766         relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
1767
1768         namestrcpy(&relation->rd_rel->relname, relationName);
1769         relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
1770         relation->rd_rel->reltype = relationReltype;
1771
1772         /*
1773          * It's important to distinguish between shared and non-shared relations,
1774          * even at bootstrap time, to make sure we know where they are stored.
1775          */
1776         relation->rd_rel->relisshared = isshared;
1777         if (isshared)
1778                 relation->rd_rel->reltablespace = GLOBALTABLESPACE_OID;
1779
1780         /* formrdesc is used only for permanent relations */
1781         relation->rd_rel->relpersistence = RELPERSISTENCE_PERMANENT;
1782
1783         /* ... and they're always populated, too */
1784         relation->rd_rel->relispopulated = true;
1785
1786         relation->rd_rel->relreplident = REPLICA_IDENTITY_NOTHING;
1787         relation->rd_rel->relpages = 0;
1788         relation->rd_rel->reltuples = 0;
1789         relation->rd_rel->relallvisible = 0;
1790         relation->rd_rel->relkind = RELKIND_RELATION;
1791         relation->rd_rel->relnatts = (int16) natts;
1792
1793         /*
1794          * initialize attribute tuple form
1795          *
1796          * Unlike the case with the relation tuple, this data had better be right
1797          * because it will never be replaced.  The data comes from
1798          * src/include/catalog/ headers via genbki.pl.
1799          */
1800         relation->rd_att = CreateTemplateTupleDesc(natts);
1801         relation->rd_att->tdrefcount = 1;       /* mark as refcounted */
1802
1803         relation->rd_att->tdtypeid = relationReltype;
1804         relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
1805
1806         /*
1807          * initialize tuple desc info
1808          */
1809         has_not_null = false;
1810         for (i = 0; i < natts; i++)
1811         {
1812                 memcpy(TupleDescAttr(relation->rd_att, i),
1813                            &attrs[i],
1814                            ATTRIBUTE_FIXED_PART_SIZE);
1815                 has_not_null |= attrs[i].attnotnull;
1816                 /* make sure attcacheoff is valid */
1817                 TupleDescAttr(relation->rd_att, i)->attcacheoff = -1;
1818         }
1819
1820         /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
1821         TupleDescAttr(relation->rd_att, 0)->attcacheoff = 0;
1822
1823         /* mark not-null status */
1824         if (has_not_null)
1825         {
1826                 TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
1827
1828                 constr->has_not_null = true;
1829                 relation->rd_att->constr = constr;
1830         }
1831
1832         /*
1833          * initialize relation id from info in att array (my, this is ugly)
1834          */
1835         RelationGetRelid(relation) = TupleDescAttr(relation->rd_att, 0)->attrelid;
1836
1837         /*
1838          * All relations made with formrdesc are mapped.  This is necessarily so
1839          * because there is no other way to know what filenode they currently
1840          * have.  In bootstrap mode, add them to the initial relation mapper data,
1841          * specifying that the initial filenode is the same as the OID.
1842          */
1843         relation->rd_rel->relfilenode = InvalidOid;
1844         if (IsBootstrapProcessingMode())
1845                 RelationMapUpdateMap(RelationGetRelid(relation),
1846                                                          RelationGetRelid(relation),
1847                                                          isshared, true);
1848
1849         /*
1850          * initialize the relation lock manager information
1851          */
1852         RelationInitLockInfo(relation); /* see lmgr.c */
1853
1854         /*
1855          * initialize physical addressing information for the relation
1856          */
1857         RelationInitPhysicalAddr(relation);
1858
1859         /*
1860          * initialize the rel-has-index flag, using hardwired knowledge
1861          */
1862         if (IsBootstrapProcessingMode())
1863         {
1864                 /* In bootstrap mode, we have no indexes */
1865                 relation->rd_rel->relhasindex = false;
1866         }
1867         else
1868         {
1869                 /* Otherwise, all the rels formrdesc is used for have indexes */
1870                 relation->rd_rel->relhasindex = true;
1871         }
1872
1873         /*
1874          * add new reldesc to relcache
1875          */
1876         RelationCacheInsert(relation, false);
1877
1878         /* It's fully valid */
1879         relation->rd_isvalid = true;
1880 }
1881
1882
1883 /* ----------------------------------------------------------------
1884  *                               Relation Descriptor Lookup Interface
1885  * ----------------------------------------------------------------
1886  */
1887
1888 /*
1889  *              RelationIdGetRelation
1890  *
1891  *              Lookup a reldesc by OID; make one if not already in cache.
1892  *
1893  *              Returns NULL if no pg_class row could be found for the given relid
1894  *              (suggesting we are trying to access a just-deleted relation).
1895  *              Any other error is reported via elog.
1896  *
1897  *              NB: caller should already have at least AccessShareLock on the
1898  *              relation ID, else there are nasty race conditions.
1899  *
1900  *              NB: relation ref count is incremented, or set to 1 if new entry.
1901  *              Caller should eventually decrement count.  (Usually,
1902  *              that happens by calling RelationClose().)
1903  */
1904 Relation
1905 RelationIdGetRelation(Oid relationId)
1906 {
1907         Relation        rd;
1908
1909         /* Make sure we're in an xact, even if this ends up being a cache hit */
1910         Assert(IsTransactionState());
1911
1912         /*
1913          * first try to find reldesc in the cache
1914          */
1915         RelationIdCacheLookup(relationId, rd);
1916
1917         if (RelationIsValid(rd))
1918         {
1919                 RelationIncrementReferenceCount(rd);
1920                 /* revalidate cache entry if necessary */
1921                 if (!rd->rd_isvalid)
1922                 {
1923                         /*
1924                          * Indexes only have a limited number of possible schema changes,
1925                          * and we don't want to use the full-blown procedure because it's
1926                          * a headache for indexes that reload itself depends on.
1927                          */
1928                         if (rd->rd_rel->relkind == RELKIND_INDEX ||
1929                                 rd->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
1930                                 RelationReloadIndexInfo(rd);
1931                         else
1932                                 RelationClearRelation(rd, true);
1933
1934                         /*
1935                          * Normally entries need to be valid here, but before the relcache
1936                          * has been initialized, not enough infrastructure exists to
1937                          * perform pg_class lookups. The structure of such entries doesn't
1938                          * change, but we still want to update the rd_rel entry. So
1939                          * rd_isvalid = false is left in place for a later lookup.
1940                          */
1941                         Assert(rd->rd_isvalid ||
1942                                    (rd->rd_isnailed && !criticalRelcachesBuilt));
1943                 }
1944                 return rd;
1945         }
1946
1947         /*
1948          * no reldesc in the cache, so have RelationBuildDesc() build one and add
1949          * it.
1950          */
1951         rd = RelationBuildDesc(relationId, true);
1952         if (RelationIsValid(rd))
1953                 RelationIncrementReferenceCount(rd);
1954         return rd;
1955 }
1956
1957 /* ----------------------------------------------------------------
1958  *                              cache invalidation support routines
1959  * ----------------------------------------------------------------
1960  */
1961
1962 /*
1963  * RelationIncrementReferenceCount
1964  *              Increments relation reference count.
1965  *
1966  * Note: bootstrap mode has its own weird ideas about relation refcount
1967  * behavior; we ought to fix it someday, but for now, just disable
1968  * reference count ownership tracking in bootstrap mode.
1969  */
1970 void
1971 RelationIncrementReferenceCount(Relation rel)
1972 {
1973         ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
1974         rel->rd_refcnt += 1;
1975         if (!IsBootstrapProcessingMode())
1976                 ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
1977 }
1978
1979 /*
1980  * RelationDecrementReferenceCount
1981  *              Decrements relation reference count.
1982  */
1983 void
1984 RelationDecrementReferenceCount(Relation rel)
1985 {
1986         Assert(rel->rd_refcnt > 0);
1987         rel->rd_refcnt -= 1;
1988         if (!IsBootstrapProcessingMode())
1989                 ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
1990 }
1991
1992 /*
1993  * RelationClose - close an open relation
1994  *
1995  *      Actually, we just decrement the refcount.
1996  *
1997  *      NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
1998  *      will be freed as soon as their refcount goes to zero.  In combination
1999  *      with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
2000  *      to catch references to already-released relcache entries.  It slows
2001  *      things down quite a bit, however.
2002  */
2003 void
2004 RelationClose(Relation relation)
2005 {
2006         /* Note: no locking manipulations needed */
2007         RelationDecrementReferenceCount(relation);
2008
2009 #ifdef RELCACHE_FORCE_RELEASE
2010         if (RelationHasReferenceCountZero(relation) &&
2011                 relation->rd_createSubid == InvalidSubTransactionId &&
2012                 relation->rd_newRelfilenodeSubid == InvalidSubTransactionId)
2013                 RelationClearRelation(relation, false);
2014 #endif
2015 }
2016
2017 /*
2018  * RelationReloadIndexInfo - reload minimal information for an open index
2019  *
2020  *      This function is used only for indexes.  A relcache inval on an index
2021  *      can mean that its pg_class or pg_index row changed.  There are only
2022  *      very limited changes that are allowed to an existing index's schema,
2023  *      so we can update the relcache entry without a complete rebuild; which
2024  *      is fortunate because we can't rebuild an index entry that is "nailed"
2025  *      and/or in active use.  We support full replacement of the pg_class row,
2026  *      as well as updates of a few simple fields of the pg_index row.
2027  *
2028  *      We can't necessarily reread the catalog rows right away; we might be
2029  *      in a failed transaction when we receive the SI notification.  If so,
2030  *      RelationClearRelation just marks the entry as invalid by setting
2031  *      rd_isvalid to false.  This routine is called to fix the entry when it
2032  *      is next needed.
2033  *
2034  *      We assume that at the time we are called, we have at least AccessShareLock
2035  *      on the target index.  (Note: in the calls from RelationClearRelation,
2036  *      this is legitimate because we know the rel has positive refcount.)
2037  *
2038  *      If the target index is an index on pg_class or pg_index, we'd better have
2039  *      previously gotten at least AccessShareLock on its underlying catalog,
2040  *      else we are at risk of deadlock against someone trying to exclusive-lock
2041  *      the heap and index in that order.  This is ensured in current usage by
2042  *      only applying this to indexes being opened or having positive refcount.
2043  */
2044 static void
2045 RelationReloadIndexInfo(Relation relation)
2046 {
2047         bool            indexOK;
2048         HeapTuple       pg_class_tuple;
2049         Form_pg_class relp;
2050
2051         /* Should be called only for invalidated indexes */
2052         Assert((relation->rd_rel->relkind == RELKIND_INDEX ||
2053                         relation->rd_rel->relkind == RELKIND_PARTITIONED_INDEX) &&
2054                    !relation->rd_isvalid);
2055
2056         /* Ensure it's closed at smgr level */
2057         RelationCloseSmgr(relation);
2058
2059         /* Must free any AM cached data upon relcache flush */
2060         if (relation->rd_amcache)
2061                 pfree(relation->rd_amcache);
2062         relation->rd_amcache = NULL;
2063
2064         /*
2065          * If it's a shared index, we might be called before backend startup has
2066          * finished selecting a database, in which case we have no way to read
2067          * pg_class yet.  However, a shared index can never have any significant
2068          * schema updates, so it's okay to ignore the invalidation signal.  Just
2069          * mark it valid and return without doing anything more.
2070          */
2071         if (relation->rd_rel->relisshared && !criticalRelcachesBuilt)
2072         {
2073                 relation->rd_isvalid = true;
2074                 return;
2075         }
2076
2077         /*
2078          * Read the pg_class row
2079          *
2080          * Don't try to use an indexscan of pg_class_oid_index to reload the info
2081          * for pg_class_oid_index ...
2082          */
2083         indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
2084         pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK, false);
2085         if (!HeapTupleIsValid(pg_class_tuple))
2086                 elog(ERROR, "could not find pg_class tuple for index %u",
2087                          RelationGetRelid(relation));
2088         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
2089         memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
2090         /* Reload reloptions in case they changed */
2091         if (relation->rd_options)
2092                 pfree(relation->rd_options);
2093         RelationParseRelOptions(relation, pg_class_tuple);
2094         /* done with pg_class tuple */
2095         heap_freetuple(pg_class_tuple);
2096         /* We must recalculate physical address in case it changed */
2097         RelationInitPhysicalAddr(relation);
2098
2099         /*
2100          * For a non-system index, there are fields of the pg_index row that are
2101          * allowed to change, so re-read that row and update the relcache entry.
2102          * Most of the info derived from pg_index (such as support function lookup
2103          * info) cannot change, and indeed the whole point of this routine is to
2104          * update the relcache entry without clobbering that data; so wholesale
2105          * replacement is not appropriate.
2106          */
2107         if (!IsSystemRelation(relation))
2108         {
2109                 HeapTuple       tuple;
2110                 Form_pg_index index;
2111
2112                 tuple = SearchSysCache1(INDEXRELID,
2113                                                                 ObjectIdGetDatum(RelationGetRelid(relation)));
2114                 if (!HeapTupleIsValid(tuple))
2115                         elog(ERROR, "cache lookup failed for index %u",
2116                                  RelationGetRelid(relation));
2117                 index = (Form_pg_index) GETSTRUCT(tuple);
2118
2119                 /*
2120                  * Basically, let's just copy all the bool fields.  There are one or
2121                  * two of these that can't actually change in the current code, but
2122                  * it's not worth it to track exactly which ones they are.  None of
2123                  * the array fields are allowed to change, though.
2124                  */
2125                 relation->rd_index->indisunique = index->indisunique;
2126                 relation->rd_index->indisprimary = index->indisprimary;
2127                 relation->rd_index->indisexclusion = index->indisexclusion;
2128                 relation->rd_index->indimmediate = index->indimmediate;
2129                 relation->rd_index->indisclustered = index->indisclustered;
2130                 relation->rd_index->indisvalid = index->indisvalid;
2131                 relation->rd_index->indcheckxmin = index->indcheckxmin;
2132                 relation->rd_index->indisready = index->indisready;
2133                 relation->rd_index->indislive = index->indislive;
2134
2135                 /* Copy xmin too, as that is needed to make sense of indcheckxmin */
2136                 HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
2137                                                            HeapTupleHeaderGetXmin(tuple->t_data));
2138
2139                 ReleaseSysCache(tuple);
2140         }
2141
2142         /* Okay, now it's valid again */
2143         relation->rd_isvalid = true;
2144 }
2145
2146 /*
2147  * RelationReloadNailed - reload minimal information for nailed relations.
2148  *
2149  * The structure of a nailed relation can never change (which is good, because
2150  * we rely on knowing their structure to be able to read catalog content). But
2151  * some parts, e.g. pg_class.relfrozenxid, are still important to have
2152  * accurate content for. Therefore those need to be reloaded after the arrival
2153  * of invalidations.
2154  */
2155 static void
2156 RelationReloadNailed(Relation relation)
2157 {
2158         Assert(relation->rd_isnailed);
2159
2160         /*
2161          * Redo RelationInitPhysicalAddr in case it is a mapped relation whose
2162          * mapping changed.
2163          */
2164         RelationInitPhysicalAddr(relation);
2165
2166         /* flag as needing to be revalidated */
2167         relation->rd_isvalid = false;
2168
2169         /*
2170          * Can only reread catalog contents if in a transaction.  If the relation
2171          * is currently open (not counting the nailed refcount), do so
2172          * immediately. Otherwise we've already marked the entry as possibly
2173          * invalid, and it'll be fixed when next opened.
2174          */
2175         if (!IsTransactionState() || relation->rd_refcnt <= 1)
2176                 return;
2177
2178         if (relation->rd_rel->relkind == RELKIND_INDEX)
2179         {
2180                 /*
2181                  * If it's a nailed-but-not-mapped index, then we need to re-read the
2182                  * pg_class row to see if its relfilenode changed.
2183                  */
2184                 RelationReloadIndexInfo(relation);
2185         }
2186         else
2187         {
2188                 /*
2189                  * Reload a non-index entry.  We can't easily do so if relcaches
2190                  * aren't yet built, but that's fine because at that stage the
2191                  * attributes that need to be current (like relfrozenxid) aren't yet
2192                  * accessed.  To ensure the entry will later be revalidated, we leave
2193                  * it in invalid state, but allow use (cf. RelationIdGetRelation()).
2194                  */
2195                 if (criticalRelcachesBuilt)
2196                 {
2197                         HeapTuple       pg_class_tuple;
2198                         Form_pg_class relp;
2199
2200                         /*
2201                          * NB: Mark the entry as valid before starting to scan, to avoid
2202                          * self-recursion when re-building pg_class.
2203                          */
2204                         relation->rd_isvalid = true;
2205
2206                         pg_class_tuple = ScanPgRelation(RelationGetRelid(relation),
2207                                                                                         true, false);
2208                         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
2209                         memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
2210                         heap_freetuple(pg_class_tuple);
2211
2212                         /*
2213                          * Again mark as valid, to protect against concurrently arriving
2214                          * invalidations.
2215                          */
2216                         relation->rd_isvalid = true;
2217                 }
2218         }
2219 }
2220
2221 /*
2222  * RelationDestroyRelation
2223  *
2224  *      Physically delete a relation cache entry and all subsidiary data.
2225  *      Caller must already have unhooked the entry from the hash table.
2226  */
2227 static void
2228 RelationDestroyRelation(Relation relation, bool remember_tupdesc)
2229 {
2230         Assert(RelationHasReferenceCountZero(relation));
2231
2232         /*
2233          * Make sure smgr and lower levels close the relation's files, if they
2234          * weren't closed already.  (This was probably done by caller, but let's
2235          * just be real sure.)
2236          */
2237         RelationCloseSmgr(relation);
2238
2239         /*
2240          * Free all the subsidiary data structures of the relcache entry, then the
2241          * entry itself.
2242          */
2243         if (relation->rd_rel)
2244                 pfree(relation->rd_rel);
2245         /* can't use DecrTupleDescRefCount here */
2246         Assert(relation->rd_att->tdrefcount > 0);
2247         if (--relation->rd_att->tdrefcount == 0)
2248         {
2249                 /*
2250                  * If we Rebuilt a relcache entry during a transaction then its
2251                  * possible we did that because the TupDesc changed as the result of
2252                  * an ALTER TABLE that ran at less than AccessExclusiveLock. It's
2253                  * possible someone copied that TupDesc, in which case the copy would
2254                  * point to free'd memory. So if we rebuild an entry we keep the
2255                  * TupDesc around until end of transaction, to be safe.
2256                  */
2257                 if (remember_tupdesc)
2258                         RememberToFreeTupleDescAtEOX(relation->rd_att);
2259                 else
2260                         FreeTupleDesc(relation->rd_att);
2261         }
2262         FreeTriggerDesc(relation->trigdesc);
2263         list_free_deep(relation->rd_fkeylist);
2264         list_free(relation->rd_indexlist);
2265         bms_free(relation->rd_indexattr);
2266         bms_free(relation->rd_keyattr);
2267         bms_free(relation->rd_pkattr);
2268         bms_free(relation->rd_idattr);
2269         if (relation->rd_pubactions)
2270                 pfree(relation->rd_pubactions);
2271         if (relation->rd_options)
2272                 pfree(relation->rd_options);
2273         if (relation->rd_indextuple)
2274                 pfree(relation->rd_indextuple);
2275         if (relation->rd_indexcxt)
2276                 MemoryContextDelete(relation->rd_indexcxt);
2277         if (relation->rd_rulescxt)
2278                 MemoryContextDelete(relation->rd_rulescxt);
2279         if (relation->rd_rsdesc)
2280                 MemoryContextDelete(relation->rd_rsdesc->rscxt);
2281         if (relation->rd_partkeycxt)
2282                 MemoryContextDelete(relation->rd_partkeycxt);
2283         if (relation->rd_pdcxt)
2284                 MemoryContextDelete(relation->rd_pdcxt);
2285         if (relation->rd_partcheck)
2286                 pfree(relation->rd_partcheck);
2287         if (relation->rd_fdwroutine)
2288                 pfree(relation->rd_fdwroutine);
2289         pfree(relation);
2290 }
2291
2292 /*
2293  * RelationClearRelation
2294  *
2295  *       Physically blow away a relation cache entry, or reset it and rebuild
2296  *       it from scratch (that is, from catalog entries).  The latter path is
2297  *       used when we are notified of a change to an open relation (one with
2298  *       refcount > 0).
2299  *
2300  *       NB: when rebuilding, we'd better hold some lock on the relation,
2301  *       else the catalog data we need to read could be changing under us.
2302  *       Also, a rel to be rebuilt had better have refcnt > 0.  This is because
2303  *       a sinval reset could happen while we're accessing the catalogs, and
2304  *       the rel would get blown away underneath us by RelationCacheInvalidate
2305  *       if it has zero refcnt.
2306  *
2307  *       The "rebuild" parameter is redundant in current usage because it has
2308  *       to match the relation's refcnt status, but we keep it as a crosscheck
2309  *       that we're doing what the caller expects.
2310  */
2311 static void
2312 RelationClearRelation(Relation relation, bool rebuild)
2313 {
2314         /*
2315          * As per notes above, a rel to be rebuilt MUST have refcnt > 0; while of
2316          * course it would be an equally bad idea to blow away one with nonzero
2317          * refcnt, since that would leave someone somewhere with a dangling
2318          * pointer.  All callers are expected to have verified that this holds.
2319          */
2320         Assert(rebuild ?
2321                    !RelationHasReferenceCountZero(relation) :
2322                    RelationHasReferenceCountZero(relation));
2323
2324         /*
2325          * Make sure smgr and lower levels close the relation's files, if they
2326          * weren't closed already.  If the relation is not getting deleted, the
2327          * next smgr access should reopen the files automatically.  This ensures
2328          * that the low-level file access state is updated after, say, a vacuum
2329          * truncation.
2330          */
2331         RelationCloseSmgr(relation);
2332
2333         /*
2334          * Treat nailed-in system relations separately, they always need to be
2335          * accessible, so we can't blow them away.
2336          */
2337         if (relation->rd_isnailed)
2338         {
2339                 RelationReloadNailed(relation);
2340                 return;
2341         }
2342
2343         /*
2344          * Even non-system indexes should not be blown away if they are open and
2345          * have valid index support information.  This avoids problems with active
2346          * use of the index support information.  As with nailed indexes, we
2347          * re-read the pg_class row to handle possible physical relocation of the
2348          * index, and we check for pg_index updates too.
2349          */
2350         if ((relation->rd_rel->relkind == RELKIND_INDEX ||
2351                  relation->rd_rel->relkind == RELKIND_PARTITIONED_INDEX) &&
2352                 relation->rd_refcnt > 0 &&
2353                 relation->rd_indexcxt != NULL)
2354         {
2355                 relation->rd_isvalid = false;   /* needs to be revalidated */
2356                 if (IsTransactionState())
2357                         RelationReloadIndexInfo(relation);
2358                 return;
2359         }
2360
2361         /* Mark it invalid until we've finished rebuild */
2362         relation->rd_isvalid = false;
2363
2364         /*
2365          * If we're really done with the relcache entry, blow it away. But if
2366          * someone is still using it, reconstruct the whole deal without moving
2367          * the physical RelationData record (so that the someone's pointer is
2368          * still valid).
2369          */
2370         if (!rebuild)
2371         {
2372                 /* Remove it from the hash table */
2373                 RelationCacheDelete(relation);
2374
2375                 /* And release storage */
2376                 RelationDestroyRelation(relation, false);
2377         }
2378         else if (!IsTransactionState())
2379         {
2380                 /*
2381                  * If we're not inside a valid transaction, we can't do any catalog
2382                  * access so it's not possible to rebuild yet.  Just exit, leaving
2383                  * rd_isvalid = false so that the rebuild will occur when the entry is
2384                  * next opened.
2385                  *
2386                  * Note: it's possible that we come here during subtransaction abort,
2387                  * and the reason for wanting to rebuild is that the rel is open in
2388                  * the outer transaction.  In that case it might seem unsafe to not
2389                  * rebuild immediately, since whatever code has the rel already open
2390                  * will keep on using the relcache entry as-is.  However, in such a
2391                  * case the outer transaction should be holding a lock that's
2392                  * sufficient to prevent any significant change in the rel's schema,
2393                  * so the existing entry contents should be good enough for its
2394                  * purposes; at worst we might be behind on statistics updates or the
2395                  * like.  (See also CheckTableNotInUse() and its callers.)      These same
2396                  * remarks also apply to the cases above where we exit without having
2397                  * done RelationReloadIndexInfo() yet.
2398                  */
2399                 return;
2400         }
2401         else
2402         {
2403                 /*
2404                  * Our strategy for rebuilding an open relcache entry is to build a
2405                  * new entry from scratch, swap its contents with the old entry, and
2406                  * finally delete the new entry (along with any infrastructure swapped
2407                  * over from the old entry).  This is to avoid trouble in case an
2408                  * error causes us to lose control partway through.  The old entry
2409                  * will still be marked !rd_isvalid, so we'll try to rebuild it again
2410                  * on next access.  Meanwhile it's not any less valid than it was
2411                  * before, so any code that might expect to continue accessing it
2412                  * isn't hurt by the rebuild failure.  (Consider for example a
2413                  * subtransaction that ALTERs a table and then gets canceled partway
2414                  * through the cache entry rebuild.  The outer transaction should
2415                  * still see the not-modified cache entry as valid.)  The worst
2416                  * consequence of an error is leaking the necessarily-unreferenced new
2417                  * entry, and this shouldn't happen often enough for that to be a big
2418                  * problem.
2419                  *
2420                  * When rebuilding an open relcache entry, we must preserve ref count,
2421                  * rd_createSubid/rd_newRelfilenodeSubid, and rd_toastoid state.  Also
2422                  * attempt to preserve the pg_class entry (rd_rel), tupledesc,
2423                  * rewrite-rule, partition key, and partition descriptor substructures
2424                  * in place, because various places assume that these structures won't
2425                  * move while they are working with an open relcache entry.  (Note:
2426                  * the refcount mechanism for tupledescs might someday allow us to
2427                  * remove this hack for the tupledesc.)
2428                  *
2429                  * Note that this process does not touch CurrentResourceOwner; which
2430                  * is good because whatever ref counts the entry may have do not
2431                  * necessarily belong to that resource owner.
2432                  */
2433                 Relation        newrel;
2434                 Oid                     save_relid = RelationGetRelid(relation);
2435                 bool            keep_tupdesc;
2436                 bool            keep_rules;
2437                 bool            keep_policies;
2438                 bool            keep_partkey;
2439                 bool            keep_partdesc;
2440
2441                 /* Build temporary entry, but don't link it into hashtable */
2442                 newrel = RelationBuildDesc(save_relid, false);
2443                 if (newrel == NULL)
2444                 {
2445                         /*
2446                          * We can validly get here, if we're using a historic snapshot in
2447                          * which a relation, accessed from outside logical decoding, is
2448                          * still invisible. In that case it's fine to just mark the
2449                          * relation as invalid and return - it'll fully get reloaded by
2450                          * the cache reset at the end of logical decoding (or at the next
2451                          * access).  During normal processing we don't want to ignore this
2452                          * case as it shouldn't happen there, as explained below.
2453                          */
2454                         if (HistoricSnapshotActive())
2455                                 return;
2456
2457                         /*
2458                          * This shouldn't happen as dropping a relation is intended to be
2459                          * impossible if still referenced (cf. CheckTableNotInUse()). But
2460                          * if we get here anyway, we can't just delete the relcache entry,
2461                          * as it possibly could get accessed later (as e.g. the error
2462                          * might get trapped and handled via a subtransaction rollback).
2463                          */
2464                         elog(ERROR, "relation %u deleted while still in use", save_relid);
2465                 }
2466
2467                 keep_tupdesc = equalTupleDescs(relation->rd_att, newrel->rd_att);
2468                 keep_rules = equalRuleLocks(relation->rd_rules, newrel->rd_rules);
2469                 keep_policies = equalRSDesc(relation->rd_rsdesc, newrel->rd_rsdesc);
2470                 /* partkey is immutable once set up, so we can always keep it */
2471                 keep_partkey = (relation->rd_partkey != NULL);
2472                 keep_partdesc = equalPartitionDescs(relation->rd_partkey,
2473                                                                                         relation->rd_partdesc,
2474                                                                                         newrel->rd_partdesc);
2475
2476                 /*
2477                  * Perform swapping of the relcache entry contents.  Within this
2478                  * process the old entry is momentarily invalid, so there *must* be no
2479                  * possibility of CHECK_FOR_INTERRUPTS within this sequence. Do it in
2480                  * all-in-line code for safety.
2481                  *
2482                  * Since the vast majority of fields should be swapped, our method is
2483                  * to swap the whole structures and then re-swap those few fields we
2484                  * didn't want swapped.
2485                  */
2486 #define SWAPFIELD(fldtype, fldname) \
2487                 do { \
2488                         fldtype _tmp = newrel->fldname; \
2489                         newrel->fldname = relation->fldname; \
2490                         relation->fldname = _tmp; \
2491                 } while (0)
2492
2493                 /* swap all Relation struct fields */
2494                 {
2495                         RelationData tmpstruct;
2496
2497                         memcpy(&tmpstruct, newrel, sizeof(RelationData));
2498                         memcpy(newrel, relation, sizeof(RelationData));
2499                         memcpy(relation, &tmpstruct, sizeof(RelationData));
2500                 }
2501
2502                 /* rd_smgr must not be swapped, due to back-links from smgr level */
2503                 SWAPFIELD(SMgrRelation, rd_smgr);
2504                 /* rd_refcnt must be preserved */
2505                 SWAPFIELD(int, rd_refcnt);
2506                 /* isnailed shouldn't change */
2507                 Assert(newrel->rd_isnailed == relation->rd_isnailed);
2508                 /* creation sub-XIDs must be preserved */
2509                 SWAPFIELD(SubTransactionId, rd_createSubid);
2510                 SWAPFIELD(SubTransactionId, rd_newRelfilenodeSubid);
2511                 /* un-swap rd_rel pointers, swap contents instead */
2512                 SWAPFIELD(Form_pg_class, rd_rel);
2513                 /* ... but actually, we don't have to update newrel->rd_rel */
2514                 memcpy(relation->rd_rel, newrel->rd_rel, CLASS_TUPLE_SIZE);
2515                 /* preserve old tupledesc, rules, policies if no logical change */
2516                 if (keep_tupdesc)
2517                         SWAPFIELD(TupleDesc, rd_att);
2518                 if (keep_rules)
2519                 {
2520                         SWAPFIELD(RuleLock *, rd_rules);
2521                         SWAPFIELD(MemoryContext, rd_rulescxt);
2522                 }
2523                 if (keep_policies)
2524                         SWAPFIELD(RowSecurityDesc *, rd_rsdesc);
2525                 /* toast OID override must be preserved */
2526                 SWAPFIELD(Oid, rd_toastoid);
2527                 /* pgstat_info must be preserved */
2528                 SWAPFIELD(struct PgStat_TableStatus *, pgstat_info);
2529                 /* preserve old partitioning info if no logical change */
2530                 if (keep_partkey)
2531                 {
2532                         SWAPFIELD(PartitionKey, rd_partkey);
2533                         SWAPFIELD(MemoryContext, rd_partkeycxt);
2534                 }
2535                 if (keep_partdesc)
2536                 {
2537                         SWAPFIELD(PartitionDesc, rd_partdesc);
2538                         SWAPFIELD(MemoryContext, rd_pdcxt);
2539                 }
2540
2541 #undef SWAPFIELD
2542
2543                 /* And now we can throw away the temporary entry */
2544                 RelationDestroyRelation(newrel, !keep_tupdesc);
2545         }
2546 }
2547
2548 /*
2549  * RelationFlushRelation
2550  *
2551  *       Rebuild the relation if it is open (refcount > 0), else blow it away.
2552  *       This is used when we receive a cache invalidation event for the rel.
2553  */
2554 static void
2555 RelationFlushRelation(Relation relation)
2556 {
2557         if (relation->rd_createSubid != InvalidSubTransactionId ||
2558                 relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
2559         {
2560                 /*
2561                  * New relcache entries are always rebuilt, not flushed; else we'd
2562                  * forget the "new" status of the relation, which is a useful
2563                  * optimization to have.  Ditto for the new-relfilenode status.
2564                  *
2565                  * The rel could have zero refcnt here, so temporarily increment the
2566                  * refcnt to ensure it's safe to rebuild it.  We can assume that the
2567                  * current transaction has some lock on the rel already.
2568                  */
2569                 RelationIncrementReferenceCount(relation);
2570                 RelationClearRelation(relation, true);
2571                 RelationDecrementReferenceCount(relation);
2572         }
2573         else
2574         {
2575                 /*
2576                  * Pre-existing rels can be dropped from the relcache if not open.
2577                  */
2578                 bool            rebuild = !RelationHasReferenceCountZero(relation);
2579
2580                 RelationClearRelation(relation, rebuild);
2581         }
2582 }
2583
2584 /*
2585  * RelationForgetRelation - unconditionally remove a relcache entry
2586  *
2587  *                 External interface for destroying a relcache entry when we
2588  *                 drop the relation.
2589  */
2590 void
2591 RelationForgetRelation(Oid rid)
2592 {
2593         Relation        relation;
2594
2595         RelationIdCacheLookup(rid, relation);
2596
2597         if (!PointerIsValid(relation))
2598                 return;                                 /* not in cache, nothing to do */
2599
2600         if (!RelationHasReferenceCountZero(relation))
2601                 elog(ERROR, "relation %u is still open", rid);
2602
2603         /* Unconditionally destroy the relcache entry */
2604         RelationClearRelation(relation, false);
2605 }
2606
2607 /*
2608  *              RelationCacheInvalidateEntry
2609  *
2610  *              This routine is invoked for SI cache flush messages.
2611  *
2612  * Any relcache entry matching the relid must be flushed.  (Note: caller has
2613  * already determined that the relid belongs to our database or is a shared
2614  * relation.)
2615  *
2616  * We used to skip local relations, on the grounds that they could
2617  * not be targets of cross-backend SI update messages; but it seems
2618  * safer to process them, so that our *own* SI update messages will
2619  * have the same effects during CommandCounterIncrement for both
2620  * local and nonlocal relations.
2621  */
2622 void
2623 RelationCacheInvalidateEntry(Oid relationId)
2624 {
2625         Relation        relation;
2626
2627         RelationIdCacheLookup(relationId, relation);
2628
2629         if (PointerIsValid(relation))
2630         {
2631                 relcacheInvalsReceived++;
2632                 RelationFlushRelation(relation);
2633         }
2634 }
2635
2636 /*
2637  * RelationCacheInvalidate
2638  *       Blow away cached relation descriptors that have zero reference counts,
2639  *       and rebuild those with positive reference counts.  Also reset the smgr
2640  *       relation cache and re-read relation mapping data.
2641  *
2642  *       This is currently used only to recover from SI message buffer overflow,
2643  *       so we do not touch new-in-transaction relations; they cannot be targets
2644  *       of cross-backend SI updates (and our own updates now go through a
2645  *       separate linked list that isn't limited by the SI message buffer size).
2646  *       Likewise, we need not discard new-relfilenode-in-transaction hints,
2647  *       since any invalidation of those would be a local event.
2648  *
2649  *       We do this in two phases: the first pass deletes deletable items, and
2650  *       the second one rebuilds the rebuildable items.  This is essential for
2651  *       safety, because hash_seq_search only copes with concurrent deletion of
2652  *       the element it is currently visiting.  If a second SI overflow were to
2653  *       occur while we are walking the table, resulting in recursive entry to
2654  *       this routine, we could crash because the inner invocation blows away
2655  *       the entry next to be visited by the outer scan.  But this way is OK,
2656  *       because (a) during the first pass we won't process any more SI messages,
2657  *       so hash_seq_search will complete safely; (b) during the second pass we
2658  *       only hold onto pointers to nondeletable entries.
2659  *
2660  *       The two-phase approach also makes it easy to update relfilenodes for
2661  *       mapped relations before we do anything else, and to ensure that the
2662  *       second pass processes nailed-in-cache items before other nondeletable
2663  *       items.  This should ensure that system catalogs are up to date before
2664  *       we attempt to use them to reload information about other open relations.
2665  */
2666 void
2667 RelationCacheInvalidate(void)
2668 {
2669         HASH_SEQ_STATUS status;
2670         RelIdCacheEnt *idhentry;
2671         Relation        relation;
2672         List       *rebuildFirstList = NIL;
2673         List       *rebuildList = NIL;
2674         ListCell   *l;
2675
2676         /*
2677          * Reload relation mapping data before starting to reconstruct cache.
2678          */
2679         RelationMapInvalidateAll();
2680
2681         /* Phase 1 */
2682         hash_seq_init(&status, RelationIdCache);
2683
2684         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2685         {
2686                 relation = idhentry->reldesc;
2687
2688                 /* Must close all smgr references to avoid leaving dangling ptrs */
2689                 RelationCloseSmgr(relation);
2690
2691                 /*
2692                  * Ignore new relations; no other backend will manipulate them before
2693                  * we commit.  Likewise, before replacing a relation's relfilenode, we
2694                  * shall have acquired AccessExclusiveLock and drained any applicable
2695                  * pending invalidations.
2696                  */
2697                 if (relation->rd_createSubid != InvalidSubTransactionId ||
2698                         relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
2699                         continue;
2700
2701                 relcacheInvalsReceived++;
2702
2703                 if (RelationHasReferenceCountZero(relation))
2704                 {
2705                         /* Delete this entry immediately */
2706                         Assert(!relation->rd_isnailed);
2707                         RelationClearRelation(relation, false);
2708                 }
2709                 else
2710                 {
2711                         /*
2712                          * If it's a mapped relation, immediately update its rd_node in
2713                          * case its relfilenode changed.  We must do this during phase 1
2714                          * in case the relation is consulted during rebuild of other
2715                          * relcache entries in phase 2.  It's safe since consulting the
2716                          * map doesn't involve any access to relcache entries.
2717                          */
2718                         if (RelationIsMapped(relation))
2719                                 RelationInitPhysicalAddr(relation);
2720
2721                         /*
2722                          * Add this entry to list of stuff to rebuild in second pass.
2723                          * pg_class goes to the front of rebuildFirstList while
2724                          * pg_class_oid_index goes to the back of rebuildFirstList, so
2725                          * they are done first and second respectively.  Other nailed
2726                          * relations go to the front of rebuildList, so they'll be done
2727                          * next in no particular order; and everything else goes to the
2728                          * back of rebuildList.
2729                          */
2730                         if (RelationGetRelid(relation) == RelationRelationId)
2731                                 rebuildFirstList = lcons(relation, rebuildFirstList);
2732                         else if (RelationGetRelid(relation) == ClassOidIndexId)
2733                                 rebuildFirstList = lappend(rebuildFirstList, relation);
2734                         else if (relation->rd_isnailed)
2735                                 rebuildList = lcons(relation, rebuildList);
2736                         else
2737                                 rebuildList = lappend(rebuildList, relation);
2738                 }
2739         }
2740
2741         /*
2742          * Now zap any remaining smgr cache entries.  This must happen before we
2743          * start to rebuild entries, since that may involve catalog fetches which
2744          * will re-open catalog files.
2745          */
2746         smgrcloseall();
2747
2748         /* Phase 2: rebuild the items found to need rebuild in phase 1 */
2749         foreach(l, rebuildFirstList)
2750         {
2751                 relation = (Relation) lfirst(l);
2752                 RelationClearRelation(relation, true);
2753         }
2754         list_free(rebuildFirstList);
2755         foreach(l, rebuildList)
2756         {
2757                 relation = (Relation) lfirst(l);
2758                 RelationClearRelation(relation, true);
2759         }
2760         list_free(rebuildList);
2761 }
2762
2763 /*
2764  * RelationCloseSmgrByOid - close a relcache entry's smgr link
2765  *
2766  * Needed in some cases where we are changing a relation's physical mapping.
2767  * The link will be automatically reopened on next use.
2768  */
2769 void
2770 RelationCloseSmgrByOid(Oid relationId)
2771 {
2772         Relation        relation;
2773
2774         RelationIdCacheLookup(relationId, relation);
2775
2776         if (!PointerIsValid(relation))
2777                 return;                                 /* not in cache, nothing to do */
2778
2779         RelationCloseSmgr(relation);
2780 }
2781
2782 static void
2783 RememberToFreeTupleDescAtEOX(TupleDesc td)
2784 {
2785         if (EOXactTupleDescArray == NULL)
2786         {
2787                 MemoryContext oldcxt;
2788
2789                 oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2790
2791                 EOXactTupleDescArray = (TupleDesc *) palloc(16 * sizeof(TupleDesc));
2792                 EOXactTupleDescArrayLen = 16;
2793                 NextEOXactTupleDescNum = 0;
2794                 MemoryContextSwitchTo(oldcxt);
2795         }
2796         else if (NextEOXactTupleDescNum >= EOXactTupleDescArrayLen)
2797         {
2798                 int32           newlen = EOXactTupleDescArrayLen * 2;
2799
2800                 Assert(EOXactTupleDescArrayLen > 0);
2801
2802                 EOXactTupleDescArray = (TupleDesc *) repalloc(EOXactTupleDescArray,
2803                                                                                                           newlen * sizeof(TupleDesc));
2804                 EOXactTupleDescArrayLen = newlen;
2805         }
2806
2807         EOXactTupleDescArray[NextEOXactTupleDescNum++] = td;
2808 }
2809
2810 /*
2811  * AtEOXact_RelationCache
2812  *
2813  *      Clean up the relcache at main-transaction commit or abort.
2814  *
2815  * Note: this must be called *before* processing invalidation messages.
2816  * In the case of abort, we don't want to try to rebuild any invalidated
2817  * cache entries (since we can't safely do database accesses).  Therefore
2818  * we must reset refcnts before handling pending invalidations.
2819  *
2820  * As of PostgreSQL 8.1, relcache refcnts should get released by the
2821  * ResourceOwner mechanism.  This routine just does a debugging
2822  * cross-check that no pins remain.  However, we also need to do special
2823  * cleanup when the current transaction created any relations or made use
2824  * of forced index lists.
2825  */
2826 void
2827 AtEOXact_RelationCache(bool isCommit)
2828 {
2829         HASH_SEQ_STATUS status;
2830         RelIdCacheEnt *idhentry;
2831         int                     i;
2832
2833         /*
2834          * Unless the eoxact_list[] overflowed, we only need to examine the rels
2835          * listed in it.  Otherwise fall back on a hash_seq_search scan.
2836          *
2837          * For simplicity, eoxact_list[] entries are not deleted till end of
2838          * top-level transaction, even though we could remove them at
2839          * subtransaction end in some cases, or remove relations from the list if
2840          * they are cleared for other reasons.  Therefore we should expect the
2841          * case that list entries are not found in the hashtable; if not, there's
2842          * nothing to do for them.
2843          */
2844         if (eoxact_list_overflowed)
2845         {
2846                 hash_seq_init(&status, RelationIdCache);
2847                 while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2848                 {
2849                         AtEOXact_cleanup(idhentry->reldesc, isCommit);
2850                 }
2851         }
2852         else
2853         {
2854                 for (i = 0; i < eoxact_list_len; i++)
2855                 {
2856                         idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache,
2857                                                                                                          (void *) &eoxact_list[i],
2858                                                                                                          HASH_FIND,
2859                                                                                                          NULL);
2860                         if (idhentry != NULL)
2861                                 AtEOXact_cleanup(idhentry->reldesc, isCommit);
2862                 }
2863         }
2864
2865         if (EOXactTupleDescArrayLen > 0)
2866         {
2867                 Assert(EOXactTupleDescArray != NULL);
2868                 for (i = 0; i < NextEOXactTupleDescNum; i++)
2869                         FreeTupleDesc(EOXactTupleDescArray[i]);
2870                 pfree(EOXactTupleDescArray);
2871                 EOXactTupleDescArray = NULL;
2872         }
2873
2874         /* Now we're out of the transaction and can clear the lists */
2875         eoxact_list_len = 0;
2876         eoxact_list_overflowed = false;
2877         NextEOXactTupleDescNum = 0;
2878         EOXactTupleDescArrayLen = 0;
2879 }
2880
2881 /*
2882  * AtEOXact_cleanup
2883  *
2884  *      Clean up a single rel at main-transaction commit or abort
2885  *
2886  * NB: this processing must be idempotent, because EOXactListAdd() doesn't
2887  * bother to prevent duplicate entries in eoxact_list[].
2888  */
2889 static void
2890 AtEOXact_cleanup(Relation relation, bool isCommit)
2891 {
2892         /*
2893          * The relcache entry's ref count should be back to its normal
2894          * not-in-a-transaction state: 0 unless it's nailed in cache.
2895          *
2896          * In bootstrap mode, this is NOT true, so don't check it --- the
2897          * bootstrap code expects relations to stay open across start/commit
2898          * transaction calls.  (That seems bogus, but it's not worth fixing.)
2899          *
2900          * Note: ideally this check would be applied to every relcache entry, not
2901          * just those that have eoxact work to do.  But it's not worth forcing a
2902          * scan of the whole relcache just for this.  (Moreover, doing so would
2903          * mean that assert-enabled testing never tests the hash_search code path
2904          * above, which seems a bad idea.)
2905          */
2906 #ifdef USE_ASSERT_CHECKING
2907         if (!IsBootstrapProcessingMode())
2908         {
2909                 int                     expected_refcnt;
2910
2911                 expected_refcnt = relation->rd_isnailed ? 1 : 0;
2912                 Assert(relation->rd_refcnt == expected_refcnt);
2913         }
2914 #endif
2915
2916         /*
2917          * Is it a relation created in the current transaction?
2918          *
2919          * During commit, reset the flag to zero, since we are now out of the
2920          * creating transaction.  During abort, simply delete the relcache entry
2921          * --- it isn't interesting any longer.  (NOTE: if we have forgotten the
2922          * new-ness of a new relation due to a forced cache flush, the entry will
2923          * get deleted anyway by shared-cache-inval processing of the aborted
2924          * pg_class insertion.)
2925          */
2926         if (relation->rd_createSubid != InvalidSubTransactionId)
2927         {
2928                 if (isCommit)
2929                         relation->rd_createSubid = InvalidSubTransactionId;
2930                 else if (RelationHasReferenceCountZero(relation))
2931                 {
2932                         RelationClearRelation(relation, false);
2933                         return;
2934                 }
2935                 else
2936                 {
2937                         /*
2938                          * Hmm, somewhere there's a (leaked?) reference to the relation.
2939                          * We daren't remove the entry for fear of dereferencing a
2940                          * dangling pointer later.  Bleat, and mark it as not belonging to
2941                          * the current transaction.  Hopefully it'll get cleaned up
2942                          * eventually.  This must be just a WARNING to avoid
2943                          * error-during-error-recovery loops.
2944                          */
2945                         relation->rd_createSubid = InvalidSubTransactionId;
2946                         elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
2947                                  RelationGetRelationName(relation));
2948                 }
2949         }
2950
2951         /*
2952          * Likewise, reset the hint about the relfilenode being new.
2953          */
2954         relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
2955
2956         /*
2957          * Flush any temporary index list.
2958          */
2959         if (relation->rd_indexvalid == 2)
2960         {
2961                 list_free(relation->rd_indexlist);
2962                 relation->rd_indexlist = NIL;
2963                 relation->rd_pkindex = InvalidOid;
2964                 relation->rd_replidindex = InvalidOid;
2965                 relation->rd_indexvalid = 0;
2966         }
2967 }
2968
2969 /*
2970  * AtEOSubXact_RelationCache
2971  *
2972  *      Clean up the relcache at sub-transaction commit or abort.
2973  *
2974  * Note: this must be called *before* processing invalidation messages.
2975  */
2976 void
2977 AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
2978                                                   SubTransactionId parentSubid)
2979 {
2980         HASH_SEQ_STATUS status;
2981         RelIdCacheEnt *idhentry;
2982         int                     i;
2983
2984         /*
2985          * Unless the eoxact_list[] overflowed, we only need to examine the rels
2986          * listed in it.  Otherwise fall back on a hash_seq_search scan.  Same
2987          * logic as in AtEOXact_RelationCache.
2988          */
2989         if (eoxact_list_overflowed)
2990         {
2991                 hash_seq_init(&status, RelationIdCache);
2992                 while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2993                 {
2994                         AtEOSubXact_cleanup(idhentry->reldesc, isCommit,
2995                                                                 mySubid, parentSubid);
2996                 }
2997         }
2998         else
2999         {
3000                 for (i = 0; i < eoxact_list_len; i++)
3001                 {
3002                         idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache,
3003                                                                                                          (void *) &eoxact_list[i],
3004                                                                                                          HASH_FIND,
3005                                                                                                          NULL);
3006                         if (idhentry != NULL)
3007                                 AtEOSubXact_cleanup(idhentry->reldesc, isCommit,
3008                                                                         mySubid, parentSubid);
3009                 }
3010         }
3011
3012         /* Don't reset the list; we still need more cleanup later */
3013 }
3014
3015 /*
3016  * AtEOSubXact_cleanup
3017  *
3018  *      Clean up a single rel at subtransaction commit or abort
3019  *
3020  * NB: this processing must be idempotent, because EOXactListAdd() doesn't
3021  * bother to prevent duplicate entries in eoxact_list[].
3022  */
3023 static void
3024 AtEOSubXact_cleanup(Relation relation, bool isCommit,
3025                                         SubTransactionId mySubid, SubTransactionId parentSubid)
3026 {
3027         /*
3028          * Is it a relation created in the current subtransaction?
3029          *
3030          * During subcommit, mark it as belonging to the parent, instead. During
3031          * subabort, simply delete the relcache entry.
3032          */
3033         if (relation->rd_createSubid == mySubid)
3034         {
3035                 if (isCommit)
3036                         relation->rd_createSubid = parentSubid;
3037                 else if (RelationHasReferenceCountZero(relation))
3038                 {
3039                         RelationClearRelation(relation, false);
3040                         return;
3041                 }
3042                 else
3043                 {
3044                         /*
3045                          * Hmm, somewhere there's a (leaked?) reference to the relation.
3046                          * We daren't remove the entry for fear of dereferencing a
3047                          * dangling pointer later.  Bleat, and transfer it to the parent
3048                          * subtransaction so we can try again later.  This must be just a
3049                          * WARNING to avoid error-during-error-recovery loops.
3050                          */
3051                         relation->rd_createSubid = parentSubid;
3052                         elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
3053                                  RelationGetRelationName(relation));
3054                 }
3055         }
3056
3057         /*
3058          * Likewise, update or drop any new-relfilenode-in-subtransaction hint.
3059          */
3060         if (relation->rd_newRelfilenodeSubid == mySubid)
3061         {
3062                 if (isCommit)
3063                         relation->rd_newRelfilenodeSubid = parentSubid;
3064                 else
3065                         relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3066         }
3067
3068         /*
3069          * Flush any temporary index list.
3070          */
3071         if (relation->rd_indexvalid == 2)
3072         {
3073                 list_free(relation->rd_indexlist);
3074                 relation->rd_indexlist = NIL;
3075                 relation->rd_pkindex = InvalidOid;
3076                 relation->rd_replidindex = InvalidOid;
3077                 relation->rd_indexvalid = 0;
3078         }
3079 }
3080
3081
3082 /*
3083  *              RelationBuildLocalRelation
3084  *                      Build a relcache entry for an about-to-be-created relation,
3085  *                      and enter it into the relcache.
3086  */
3087 Relation
3088 RelationBuildLocalRelation(const char *relname,
3089                                                    Oid relnamespace,
3090                                                    TupleDesc tupDesc,
3091                                                    Oid relid,
3092                                                    Oid relfilenode,
3093                                                    Oid reltablespace,
3094                                                    bool shared_relation,
3095                                                    bool mapped_relation,
3096                                                    char relpersistence,
3097                                                    char relkind)
3098 {
3099         Relation        rel;
3100         MemoryContext oldcxt;
3101         int                     natts = tupDesc->natts;
3102         int                     i;
3103         bool            has_not_null;
3104         bool            nailit;
3105
3106         AssertArg(natts >= 0);
3107
3108         /*
3109          * check for creation of a rel that must be nailed in cache.
3110          *
3111          * XXX this list had better match the relations specially handled in
3112          * RelationCacheInitializePhase2/3.
3113          */
3114         switch (relid)
3115         {
3116                 case DatabaseRelationId:
3117                 case AuthIdRelationId:
3118                 case AuthMemRelationId:
3119                 case RelationRelationId:
3120                 case AttributeRelationId:
3121                 case ProcedureRelationId:
3122                 case TypeRelationId:
3123                         nailit = true;
3124                         break;
3125                 default:
3126                         nailit = false;
3127                         break;
3128         }
3129
3130         /*
3131          * check that hardwired list of shared rels matches what's in the
3132          * bootstrap .bki file.  If you get a failure here during initdb, you
3133          * probably need to fix IsSharedRelation() to match whatever you've done
3134          * to the set of shared relations.
3135          */
3136         if (shared_relation != IsSharedRelation(relid))
3137                 elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
3138                          relname, relid);
3139
3140         /* Shared relations had better be mapped, too */
3141         Assert(mapped_relation || !shared_relation);
3142
3143         /*
3144          * switch to the cache context to create the relcache entry.
3145          */
3146         if (!CacheMemoryContext)
3147                 CreateCacheMemoryContext();
3148
3149         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3150
3151         /*
3152          * allocate a new relation descriptor and fill in basic state fields.
3153          */
3154         rel = (Relation) palloc0(sizeof(RelationData));
3155
3156         /* make sure relation is marked as having no open file yet */
3157         rel->rd_smgr = NULL;
3158
3159         /* mark it nailed if appropriate */
3160         rel->rd_isnailed = nailit;
3161
3162         rel->rd_refcnt = nailit ? 1 : 0;
3163
3164         /* it's being created in this transaction */
3165         rel->rd_createSubid = GetCurrentSubTransactionId();
3166         rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3167
3168         /*
3169          * create a new tuple descriptor from the one passed in.  We do this
3170          * partly to copy it into the cache context, and partly because the new
3171          * relation can't have any defaults or constraints yet; they have to be
3172          * added in later steps, because they require additions to multiple system
3173          * catalogs.  We can copy attnotnull constraints here, however.
3174          */
3175         rel->rd_att = CreateTupleDescCopy(tupDesc);
3176         rel->rd_att->tdrefcount = 1;    /* mark as refcounted */
3177         has_not_null = false;
3178         for (i = 0; i < natts; i++)
3179         {
3180                 Form_pg_attribute satt = TupleDescAttr(tupDesc, i);
3181                 Form_pg_attribute datt = TupleDescAttr(rel->rd_att, i);
3182
3183                 datt->attidentity = satt->attidentity;
3184                 datt->attnotnull = satt->attnotnull;
3185                 has_not_null |= satt->attnotnull;
3186         }
3187
3188         if (has_not_null)
3189         {
3190                 TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
3191
3192                 constr->has_not_null = true;
3193                 rel->rd_att->constr = constr;
3194         }
3195
3196         /*
3197          * initialize relation tuple form (caller may add/override data later)
3198          */
3199         rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
3200
3201         namestrcpy(&rel->rd_rel->relname, relname);
3202         rel->rd_rel->relnamespace = relnamespace;
3203
3204         rel->rd_rel->relkind = relkind;
3205         rel->rd_rel->relnatts = natts;
3206         rel->rd_rel->reltype = InvalidOid;
3207         /* needed when bootstrapping: */
3208         rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;
3209
3210         /* set up persistence and relcache fields dependent on it */
3211         rel->rd_rel->relpersistence = relpersistence;
3212         switch (relpersistence)
3213         {
3214                 case RELPERSISTENCE_UNLOGGED:
3215                 case RELPERSISTENCE_PERMANENT:
3216                         rel->rd_backend = InvalidBackendId;
3217                         rel->rd_islocaltemp = false;
3218                         break;
3219                 case RELPERSISTENCE_TEMP:
3220                         Assert(isTempOrTempToastNamespace(relnamespace));
3221                         rel->rd_backend = BackendIdForTempRelations();
3222                         rel->rd_islocaltemp = true;
3223                         break;
3224                 default:
3225                         elog(ERROR, "invalid relpersistence: %c", relpersistence);
3226                         break;
3227         }
3228
3229         /* if it's a materialized view, it's not populated initially */
3230         if (relkind == RELKIND_MATVIEW)
3231                 rel->rd_rel->relispopulated = false;
3232         else
3233                 rel->rd_rel->relispopulated = true;
3234
3235         /* system relations and non-table objects don't have one */
3236         if (!IsSystemNamespace(relnamespace) &&
3237                 (relkind == RELKIND_RELATION ||
3238                  relkind == RELKIND_MATVIEW ||
3239                  relkind == RELKIND_PARTITIONED_TABLE))
3240                 rel->rd_rel->relreplident = REPLICA_IDENTITY_DEFAULT;
3241         else
3242                 rel->rd_rel->relreplident = REPLICA_IDENTITY_NOTHING;
3243
3244         /*
3245          * Insert relation physical and logical identifiers (OIDs) into the right
3246          * places.  For a mapped relation, we set relfilenode to zero and rely on
3247          * RelationInitPhysicalAddr to consult the map.
3248          */
3249         rel->rd_rel->relisshared = shared_relation;
3250
3251         RelationGetRelid(rel) = relid;
3252
3253         for (i = 0; i < natts; i++)
3254                 TupleDescAttr(rel->rd_att, i)->attrelid = relid;
3255
3256         rel->rd_rel->reltablespace = reltablespace;
3257
3258         if (mapped_relation)
3259         {
3260                 rel->rd_rel->relfilenode = InvalidOid;
3261                 /* Add it to the active mapping information */
3262                 RelationMapUpdateMap(relid, relfilenode, shared_relation, true);
3263         }
3264         else
3265                 rel->rd_rel->relfilenode = relfilenode;
3266
3267         RelationInitLockInfo(rel);      /* see lmgr.c */
3268
3269         RelationInitPhysicalAddr(rel);
3270
3271         /*
3272          * Okay to insert into the relcache hash table.
3273          *
3274          * Ordinarily, there should certainly not be an existing hash entry for
3275          * the same OID; but during bootstrap, when we create a "real" relcache
3276          * entry for one of the bootstrap relations, we'll be overwriting the
3277          * phony one created with formrdesc.  So allow that to happen for nailed
3278          * rels.
3279          */
3280         RelationCacheInsert(rel, nailit);
3281
3282         /*
3283          * Flag relation as needing eoxact cleanup (to clear rd_createSubid). We
3284          * can't do this before storing relid in it.
3285          */
3286         EOXactListAdd(rel);
3287
3288         /*
3289          * done building relcache entry.
3290          */
3291         MemoryContextSwitchTo(oldcxt);
3292
3293         /* It's fully valid */
3294         rel->rd_isvalid = true;
3295
3296         /*
3297          * Caller expects us to pin the returned entry.
3298          */
3299         RelationIncrementReferenceCount(rel);
3300
3301         return rel;
3302 }
3303
3304
3305 /*
3306  * RelationSetNewRelfilenode
3307  *
3308  * Assign a new relfilenode (physical file name) to the relation.
3309  *
3310  * This allows a full rewrite of the relation to be done with transactional
3311  * safety (since the filenode assignment can be rolled back).  Note however
3312  * that there is no simple way to access the relation's old data for the
3313  * remainder of the current transaction.  This limits the usefulness to cases
3314  * such as TRUNCATE or rebuilding an index from scratch.
3315  *
3316  * Caller must already hold exclusive lock on the relation.
3317  *
3318  * The relation is marked with relfrozenxid = freezeXid (InvalidTransactionId
3319  * must be passed for indexes and sequences).  This should be a lower bound on
3320  * the XIDs that will be put into the new relation contents.
3321  *
3322  * The new filenode's persistence is set to the given value.  This is useful
3323  * for the cases that are changing the relation's persistence; other callers
3324  * need to pass the original relpersistence value.
3325  */
3326 void
3327 RelationSetNewRelfilenode(Relation relation, char persistence,
3328                                                   TransactionId freezeXid, MultiXactId minmulti)
3329 {
3330         Oid                     newrelfilenode;
3331         RelFileNodeBackend newrnode;
3332         Relation        pg_class;
3333         HeapTuple       tuple;
3334         Form_pg_class classform;
3335
3336         /* Indexes, sequences must have Invalid frozenxid; other rels must not */
3337         Assert((relation->rd_rel->relkind == RELKIND_INDEX ||
3338                         relation->rd_rel->relkind == RELKIND_SEQUENCE) ?
3339                    freezeXid == InvalidTransactionId :
3340                    TransactionIdIsNormal(freezeXid));
3341         Assert(TransactionIdIsNormal(freezeXid) == MultiXactIdIsValid(minmulti));
3342
3343         /* Allocate a new relfilenode */
3344         newrelfilenode = GetNewRelFileNode(relation->rd_rel->reltablespace, NULL,
3345                                                                            persistence);
3346
3347         /*
3348          * Get a writable copy of the pg_class tuple for the given relation.
3349          */
3350         pg_class = table_open(RelationRelationId, RowExclusiveLock);
3351
3352         tuple = SearchSysCacheCopy1(RELOID,
3353                                                                 ObjectIdGetDatum(RelationGetRelid(relation)));
3354         if (!HeapTupleIsValid(tuple))
3355                 elog(ERROR, "could not find tuple for relation %u",
3356                          RelationGetRelid(relation));
3357         classform = (Form_pg_class) GETSTRUCT(tuple);
3358
3359         /*
3360          * Create storage for the main fork of the new relfilenode.
3361          *
3362          * NOTE: any conflict in relfilenode value will be caught here, if
3363          * GetNewRelFileNode messes up for any reason.
3364          */
3365         newrnode.node = relation->rd_node;
3366         newrnode.node.relNode = newrelfilenode;
3367         newrnode.backend = relation->rd_backend;
3368         RelationCreateStorage(newrnode.node, persistence);
3369         smgrclosenode(newrnode);
3370
3371         /*
3372          * Schedule unlinking of the old storage at transaction commit.
3373          */
3374         RelationDropStorage(relation);
3375
3376         /*
3377          * Now update the pg_class row.  However, if we're dealing with a mapped
3378          * index, pg_class.relfilenode doesn't change; instead we have to send the
3379          * update to the relation mapper.
3380          */
3381         if (RelationIsMapped(relation))
3382                 RelationMapUpdateMap(RelationGetRelid(relation),
3383                                                          newrelfilenode,
3384                                                          relation->rd_rel->relisshared,
3385                                                          false);
3386         else
3387                 classform->relfilenode = newrelfilenode;
3388
3389         /* These changes are safe even for a mapped relation */
3390         if (relation->rd_rel->relkind != RELKIND_SEQUENCE)
3391         {
3392                 classform->relpages = 0;        /* it's empty until further notice */
3393                 classform->reltuples = 0;
3394                 classform->relallvisible = 0;
3395         }
3396         classform->relfrozenxid = freezeXid;
3397         classform->relminmxid = minmulti;
3398         classform->relpersistence = persistence;
3399
3400         CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
3401
3402         heap_freetuple(tuple);
3403
3404         table_close(pg_class, RowExclusiveLock);
3405
3406         /*
3407          * Make the pg_class row change visible, as well as the relation map
3408          * change if any.  This will cause the relcache entry to get updated, too.
3409          */
3410         CommandCounterIncrement();
3411
3412         /*
3413          * Mark the rel as having been given a new relfilenode in the current
3414          * (sub) transaction.  This is a hint that can be used to optimize later
3415          * operations on the rel in the same transaction.
3416          */
3417         relation->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
3418
3419         /* Flag relation as needing eoxact cleanup (to remove the hint) */
3420         EOXactListAdd(relation);
3421 }
3422
3423
3424 /*
3425  *              RelationCacheInitialize
3426  *
3427  *              This initializes the relation descriptor cache.  At the time
3428  *              that this is invoked, we can't do database access yet (mainly
3429  *              because the transaction subsystem is not up); all we are doing
3430  *              is making an empty cache hashtable.  This must be done before
3431  *              starting the initialization transaction, because otherwise
3432  *              AtEOXact_RelationCache would crash if that transaction aborts
3433  *              before we can get the relcache set up.
3434  */
3435
3436 #define INITRELCACHESIZE                400
3437
3438 void
3439 RelationCacheInitialize(void)
3440 {
3441         HASHCTL         ctl;
3442
3443         /*
3444          * make sure cache memory context exists
3445          */
3446         if (!CacheMemoryContext)
3447                 CreateCacheMemoryContext();
3448
3449         /*
3450          * create hashtable that indexes the relcache
3451          */
3452         MemSet(&ctl, 0, sizeof(ctl));
3453         ctl.keysize = sizeof(Oid);
3454         ctl.entrysize = sizeof(RelIdCacheEnt);
3455         RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
3456                                                                   &ctl, HASH_ELEM | HASH_BLOBS);
3457
3458         /*
3459          * relation mapper needs to be initialized too
3460          */
3461         RelationMapInitialize();
3462 }
3463
3464 /*
3465  *              RelationCacheInitializePhase2
3466  *
3467  *              This is called to prepare for access to shared catalogs during startup.
3468  *              We must at least set up nailed reldescs for pg_database, pg_authid,
3469  *              pg_auth_members, and pg_shseclabel. Ideally we'd like to have reldescs
3470  *              for their indexes, too.  We attempt to load this information from the
3471  *              shared relcache init file.  If that's missing or broken, just make
3472  *              phony entries for the catalogs themselves.
3473  *              RelationCacheInitializePhase3 will clean up as needed.
3474  */
3475 void
3476 RelationCacheInitializePhase2(void)
3477 {
3478         MemoryContext oldcxt;
3479
3480         /*
3481          * relation mapper needs initialized too
3482          */
3483         RelationMapInitializePhase2();
3484
3485         /*
3486          * In bootstrap mode, the shared catalogs aren't there yet anyway, so do
3487          * nothing.
3488          */
3489         if (IsBootstrapProcessingMode())
3490                 return;
3491
3492         /*
3493          * switch to cache memory context
3494          */
3495         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3496
3497         /*
3498          * Try to load the shared relcache cache file.  If unsuccessful, bootstrap
3499          * the cache with pre-made descriptors for the critical shared catalogs.
3500          */
3501         if (!load_relcache_init_file(true))
3502         {
3503                 formrdesc("pg_database", DatabaseRelation_Rowtype_Id, true,
3504                                   Natts_pg_database, Desc_pg_database);
3505                 formrdesc("pg_authid", AuthIdRelation_Rowtype_Id, true,
3506                                   Natts_pg_authid, Desc_pg_authid);
3507                 formrdesc("pg_auth_members", AuthMemRelation_Rowtype_Id, true,
3508                                   Natts_pg_auth_members, Desc_pg_auth_members);
3509                 formrdesc("pg_shseclabel", SharedSecLabelRelation_Rowtype_Id, true,
3510                                   Natts_pg_shseclabel, Desc_pg_shseclabel);
3511                 formrdesc("pg_subscription", SubscriptionRelation_Rowtype_Id, true,
3512                                   Natts_pg_subscription, Desc_pg_subscription);
3513
3514 #define NUM_CRITICAL_SHARED_RELS        5       /* fix if you change list above */
3515         }
3516
3517         MemoryContextSwitchTo(oldcxt);
3518 }
3519
3520 /*
3521  *              RelationCacheInitializePhase3
3522  *
3523  *              This is called as soon as the catcache and transaction system
3524  *              are functional and we have determined MyDatabaseId.  At this point
3525  *              we can actually read data from the database's system catalogs.
3526  *              We first try to read pre-computed relcache entries from the local
3527  *              relcache init file.  If that's missing or broken, make phony entries
3528  *              for the minimum set of nailed-in-cache relations.  Then (unless
3529  *              bootstrapping) make sure we have entries for the critical system
3530  *              indexes.  Once we've done all this, we have enough infrastructure to
3531  *              open any system catalog or use any catcache.  The last step is to
3532  *              rewrite the cache files if needed.
3533  */
3534 void
3535 RelationCacheInitializePhase3(void)
3536 {
3537         HASH_SEQ_STATUS status;
3538         RelIdCacheEnt *idhentry;
3539         MemoryContext oldcxt;
3540         bool            needNewCacheFile = !criticalSharedRelcachesBuilt;
3541
3542         /*
3543          * relation mapper needs initialized too
3544          */
3545         RelationMapInitializePhase3();
3546
3547         /*
3548          * switch to cache memory context
3549          */
3550         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3551
3552         /*
3553          * Try to load the local relcache cache file.  If unsuccessful, bootstrap
3554          * the cache with pre-made descriptors for the critical "nailed-in" system
3555          * catalogs.
3556          */
3557         if (IsBootstrapProcessingMode() ||
3558                 !load_relcache_init_file(false))
3559         {
3560                 needNewCacheFile = true;
3561
3562                 formrdesc("pg_class", RelationRelation_Rowtype_Id, false,
3563                                   Natts_pg_class, Desc_pg_class);
3564                 formrdesc("pg_attribute", AttributeRelation_Rowtype_Id, false,
3565                                   Natts_pg_attribute, Desc_pg_attribute);
3566                 formrdesc("pg_proc", ProcedureRelation_Rowtype_Id, false,
3567                                   Natts_pg_proc, Desc_pg_proc);
3568                 formrdesc("pg_type", TypeRelation_Rowtype_Id, false,
3569                                   Natts_pg_type, Desc_pg_type);
3570
3571 #define NUM_CRITICAL_LOCAL_RELS 4       /* fix if you change list above */
3572         }
3573
3574         MemoryContextSwitchTo(oldcxt);
3575
3576         /* In bootstrap mode, the faked-up formrdesc info is all we'll have */
3577         if (IsBootstrapProcessingMode())
3578                 return;
3579
3580         /*
3581          * If we didn't get the critical system indexes loaded into relcache, do
3582          * so now.  These are critical because the catcache and/or opclass cache
3583          * depend on them for fetches done during relcache load.  Thus, we have an
3584          * infinite-recursion problem.  We can break the recursion by doing
3585          * heapscans instead of indexscans at certain key spots. To avoid hobbling
3586          * performance, we only want to do that until we have the critical indexes
3587          * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
3588          * decide whether to do heapscan or indexscan at the key spots, and we set
3589          * it true after we've loaded the critical indexes.
3590          *
3591          * The critical indexes are marked as "nailed in cache", partly to make it
3592          * easy for load_relcache_init_file to count them, but mainly because we
3593          * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
3594          * true.  (NOTE: perhaps it would be possible to reload them by
3595          * temporarily setting criticalRelcachesBuilt to false again.  For now,
3596          * though, we just nail 'em in.)
3597          *
3598          * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
3599          * in the same way as the others, because the critical catalogs don't
3600          * (currently) have any rules or triggers, and so these indexes can be
3601          * rebuilt without inducing recursion.  However they are used during
3602          * relcache load when a rel does have rules or triggers, so we choose to
3603          * nail them for performance reasons.
3604          */
3605         if (!criticalRelcachesBuilt)
3606         {
3607                 load_critical_index(ClassOidIndexId,
3608                                                         RelationRelationId);
3609                 load_critical_index(AttributeRelidNumIndexId,
3610                                                         AttributeRelationId);
3611                 load_critical_index(IndexRelidIndexId,
3612                                                         IndexRelationId);
3613                 load_critical_index(OpclassOidIndexId,
3614                                                         OperatorClassRelationId);
3615                 load_critical_index(AccessMethodProcedureIndexId,
3616                                                         AccessMethodProcedureRelationId);
3617                 load_critical_index(RewriteRelRulenameIndexId,
3618                                                         RewriteRelationId);
3619                 load_critical_index(TriggerRelidNameIndexId,
3620                                                         TriggerRelationId);
3621
3622 #define NUM_CRITICAL_LOCAL_INDEXES      7       /* fix if you change list above */
3623
3624                 criticalRelcachesBuilt = true;
3625         }
3626
3627         /*
3628          * Process critical shared indexes too.
3629          *
3630          * DatabaseNameIndexId isn't critical for relcache loading, but rather for
3631          * initial lookup of MyDatabaseId, without which we'll never find any
3632          * non-shared catalogs at all.  Autovacuum calls InitPostgres with a
3633          * database OID, so it instead depends on DatabaseOidIndexId.  We also
3634          * need to nail up some indexes on pg_authid and pg_auth_members for use
3635          * during client authentication.  SharedSecLabelObjectIndexId isn't
3636          * critical for the core system, but authentication hooks might be
3637          * interested in it.
3638          */
3639         if (!criticalSharedRelcachesBuilt)
3640         {
3641                 load_critical_index(DatabaseNameIndexId,
3642                                                         DatabaseRelationId);
3643                 load_critical_index(DatabaseOidIndexId,
3644                                                         DatabaseRelationId);
3645                 load_critical_index(AuthIdRolnameIndexId,
3646                                                         AuthIdRelationId);
3647                 load_critical_index(AuthIdOidIndexId,
3648                                                         AuthIdRelationId);
3649                 load_critical_index(AuthMemMemRoleIndexId,
3650                                                         AuthMemRelationId);
3651                 load_critical_index(SharedSecLabelObjectIndexId,
3652                                                         SharedSecLabelRelationId);
3653
3654 #define NUM_CRITICAL_SHARED_INDEXES 6   /* fix if you change list above */
3655
3656                 criticalSharedRelcachesBuilt = true;
3657         }
3658
3659         /*
3660          * Now, scan all the relcache entries and update anything that might be
3661          * wrong in the results from formrdesc or the relcache cache file. If we
3662          * faked up relcache entries using formrdesc, then read the real pg_class
3663          * rows and replace the fake entries with them. Also, if any of the
3664          * relcache entries have rules, triggers, or security policies, load that
3665          * info the hard way since it isn't recorded in the cache file.
3666          *
3667          * Whenever we access the catalogs to read data, there is a possibility of
3668          * a shared-inval cache flush causing relcache entries to be removed.
3669          * Since hash_seq_search only guarantees to still work after the *current*
3670          * entry is removed, it's unsafe to continue the hashtable scan afterward.
3671          * We handle this by restarting the scan from scratch after each access.
3672          * This is theoretically O(N^2), but the number of entries that actually
3673          * need to be fixed is small enough that it doesn't matter.
3674          */
3675         hash_seq_init(&status, RelationIdCache);
3676
3677         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3678         {
3679                 Relation        relation = idhentry->reldesc;
3680                 bool            restart = false;
3681
3682                 /*
3683                  * Make sure *this* entry doesn't get flushed while we work with it.
3684                  */
3685                 RelationIncrementReferenceCount(relation);
3686
3687                 /*
3688                  * If it's a faked-up entry, read the real pg_class tuple.
3689                  */
3690                 if (relation->rd_rel->relowner == InvalidOid)
3691                 {
3692                         HeapTuple       htup;
3693                         Form_pg_class relp;
3694
3695                         htup = SearchSysCache1(RELOID,
3696                                                                    ObjectIdGetDatum(RelationGetRelid(relation)));
3697                         if (!HeapTupleIsValid(htup))
3698                                 elog(FATAL, "cache lookup failed for relation %u",
3699                                          RelationGetRelid(relation));
3700                         relp = (Form_pg_class) GETSTRUCT(htup);
3701
3702                         /*
3703                          * Copy tuple to relation->rd_rel. (See notes in
3704                          * AllocateRelationDesc())
3705                          */
3706                         memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
3707
3708                         /* Update rd_options while we have the tuple */
3709                         if (relation->rd_options)
3710                                 pfree(relation->rd_options);
3711                         RelationParseRelOptions(relation, htup);
3712
3713                         /*
3714                          * Check the values in rd_att were set up correctly.  (We cannot
3715                          * just copy them over now: formrdesc must have set up the rd_att
3716                          * data correctly to start with, because it may already have been
3717                          * copied into one or more catcache entries.)
3718                          */
3719                         Assert(relation->rd_att->tdtypeid == relp->reltype);
3720                         Assert(relation->rd_att->tdtypmod == -1);
3721
3722                         ReleaseSysCache(htup);
3723
3724                         /* relowner had better be OK now, else we'll loop forever */
3725                         if (relation->rd_rel->relowner == InvalidOid)
3726                                 elog(ERROR, "invalid relowner in pg_class entry for \"%s\"",
3727                                          RelationGetRelationName(relation));
3728
3729                         restart = true;
3730                 }
3731
3732                 /*
3733                  * Fix data that isn't saved in relcache cache file.
3734                  *
3735                  * relhasrules or relhastriggers could possibly be wrong or out of
3736                  * date.  If we don't actually find any rules or triggers, clear the
3737                  * local copy of the flag so that we don't get into an infinite loop
3738                  * here.  We don't make any attempt to fix the pg_class entry, though.
3739                  */
3740                 if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
3741                 {
3742                         RelationBuildRuleLock(relation);
3743                         if (relation->rd_rules == NULL)
3744                                 relation->rd_rel->relhasrules = false;
3745                         restart = true;
3746                 }
3747                 if (relation->rd_rel->relhastriggers && relation->trigdesc == NULL)
3748                 {
3749                         RelationBuildTriggers(relation);
3750                         if (relation->trigdesc == NULL)
3751                                 relation->rd_rel->relhastriggers = false;
3752                         restart = true;
3753                 }
3754
3755                 /*
3756                  * Re-load the row security policies if the relation has them, since
3757                  * they are not preserved in the cache.  Note that we can never NOT
3758                  * have a policy while relrowsecurity is true,
3759                  * RelationBuildRowSecurity will create a single default-deny policy
3760                  * if there is no policy defined in pg_policy.
3761                  */
3762                 if (relation->rd_rel->relrowsecurity && relation->rd_rsdesc == NULL)
3763                 {
3764                         RelationBuildRowSecurity(relation);
3765
3766                         Assert(relation->rd_rsdesc != NULL);
3767                         restart = true;
3768                 }
3769
3770                 /*
3771                  * Reload the partition key and descriptor for a partitioned table.
3772                  */
3773                 if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
3774                         relation->rd_partkey == NULL)
3775                 {
3776                         RelationBuildPartitionKey(relation);
3777                         Assert(relation->rd_partkey != NULL);
3778
3779                         restart = true;
3780                 }
3781
3782                 if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
3783                         relation->rd_partdesc == NULL)
3784                 {
3785                         RelationBuildPartitionDesc(relation);
3786                         Assert(relation->rd_partdesc != NULL);
3787
3788                         restart = true;
3789                 }
3790
3791                 /* Release hold on the relation */
3792                 RelationDecrementReferenceCount(relation);
3793
3794                 /* Now, restart the hashtable scan if needed */
3795                 if (restart)
3796                 {
3797                         hash_seq_term(&status);
3798                         hash_seq_init(&status, RelationIdCache);
3799                 }
3800         }
3801
3802         /*
3803          * Lastly, write out new relcache cache files if needed.  We don't bother
3804          * to distinguish cases where only one of the two needs an update.
3805          */
3806         if (needNewCacheFile)
3807         {
3808                 /*
3809                  * Force all the catcaches to finish initializing and thereby open the
3810                  * catalogs and indexes they use.  This will preload the relcache with
3811                  * entries for all the most important system catalogs and indexes, so
3812                  * that the init files will be most useful for future backends.
3813                  */
3814                 InitCatalogCachePhase2();
3815
3816                 /* now write the files */
3817                 write_relcache_init_file(true);
3818                 write_relcache_init_file(false);
3819         }
3820 }
3821
3822 /*
3823  * Load one critical system index into the relcache
3824  *
3825  * indexoid is the OID of the target index, heapoid is the OID of the catalog
3826  * it belongs to.
3827  */
3828 static void
3829 load_critical_index(Oid indexoid, Oid heapoid)
3830 {
3831         Relation        ird;
3832
3833         /*
3834          * We must lock the underlying catalog before locking the index to avoid
3835          * deadlock, since RelationBuildDesc might well need to read the catalog,
3836          * and if anyone else is exclusive-locking this catalog and index they'll
3837          * be doing it in that order.
3838          */
3839         LockRelationOid(heapoid, AccessShareLock);
3840         LockRelationOid(indexoid, AccessShareLock);
3841         ird = RelationBuildDesc(indexoid, true);
3842         if (ird == NULL)
3843                 elog(PANIC, "could not open critical system index %u", indexoid);
3844         ird->rd_isnailed = true;
3845         ird->rd_refcnt = 1;
3846         UnlockRelationOid(indexoid, AccessShareLock);
3847         UnlockRelationOid(heapoid, AccessShareLock);
3848 }
3849
3850 /*
3851  * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
3852  * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
3853  *
3854  * We need this kluge because we have to be able to access non-fixed-width
3855  * fields of pg_class and pg_index before we have the standard catalog caches
3856  * available.  We use predefined data that's set up in just the same way as
3857  * the bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
3858  * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
3859  * does it have a TupleConstr field.  But it's good enough for the purpose of
3860  * extracting fields.
3861  */
3862 static TupleDesc
3863 BuildHardcodedDescriptor(int natts, const FormData_pg_attribute *attrs)
3864 {
3865         TupleDesc       result;
3866         MemoryContext oldcxt;
3867         int                     i;
3868
3869         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3870
3871         result = CreateTemplateTupleDesc(natts);
3872         result->tdtypeid = RECORDOID;   /* not right, but we don't care */
3873         result->tdtypmod = -1;
3874
3875         for (i = 0; i < natts; i++)
3876         {
3877                 memcpy(TupleDescAttr(result, i), &attrs[i], ATTRIBUTE_FIXED_PART_SIZE);
3878                 /* make sure attcacheoff is valid */
3879                 TupleDescAttr(result, i)->attcacheoff = -1;
3880         }
3881
3882         /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
3883         TupleDescAttr(result, 0)->attcacheoff = 0;
3884
3885         /* Note: we don't bother to set up a TupleConstr entry */
3886
3887         MemoryContextSwitchTo(oldcxt);
3888
3889         return result;
3890 }
3891
3892 static TupleDesc
3893 GetPgClassDescriptor(void)
3894 {
3895         static TupleDesc pgclassdesc = NULL;
3896
3897         /* Already done? */
3898         if (pgclassdesc == NULL)
3899                 pgclassdesc = BuildHardcodedDescriptor(Natts_pg_class,
3900                                                                                            Desc_pg_class);
3901
3902         return pgclassdesc;
3903 }
3904
3905 static TupleDesc
3906 GetPgIndexDescriptor(void)
3907 {
3908         static TupleDesc pgindexdesc = NULL;
3909
3910         /* Already done? */
3911         if (pgindexdesc == NULL)
3912                 pgindexdesc = BuildHardcodedDescriptor(Natts_pg_index,
3913                                                                                            Desc_pg_index);
3914
3915         return pgindexdesc;
3916 }
3917
3918 /*
3919  * Load any default attribute value definitions for the relation.
3920  */
3921 static void
3922 AttrDefaultFetch(Relation relation)
3923 {
3924         AttrDefault *attrdef = relation->rd_att->constr->defval;
3925         int                     ndef = relation->rd_att->constr->num_defval;
3926         Relation        adrel;
3927         SysScanDesc adscan;
3928         ScanKeyData skey;
3929         HeapTuple       htup;
3930         Datum           val;
3931         bool            isnull;
3932         int                     found;
3933         int                     i;
3934
3935         ScanKeyInit(&skey,
3936                                 Anum_pg_attrdef_adrelid,
3937                                 BTEqualStrategyNumber, F_OIDEQ,
3938                                 ObjectIdGetDatum(RelationGetRelid(relation)));
3939
3940         adrel = table_open(AttrDefaultRelationId, AccessShareLock);
3941         adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
3942                                                                 NULL, 1, &skey);
3943         found = 0;
3944
3945         while (HeapTupleIsValid(htup = systable_getnext(adscan)))
3946         {
3947                 Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
3948                 Form_pg_attribute attr = TupleDescAttr(relation->rd_att, adform->adnum - 1);
3949
3950                 for (i = 0; i < ndef; i++)
3951                 {
3952                         if (adform->adnum != attrdef[i].adnum)
3953                                 continue;
3954                         if (attrdef[i].adbin != NULL)
3955                                 elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
3956                                          NameStr(attr->attname),
3957                                          RelationGetRelationName(relation));
3958                         else
3959                                 found++;
3960
3961                         val = fastgetattr(htup,
3962                                                           Anum_pg_attrdef_adbin,
3963                                                           adrel->rd_att, &isnull);
3964                         if (isnull)
3965                                 elog(WARNING, "null adbin for attr %s of rel %s",
3966                                          NameStr(attr->attname),
3967                                          RelationGetRelationName(relation));
3968                         else
3969                         {
3970                                 /* detoast and convert to cstring in caller's context */
3971                                 char       *s = TextDatumGetCString(val);
3972
3973                                 attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext, s);
3974                                 pfree(s);
3975                         }
3976                         break;
3977                 }
3978
3979                 if (i >= ndef)
3980                         elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
3981                                  adform->adnum, RelationGetRelationName(relation));
3982         }
3983
3984         systable_endscan(adscan);
3985         table_close(adrel, AccessShareLock);
3986 }
3987
3988 /*
3989  * Load any check constraints for the relation.
3990  */
3991 static void
3992 CheckConstraintFetch(Relation relation)
3993 {
3994         ConstrCheck *check = relation->rd_att->constr->check;
3995         int                     ncheck = relation->rd_att->constr->num_check;
3996         Relation        conrel;
3997         SysScanDesc conscan;
3998         ScanKeyData skey[1];
3999         HeapTuple       htup;
4000         int                     found = 0;
4001
4002         ScanKeyInit(&skey[0],
4003                                 Anum_pg_constraint_conrelid,
4004                                 BTEqualStrategyNumber, F_OIDEQ,
4005                                 ObjectIdGetDatum(RelationGetRelid(relation)));
4006
4007         conrel = table_open(ConstraintRelationId, AccessShareLock);
4008         conscan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, true,
4009                                                                  NULL, 1, skey);
4010
4011         while (HeapTupleIsValid(htup = systable_getnext(conscan)))
4012         {
4013                 Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
4014                 Datum           val;
4015                 bool            isnull;
4016                 char       *s;
4017
4018                 /* We want check constraints only */
4019                 if (conform->contype != CONSTRAINT_CHECK)
4020                         continue;
4021
4022                 if (found >= ncheck)
4023                         elog(ERROR, "unexpected constraint record found for rel %s",
4024                                  RelationGetRelationName(relation));
4025
4026                 check[found].ccvalid = conform->convalidated;
4027                 check[found].ccnoinherit = conform->connoinherit;
4028                 check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
4029                                                                                                   NameStr(conform->conname));
4030
4031                 /* Grab and test conbin is actually set */
4032                 val = fastgetattr(htup,
4033                                                   Anum_pg_constraint_conbin,
4034                                                   conrel->rd_att, &isnull);
4035                 if (isnull)
4036                         elog(ERROR, "null conbin for rel %s",
4037                                  RelationGetRelationName(relation));
4038
4039                 /* detoast and convert to cstring in caller's context */
4040                 s = TextDatumGetCString(val);
4041                 check[found].ccbin = MemoryContextStrdup(CacheMemoryContext, s);
4042                 pfree(s);
4043
4044                 found++;
4045         }
4046
4047         systable_endscan(conscan);
4048         table_close(conrel, AccessShareLock);
4049
4050         if (found != ncheck)
4051                 elog(ERROR, "%d constraint record(s) missing for rel %s",
4052                          ncheck - found, RelationGetRelationName(relation));
4053
4054         /* Sort the records so that CHECKs are applied in a deterministic order */
4055         if (ncheck > 1)
4056                 qsort(check, ncheck, sizeof(ConstrCheck), CheckConstraintCmp);
4057 }
4058
4059 /*
4060  * qsort comparator to sort ConstrCheck entries by name
4061  */
4062 static int
4063 CheckConstraintCmp(const void *a, const void *b)
4064 {
4065         const ConstrCheck *ca = (const ConstrCheck *) a;
4066         const ConstrCheck *cb = (const ConstrCheck *) b;
4067
4068         return strcmp(ca->ccname, cb->ccname);
4069 }
4070
4071 /*
4072  * RelationGetFKeyList -- get a list of foreign key info for the relation
4073  *
4074  * Returns a list of ForeignKeyCacheInfo structs, one per FK constraining
4075  * the given relation.  This data is a direct copy of relevant fields from
4076  * pg_constraint.  The list items are in no particular order.
4077  *
4078  * CAUTION: the returned list is part of the relcache's data, and could
4079  * vanish in a relcache entry reset.  Callers must inspect or copy it
4080  * before doing anything that might trigger a cache flush, such as
4081  * system catalog accesses.  copyObject() can be used if desired.
4082  * (We define it this way because current callers want to filter and
4083  * modify the list entries anyway, so copying would be a waste of time.)
4084  */
4085 List *
4086 RelationGetFKeyList(Relation relation)
4087 {
4088         List       *result;
4089         Relation        conrel;
4090         SysScanDesc conscan;
4091         ScanKeyData skey;
4092         HeapTuple       htup;
4093         List       *oldlist;
4094         MemoryContext oldcxt;
4095
4096         /* Quick exit if we already computed the list. */
4097         if (relation->rd_fkeyvalid)
4098                 return relation->rd_fkeylist;
4099
4100         /* Fast path: non-partitioned tables without triggers can't have FKs */
4101         if (!relation->rd_rel->relhastriggers &&
4102                 relation->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
4103                 return NIL;
4104
4105         /*
4106          * We build the list we intend to return (in the caller's context) while
4107          * doing the scan.  After successfully completing the scan, we copy that
4108          * list into the relcache entry.  This avoids cache-context memory leakage
4109          * if we get some sort of error partway through.
4110          */
4111         result = NIL;
4112
4113         /* Prepare to scan pg_constraint for entries having conrelid = this rel. */
4114         ScanKeyInit(&skey,
4115                                 Anum_pg_constraint_conrelid,
4116                                 BTEqualStrategyNumber, F_OIDEQ,
4117                                 ObjectIdGetDatum(RelationGetRelid(relation)));
4118
4119         conrel = table_open(ConstraintRelationId, AccessShareLock);
4120         conscan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, true,
4121                                                                  NULL, 1, &skey);
4122
4123         while (HeapTupleIsValid(htup = systable_getnext(conscan)))
4124         {
4125                 Form_pg_constraint constraint = (Form_pg_constraint) GETSTRUCT(htup);
4126                 ForeignKeyCacheInfo *info;
4127
4128                 /* consider only foreign keys */
4129                 if (constraint->contype != CONSTRAINT_FOREIGN)
4130                         continue;
4131
4132                 info = makeNode(ForeignKeyCacheInfo);
4133                 info->conoid = constraint->oid;
4134                 info->conrelid = constraint->conrelid;
4135                 info->confrelid = constraint->confrelid;
4136
4137                 DeconstructFkConstraintRow(htup, &info->nkeys,
4138                                                                    info->conkey,
4139                                                                    info->confkey,
4140                                                                    info->conpfeqop,
4141                                                                    NULL, NULL);
4142
4143                 /* Add FK's node to the result list */
4144                 result = lappend(result, info);
4145         }
4146
4147         systable_endscan(conscan);
4148         table_close(conrel, AccessShareLock);
4149
4150         /* Now save a copy of the completed list in the relcache entry. */
4151         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4152         oldlist = relation->rd_fkeylist;
4153         relation->rd_fkeylist = copyObject(result);
4154         relation->rd_fkeyvalid = true;
4155         MemoryContextSwitchTo(oldcxt);
4156
4157         /* Don't leak the old list, if there is one */
4158         list_free_deep(oldlist);
4159
4160         return result;
4161 }
4162
4163 /*
4164  * RelationGetIndexList -- get a list of OIDs of indexes on this relation
4165  *
4166  * The index list is created only if someone requests it.  We scan pg_index
4167  * to find relevant indexes, and add the list to the relcache entry so that
4168  * we won't have to compute it again.  Note that shared cache inval of a
4169  * relcache entry will delete the old list and set rd_indexvalid to 0,
4170  * so that we must recompute the index list on next request.  This handles
4171  * creation or deletion of an index.
4172  *
4173  * Indexes that are marked not indislive are omitted from the returned list.
4174  * Such indexes are expected to be dropped momentarily, and should not be
4175  * touched at all by any caller of this function.
4176  *
4177  * The returned list is guaranteed to be sorted in order by OID.  This is
4178  * needed by the executor, since for index types that we obtain exclusive
4179  * locks on when updating the index, all backends must lock the indexes in
4180  * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
4181  * consistent ordering would do, but ordering by OID is easy.
4182  *
4183  * Since shared cache inval causes the relcache's copy of the list to go away,
4184  * we return a copy of the list palloc'd in the caller's context.  The caller
4185  * may list_free() the returned list after scanning it. This is necessary
4186  * since the caller will typically be doing syscache lookups on the relevant
4187  * indexes, and syscache lookup could cause SI messages to be processed!
4188  *
4189  * In exactly the same way, we update rd_pkindex, which is the OID of the
4190  * relation's primary key index if any, else InvalidOid; and rd_replidindex,
4191  * which is the pg_class OID of an index to be used as the relation's
4192  * replication identity index, or InvalidOid if there is no such index.
4193  */
4194 List *
4195 RelationGetIndexList(Relation relation)
4196 {
4197         Relation        indrel;
4198         SysScanDesc indscan;
4199         ScanKeyData skey;
4200         HeapTuple       htup;
4201         List       *result;
4202         List       *oldlist;
4203         char            replident = relation->rd_rel->relreplident;
4204         Oid                     pkeyIndex = InvalidOid;
4205         Oid                     candidateIndex = InvalidOid;
4206         MemoryContext oldcxt;
4207
4208         /* Quick exit if we already computed the list. */
4209         if (relation->rd_indexvalid != 0)
4210                 return list_copy(relation->rd_indexlist);
4211
4212         /*
4213          * We build the list we intend to return (in the caller's context) while
4214          * doing the scan.  After successfully completing the scan, we copy that
4215          * list into the relcache entry.  This avoids cache-context memory leakage
4216          * if we get some sort of error partway through.
4217          */
4218         result = NIL;
4219
4220         /* Prepare to scan pg_index for entries having indrelid = this rel. */
4221         ScanKeyInit(&skey,
4222                                 Anum_pg_index_indrelid,
4223                                 BTEqualStrategyNumber, F_OIDEQ,
4224                                 ObjectIdGetDatum(RelationGetRelid(relation)));
4225
4226         indrel = table_open(IndexRelationId, AccessShareLock);
4227         indscan = systable_beginscan(indrel, IndexIndrelidIndexId, true,
4228                                                                  NULL, 1, &skey);
4229
4230         while (HeapTupleIsValid(htup = systable_getnext(indscan)))
4231         {
4232                 Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);
4233
4234                 /*
4235                  * Ignore any indexes that are currently being dropped.  This will
4236                  * prevent them from being searched, inserted into, or considered in
4237                  * HOT-safety decisions.  It's unsafe to touch such an index at all
4238                  * since its catalog entries could disappear at any instant.
4239                  */
4240                 if (!index->indislive)
4241                         continue;
4242
4243                 /* Add index's OID to result list in the proper order */
4244                 result = insert_ordered_oid(result, index->indexrelid);
4245
4246                 /*
4247                  * Invalid, non-unique, non-immediate or predicate indexes aren't
4248                  * interesting for either oid indexes or replication identity indexes,
4249                  * so don't check them.
4250                  */
4251                 if (!index->indisvalid || !index->indisunique ||
4252                         !index->indimmediate ||
4253                         !heap_attisnull(htup, Anum_pg_index_indpred, NULL))
4254                         continue;
4255
4256                 /* remember primary key index if any */
4257                 if (index->indisprimary)
4258                         pkeyIndex = index->indexrelid;
4259
4260                 /* remember explicitly chosen replica index */
4261                 if (index->indisreplident)
4262                         candidateIndex = index->indexrelid;
4263         }
4264
4265         systable_endscan(indscan);
4266
4267         table_close(indrel, AccessShareLock);
4268
4269         /* Now save a copy of the completed list in the relcache entry. */
4270         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4271         oldlist = relation->rd_indexlist;
4272         relation->rd_indexlist = list_copy(result);
4273         relation->rd_pkindex = pkeyIndex;
4274         if (replident == REPLICA_IDENTITY_DEFAULT && OidIsValid(pkeyIndex))
4275                 relation->rd_replidindex = pkeyIndex;
4276         else if (replident == REPLICA_IDENTITY_INDEX && OidIsValid(candidateIndex))
4277                 relation->rd_replidindex = candidateIndex;
4278         else
4279                 relation->rd_replidindex = InvalidOid;
4280         relation->rd_indexvalid = 1;
4281         MemoryContextSwitchTo(oldcxt);
4282
4283         /* Don't leak the old list, if there is one */
4284         list_free(oldlist);
4285
4286         return result;
4287 }
4288
4289 /*
4290  * RelationGetStatExtList
4291  *              get a list of OIDs of statistics objects on this relation
4292  *
4293  * The statistics list is created only if someone requests it, in a way
4294  * similar to RelationGetIndexList().  We scan pg_statistic_ext to find
4295  * relevant statistics, and add the list to the relcache entry so that we
4296  * won't have to compute it again.  Note that shared cache inval of a
4297  * relcache entry will delete the old list and set rd_statvalid to 0,
4298  * so that we must recompute the statistics list on next request.  This
4299  * handles creation or deletion of a statistics object.
4300  *
4301  * The returned list is guaranteed to be sorted in order by OID, although
4302  * this is not currently needed.
4303  *
4304  * Since shared cache inval causes the relcache's copy of the list to go away,
4305  * we return a copy of the list palloc'd in the caller's context.  The caller
4306  * may list_free() the returned list after scanning it. This is necessary
4307  * since the caller will typically be doing syscache lookups on the relevant
4308  * statistics, and syscache lookup could cause SI messages to be processed!
4309  */
4310 List *
4311 RelationGetStatExtList(Relation relation)
4312 {
4313         Relation        indrel;
4314         SysScanDesc indscan;
4315         ScanKeyData skey;
4316         HeapTuple       htup;
4317         List       *result;
4318         List       *oldlist;
4319         MemoryContext oldcxt;
4320
4321         /* Quick exit if we already computed the list. */
4322         if (relation->rd_statvalid != 0)
4323                 return list_copy(relation->rd_statlist);
4324
4325         /*
4326          * We build the list we intend to return (in the caller's context) while
4327          * doing the scan.  After successfully completing the scan, we copy that
4328          * list into the relcache entry.  This avoids cache-context memory leakage
4329          * if we get some sort of error partway through.
4330          */
4331         result = NIL;
4332
4333         /*
4334          * Prepare to scan pg_statistic_ext for entries having stxrelid = this
4335          * rel.
4336          */
4337         ScanKeyInit(&skey,
4338                                 Anum_pg_statistic_ext_stxrelid,
4339                                 BTEqualStrategyNumber, F_OIDEQ,
4340                                 ObjectIdGetDatum(RelationGetRelid(relation)));
4341
4342         indrel = table_open(StatisticExtRelationId, AccessShareLock);
4343         indscan = systable_beginscan(indrel, StatisticExtRelidIndexId, true,
4344                                                                  NULL, 1, &skey);
4345
4346         while (HeapTupleIsValid(htup = systable_getnext(indscan)))
4347         {
4348                 Oid                     oid = ((Form_pg_statistic_ext) GETSTRUCT(htup))->oid;
4349
4350                 result = insert_ordered_oid(result, oid);
4351         }
4352
4353         systable_endscan(indscan);
4354
4355         table_close(indrel, AccessShareLock);
4356
4357         /* Now save a copy of the completed list in the relcache entry. */
4358         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4359         oldlist = relation->rd_statlist;
4360         relation->rd_statlist = list_copy(result);
4361
4362         relation->rd_statvalid = true;
4363         MemoryContextSwitchTo(oldcxt);
4364
4365         /* Don't leak the old list, if there is one */
4366         list_free(oldlist);
4367
4368         return result;
4369 }
4370
4371 /*
4372  * insert_ordered_oid
4373  *              Insert a new Oid into a sorted list of Oids, preserving ordering
4374  *
4375  * Building the ordered list this way is O(N^2), but with a pretty small
4376  * constant, so for the number of entries we expect it will probably be
4377  * faster than trying to apply qsort().  Most tables don't have very many
4378  * indexes...
4379  */
4380 static List *
4381 insert_ordered_oid(List *list, Oid datum)
4382 {
4383         ListCell   *prev;
4384
4385         /* Does the datum belong at the front? */
4386         if (list == NIL || datum < linitial_oid(list))
4387                 return lcons_oid(datum, list);
4388         /* No, so find the entry it belongs after */
4389         prev = list_head(list);
4390         for (;;)
4391         {
4392                 ListCell   *curr = lnext(prev);
4393
4394                 if (curr == NULL || datum < lfirst_oid(curr))
4395                         break;                          /* it belongs after 'prev', before 'curr' */
4396
4397                 prev = curr;
4398         }
4399         /* Insert datum into list after 'prev' */
4400         lappend_cell_oid(list, prev, datum);
4401         return list;
4402 }
4403
4404 /*
4405  * RelationSetIndexList -- externally force the index list contents
4406  *
4407  * This is used to temporarily override what we think the set of valid
4408  * indexes is (including the presence or absence of an OID index).
4409  * The forcing will be valid only until transaction commit or abort.
4410  *
4411  * This should only be applied to nailed relations, because in a non-nailed
4412  * relation the hacked index list could be lost at any time due to SI
4413  * messages.  In practice it is only used on pg_class (see REINDEX).
4414  *
4415  * It is up to the caller to make sure the given list is correctly ordered.
4416  *
4417  * We deliberately do not change rd_indexattr here: even when operating
4418  * with a temporary partial index list, HOT-update decisions must be made
4419  * correctly with respect to the full index set.  It is up to the caller
4420  * to ensure that a correct rd_indexattr set has been cached before first
4421  * calling RelationSetIndexList; else a subsequent inquiry might cause a
4422  * wrong rd_indexattr set to get computed and cached.  Likewise, we do not
4423  * touch rd_keyattr, rd_pkattr or rd_idattr.
4424  */
4425 void
4426 RelationSetIndexList(Relation relation, List *indexIds)
4427 {
4428         MemoryContext oldcxt;
4429
4430         Assert(relation->rd_isnailed);
4431         /* Copy the list into the cache context (could fail for lack of mem) */
4432         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4433         indexIds = list_copy(indexIds);
4434         MemoryContextSwitchTo(oldcxt);
4435         /* Okay to replace old list */
4436         list_free(relation->rd_indexlist);
4437         relation->rd_indexlist = indexIds;
4438
4439         /*
4440          * For the moment, assume the target rel hasn't got a pk or replica index.
4441          * We'll load them on demand in the API that wraps access to them.
4442          */
4443         relation->rd_pkindex = InvalidOid;
4444         relation->rd_replidindex = InvalidOid;
4445         relation->rd_indexvalid = 2;    /* mark list as forced */
4446         /* Flag relation as needing eoxact cleanup (to reset the list) */
4447         EOXactListAdd(relation);
4448 }
4449
4450 /*
4451  * RelationGetPrimaryKeyIndex -- get OID of the relation's primary key index
4452  *
4453  * Returns InvalidOid if there is no such index.
4454  */
4455 Oid
4456 RelationGetPrimaryKeyIndex(Relation relation)
4457 {
4458         List       *ilist;
4459
4460         if (relation->rd_indexvalid == 0)
4461         {
4462                 /* RelationGetIndexList does the heavy lifting. */
4463                 ilist = RelationGetIndexList(relation);
4464                 list_free(ilist);
4465                 Assert(relation->rd_indexvalid != 0);
4466         }
4467
4468         return relation->rd_pkindex;
4469 }
4470
4471 /*
4472  * RelationGetReplicaIndex -- get OID of the relation's replica identity index
4473  *
4474  * Returns InvalidOid if there is no such index.
4475  */
4476 Oid
4477 RelationGetReplicaIndex(Relation relation)
4478 {
4479         List       *ilist;
4480
4481         if (relation->rd_indexvalid == 0)
4482         {
4483                 /* RelationGetIndexList does the heavy lifting. */
4484                 ilist = RelationGetIndexList(relation);
4485                 list_free(ilist);
4486                 Assert(relation->rd_indexvalid != 0);
4487         }
4488
4489         return relation->rd_replidindex;
4490 }
4491
4492 /*
4493  * RelationGetIndexExpressions -- get the index expressions for an index
4494  *
4495  * We cache the result of transforming pg_index.indexprs into a node tree.
4496  * If the rel is not an index or has no expressional columns, we return NIL.
4497  * Otherwise, the returned tree is copied into the caller's memory context.
4498  * (We don't want to return a pointer to the relcache copy, since it could
4499  * disappear due to relcache invalidation.)
4500  */
4501 List *
4502 RelationGetIndexExpressions(Relation relation)
4503 {
4504         List       *result;
4505         Datum           exprsDatum;
4506         bool            isnull;
4507         char       *exprsString;
4508         MemoryContext oldcxt;
4509
4510         /* Quick exit if we already computed the result. */
4511         if (relation->rd_indexprs)
4512                 return copyObject(relation->rd_indexprs);
4513
4514         /* Quick exit if there is nothing to do. */
4515         if (relation->rd_indextuple == NULL ||
4516                 heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs, NULL))
4517                 return NIL;
4518
4519         /*
4520          * We build the tree we intend to return in the caller's context. After
4521          * successfully completing the work, we copy it into the relcache entry.
4522          * This avoids problems if we get some sort of error partway through.
4523          */
4524         exprsDatum = heap_getattr(relation->rd_indextuple,
4525                                                           Anum_pg_index_indexprs,
4526                                                           GetPgIndexDescriptor(),
4527                                                           &isnull);
4528         Assert(!isnull);
4529         exprsString = TextDatumGetCString(exprsDatum);
4530         result = (List *) stringToNode(exprsString);
4531         pfree(exprsString);
4532
4533         /*
4534          * Run the expressions through eval_const_expressions. This is not just an
4535          * optimization, but is necessary, because the planner will be comparing
4536          * them to similarly-processed qual clauses, and may fail to detect valid
4537          * matches without this.  We must not use canonicalize_qual, however,
4538          * since these aren't qual expressions.
4539          */
4540         result = (List *) eval_const_expressions(NULL, (Node *) result);
4541
4542         /* May as well fix opfuncids too */
4543         fix_opfuncids((Node *) result);
4544
4545         /* Now save a copy of the completed tree in the relcache entry. */
4546         oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
4547         relation->rd_indexprs = copyObject(result);
4548         MemoryContextSwitchTo(oldcxt);
4549
4550         return result;
4551 }
4552
4553 /*
4554  * RelationGetIndexPredicate -- get the index predicate for an index
4555  *
4556  * We cache the result of transforming pg_index.indpred into an implicit-AND
4557  * node tree (suitable for use in planning).
4558  * If the rel is not an index or has no predicate, we return NIL.
4559  * Otherwise, the returned tree is copied into the caller's memory context.
4560  * (We don't want to return a pointer to the relcache copy, since it could
4561  * disappear due to relcache invalidation.)
4562  */
4563 List *
4564 RelationGetIndexPredicate(Relation relation)
4565 {
4566         List       *result;
4567         Datum           predDatum;
4568         bool            isnull;
4569         char       *predString;
4570         MemoryContext oldcxt;
4571
4572         /* Quick exit if we already computed the result. */
4573         if (relation->rd_indpred)
4574                 return copyObject(relation->rd_indpred);
4575
4576         /* Quick exit if there is nothing to do. */
4577         if (relation->rd_indextuple == NULL ||
4578                 heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred, NULL))
4579                 return NIL;
4580
4581         /*
4582          * We build the tree we intend to return in the caller's context. After
4583          * successfully completing the work, we copy it into the relcache entry.
4584          * This avoids problems if we get some sort of error partway through.
4585          */
4586         predDatum = heap_getattr(relation->rd_indextuple,
4587                                                          Anum_pg_index_indpred,
4588                                                          GetPgIndexDescriptor(),
4589                                                          &isnull);
4590         Assert(!isnull);
4591         predString = TextDatumGetCString(predDatum);
4592         result = (List *) stringToNode(predString);
4593         pfree(predString);
4594
4595         /*
4596          * Run the expression through const-simplification and canonicalization.
4597          * This is not just an optimization, but is necessary, because the planner
4598          * will be comparing it to similarly-processed qual clauses, and may fail
4599          * to detect valid matches without this.  This must match the processing
4600          * done to qual clauses in preprocess_expression()!  (We can skip the
4601          * stuff involving subqueries, however, since we don't allow any in index
4602          * predicates.)
4603          */
4604         result = (List *) eval_const_expressions(NULL, (Node *) result);
4605
4606         result = (List *) canonicalize_qual((Expr *) result, false);
4607
4608         /* Also convert to implicit-AND format */
4609         result = make_ands_implicit((Expr *) result);
4610
4611         /* May as well fix opfuncids too */
4612         fix_opfuncids((Node *) result);
4613
4614         /* Now save a copy of the completed tree in the relcache entry. */
4615         oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
4616         relation->rd_indpred = copyObject(result);
4617         MemoryContextSwitchTo(oldcxt);
4618
4619         return result;
4620 }
4621
4622 /*
4623  * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
4624  *
4625  * The result has a bit set for each attribute used anywhere in the index
4626  * definitions of all the indexes on this relation.  (This includes not only
4627  * simple index keys, but attributes used in expressions and partial-index
4628  * predicates.)
4629  *
4630  * Depending on attrKind, a bitmap covering the attnums for all index columns,
4631  * for all potential foreign key columns, or for all columns in the configured
4632  * replica identity index is returned.
4633  *
4634  * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
4635  * we can include system attributes (e.g., OID) in the bitmap representation.
4636  *
4637  * Caller had better hold at least RowExclusiveLock on the target relation
4638  * to ensure it is safe (deadlock-free) for us to take locks on the relation's
4639  * indexes.  Note that since the introduction of CREATE INDEX CONCURRENTLY,
4640  * that lock level doesn't guarantee a stable set of indexes, so we have to
4641  * be prepared to retry here in case of a change in the set of indexes.
4642  *
4643  * The returned result is palloc'd in the caller's memory context and should
4644  * be bms_free'd when not needed anymore.
4645  */
4646 Bitmapset *
4647 RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
4648 {
4649         Bitmapset  *indexattrs;         /* indexed columns */
4650         Bitmapset  *uindexattrs;        /* columns in unique indexes */
4651         Bitmapset  *pkindexattrs;       /* columns in the primary index */
4652         Bitmapset  *idindexattrs;       /* columns in the replica identity */
4653         List       *indexoidlist;
4654         List       *newindexoidlist;
4655         Oid                     relpkindex;
4656         Oid                     relreplindex;
4657         ListCell   *l;
4658         MemoryContext oldcxt;
4659
4660         /* Quick exit if we already computed the result. */
4661         if (relation->rd_indexattr != NULL)
4662         {
4663                 switch (attrKind)
4664                 {
4665                         case INDEX_ATTR_BITMAP_ALL:
4666                                 return bms_copy(relation->rd_indexattr);
4667                         case INDEX_ATTR_BITMAP_KEY:
4668                                 return bms_copy(relation->rd_keyattr);
4669                         case INDEX_ATTR_BITMAP_PRIMARY_KEY:
4670                                 return bms_copy(relation->rd_pkattr);
4671                         case INDEX_ATTR_BITMAP_IDENTITY_KEY:
4672                                 return bms_copy(relation->rd_idattr);
4673                         default:
4674                                 elog(ERROR, "unknown attrKind %u", attrKind);
4675                 }
4676         }
4677
4678         /* Fast path if definitely no indexes */
4679         if (!RelationGetForm(relation)->relhasindex)
4680                 return NULL;
4681
4682         /*
4683          * Get cached list of index OIDs. If we have to start over, we do so here.
4684          */
4685 restart:
4686         indexoidlist = RelationGetIndexList(relation);
4687
4688         /* Fall out if no indexes (but relhasindex was set) */
4689         if (indexoidlist == NIL)
4690                 return NULL;
4691
4692         /*
4693          * Copy the rd_pkindex and rd_replidindex values computed by
4694          * RelationGetIndexList before proceeding.  This is needed because a
4695          * relcache flush could occur inside index_open below, resetting the
4696          * fields managed by RelationGetIndexList.  We need to do the work with
4697          * stable values of these fields.
4698          */
4699         relpkindex = relation->rd_pkindex;
4700         relreplindex = relation->rd_replidindex;
4701
4702         /*
4703          * For each index, add referenced attributes to indexattrs.
4704          *
4705          * Note: we consider all indexes returned by RelationGetIndexList, even if
4706          * they are not indisready or indisvalid.  This is important because an
4707          * index for which CREATE INDEX CONCURRENTLY has just started must be
4708          * included in HOT-safety decisions (see README.HOT).  If a DROP INDEX
4709          * CONCURRENTLY is far enough along that we should ignore the index, it
4710          * won't be returned at all by RelationGetIndexList.
4711          */
4712         indexattrs = NULL;
4713         uindexattrs = NULL;
4714         pkindexattrs = NULL;
4715         idindexattrs = NULL;
4716         foreach(l, indexoidlist)
4717         {
4718                 Oid                     indexOid = lfirst_oid(l);
4719                 Relation        indexDesc;
4720                 IndexInfo  *indexInfo;
4721                 int                     i;
4722                 bool            isKey;          /* candidate key */
4723                 bool            isPK;           /* primary key */
4724                 bool            isIDKey;        /* replica identity index */
4725
4726                 indexDesc = index_open(indexOid, AccessShareLock);
4727
4728                 /* Extract index key information from the index's pg_index row */
4729                 indexInfo = BuildIndexInfo(indexDesc);
4730
4731                 /* Can this index be referenced by a foreign key? */
4732                 isKey = indexInfo->ii_Unique &&
4733                         indexInfo->ii_Expressions == NIL &&
4734                         indexInfo->ii_Predicate == NIL;
4735
4736                 /* Is this a primary key? */
4737                 isPK = (indexOid == relpkindex);
4738
4739                 /* Is this index the configured (or default) replica identity? */
4740                 isIDKey = (indexOid == relreplindex);
4741
4742                 /* Collect simple attribute references */
4743                 for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
4744                 {
4745                         int                     attrnum = indexInfo->ii_IndexAttrNumbers[i];
4746
4747                         /*
4748                          * Since we have covering indexes with non-key columns, we must
4749                          * handle them accurately here. non-key columns must be added into
4750                          * indexattrs, since they are in index, and HOT-update shouldn't
4751                          * miss them. Obviously, non-key columns couldn't be referenced by
4752                          * foreign key or identity key. Hence we do not include them into
4753                          * uindexattrs, pkindexattrs and idindexattrs bitmaps.
4754                          */
4755                         if (attrnum != 0)
4756                         {
4757                                 indexattrs = bms_add_member(indexattrs,
4758                                                                                         attrnum - FirstLowInvalidHeapAttributeNumber);
4759
4760                                 if (isKey && i < indexInfo->ii_NumIndexKeyAttrs)
4761                                         uindexattrs = bms_add_member(uindexattrs,
4762                                                                                                  attrnum - FirstLowInvalidHeapAttributeNumber);
4763
4764                                 if (isPK && i < indexInfo->ii_NumIndexKeyAttrs)
4765                                         pkindexattrs = bms_add_member(pkindexattrs,
4766                                                                                                   attrnum - FirstLowInvalidHeapAttributeNumber);
4767
4768                                 if (isIDKey && i < indexInfo->ii_NumIndexKeyAttrs)
4769                                         idindexattrs = bms_add_member(idindexattrs,
4770                                                                                                   attrnum - FirstLowInvalidHeapAttributeNumber);
4771                         }
4772                 }
4773
4774                 /* Collect all attributes used in expressions, too */
4775                 pull_varattnos((Node *) indexInfo->ii_Expressions, 1, &indexattrs);
4776
4777                 /* Collect all attributes in the index predicate, too */
4778                 pull_varattnos((Node *) indexInfo->ii_Predicate, 1, &indexattrs);
4779
4780                 index_close(indexDesc, AccessShareLock);
4781         }
4782
4783         /*
4784          * During one of the index_opens in the above loop, we might have received
4785          * a relcache flush event on this relcache entry, which might have been
4786          * signaling a change in the rel's index list.  If so, we'd better start
4787          * over to ensure we deliver up-to-date attribute bitmaps.
4788          */
4789         newindexoidlist = RelationGetIndexList(relation);
4790         if (equal(indexoidlist, newindexoidlist) &&
4791                 relpkindex == relation->rd_pkindex &&
4792                 relreplindex == relation->rd_replidindex)
4793         {
4794                 /* Still the same index set, so proceed */
4795                 list_free(newindexoidlist);
4796                 list_free(indexoidlist);
4797         }
4798         else
4799         {
4800                 /* Gotta do it over ... might as well not leak memory */
4801                 list_free(newindexoidlist);
4802                 list_free(indexoidlist);
4803                 bms_free(uindexattrs);
4804                 bms_free(pkindexattrs);
4805                 bms_free(idindexattrs);
4806                 bms_free(indexattrs);
4807
4808                 goto restart;
4809         }
4810
4811         /* Don't leak the old values of these bitmaps, if any */
4812         bms_free(relation->rd_indexattr);
4813         relation->rd_indexattr = NULL;
4814         bms_free(relation->rd_keyattr);
4815         relation->rd_keyattr = NULL;
4816         bms_free(relation->rd_pkattr);
4817         relation->rd_pkattr = NULL;
4818         bms_free(relation->rd_idattr);
4819         relation->rd_idattr = NULL;
4820
4821         /*
4822          * Now save copies of the bitmaps in the relcache entry.  We intentionally
4823          * set rd_indexattr last, because that's the one that signals validity of
4824          * the values; if we run out of memory before making that copy, we won't
4825          * leave the relcache entry looking like the other ones are valid but
4826          * empty.
4827          */
4828         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4829         relation->rd_keyattr = bms_copy(uindexattrs);
4830         relation->rd_pkattr = bms_copy(pkindexattrs);
4831         relation->rd_idattr = bms_copy(idindexattrs);
4832         relation->rd_indexattr = bms_copy(indexattrs);
4833         MemoryContextSwitchTo(oldcxt);
4834
4835         /* We return our original working copy for caller to play with */
4836         switch (attrKind)
4837         {
4838                 case INDEX_ATTR_BITMAP_ALL:
4839                         return indexattrs;
4840                 case INDEX_ATTR_BITMAP_KEY:
4841                         return uindexattrs;
4842                 case INDEX_ATTR_BITMAP_PRIMARY_KEY:
4843                         return pkindexattrs;
4844                 case INDEX_ATTR_BITMAP_IDENTITY_KEY:
4845                         return idindexattrs;
4846                 default:
4847                         elog(ERROR, "unknown attrKind %u", attrKind);
4848                         return NULL;
4849         }
4850 }
4851
4852 /*
4853  * RelationGetExclusionInfo -- get info about index's exclusion constraint
4854  *
4855  * This should be called only for an index that is known to have an
4856  * associated exclusion constraint.  It returns arrays (palloc'd in caller's
4857  * context) of the exclusion operator OIDs, their underlying functions'
4858  * OIDs, and their strategy numbers in the index's opclasses.  We cache
4859  * all this information since it requires a fair amount of work to get.
4860  */
4861 void
4862 RelationGetExclusionInfo(Relation indexRelation,
4863                                                  Oid **operators,
4864                                                  Oid **procs,
4865                                                  uint16 **strategies)
4866 {
4867         int                     indnkeyatts;
4868         Oid                *ops;
4869         Oid                *funcs;
4870         uint16     *strats;
4871         Relation        conrel;
4872         SysScanDesc conscan;
4873         ScanKeyData skey[1];
4874         HeapTuple       htup;
4875         bool            found;
4876         MemoryContext oldcxt;
4877         int                     i;
4878
4879         indnkeyatts = IndexRelationGetNumberOfKeyAttributes(indexRelation);
4880
4881         /* Allocate result space in caller context */
4882         *operators = ops = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
4883         *procs = funcs = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
4884         *strategies = strats = (uint16 *) palloc(sizeof(uint16) * indnkeyatts);
4885
4886         /* Quick exit if we have the data cached already */
4887         if (indexRelation->rd_exclstrats != NULL)
4888         {
4889                 memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * indnkeyatts);
4890                 memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * indnkeyatts);
4891                 memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * indnkeyatts);
4892                 return;
4893         }
4894
4895         /*
4896          * Search pg_constraint for the constraint associated with the index. To
4897          * make this not too painfully slow, we use the index on conrelid; that
4898          * will hold the parent relation's OID not the index's own OID.
4899          *
4900          * Note: if we wanted to rely on the constraint name matching the index's
4901          * name, we could just do a direct lookup using pg_constraint's unique
4902          * index.  For the moment it doesn't seem worth requiring that.
4903          */
4904         ScanKeyInit(&skey[0],
4905                                 Anum_pg_constraint_conrelid,
4906                                 BTEqualStrategyNumber, F_OIDEQ,
4907                                 ObjectIdGetDatum(indexRelation->rd_index->indrelid));
4908
4909         conrel = table_open(ConstraintRelationId, AccessShareLock);
4910         conscan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, true,
4911                                                                  NULL, 1, skey);
4912         found = false;
4913
4914         while (HeapTupleIsValid(htup = systable_getnext(conscan)))
4915         {
4916                 Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
4917                 Datum           val;
4918                 bool            isnull;
4919                 ArrayType  *arr;
4920                 int                     nelem;
4921
4922                 /* We want the exclusion constraint owning the index */
4923                 if (conform->contype != CONSTRAINT_EXCLUSION ||
4924                         conform->conindid != RelationGetRelid(indexRelation))
4925                         continue;
4926
4927                 /* There should be only one */
4928                 if (found)
4929                         elog(ERROR, "unexpected exclusion constraint record found for rel %s",
4930                                  RelationGetRelationName(indexRelation));
4931                 found = true;
4932
4933                 /* Extract the operator OIDS from conexclop */
4934                 val = fastgetattr(htup,
4935                                                   Anum_pg_constraint_conexclop,
4936                                                   conrel->rd_att, &isnull);
4937                 if (isnull)
4938                         elog(ERROR, "null conexclop for rel %s",
4939                                  RelationGetRelationName(indexRelation));
4940
4941                 arr = DatumGetArrayTypeP(val);  /* ensure not toasted */
4942                 nelem = ARR_DIMS(arr)[0];
4943                 if (ARR_NDIM(arr) != 1 ||
4944                         nelem != indnkeyatts ||
4945                         ARR_HASNULL(arr) ||
4946                         ARR_ELEMTYPE(arr) != OIDOID)
4947                         elog(ERROR, "conexclop is not a 1-D Oid array");
4948
4949                 memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * indnkeyatts);
4950         }
4951
4952         systable_endscan(conscan);
4953         table_close(conrel, AccessShareLock);
4954
4955         if (!found)
4956                 elog(ERROR, "exclusion constraint record missing for rel %s",
4957                          RelationGetRelationName(indexRelation));
4958
4959         /* We need the func OIDs and strategy numbers too */
4960         for (i = 0; i < indnkeyatts; i++)
4961         {
4962                 funcs[i] = get_opcode(ops[i]);
4963                 strats[i] = get_op_opfamily_strategy(ops[i],
4964                                                                                          indexRelation->rd_opfamily[i]);
4965                 /* shouldn't fail, since it was checked at index creation */
4966                 if (strats[i] == InvalidStrategy)
4967                         elog(ERROR, "could not find strategy for operator %u in family %u",
4968                                  ops[i], indexRelation->rd_opfamily[i]);
4969         }
4970
4971         /* Save a copy of the results in the relcache entry. */
4972         oldcxt = MemoryContextSwitchTo(indexRelation->rd_indexcxt);
4973         indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
4974         indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
4975         indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * indnkeyatts);
4976         memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * indnkeyatts);
4977         memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * indnkeyatts);
4978         memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * indnkeyatts);
4979         MemoryContextSwitchTo(oldcxt);
4980 }
4981
4982 /*
4983  * Get publication actions for the given relation.
4984  */
4985 struct PublicationActions *
4986 GetRelationPublicationActions(Relation relation)
4987 {
4988         List       *puboids;
4989         ListCell   *lc;
4990         MemoryContext oldcxt;
4991         PublicationActions *pubactions = palloc0(sizeof(PublicationActions));
4992
4993         if (relation->rd_pubactions)
4994                 return memcpy(pubactions, relation->rd_pubactions,
4995                                           sizeof(PublicationActions));
4996
4997         /* Fetch the publication membership info. */
4998         puboids = GetRelationPublications(RelationGetRelid(relation));
4999         puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());
5000
5001         foreach(lc, puboids)
5002         {
5003                 Oid                     pubid = lfirst_oid(lc);
5004                 HeapTuple       tup;
5005                 Form_pg_publication pubform;
5006
5007                 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
5008
5009                 if (!HeapTupleIsValid(tup))
5010                         elog(ERROR, "cache lookup failed for publication %u", pubid);
5011
5012                 pubform = (Form_pg_publication) GETSTRUCT(tup);
5013
5014                 pubactions->pubinsert |= pubform->pubinsert;
5015                 pubactions->pubupdate |= pubform->pubupdate;
5016                 pubactions->pubdelete |= pubform->pubdelete;
5017                 pubactions->pubtruncate |= pubform->pubtruncate;
5018
5019                 ReleaseSysCache(tup);
5020
5021                 /*
5022                  * If we know everything is replicated, there is no point to check for
5023                  * other publications.
5024                  */
5025                 if (pubactions->pubinsert && pubactions->pubupdate &&
5026                         pubactions->pubdelete && pubactions->pubtruncate)
5027                         break;
5028         }
5029
5030         if (relation->rd_pubactions)
5031         {
5032                 pfree(relation->rd_pubactions);
5033                 relation->rd_pubactions = NULL;
5034         }
5035
5036         /* Now save copy of the actions in the relcache entry. */
5037         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
5038         relation->rd_pubactions = palloc(sizeof(PublicationActions));
5039         memcpy(relation->rd_pubactions, pubactions, sizeof(PublicationActions));
5040         MemoryContextSwitchTo(oldcxt);
5041
5042         return pubactions;
5043 }
5044
5045 /*
5046  * Routines to support ereport() reports of relation-related errors
5047  *
5048  * These could have been put into elog.c, but it seems like a module layering
5049  * violation to have elog.c calling relcache or syscache stuff --- and we
5050  * definitely don't want elog.h including rel.h.  So we put them here.
5051  */
5052
5053 /*
5054  * errtable --- stores schema_name and table_name of a table
5055  * within the current errordata.
5056  */
5057 int
5058 errtable(Relation rel)
5059 {
5060         err_generic_string(PG_DIAG_SCHEMA_NAME,
5061                                            get_namespace_name(RelationGetNamespace(rel)));
5062         err_generic_string(PG_DIAG_TABLE_NAME, RelationGetRelationName(rel));
5063
5064         return 0;                                       /* return value does not matter */
5065 }
5066
5067 /*
5068  * errtablecol --- stores schema_name, table_name and column_name
5069  * of a table column within the current errordata.
5070  *
5071  * The column is specified by attribute number --- for most callers, this is
5072  * easier and less error-prone than getting the column name for themselves.
5073  */
5074 int
5075 errtablecol(Relation rel, int attnum)
5076 {
5077         TupleDesc       reldesc = RelationGetDescr(rel);
5078         const char *colname;
5079
5080         /* Use reldesc if it's a user attribute, else consult the catalogs */
5081         if (attnum > 0 && attnum <= reldesc->natts)
5082                 colname = NameStr(TupleDescAttr(reldesc, attnum - 1)->attname);
5083         else
5084                 colname = get_attname(RelationGetRelid(rel), attnum, false);
5085
5086         return errtablecolname(rel, colname);
5087 }
5088
5089 /*
5090  * errtablecolname --- stores schema_name, table_name and column_name
5091  * of a table column within the current errordata, where the column name is
5092  * given directly rather than extracted from the relation's catalog data.
5093  *
5094  * Don't use this directly unless errtablecol() is inconvenient for some
5095  * reason.  This might possibly be needed during intermediate states in ALTER
5096  * TABLE, for instance.
5097  */
5098 int
5099 errtablecolname(Relation rel, const char *colname)
5100 {
5101         errtable(rel);
5102         err_generic_string(PG_DIAG_COLUMN_NAME, colname);
5103
5104         return 0;                                       /* return value does not matter */
5105 }
5106
5107 /*
5108  * errtableconstraint --- stores schema_name, table_name and constraint_name
5109  * of a table-related constraint within the current errordata.
5110  */
5111 int
5112 errtableconstraint(Relation rel, const char *conname)
5113 {
5114         errtable(rel);
5115         err_generic_string(PG_DIAG_CONSTRAINT_NAME, conname);
5116
5117         return 0;                                       /* return value does not matter */
5118 }
5119
5120
5121 /*
5122  *      load_relcache_init_file, write_relcache_init_file
5123  *
5124  *              In late 1992, we started regularly having databases with more than
5125  *              a thousand classes in them.  With this number of classes, it became
5126  *              critical to do indexed lookups on the system catalogs.
5127  *
5128  *              Bootstrapping these lookups is very hard.  We want to be able to
5129  *              use an index on pg_attribute, for example, but in order to do so,
5130  *              we must have read pg_attribute for the attributes in the index,
5131  *              which implies that we need to use the index.
5132  *
5133  *              In order to get around the problem, we do the following:
5134  *
5135  *                 +  When the database system is initialized (at initdb time), we
5136  *                        don't use indexes.  We do sequential scans.
5137  *
5138  *                 +  When the backend is started up in normal mode, we load an image
5139  *                        of the appropriate relation descriptors, in internal format,
5140  *                        from an initialization file in the data/base/... directory.
5141  *
5142  *                 +  If the initialization file isn't there, then we create the
5143  *                        relation descriptors using sequential scans and write 'em to
5144  *                        the initialization file for use by subsequent backends.
5145  *
5146  *              As of Postgres 9.0, there is one local initialization file in each
5147  *              database, plus one shared initialization file for shared catalogs.
5148  *
5149  *              We could dispense with the initialization files and just build the
5150  *              critical reldescs the hard way on every backend startup, but that
5151  *              slows down backend startup noticeably.
5152  *
5153  *              We can in fact go further, and save more relcache entries than
5154  *              just the ones that are absolutely critical; this allows us to speed
5155  *              up backend startup by not having to build such entries the hard way.
5156  *              Presently, all the catalog and index entries that are referred to
5157  *              by catcaches are stored in the initialization files.
5158  *
5159  *              The same mechanism that detects when catcache and relcache entries
5160  *              need to be invalidated (due to catalog updates) also arranges to
5161  *              unlink the initialization files when the contents may be out of date.
5162  *              The files will then be rebuilt during the next backend startup.
5163  */
5164
5165 /*
5166  * load_relcache_init_file -- attempt to load cache from the shared
5167  * or local cache init file
5168  *
5169  * If successful, return true and set criticalRelcachesBuilt or
5170  * criticalSharedRelcachesBuilt to true.
5171  * If not successful, return false.
5172  *
5173  * NOTE: we assume we are already switched into CacheMemoryContext.
5174  */
5175 static bool
5176 load_relcache_init_file(bool shared)
5177 {
5178         FILE       *fp;
5179         char            initfilename[MAXPGPATH];
5180         Relation   *rels;
5181         int                     relno,
5182                                 num_rels,
5183                                 max_rels,
5184                                 nailed_rels,
5185                                 nailed_indexes,
5186                                 magic;
5187         int                     i;
5188
5189         if (shared)
5190                 snprintf(initfilename, sizeof(initfilename), "global/%s",
5191                                  RELCACHE_INIT_FILENAME);
5192         else
5193                 snprintf(initfilename, sizeof(initfilename), "%s/%s",
5194                                  DatabasePath, RELCACHE_INIT_FILENAME);
5195
5196         fp = AllocateFile(initfilename, PG_BINARY_R);
5197         if (fp == NULL)
5198                 return false;
5199
5200         /*
5201          * Read the index relcache entries from the file.  Note we will not enter
5202          * any of them into the cache if the read fails partway through; this
5203          * helps to guard against broken init files.
5204          */
5205         max_rels = 100;
5206         rels = (Relation *) palloc(max_rels * sizeof(Relation));
5207         num_rels = 0;
5208         nailed_rels = nailed_indexes = 0;
5209
5210         /* check for correct magic number (compatible version) */
5211         if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
5212                 goto read_failed;
5213         if (magic != RELCACHE_INIT_FILEMAGIC)
5214                 goto read_failed;
5215
5216         for (relno = 0;; relno++)
5217         {
5218                 Size            len;
5219                 size_t          nread;
5220                 Relation        rel;
5221                 Form_pg_class relform;
5222                 bool            has_not_null;
5223
5224                 /* first read the relation descriptor length */
5225                 nread = fread(&len, 1, sizeof(len), fp);
5226                 if (nread != sizeof(len))
5227                 {
5228                         if (nread == 0)
5229                                 break;                  /* end of file */
5230                         goto read_failed;
5231                 }
5232
5233                 /* safety check for incompatible relcache layout */
5234                 if (len != sizeof(RelationData))
5235                         goto read_failed;
5236
5237                 /* allocate another relcache header */
5238                 if (num_rels >= max_rels)
5239                 {
5240                         max_rels *= 2;
5241                         rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
5242                 }
5243
5244                 rel = rels[num_rels++] = (Relation) palloc(len);
5245
5246                 /* then, read the Relation structure */
5247                 if (fread(rel, 1, len, fp) != len)
5248                         goto read_failed;
5249
5250                 /* next read the relation tuple form */
5251                 if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5252                         goto read_failed;
5253
5254                 relform = (Form_pg_class) palloc(len);
5255                 if (fread(relform, 1, len, fp) != len)
5256                         goto read_failed;
5257
5258                 rel->rd_rel = relform;
5259
5260                 /* initialize attribute tuple forms */
5261                 rel->rd_att = CreateTemplateTupleDesc(relform->relnatts);
5262                 rel->rd_att->tdrefcount = 1;    /* mark as refcounted */
5263
5264                 rel->rd_att->tdtypeid = relform->reltype;
5265                 rel->rd_att->tdtypmod = -1; /* unnecessary, but... */
5266
5267                 /* next read all the attribute tuple form data entries */
5268                 has_not_null = false;
5269                 for (i = 0; i < relform->relnatts; i++)
5270                 {
5271                         Form_pg_attribute attr = TupleDescAttr(rel->rd_att, i);
5272
5273                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5274                                 goto read_failed;
5275                         if (len != ATTRIBUTE_FIXED_PART_SIZE)
5276                                 goto read_failed;
5277                         if (fread(attr, 1, len, fp) != len)
5278                                 goto read_failed;
5279
5280                         has_not_null |= attr->attnotnull;
5281                 }
5282
5283                 /* next read the access method specific field */
5284                 if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5285                         goto read_failed;
5286                 if (len > 0)
5287                 {
5288                         rel->rd_options = palloc(len);
5289                         if (fread(rel->rd_options, 1, len, fp) != len)
5290                                 goto read_failed;
5291                         if (len != VARSIZE(rel->rd_options))
5292                                 goto read_failed;       /* sanity check */
5293                 }
5294                 else
5295                 {
5296                         rel->rd_options = NULL;
5297                 }
5298
5299                 /* mark not-null status */
5300                 if (has_not_null)
5301                 {
5302                         TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
5303
5304                         constr->has_not_null = true;
5305                         rel->rd_att->constr = constr;
5306                 }
5307
5308                 /*
5309                  * If it's an index, there's more to do.  Note we explicitly ignore
5310                  * partitioned indexes here.
5311                  */
5312                 if (rel->rd_rel->relkind == RELKIND_INDEX)
5313                 {
5314                         MemoryContext indexcxt;
5315                         Oid                *opfamily;
5316                         Oid                *opcintype;
5317                         RegProcedure *support;
5318                         int                     nsupport;
5319                         int16      *indoption;
5320                         Oid                *indcollation;
5321
5322                         /* Count nailed indexes to ensure we have 'em all */
5323                         if (rel->rd_isnailed)
5324                                 nailed_indexes++;
5325
5326                         /* next, read the pg_index tuple */
5327                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5328                                 goto read_failed;
5329
5330                         rel->rd_indextuple = (HeapTuple) palloc(len);
5331                         if (fread(rel->rd_indextuple, 1, len, fp) != len)
5332                                 goto read_failed;
5333
5334                         /* Fix up internal pointers in the tuple -- see heap_copytuple */
5335                         rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
5336                         rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);
5337
5338                         /*
5339                          * prepare index info context --- parameters should match
5340                          * RelationInitIndexAccessInfo
5341                          */
5342                         indexcxt = AllocSetContextCreate(CacheMemoryContext,
5343                                                                                          "index info",
5344                                                                                          ALLOCSET_SMALL_SIZES);
5345                         rel->rd_indexcxt = indexcxt;
5346                         MemoryContextCopyAndSetIdentifier(indexcxt,
5347                                                                                           RelationGetRelationName(rel));
5348
5349                         /*
5350                          * Now we can fetch the index AM's API struct.  (We can't store
5351                          * that in the init file, since it contains function pointers that
5352                          * might vary across server executions.  Fortunately, it should be
5353                          * safe to call the amhandler even while bootstrapping indexes.)
5354                          */
5355                         InitIndexAmRoutine(rel);
5356
5357                         /* next, read the vector of opfamily OIDs */
5358                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5359                                 goto read_failed;
5360
5361                         opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
5362                         if (fread(opfamily, 1, len, fp) != len)
5363                                 goto read_failed;
5364
5365                         rel->rd_opfamily = opfamily;
5366
5367                         /* next, read the vector of opcintype OIDs */
5368                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5369                                 goto read_failed;
5370
5371                         opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
5372                         if (fread(opcintype, 1, len, fp) != len)
5373                                 goto read_failed;
5374
5375                         rel->rd_opcintype = opcintype;
5376
5377                         /* next, read the vector of support procedure OIDs */
5378                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5379                                 goto read_failed;
5380                         support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
5381                         if (fread(support, 1, len, fp) != len)
5382                                 goto read_failed;
5383
5384                         rel->rd_support = support;
5385
5386                         /* next, read the vector of collation OIDs */
5387                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5388                                 goto read_failed;
5389
5390                         indcollation = (Oid *) MemoryContextAlloc(indexcxt, len);
5391                         if (fread(indcollation, 1, len, fp) != len)
5392                                 goto read_failed;
5393
5394                         rel->rd_indcollation = indcollation;
5395
5396                         /* finally, read the vector of indoption values */
5397                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5398                                 goto read_failed;
5399
5400                         indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
5401                         if (fread(indoption, 1, len, fp) != len)
5402                                 goto read_failed;
5403
5404                         rel->rd_indoption = indoption;
5405
5406                         /* set up zeroed fmgr-info vector */
5407                         nsupport = relform->relnatts * rel->rd_indam->amsupport;
5408                         rel->rd_supportinfo = (FmgrInfo *)
5409                                 MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
5410                 }
5411                 else
5412                 {
5413                         /* Count nailed rels to ensure we have 'em all */
5414                         if (rel->rd_isnailed)
5415                                 nailed_rels++;
5416
5417                         Assert(rel->rd_index == NULL);
5418                         Assert(rel->rd_indextuple == NULL);
5419                         Assert(rel->rd_indexcxt == NULL);
5420                         Assert(rel->rd_indam == NULL);
5421                         Assert(rel->rd_opfamily == NULL);
5422                         Assert(rel->rd_opcintype == NULL);
5423                         Assert(rel->rd_support == NULL);
5424                         Assert(rel->rd_supportinfo == NULL);
5425                         Assert(rel->rd_indoption == NULL);
5426                         Assert(rel->rd_indcollation == NULL);
5427                 }
5428
5429                 /*
5430                  * Rules and triggers are not saved (mainly because the internal
5431                  * format is complex and subject to change).  They must be rebuilt if
5432                  * needed by RelationCacheInitializePhase3.  This is not expected to
5433                  * be a big performance hit since few system catalogs have such. Ditto
5434                  * for RLS policy data, index expressions, predicates, exclusion info,
5435                  * and FDW info.
5436                  */
5437                 rel->rd_rules = NULL;
5438                 rel->rd_rulescxt = NULL;
5439                 rel->trigdesc = NULL;
5440                 rel->rd_rsdesc = NULL;
5441                 rel->rd_partkeycxt = NULL;
5442                 rel->rd_partkey = NULL;
5443                 rel->rd_pdcxt = NULL;
5444                 rel->rd_partdesc = NULL;
5445                 rel->rd_partcheck = NIL;
5446                 rel->rd_indexprs = NIL;
5447                 rel->rd_indpred = NIL;
5448                 rel->rd_exclops = NULL;
5449                 rel->rd_exclprocs = NULL;
5450                 rel->rd_exclstrats = NULL;
5451                 rel->rd_fdwroutine = NULL;
5452
5453                 /*
5454                  * Reset transient-state fields in the relcache entry
5455                  */
5456                 rel->rd_smgr = NULL;
5457                 if (rel->rd_isnailed)
5458                         rel->rd_refcnt = 1;
5459                 else
5460                         rel->rd_refcnt = 0;
5461                 rel->rd_indexvalid = 0;
5462                 rel->rd_fkeylist = NIL;
5463                 rel->rd_fkeyvalid = false;
5464                 rel->rd_indexlist = NIL;
5465                 rel->rd_pkindex = InvalidOid;
5466                 rel->rd_replidindex = InvalidOid;
5467                 rel->rd_indexattr = NULL;
5468                 rel->rd_keyattr = NULL;
5469                 rel->rd_pkattr = NULL;
5470                 rel->rd_idattr = NULL;
5471                 rel->rd_pubactions = NULL;
5472                 rel->rd_statvalid = false;
5473                 rel->rd_statlist = NIL;
5474                 rel->rd_createSubid = InvalidSubTransactionId;
5475                 rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
5476                 rel->rd_amcache = NULL;
5477                 MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
5478
5479                 /*
5480                  * Recompute lock and physical addressing info.  This is needed in
5481                  * case the pg_internal.init file was copied from some other database
5482                  * by CREATE DATABASE.
5483                  */
5484                 RelationInitLockInfo(rel);
5485                 RelationInitPhysicalAddr(rel);
5486         }
5487
5488         /*
5489          * We reached the end of the init file without apparent problem.  Did we
5490          * get the right number of nailed items?  This is a useful crosscheck in
5491          * case the set of critical rels or indexes changes.  However, that should
5492          * not happen in a normally-running system, so let's bleat if it does.
5493          *
5494          * For the shared init file, we're called before client authentication is
5495          * done, which means that elog(WARNING) will go only to the postmaster
5496          * log, where it's easily missed.  To ensure that developers notice bad
5497          * values of NUM_CRITICAL_SHARED_RELS/NUM_CRITICAL_SHARED_INDEXES, we put
5498          * an Assert(false) there.
5499          */
5500         if (shared)
5501         {
5502                 if (nailed_rels != NUM_CRITICAL_SHARED_RELS ||
5503                         nailed_indexes != NUM_CRITICAL_SHARED_INDEXES)
5504                 {
5505                         elog(WARNING, "found %d nailed shared rels and %d nailed shared indexes in init file, but expected %d and %d respectively",
5506                                  nailed_rels, nailed_indexes,
5507                                  NUM_CRITICAL_SHARED_RELS, NUM_CRITICAL_SHARED_INDEXES);
5508                         /* Make sure we get developers' attention about this */
5509                         Assert(false);
5510                         /* In production builds, recover by bootstrapping the relcache */
5511                         goto read_failed;
5512                 }
5513         }
5514         else
5515         {
5516                 if (nailed_rels != NUM_CRITICAL_LOCAL_RELS ||
5517                         nailed_indexes != NUM_CRITICAL_LOCAL_INDEXES)
5518                 {
5519                         elog(WARNING, "found %d nailed rels and %d nailed indexes in init file, but expected %d and %d respectively",
5520                                  nailed_rels, nailed_indexes,
5521                                  NUM_CRITICAL_LOCAL_RELS, NUM_CRITICAL_LOCAL_INDEXES);
5522                         /* We don't need an Assert() in this case */
5523                         goto read_failed;
5524                 }
5525         }
5526
5527         /*
5528          * OK, all appears well.
5529          *
5530          * Now insert all the new relcache entries into the cache.
5531          */
5532         for (relno = 0; relno < num_rels; relno++)
5533         {
5534                 RelationCacheInsert(rels[relno], false);
5535         }
5536
5537         pfree(rels);
5538         FreeFile(fp);
5539
5540         if (shared)
5541                 criticalSharedRelcachesBuilt = true;
5542         else
5543                 criticalRelcachesBuilt = true;
5544         return true;
5545
5546         /*
5547          * init file is broken, so do it the hard way.  We don't bother trying to
5548          * free the clutter we just allocated; it's not in the relcache so it
5549          * won't hurt.
5550          */
5551 read_failed:
5552         pfree(rels);
5553         FreeFile(fp);
5554
5555         return false;
5556 }
5557
5558 /*
5559  * Write out a new initialization file with the current contents
5560  * of the relcache (either shared rels or local rels, as indicated).
5561  */
5562 static void
5563 write_relcache_init_file(bool shared)
5564 {
5565         FILE       *fp;
5566         char            tempfilename[MAXPGPATH];
5567         char            finalfilename[MAXPGPATH];
5568         int                     magic;
5569         HASH_SEQ_STATUS status;
5570         RelIdCacheEnt *idhentry;
5571         int                     i;
5572
5573         /*
5574          * If we have already received any relcache inval events, there's no
5575          * chance of succeeding so we may as well skip the whole thing.
5576          */
5577         if (relcacheInvalsReceived != 0L)
5578                 return;
5579
5580         /*
5581          * We must write a temporary file and rename it into place. Otherwise,
5582          * another backend starting at about the same time might crash trying to
5583          * read the partially-complete file.
5584          */
5585         if (shared)
5586         {
5587                 snprintf(tempfilename, sizeof(tempfilename), "global/%s.%d",
5588                                  RELCACHE_INIT_FILENAME, MyProcPid);
5589                 snprintf(finalfilename, sizeof(finalfilename), "global/%s",
5590                                  RELCACHE_INIT_FILENAME);
5591         }
5592         else
5593         {
5594                 snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
5595                                  DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
5596                 snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
5597                                  DatabasePath, RELCACHE_INIT_FILENAME);
5598         }
5599
5600         unlink(tempfilename);           /* in case it exists w/wrong permissions */
5601
5602         fp = AllocateFile(tempfilename, PG_BINARY_W);
5603         if (fp == NULL)
5604         {
5605                 /*
5606                  * We used to consider this a fatal error, but we might as well
5607                  * continue with backend startup ...
5608                  */
5609                 ereport(WARNING,
5610                                 (errcode_for_file_access(),
5611                                  errmsg("could not create relation-cache initialization file \"%s\": %m",
5612                                                 tempfilename),
5613                                  errdetail("Continuing anyway, but there's something wrong.")));
5614                 return;
5615         }
5616
5617         /*
5618          * Write a magic number to serve as a file version identifier.  We can
5619          * change the magic number whenever the relcache layout changes.
5620          */
5621         magic = RELCACHE_INIT_FILEMAGIC;
5622         if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
5623                 elog(FATAL, "could not write init file");
5624
5625         /*
5626          * Write all the appropriate reldescs (in no particular order).
5627          */
5628         hash_seq_init(&status, RelationIdCache);
5629
5630         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
5631         {
5632                 Relation        rel = idhentry->reldesc;
5633                 Form_pg_class relform = rel->rd_rel;
5634
5635                 /* ignore if not correct group */
5636                 if (relform->relisshared != shared)
5637                         continue;
5638
5639                 /*
5640                  * Ignore if not supposed to be in init file.  We can allow any shared
5641                  * relation that's been loaded so far to be in the shared init file,
5642                  * but unshared relations must be ones that should be in the local
5643                  * file per RelationIdIsInInitFile.  (Note: if you want to change the
5644                  * criterion for rels to be kept in the init file, see also inval.c.
5645                  * The reason for filtering here is to be sure that we don't put
5646                  * anything into the local init file for which a relcache inval would
5647                  * not cause invalidation of that init file.)
5648                  */
5649                 if (!shared && !RelationIdIsInInitFile(RelationGetRelid(rel)))
5650                 {
5651                         /* Nailed rels had better get stored. */
5652                         Assert(!rel->rd_isnailed);
5653                         continue;
5654                 }
5655
5656                 /* first write the relcache entry proper */
5657                 write_item(rel, sizeof(RelationData), fp);
5658
5659                 /* next write the relation tuple form */
5660                 write_item(relform, CLASS_TUPLE_SIZE, fp);
5661
5662                 /* next, do all the attribute tuple form data entries */
5663                 for (i = 0; i < relform->relnatts; i++)
5664                 {
5665                         write_item(TupleDescAttr(rel->rd_att, i),
5666                                            ATTRIBUTE_FIXED_PART_SIZE, fp);
5667                 }
5668
5669                 /* next, do the access method specific field */
5670                 write_item(rel->rd_options,
5671                                    (rel->rd_options ? VARSIZE(rel->rd_options) : 0),
5672                                    fp);
5673
5674                 /*
5675                  * If it's an index, there's more to do. Note we explicitly ignore
5676                  * partitioned indexes here.
5677                  */
5678                 if (rel->rd_rel->relkind == RELKIND_INDEX)
5679                 {
5680                         /* write the pg_index tuple */
5681                         /* we assume this was created by heap_copytuple! */
5682                         write_item(rel->rd_indextuple,
5683                                            HEAPTUPLESIZE + rel->rd_indextuple->t_len,
5684                                            fp);
5685
5686                         /* next, write the vector of opfamily OIDs */
5687                         write_item(rel->rd_opfamily,
5688                                            relform->relnatts * sizeof(Oid),
5689                                            fp);
5690
5691                         /* next, write the vector of opcintype OIDs */
5692                         write_item(rel->rd_opcintype,
5693                                            relform->relnatts * sizeof(Oid),
5694                                            fp);
5695
5696                         /* next, write the vector of support procedure OIDs */
5697                         write_item(rel->rd_support,
5698                                            relform->relnatts * (rel->rd_indam->amsupport * sizeof(RegProcedure)),
5699                                            fp);
5700
5701                         /* next, write the vector of collation OIDs */
5702                         write_item(rel->rd_indcollation,
5703                                            relform->relnatts * sizeof(Oid),
5704                                            fp);
5705
5706                         /* finally, write the vector of indoption values */
5707                         write_item(rel->rd_indoption,
5708                                            relform->relnatts * sizeof(int16),
5709                                            fp);
5710                 }
5711         }
5712
5713         if (FreeFile(fp))
5714                 elog(FATAL, "could not write init file");
5715
5716         /*
5717          * Now we have to check whether the data we've so painstakingly
5718          * accumulated is already obsolete due to someone else's just-committed
5719          * catalog changes.  If so, we just delete the temp file and leave it to
5720          * the next backend to try again.  (Our own relcache entries will be
5721          * updated by SI message processing, but we can't be sure whether what we
5722          * wrote out was up-to-date.)
5723          *
5724          * This mustn't run concurrently with the code that unlinks an init file
5725          * and sends SI messages, so grab a serialization lock for the duration.
5726          */
5727         LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
5728
5729         /* Make sure we have seen all incoming SI messages */
5730         AcceptInvalidationMessages();
5731
5732         /*
5733          * If we have received any SI relcache invals since backend start, assume
5734          * we may have written out-of-date data.
5735          */
5736         if (relcacheInvalsReceived == 0L)
5737         {
5738                 /*
5739                  * OK, rename the temp file to its final name, deleting any
5740                  * previously-existing init file.
5741                  *
5742                  * Note: a failure here is possible under Cygwin, if some other
5743                  * backend is holding open an unlinked-but-not-yet-gone init file. So
5744                  * treat this as a noncritical failure; just remove the useless temp
5745                  * file on failure.
5746                  */
5747                 if (rename(tempfilename, finalfilename) < 0)
5748                         unlink(tempfilename);
5749         }
5750         else
5751         {
5752                 /* Delete the already-obsolete temp file */
5753                 unlink(tempfilename);
5754         }
5755
5756         LWLockRelease(RelCacheInitLock);
5757 }
5758
5759 /* write a chunk of data preceded by its length */
5760 static void
5761 write_item(const void *data, Size len, FILE *fp)
5762 {
5763         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
5764                 elog(FATAL, "could not write init file");
5765         if (fwrite(data, 1, len, fp) != len)
5766                 elog(FATAL, "could not write init file");
5767 }
5768
5769 /*
5770  * Determine whether a given relation (identified by OID) is one of the ones
5771  * we should store in a relcache init file.
5772  *
5773  * We must cache all nailed rels, and for efficiency we should cache every rel
5774  * that supports a syscache.  The former set is almost but not quite a subset
5775  * of the latter. The special cases are relations where
5776  * RelationCacheInitializePhase2/3 chooses to nail for efficiency reasons, but
5777  * which do not support any syscache.
5778  */
5779 bool
5780 RelationIdIsInInitFile(Oid relationId)
5781 {
5782         if (relationId == SharedSecLabelRelationId ||
5783                 relationId == TriggerRelidNameIndexId ||
5784                 relationId == DatabaseNameIndexId ||
5785                 relationId == SharedSecLabelObjectIndexId)
5786         {
5787                 /*
5788                  * If this Assert fails, we don't need the applicable special case
5789                  * anymore.
5790                  */
5791                 Assert(!RelationSupportsSysCache(relationId));
5792                 return true;
5793         }
5794         return RelationSupportsSysCache(relationId);
5795 }
5796
5797 /*
5798  * Tells whether any index for the relation is unlogged.
5799  *
5800  * Note: There doesn't seem to be any way to have an unlogged index attached
5801  * to a permanent table, but it seems best to keep this general so that it
5802  * returns sensible results even when they seem obvious (like for an unlogged
5803  * table) and to handle possible future unlogged indexes on permanent tables.
5804  */
5805 bool
5806 RelationHasUnloggedIndex(Relation rel)
5807 {
5808         List       *indexoidlist;
5809         ListCell   *indexoidscan;
5810         bool            result = false;
5811
5812         indexoidlist = RelationGetIndexList(rel);
5813
5814         foreach(indexoidscan, indexoidlist)
5815         {
5816                 Oid                     indexoid = lfirst_oid(indexoidscan);
5817                 HeapTuple       tp;
5818                 Form_pg_class reltup;
5819
5820                 tp = SearchSysCache1(RELOID, ObjectIdGetDatum(indexoid));
5821                 if (!HeapTupleIsValid(tp))
5822                         elog(ERROR, "cache lookup failed for relation %u", indexoid);
5823                 reltup = (Form_pg_class) GETSTRUCT(tp);
5824
5825                 if (reltup->relpersistence == RELPERSISTENCE_UNLOGGED)
5826                         result = true;
5827
5828                 ReleaseSysCache(tp);
5829
5830                 if (result == true)
5831                         break;
5832         }
5833
5834         list_free(indexoidlist);
5835
5836         return result;
5837 }
5838
5839 /*
5840  * Invalidate (remove) the init file during commit of a transaction that
5841  * changed one or more of the relation cache entries that are kept in the
5842  * local init file.
5843  *
5844  * To be safe against concurrent inspection or rewriting of the init file,
5845  * we must take RelCacheInitLock, then remove the old init file, then send
5846  * the SI messages that include relcache inval for such relations, and then
5847  * release RelCacheInitLock.  This serializes the whole affair against
5848  * write_relcache_init_file, so that we can be sure that any other process
5849  * that's concurrently trying to create a new init file won't move an
5850  * already-stale version into place after we unlink.  Also, because we unlink
5851  * before sending the SI messages, a backend that's currently starting cannot
5852  * read the now-obsolete init file and then miss the SI messages that will
5853  * force it to update its relcache entries.  (This works because the backend
5854  * startup sequence gets into the sinval array before trying to load the init
5855  * file.)
5856  *
5857  * We take the lock and do the unlink in RelationCacheInitFilePreInvalidate,
5858  * then release the lock in RelationCacheInitFilePostInvalidate.  Caller must
5859  * send any pending SI messages between those calls.
5860  */
5861 void
5862 RelationCacheInitFilePreInvalidate(void)
5863 {
5864         char            localinitfname[MAXPGPATH];
5865         char            sharedinitfname[MAXPGPATH];
5866
5867         if (DatabasePath)
5868                 snprintf(localinitfname, sizeof(localinitfname), "%s/%s",
5869                                  DatabasePath, RELCACHE_INIT_FILENAME);
5870         snprintf(sharedinitfname, sizeof(sharedinitfname), "global/%s",
5871                          RELCACHE_INIT_FILENAME);
5872
5873         LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
5874
5875         /*
5876          * The files might not be there if no backend has been started since the
5877          * last removal.  But complain about failures other than ENOENT with
5878          * ERROR.  Fortunately, it's not too late to abort the transaction if we
5879          * can't get rid of the would-be-obsolete init file.
5880          */
5881         if (DatabasePath)
5882                 unlink_initfile(localinitfname, ERROR);
5883         unlink_initfile(sharedinitfname, ERROR);
5884 }
5885
5886 void
5887 RelationCacheInitFilePostInvalidate(void)
5888 {
5889         LWLockRelease(RelCacheInitLock);
5890 }
5891
5892 /*
5893  * Remove the init files during postmaster startup.
5894  *
5895  * We used to keep the init files across restarts, but that is unsafe in PITR
5896  * scenarios, and even in simple crash-recovery cases there are windows for
5897  * the init files to become out-of-sync with the database.  So now we just
5898  * remove them during startup and expect the first backend launch to rebuild
5899  * them.  Of course, this has to happen in each database of the cluster.
5900  */
5901 void
5902 RelationCacheInitFileRemove(void)
5903 {
5904         const char *tblspcdir = "pg_tblspc";
5905         DIR                *dir;
5906         struct dirent *de;
5907         char            path[MAXPGPATH + 10 + sizeof(TABLESPACE_VERSION_DIRECTORY)];
5908
5909         snprintf(path, sizeof(path), "global/%s",
5910                          RELCACHE_INIT_FILENAME);
5911         unlink_initfile(path, LOG);
5912
5913         /* Scan everything in the default tablespace */
5914         RelationCacheInitFileRemoveInDir("base");
5915
5916         /* Scan the tablespace link directory to find non-default tablespaces */
5917         dir = AllocateDir(tblspcdir);
5918
5919         while ((de = ReadDirExtended(dir, tblspcdir, LOG)) != NULL)
5920         {
5921                 if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
5922                 {
5923                         /* Scan the tablespace dir for per-database dirs */
5924                         snprintf(path, sizeof(path), "%s/%s/%s",
5925                                          tblspcdir, de->d_name, TABLESPACE_VERSION_DIRECTORY);
5926                         RelationCacheInitFileRemoveInDir(path);
5927                 }
5928         }
5929
5930         FreeDir(dir);
5931 }
5932
5933 /* Process one per-tablespace directory for RelationCacheInitFileRemove */
5934 static void
5935 RelationCacheInitFileRemoveInDir(const char *tblspcpath)
5936 {
5937         DIR                *dir;
5938         struct dirent *de;
5939         char            initfilename[MAXPGPATH * 2];
5940
5941         /* Scan the tablespace directory to find per-database directories */
5942         dir = AllocateDir(tblspcpath);
5943
5944         while ((de = ReadDirExtended(dir, tblspcpath, LOG)) != NULL)
5945         {
5946                 if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
5947                 {
5948                         /* Try to remove the init file in each database */
5949                         snprintf(initfilename, sizeof(initfilename), "%s/%s/%s",
5950                                          tblspcpath, de->d_name, RELCACHE_INIT_FILENAME);
5951                         unlink_initfile(initfilename, LOG);
5952                 }
5953         }
5954
5955         FreeDir(dir);
5956 }
5957
5958 static void
5959 unlink_initfile(const char *initfilename, int elevel)
5960 {
5961         if (unlink(initfilename) < 0)
5962         {
5963                 /* It might not be there, but log any error other than ENOENT */
5964                 if (errno != ENOENT)
5965                         ereport(elevel,
5966                                         (errcode_for_file_access(),
5967                                          errmsg("could not remove cache file \"%s\": %m",
5968                                                         initfilename)));
5969         }
5970 }