]> granicus.if.org Git - postgresql/blob - src/backend/utils/cache/relcache.c
Move relpath() to libpgcommon
[postgresql] / src / backend / utils / cache / relcache.c
1 /*-------------------------------------------------------------------------
2  *
3  * relcache.c
4  *        POSTGRES relation descriptor cache code
5  *
6  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/utils/cache/relcache.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * INTERFACE ROUTINES
17  *              RelationCacheInitialize                 - initialize relcache (to empty)
18  *              RelationCacheInitializePhase2   - initialize shared-catalog entries
19  *              RelationCacheInitializePhase3   - finish initializing relcache
20  *              RelationIdGetRelation                   - get a reldesc by relation id
21  *              RelationClose                                   - close an open relation
22  *
23  * NOTES
24  *              The following code contains many undocumented hacks.  Please be
25  *              careful....
26  */
27 #include "postgres.h"
28
29 #include <sys/file.h>
30 #include <fcntl.h>
31 #include <unistd.h>
32
33 #include "access/htup_details.h"
34 #include "access/multixact.h"
35 #include "access/reloptions.h"
36 #include "access/sysattr.h"
37 #include "access/transam.h"
38 #include "access/xact.h"
39 #include "catalog/catalog.h"
40 #include "catalog/index.h"
41 #include "catalog/indexing.h"
42 #include "catalog/namespace.h"
43 #include "catalog/pg_amproc.h"
44 #include "catalog/pg_attrdef.h"
45 #include "catalog/pg_authid.h"
46 #include "catalog/pg_auth_members.h"
47 #include "catalog/pg_constraint.h"
48 #include "catalog/pg_database.h"
49 #include "catalog/pg_namespace.h"
50 #include "catalog/pg_opclass.h"
51 #include "catalog/pg_proc.h"
52 #include "catalog/pg_rewrite.h"
53 #include "catalog/pg_tablespace.h"
54 #include "catalog/pg_trigger.h"
55 #include "catalog/pg_type.h"
56 #include "catalog/schemapg.h"
57 #include "catalog/storage.h"
58 #include "commands/trigger.h"
59 #include "common/relpath.h"
60 #include "miscadmin.h"
61 #include "optimizer/clauses.h"
62 #include "optimizer/planmain.h"
63 #include "optimizer/prep.h"
64 #include "optimizer/var.h"
65 #include "rewrite/rewriteDefine.h"
66 #include "storage/lmgr.h"
67 #include "storage/smgr.h"
68 #include "utils/array.h"
69 #include "utils/builtins.h"
70 #include "utils/fmgroids.h"
71 #include "utils/inval.h"
72 #include "utils/lsyscache.h"
73 #include "utils/memutils.h"
74 #include "utils/relmapper.h"
75 #include "utils/resowner_private.h"
76 #include "utils/syscache.h"
77 #include "utils/tqual.h"
78
79
/*
 * Relcache init file(s): a cache used to speed up backend startup.
 *
 * RELCACHE_INIT_FILENAME is the file's base name; RELCACHE_INIT_FILEMAGIC
 * is the version ID value associated with it.
 */
#define RELCACHE_INIT_FILENAME "pg_internal.init"
#define RELCACHE_INIT_FILEMAGIC 0x573266	/* version ID value */
86
87 /*
88  *              hardcoded tuple descriptors, contents generated by genbki.pl
89  */
90 static const FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
91 static const FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
92 static const FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
93 static const FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
94 static const FormData_pg_attribute Desc_pg_database[Natts_pg_database] = {Schema_pg_database};
95 static const FormData_pg_attribute Desc_pg_authid[Natts_pg_authid] = {Schema_pg_authid};
96 static const FormData_pg_attribute Desc_pg_auth_members[Natts_pg_auth_members] = {Schema_pg_auth_members};
97 static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
98
99 /*
100  *              Hash tables that index the relation cache
101  *
102  *              We used to index the cache by both name and OID, but now there
103  *              is only an index by OID.
104  */
105 typedef struct relidcacheent
106 {
107         Oid                     reloid;
108         Relation        reldesc;
109 } RelIdCacheEnt;
110
111 static HTAB *RelationIdCache;
112
/*
 * Stays false until the critical relcache entries have been prepared, i.e.
 * the ones needed to do indexscans on the tables that relcache building
 * itself reads.
 */
bool		criticalRelcachesBuilt = false;

/*
 * Stays false until the critical relcache entries for shared catalogs (the
 * tables needed for login) have been prepared.
 */
bool		criticalSharedRelcachesBuilt = false;
124
/*
 * Number of relcache inval events received since backend startup, counting
 * only rels that are actually in cache.  At present its sole use is to
 * detect whether the data write_relcache_init_file() is about to write
 * might already be obsolete.
 */
static long relcacheInvalsReceived = 0;
132
133 /*
134  * This list remembers the OIDs of the non-shared relations cached in the
135  * database's local relcache init file.  Note that there is no corresponding
136  * list for the shared relcache init file, for reasons explained in the
137  * comments for RelationCacheInitFileRemove.
138  */
139 static List *initFileRelationIds = NIL;
140
141 /*
142  * eoxact_list[] stores the OIDs of relations that (might) need AtEOXact
143  * cleanup work.  This list intentionally has limited size; if it overflows,
144  * we fall back to scanning the whole hashtable.  There is no value in a very
145  * large list because (1) at some point, a hash_seq_search scan is faster than
146  * retail lookups, and (2) the value of this is to reduce EOXact work for
147  * short transactions, which can't have dirtied all that many tables anyway.
148  * EOXactListAdd() does not bother to prevent duplicate list entries, so the
149  * cleanup processing must be idempotent.
150  */
151 #define MAX_EOXACT_LIST 32
152 static Oid      eoxact_list[MAX_EOXACT_LIST];
153 static int      eoxact_list_len = 0;
154 static bool eoxact_list_overflowed = false;
155
156 #define EOXactListAdd(rel) \
157         do { \
158                 if (eoxact_list_len < MAX_EOXACT_LIST) \
159                         eoxact_list[eoxact_list_len++] = (rel)->rd_id; \
160                 else \
161                         eoxact_list_overflowed = true; \
162         } while (0)
163
164
/*
 *		macros to manipulate the lookup hashtables
 */

/*
 * RelationCacheInsert
 *		Enter RELATION into RelationIdCache, keyed by its rd_id.
 *
 * RELATION is a relation descriptor pointer expression.  It is expanded
 * twice, so it must not have side effects.  Every use is parenthesized so
 * that an argument such as "cond ? a : b" or "*relp" parses as intended.
 *
 * If an entry for that OID already exists, its reldesc is silently
 * overwritten.  Since the "found" result of hash_search was never examined,
 * NULL is passed for it; that also avoids declaring a block-local "found"
 * that could shadow a variable of the same name at the call site.
 */
#define RelationCacheInsert(RELATION)	\
do { \
	RelIdCacheEnt *idhentry; \
	idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
											 (void *) &((RELATION)->rd_id), \
											 HASH_ENTER, NULL); \
	/* used to give notice if found -- now just keep quiet */ \
	idhentry->reldesc = (RELATION); \
} while(0)
177
/*
 * RelationIdCacheLookup
 *		Probe RelationIdCache for the key ID and assign the outcome to
 *		RELATION: the entry's reldesc when one is found, NULL otherwise.
 *
 * ID must be an lvalue because its address is handed to hash_search;
 * RELATION must be assignable.
 */
#define RelationIdCacheLookup(ID, RELATION) \
do { \
	RelIdCacheEnt *hentry = \
		(RelIdCacheEnt *) hash_search(RelationIdCache, \
									  (void *) &(ID), \
									  HASH_FIND, NULL); \
	RELATION = (hentry != NULL) ? hentry->reldesc : NULL; \
} while(0)
189
/*
 * RelationCacheDelete
 *		Remove the RelationIdCache entry keyed by RELATION's rd_id.
 *
 * RELATION is a relation descriptor pointer expression.  It is parenthesized
 * in the expansion so that arguments such as "cond ? a : b" or "*relp" parse
 * as intended.  A WARNING is logged if no such entry existed.
 */
#define RelationCacheDelete(RELATION) \
do { \
	RelIdCacheEnt *idhentry; \
	idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
											 (void *) &((RELATION)->rd_id), \
											 HASH_REMOVE, NULL); \
	if (idhentry == NULL) \
		elog(WARNING, "trying to delete a rd_id reldesc that does not exist"); \
} while(0)
199
200
201 /*
202  * Special cache for opclass-related information
203  *
204  * Note: only default support procs get cached, ie, those with
205  * lefttype = righttype = opcintype.
206  */
207 typedef struct opclasscacheent
208 {
209         Oid                     opclassoid;             /* lookup key: OID of opclass */
210         bool            valid;                  /* set TRUE after successful fill-in */
211         StrategyNumber numSupport;      /* max # of support procs (from pg_am) */
212         Oid                     opcfamily;              /* OID of opclass's family */
213         Oid                     opcintype;              /* OID of opclass's declared input type */
214         RegProcedure *supportProcs; /* OIDs of support procedures */
215 } OpClassCacheEnt;
216
217 static HTAB *OpClassCache = NULL;
218
219
220 /* non-export function prototypes */
221
222 static void RelationDestroyRelation(Relation relation);
223 static void RelationClearRelation(Relation relation, bool rebuild);
224
225 static void RelationReloadIndexInfo(Relation relation);
226 static void RelationFlushRelation(Relation relation);
227 static void AtEOXact_cleanup(Relation relation, bool isCommit);
228 static void AtEOSubXact_cleanup(Relation relation, bool isCommit,
229                                         SubTransactionId mySubid, SubTransactionId parentSubid);
230 static bool load_relcache_init_file(bool shared);
231 static void write_relcache_init_file(bool shared);
232 static void write_item(const void *data, Size len, FILE *fp);
233
234 static void formrdesc(const char *relationName, Oid relationReltype,
235                   bool isshared, bool hasoids,
236                   int natts, const FormData_pg_attribute *attrs);
237
238 static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK);
239 static Relation AllocateRelationDesc(Form_pg_class relp);
240 static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
241 static void RelationBuildTupleDesc(Relation relation);
242 static Relation RelationBuildDesc(Oid targetRelId, bool insertIt);
243 static void RelationInitPhysicalAddr(Relation relation);
244 static void load_critical_index(Oid indexoid, Oid heapoid);
245 static TupleDesc GetPgClassDescriptor(void);
246 static TupleDesc GetPgIndexDescriptor(void);
247 static void AttrDefaultFetch(Relation relation);
248 static void CheckConstraintFetch(Relation relation);
249 static List *insert_ordered_oid(List *list, Oid datum);
250 static void IndexSupportInitialize(oidvector *indclass,
251                                            RegProcedure *indexSupport,
252                                            Oid *opFamily,
253                                            Oid *opcInType,
254                                            StrategyNumber maxSupportNumber,
255                                            AttrNumber maxAttributeNumber);
256 static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
257                                   StrategyNumber numSupport);
258 static void RelationCacheInitFileRemoveInDir(const char *tblspcpath);
259 static void unlink_initfile(const char *initfilename);
260
261
262 /*
263  *              ScanPgRelation
264  *
265  *              This is used by RelationBuildDesc to find a pg_class
266  *              tuple matching targetRelId.  The caller must hold at least
267  *              AccessShareLock on the target relid to prevent concurrent-update
268  *              scenarios --- else our SnapshotNow scan might fail to find any
269  *              version that it thinks is live.
270  *
271  *              NB: the returned tuple has been copied into palloc'd storage
272  *              and must eventually be freed with heap_freetuple.
273  */
274 static HeapTuple
275 ScanPgRelation(Oid targetRelId, bool indexOK)
276 {
277         HeapTuple       pg_class_tuple;
278         Relation        pg_class_desc;
279         SysScanDesc pg_class_scan;
280         ScanKeyData key[1];
281
282         /*
283          * If something goes wrong during backend startup, we might find ourselves
284          * trying to read pg_class before we've selected a database.  That ain't
285          * gonna work, so bail out with a useful error message.  If this happens,
286          * it probably means a relcache entry that needs to be nailed isn't.
287          */
288         if (!OidIsValid(MyDatabaseId))
289                 elog(FATAL, "cannot read pg_class without having selected a database");
290
291         /*
292          * form a scan key
293          */
294         ScanKeyInit(&key[0],
295                                 ObjectIdAttributeNumber,
296                                 BTEqualStrategyNumber, F_OIDEQ,
297                                 ObjectIdGetDatum(targetRelId));
298
299         /*
300          * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
301          * built the critical relcache entries (this includes initdb and startup
302          * without a pg_internal.init file).  The caller can also force a heap
303          * scan by setting indexOK == false.
304          */
305         pg_class_desc = heap_open(RelationRelationId, AccessShareLock);
306         pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
307                                                                            indexOK && criticalRelcachesBuilt,
308                                                                            SnapshotNow,
309                                                                            1, key);
310
311         pg_class_tuple = systable_getnext(pg_class_scan);
312
313         /*
314          * Must copy tuple before releasing buffer.
315          */
316         if (HeapTupleIsValid(pg_class_tuple))
317                 pg_class_tuple = heap_copytuple(pg_class_tuple);
318
319         /* all done */
320         systable_endscan(pg_class_scan);
321         heap_close(pg_class_desc, AccessShareLock);
322
323         return pg_class_tuple;
324 }
325
326 /*
327  *              AllocateRelationDesc
328  *
329  *              This is used to allocate memory for a new relation descriptor
330  *              and initialize the rd_rel field from the given pg_class tuple.
331  */
332 static Relation
333 AllocateRelationDesc(Form_pg_class relp)
334 {
335         Relation        relation;
336         MemoryContext oldcxt;
337         Form_pg_class relationForm;
338
339         /* Relcache entries must live in CacheMemoryContext */
340         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
341
342         /*
343          * allocate and zero space for new relation descriptor
344          */
345         relation = (Relation) palloc0(sizeof(RelationData));
346
347         /* make sure relation is marked as having no open file yet */
348         relation->rd_smgr = NULL;
349
350         /*
351          * Copy the relation tuple form
352          *
353          * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE. The
354          * variable-length fields (relacl, reloptions) are NOT stored in the
355          * relcache --- there'd be little point in it, since we don't copy the
356          * tuple's nulls bitmap and hence wouldn't know if the values are valid.
357          * Bottom line is that relacl *cannot* be retrieved from the relcache. Get
358          * it from the syscache if you need it.  The same goes for the original
359          * form of reloptions (however, we do store the parsed form of reloptions
360          * in rd_options).
361          */
362         relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
363
364         memcpy(relationForm, relp, CLASS_TUPLE_SIZE);
365
366         /* initialize relation tuple form */
367         relation->rd_rel = relationForm;
368
369         /* and allocate attribute tuple form storage */
370         relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts,
371                                                                                            relationForm->relhasoids);
372         /* which we mark as a reference-counted tupdesc */
373         relation->rd_att->tdrefcount = 1;
374
375         MemoryContextSwitchTo(oldcxt);
376
377         return relation;
378 }
379
380 /*
381  * RelationParseRelOptions
382  *              Convert pg_class.reloptions into pre-parsed rd_options
383  *
384  * tuple is the real pg_class tuple (not rd_rel!) for relation
385  *
386  * Note: rd_rel and (if an index) rd_am must be valid already
387  */
388 static void
389 RelationParseRelOptions(Relation relation, HeapTuple tuple)
390 {
391         bytea      *options;
392
393         relation->rd_options = NULL;
394
395         /* Fall out if relkind should not have options */
396         switch (relation->rd_rel->relkind)
397         {
398                 case RELKIND_RELATION:
399                 case RELKIND_TOASTVALUE:
400                 case RELKIND_INDEX:
401                 case RELKIND_VIEW:
402                         break;
403                 default:
404                         return;
405         }
406
407         /*
408          * Fetch reloptions from tuple; have to use a hardwired descriptor because
409          * we might not have any other for pg_class yet (consider executing this
410          * code for pg_class itself)
411          */
412         options = extractRelOptions(tuple,
413                                                                 GetPgClassDescriptor(),
414                                                                 relation->rd_rel->relkind == RELKIND_INDEX ?
415                                                                 relation->rd_am->amoptions : InvalidOid);
416
417         /*
418          * Copy parsed data into CacheMemoryContext.  To guard against the
419          * possibility of leaks in the reloptions code, we want to do the actual
420          * parsing in the caller's memory context and copy the results into
421          * CacheMemoryContext after the fact.
422          */
423         if (options)
424         {
425                 relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
426                                                                                                   VARSIZE(options));
427                 memcpy(relation->rd_options, options, VARSIZE(options));
428                 pfree(options);
429         }
430 }
431
432 /*
433  *              RelationBuildTupleDesc
434  *
435  *              Form the relation's tuple descriptor from information in
436  *              the pg_attribute, pg_attrdef & pg_constraint system catalogs.
437  */
438 static void
439 RelationBuildTupleDesc(Relation relation)
440 {
441         HeapTuple       pg_attribute_tuple;
442         Relation        pg_attribute_desc;
443         SysScanDesc pg_attribute_scan;
444         ScanKeyData skey[2];
445         int                     need;
446         TupleConstr *constr;
447         AttrDefault *attrdef = NULL;
448         int                     ndef = 0;
449
450         /* copy some fields from pg_class row to rd_att */
451         relation->rd_att->tdtypeid = relation->rd_rel->reltype;
452         relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
453         relation->rd_att->tdhasoid = relation->rd_rel->relhasoids;
454
455         constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
456                                                                                                 sizeof(TupleConstr));
457         constr->has_not_null = false;
458
459         /*
460          * Form a scan key that selects only user attributes (attnum > 0).
461          * (Eliminating system attribute rows at the index level is lots faster
462          * than fetching them.)
463          */
464         ScanKeyInit(&skey[0],
465                                 Anum_pg_attribute_attrelid,
466                                 BTEqualStrategyNumber, F_OIDEQ,
467                                 ObjectIdGetDatum(RelationGetRelid(relation)));
468         ScanKeyInit(&skey[1],
469                                 Anum_pg_attribute_attnum,
470                                 BTGreaterStrategyNumber, F_INT2GT,
471                                 Int16GetDatum(0));
472
473         /*
474          * Open pg_attribute and begin a scan.  Force heap scan if we haven't yet
475          * built the critical relcache entries (this includes initdb and startup
476          * without a pg_internal.init file).
477          */
478         pg_attribute_desc = heap_open(AttributeRelationId, AccessShareLock);
479         pg_attribute_scan = systable_beginscan(pg_attribute_desc,
480                                                                                    AttributeRelidNumIndexId,
481                                                                                    criticalRelcachesBuilt,
482                                                                                    SnapshotNow,
483                                                                                    2, skey);
484
485         /*
486          * add attribute data to relation->rd_att
487          */
488         need = relation->rd_rel->relnatts;
489
490         while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
491         {
492                 Form_pg_attribute attp;
493
494                 attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
495
496                 if (attp->attnum <= 0 ||
497                         attp->attnum > relation->rd_rel->relnatts)
498                         elog(ERROR, "invalid attribute number %d for %s",
499                                  attp->attnum, RelationGetRelationName(relation));
500
501                 memcpy(relation->rd_att->attrs[attp->attnum - 1],
502                            attp,
503                            ATTRIBUTE_FIXED_PART_SIZE);
504
505                 /* Update constraint/default info */
506                 if (attp->attnotnull)
507                         constr->has_not_null = true;
508
509                 if (attp->atthasdef)
510                 {
511                         if (attrdef == NULL)
512                                 attrdef = (AttrDefault *)
513                                         MemoryContextAllocZero(CacheMemoryContext,
514                                                                                    relation->rd_rel->relnatts *
515                                                                                    sizeof(AttrDefault));
516                         attrdef[ndef].adnum = attp->attnum;
517                         attrdef[ndef].adbin = NULL;
518                         ndef++;
519                 }
520                 need--;
521                 if (need == 0)
522                         break;
523         }
524
525         /*
526          * end the scan and close the attribute relation
527          */
528         systable_endscan(pg_attribute_scan);
529         heap_close(pg_attribute_desc, AccessShareLock);
530
531         if (need != 0)
532                 elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
533                          need, RelationGetRelid(relation));
534
535         /*
536          * The attcacheoff values we read from pg_attribute should all be -1
537          * ("unknown").  Verify this if assert checking is on.  They will be
538          * computed when and if needed during tuple access.
539          */
540 #ifdef USE_ASSERT_CHECKING
541         {
542                 int                     i;
543
544                 for (i = 0; i < relation->rd_rel->relnatts; i++)
545                         Assert(relation->rd_att->attrs[i]->attcacheoff == -1);
546         }
547 #endif
548
549         /*
550          * However, we can easily set the attcacheoff value for the first
551          * attribute: it must be zero.  This eliminates the need for special cases
552          * for attnum=1 that used to exist in fastgetattr() and index_getattr().
553          */
554         if (relation->rd_rel->relnatts > 0)
555                 relation->rd_att->attrs[0]->attcacheoff = 0;
556
557         /*
558          * Set up constraint/default info
559          */
560         if (constr->has_not_null || ndef > 0 || relation->rd_rel->relchecks)
561         {
562                 relation->rd_att->constr = constr;
563
564                 if (ndef > 0)                   /* DEFAULTs */
565                 {
566                         if (ndef < relation->rd_rel->relnatts)
567                                 constr->defval = (AttrDefault *)
568                                         repalloc(attrdef, ndef * sizeof(AttrDefault));
569                         else
570                                 constr->defval = attrdef;
571                         constr->num_defval = ndef;
572                         AttrDefaultFetch(relation);
573                 }
574                 else
575                         constr->num_defval = 0;
576
577                 if (relation->rd_rel->relchecks > 0)    /* CHECKs */
578                 {
579                         constr->num_check = relation->rd_rel->relchecks;
580                         constr->check = (ConstrCheck *)
581                                 MemoryContextAllocZero(CacheMemoryContext,
582                                                                         constr->num_check * sizeof(ConstrCheck));
583                         CheckConstraintFetch(relation);
584                 }
585                 else
586                         constr->num_check = 0;
587         }
588         else
589         {
590                 pfree(constr);
591                 relation->rd_att->constr = NULL;
592         }
593 }
594
595 /*
596  *              RelationBuildRuleLock
597  *
598  *              Form the relation's rewrite rules from information in
599  *              the pg_rewrite system catalog.
600  *
601  * Note: The rule parsetrees are potentially very complex node structures.
602  * To allow these trees to be freed when the relcache entry is flushed,
603  * we make a private memory context to hold the RuleLock information for
604  * each relcache entry that has associated rules.  The context is used
605  * just for rule info, not for any other subsidiary data of the relcache
606  * entry, because that keeps the update logic in RelationClearRelation()
607  * manageable.  The other subsidiary data structures are simple enough
608  * to be easy to free explicitly, anyway.
609  */
610 static void
611 RelationBuildRuleLock(Relation relation)
612 {
613         MemoryContext rulescxt;
614         MemoryContext oldcxt;
615         HeapTuple       rewrite_tuple;
616         Relation        rewrite_desc;
617         TupleDesc       rewrite_tupdesc;
618         SysScanDesc rewrite_scan;
619         ScanKeyData key;
620         RuleLock   *rulelock;
621         int                     numlocks;
622         RewriteRule **rules;
623         int                     maxlocks;
624
625         /*
626          * Make the private context.  Parameters are set on the assumption that
627          * it'll probably not contain much data.
628          */
629         rulescxt = AllocSetContextCreate(CacheMemoryContext,
630                                                                          RelationGetRelationName(relation),
631                                                                          ALLOCSET_SMALL_MINSIZE,
632                                                                          ALLOCSET_SMALL_INITSIZE,
633                                                                          ALLOCSET_SMALL_MAXSIZE);
634         relation->rd_rulescxt = rulescxt;
635
636         /*
637          * allocate an array to hold the rewrite rules (the array is extended if
638          * necessary)
639          */
640         maxlocks = 4;
641         rules = (RewriteRule **)
642                 MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
643         numlocks = 0;
644
645         /*
646          * form a scan key
647          */
648         ScanKeyInit(&key,
649                                 Anum_pg_rewrite_ev_class,
650                                 BTEqualStrategyNumber, F_OIDEQ,
651                                 ObjectIdGetDatum(RelationGetRelid(relation)));
652
653         /*
654          * open pg_rewrite and begin a scan
655          *
656          * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
657          * be reading the rules in name order, except possibly during
658          * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
659          * ensures that rules will be fired in name order.
660          */
661         rewrite_desc = heap_open(RewriteRelationId, AccessShareLock);
662         rewrite_tupdesc = RelationGetDescr(rewrite_desc);
663         rewrite_scan = systable_beginscan(rewrite_desc,
664                                                                           RewriteRelRulenameIndexId,
665                                                                           true, SnapshotNow,
666                                                                           1, &key);
667
668         while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
669         {
670                 Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
671                 bool            isnull;
672                 Datum           rule_datum;
673                 char       *rule_str;
674                 RewriteRule *rule;
675
676                 rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
677                                                                                                   sizeof(RewriteRule));
678
679                 rule->ruleId = HeapTupleGetOid(rewrite_tuple);
680
681                 rule->event = rewrite_form->ev_type - '0';
682                 rule->attrno = rewrite_form->ev_attr;
683                 rule->enabled = rewrite_form->ev_enabled;
684                 rule->isInstead = rewrite_form->is_instead;
685
686                 /*
687                  * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
688                  * rule strings are often large enough to be toasted.  To avoid
689                  * leaking memory in the caller's context, do the detoasting here so
690                  * we can free the detoasted version.
691                  */
692                 rule_datum = heap_getattr(rewrite_tuple,
693                                                                   Anum_pg_rewrite_ev_action,
694                                                                   rewrite_tupdesc,
695                                                                   &isnull);
696                 Assert(!isnull);
697                 rule_str = TextDatumGetCString(rule_datum);
698                 oldcxt = MemoryContextSwitchTo(rulescxt);
699                 rule->actions = (List *) stringToNode(rule_str);
700                 MemoryContextSwitchTo(oldcxt);
701                 pfree(rule_str);
702
703                 rule_datum = heap_getattr(rewrite_tuple,
704                                                                   Anum_pg_rewrite_ev_qual,
705                                                                   rewrite_tupdesc,
706                                                                   &isnull);
707                 Assert(!isnull);
708                 rule_str = TextDatumGetCString(rule_datum);
709                 oldcxt = MemoryContextSwitchTo(rulescxt);
710                 rule->qual = (Node *) stringToNode(rule_str);
711                 MemoryContextSwitchTo(oldcxt);
712                 pfree(rule_str);
713
714                 /*
715                  * We want the rule's table references to be checked as though by the
716                  * table owner, not the user referencing the rule.      Therefore, scan
717                  * through the rule's actions and set the checkAsUser field on all
718                  * rtable entries.      We have to look at the qual as well, in case it
719                  * contains sublinks.
720                  *
721                  * The reason for doing this when the rule is loaded, rather than when
722                  * it is stored, is that otherwise ALTER TABLE OWNER would have to
723                  * grovel through stored rules to update checkAsUser fields. Scanning
724                  * the rule tree during load is relatively cheap (compared to
725                  * constructing it in the first place), so we do it here.
726                  */
727                 setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
728                 setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);
729
730                 if (numlocks >= maxlocks)
731                 {
732                         maxlocks *= 2;
733                         rules = (RewriteRule **)
734                                 repalloc(rules, sizeof(RewriteRule *) * maxlocks);
735                 }
736                 rules[numlocks++] = rule;
737         }
738
739         /*
740          * end the scan and close the attribute relation
741          */
742         systable_endscan(rewrite_scan);
743         heap_close(rewrite_desc, AccessShareLock);
744
745         /*
746          * there might not be any rules (if relhasrules is out-of-date)
747          */
748         if (numlocks == 0)
749         {
750                 relation->rd_rules = NULL;
751                 relation->rd_rulescxt = NULL;
752                 MemoryContextDelete(rulescxt);
753                 return;
754         }
755
756         /*
757          * form a RuleLock and insert into relation
758          */
759         rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
760         rulelock->numLocks = numlocks;
761         rulelock->rules = rules;
762
763         relation->rd_rules = rulelock;
764 }
765
766 /*
767  *              equalRuleLocks
768  *
769  *              Determine whether two RuleLocks are equivalent
770  *
771  *              Probably this should be in the rules code someplace...
772  */
773 static bool
774 equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
775 {
776         int                     i;
777
778         /*
779          * As of 7.3 we assume the rule ordering is repeatable, because
780          * RelationBuildRuleLock should read 'em in a consistent order.  So just
781          * compare corresponding slots.
782          */
783         if (rlock1 != NULL)
784         {
785                 if (rlock2 == NULL)
786                         return false;
787                 if (rlock1->numLocks != rlock2->numLocks)
788                         return false;
789                 for (i = 0; i < rlock1->numLocks; i++)
790                 {
791                         RewriteRule *rule1 = rlock1->rules[i];
792                         RewriteRule *rule2 = rlock2->rules[i];
793
794                         if (rule1->ruleId != rule2->ruleId)
795                                 return false;
796                         if (rule1->event != rule2->event)
797                                 return false;
798                         if (rule1->attrno != rule2->attrno)
799                                 return false;
800                         if (rule1->enabled != rule2->enabled)
801                                 return false;
802                         if (rule1->isInstead != rule2->isInstead)
803                                 return false;
804                         if (!equal(rule1->qual, rule2->qual))
805                                 return false;
806                         if (!equal(rule1->actions, rule2->actions))
807                                 return false;
808                 }
809         }
810         else if (rlock2 != NULL)
811                 return false;
812         return true;
813 }
814
815
/*
 *              RelationBuildDesc
 *
 *              Build a relation descriptor.  The caller must hold at least
 *              AccessShareLock on the target relid.
 *
 *              targetRelId is the OID of the relation whose pg_class row is to be
 *              read.  The new descriptor is inserted into the hash table if
 *              insertIt is true.
 *
 *              Returns NULL if no pg_class row could be found for the given relid
 *              (suggesting we are trying to access a just-deleted relation).
 *              Any other error is reported via elog.
 *
 *              The returned descriptor starts out with rd_refcnt = 0, is not
 *              nailed, and is marked rd_isvalid only as the very last step.
 */
static Relation
RelationBuildDesc(Oid targetRelId, bool insertIt)
{
        Relation        relation;
        Oid                     relid;
        HeapTuple       pg_class_tuple;
        Form_pg_class relp;

        /*
         * find the tuple in pg_class corresponding to the given relation id
         *
         * NOTE(review): the second argument is presumably ScanPgRelation's
         * "index OK" flag --- confirm against its definition.
         */
        pg_class_tuple = ScanPgRelation(targetRelId, true);

        /*
         * if no such tuple exists, return NULL
         */
        if (!HeapTupleIsValid(pg_class_tuple))
                return NULL;

        /*
         * get information from the pg_class_tuple
         */
        relid = HeapTupleGetOid(pg_class_tuple);
        relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
        /* the row we fetched must be the one we asked for */
        Assert(relid == targetRelId);

        /*
         * allocate storage for the relation descriptor, and copy pg_class_tuple
         * to relation->rd_rel.
         */
        relation = AllocateRelationDesc(relp);

        /*
         * initialize the relation's relation id (relation->rd_id)
         */
        RelationGetRelid(relation) = relid;

        /*
         * normal relations are not nailed into the cache; nor can a pre-existing
         * relation be new.  It could be temp though.  (Actually, it could be new
         * too, but it's okay to forget that fact if forced to flush the entry.)
         */
        relation->rd_refcnt = 0;
        relation->rd_isnailed = false;
        relation->rd_createSubid = InvalidSubTransactionId;
        relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;

        /*
         * set rd_backend and rd_islocaltemp according to the relation's
         * persistence; an unrecognized persistence code is an error
         */
        switch (relation->rd_rel->relpersistence)
        {
                case RELPERSISTENCE_UNLOGGED:
                case RELPERSISTENCE_PERMANENT:
                        relation->rd_backend = InvalidBackendId;
                        relation->rd_islocaltemp = false;
                        break;
                case RELPERSISTENCE_TEMP:
                        if (isTempOrToastNamespace(relation->rd_rel->relnamespace))
                        {
                                /* a temp relation in one of our own temp namespaces */
                                relation->rd_backend = MyBackendId;
                                relation->rd_islocaltemp = true;
                        }
                        else
                        {
                                /*
                                 * If it's a temp table, but not one of ours, we have to use
                                 * the slow, grotty method to figure out the owning backend.
                                 *
                                 * Note: it's possible that rd_backend gets set to MyBackendId
                                 * here, in case we are looking at a pg_class entry left over
                                 * from a crashed backend that coincidentally had the same
                                 * BackendId we're using.  We should *not* consider such a
                                 * table to be "ours"; this is why we need the separate
                                 * rd_islocaltemp flag.  The pg_class entry will get flushed
                                 * if/when we clean out the corresponding temp table namespace
                                 * in preparation for using it.
                                 */
                                relation->rd_backend =
                                        GetTempNamespaceBackendId(relation->rd_rel->relnamespace);
                                Assert(relation->rd_backend != InvalidBackendId);
                                relation->rd_islocaltemp = false;
                        }
                        break;
                default:
                        elog(ERROR, "invalid relpersistence: %c",
                                 relation->rd_rel->relpersistence);
                        break;
        }

        /*
         * initialize the tuple descriptor (relation->rd_att).
         */
        RelationBuildTupleDesc(relation);

        /*
         * Fetch rules and triggers that affect this relation.  When pg_class
         * says there are none, the corresponding fields are explicitly reset
         * to NULL instead.
         */
        if (relation->rd_rel->relhasrules)
                RelationBuildRuleLock(relation);
        else
        {
                relation->rd_rules = NULL;
                relation->rd_rulescxt = NULL;
        }

        if (relation->rd_rel->relhastriggers)
                RelationBuildTriggers(relation);
        else
                relation->trigdesc = NULL;

        /*
         * if it's an index, initialize index-related information (a valid relam
         * is what identifies an index here)
         */
        if (OidIsValid(relation->rd_rel->relam))
                RelationInitIndexAccessInfo(relation);

        /* extract reloptions if any */
        RelationParseRelOptions(relation, pg_class_tuple);

        /*
         * initialize the relation lock manager information
         */
        RelationInitLockInfo(relation);         /* see lmgr.c */

        /*
         * initialize physical addressing information for the relation
         */
        RelationInitPhysicalAddr(relation);

        /* make sure relation is marked as having no open file yet */
        relation->rd_smgr = NULL;

        /*
         * now we can free the memory allocated for pg_class_tuple; nothing
         * below this point refers to the tuple (or to relp, which points into
         * it)
         */
        heap_freetuple(pg_class_tuple);

        /*
         * Insert newly created relation into relcache hash table, if requested.
         */
        if (insertIt)
                RelationCacheInsert(relation);

        /* It's fully valid */
        relation->rd_isvalid = true;

        return relation;
}
973
974 /*
975  * Initialize the physical addressing info (RelFileNode) for a relcache entry
976  *
977  * Note: at the physical level, relations in the pg_global tablespace must
978  * be treated as shared, even if relisshared isn't set.  Hence we do not
979  * look at relisshared here.
980  */
981 static void
982 RelationInitPhysicalAddr(Relation relation)
983 {
984         if (relation->rd_rel->reltablespace)
985                 relation->rd_node.spcNode = relation->rd_rel->reltablespace;
986         else
987                 relation->rd_node.spcNode = MyDatabaseTableSpace;
988         if (relation->rd_node.spcNode == GLOBALTABLESPACE_OID)
989                 relation->rd_node.dbNode = InvalidOid;
990         else
991                 relation->rd_node.dbNode = MyDatabaseId;
992         if (relation->rd_rel->relfilenode)
993                 relation->rd_node.relNode = relation->rd_rel->relfilenode;
994         else
995         {
996                 /* Consult the relation mapper */
997                 relation->rd_node.relNode =
998                         RelationMapOidToFilenode(relation->rd_id,
999                                                                          relation->rd_rel->relisshared);
1000                 if (!OidIsValid(relation->rd_node.relNode))
1001                         elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
1002                                  RelationGetRelationName(relation), relation->rd_id);
1003         }
1004 }
1005
/*
 * Initialize index-access-method support data for an index relation
 *
 * In order, this:
 *      - stores a copy of the index's pg_index tuple in rd_indextuple and
 *        points rd_index at its fixed-size part;
 *      - stores a copy of the access method's pg_am struct in rd_am;
 *      - verifies that pg_class.relnatts matches pg_index.indnatts;
 *      - creates rd_indexcxt, a private memory context that is a child of
 *        CacheMemoryContext, and allocates the per-column arrays in it;
 *      - fills those arrays from indcollation, indclass and indoption;
 *      - resets the fields that are computed later on demand.
 *
 * Errors (missing syscache entries, attribute-count mismatch) are reported
 * via elog(ERROR).
 */
void
RelationInitIndexAccessInfo(Relation relation)
{
        HeapTuple       tuple;
        Form_pg_am      aform;
        Datum           indcollDatum;
        Datum           indclassDatum;
        Datum           indoptionDatum;
        bool            isnull;
        oidvector  *indcoll;
        oidvector  *indclass;
        int2vector *indoption;
        MemoryContext indexcxt;
        MemoryContext oldcontext;
        int                     natts;
        uint16          amsupport;

        /*
         * Make a copy of the pg_index entry for the index.  Since pg_index
         * contains variable-length and possibly-null fields, we have to do this
         * honestly rather than just treating it as a Form_pg_index struct.
         */
        tuple = SearchSysCache1(INDEXRELID,
                                                        ObjectIdGetDatum(RelationGetRelid(relation)));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for index %u",
                         RelationGetRelid(relation));
        /* the copy is made while CacheMemoryContext is the current context */
        oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
        relation->rd_indextuple = heap_copytuple(tuple);
        relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
        MemoryContextSwitchTo(oldcontext);
        ReleaseSysCache(tuple);

        /*
         * Make a copy of the pg_am entry for the index's access method.  Only
         * the fixed-size struct is copied, and it is allocated directly in
         * CacheMemoryContext (the index's private context does not exist yet).
         */
        tuple = SearchSysCache1(AMOID, ObjectIdGetDatum(relation->rd_rel->relam));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for access method %u",
                         relation->rd_rel->relam);
        aform = (Form_pg_am) MemoryContextAlloc(CacheMemoryContext, sizeof *aform);
        memcpy(aform, GETSTRUCT(tuple), sizeof *aform);
        ReleaseSysCache(tuple);
        relation->rd_am = aform;

        /* pg_class and pg_index must agree on the number of index columns */
        natts = relation->rd_rel->relnatts;
        if (natts != relation->rd_index->indnatts)
                elog(ERROR, "relnatts disagrees with indnatts for index %u",
                         RelationGetRelid(relation));
        amsupport = aform->amsupport;

        /*
         * Make the private context to hold index access info.  The reason we need
         * a context, and not just a couple of pallocs, is so that we won't leak
         * any subsidiary info attached to fmgr lookup records.
         *
         * Context parameters are set on the assumption that it'll probably not
         * contain much data.
         */
        indexcxt = AllocSetContextCreate(CacheMemoryContext,
                                                                         RelationGetRelationName(relation),
                                                                         ALLOCSET_SMALL_MINSIZE,
                                                                         ALLOCSET_SMALL_INITSIZE,
                                                                         ALLOCSET_SMALL_MAXSIZE);
        relation->rd_indexcxt = indexcxt;

        /*
         * Allocate arrays to hold data.  All of them are zero-filled and live
         * in the index's private context.
         */
        relation->rd_aminfo = (RelationAmInfo *)
                MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));

        relation->rd_opfamily = (Oid *)
                MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
        relation->rd_opcintype = (Oid *)
                MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));

        if (amsupport > 0)
        {
                /* one slot per (index column, support procedure number) pair */
                int                     nsupport = natts * amsupport;

                relation->rd_support = (RegProcedure *)
                        MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
                relation->rd_supportinfo = (FmgrInfo *)
                        MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
        }
        else
        {
                /* the access method declares no support procedures */
                relation->rd_support = NULL;
                relation->rd_supportinfo = NULL;
        }

        relation->rd_indcollation = (Oid *)
                MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));

        relation->rd_indoption = (int16 *)
                MemoryContextAllocZero(indexcxt, natts * sizeof(int16));

        /*
         * indcollation cannot be referenced directly through the C struct,
         * because it comes after the variable-width indkey field.  Must extract
         * the datum the hard way...
         */
        indcollDatum = fastgetattr(relation->rd_indextuple,
                                                           Anum_pg_index_indcollation,
                                                           GetPgIndexDescriptor(),
                                                           &isnull);
        Assert(!isnull);
        indcoll = (oidvector *) DatumGetPointer(indcollDatum);
        memcpy(relation->rd_indcollation, indcoll->values, natts * sizeof(Oid));

        /*
         * indclass cannot be referenced directly through the C struct, because it
         * comes after the variable-width indkey field.  Must extract the datum
         * the hard way...
         */
        indclassDatum = fastgetattr(relation->rd_indextuple,
                                                                Anum_pg_index_indclass,
                                                                GetPgIndexDescriptor(),
                                                                &isnull);
        Assert(!isnull);
        indclass = (oidvector *) DatumGetPointer(indclassDatum);

        /*
         * Fill the support procedure OID array, as well as the info about
         * opfamilies and opclass input types.  (aminfo and supportinfo are left
         * as zeroes, and are filled on-the-fly when used)
         */
        IndexSupportInitialize(indclass, relation->rd_support,
                                                   relation->rd_opfamily, relation->rd_opcintype,
                                                   amsupport, natts);

        /*
         * Similarly extract indoption and copy it to the cache entry
         */
        indoptionDatum = fastgetattr(relation->rd_indextuple,
                                                                 Anum_pg_index_indoption,
                                                                 GetPgIndexDescriptor(),
                                                                 &isnull);
        Assert(!isnull);
        indoption = (int2vector *) DatumGetPointer(indoptionDatum);
        memcpy(relation->rd_indoption, indoption->values, natts * sizeof(int16));

        /*
         * expressions, predicate, exclusion caches will be filled later
         */
        relation->rd_indexprs = NIL;
        relation->rd_indpred = NIL;
        relation->rd_exclops = NULL;
        relation->rd_exclprocs = NULL;
        relation->rd_exclstrats = NULL;
        relation->rd_amcache = NULL;
}
1162
1163 /*
1164  * IndexSupportInitialize
1165  *              Initializes an index's cached opclass information,
1166  *              given the index's pg_index.indclass entry.
1167  *
1168  * Data is returned into *indexSupport, *opFamily, and *opcInType,
1169  * which are arrays allocated by the caller.
1170  *
1171  * The caller also passes maxSupportNumber and maxAttributeNumber, since these
1172  * indicate the size of the arrays it has allocated --- but in practice these
1173  * numbers must always match those obtainable from the system catalog entries
1174  * for the index and access method.
1175  */
1176 static void
1177 IndexSupportInitialize(oidvector *indclass,
1178                                            RegProcedure *indexSupport,
1179                                            Oid *opFamily,
1180                                            Oid *opcInType,
1181                                            StrategyNumber maxSupportNumber,
1182                                            AttrNumber maxAttributeNumber)
1183 {
1184         int                     attIndex;
1185
1186         for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
1187         {
1188                 OpClassCacheEnt *opcentry;
1189
1190                 if (!OidIsValid(indclass->values[attIndex]))
1191                         elog(ERROR, "bogus pg_index tuple");
1192
1193                 /* look up the info for this opclass, using a cache */
1194                 opcentry = LookupOpclassInfo(indclass->values[attIndex],
1195                                                                          maxSupportNumber);
1196
1197                 /* copy cached data into relcache entry */
1198                 opFamily[attIndex] = opcentry->opcfamily;
1199                 opcInType[attIndex] = opcentry->opcintype;
1200                 if (maxSupportNumber > 0)
1201                         memcpy(&indexSupport[attIndex * maxSupportNumber],
1202                                    opcentry->supportProcs,
1203                                    maxSupportNumber * sizeof(RegProcedure));
1204         }
1205 }
1206
1207 /*
1208  * LookupOpclassInfo
1209  *
1210  * This routine maintains a per-opclass cache of the information needed
1211  * by IndexSupportInitialize().  This is more efficient than relying on
1212  * the catalog cache, because we can load all the info about a particular
1213  * opclass in a single indexscan of pg_amproc.
1214  *
1215  * The information from pg_am about expected range of support function
1216  * numbers is passed in, rather than being looked up, mainly because the
1217  * caller will have it already.
1218  *
1219  * Note there is no provision for flushing the cache.  This is OK at the
1220  * moment because there is no way to ALTER any interesting properties of an
1221  * existing opclass --- all you can do is drop it, which will result in
1222  * a useless but harmless dead entry in the cache.      To support altering
1223  * opclass membership (not the same as opfamily membership!), we'd need to
1224  * be able to flush this cache as well as the contents of relcache entries
1225  * for indexes.
1226  */
1227 static OpClassCacheEnt *
1228 LookupOpclassInfo(Oid operatorClassOid,
1229                                   StrategyNumber numSupport)
1230 {
1231         OpClassCacheEnt *opcentry;
1232         bool            found;
1233         Relation        rel;
1234         SysScanDesc scan;
1235         ScanKeyData skey[3];
1236         HeapTuple       htup;
1237         bool            indexOK;
1238
1239         if (OpClassCache == NULL)
1240         {
1241                 /* First time through: initialize the opclass cache */
1242                 HASHCTL         ctl;
1243
1244                 MemSet(&ctl, 0, sizeof(ctl));
1245                 ctl.keysize = sizeof(Oid);
1246                 ctl.entrysize = sizeof(OpClassCacheEnt);
1247                 ctl.hash = oid_hash;
1248                 OpClassCache = hash_create("Operator class cache", 64,
1249                                                                    &ctl, HASH_ELEM | HASH_FUNCTION);
1250
1251                 /* Also make sure CacheMemoryContext exists */
1252                 if (!CacheMemoryContext)
1253                         CreateCacheMemoryContext();
1254         }
1255
1256         opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
1257                                                                                            (void *) &operatorClassOid,
1258                                                                                            HASH_ENTER, &found);
1259
1260         if (!found)
1261         {
1262                 /* Need to allocate memory for new entry */
1263                 opcentry->valid = false;        /* until known OK */
1264                 opcentry->numSupport = numSupport;
1265
1266                 if (numSupport > 0)
1267                         opcentry->supportProcs = (RegProcedure *)
1268                                 MemoryContextAllocZero(CacheMemoryContext,
1269                                                                            numSupport * sizeof(RegProcedure));
1270                 else
1271                         opcentry->supportProcs = NULL;
1272         }
1273         else
1274         {
1275                 Assert(numSupport == opcentry->numSupport);
1276         }
1277
1278         /*
1279          * When testing for cache-flush hazards, we intentionally disable the
1280          * operator class cache and force reloading of the info on each call. This
1281          * is helpful because we want to test the case where a cache flush occurs
1282          * while we are loading the info, and it's very hard to provoke that if
1283          * this happens only once per opclass per backend.
1284          */
1285 #if defined(CLOBBER_CACHE_ALWAYS)
1286         opcentry->valid = false;
1287 #endif
1288
1289         if (opcentry->valid)
1290                 return opcentry;
1291
1292         /*
1293          * Need to fill in new entry.
1294          *
1295          * To avoid infinite recursion during startup, force heap scans if we're
1296          * looking up info for the opclasses used by the indexes we would like to
1297          * reference here.
1298          */
1299         indexOK = criticalRelcachesBuilt ||
1300                 (operatorClassOid != OID_BTREE_OPS_OID &&
1301                  operatorClassOid != INT2_BTREE_OPS_OID);
1302
1303         /*
1304          * We have to fetch the pg_opclass row to determine its opfamily and
1305          * opcintype, which are needed to look up related operators and functions.
1306          * It'd be convenient to use the syscache here, but that probably doesn't
1307          * work while bootstrapping.
1308          */
1309         ScanKeyInit(&skey[0],
1310                                 ObjectIdAttributeNumber,
1311                                 BTEqualStrategyNumber, F_OIDEQ,
1312                                 ObjectIdGetDatum(operatorClassOid));
1313         rel = heap_open(OperatorClassRelationId, AccessShareLock);
1314         scan = systable_beginscan(rel, OpclassOidIndexId, indexOK,
1315                                                           SnapshotNow, 1, skey);
1316
1317         if (HeapTupleIsValid(htup = systable_getnext(scan)))
1318         {
1319                 Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(htup);
1320
1321                 opcentry->opcfamily = opclassform->opcfamily;
1322                 opcentry->opcintype = opclassform->opcintype;
1323         }
1324         else
1325                 elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);
1326
1327         systable_endscan(scan);
1328         heap_close(rel, AccessShareLock);
1329
1330         /*
1331          * Scan pg_amproc to obtain support procs for the opclass.      We only fetch
1332          * the default ones (those with lefttype = righttype = opcintype).
1333          */
1334         if (numSupport > 0)
1335         {
1336                 ScanKeyInit(&skey[0],
1337                                         Anum_pg_amproc_amprocfamily,
1338                                         BTEqualStrategyNumber, F_OIDEQ,
1339                                         ObjectIdGetDatum(opcentry->opcfamily));
1340                 ScanKeyInit(&skey[1],
1341                                         Anum_pg_amproc_amproclefttype,
1342                                         BTEqualStrategyNumber, F_OIDEQ,
1343                                         ObjectIdGetDatum(opcentry->opcintype));
1344                 ScanKeyInit(&skey[2],
1345                                         Anum_pg_amproc_amprocrighttype,
1346                                         BTEqualStrategyNumber, F_OIDEQ,
1347                                         ObjectIdGetDatum(opcentry->opcintype));
1348                 rel = heap_open(AccessMethodProcedureRelationId, AccessShareLock);
1349                 scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
1350                                                                   SnapshotNow, 3, skey);
1351
1352                 while (HeapTupleIsValid(htup = systable_getnext(scan)))
1353                 {
1354                         Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);
1355
1356                         if (amprocform->amprocnum <= 0 ||
1357                                 (StrategyNumber) amprocform->amprocnum > numSupport)
1358                                 elog(ERROR, "invalid amproc number %d for opclass %u",
1359                                          amprocform->amprocnum, operatorClassOid);
1360
1361                         opcentry->supportProcs[amprocform->amprocnum - 1] =
1362                                 amprocform->amproc;
1363                 }
1364
1365                 systable_endscan(scan);
1366                 heap_close(rel, AccessShareLock);
1367         }
1368
1369         opcentry->valid = true;
1370         return opcentry;
1371 }
1372
1373
1374 /*
1375  *              formrdesc
1376  *
1377  *              This is a special cut-down version of RelationBuildDesc(),
1378  *              used while initializing the relcache.
1379  *              The relation descriptor is built just from the supplied parameters,
1380  *              without actually looking at any system table entries.  We cheat
1381  *              quite a lot since we only need to work for a few basic system
1382  *              catalogs.
1383  *
1384  * formrdesc is currently used for: pg_database, pg_authid, pg_auth_members,
1385  * pg_class, pg_attribute, pg_proc, and pg_type
1386  * (see RelationCacheInitializePhase2/3).
1387  *
1388  * Note that these catalogs can't have constraints (except attnotnull),
1389  * default values, rules, or triggers, since we don't cope with any of that.
1390  * (Well, actually, this only matters for properties that need to be valid
1391  * during bootstrap or before RelationCacheInitializePhase3 runs, and none of
1392  * these properties matter then...)
1393  *
1394  * NOTE: we assume we are already switched into CacheMemoryContext.
1395  */
1396 static void
1397 formrdesc(const char *relationName, Oid relationReltype,
1398                   bool isshared, bool hasoids,
1399                   int natts, const FormData_pg_attribute *attrs)
1400 {
1401         Relation        relation;
1402         int                     i;
1403         bool            has_not_null;
1404
1405         /*
1406          * allocate new relation desc, clear all fields of reldesc
1407          */
1408         relation = (Relation) palloc0(sizeof(RelationData));
1409
1410         /* make sure relation is marked as having no open file yet */
1411         relation->rd_smgr = NULL;
1412
1413         /*
1414          * initialize reference count: 1 because it is nailed in cache
1415          */
1416         relation->rd_refcnt = 1;
1417
1418         /*
1419          * all entries built with this routine are nailed-in-cache; none are for
1420          * new or temp relations.
1421          */
1422         relation->rd_isnailed = true;
1423         relation->rd_createSubid = InvalidSubTransactionId;
1424         relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
1425         relation->rd_backend = InvalidBackendId;
1426         relation->rd_islocaltemp = false;
1427
1428         /*
1429          * initialize relation tuple form
1430          *
1431          * The data we insert here is pretty incomplete/bogus, but it'll serve to
1432          * get us launched.  RelationCacheInitializePhase3() will read the real
1433          * data from pg_class and replace what we've done here.  Note in
1434          * particular that relowner is left as zero; this cues
1435          * RelationCacheInitializePhase3 that the real data isn't there yet.
1436          */
1437         relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
1438
1439         namestrcpy(&relation->rd_rel->relname, relationName);
1440         relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
1441         relation->rd_rel->reltype = relationReltype;
1442
1443         /*
1444          * It's important to distinguish between shared and non-shared relations,
1445          * even at bootstrap time, to make sure we know where they are stored.
1446          */
1447         relation->rd_rel->relisshared = isshared;
1448         if (isshared)
1449                 relation->rd_rel->reltablespace = GLOBALTABLESPACE_OID;
1450
1451         /* formrdesc is used only for permanent relations */
1452         relation->rd_rel->relpersistence = RELPERSISTENCE_PERMANENT;
1453
1454         relation->rd_rel->relpages = 0;
1455         relation->rd_rel->reltuples = 0;
1456         relation->rd_rel->relallvisible = 0;
1457         relation->rd_rel->relkind = RELKIND_RELATION;
1458         relation->rd_rel->relhasoids = hasoids;
1459         relation->rd_rel->relnatts = (int16) natts;
1460
1461         /*
1462          * initialize attribute tuple form
1463          *
1464          * Unlike the case with the relation tuple, this data had better be right
1465          * because it will never be replaced.  The data comes from
1466          * src/include/catalog/ headers via genbki.pl.
1467          */
1468         relation->rd_att = CreateTemplateTupleDesc(natts, hasoids);
1469         relation->rd_att->tdrefcount = 1;       /* mark as refcounted */
1470
1471         relation->rd_att->tdtypeid = relationReltype;
1472         relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
1473
1474         /*
1475          * initialize tuple desc info
1476          */
1477         has_not_null = false;
1478         for (i = 0; i < natts; i++)
1479         {
1480                 memcpy(relation->rd_att->attrs[i],
1481                            &attrs[i],
1482                            ATTRIBUTE_FIXED_PART_SIZE);
1483                 has_not_null |= attrs[i].attnotnull;
1484                 /* make sure attcacheoff is valid */
1485                 relation->rd_att->attrs[i]->attcacheoff = -1;
1486         }
1487
1488         /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
1489         relation->rd_att->attrs[0]->attcacheoff = 0;
1490
1491         /* mark not-null status */
1492         if (has_not_null)
1493         {
1494                 TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
1495
1496                 constr->has_not_null = true;
1497                 relation->rd_att->constr = constr;
1498         }
1499
1500         /*
1501          * initialize relation id from info in att array (my, this is ugly)
1502          */
1503         RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
1504
1505         /*
1506          * All relations made with formrdesc are mapped.  This is necessarily so
1507          * because there is no other way to know what filenode they currently
1508          * have.  In bootstrap mode, add them to the initial relation mapper data,
1509          * specifying that the initial filenode is the same as the OID.
1510          */
1511         relation->rd_rel->relfilenode = InvalidOid;
1512         if (IsBootstrapProcessingMode())
1513                 RelationMapUpdateMap(RelationGetRelid(relation),
1514                                                          RelationGetRelid(relation),
1515                                                          isshared, true);
1516
1517         /*
1518          * initialize the relation lock manager information
1519          */
1520         RelationInitLockInfo(relation);         /* see lmgr.c */
1521
1522         /*
1523          * initialize physical addressing information for the relation
1524          */
1525         RelationInitPhysicalAddr(relation);
1526
1527         /*
1528          * initialize the rel-has-index flag, using hardwired knowledge
1529          */
1530         if (IsBootstrapProcessingMode())
1531         {
1532                 /* In bootstrap mode, we have no indexes */
1533                 relation->rd_rel->relhasindex = false;
1534         }
1535         else
1536         {
1537                 /* Otherwise, all the rels formrdesc is used for have indexes */
1538                 relation->rd_rel->relhasindex = true;
1539         }
1540
1541         /*
1542          * add new reldesc to relcache
1543          */
1544         RelationCacheInsert(relation);
1545
1546         /* It's fully valid */
1547         relation->rd_isvalid = true;
1548 }
1549
1550
1551 /* ----------------------------------------------------------------
1552  *                               Relation Descriptor Lookup Interface
1553  * ----------------------------------------------------------------
1554  */
1555
1556 /*
1557  *              RelationIdGetRelation
1558  *
1559  *              Lookup a reldesc by OID; make one if not already in cache.
1560  *
1561  *              Returns NULL if no pg_class row could be found for the given relid
1562  *              (suggesting we are trying to access a just-deleted relation).
1563  *              Any other error is reported via elog.
1564  *
1565  *              NB: caller should already have at least AccessShareLock on the
1566  *              relation ID, else there are nasty race conditions.
1567  *
1568  *              NB: relation ref count is incremented, or set to 1 if new entry.
1569  *              Caller should eventually decrement count.  (Usually,
1570  *              that happens by calling RelationClose().)
1571  */
1572 Relation
1573 RelationIdGetRelation(Oid relationId)
1574 {
1575         Relation        rd;
1576
1577         /*
1578          * first try to find reldesc in the cache
1579          */
1580         RelationIdCacheLookup(relationId, rd);
1581
1582         if (RelationIsValid(rd))
1583         {
1584                 RelationIncrementReferenceCount(rd);
1585                 /* revalidate cache entry if necessary */
1586                 if (!rd->rd_isvalid)
1587                 {
1588                         /*
1589                          * Indexes only have a limited number of possible schema changes,
1590                          * and we don't want to use the full-blown procedure because it's
1591                          * a headache for indexes that reload itself depends on.
1592                          */
1593                         if (rd->rd_rel->relkind == RELKIND_INDEX)
1594                                 RelationReloadIndexInfo(rd);
1595                         else
1596                                 RelationClearRelation(rd, true);
1597                 }
1598                 return rd;
1599         }
1600
1601         /*
1602          * no reldesc in the cache, so have RelationBuildDesc() build one and add
1603          * it.
1604          */
1605         rd = RelationBuildDesc(relationId, true);
1606         if (RelationIsValid(rd))
1607                 RelationIncrementReferenceCount(rd);
1608         return rd;
1609 }
1610
1611 /* ----------------------------------------------------------------
1612  *                              cache invalidation support routines
1613  * ----------------------------------------------------------------
1614  */
1615
1616 /*
1617  * RelationIncrementReferenceCount
1618  *              Increments relation reference count.
1619  *
1620  * Note: bootstrap mode has its own weird ideas about relation refcount
1621  * behavior; we ought to fix it someday, but for now, just disable
1622  * reference count ownership tracking in bootstrap mode.
1623  */
1624 void
1625 RelationIncrementReferenceCount(Relation rel)
1626 {
1627         ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
1628         rel->rd_refcnt += 1;
1629         if (!IsBootstrapProcessingMode())
1630                 ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
1631 }
1632
1633 /*
1634  * RelationDecrementReferenceCount
1635  *              Decrements relation reference count.
1636  */
1637 void
1638 RelationDecrementReferenceCount(Relation rel)
1639 {
1640         Assert(rel->rd_refcnt > 0);
1641         rel->rd_refcnt -= 1;
1642         if (!IsBootstrapProcessingMode())
1643                 ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
1644 }
1645
1646 /*
1647  * RelationClose - close an open relation
1648  *
1649  *      Actually, we just decrement the refcount.
1650  *
1651  *      NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
1652  *      will be freed as soon as their refcount goes to zero.  In combination
1653  *      with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
1654  *      to catch references to already-released relcache entries.  It slows
1655  *      things down quite a bit, however.
1656  */
1657 void
1658 RelationClose(Relation relation)
1659 {
1660         /* Note: no locking manipulations needed */
1661         RelationDecrementReferenceCount(relation);
1662
1663 #ifdef RELCACHE_FORCE_RELEASE
1664         if (RelationHasReferenceCountZero(relation) &&
1665                 relation->rd_createSubid == InvalidSubTransactionId &&
1666                 relation->rd_newRelfilenodeSubid == InvalidSubTransactionId)
1667                 RelationClearRelation(relation, false);
1668 #endif
1669 }
1670
1671 /*
1672  * RelationReloadIndexInfo - reload minimal information for an open index
1673  *
1674  *      This function is used only for indexes.  A relcache inval on an index
1675  *      can mean that its pg_class or pg_index row changed.  There are only
1676  *      very limited changes that are allowed to an existing index's schema,
1677  *      so we can update the relcache entry without a complete rebuild; which
1678  *      is fortunate because we can't rebuild an index entry that is "nailed"
1679  *      and/or in active use.  We support full replacement of the pg_class row,
1680  *      as well as updates of a few simple fields of the pg_index row.
1681  *
1682  *      We can't necessarily reread the catalog rows right away; we might be
1683  *      in a failed transaction when we receive the SI notification.  If so,
1684  *      RelationClearRelation just marks the entry as invalid by setting
1685  *      rd_isvalid to false.  This routine is called to fix the entry when it
1686  *      is next needed.
1687  *
1688  *      We assume that at the time we are called, we have at least AccessShareLock
1689  *      on the target index.  (Note: in the calls from RelationClearRelation,
1690  *      this is legitimate because we know the rel has positive refcount.)
1691  *
1692  *      If the target index is an index on pg_class or pg_index, we'd better have
1693  *      previously gotten at least AccessShareLock on its underlying catalog,
1694  *      else we are at risk of deadlock against someone trying to exclusive-lock
1695  *      the heap and index in that order.  This is ensured in current usage by
1696  *      only applying this to indexes being opened or having positive refcount.
1697  */
1698 static void
1699 RelationReloadIndexInfo(Relation relation)
1700 {
1701         bool            indexOK;
1702         HeapTuple       pg_class_tuple;
1703         Form_pg_class relp;
1704
1705         /* Should be called only for invalidated indexes */
1706         Assert(relation->rd_rel->relkind == RELKIND_INDEX &&
1707                    !relation->rd_isvalid);
1708         /* Should be closed at smgr level */
1709         Assert(relation->rd_smgr == NULL);
1710
1711         /* Must free any AM cached data upon relcache flush */
1712         if (relation->rd_amcache)
1713                 pfree(relation->rd_amcache);
1714         relation->rd_amcache = NULL;
1715
1716         /*
1717          * If it's a shared index, we might be called before backend startup has
1718          * finished selecting a database, in which case we have no way to read
1719          * pg_class yet.  However, a shared index can never have any significant
1720          * schema updates, so it's okay to ignore the invalidation signal.  Just
1721          * mark it valid and return without doing anything more.
1722          */
1723         if (relation->rd_rel->relisshared && !criticalRelcachesBuilt)
1724         {
1725                 relation->rd_isvalid = true;
1726                 return;
1727         }
1728
1729         /*
1730          * Read the pg_class row
1731          *
1732          * Don't try to use an indexscan of pg_class_oid_index to reload the info
1733          * for pg_class_oid_index ...
1734          */
1735         indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
1736         pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK);
1737         if (!HeapTupleIsValid(pg_class_tuple))
1738                 elog(ERROR, "could not find pg_class tuple for index %u",
1739                          RelationGetRelid(relation));
1740         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
1741         memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
1742         /* Reload reloptions in case they changed */
1743         if (relation->rd_options)
1744                 pfree(relation->rd_options);
1745         RelationParseRelOptions(relation, pg_class_tuple);
1746         /* done with pg_class tuple */
1747         heap_freetuple(pg_class_tuple);
1748         /* We must recalculate physical address in case it changed */
1749         RelationInitPhysicalAddr(relation);
1750
1751         /*
1752          * For a non-system index, there are fields of the pg_index row that are
1753          * allowed to change, so re-read that row and update the relcache entry.
1754          * Most of the info derived from pg_index (such as support function lookup
1755          * info) cannot change, and indeed the whole point of this routine is to
1756          * update the relcache entry without clobbering that data; so wholesale
1757          * replacement is not appropriate.
1758          */
1759         if (!IsSystemRelation(relation))
1760         {
1761                 HeapTuple       tuple;
1762                 Form_pg_index index;
1763
1764                 tuple = SearchSysCache1(INDEXRELID,
1765                                                                 ObjectIdGetDatum(RelationGetRelid(relation)));
1766                 if (!HeapTupleIsValid(tuple))
1767                         elog(ERROR, "cache lookup failed for index %u",
1768                                  RelationGetRelid(relation));
1769                 index = (Form_pg_index) GETSTRUCT(tuple);
1770
1771                 /*
1772                  * Basically, let's just copy all the bool fields.  There are one or
1773                  * two of these that can't actually change in the current code, but
1774                  * it's not worth it to track exactly which ones they are.  None of
1775                  * the array fields are allowed to change, though.
1776                  */
1777                 relation->rd_index->indisunique = index->indisunique;
1778                 relation->rd_index->indisprimary = index->indisprimary;
1779                 relation->rd_index->indisexclusion = index->indisexclusion;
1780                 relation->rd_index->indimmediate = index->indimmediate;
1781                 relation->rd_index->indisclustered = index->indisclustered;
1782                 relation->rd_index->indisvalid = index->indisvalid;
1783                 relation->rd_index->indcheckxmin = index->indcheckxmin;
1784                 relation->rd_index->indisready = index->indisready;
1785                 relation->rd_index->indislive = index->indislive;
1786
1787                 /* Copy xmin too, as that is needed to make sense of indcheckxmin */
1788                 HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
1789                                                            HeapTupleHeaderGetXmin(tuple->t_data));
1790
1791                 ReleaseSysCache(tuple);
1792         }
1793
1794         /* Okay, now it's valid again */
1795         relation->rd_isvalid = true;
1796 }
1797
1798 /*
1799  * RelationDestroyRelation
1800  *
1801  *      Physically delete a relation cache entry and all subsidiary data.
1802  *      Caller must already have unhooked the entry from the hash table.
1803  */
1804 static void
1805 RelationDestroyRelation(Relation relation)
1806 {
1807         Assert(RelationHasReferenceCountZero(relation));
1808
1809         /*
1810          * Make sure smgr and lower levels close the relation's files, if they
1811          * weren't closed already.  (This was probably done by caller, but let's
1812          * just be real sure.)
1813          */
1814         RelationCloseSmgr(relation);
1815
1816         /*
1817          * Free all the subsidiary data structures of the relcache entry, then the
1818          * entry itself.
1819          */
1820         if (relation->rd_rel)
1821                 pfree(relation->rd_rel);
1822         /* can't use DecrTupleDescRefCount here */
1823         Assert(relation->rd_att->tdrefcount > 0);
1824         if (--relation->rd_att->tdrefcount == 0)
1825                 FreeTupleDesc(relation->rd_att);
1826         list_free(relation->rd_indexlist);
1827         bms_free(relation->rd_indexattr);
1828         FreeTriggerDesc(relation->trigdesc);
1829         if (relation->rd_options)
1830                 pfree(relation->rd_options);
1831         if (relation->rd_indextuple)
1832                 pfree(relation->rd_indextuple);
1833         if (relation->rd_am)
1834                 pfree(relation->rd_am);
1835         if (relation->rd_indexcxt)
1836                 MemoryContextDelete(relation->rd_indexcxt);
1837         if (relation->rd_rulescxt)
1838                 MemoryContextDelete(relation->rd_rulescxt);
1839         pfree(relation);
1840 }
1841
1842 /*
1843  * RelationClearRelation
1844  *
1845  *       Physically blow away a relation cache entry, or reset it and rebuild
1846  *       it from scratch (that is, from catalog entries).  The latter path is
1847  *       used when we are notified of a change to an open relation (one with
1848  *       refcount > 0).
1849  *
1850  *       NB: when rebuilding, we'd better hold some lock on the relation,
1851  *       else the catalog data we need to read could be changing under us.
1852  *       Also, a rel to be rebuilt had better have refcnt > 0.  This is because
1853  *       an sinval reset could happen while we're accessing the catalogs, and
1854  *       the rel would get blown away underneath us by RelationCacheInvalidate
1855  *       if it has zero refcnt.
1856  *
1857  *       The "rebuild" parameter is redundant in current usage because it has
1858  *       to match the relation's refcnt status, but we keep it as a crosscheck
1859  *       that we're doing what the caller expects.
1860  */
1861 static void
1862 RelationClearRelation(Relation relation, bool rebuild)
1863 {
1864         /*
1865          * As per notes above, a rel to be rebuilt MUST have refcnt > 0; while of
1866          * course it would be a bad idea to blow away one with nonzero refcnt.
1867          */
1868         Assert(rebuild ?
1869                    !RelationHasReferenceCountZero(relation) :
1870                    RelationHasReferenceCountZero(relation));
1871
1872         /*
1873          * Make sure smgr and lower levels close the relation's files, if they
1874          * weren't closed already.  If the relation is not getting deleted, the
1875          * next smgr access should reopen the files automatically.      This ensures
1876          * that the low-level file access state is updated after, say, a vacuum
1877          * truncation.
1878          */
1879         RelationCloseSmgr(relation);
1880
1881         /*
1882          * Never, never ever blow away a nailed-in system relation, because we'd
1883          * be unable to recover.  However, we must redo RelationInitPhysicalAddr
1884          * in case it is a mapped relation whose mapping changed.
1885          *
1886          * If it's a nailed index, then we need to re-read the pg_class row to see
1887          * if its relfilenode changed.  We can't necessarily do that here, because
1888          * we might be in a failed transaction.  We assume it's okay to do it if
1889          * there are open references to the relcache entry (cf notes for
1890          * AtEOXact_RelationCache).  Otherwise just mark the entry as possibly
1891          * invalid, and it'll be fixed when next opened.
1892          */
1893         if (relation->rd_isnailed)
1894         {
1895                 RelationInitPhysicalAddr(relation);
1896
1897                 if (relation->rd_rel->relkind == RELKIND_INDEX)
1898                 {
1899                         relation->rd_isvalid = false;           /* needs to be revalidated */
1900                         if (relation->rd_refcnt > 1)
1901                                 RelationReloadIndexInfo(relation);
1902                 }
1903                 return;
1904         }
1905
1906         /*
1907          * Even non-system indexes should not be blown away if they are open and
1908          * have valid index support information.  This avoids problems with active
1909          * use of the index support information.  As with nailed indexes, we
1910          * re-read the pg_class row to handle possible physical relocation of the
1911          * index, and we check for pg_index updates too.
1912          */
1913         if (relation->rd_rel->relkind == RELKIND_INDEX &&
1914                 relation->rd_refcnt > 0 &&
1915                 relation->rd_indexcxt != NULL)
1916         {
1917                 relation->rd_isvalid = false;   /* needs to be revalidated */
1918                 RelationReloadIndexInfo(relation);
1919                 return;
1920         }
1921
1922         /* Mark it invalid until we've finished rebuild */
1923         relation->rd_isvalid = false;
1924
1925         /*
1926          * If we're really done with the relcache entry, blow it away. But if
1927          * someone is still using it, reconstruct the whole deal without moving
1928          * the physical RelationData record (so that the someone's pointer is
1929          * still valid).
1930          */
1931         if (!rebuild)
1932         {
1933                 /* Remove it from the hash table */
1934                 RelationCacheDelete(relation);
1935
1936                 /* And release storage */
1937                 RelationDestroyRelation(relation);
1938         }
1939         else
1940         {
1941                 /*
1942                  * Our strategy for rebuilding an open relcache entry is to build a
1943                  * new entry from scratch, swap its contents with the old entry, and
1944                  * finally delete the new entry (along with any infrastructure swapped
1945                  * over from the old entry).  This is to avoid trouble in case an
1946                  * error causes us to lose control partway through.  The old entry
1947                  * will still be marked !rd_isvalid, so we'll try to rebuild it again
1948                  * on next access.      Meanwhile it's not any less valid than it was
1949                  * before, so any code that might expect to continue accessing it
1950                  * isn't hurt by the rebuild failure.  (Consider for example a
1951                  * subtransaction that ALTERs a table and then gets canceled partway
1952                  * through the cache entry rebuild.  The outer transaction should
1953                  * still see the not-modified cache entry as valid.)  The worst
1954                  * consequence of an error is leaking the necessarily-unreferenced new
1955                  * entry, and this shouldn't happen often enough for that to be a big
1956                  * problem.
1957                  *
1958                  * When rebuilding an open relcache entry, we must preserve ref count,
1959                  * rd_createSubid/rd_newRelfilenodeSubid, and rd_toastoid state.  Also
1960                  * attempt to preserve the pg_class entry (rd_rel), tupledesc, and
1961                  * rewrite-rule substructures in place, because various places assume
1962                  * that these structures won't move while they are working with an
1963                  * open relcache entry.  (Note: the refcount mechanism for tupledescs
1964                  * might someday allow us to remove this hack for the tupledesc.)
1965                  *
1966                  * Note that this process does not touch CurrentResourceOwner; which
1967                  * is good because whatever ref counts the entry may have do not
1968                  * necessarily belong to that resource owner.
1969                  */
1970                 Relation        newrel;
1971                 Oid                     save_relid = RelationGetRelid(relation);
1972                 bool            keep_tupdesc;
1973                 bool            keep_rules;
1974
1975                 /* Build temporary entry, but don't link it into hashtable */
1976                 newrel = RelationBuildDesc(save_relid, false);
1977                 if (newrel == NULL)
1978                 {
1979                         /* Should only get here if relation was deleted */
1980                         RelationCacheDelete(relation);
1981                         RelationDestroyRelation(relation);
1982                         elog(ERROR, "relation %u deleted while still in use", save_relid);
1983                 }
1984
1985                 keep_tupdesc = equalTupleDescs(relation->rd_att, newrel->rd_att);
1986                 keep_rules = equalRuleLocks(relation->rd_rules, newrel->rd_rules);
1987
1988                 /*
1989                  * Perform swapping of the relcache entry contents.  Within this
1990                  * process the old entry is momentarily invalid, so there *must* be no
1991                  * possibility of CHECK_FOR_INTERRUPTS within this sequence. Do it in
1992                  * all-in-line code for safety.
1993                  *
1994                  * Since the vast majority of fields should be swapped, our method is
1995                  * to swap the whole structures and then re-swap those few fields we
1996                  * didn't want swapped.
1997                  */
1998 #define SWAPFIELD(fldtype, fldname) \
1999                 do { \
2000                         fldtype _tmp = newrel->fldname; \
2001                         newrel->fldname = relation->fldname; \
2002                         relation->fldname = _tmp; \
2003                 } while (0)
2004
2005                 /* swap all Relation struct fields */
2006                 {
2007                         RelationData tmpstruct;
2008
2009                         memcpy(&tmpstruct, newrel, sizeof(RelationData));
2010                         memcpy(newrel, relation, sizeof(RelationData));
2011                         memcpy(relation, &tmpstruct, sizeof(RelationData));
2012                 }
2013
2014                 /* rd_smgr must not be swapped, due to back-links from smgr level */
2015                 SWAPFIELD(SMgrRelation, rd_smgr);
2016                 /* rd_refcnt must be preserved */
2017                 SWAPFIELD(int, rd_refcnt);
2018                 /* isnailed shouldn't change */
2019                 Assert(newrel->rd_isnailed == relation->rd_isnailed);
2020                 /* creation sub-XIDs must be preserved */
2021                 SWAPFIELD(SubTransactionId, rd_createSubid);
2022                 SWAPFIELD(SubTransactionId, rd_newRelfilenodeSubid);
2023                 /* un-swap rd_rel pointers, swap contents instead */
2024                 SWAPFIELD(Form_pg_class, rd_rel);
2025                 /* ... but actually, we don't have to update newrel->rd_rel */
2026                 memcpy(relation->rd_rel, newrel->rd_rel, CLASS_TUPLE_SIZE);
2027                 /* preserve old tupledesc and rules if no logical change */
2028                 if (keep_tupdesc)
2029                         SWAPFIELD(TupleDesc, rd_att);
2030                 if (keep_rules)
2031                 {
2032                         SWAPFIELD(RuleLock *, rd_rules);
2033                         SWAPFIELD(MemoryContext, rd_rulescxt);
2034                 }
2035                 /* toast OID override must be preserved */
2036                 SWAPFIELD(Oid, rd_toastoid);
2037                 /* pgstat_info must be preserved */
2038                 SWAPFIELD(struct PgStat_TableStatus *, pgstat_info);
2039
2040 #undef SWAPFIELD
2041
2042                 /* And now we can throw away the temporary entry */
2043                 RelationDestroyRelation(newrel);
2044         }
2045 }
2046
2047 /*
2048  * RelationFlushRelation
2049  *
2050  *       Rebuild the relation if it is open (refcount > 0), else blow it away.
2051  */
2052 static void
2053 RelationFlushRelation(Relation relation)
2054 {
2055         if (relation->rd_createSubid != InvalidSubTransactionId ||
2056                 relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
2057         {
2058                 /*
2059                  * New relcache entries are always rebuilt, not flushed; else we'd
2060                  * forget the "new" status of the relation, which is a useful
2061                  * optimization to have.  Ditto for the new-relfilenode status.
2062                  *
2063                  * The rel could have zero refcnt here, so temporarily increment the
2064                  * refcnt to ensure it's safe to rebuild it.  We can assume that the
2065                  * current transaction has some lock on the rel already.
2066                  */
2067                 RelationIncrementReferenceCount(relation);
2068                 RelationClearRelation(relation, true);
2069                 RelationDecrementReferenceCount(relation);
2070         }
2071         else
2072         {
2073                 /*
2074                  * Pre-existing rels can be dropped from the relcache if not open.
2075                  */
2076                 bool            rebuild = !RelationHasReferenceCountZero(relation);
2077
2078                 RelationClearRelation(relation, rebuild);
2079         }
2080 }
2081
2082 /*
2083  * RelationForgetRelation - unconditionally remove a relcache entry
2084  *
2085  *                 External interface for destroying a relcache entry when we
2086  *                 drop the relation.
2087  */
2088 void
2089 RelationForgetRelation(Oid rid)
2090 {
2091         Relation        relation;
2092
2093         RelationIdCacheLookup(rid, relation);
2094
2095         if (!PointerIsValid(relation))
2096                 return;                                 /* not in cache, nothing to do */
2097
2098         if (!RelationHasReferenceCountZero(relation))
2099                 elog(ERROR, "relation %u is still open", rid);
2100
2101         /* Unconditionally destroy the relcache entry */
2102         RelationClearRelation(relation, false);
2103 }
2104
2105 /*
2106  *              RelationCacheInvalidateEntry
2107  *
2108  *              This routine is invoked for SI cache flush messages.
2109  *
2110  * Any relcache entry matching the relid must be flushed.  (Note: caller has
2111  * already determined that the relid belongs to our database or is a shared
2112  * relation.)
2113  *
2114  * We used to skip local relations, on the grounds that they could
2115  * not be targets of cross-backend SI update messages; but it seems
2116  * safer to process them, so that our *own* SI update messages will
2117  * have the same effects during CommandCounterIncrement for both
2118  * local and nonlocal relations.
2119  */
2120 void
2121 RelationCacheInvalidateEntry(Oid relationId)
2122 {
2123         Relation        relation;
2124
2125         RelationIdCacheLookup(relationId, relation);
2126
2127         if (PointerIsValid(relation))
2128         {
2129                 relcacheInvalsReceived++;
2130                 RelationFlushRelation(relation);
2131         }
2132 }
2133
2134 /*
2135  * RelationCacheInvalidate
2136  *       Blow away cached relation descriptors that have zero reference counts,
2137  *       and rebuild those with positive reference counts.      Also reset the smgr
2138  *       relation cache and re-read relation mapping data.
2139  *
2140  *       This is currently used only to recover from SI message buffer overflow,
2141  *       so we do not touch new-in-transaction relations; they cannot be targets
2142  *       of cross-backend SI updates (and our own updates now go through a
2143  *       separate linked list that isn't limited by the SI message buffer size).
2144  *       Likewise, we need not discard new-relfilenode-in-transaction hints,
2145  *       since any invalidation of those would be a local event.
2146  *
2147  *       We do this in two phases: the first pass deletes deletable items, and
2148  *       the second one rebuilds the rebuildable items.  This is essential for
2149  *       safety, because hash_seq_search only copes with concurrent deletion of
2150  *       the element it is currently visiting.  If a second SI overflow were to
2151  *       occur while we are walking the table, resulting in recursive entry to
2152  *       this routine, we could crash because the inner invocation blows away
2153  *       the entry next to be visited by the outer scan.  But this way is OK,
2154  *       because (a) during the first pass we won't process any more SI messages,
2155  *       so hash_seq_search will complete safely; (b) during the second pass we
2156  *       only hold onto pointers to nondeletable entries.
2157  *
2158  *       The two-phase approach also makes it easy to update relfilenodes for
2159  *       mapped relations before we do anything else, and to ensure that the
2160  *       second pass processes nailed-in-cache items before other nondeletable
2161  *       items.  This should ensure that system catalogs are up to date before
2162  *       we attempt to use them to reload information about other open relations.
2163  */
2164 void
2165 RelationCacheInvalidate(void)
2166 {
2167         HASH_SEQ_STATUS status;
2168         RelIdCacheEnt *idhentry;
2169         Relation        relation;
2170         List       *rebuildFirstList = NIL;
2171         List       *rebuildList = NIL;
2172         ListCell   *l;
2173
2174         /*
2175          * Reload relation mapping data before starting to reconstruct cache.
2176          */
2177         RelationMapInvalidateAll();
2178
2179         /* Phase 1 */
2180         hash_seq_init(&status, RelationIdCache);
2181
2182         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2183         {
2184                 relation = idhentry->reldesc;
2185
2186                 /* Must close all smgr references to avoid leaving dangling ptrs */
2187                 RelationCloseSmgr(relation);
2188
2189                 /*
2190                  * Ignore new relations; no other backend will manipulate them before
2191                  * we commit.  Likewise, before replacing a relation's relfilenode, we
2192                  * shall have acquired AccessExclusiveLock and drained any applicable
2193                  * pending invalidations.
2194                  */
2195                 if (relation->rd_createSubid != InvalidSubTransactionId ||
2196                         relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
2197                         continue;
2198
2199                 relcacheInvalsReceived++;
2200
2201                 if (RelationHasReferenceCountZero(relation))
2202                 {
2203                         /* Delete this entry immediately */
2204                         Assert(!relation->rd_isnailed);
2205                         RelationClearRelation(relation, false);
2206                 }
2207                 else
2208                 {
2209                         /*
2210                          * If it's a mapped relation, immediately update its rd_node in
2211                          * case its relfilenode changed.  We must do this during phase 1
2212                          * in case the relation is consulted during rebuild of other
2213                          * relcache entries in phase 2.  It's safe since consulting the
2214                          * map doesn't involve any access to relcache entries.
2215                          */
2216                         if (RelationIsMapped(relation))
2217                                 RelationInitPhysicalAddr(relation);
2218
2219                         /*
2220                          * Add this entry to list of stuff to rebuild in second pass.
2221                          * pg_class goes to the front of rebuildFirstList while
2222                          * pg_class_oid_index goes to the back of rebuildFirstList, so
2223                          * they are done first and second respectively.  Other nailed
2224                          * relations go to the front of rebuildList, so they'll be done
2225                          * next in no particular order; and everything else goes to the
2226                          * back of rebuildList.
2227                          */
2228                         if (RelationGetRelid(relation) == RelationRelationId)
2229                                 rebuildFirstList = lcons(relation, rebuildFirstList);
2230                         else if (RelationGetRelid(relation) == ClassOidIndexId)
2231                                 rebuildFirstList = lappend(rebuildFirstList, relation);
2232                         else if (relation->rd_isnailed)
2233                                 rebuildList = lcons(relation, rebuildList);
2234                         else
2235                                 rebuildList = lappend(rebuildList, relation);
2236                 }
2237         }
2238
2239         /*
2240          * Now zap any remaining smgr cache entries.  This must happen before we
2241          * start to rebuild entries, since that may involve catalog fetches which
2242          * will re-open catalog files.
2243          */
2244         smgrcloseall();
2245
2246         /* Phase 2: rebuild the items found to need rebuild in phase 1 */
2247         foreach(l, rebuildFirstList)
2248         {
2249                 relation = (Relation) lfirst(l);
2250                 RelationClearRelation(relation, true);
2251         }
2252         list_free(rebuildFirstList);
2253         foreach(l, rebuildList)
2254         {
2255                 relation = (Relation) lfirst(l);
2256                 RelationClearRelation(relation, true);
2257         }
2258         list_free(rebuildList);
2259 }
2260
2261 /*
2262  * RelationCloseSmgrByOid - close a relcache entry's smgr link
2263  *
2264  * Needed in some cases where we are changing a relation's physical mapping.
2265  * The link will be automatically reopened on next use.
2266  */
2267 void
2268 RelationCloseSmgrByOid(Oid relationId)
2269 {
2270         Relation        relation;
2271
2272         RelationIdCacheLookup(relationId, relation);
2273
2274         if (!PointerIsValid(relation))
2275                 return;                                 /* not in cache, nothing to do */
2276
2277         RelationCloseSmgr(relation);
2278 }
2279
2280 /*
2281  * AtEOXact_RelationCache
2282  *
2283  *      Clean up the relcache at main-transaction commit or abort.
2284  *
2285  * Note: this must be called *before* processing invalidation messages.
2286  * In the case of abort, we don't want to try to rebuild any invalidated
2287  * cache entries (since we can't safely do database accesses).  Therefore
2288  * we must reset refcnts before handling pending invalidations.
2289  *
2290  * As of PostgreSQL 8.1, relcache refcnts should get released by the
2291  * ResourceOwner mechanism.  This routine just does a debugging
2292  * cross-check that no pins remain.  However, we also need to do special
2293  * cleanup when the current transaction created any relations or made use
2294  * of forced index lists.
2295  */
2296 void
2297 AtEOXact_RelationCache(bool isCommit)
2298 {
2299         HASH_SEQ_STATUS status;
2300         RelIdCacheEnt *idhentry;
2301         int                     i;
2302
2303         /*
2304          * Unless the eoxact_list[] overflowed, we only need to examine the rels
2305          * listed in it.  Otherwise fall back on a hash_seq_search scan.
2306          *
2307          * For simplicity, eoxact_list[] entries are not deleted till end of
2308          * top-level transaction, even though we could remove them at
2309          * subtransaction end in some cases, or remove relations from the list if
2310          * they are cleared for other reasons.  Therefore we should expect the
2311          * case that list entries are not found in the hashtable; if not, there's
2312          * nothing to do for them.
2313          */
2314         if (eoxact_list_overflowed)
2315         {
2316                 hash_seq_init(&status, RelationIdCache);
2317                 while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2318                 {
2319                         AtEOXact_cleanup(idhentry->reldesc, isCommit);
2320                 }
2321         }
2322         else
2323         {
2324                 for (i = 0; i < eoxact_list_len; i++)
2325                 {
2326                         idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache,
2327                                                                                                          (void *) &eoxact_list[i],
2328                                                                                                          HASH_FIND,
2329                                                                                                          NULL);
2330                         if (idhentry != NULL)
2331                                 AtEOXact_cleanup(idhentry->reldesc, isCommit);
2332                 }
2333         }
2334
2335         /* Now we're out of the transaction and can clear the list */
2336         eoxact_list_len = 0;
2337         eoxact_list_overflowed = false;
2338 }
2339
2340 /*
2341  * AtEOXact_cleanup
2342  *
2343  *      Clean up a single rel at main-transaction commit or abort
2344  *
2345  * NB: this processing must be idempotent, because EOXactListAdd() doesn't
2346  * bother to prevent duplicate entries in eoxact_list[].
2347  */
2348 static void
2349 AtEOXact_cleanup(Relation relation, bool isCommit)
2350 {
2351                 /*
2352                  * The relcache entry's ref count should be back to its normal
2353                  * not-in-a-transaction state: 0 unless it's nailed in cache.
2354                  *
2355                  * In bootstrap mode, this is NOT true, so don't check it --- the
2356                  * bootstrap code expects relations to stay open across start/commit
2357                  * transaction calls.  (That seems bogus, but it's not worth fixing.)
2358                  *
2359                  * Note: ideally this check would be applied to every relcache entry,
2360                  * not just those that have eoxact work to do.  But it's not worth
2361                  * forcing a scan of the whole relcache just for this.  (Moreover,
2362                  * doing so would mean that assert-enabled testing never tests the
2363                  * hash_search code path above, which seems a bad idea.)
2364                  */
2365 #ifdef USE_ASSERT_CHECKING
2366                 if (!IsBootstrapProcessingMode())
2367                 {
2368                         int                     expected_refcnt;
2369
2370                         expected_refcnt = relation->rd_isnailed ? 1 : 0;
2371                         Assert(relation->rd_refcnt == expected_refcnt);
2372                 }
2373 #endif
2374
2375                 /*
2376                  * Is it a relation created in the current transaction?
2377                  *
2378                  * During commit, reset the flag to zero, since we are now out of the
2379                  * creating transaction.  During abort, simply delete the relcache
2380                  * entry --- it isn't interesting any longer.  (NOTE: if we have
2381                  * forgotten the new-ness of a new relation due to a forced cache
2382                  * flush, the entry will get deleted anyway by shared-cache-inval
2383                  * processing of the aborted pg_class insertion.)
2384                  */
2385                 if (relation->rd_createSubid != InvalidSubTransactionId)
2386                 {
2387                         if (isCommit)
2388                                 relation->rd_createSubid = InvalidSubTransactionId;
2389                         else
2390                         {
2391                                 RelationClearRelation(relation, false);
2392                                 return;
2393                         }
2394                 }
2395
2396                 /*
2397                  * Likewise, reset the hint about the relfilenode being new.
2398                  */
2399                 relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
2400
2401                 /*
2402                  * Flush any temporary index list.
2403                  */
2404                 if (relation->rd_indexvalid == 2)
2405                 {
2406                         list_free(relation->rd_indexlist);
2407                         relation->rd_indexlist = NIL;
2408                         relation->rd_oidindex = InvalidOid;
2409                         relation->rd_indexvalid = 0;
2410                 }
2411 }
2412
2413 /*
2414  * AtEOSubXact_RelationCache
2415  *
2416  *      Clean up the relcache at sub-transaction commit or abort.
2417  *
2418  * Note: this must be called *before* processing invalidation messages.
2419  */
2420 void
2421 AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
2422                                                   SubTransactionId parentSubid)
2423 {
2424         HASH_SEQ_STATUS status;
2425         RelIdCacheEnt *idhentry;
2426         int                     i;
2427
2428         /*
2429          * Unless the eoxact_list[] overflowed, we only need to examine the rels
2430          * listed in it.  Otherwise fall back on a hash_seq_search scan.  Same
2431          * logic as in AtEOXact_RelationCache.
2432          */
2433         if (eoxact_list_overflowed)
2434         {
2435                 hash_seq_init(&status, RelationIdCache);
2436                 while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2437                 {
2438                         AtEOSubXact_cleanup(idhentry->reldesc, isCommit,
2439                                                                 mySubid, parentSubid);
2440                 }
2441         }
2442         else
2443         {
2444                 for (i = 0; i < eoxact_list_len; i++)
2445                 {
2446                         idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache,
2447                                                                                                          (void *) &eoxact_list[i],
2448                                                                                                          HASH_FIND,
2449                                                                                                          NULL);
2450                         if (idhentry != NULL)
2451                                 AtEOSubXact_cleanup(idhentry->reldesc, isCommit,
2452                                                                         mySubid, parentSubid);
2453                 }
2454         }
2455
2456         /* Don't reset the list; we still need more cleanup later */
2457 }
2458
2459 /*
2460  * AtEOSubXact_cleanup
2461  *
2462  *      Clean up a single rel at subtransaction commit or abort
2463  *
2464  * NB: this processing must be idempotent, because EOXactListAdd() doesn't
2465  * bother to prevent duplicate entries in eoxact_list[].
2466  */
2467 static void
2468 AtEOSubXact_cleanup(Relation relation, bool isCommit,
2469                                         SubTransactionId mySubid, SubTransactionId parentSubid)
2470 {
2471                 /*
2472                  * Is it a relation created in the current subtransaction?
2473                  *
2474                  * During subcommit, mark it as belonging to the parent, instead.
2475                  * During subabort, simply delete the relcache entry.
2476                  */
2477                 if (relation->rd_createSubid == mySubid)
2478                 {
2479                         if (isCommit)
2480                                 relation->rd_createSubid = parentSubid;
2481                         else
2482                         {
2483                                 RelationClearRelation(relation, false);
2484                                 return;
2485                         }
2486                 }
2487
2488                 /*
2489                  * Likewise, update or drop any new-relfilenode-in-subtransaction
2490                  * hint.
2491                  */
2492                 if (relation->rd_newRelfilenodeSubid == mySubid)
2493                 {
2494                         if (isCommit)
2495                                 relation->rd_newRelfilenodeSubid = parentSubid;
2496                         else
2497                                 relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
2498                 }
2499
2500                 /*
2501                  * Flush any temporary index list.
2502                  */
2503                 if (relation->rd_indexvalid == 2)
2504                 {
2505                         list_free(relation->rd_indexlist);
2506                         relation->rd_indexlist = NIL;
2507                         relation->rd_oidindex = InvalidOid;
2508                         relation->rd_indexvalid = 0;
2509                 }
2510 }
2511
2512
2513 /*
2514  *              RelationBuildLocalRelation
2515  *                      Build a relcache entry for an about-to-be-created relation,
2516  *                      and enter it into the relcache.
2517  */
2518 Relation
2519 RelationBuildLocalRelation(const char *relname,
2520                                                    Oid relnamespace,
2521                                                    TupleDesc tupDesc,
2522                                                    Oid relid,
2523                                                    Oid relfilenode,
2524                                                    Oid reltablespace,
2525                                                    bool shared_relation,
2526                                                    bool mapped_relation,
2527                                                    char relpersistence,
2528                                                    char relkind)
2529 {
2530         Relation        rel;
2531         MemoryContext oldcxt;
2532         int                     natts = tupDesc->natts;
2533         int                     i;
2534         bool            has_not_null;
2535         bool            nailit;
2536
2537         AssertArg(natts >= 0);
2538
2539         /*
2540          * check for creation of a rel that must be nailed in cache.
2541          *
2542          * XXX this list had better match the relations specially handled in
2543          * RelationCacheInitializePhase2/3.
2544          */
2545         switch (relid)
2546         {
2547                 case DatabaseRelationId:
2548                 case AuthIdRelationId:
2549                 case AuthMemRelationId:
2550                 case RelationRelationId:
2551                 case AttributeRelationId:
2552                 case ProcedureRelationId:
2553                 case TypeRelationId:
2554                         nailit = true;
2555                         break;
2556                 default:
2557                         nailit = false;
2558                         break;
2559         }
2560
2561         /*
2562          * check that hardwired list of shared rels matches what's in the
2563          * bootstrap .bki file.  If you get a failure here during initdb, you
2564          * probably need to fix IsSharedRelation() to match whatever you've done
2565          * to the set of shared relations.
2566          */
2567         if (shared_relation != IsSharedRelation(relid))
2568                 elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
2569                          relname, relid);
2570
2571         /* Shared relations had better be mapped, too */
2572         Assert(mapped_relation || !shared_relation);
2573
2574         /*
2575          * switch to the cache context to create the relcache entry.
2576          */
2577         if (!CacheMemoryContext)
2578                 CreateCacheMemoryContext();
2579
2580         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2581
2582         /*
2583          * allocate a new relation descriptor and fill in basic state fields.
2584          */
2585         rel = (Relation) palloc0(sizeof(RelationData));
2586
2587         /* make sure relation is marked as having no open file yet */
2588         rel->rd_smgr = NULL;
2589
2590         /* mark it nailed if appropriate */
2591         rel->rd_isnailed = nailit;
2592
2593         rel->rd_refcnt = nailit ? 1 : 0;
2594
2595         /* it's being created in this transaction */
2596         rel->rd_createSubid = GetCurrentSubTransactionId();
2597         rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
2598
2599         /*
2600          * create a new tuple descriptor from the one passed in.  We do this
2601          * partly to copy it into the cache context, and partly because the new
2602          * relation can't have any defaults or constraints yet; they have to be
2603          * added in later steps, because they require additions to multiple system
2604          * catalogs.  We can copy attnotnull constraints here, however.
2605          */
2606         rel->rd_att = CreateTupleDescCopy(tupDesc);
2607         rel->rd_att->tdrefcount = 1;    /* mark as refcounted */
2608         has_not_null = false;
2609         for (i = 0; i < natts; i++)
2610         {
2611                 rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull;
2612                 has_not_null |= tupDesc->attrs[i]->attnotnull;
2613         }
2614
2615         if (has_not_null)
2616         {
2617                 TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
2618
2619                 constr->has_not_null = true;
2620                 rel->rd_att->constr = constr;
2621         }
2622
2623         /*
2624          * initialize relation tuple form (caller may add/override data later)
2625          */
2626         rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
2627
2628         namestrcpy(&rel->rd_rel->relname, relname);
2629         rel->rd_rel->relnamespace = relnamespace;
2630
2631         rel->rd_rel->relkind = relkind;
2632         rel->rd_rel->relhasoids = rel->rd_att->tdhasoid;
2633         rel->rd_rel->relnatts = natts;
2634         rel->rd_rel->reltype = InvalidOid;
2635         /* needed when bootstrapping: */
2636         rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;
2637
2638         /* set up persistence and relcache fields dependent on it */
2639         rel->rd_rel->relpersistence = relpersistence;
2640         switch (relpersistence)
2641         {
2642                 case RELPERSISTENCE_UNLOGGED:
2643                 case RELPERSISTENCE_PERMANENT:
2644                         rel->rd_backend = InvalidBackendId;
2645                         rel->rd_islocaltemp = false;
2646                         break;
2647                 case RELPERSISTENCE_TEMP:
2648                         Assert(isTempOrToastNamespace(relnamespace));
2649                         rel->rd_backend = MyBackendId;
2650                         rel->rd_islocaltemp = true;
2651                         break;
2652                 default:
2653                         elog(ERROR, "invalid relpersistence: %c", relpersistence);
2654                         break;
2655         }
2656
2657         /*
2658          * Insert relation physical and logical identifiers (OIDs) into the right
2659          * places.      For a mapped relation, we set relfilenode to zero and rely on
2660          * RelationInitPhysicalAddr to consult the map.
2661          */
2662         rel->rd_rel->relisshared = shared_relation;
2663
2664         RelationGetRelid(rel) = relid;
2665
2666         for (i = 0; i < natts; i++)
2667                 rel->rd_att->attrs[i]->attrelid = relid;
2668
2669         rel->rd_rel->reltablespace = reltablespace;
2670
2671         if (mapped_relation)
2672         {
2673                 rel->rd_rel->relfilenode = InvalidOid;
2674                 /* Add it to the active mapping information */
2675                 RelationMapUpdateMap(relid, relfilenode, shared_relation, true);
2676         }
2677         else
2678                 rel->rd_rel->relfilenode = relfilenode;
2679
2680         RelationInitLockInfo(rel);      /* see lmgr.c */
2681
2682         RelationInitPhysicalAddr(rel);
2683
2684         /*
2685          * Okay to insert into the relcache hash tables.
2686          */
2687         RelationCacheInsert(rel);
2688
2689         /*
2690          * Flag relation as needing eoxact cleanup (to clear rd_createSubid).
2691          * We can't do this before storing relid in it.
2692          */
2693         EOXactListAdd(rel);
2694
2695         /*
2696          * done building relcache entry.
2697          */
2698         MemoryContextSwitchTo(oldcxt);
2699
2700         /* It's fully valid */
2701         rel->rd_isvalid = true;
2702
2703         /*
2704          * Caller expects us to pin the returned entry.
2705          */
2706         RelationIncrementReferenceCount(rel);
2707
2708         return rel;
2709 }
2710
2711
2712 /*
2713  * RelationSetNewRelfilenode
2714  *
2715  * Assign a new relfilenode (physical file name) to the relation.
2716  *
2717  * This allows a full rewrite of the relation to be done with transactional
2718  * safety (since the filenode assignment can be rolled back).  Note however
2719  * that there is no simple way to access the relation's old data for the
2720  * remainder of the current transaction.  This limits the usefulness to cases
2721  * such as TRUNCATE or rebuilding an index from scratch.
2722  *
2723  * Caller must already hold exclusive lock on the relation.
2724  *
2725  * The relation is marked with relfrozenxid = freezeXid (InvalidTransactionId
2726  * must be passed for indexes and sequences).  This should be a lower bound on
2727  * the XIDs that will be put into the new relation contents.
2728  */
2729 void
2730 RelationSetNewRelfilenode(Relation relation, TransactionId freezeXid,
2731                                                   MultiXactId minmulti)
2732 {
2733         Oid                     newrelfilenode;
2734         RelFileNodeBackend newrnode;
2735         Relation        pg_class;
2736         HeapTuple       tuple;
2737         Form_pg_class classform;
2738
2739         /* Indexes, sequences must have Invalid frozenxid; other rels must not */
2740         Assert((relation->rd_rel->relkind == RELKIND_INDEX ||
2741                         relation->rd_rel->relkind == RELKIND_SEQUENCE) ?
2742                    freezeXid == InvalidTransactionId :
2743                    TransactionIdIsNormal(freezeXid));
2744         Assert(TransactionIdIsNormal(freezeXid) == MultiXactIdIsValid(minmulti));
2745
2746         /* Allocate a new relfilenode */
2747         newrelfilenode = GetNewRelFileNode(relation->rd_rel->reltablespace, NULL,
2748                                                                            relation->rd_rel->relpersistence);
2749
2750         /*
2751          * Get a writable copy of the pg_class tuple for the given relation.
2752          */
2753         pg_class = heap_open(RelationRelationId, RowExclusiveLock);
2754
2755         tuple = SearchSysCacheCopy1(RELOID,
2756                                                                 ObjectIdGetDatum(RelationGetRelid(relation)));
2757         if (!HeapTupleIsValid(tuple))
2758                 elog(ERROR, "could not find tuple for relation %u",
2759                          RelationGetRelid(relation));
2760         classform = (Form_pg_class) GETSTRUCT(tuple);
2761
2762         /*
2763          * Create storage for the main fork of the new relfilenode.
2764          *
2765          * NOTE: any conflict in relfilenode value will be caught here, if
2766          * GetNewRelFileNode messes up for any reason.
2767          */
2768         newrnode.node = relation->rd_node;
2769         newrnode.node.relNode = newrelfilenode;
2770         newrnode.backend = relation->rd_backend;
2771         RelationCreateStorage(newrnode.node, relation->rd_rel->relpersistence);
2772         smgrclosenode(newrnode);
2773
2774         /*
2775          * Schedule unlinking of the old storage at transaction commit.
2776          */
2777         RelationDropStorage(relation);
2778
2779         /*
2780          * Now update the pg_class row.  However, if we're dealing with a mapped
2781          * index, pg_class.relfilenode doesn't change; instead we have to send the
2782          * update to the relation mapper.
2783          */
2784         if (RelationIsMapped(relation))
2785                 RelationMapUpdateMap(RelationGetRelid(relation),
2786                                                          newrelfilenode,
2787                                                          relation->rd_rel->relisshared,
2788                                                          false);
2789         else
2790                 classform->relfilenode = newrelfilenode;
2791
2792         /* These changes are safe even for a mapped relation */
2793         if (relation->rd_rel->relkind != RELKIND_SEQUENCE)
2794         {
2795                 classform->relpages = 0;        /* it's empty until further notice */
2796                 classform->reltuples = 0;
2797                 classform->relallvisible = 0;
2798         }
2799         classform->relfrozenxid = freezeXid;
2800         classform->relminmxid = minmulti;
2801
2802         simple_heap_update(pg_class, &tuple->t_self, tuple);
2803         CatalogUpdateIndexes(pg_class, tuple);
2804
2805         heap_freetuple(tuple);
2806
2807         heap_close(pg_class, RowExclusiveLock);
2808
2809         /*
2810          * Make the pg_class row change visible, as well as the relation map
2811          * change if any.  This will cause the relcache entry to get updated, too.
2812          */
2813         CommandCounterIncrement();
2814
2815         /*
2816          * Mark the rel as having been given a new relfilenode in the current
2817          * (sub) transaction.  This is a hint that can be used to optimize later
2818          * operations on the rel in the same transaction.
2819          */
2820         relation->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
2821
2822         /* Flag relation as needing eoxact cleanup (to remove the hint) */
2823         EOXactListAdd(relation);
2824 }
2825
2826
2827 /*
2828  *              RelationCacheInitialize
2829  *
2830  *              Set up the relation descriptor cache as an empty hashtable.
2831  *              Database access is not possible yet when this runs (mainly
2832  *              because the transaction subsystem is not up), so nothing is
2833  *              loaded here.  The table must nevertheless exist before the
2834  *              initialization transaction starts: if that transaction were
2835  *              to abort before the relcache had been set up,
2836  *              AtEOXact_RelationCache would crash.
2837  */
2838
2839 #define INITRELCACHESIZE                400
2840
2841 void
2842 RelationCacheInitialize(void)
2843 {
2844         HASHCTL         hashinfo;
2845         int                     hashflags = HASH_ELEM | HASH_FUNCTION;
2846
2847         /*
2848          * The cache memory context has to exist before anything is cached.
2849          */
2850         if (CacheMemoryContext == NULL)
2851                 CreateCacheMemoryContext();
2852
2853         /* Describe, then create, the OID-keyed hashtable indexing the relcache */
2854         MemSet(&hashinfo, 0, sizeof(hashinfo));
2855         hashinfo.hash = oid_hash;
2856         hashinfo.keysize = sizeof(Oid);
2857         hashinfo.entrysize = sizeof(RelIdCacheEnt);
2858
2859         RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
2860                                                                   &hashinfo, hashflags);
2861
2862         /*
2863          * The relation mapper is initialized alongside the relcache.
2864          */
2865         RelationMapInitialize();
2866 }
2867
2868 /*
2869  *              RelationCacheInitializePhase2
2870  *
2871  *              This is called to prepare for access to shared catalogs during startup.
2872  *              We must at least set up nailed reldescs for pg_database, pg_authid,
2873  *              and pg_auth_members.  Ideally we'd like to have reldescs for their
2874  *              indexes, too.  We attempt to load this information from the shared
2875  *              relcache init file.  If that's missing or broken, just make phony
2876  *              entries for the catalogs themselves.  RelationCacheInitializePhase3
2877  *              will clean up as needed.
2878  */
2879 void
2880 RelationCacheInitializePhase2(void)
2881 {
2882         MemoryContext oldcxt;
2883
2884         /*
2885          * relation mapper needs initialized too
2886          */
2887         RelationMapInitializePhase2();
2888
2889         /*
2890          * In bootstrap mode, the shared catalogs aren't there yet anyway, so do
2891          * nothing.
2892          */
2893         if (IsBootstrapProcessingMode())
2894                 return;
2895
2896         /*
2897          * switch to cache memory context
2898          */
2899         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2900
2901         /*
2902          * Try to load the shared relcache cache file.  If unsuccessful, bootstrap
2903          * the cache with pre-made descriptors for the critical shared catalogs.
2904          */
2905         if (!load_relcache_init_file(true))
2906         {
2907                 formrdesc("pg_database", DatabaseRelation_Rowtype_Id, true,
2908                                   true, Natts_pg_database, Desc_pg_database);
2909                 formrdesc("pg_authid", AuthIdRelation_Rowtype_Id, true,
2910                                   true, Natts_pg_authid, Desc_pg_authid);
2911                 formrdesc("pg_auth_members", AuthMemRelation_Rowtype_Id, true,
2912                                   false, Natts_pg_auth_members, Desc_pg_auth_members);
2913
2914 #define NUM_CRITICAL_SHARED_RELS        3       /* fix if you change list above */
2915         }
2916
2917         MemoryContextSwitchTo(oldcxt);
2918 }
2919
2920 /*
2921  *              RelationCacheInitializePhase3
2922  *
2923  *              This is called as soon as the catcache and transaction system
2924  *              are functional and we have determined MyDatabaseId.  At this point
2925  *              we can actually read data from the database's system catalogs.
2926  *              We first try to read pre-computed relcache entries from the local
2927  *              relcache init file.  If that's missing or broken, make phony entries
2928  *              for the minimum set of nailed-in-cache relations.  Then (unless
2929  *              bootstrapping) make sure we have entries for the critical system
2930  *              indexes.  Once we've done all this, we have enough infrastructure to
2931  *              open any system catalog or use any catcache.  The last step is to
2932  *              rewrite the cache files if needed.
2933  */
2934 void
2935 RelationCacheInitializePhase3(void)
2936 {
2937         HASH_SEQ_STATUS status;
2938         RelIdCacheEnt *idhentry;
2939         MemoryContext oldcxt;
2940         bool            needNewCacheFile = !criticalSharedRelcachesBuilt;
2941
2942         /*
2943          * relation mapper needs to be initialized too
2944          */
2945         RelationMapInitializePhase3();
2946
2947         /*
2948          * switch to cache memory context
2949          */
2950         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2951
2952         /*
2953          * Try to load the local relcache cache file.  If unsuccessful, bootstrap
2954          * the cache with pre-made descriptors for the critical "nailed-in" system
2955          * catalogs.
2956          */
2957         if (IsBootstrapProcessingMode() ||
2958                 !load_relcache_init_file(false))
2959         {
2960                 needNewCacheFile = true;
2961
2962                 formrdesc("pg_class", RelationRelation_Rowtype_Id, false,
2963                                   true, Natts_pg_class, Desc_pg_class);
2964                 formrdesc("pg_attribute", AttributeRelation_Rowtype_Id, false,
2965                                   false, Natts_pg_attribute, Desc_pg_attribute);
2966                 formrdesc("pg_proc", ProcedureRelation_Rowtype_Id, false,
2967                                   true, Natts_pg_proc, Desc_pg_proc);
2968                 formrdesc("pg_type", TypeRelation_Rowtype_Id, false,
2969                                   true, Natts_pg_type, Desc_pg_type);
2970
2971 #define NUM_CRITICAL_LOCAL_RELS 4               /* fix if you change list above */
2972         }
2973
2974         MemoryContextSwitchTo(oldcxt);
2975
2976         /* In bootstrap mode, the faked-up formrdesc info is all we'll have */
2977         if (IsBootstrapProcessingMode())
2978                 return;
2979
2980         /*
2981          * If we didn't get the critical system indexes loaded into relcache, do
2982          * so now.  These are critical because the catcache and/or opclass cache
2983          * depend on them for fetches done during relcache load.  Thus, we have an
2984          * infinite-recursion problem.  We can break the recursion by doing
2985          * heapscans instead of indexscans at certain key spots. To avoid hobbling
2986          * performance, we only want to do that until we have the critical indexes
2987          * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
2988          * decide whether to do heapscan or indexscan at the key spots, and we set
2989          * it true after we've loaded the critical indexes.
2990          *
2991          * The critical indexes are marked as "nailed in cache", partly to make it
2992          * easy for load_relcache_init_file to count them, but mainly because we
2993          * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
2994          * true.  (NOTE: perhaps it would be possible to reload them by
2995          * temporarily setting criticalRelcachesBuilt to false again.  For now,
2996          * though, we just nail 'em in.)
2997          *
2998          * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
2999          * in the same way as the others, because the critical catalogs don't
3000          * (currently) have any rules or triggers, and so these indexes can be
3001          * rebuilt without inducing recursion.  However they are used during
3002          * relcache load when a rel does have rules or triggers, so we choose to
3003          * nail them for performance reasons.
3004          */
3005         if (!criticalRelcachesBuilt)
3006         {
3007                 load_critical_index(ClassOidIndexId,
3008                                                         RelationRelationId);
3009                 load_critical_index(AttributeRelidNumIndexId,
3010                                                         AttributeRelationId);
3011                 load_critical_index(IndexRelidIndexId,
3012                                                         IndexRelationId);
3013                 load_critical_index(OpclassOidIndexId,
3014                                                         OperatorClassRelationId);
3015                 load_critical_index(AccessMethodProcedureIndexId,
3016                                                         AccessMethodProcedureRelationId);
3017                 load_critical_index(RewriteRelRulenameIndexId,
3018                                                         RewriteRelationId);
3019                 load_critical_index(TriggerRelidNameIndexId,
3020                                                         TriggerRelationId);
3021
3022 #define NUM_CRITICAL_LOCAL_INDEXES      7       /* fix if you change list above */
3023
3024                 criticalRelcachesBuilt = true;
3025         }
3026
3027         /*
3028          * Process critical shared indexes too.
3029          *
3030          * DatabaseNameIndexId isn't critical for relcache loading, but rather for
3031          * initial lookup of MyDatabaseId, without which we'll never find any
3032          * non-shared catalogs at all.  Autovacuum calls InitPostgres with a
3033          * database OID, so it instead depends on DatabaseOidIndexId.  We also
3034          * need to nail up some indexes on pg_authid and pg_auth_members for use
3035          * during client authentication.
3036          */
3037         if (!criticalSharedRelcachesBuilt)
3038         {
3039                 load_critical_index(DatabaseNameIndexId,
3040                                                         DatabaseRelationId);
3041                 load_critical_index(DatabaseOidIndexId,
3042                                                         DatabaseRelationId);
3043                 load_critical_index(AuthIdRolnameIndexId,
3044                                                         AuthIdRelationId);
3045                 load_critical_index(AuthIdOidIndexId,
3046                                                         AuthIdRelationId);
3047                 load_critical_index(AuthMemMemRoleIndexId,
3048                                                         AuthMemRelationId);
3049
3050 #define NUM_CRITICAL_SHARED_INDEXES 5   /* fix if you change list above */
3051
3052                 criticalSharedRelcachesBuilt = true;
3053         }
3054
3055         /*
3056          * Now, scan all the relcache entries and update anything that might be
3057          * wrong in the results from formrdesc or the relcache cache file. If we
3058          * faked up relcache entries using formrdesc, then read the real pg_class
3059          * rows and replace the fake entries with them. Also, if any of the
3060          * relcache entries have rules or triggers, load that info the hard way
3061          * since it isn't recorded in the cache file.
3062          *
3063          * Whenever we access the catalogs to read data, there is a possibility of
3064          * a shared-inval cache flush causing relcache entries to be removed.
3065          * Since hash_seq_search only guarantees to still work after the *current*
3066          * entry is removed, it's unsafe to continue the hashtable scan afterward.
3067          * We handle this by restarting the scan from scratch after each access.
3068          * This is theoretically O(N^2), but the number of entries that actually
3069          * need to be fixed is small enough that it doesn't matter.
3070          */
3071         hash_seq_init(&status, RelationIdCache);
3072
3073         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3074         {
3075                 Relation        relation = idhentry->reldesc;
3076                 bool            restart = false;
3077
3078                 /*
3079                  * Make sure *this* entry doesn't get flushed while we work with it.
3080                  */
3081                 RelationIncrementReferenceCount(relation);
3082
3083                 /*
3084                  * If it's a faked-up entry, read the real pg_class tuple.
3085                  */
3086                 if (relation->rd_rel->relowner == InvalidOid)
3087                 {
3088                         HeapTuple       htup;
3089                         Form_pg_class relp;
3090
3091                         htup = SearchSysCache1(RELOID,
3092                                                            ObjectIdGetDatum(RelationGetRelid(relation)));
3093                         if (!HeapTupleIsValid(htup))
3094                                 elog(FATAL, "cache lookup failed for relation %u",
3095                                          RelationGetRelid(relation));
3096                         relp = (Form_pg_class) GETSTRUCT(htup);
3097
3098                         /*
3099                          * Copy tuple to relation->rd_rel. (See notes in
3100                          * AllocateRelationDesc())
3101                          */
3102                         memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
3103
3104                         /* Update rd_options while we have the tuple */
3105                         if (relation->rd_options)
3106                                 pfree(relation->rd_options);
3107                         RelationParseRelOptions(relation, htup);
3108
3109                         /*
3110                          * Check the values in rd_att were set up correctly.  (We cannot
3111                          * just copy them over now: formrdesc must have set up the rd_att
3112                          * data correctly to start with, because it may already have been
3113                          * copied into one or more catcache entries.)
3114                          */
3115                         Assert(relation->rd_att->tdtypeid == relp->reltype);
3116                         Assert(relation->rd_att->tdtypmod == -1);
3117                         Assert(relation->rd_att->tdhasoid == relp->relhasoids);
3118
3119                         ReleaseSysCache(htup);
3120
3121                         /* relowner had better be OK now, else we'll loop forever */
3122                         if (relation->rd_rel->relowner == InvalidOid)
3123                                 elog(ERROR, "invalid relowner in pg_class entry for \"%s\"",
3124                                          RelationGetRelationName(relation));
3125
3126                         restart = true;
3127                 }
3128
3129                 /*
3130                  * Fix data that isn't saved in relcache cache file.
3131                  *
3132                  * relhasrules or relhastriggers could possibly be wrong or out of
3133                  * date.  If we don't actually find any rules or triggers, clear the
3134                  * local copy of the flag so that we don't get into an infinite loop
3135                  * here.  We don't make any attempt to fix the pg_class entry, though.
3136                  */
3137                 if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
3138                 {
3139                         RelationBuildRuleLock(relation);
3140                         if (relation->rd_rules == NULL)
3141                                 relation->rd_rel->relhasrules = false;
3142                         restart = true;
3143                 }
3144                 if (relation->rd_rel->relhastriggers && relation->trigdesc == NULL)
3145                 {
3146                         RelationBuildTriggers(relation);
3147                         if (relation->trigdesc == NULL)
3148                                 relation->rd_rel->relhastriggers = false;
3149                         restart = true;
3150                 }
3151
3152                 /* Release hold on the relation */
3153                 RelationDecrementReferenceCount(relation);
3154
3155                 /* Now, restart the hashtable scan from scratch if we read catalogs */
3156                 if (restart)
3157                 {
3158                         hash_seq_term(&status);
3159                         hash_seq_init(&status, RelationIdCache);
3160                 }
3161         }
3162
3163         /*
3164          * Lastly, write out new relcache cache files if needed.  We don't bother
3165          * to distinguish cases where only one of the two needs an update.
3166          */
3167         if (needNewCacheFile)
3168         {
3169                 /*
3170                  * Force all the catcaches to finish initializing and thereby open the
3171                  * catalogs and indexes they use.  This will preload the relcache with
3172                  * entries for all the most important system catalogs and indexes, so
3173                  * that the init files will be most useful for future backends.
3174                  */
3175                 InitCatalogCachePhase2();
3176
3177                 /* reset initFileRelationIds list; we'll fill it during write */
3178                 initFileRelationIds = NIL;
3179
3180                 /* now write the files: the shared one first, then the local one */
3181                 write_relcache_init_file(true);
3182                 write_relcache_init_file(false);
3183         }
3184 }
3185
3186 /*
3187  * load_critical_index -- build and nail one critical system index
3188  *
3189  * indexoid identifies the index to load; heapoid identifies the catalog
3190  * that the index is defined on.  Failure to build the entry is a PANIC.
3191  */
3192 static void
3193 load_critical_index(Oid indexoid, Oid heapoid)
3194 {
3195         Relation        indexrel;
3196
3197         /*
3198          * Take the catalog's lock before the index's: RelationBuildDesc may
3199          * need to read the catalog, and anyone exclusive-locking both takes
3200          * them in that order, so the reverse order would risk deadlock.
3201          */
3202         LockRelationOid(heapoid, AccessShareLock);
3203         LockRelationOid(indexoid, AccessShareLock);
3204         indexrel = RelationBuildDesc(indexoid, true);
3205         if (!indexrel)
3206                 elog(PANIC, "could not open critical system index %u", indexoid);
3207         /* Nail the entry into the cache with a reference count of one */
3208         indexrel->rd_refcnt = 1;
3209         indexrel->rd_isnailed = true;
3210         UnlockRelationOid(indexoid, AccessShareLock);
3211         UnlockRelationOid(heapoid, AccessShareLock);
3212 }
3213
3214 /*
3215  * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
3216  * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
3217  *
3218  * These exist because non-fixed-width fields of pg_class and pg_index have
3219  * to be readable before the standard catalog caches are available.  Both
3220  * are built by BuildHardcodedDescriptor, in CacheMemoryContext, from
3221  * predefined data set up just like the bootstrapped reldescs used by
3222  * formrdesc().  The result is not a fully proper tupdesc: tdtypeid is not
3223  * the correct rowtype OID and there is no TupleConstr.  It is nonetheless
3224  * good enough for extracting fields.
3225  */
3226 static TupleDesc
3227 BuildHardcodedDescriptor(int natts, const FormData_pg_attribute *attrs,
3228                                                  bool hasoids)
3229 {
3230         MemoryContext callercxt;
3231         TupleDesc       tupdesc;
3232         int                     attno;
3233
3234         callercxt = MemoryContextSwitchTo(CacheMemoryContext);
3235
3236         tupdesc = CreateTemplateTupleDesc(natts, hasoids);
3237         tupdesc->tdtypmod = -1;
3238         tupdesc->tdtypeid = RECORDOID;          /* not right, but we don't care */
3239
3240         /* Copy each attribute's fixed-size part and mark its offset as unknown */
3241         for (attno = 0; attno < natts; attno++)
3242         {
3243                 Form_pg_attribute att = tupdesc->attrs[attno];
3244
3245                 memcpy(att, &attrs[attno], ATTRIBUTE_FIXED_PART_SIZE);
3246                 att->attcacheoff = -1;
3247         }
3248
3249         /* the first attribute's attcacheoff is zero, cf RelationBuildTupleDesc */
3250         tupdesc->attrs[0]->attcacheoff = 0;
3251
3252         /* Note: no TupleConstr entry is set up.  Back to the caller's context. */
3253         MemoryContextSwitchTo(callercxt);
3254         return tupdesc;
3255 }
3256
3257 static TupleDesc
3258 GetPgClassDescriptor(void)
3259 {
3260         static TupleDesc cached = NULL;
3261
3262         /* Build the pg_class descriptor on first use only; reuse it afterwards */
3263         if (cached != NULL)
3264                 return cached;
3265
3266         cached = BuildHardcodedDescriptor(Natts_pg_class, Desc_pg_class, true);
3267
3268         return cached;
3269 }
3270
3271 static TupleDesc
3272 GetPgIndexDescriptor(void)
3273 {
3274         static TupleDesc cached = NULL;
3275
3276         /* Build the pg_index descriptor on first use only; reuse it afterwards */
3277         if (cached != NULL)
3278                 return cached;
3279
3280         cached = BuildHardcodedDescriptor(Natts_pg_index, Desc_pg_index, false);
3281
3282         return cached;
3283 }
3284
3285 /*
3286  * Load any default attribute value definitions (pg_attrdef) for the relation.
3287  */
3288 static void
3289 AttrDefaultFetch(Relation relation)
3290 {
3291         AttrDefault *attrdef = relation->rd_att->constr->defval;
3292         int                     ndef = relation->rd_att->constr->num_defval;
3293         Relation        adrel;
3294         SysScanDesc adscan;
3295         ScanKeyData skey;
3296         HeapTuple       htup;
3297         int                     found = 0;
3298
3299         ScanKeyInit(&skey, Anum_pg_attrdef_adrelid,
3300                                 BTEqualStrategyNumber, F_OIDEQ,
3301                                 ObjectIdGetDatum(RelationGetRelid(relation)));
3302         adrel = heap_open(AttrDefaultRelationId, AccessShareLock);
3303         adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
3304                                                                 SnapshotNow, 1, &skey);
3305
3306         while (HeapTupleIsValid(htup = systable_getnext(adscan)))
3307         {
3308                 Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
3309                 Datum           val;
3310                 bool            isnull;
3311                 int                     i;
3312
3313                 for (i = 0; i < ndef; i++)
3314                 {
3315                         if (adform->adnum != attrdef[i].adnum)
3316                                 continue;
3317                         if (attrdef[i].adbin != NULL)
3318                                 elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
3319                                 NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
3320                                          RelationGetRelationName(relation));
3321                         else
3322                                 found++;
3323                         val = fastgetattr(htup, Anum_pg_attrdef_adbin,
3324                                                           adrel->rd_att, &isnull);
3325                         if (isnull)
3326                                 elog(WARNING, "null adbin for attr %s of rel %s",
3327                                 NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
3328                                          RelationGetRelationName(relation));
3329                         else
3330                         {
3331                                 /* convert in caller's context, copy to cache, free the temp */
3332                                 char       *s = TextDatumGetCString(val);
3333                                 attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
3334                                                                                                    s);
3335                                 pfree(s);
3336                         }
3337                         break;
3338                 }
3339
3340                 if (i >= ndef)
3341                         elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
3342                                  adform->adnum, RelationGetRelationName(relation));
3343         }
3344
3345         systable_endscan(adscan);
3346         heap_close(adrel, AccessShareLock);
3347
3348         if (found != ndef)
3349                 elog(WARNING, "%d attrdef record(s) missing for rel %s",
3350                          ndef - found, RelationGetRelationName(relation));
3351 }
3352
3353 /*
3354  * Load any check constraints (pg_constraint, contype 'c') for the relation.
3355  */
3356 static void
3357 CheckConstraintFetch(Relation relation)
3358 {
3359         ConstrCheck *check = relation->rd_att->constr->check;
3360         int                     ncheck = relation->rd_att->constr->num_check;
3361         Relation        conrel;
3362         SysScanDesc conscan;
3363         ScanKeyData skey[1];
3364         HeapTuple       htup;
3365         int                     found = 0;
3366
3367         ScanKeyInit(&skey[0], Anum_pg_constraint_conrelid,
3368                                 BTEqualStrategyNumber, F_OIDEQ,
3369                                 ObjectIdGetDatum(RelationGetRelid(relation)));
3370         conrel = heap_open(ConstraintRelationId, AccessShareLock);
3371         conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
3372                                                                  SnapshotNow, 1, skey);
3373
3374         while (HeapTupleIsValid(htup = systable_getnext(conscan)))
3375         {
3376                 Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
3377                 Datum           val;
3378                 bool            isnull;
3379                 char       *s;
3380
3381                 /* We want check constraints only */
3382                 if (conform->contype != CONSTRAINT_CHECK)
3383                         continue;
3384
3385                 if (found >= ncheck)
3386                         elog(ERROR, "unexpected constraint record found for rel %s",
3387                                  RelationGetRelationName(relation));
3388
3389                 /* Verify conbin is set before allocating anything in the cache context */
3390                 val = fastgetattr(htup, Anum_pg_constraint_conbin,
3391                                                   conrel->rd_att, &isnull);
3392                 if (isnull)
3393                         elog(ERROR, "null conbin for rel %s",
3394                                  RelationGetRelationName(relation));
3395
3396                 check[found].ccvalid = conform->convalidated;
3397                 check[found].ccnoinherit = conform->connoinherit;
3398                 check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
3399                                                                                                   NameStr(conform->conname));
3400
3401                 /* convert to cstring in caller's context, copy to cache, free the temp */
3402                 s = TextDatumGetCString(val);
3403                 check[found].ccbin = MemoryContextStrdup(CacheMemoryContext, s);
3404                 pfree(s);
3405                 found++;
3406         }
3407
3408         systable_endscan(conscan);
3409         heap_close(conrel, AccessShareLock);
3410
3411         if (found != ncheck)
3412                 elog(ERROR, "%d constraint record(s) missing for rel %s",
3413                          ncheck - found, RelationGetRelationName(relation));
3414 }
3415
3416 /*
3417  * RelationGetIndexList -- get a list of OIDs of indexes on this relation
3418  *
3419  * The index list is created only if someone requests it.  We scan pg_index
3420  * to find relevant indexes, and add the list to the relcache entry so that
3421  * we won't have to compute it again.  Note that shared cache inval of a
3422  * relcache entry will delete the old list and set rd_indexvalid to 0,
3423  * so that we must recompute the index list on next request.  This handles
3424  * creation or deletion of an index.
3425  *
3426  * Indexes that are marked not IndexIsLive are omitted from the returned list.
3427  * Such indexes are expected to be dropped momentarily, and should not be
3428  * touched at all by any caller of this function.
3429  *
3430  * The returned list is guaranteed to be sorted in order by OID.  This is
3431  * needed by the executor, since for index types that we obtain exclusive
3432  * locks on when updating the index, all backends must lock the indexes in
3433  * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
3434  * consistent ordering would do, but ordering by OID is easy.
3435  *
3436  * Since shared cache inval causes the relcache's copy of the list to go away,
3437  * we return a copy of the list palloc'd in the caller's context.  The caller
3438  * may list_free() the returned list after scanning it. This is necessary
3439  * since the caller will typically be doing syscache lookups on the relevant
3440  * indexes, and syscache lookup could cause SI messages to be processed!
3441  *
3442  * We also update rd_oidindex, which this module treats as effectively part
3443  * of the index list.  rd_oidindex is valid when rd_indexvalid isn't zero;
3444  * it is the pg_class OID of a unique index on OID when the relation has one,
3445  * and InvalidOid if there is no such index.
3446  */
3447 List *
3448 RelationGetIndexList(Relation relation)
3449 {
3450         Relation        indrel;
3451         SysScanDesc indscan;
3452         ScanKeyData skey;
3453         HeapTuple       htup;
3454         List       *result;
3455         Oid                     oidIndex;
3456         MemoryContext oldcxt;
3457
3458         /* Quick exit if we already computed the list. */
3459         if (relation->rd_indexvalid != 0)
3460                 return list_copy(relation->rd_indexlist);
3461
3462         /*
3463          * We build the list we intend to return (in the caller's context) while
3464          * doing the scan.      After successfully completing the scan, we copy that
3465          * list into the relcache entry.  This avoids cache-context memory leakage
3466          * if we get some sort of error partway through.
3467          */
3468         result = NIL;
3469         oidIndex = InvalidOid;
3470
3471         /* Prepare to scan pg_index for entries having indrelid = this rel. */
3472         ScanKeyInit(&skey,
3473                                 Anum_pg_index_indrelid,
3474                                 BTEqualStrategyNumber, F_OIDEQ,
3475                                 ObjectIdGetDatum(RelationGetRelid(relation)));
3476
3477         indrel = heap_open(IndexRelationId, AccessShareLock);
3478         indscan = systable_beginscan(indrel, IndexIndrelidIndexId, true,
3479                                                                  SnapshotNow, 1, &skey);
3480
3481         while (HeapTupleIsValid(htup = systable_getnext(indscan)))
3482         {
3483                 Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);
3484                 Datum           indclassDatum;
3485                 oidvector  *indclass;
3486                 bool            isnull;
3487
3488                 /*
3489                  * Ignore any indexes that are currently being dropped.  This will
3490                  * prevent them from being searched, inserted into, or considered in
3491                  * HOT-safety decisions.  It's unsafe to touch such an index at all
3492                  * since its catalog entries could disappear at any instant.
3493                  */
3494                 if (!IndexIsLive(index))
3495                         continue;
3496
3497                 /* Add index's OID to result list in the proper order */
3498                 result = insert_ordered_oid(result, index->indexrelid);
3499
3500                 /*
3501                  * indclass cannot be referenced directly through the C struct,
3502                  * because it comes after the variable-width indkey field.      Must
3503                  * extract the datum the hard way...
3504                  */
3505                 indclassDatum = heap_getattr(htup,
3506                                                                          Anum_pg_index_indclass,
3507                                                                          GetPgIndexDescriptor(),
3508                                                                          &isnull);
3509                 Assert(!isnull);
3510                 indclass = (oidvector *) DatumGetPointer(indclassDatum);
3511
3512                 /* Check to see if it is a unique, non-partial btree index on OID */
3513                 if (IndexIsValid(index) &&
3514                         index->indnatts == 1 &&
3515                         index->indisunique && index->indimmediate &&
3516                         index->indkey.values[0] == ObjectIdAttributeNumber &&
3517                         indclass->values[0] == OID_BTREE_OPS_OID &&
3518                         heap_attisnull(htup, Anum_pg_index_indpred))
3519                         oidIndex = index->indexrelid;
3520         }
3521
3522         systable_endscan(indscan);
3523         heap_close(indrel, AccessShareLock);
3524
3525         /* Now save a copy of the completed list in the relcache entry. */
3526         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3527         relation->rd_indexlist = list_copy(result);
3528         relation->rd_oidindex = oidIndex;
3529         relation->rd_indexvalid = 1;
3530         MemoryContextSwitchTo(oldcxt);
3531
3532         return result;
3533 }
3534
3535 /*
3536  * insert_ordered_oid
3537  *              Insert a new Oid into a sorted list of Oids, preserving ordering
3538  *
3539  * Building the ordered list this way is O(N^2), but with a pretty small
3540  * constant, so for the number of entries we expect it will probably be
3541  * faster than trying to apply qsort().  Most tables don't have very many
3542  * indexes...
3543  */
3544 static List *
3545 insert_ordered_oid(List *list, Oid datum)
3546 {
3547         ListCell   *prev;
3548
3549         /* Does the datum belong at the front? */
3550         if (list == NIL || datum < linitial_oid(list))
3551                 return lcons_oid(datum, list);
3552         /* No, so find the entry it belongs after */
3553         prev = list_head(list);
3554         for (;;)
3555         {
3556                 ListCell   *curr = lnext(prev);
3557
3558                 if (curr == NULL || datum < lfirst_oid(curr))
3559                         break;                          /* it belongs after 'prev', before 'curr' */
3560
3561                 prev = curr;
3562         }
3563         /* Insert datum into list after 'prev' */
3564         lappend_cell_oid(list, prev, datum);
3565         return list;
3566 }
3567
3568 /*
3569  * RelationSetIndexList -- externally force the index list contents
3570  *
3571  * This is used to temporarily override what we think the set of valid
3572  * indexes is (including the presence or absence of an OID index).
3573  * The forcing will be valid only until transaction commit or abort.
3574  *
3575  * This should only be applied to nailed relations, because in a non-nailed
3576  * relation the hacked index list could be lost at any time due to SI
3577  * messages.  In practice it is only used on pg_class (see REINDEX).
3578  *
3579  * It is up to the caller to make sure the given list is correctly ordered.
3580  *
3581  * We deliberately do not change rd_indexattr here: even when operating
3582  * with a temporary partial index list, HOT-update decisions must be made
3583  * correctly with respect to the full index set.  It is up to the caller
3584  * to ensure that a correct rd_indexattr set has been cached before first
3585  * calling RelationSetIndexList; else a subsequent inquiry might cause a
3586  * wrong rd_indexattr set to get computed and cached.
3587  */
3588 void
3589 RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex)
3590 {
3591         MemoryContext oldcxt;
3592
3593         Assert(relation->rd_isnailed);
3594         /* Copy the list into the cache context (could fail for lack of mem) */
3595         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3596         indexIds = list_copy(indexIds);
3597         MemoryContextSwitchTo(oldcxt);
3598         /* Okay to replace old list */
3599         list_free(relation->rd_indexlist);
3600         relation->rd_indexlist = indexIds;
3601         relation->rd_oidindex = oidIndex;
3602         relation->rd_indexvalid = 2;    /* mark list as forced */
3603         /* Flag relation as needing eoxact cleanup (to reset the list) */
3604         EOXactListAdd(relation);
3605 }
3606
3607 /*
3608  * RelationGetOidIndex -- get the pg_class OID of the relation's OID index
3609  *
3610  * Returns InvalidOid if there is no such index.
3611  */
3612 Oid
3613 RelationGetOidIndex(Relation relation)
3614 {
3615         List       *ilist;
3616
3617         /*
3618          * If relation doesn't have OIDs at all, caller is probably confused. (We
3619          * could just silently return InvalidOid, but it seems better to throw an
3620          * assertion.)
3621          */
3622         Assert(relation->rd_rel->relhasoids);
3623
3624         if (relation->rd_indexvalid == 0)
3625         {
3626                 /* RelationGetIndexList does the heavy lifting. */
3627                 ilist = RelationGetIndexList(relation);
3628                 list_free(ilist);
3629                 Assert(relation->rd_indexvalid != 0);
3630         }
3631
3632         return relation->rd_oidindex;
3633 }
3634
3635 /*
3636  * RelationGetIndexExpressions -- get the index expressions for an index
3637  *
3638  * We cache the result of transforming pg_index.indexprs into a node tree.
3639  * If the rel is not an index or has no expressional columns, we return NIL.
3640  * Otherwise, the returned tree is copied into the caller's memory context.
3641  * (We don't want to return a pointer to the relcache copy, since it could
3642  * disappear due to relcache invalidation.)
3643  */
3644 List *
3645 RelationGetIndexExpressions(Relation relation)
3646 {
3647         List       *result;
3648         Datum           exprsDatum;
3649         bool            isnull;
3650         char       *exprsString;
3651         MemoryContext oldcxt;
3652
3653         /* Quick exit if we already computed the result. */
3654         if (relation->rd_indexprs)
3655                 return (List *) copyObject(relation->rd_indexprs);
3656
3657         /* Quick exit if there is nothing to do. */
3658         if (relation->rd_indextuple == NULL ||
3659                 heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs))
3660                 return NIL;
3661
3662         /*
3663          * We build the tree we intend to return in the caller's context. After
3664          * successfully completing the work, we copy it into the relcache entry.
3665          * This avoids problems if we get some sort of error partway through.
3666          */
3667         exprsDatum = heap_getattr(relation->rd_indextuple,
3668                                                           Anum_pg_index_indexprs,
3669                                                           GetPgIndexDescriptor(),
3670                                                           &isnull);
3671         Assert(!isnull);
3672         exprsString = TextDatumGetCString(exprsDatum);
3673         result = (List *) stringToNode(exprsString);
3674         pfree(exprsString);
3675
3676         /*
3677          * Run the expressions through eval_const_expressions. This is not just an
3678          * optimization, but is necessary, because the planner will be comparing
3679          * them to similarly-processed qual clauses, and may fail to detect valid
3680          * matches without this.  We don't bother with canonicalize_qual, however.
3681          */
3682         result = (List *) eval_const_expressions(NULL, (Node *) result);
3683
3684         /* May as well fix opfuncids too */
3685         fix_opfuncids((Node *) result);
3686
3687         /* Now save a copy of the completed tree in the relcache entry. */
3688         oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
3689         relation->rd_indexprs = (List *) copyObject(result);
3690         MemoryContextSwitchTo(oldcxt);
3691
3692         return result;
3693 }
3694
3695 /*
3696  * RelationGetIndexPredicate -- get the index predicate for an index
3697  *
3698  * We cache the result of transforming pg_index.indpred into an implicit-AND
3699  * node tree (suitable for ExecQual).
3700  * If the rel is not an index or has no predicate, we return NIL.
3701  * Otherwise, the returned tree is copied into the caller's memory context.
3702  * (We don't want to return a pointer to the relcache copy, since it could
3703  * disappear due to relcache invalidation.)
3704  */
3705 List *
3706 RelationGetIndexPredicate(Relation relation)
3707 {
3708         List       *result;
3709         Datum           predDatum;
3710         bool            isnull;
3711         char       *predString;
3712         MemoryContext oldcxt;
3713
3714         /* Quick exit if we already computed the result. */
3715         if (relation->rd_indpred)
3716                 return (List *) copyObject(relation->rd_indpred);
3717
3718         /* Quick exit if there is nothing to do. */
3719         if (relation->rd_indextuple == NULL ||
3720                 heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred))
3721                 return NIL;
3722
3723         /*
3724          * We build the tree we intend to return in the caller's context. After
3725          * successfully completing the work, we copy it into the relcache entry.
3726          * This avoids problems if we get some sort of error partway through.
3727          */
3728         predDatum = heap_getattr(relation->rd_indextuple,
3729                                                          Anum_pg_index_indpred,
3730                                                          GetPgIndexDescriptor(),
3731                                                          &isnull);
3732         Assert(!isnull);
3733         predString = TextDatumGetCString(predDatum);
3734         result = (List *) stringToNode(predString);
3735         pfree(predString);
3736
3737         /*
3738          * Run the expression through const-simplification and canonicalization.
3739          * This is not just an optimization, but is necessary, because the planner
3740          * will be comparing it to similarly-processed qual clauses, and may fail
3741          * to detect valid matches without this.  This must match the processing
3742          * done to qual clauses in preprocess_expression()!  (We can skip the
3743          * stuff involving subqueries, however, since we don't allow any in index
3744          * predicates.)
3745          */
3746         result = (List *) eval_const_expressions(NULL, (Node *) result);
3747
3748         result = (List *) canonicalize_qual((Expr *) result);
3749
3750         /* Also convert to implicit-AND format */
3751         result = make_ands_implicit((Expr *) result);
3752
3753         /* May as well fix opfuncids too */
3754         fix_opfuncids((Node *) result);
3755
3756         /* Now save a copy of the completed tree in the relcache entry. */
3757         oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
3758         relation->rd_indpred = (List *) copyObject(result);
3759         MemoryContextSwitchTo(oldcxt);
3760
3761         return result;
3762 }
3763
3764 /*
3765  * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
3766  *
3767  * The result has a bit set for each attribute used anywhere in the index
3768  * definitions of all the indexes on this relation.  (This includes not only
3769  * simple index keys, but attributes used in expressions and partial-index
3770  * predicates.)
3771  *
3772  * If "keyAttrs" is true, only attributes that can be referenced by foreign
3773  * keys are considered.
3774  *
3775  * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
3776  * we can include system attributes (e.g., OID) in the bitmap representation.
3777  *
3778  * Caller had better hold at least RowExclusiveLock on the target relation
3779  * to ensure that it has a stable set of indexes.  This also makes it safe
3780  * (deadlock-free) for us to take locks on the relation's indexes.
3781  *
3782  * The returned result is palloc'd in the caller's memory context and should
3783  * be bms_free'd when not needed anymore.
3784  */
3785 Bitmapset *
3786 RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs)
3787 {
3788         Bitmapset  *indexattrs;
3789         Bitmapset  *uindexattrs;
3790         List       *indexoidlist;
3791         ListCell   *l;
3792         MemoryContext oldcxt;
3793
3794         /* Quick exit if we already computed the result. */
3795         if (relation->rd_indexattr != NULL)
3796                 return bms_copy(keyAttrs ? relation->rd_keyattr : relation->rd_indexattr);
3797
3798         /* Fast path if definitely no indexes */
3799         if (!RelationGetForm(relation)->relhasindex)
3800                 return NULL;
3801
3802         /*
3803          * Get cached list of index OIDs
3804          */
3805         indexoidlist = RelationGetIndexList(relation);
3806
3807         /* Fall out if no indexes (but relhasindex was set) */
3808         if (indexoidlist == NIL)
3809                 return NULL;
3810
3811         /*
3812          * For each index, add referenced attributes to indexattrs.
3813          *
3814          * Note: we consider all indexes returned by RelationGetIndexList, even if
3815          * they are not indisready or indisvalid.  This is important because an
3816          * index for which CREATE INDEX CONCURRENTLY has just started must be
3817          * included in HOT-safety decisions (see README.HOT).  If a DROP INDEX
3818          * CONCURRENTLY is far enough along that we should ignore the index, it
3819          * won't be returned at all by RelationGetIndexList.
3820          */
3821         indexattrs = NULL;
3822         uindexattrs = NULL;
3823         foreach(l, indexoidlist)
3824         {
3825                 Oid                     indexOid = lfirst_oid(l);
3826                 Relation        indexDesc;
3827                 IndexInfo  *indexInfo;
3828                 int                     i;
3829                 bool            isKey;
3830
3831                 indexDesc = index_open(indexOid, AccessShareLock);
3832
3833                 /* Extract index key information from the index's pg_index row */
3834                 indexInfo = BuildIndexInfo(indexDesc);
3835
3836                 /* Can this index be referenced by a foreign key? */
3837                 isKey = indexInfo->ii_Unique &&
3838                                 indexInfo->ii_Expressions == NIL &&
3839                                 indexInfo->ii_Predicate == NIL;
3840
3841                 /* Collect simple attribute references */
3842                 for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
3843                 {
3844                         int                     attrnum = indexInfo->ii_KeyAttrNumbers[i];
3845
3846                         if (attrnum != 0)
3847                         {
3848                                 indexattrs = bms_add_member(indexattrs,
3849                                                            attrnum - FirstLowInvalidHeapAttributeNumber);
3850                                 if (isKey)
3851                                         uindexattrs = bms_add_member(uindexattrs,
3852                                                                                                  attrnum - FirstLowInvalidHeapAttributeNumber);
3853                         }
3854                 }
3855
3856                 /* Collect all attributes used in expressions, too */
3857                 pull_varattnos((Node *) indexInfo->ii_Expressions, 1, &indexattrs);
3858
3859                 /* Collect all attributes in the index predicate, too */
3860                 pull_varattnos((Node *) indexInfo->ii_Predicate, 1, &indexattrs);
3861
3862                 index_close(indexDesc, AccessShareLock);
3863         }
3864
3865         list_free(indexoidlist);
3866
3867         /* Now save a copy of the bitmap in the relcache entry. */
3868         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3869         relation->rd_indexattr = bms_copy(indexattrs);
3870         relation->rd_keyattr = bms_copy(uindexattrs);
3871         MemoryContextSwitchTo(oldcxt);
3872
3873         /* We return our original working copy for caller to play with */
3874         return keyAttrs ? uindexattrs : indexattrs;
3875 }
3876
3877 /*
3878  * RelationGetExclusionInfo -- get info about index's exclusion constraint
3879  *
3880  * This should be called only for an index that is known to have an
3881  * associated exclusion constraint.  It returns arrays (palloc'd in caller's
3882  * context) of the exclusion operator OIDs, their underlying functions'
3883  * OIDs, and their strategy numbers in the index's opclasses.  We cache
3884  * all this information since it requires a fair amount of work to get.
3885  */
3886 void
3887 RelationGetExclusionInfo(Relation indexRelation,
3888                                                  Oid **operators,
3889                                                  Oid **procs,
3890                                                  uint16 **strategies)
3891 {
3892         int                     ncols = indexRelation->rd_rel->relnatts;
3893         Oid                *ops;
3894         Oid                *funcs;
3895         uint16     *strats;
3896         Relation        conrel;
3897         SysScanDesc conscan;
3898         ScanKeyData skey[1];
3899         HeapTuple       htup;
3900         bool            found;
3901         MemoryContext oldcxt;
3902         int                     i;
3903
3904         /* Allocate result space in caller context */
3905         *operators = ops = (Oid *) palloc(sizeof(Oid) * ncols);
3906         *procs = funcs = (Oid *) palloc(sizeof(Oid) * ncols);
3907         *strategies = strats = (uint16 *) palloc(sizeof(uint16) * ncols);
3908
3909         /* Quick exit if we have the data cached already */
3910         if (indexRelation->rd_exclstrats != NULL)
3911         {
3912                 memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * ncols);
3913                 memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * ncols);
3914                 memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * ncols);
3915                 return;
3916         }
3917
3918         /*
3919          * Search pg_constraint for the constraint associated with the index. To
3920          * make this not too painfully slow, we use the index on conrelid; that
3921          * will hold the parent relation's OID not the index's own OID.
3922          */
3923         ScanKeyInit(&skey[0],
3924                                 Anum_pg_constraint_conrelid,
3925                                 BTEqualStrategyNumber, F_OIDEQ,
3926                                 ObjectIdGetDatum(indexRelation->rd_index->indrelid));
3927
3928         conrel = heap_open(ConstraintRelationId, AccessShareLock);
3929         conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
3930                                                                  SnapshotNow, 1, skey);
3931         found = false;
3932
3933         while (HeapTupleIsValid(htup = systable_getnext(conscan)))
3934         {
3935                 Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
3936                 Datum           val;
3937                 bool            isnull;
3938                 ArrayType  *arr;
3939                 int                     nelem;
3940
3941                 /* We want the exclusion constraint owning the index */
3942                 if (conform->contype != CONSTRAINT_EXCLUSION ||
3943                         conform->conindid != RelationGetRelid(indexRelation))
3944                         continue;
3945
3946                 /* There should be only one */
3947                 if (found)
3948                         elog(ERROR, "unexpected exclusion constraint record found for rel %s",
3949                                  RelationGetRelationName(indexRelation));
3950                 found = true;
3951
3952                 /* Extract the operator OIDS from conexclop */
3953                 val = fastgetattr(htup,
3954                                                   Anum_pg_constraint_conexclop,
3955                                                   conrel->rd_att, &isnull);
3956                 if (isnull)
3957                         elog(ERROR, "null conexclop for rel %s",
3958                                  RelationGetRelationName(indexRelation));
3959
3960                 arr = DatumGetArrayTypeP(val);  /* ensure not toasted */
3961                 nelem = ARR_DIMS(arr)[0];
3962                 if (ARR_NDIM(arr) != 1 ||
3963                         nelem != ncols ||
3964                         ARR_HASNULL(arr) ||
3965                         ARR_ELEMTYPE(arr) != OIDOID)
3966                         elog(ERROR, "conexclop is not a 1-D Oid array");
3967
3968                 memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * ncols);
3969         }
3970
3971         systable_endscan(conscan);
3972         heap_close(conrel, AccessShareLock);
3973
3974         if (!found)
3975                 elog(ERROR, "exclusion constraint record missing for rel %s",
3976                          RelationGetRelationName(indexRelation));
3977
3978         /* We need the func OIDs and strategy numbers too */
3979         for (i = 0; i < ncols; i++)
3980         {
3981                 funcs[i] = get_opcode(ops[i]);
3982                 strats[i] = get_op_opfamily_strategy(ops[i],
3983                                                                                          indexRelation->rd_opfamily[i]);
3984                 /* shouldn't fail, since it was checked at index creation */
3985                 if (strats[i] == InvalidStrategy)
3986                         elog(ERROR, "could not find strategy for operator %u in family %u",
3987                                  ops[i], indexRelation->rd_opfamily[i]);
3988         }
3989
3990         /* Save a copy of the results in the relcache entry. */
3991         oldcxt = MemoryContextSwitchTo(indexRelation->rd_indexcxt);
3992         indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * ncols);
3993         indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * ncols);
3994         indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * ncols);
3995         memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * ncols);
3996         memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * ncols);
3997         memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * ncols);
3998         MemoryContextSwitchTo(oldcxt);
3999 }
4000
4001
4002 /*
4003  * Routines to support ereport() reports of relation-related errors
4004  *
4005  * These could have been put into elog.c, but it seems like a module layering
4006  * violation to have elog.c calling relcache or syscache stuff --- and we
4007  * definitely don't want elog.h including rel.h.  So we put them here.
4008  */
4009
4010 /*
4011  * errtable --- stores schema_name and table_name of a table
4012  * within the current errordata.
4013  */
4014 int
4015 errtable(Relation rel)
4016 {
4017         err_generic_string(PG_DIAG_SCHEMA_NAME,
4018                                            get_namespace_name(RelationGetNamespace(rel)));
4019         err_generic_string(PG_DIAG_TABLE_NAME, RelationGetRelationName(rel));
4020
4021         return 0;                       /* return value does not matter */
4022 }
4023
4024 /*
4025  * errtablecol --- stores schema_name, table_name and column_name
4026  * of a table column within the current errordata.
4027  *
4028  * The column is specified by attribute number --- for most callers, this is
4029  * easier and less error-prone than getting the column name for themselves.
4030  */
4031 int
4032 errtablecol(Relation rel, int attnum)
4033 {
4034         TupleDesc       reldesc = RelationGetDescr(rel);
4035         const char *colname;
4036
4037         /* Use reldesc if it's a user attribute, else consult the catalogs */
4038         if (attnum > 0 && attnum <= reldesc->natts)
4039                 colname = NameStr(reldesc->attrs[attnum - 1]->attname);
4040         else
4041                 colname = get_relid_attribute_name(RelationGetRelid(rel), attnum);
4042
4043         return errtablecolname(rel, colname);
4044 }
4045
4046 /*
4047  * errtablecolname --- stores schema_name, table_name and column_name
4048  * of a table column within the current errordata, where the column name is
4049  * given directly rather than extracted from the relation's catalog data.
4050  *
4051  * Don't use this directly unless errtablecol() is inconvenient for some
4052  * reason.  This might possibly be needed during intermediate states in ALTER
4053  * TABLE, for instance.
4054  */
4055 int
4056 errtablecolname(Relation rel, const char *colname)
4057 {
4058         errtable(rel);
4059         err_generic_string(PG_DIAG_COLUMN_NAME, colname);
4060
4061         return 0;                       /* return value does not matter */
4062 }
4063
4064 /*
4065  * errtableconstraint --- stores schema_name, table_name and constraint_name
4066  * of a table-related constraint within the current errordata.
4067  */
4068 int
4069 errtableconstraint(Relation rel, const char *conname)
4070 {
4071         errtable(rel);
4072         err_generic_string(PG_DIAG_CONSTRAINT_NAME, conname);
4073
4074         return 0;                       /* return value does not matter */
4075 }
4076
4077
4078 /*
4079  *      load_relcache_init_file, write_relcache_init_file
4080  *
4081  *              In late 1992, we started regularly having databases with more than
4082  *              a thousand classes in them.  With this number of classes, it became
4083  *              critical to do indexed lookups on the system catalogs.
4084  *
4085  *              Bootstrapping these lookups is very hard.  We want to be able to
4086  *              use an index on pg_attribute, for example, but in order to do so,
4087  *              we must have read pg_attribute for the attributes in the index,
4088  *              which implies that we need to use the index.
4089  *
4090  *              In order to get around the problem, we do the following:
4091  *
4092  *                 +  When the database system is initialized (at initdb time), we
4093  *                        don't use indexes.  We do sequential scans.
4094  *
4095  *                 +  When the backend is started up in normal mode, we load an image
4096  *                        of the appropriate relation descriptors, in internal format,
4097  *                        from an initialization file in the data/base/... directory.
4098  *
4099  *                 +  If the initialization file isn't there, then we create the
4100  *                        relation descriptors using sequential scans and write 'em to
4101  *                        the initialization file for use by subsequent backends.
4102  *
4103  *              As of Postgres 9.0, there is one local initialization file in each
4104  *              database, plus one shared initialization file for shared catalogs.
4105  *
4106  *              We could dispense with the initialization files and just build the
4107  *              critical reldescs the hard way on every backend startup, but that
4108  *              slows down backend startup noticeably.
4109  *
4110  *              We can in fact go further, and save more relcache entries than
4111  *              just the ones that are absolutely critical; this allows us to speed
4112  *              up backend startup by not having to build such entries the hard way.
4113  *              Presently, all the catalog and index entries that are referred to
4114  *              by catcaches are stored in the initialization files.
4115  *
4116  *              The same mechanism that detects when catcache and relcache entries
4117  *              need to be invalidated (due to catalog updates) also arranges to
4118  *              unlink the initialization files when the contents may be out of date.
4119  *              The files will then be rebuilt during the next backend startup.
4120  */
4121
4122 /*
4123  * load_relcache_init_file -- attempt to load cache from the shared
4124  * or local cache init file
4125  *
4126  * If successful, return TRUE and set criticalRelcachesBuilt or
4127  * criticalSharedRelcachesBuilt to true.
4128  * If not successful, return FALSE.
4129  *
4130  * NOTE: we assume we are already switched into CacheMemoryContext.
4131  */
4132 static bool
4133 load_relcache_init_file(bool shared)
4134 {
4135         FILE       *fp;
4136         char            initfilename[MAXPGPATH];
4137         Relation   *rels;
4138         int                     relno,
4139                                 num_rels,
4140                                 max_rels,
4141                                 nailed_rels,
4142                                 nailed_indexes,
4143                                 magic;
4144         int                     i;
4145
4146         if (shared)
4147                 snprintf(initfilename, sizeof(initfilename), "global/%s",
4148                                  RELCACHE_INIT_FILENAME);
4149         else
4150                 snprintf(initfilename, sizeof(initfilename), "%s/%s",
4151                                  DatabasePath, RELCACHE_INIT_FILENAME);
4152
4153         fp = AllocateFile(initfilename, PG_BINARY_R);
4154         if (fp == NULL)
4155                 return false;
4156
4157         /*
4158          * Read the index relcache entries from the file.  Note we will not enter
4159          * any of them into the cache if the read fails partway through; this
4160          * helps to guard against broken init files.
4161          */
4162         max_rels = 100;
4163         rels = (Relation *) palloc(max_rels * sizeof(Relation));
4164         num_rels = 0;
4165         nailed_rels = nailed_indexes = 0;
4166
4167         /* check for correct magic number (compatible version) */
4168         if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
4169                 goto read_failed;
4170         if (magic != RELCACHE_INIT_FILEMAGIC)
4171                 goto read_failed;
4172
4173         for (relno = 0;; relno++)
4174         {
4175                 Size            len;
4176                 size_t          nread;
4177                 Relation        rel;
4178                 Form_pg_class relform;
4179                 bool            has_not_null;
4180
4181                 /* first read the relation descriptor length */
4182                 nread = fread(&len, 1, sizeof(len), fp);
4183                 if (nread != sizeof(len))
4184                 {
4185                         if (nread == 0)
4186                                 break;                  /* end of file */
4187                         goto read_failed;
4188                 }
4189
4190                 /* safety check for incompatible relcache layout */
4191                 if (len != sizeof(RelationData))
4192                         goto read_failed;
4193
4194                 /* allocate another relcache header */
4195                 if (num_rels >= max_rels)
4196                 {
4197                         max_rels *= 2;
4198                         rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
4199                 }
4200
4201                 rel = rels[num_rels++] = (Relation) palloc(len);
4202
4203                 /* then, read the Relation structure */
4204                 if (fread(rel, 1, len, fp) != len)
4205                         goto read_failed;
4206
4207                 /* next read the relation tuple form */
4208                 if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
4209                         goto read_failed;
4210
4211                 relform = (Form_pg_class) palloc(len);
4212                 if (fread(relform, 1, len, fp) != len)
4213                         goto read_failed;
4214
4215                 rel->rd_rel = relform;
4216
4217                 /* initialize attribute tuple forms */
4218                 rel->rd_att = CreateTemplateTupleDesc(relform->relnatts,
4219                                                                                           relform->relhasoids);
4220                 rel->rd_att->tdrefcount = 1;    /* mark as refcounted */
4221
4222                 rel->rd_att->tdtypeid = relform->reltype;
4223                 rel->rd_att->tdtypmod = -1;             /* unnecessary, but... */
4224
4225                 /* next read all the attribute tuple form data entries */
4226                 has_not_null = false;
4227                 for (i = 0; i < relform->relnatts; i++)
4228                 {
4229                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
4230                                 goto read_failed;
4231                         if (len != ATTRIBUTE_FIXED_PART_SIZE)
4232                                 goto read_failed;
4233                         if (fread(rel->rd_att->attrs[i], 1, len, fp) != len)
4234                                 goto read_failed;
4235
4236                         has_not_null |= rel->rd_att->attrs[i]->attnotnull;
4237                 }
4238
4239                 /* next read the access method specific field */
4240                 if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
4241                         goto read_failed;
4242                 if (len > 0)
4243                 {
4244                         rel->rd_options = palloc(len);
4245                         if (fread(rel->rd_options, 1, len, fp) != len)
4246                                 goto read_failed;
4247                         if (len != VARSIZE(rel->rd_options))
4248                                 goto read_failed;               /* sanity check */
4249                 }
4250                 else
4251                 {
4252                         rel->rd_options = NULL;
4253                 }
4254
4255                 /* mark not-null status */
4256                 if (has_not_null)
4257                 {
4258                         TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
4259
4260                         constr->has_not_null = true;
4261                         rel->rd_att->constr = constr;
4262                 }
4263
4264                 /* If it's an index, there's more to do */
4265                 if (rel->rd_rel->relkind == RELKIND_INDEX)
4266                 {
4267                         Form_pg_am      am;
4268                         MemoryContext indexcxt;
4269                         Oid                *opfamily;
4270                         Oid                *opcintype;
4271                         RegProcedure *support;
4272                         int                     nsupport;
4273                         int16      *indoption;
4274                         Oid                *indcollation;
4275
4276                         /* Count nailed indexes to ensure we have 'em all */
4277                         if (rel->rd_isnailed)
4278                                 nailed_indexes++;
4279
4280                         /* next, read the pg_index tuple */
4281                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
4282                                 goto read_failed;
4283
4284                         rel->rd_indextuple = (HeapTuple) palloc(len);
4285                         if (fread(rel->rd_indextuple, 1, len, fp) != len)
4286                                 goto read_failed;
4287
4288                         /* Fix up internal pointers in the tuple -- see heap_copytuple */
4289                         rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
4290                         rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);
4291
4292                         /* next, read the access method tuple form */
4293                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
4294                                 goto read_failed;
4295
4296                         am = (Form_pg_am) palloc(len);
4297                         if (fread(am, 1, len, fp) != len)
4298                                 goto read_failed;
4299                         rel->rd_am = am;
4300
4301                         /*
4302                          * prepare index info context --- parameters should match
4303                          * RelationInitIndexAccessInfo
4304                          */
4305                         indexcxt = AllocSetContextCreate(CacheMemoryContext,
4306                                                                                          RelationGetRelationName(rel),
4307                                                                                          ALLOCSET_SMALL_MINSIZE,
4308                                                                                          ALLOCSET_SMALL_INITSIZE,
4309                                                                                          ALLOCSET_SMALL_MAXSIZE);
4310                         rel->rd_indexcxt = indexcxt;
4311
4312                         /* next, read the vector of opfamily OIDs */
4313                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
4314                                 goto read_failed;
4315
4316                         opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
4317                         if (fread(opfamily, 1, len, fp) != len)
4318                                 goto read_failed;
4319
4320                         rel->rd_opfamily = opfamily;
4321
4322                         /* next, read the vector of opcintype OIDs */
4323                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
4324                                 goto read_failed;
4325
4326                         opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
4327                         if (fread(opcintype, 1, len, fp) != len)
4328                                 goto read_failed;
4329
4330                         rel->rd_opcintype = opcintype;
4331
4332                         /* next, read the vector of support procedure OIDs */
4333                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
4334                                 goto read_failed;
4335                         support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
4336                         if (fread(support, 1, len, fp) != len)
4337                                 goto read_failed;
4338
4339                         rel->rd_support = support;
4340
4341                         /* next, read the vector of collation OIDs */
4342                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
4343                                 goto read_failed;
4344
4345                         indcollation = (Oid *) MemoryContextAlloc(indexcxt, len);
4346                         if (fread(indcollation, 1, len, fp) != len)
4347                                 goto read_failed;
4348
4349                         rel->rd_indcollation = indcollation;
4350
4351                         /* finally, read the vector of indoption values */
4352                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
4353                                 goto read_failed;
4354
4355                         indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
4356                         if (fread(indoption, 1, len, fp) != len)
4357                                 goto read_failed;
4358
4359                         rel->rd_indoption = indoption;
4360
4361                         /* set up zeroed fmgr-info vectors */
4362                         rel->rd_aminfo = (RelationAmInfo *)
4363                                 MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
4364                         nsupport = relform->relnatts * am->amsupport;
4365                         rel->rd_supportinfo = (FmgrInfo *)
4366                                 MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
4367                 }
4368                 else
4369                 {
4370                         /* Count nailed rels to ensure we have 'em all */
4371                         if (rel->rd_isnailed)
4372                                 nailed_rels++;
4373
4374                         Assert(rel->rd_index == NULL);
4375                         Assert(rel->rd_indextuple == NULL);
4376                         Assert(rel->rd_am == NULL);
4377                         Assert(rel->rd_indexcxt == NULL);
4378                         Assert(rel->rd_aminfo == NULL);
4379                         Assert(rel->rd_opfamily == NULL);
4380                         Assert(rel->rd_opcintype == NULL);
4381                         Assert(rel->rd_support == NULL);
4382                         Assert(rel->rd_supportinfo == NULL);
4383                         Assert(rel->rd_indoption == NULL);
4384                         Assert(rel->rd_indcollation == NULL);
4385                 }
4386
4387                 /*
4388                  * Rules and triggers are not saved (mainly because the internal
4389                  * format is complex and subject to change).  They must be rebuilt if
4390                  * needed by RelationCacheInitializePhase3.  This is not expected to
4391                  * be a big performance hit since few system catalogs have such. Ditto
4392                  * for index expressions, predicates, and exclusion info.
4393                  */
4394                 rel->rd_rules = NULL;
4395                 rel->rd_rulescxt = NULL;
4396                 rel->trigdesc = NULL;
4397                 rel->rd_indexprs = NIL;
4398                 rel->rd_indpred = NIL;
4399                 rel->rd_exclops = NULL;
4400                 rel->rd_exclprocs = NULL;
4401                 rel->rd_exclstrats = NULL;
4402
4403                 /*
4404                  * Reset transient-state fields in the relcache entry
4405                  */
4406                 rel->rd_smgr = NULL;
4407                 if (rel->rd_isnailed)
4408                         rel->rd_refcnt = 1;
4409                 else
4410                         rel->rd_refcnt = 0;
4411                 rel->rd_indexvalid = 0;
4412                 rel->rd_indexlist = NIL;
4413                 rel->rd_indexattr = NULL;
4414                 rel->rd_oidindex = InvalidOid;
4415                 rel->rd_createSubid = InvalidSubTransactionId;
4416                 rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
4417                 rel->rd_amcache = NULL;
4418                 MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
4419
4420                 /*
4421                  * Recompute lock and physical addressing info.  This is needed in
4422                  * case the pg_internal.init file was copied from some other database
4423                  * by CREATE DATABASE.
4424                  */
4425                 RelationInitLockInfo(rel);
4426                 RelationInitPhysicalAddr(rel);
4427         }
4428
4429         /*
4430          * We reached the end of the init file without apparent problem. Did we
4431          * get the right number of nailed items?  (This is a useful crosscheck in
4432          * case the set of critical rels or indexes changes.)
4433          */
4434         if (shared)
4435         {
4436                 if (nailed_rels != NUM_CRITICAL_SHARED_RELS ||
4437                         nailed_indexes != NUM_CRITICAL_SHARED_INDEXES)
4438                         goto read_failed;
4439         }
4440         else
4441         {
4442                 if (nailed_rels != NUM_CRITICAL_LOCAL_RELS ||
4443                         nailed_indexes != NUM_CRITICAL_LOCAL_INDEXES)
4444                         goto read_failed;
4445         }
4446
4447         /*
4448          * OK, all appears well.
4449          *
4450          * Now insert all the new relcache entries into the cache.
4451          */
4452         for (relno = 0; relno < num_rels; relno++)
4453         {
4454                 RelationCacheInsert(rels[relno]);
4455                 /* also make a list of their OIDs, for RelationIdIsInInitFile */
4456                 if (!shared)
4457                         initFileRelationIds = lcons_oid(RelationGetRelid(rels[relno]),
4458                                                                                         initFileRelationIds);
4459         }
4460
4461         pfree(rels);
4462         FreeFile(fp);
4463
4464         if (shared)
4465                 criticalSharedRelcachesBuilt = true;
4466         else
4467                 criticalRelcachesBuilt = true;
4468         return true;
4469
4470         /*
4471          * init file is broken, so do it the hard way.  We don't bother trying to
4472          * free the clutter we just allocated; it's not in the relcache so it
4473          * won't hurt.
4474          */
4475 read_failed:
4476         pfree(rels);
4477         FreeFile(fp);
4478
4479         return false;
4480 }
4481
4482 /*
4483  * Write out a new initialization file with the current contents
4484  * of the relcache (either shared rels or local rels, as indicated).
4485  */
4486 static void
4487 write_relcache_init_file(bool shared)
4488 {
4489         FILE       *fp;
4490         char            tempfilename[MAXPGPATH];
4491         char            finalfilename[MAXPGPATH];
4492         int                     magic;
4493         HASH_SEQ_STATUS status;
4494         RelIdCacheEnt *idhentry;
4495         MemoryContext oldcxt;
4496         int                     i;
4497
4498         /*
4499          * We must write a temporary file and rename it into place. Otherwise,
4500          * another backend starting at about the same time might crash trying to
4501          * read the partially-complete file.
4502          */
4503         if (shared)
4504         {
4505                 snprintf(tempfilename, sizeof(tempfilename), "global/%s.%d",
4506                                  RELCACHE_INIT_FILENAME, MyProcPid);
4507                 snprintf(finalfilename, sizeof(finalfilename), "global/%s",
4508                                  RELCACHE_INIT_FILENAME);
4509         }
4510         else
4511         {
4512                 snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
4513                                  DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
4514                 snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
4515                                  DatabasePath, RELCACHE_INIT_FILENAME);
4516         }
4517
4518         unlink(tempfilename);           /* in case it exists w/wrong permissions */
4519
4520         fp = AllocateFile(tempfilename, PG_BINARY_W);
4521         if (fp == NULL)
4522         {
4523                 /*
4524                  * We used to consider this a fatal error, but we might as well
4525                  * continue with backend startup ...
4526                  */
4527                 ereport(WARNING,
4528                                 (errcode_for_file_access(),
4529                                  errmsg("could not create relation-cache initialization file \"%s\": %m",
4530                                                 tempfilename),
4531                           errdetail("Continuing anyway, but there's something wrong.")));
4532                 return;
4533         }
4534
4535         /*
4536          * Write a magic number to serve as a file version identifier.  We can
4537          * change the magic number whenever the relcache layout changes.
4538          */
4539         magic = RELCACHE_INIT_FILEMAGIC;
4540         if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
4541                 elog(FATAL, "could not write init file");
4542
4543         /*
4544          * Write all the appropriate reldescs (in no particular order).
4545          */
4546         hash_seq_init(&status, RelationIdCache);
4547
4548         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
4549         {
4550                 Relation        rel = idhentry->reldesc;
4551                 Form_pg_class relform = rel->rd_rel;
4552
4553                 /* ignore if not correct group */
4554                 if (relform->relisshared != shared)
4555                         continue;
4556
4557                 /* first write the relcache entry proper */
4558                 write_item(rel, sizeof(RelationData), fp);
4559
4560                 /* next write the relation tuple form */
4561                 write_item(relform, CLASS_TUPLE_SIZE, fp);
4562
4563                 /* next, do all the attribute tuple form data entries */
4564                 for (i = 0; i < relform->relnatts; i++)
4565                 {
4566                         write_item(rel->rd_att->attrs[i], ATTRIBUTE_FIXED_PART_SIZE, fp);
4567                 }
4568
4569                 /* next, do the access method specific field */
4570                 write_item(rel->rd_options,
4571                                    (rel->rd_options ? VARSIZE(rel->rd_options) : 0),
4572                                    fp);
4573
4574                 /* If it's an index, there's more to do */
4575                 if (rel->rd_rel->relkind == RELKIND_INDEX)
4576                 {
4577                         Form_pg_am      am = rel->rd_am;
4578
4579                         /* write the pg_index tuple */
4580                         /* we assume this was created by heap_copytuple! */
4581                         write_item(rel->rd_indextuple,
4582                                            HEAPTUPLESIZE + rel->rd_indextuple->t_len,
4583                                            fp);
4584
4585                         /* next, write the access method tuple form */
4586                         write_item(am, sizeof(FormData_pg_am), fp);
4587
4588                         /* next, write the vector of opfamily OIDs */
4589                         write_item(rel->rd_opfamily,
4590                                            relform->relnatts * sizeof(Oid),
4591                                            fp);
4592
4593                         /* next, write the vector of opcintype OIDs */
4594                         write_item(rel->rd_opcintype,
4595                                            relform->relnatts * sizeof(Oid),
4596                                            fp);
4597
4598                         /* next, write the vector of support procedure OIDs */
4599                         write_item(rel->rd_support,
4600                                   relform->relnatts * (am->amsupport * sizeof(RegProcedure)),
4601                                            fp);
4602
4603                         /* next, write the vector of collation OIDs */
4604                         write_item(rel->rd_indcollation,
4605                                            relform->relnatts * sizeof(Oid),
4606                                            fp);
4607
4608                         /* finally, write the vector of indoption values */
4609                         write_item(rel->rd_indoption,
4610                                            relform->relnatts * sizeof(int16),
4611                                            fp);
4612                 }
4613
4614                 /* also make a list of their OIDs, for RelationIdIsInInitFile */
4615                 if (!shared)
4616                 {
4617                         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4618                         initFileRelationIds = lcons_oid(RelationGetRelid(rel),
4619                                                                                         initFileRelationIds);
4620                         MemoryContextSwitchTo(oldcxt);
4621                 }
4622         }
4623
4624         if (FreeFile(fp))
4625                 elog(FATAL, "could not write init file");
4626
4627         /*
4628          * Now we have to check whether the data we've so painstakingly
4629          * accumulated is already obsolete due to someone else's just-committed
4630          * catalog changes.  If so, we just delete the temp file and leave it to
4631          * the next backend to try again.  (Our own relcache entries will be
4632          * updated by SI message processing, but we can't be sure whether what we
4633          * wrote out was up-to-date.)
4634          *
4635          * This mustn't run concurrently with the code that unlinks an init file
4636          * and sends SI messages, so grab a serialization lock for the duration.
4637          */
4638         LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
4639
4640         /* Make sure we have seen all incoming SI messages */
4641         AcceptInvalidationMessages();
4642
4643         /*
4644          * If we have received any SI relcache invals since backend start, assume
4645          * we may have written out-of-date data.
4646          */
4647         if (relcacheInvalsReceived == 0L)
4648         {
4649                 /*
4650                  * OK, rename the temp file to its final name, deleting any
4651                  * previously-existing init file.
4652                  *
4653                  * Note: a failure here is possible under Cygwin, if some other
4654                  * backend is holding open an unlinked-but-not-yet-gone init file. So
4655                  * treat this as a noncritical failure; just remove the useless temp
4656                  * file on failure.
4657                  */
4658                 if (rename(tempfilename, finalfilename) < 0)
4659                         unlink(tempfilename);
4660         }
4661         else
4662         {
4663                 /* Delete the already-obsolete temp file */
4664                 unlink(tempfilename);
4665         }
4666
4667         LWLockRelease(RelCacheInitLock);
4668 }
4669
4670 /* write a chunk of data preceded by its length */
4671 static void
4672 write_item(const void *data, Size len, FILE *fp)
4673 {
4674         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
4675                 elog(FATAL, "could not write init file");
4676         if (fwrite(data, 1, len, fp) != len)
4677                 elog(FATAL, "could not write init file");
4678 }
4679
4680 /*
4681  * Detect whether a given relation (identified by OID) is one of the ones
4682  * we store in the local relcache init file.
4683  *
4684  * Note that we effectively assume that all backends running in a database
4685  * would choose to store the same set of relations in the init file;
4686  * otherwise there are cases where we'd fail to detect the need for an init
4687  * file invalidation.  This does not seem likely to be a problem in practice.
4688  */
4689 bool
4690 RelationIdIsInInitFile(Oid relationId)
4691 {
4692         return list_member_oid(initFileRelationIds, relationId);
4693 }
4694
4695 /*
4696  * Invalidate (remove) the init file during commit of a transaction that
4697  * changed one or more of the relation cache entries that are kept in the
4698  * local init file.
4699  *
4700  * To be safe against concurrent inspection or rewriting of the init file,
4701  * we must take RelCacheInitLock, then remove the old init file, then send
4702  * the SI messages that include relcache inval for such relations, and then
4703  * release RelCacheInitLock.  This serializes the whole affair against
4704  * write_relcache_init_file, so that we can be sure that any other process
4705  * that's concurrently trying to create a new init file won't move an
4706  * already-stale version into place after we unlink.  Also, because we unlink
4707  * before sending the SI messages, a backend that's currently starting cannot
4708  * read the now-obsolete init file and then miss the SI messages that will
4709  * force it to update its relcache entries.  (This works because the backend
4710  * startup sequence gets into the sinval array before trying to load the init
4711  * file.)
4712  *
4713  * We take the lock and do the unlink in RelationCacheInitFilePreInvalidate,
4714  * then release the lock in RelationCacheInitFilePostInvalidate.  Caller must
4715  * send any pending SI messages between those calls.
4716  *
4717  * Notice this deals only with the local init file, not the shared init file.
4718  * The reason is that there can never be a "significant" change to the
4719  * relcache entry of a shared relation; the most that could happen is
4720  * updates of noncritical fields such as relpages/reltuples.  So, while
4721  * it's worth updating the shared init file from time to time, it can never
4722  * be invalid enough to make it necessary to remove it.
4723  */
4724 void
4725 RelationCacheInitFilePreInvalidate(void)
4726 {
4727         char            initfilename[MAXPGPATH];
4728
4729         snprintf(initfilename, sizeof(initfilename), "%s/%s",
4730                          DatabasePath, RELCACHE_INIT_FILENAME);
4731
4732         LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
4733
4734         if (unlink(initfilename) < 0)
4735         {
4736                 /*
4737                  * The file might not be there if no backend has been started since
4738                  * the last removal.  But complain about failures other than ENOENT.
4739                  * Fortunately, it's not too late to abort the transaction if we can't
4740                  * get rid of the would-be-obsolete init file.
4741                  */
4742                 if (errno != ENOENT)
4743                         ereport(ERROR,
4744                                         (errcode_for_file_access(),
4745                                          errmsg("could not remove cache file \"%s\": %m",
4746                                                         initfilename)));
4747         }
4748 }
4749
4750 void
4751 RelationCacheInitFilePostInvalidate(void)
4752 {
4753         LWLockRelease(RelCacheInitLock);
4754 }
4755
4756 /*
4757  * Remove the init files during postmaster startup.
4758  *
4759  * We used to keep the init files across restarts, but that is unsafe in PITR
4760  * scenarios, and even in simple crash-recovery cases there are windows for
4761  * the init files to become out-of-sync with the database.      So now we just
4762  * remove them during startup and expect the first backend launch to rebuild
4763  * them.  Of course, this has to happen in each database of the cluster.
4764  */
4765 void
4766 RelationCacheInitFileRemove(void)
4767 {
4768         const char *tblspcdir = "pg_tblspc";
4769         DIR                *dir;
4770         struct dirent *de;
4771         char            path[MAXPGPATH];
4772
4773         /*
4774          * We zap the shared cache file too.  In theory it can't get out of sync
4775          * enough to be a problem, but in data-corruption cases, who knows ...
4776          */
4777         snprintf(path, sizeof(path), "global/%s",
4778                          RELCACHE_INIT_FILENAME);
4779         unlink_initfile(path);
4780
4781         /* Scan everything in the default tablespace */
4782         RelationCacheInitFileRemoveInDir("base");
4783
4784         /* Scan the tablespace link directory to find non-default tablespaces */
4785         dir = AllocateDir(tblspcdir);
4786         if (dir == NULL)
4787         {
4788                 elog(LOG, "could not open tablespace link directory \"%s\": %m",
4789                          tblspcdir);
4790                 return;
4791         }
4792
4793         while ((de = ReadDir(dir, tblspcdir)) != NULL)
4794         {
4795                 if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
4796                 {
4797                         /* Scan the tablespace dir for per-database dirs */
4798                         snprintf(path, sizeof(path), "%s/%s/%s",
4799                                          tblspcdir, de->d_name, TABLESPACE_VERSION_DIRECTORY);
4800                         RelationCacheInitFileRemoveInDir(path);
4801                 }
4802         }
4803
4804         FreeDir(dir);
4805 }
4806
4807 /* Process one per-tablespace directory for RelationCacheInitFileRemove */
4808 static void
4809 RelationCacheInitFileRemoveInDir(const char *tblspcpath)
4810 {
4811         DIR                *dir;
4812         struct dirent *de;
4813         char            initfilename[MAXPGPATH];
4814
4815         /* Scan the tablespace directory to find per-database directories */
4816         dir = AllocateDir(tblspcpath);
4817         if (dir == NULL)
4818         {
4819                 elog(LOG, "could not open tablespace directory \"%s\": %m",
4820                          tblspcpath);
4821                 return;
4822         }
4823
4824         while ((de = ReadDir(dir, tblspcpath)) != NULL)
4825         {
4826                 if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
4827                 {
4828                         /* Try to remove the init file in each database */
4829                         snprintf(initfilename, sizeof(initfilename), "%s/%s/%s",
4830                                          tblspcpath, de->d_name, RELCACHE_INIT_FILENAME);
4831                         unlink_initfile(initfilename);
4832                 }
4833         }
4834
4835         FreeDir(dir);
4836 }
4837
4838 static void
4839 unlink_initfile(const char *initfilename)
4840 {
4841         if (unlink(initfilename) < 0)
4842         {
4843                 /* It might not be there, but log any error other than ENOENT */
4844                 if (errno != ENOENT)
4845                         elog(LOG, "could not remove cache file \"%s\": %m", initfilename);
4846         }
4847 }