]> granicus.if.org Git - postgresql/blob - src/backend/utils/cache/relcache.c
Update copyright for the year 2010.
[postgresql] / src / backend / utils / cache / relcache.c
1 /*-------------------------------------------------------------------------
2  *
3  * relcache.c
4  *        POSTGRES relation descriptor cache code
5  *
6  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.295 2010/01/02 16:57:55 momjian Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * INTERFACE ROUTINES
17  *              RelationCacheInitialize                 - initialize relcache (to empty)
18  *              RelationCacheInitializePhase2   - initialize shared-catalog entries
19  *              RelationCacheInitializePhase3   - finish initializing relcache
20  *              RelationIdGetRelation                   - get a reldesc by relation id
21  *              RelationClose                                   - close an open relation
22  *
23  * NOTES
24  *              The following code contains many undocumented hacks.  Please be
25  *              careful....
26  */
27 #include "postgres.h"
28
29 #include <sys/file.h>
30 #include <fcntl.h>
31 #include <unistd.h>
32
33 #include "access/genam.h"
34 #include "access/reloptions.h"
35 #include "access/sysattr.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "catalog/index.h"
39 #include "catalog/indexing.h"
40 #include "catalog/namespace.h"
41 #include "catalog/pg_amop.h"
42 #include "catalog/pg_amproc.h"
43 #include "catalog/pg_attrdef.h"
44 #include "catalog/pg_authid.h"
45 #include "catalog/pg_constraint.h"
46 #include "catalog/pg_database.h"
47 #include "catalog/pg_namespace.h"
48 #include "catalog/pg_opclass.h"
49 #include "catalog/pg_proc.h"
50 #include "catalog/pg_rewrite.h"
51 #include "catalog/pg_tablespace.h"
52 #include "catalog/pg_type.h"
53 #include "commands/trigger.h"
54 #include "miscadmin.h"
55 #include "optimizer/clauses.h"
56 #include "optimizer/planmain.h"
57 #include "optimizer/prep.h"
58 #include "optimizer/var.h"
59 #include "rewrite/rewriteDefine.h"
60 #include "storage/fd.h"
61 #include "storage/lmgr.h"
62 #include "storage/smgr.h"
63 #include "utils/array.h"
64 #include "utils/builtins.h"
65 #include "utils/fmgroids.h"
66 #include "utils/inval.h"
67 #include "utils/lsyscache.h"
68 #include "utils/memutils.h"
69 #include "utils/relcache.h"
70 #include "utils/resowner.h"
71 #include "utils/syscache.h"
72 #include "utils/tqual.h"
73 #include "utils/typcache.h"
74
75
/*
 *		Name of the relcache init file(s), which are used to speed up
 *		backend startup, and the version-ID magic value for those files.
 */
#define RELCACHE_INIT_FILENAME	"pg_internal.init"

#define RELCACHE_INIT_FILEMAGIC		0x573265	/* version ID value */
82
83 /*
84  *              hardcoded tuple descriptors.  see include/catalog/pg_attribute.h
85  */
86 static const FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
87 static const FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
88 static const FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
89 static const FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
90 static const FormData_pg_attribute Desc_pg_database[Natts_pg_database] = {Schema_pg_database};
91 static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
92
93 /*
94  *              Hash tables that index the relation cache
95  *
96  *              We used to index the cache by both name and OID, but now there
97  *              is only an index by OID.
98  */
99 typedef struct relidcacheent
100 {
101         Oid                     reloid;
102         Relation        reldesc;
103 } RelIdCacheEnt;
104
105 static HTAB *RelationIdCache;
106
107 /*
108  * This flag is false until we have prepared the critical relcache entries
109  * that are needed to do indexscans on the tables read by relcache building.
110  */
111 bool            criticalRelcachesBuilt = false;
112
113 /*
114  * This flag is false until we have prepared the critical relcache entries
115  * for shared catalogs (specifically, pg_database and its indexes).
116  */
117 bool            criticalSharedRelcachesBuilt = false;
118
119 /*
120  * This counter counts relcache inval events received since backend startup
121  * (but only for rels that are actually in cache).      Presently, we use it only
122  * to detect whether data about to be written by write_relcache_init_file()
123  * might already be obsolete.
124  */
125 static long relcacheInvalsReceived = 0L;
126
127 /*
128  * This list remembers the OIDs of the non-shared relations cached in the
129  * database's local relcache init file.  Note that there is no corresponding
130  * list for the shared relcache init file, for reasons explained in the
131  * comments for RelationCacheInitFileRemove.
132  */
133 static List *initFileRelationIds = NIL;
134
135 /*
136  * This flag lets us optimize away work in AtEO(Sub)Xact_RelationCache().
137  */
138 static bool need_eoxact_work = false;
139
140
/*
 *		macros to manipulate the lookup hashtables
 */

/*
 * RelationCacheInsert
 *		Enter RELATION into RelationIdCache under its rd_id.  Whether or not
 *		an entry for that key was already found, the entry's reldesc is set
 *		to RELATION.
 *
 * RELATION is parenthesized at each use so that any pointer-valued
 * expression (e.g. "*relp") can be passed safely.  It is expanded twice, so
 * do not pass an expression with side effects.
 */
#define RelationCacheInsert(RELATION)	\
do { \
	RelIdCacheEnt *idhentry; bool found; \
	idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
											 (void *) &((RELATION)->rd_id), \
											 HASH_ENTER, &found); \
	/* used to give notice if found -- now just keep quiet */ \
	idhentry->reldesc = (RELATION); \
} while(0)
153
/*
 * RelationIdCacheLookup
 *		Search RelationIdCache for the key stored in ID.  RELATION is
 *		assigned the entry's reldesc if an entry is found, else NULL.
 *
 * ID must be an lvalue, because its address is what is handed to
 * hash_search as the key.
 */
#define RelationIdCacheLookup(ID, RELATION) \
do { \
	RelIdCacheEnt *hentry = (RelIdCacheEnt *) \
		hash_search(RelationIdCache, (void *) &(ID), HASH_FIND, NULL); \
	RELATION = (hentry != NULL) ? hentry->reldesc : NULL; \
} while(0)
165
/*
 * RelationCacheDelete
 *		Remove the RelationIdCache entry keyed by RELATION's rd_id.  If
 *		hash_search reports no such entry, a WARNING is logged.
 *
 * RELATION is parenthesized so that any pointer-valued expression
 * (e.g. "*relp") can be passed safely.
 */
#define RelationCacheDelete(RELATION) \
do { \
	RelIdCacheEnt *idhentry; \
	idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
											 (void *) &((RELATION)->rd_id), \
											 HASH_REMOVE, NULL); \
	if (idhentry == NULL) \
		elog(WARNING, "trying to delete a rd_id reldesc that does not exist"); \
} while(0)
175
176
177 /*
178  * Special cache for opclass-related information
179  *
180  * Note: only default operators and support procs get cached, ie, those with
181  * lefttype = righttype = opcintype.
182  */
183 typedef struct opclasscacheent
184 {
185         Oid                     opclassoid;             /* lookup key: OID of opclass */
186         bool            valid;                  /* set TRUE after successful fill-in */
187         StrategyNumber numStrats;       /* max # of strategies (from pg_am) */
188         StrategyNumber numSupport;      /* max # of support procs (from pg_am) */
189         Oid                     opcfamily;              /* OID of opclass's family */
190         Oid                     opcintype;              /* OID of opclass's declared input type */
191         Oid                *operatorOids;       /* strategy operators' OIDs */
192         RegProcedure *supportProcs; /* support procs */
193 } OpClassCacheEnt;
194
195 static HTAB *OpClassCache = NULL;
196
197
198 /* non-export function prototypes */
199
200 static void RelationClearRelation(Relation relation, bool rebuild);
201
202 static void RelationReloadIndexInfo(Relation relation);
203 static void RelationFlushRelation(Relation relation);
204 static bool load_relcache_init_file(bool shared);
205 static void write_relcache_init_file(bool shared);
206 static void write_item(const void *data, Size len, FILE *fp);
207
208 static void formrdesc(const char *relationName, Oid relationReltype,
209                   bool isshared, bool hasoids,
210                   int natts, const FormData_pg_attribute *attrs);
211
212 static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK);
213 static Relation AllocateRelationDesc(Relation relation, Form_pg_class relp);
214 static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
215 static void RelationBuildTupleDesc(Relation relation);
216 static Relation RelationBuildDesc(Oid targetRelId, Relation oldrelation);
217 static void RelationInitPhysicalAddr(Relation relation);
218 static void load_critical_index(Oid indexoid);
219 static TupleDesc GetPgClassDescriptor(void);
220 static TupleDesc GetPgIndexDescriptor(void);
221 static void AttrDefaultFetch(Relation relation);
222 static void CheckConstraintFetch(Relation relation);
223 static List *insert_ordered_oid(List *list, Oid datum);
224 static void IndexSupportInitialize(oidvector *indclass,
225                                            Oid *indexOperator,
226                                            RegProcedure *indexSupport,
227                                            Oid *opFamily,
228                                            Oid *opcInType,
229                                            StrategyNumber maxStrategyNumber,
230                                            StrategyNumber maxSupportNumber,
231                                            AttrNumber maxAttributeNumber);
232 static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
233                                   StrategyNumber numStrats,
234                                   StrategyNumber numSupport);
235 static void RelationCacheInitFileRemoveInDir(const char *tblspcpath);
236 static void unlink_initfile(const char *initfilename);
237
238
239 /*
240  *              ScanPgRelation
241  *
242  *              This is used by RelationBuildDesc to find a pg_class
243  *              tuple matching targetRelId.  The caller must hold at least
244  *              AccessShareLock on the target relid to prevent concurrent-update
245  *              scenarios --- else our SnapshotNow scan might fail to find any
246  *              version that it thinks is live.
247  *
248  *              NB: the returned tuple has been copied into palloc'd storage
249  *              and must eventually be freed with heap_freetuple.
250  */
251 static HeapTuple
252 ScanPgRelation(Oid targetRelId, bool indexOK)
253 {
254         HeapTuple       pg_class_tuple;
255         Relation        pg_class_desc;
256         SysScanDesc pg_class_scan;
257         ScanKeyData key[1];
258
259         /*
260          * If something goes wrong during backend startup, we might find ourselves
261          * trying to read pg_class before we've selected a database.  That ain't
262          * gonna work, so bail out with a useful error message.  If this happens,
263          * it probably means a relcache entry that needs to be nailed isn't.
264          */
265         if (!OidIsValid(MyDatabaseId))
266                 elog(FATAL, "cannot read pg_class without having selected a database");
267
268         /*
269          * form a scan key
270          */
271         ScanKeyInit(&key[0],
272                                 ObjectIdAttributeNumber,
273                                 BTEqualStrategyNumber, F_OIDEQ,
274                                 ObjectIdGetDatum(targetRelId));
275
276         /*
277          * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
278          * built the critical relcache entries (this includes initdb and startup
279          * without a pg_internal.init file).  The caller can also force a heap
280          * scan by setting indexOK == false.
281          */
282         pg_class_desc = heap_open(RelationRelationId, AccessShareLock);
283         pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
284                                                                            indexOK && criticalRelcachesBuilt,
285                                                                            SnapshotNow,
286                                                                            1, key);
287
288         pg_class_tuple = systable_getnext(pg_class_scan);
289
290         /*
291          * Must copy tuple before releasing buffer.
292          */
293         if (HeapTupleIsValid(pg_class_tuple))
294                 pg_class_tuple = heap_copytuple(pg_class_tuple);
295
296         /* all done */
297         systable_endscan(pg_class_scan);
298         heap_close(pg_class_desc, AccessShareLock);
299
300         return pg_class_tuple;
301 }
302
303 /*
304  *              AllocateRelationDesc
305  *
306  *              This is used to allocate memory for a new relation descriptor
307  *              and initialize the rd_rel field.
308  *
309  *              If 'relation' is NULL, allocate a new RelationData object.
310  *              If not, reuse the given object (that path is taken only when
311  *              we have to rebuild a relcache entry during RelationClearRelation).
312  */
313 static Relation
314 AllocateRelationDesc(Relation relation, Form_pg_class relp)
315 {
316         MemoryContext oldcxt;
317         Form_pg_class relationForm;
318
319         /* Relcache entries must live in CacheMemoryContext */
320         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
321
322         /*
323          * allocate space for new relation descriptor, if needed
324          */
325         if (relation == NULL)
326                 relation = (Relation) palloc(sizeof(RelationData));
327
328         /*
329          * clear all fields of reldesc
330          */
331         MemSet(relation, 0, sizeof(RelationData));
332         relation->rd_targblock = InvalidBlockNumber;
333         relation->rd_fsm_nblocks = InvalidBlockNumber;
334         relation->rd_vm_nblocks = InvalidBlockNumber;
335
336         /* make sure relation is marked as having no open file yet */
337         relation->rd_smgr = NULL;
338
339         /*
340          * Copy the relation tuple form
341          *
342          * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE. The
343          * variable-length fields (relacl, reloptions) are NOT stored in the
344          * relcache --- there'd be little point in it, since we don't copy the
345          * tuple's nulls bitmap and hence wouldn't know if the values are valid.
346          * Bottom line is that relacl *cannot* be retrieved from the relcache. Get
347          * it from the syscache if you need it.  The same goes for the original
348          * form of reloptions (however, we do store the parsed form of reloptions
349          * in rd_options).
350          */
351         relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
352
353         memcpy(relationForm, relp, CLASS_TUPLE_SIZE);
354
355         /* initialize relation tuple form */
356         relation->rd_rel = relationForm;
357
358         /* and allocate attribute tuple form storage */
359         relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts,
360                                                                                            relationForm->relhasoids);
361         /* which we mark as a reference-counted tupdesc */
362         relation->rd_att->tdrefcount = 1;
363
364         MemoryContextSwitchTo(oldcxt);
365
366         return relation;
367 }
368
369 /*
370  * RelationParseRelOptions
371  *              Convert pg_class.reloptions into pre-parsed rd_options
372  *
373  * tuple is the real pg_class tuple (not rd_rel!) for relation
374  *
375  * Note: rd_rel and (if an index) rd_am must be valid already
376  */
377 static void
378 RelationParseRelOptions(Relation relation, HeapTuple tuple)
379 {
380         bytea      *options;
381
382         relation->rd_options = NULL;
383
384         /* Fall out if relkind should not have options */
385         switch (relation->rd_rel->relkind)
386         {
387                 case RELKIND_RELATION:
388                 case RELKIND_TOASTVALUE:
389                 case RELKIND_UNCATALOGED:
390                 case RELKIND_INDEX:
391                         break;
392                 default:
393                         return;
394         }
395
396         /*
397          * Fetch reloptions from tuple; have to use a hardwired descriptor because
398          * we might not have any other for pg_class yet (consider executing this
399          * code for pg_class itself)
400          */
401         options = extractRelOptions(tuple,
402                                                                 GetPgClassDescriptor(),
403                                                                 relation->rd_rel->relkind == RELKIND_INDEX ?
404                                                                 relation->rd_am->amoptions : InvalidOid);
405
406         /* Copy parsed data into CacheMemoryContext */
407         if (options)
408         {
409                 relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
410                                                                                                   VARSIZE(options));
411                 memcpy(relation->rd_options, options, VARSIZE(options));
412         }
413 }
414
415 /*
416  *              RelationBuildTupleDesc
417  *
418  *              Form the relation's tuple descriptor from information in
419  *              the pg_attribute, pg_attrdef & pg_constraint system catalogs.
420  */
421 static void
422 RelationBuildTupleDesc(Relation relation)
423 {
424         HeapTuple       pg_attribute_tuple;
425         Relation        pg_attribute_desc;
426         SysScanDesc pg_attribute_scan;
427         ScanKeyData skey[2];
428         int                     need;
429         TupleConstr *constr;
430         AttrDefault *attrdef = NULL;
431         int                     ndef = 0;
432
433         /* copy some fields from pg_class row to rd_att */
434         relation->rd_att->tdtypeid = relation->rd_rel->reltype;
435         relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
436         relation->rd_att->tdhasoid = relation->rd_rel->relhasoids;
437
438         constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
439                                                                                                 sizeof(TupleConstr));
440         constr->has_not_null = false;
441
442         /*
443          * Form a scan key that selects only user attributes (attnum > 0).
444          * (Eliminating system attribute rows at the index level is lots faster
445          * than fetching them.)
446          */
447         ScanKeyInit(&skey[0],
448                                 Anum_pg_attribute_attrelid,
449                                 BTEqualStrategyNumber, F_OIDEQ,
450                                 ObjectIdGetDatum(RelationGetRelid(relation)));
451         ScanKeyInit(&skey[1],
452                                 Anum_pg_attribute_attnum,
453                                 BTGreaterStrategyNumber, F_INT2GT,
454                                 Int16GetDatum(0));
455
456         /*
457          * Open pg_attribute and begin a scan.  Force heap scan if we haven't yet
458          * built the critical relcache entries (this includes initdb and startup
459          * without a pg_internal.init file).
460          */
461         pg_attribute_desc = heap_open(AttributeRelationId, AccessShareLock);
462         pg_attribute_scan = systable_beginscan(pg_attribute_desc,
463                                                                                    AttributeRelidNumIndexId,
464                                                                                    criticalRelcachesBuilt,
465                                                                                    SnapshotNow,
466                                                                                    2, skey);
467
468         /*
469          * add attribute data to relation->rd_att
470          */
471         need = relation->rd_rel->relnatts;
472
473         while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
474         {
475                 Form_pg_attribute attp;
476
477                 attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
478
479                 if (attp->attnum <= 0 ||
480                         attp->attnum > relation->rd_rel->relnatts)
481                         elog(ERROR, "invalid attribute number %d for %s",
482                                  attp->attnum, RelationGetRelationName(relation));
483
484                 memcpy(relation->rd_att->attrs[attp->attnum - 1],
485                            attp,
486                            ATTRIBUTE_FIXED_PART_SIZE);
487
488                 /* Update constraint/default info */
489                 if (attp->attnotnull)
490                         constr->has_not_null = true;
491
492                 if (attp->atthasdef)
493                 {
494                         if (attrdef == NULL)
495                                 attrdef = (AttrDefault *)
496                                         MemoryContextAllocZero(CacheMemoryContext,
497                                                                                    relation->rd_rel->relnatts *
498                                                                                    sizeof(AttrDefault));
499                         attrdef[ndef].adnum = attp->attnum;
500                         attrdef[ndef].adbin = NULL;
501                         ndef++;
502                 }
503                 need--;
504                 if (need == 0)
505                         break;
506         }
507
508         /*
509          * end the scan and close the attribute relation
510          */
511         systable_endscan(pg_attribute_scan);
512         heap_close(pg_attribute_desc, AccessShareLock);
513
514         if (need != 0)
515                 elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
516                          need, RelationGetRelid(relation));
517
518         /*
519          * The attcacheoff values we read from pg_attribute should all be -1
520          * ("unknown").  Verify this if assert checking is on.  They will be
521          * computed when and if needed during tuple access.
522          */
523 #ifdef USE_ASSERT_CHECKING
524         {
525                 int                     i;
526
527                 for (i = 0; i < relation->rd_rel->relnatts; i++)
528                         Assert(relation->rd_att->attrs[i]->attcacheoff == -1);
529         }
530 #endif
531
532         /*
533          * However, we can easily set the attcacheoff value for the first
534          * attribute: it must be zero.  This eliminates the need for special cases
535          * for attnum=1 that used to exist in fastgetattr() and index_getattr().
536          */
537         if (relation->rd_rel->relnatts > 0)
538                 relation->rd_att->attrs[0]->attcacheoff = 0;
539
540         /*
541          * Set up constraint/default info
542          */
543         if (constr->has_not_null || ndef > 0 || relation->rd_rel->relchecks)
544         {
545                 relation->rd_att->constr = constr;
546
547                 if (ndef > 0)                   /* DEFAULTs */
548                 {
549                         if (ndef < relation->rd_rel->relnatts)
550                                 constr->defval = (AttrDefault *)
551                                         repalloc(attrdef, ndef * sizeof(AttrDefault));
552                         else
553                                 constr->defval = attrdef;
554                         constr->num_defval = ndef;
555                         AttrDefaultFetch(relation);
556                 }
557                 else
558                         constr->num_defval = 0;
559
560                 if (relation->rd_rel->relchecks > 0)    /* CHECKs */
561                 {
562                         constr->num_check = relation->rd_rel->relchecks;
563                         constr->check = (ConstrCheck *)
564                                 MemoryContextAllocZero(CacheMemoryContext,
565                                                                         constr->num_check * sizeof(ConstrCheck));
566                         CheckConstraintFetch(relation);
567                 }
568                 else
569                         constr->num_check = 0;
570         }
571         else
572         {
573                 pfree(constr);
574                 relation->rd_att->constr = NULL;
575         }
576 }
577
578 /*
579  *              RelationBuildRuleLock
580  *
581  *              Form the relation's rewrite rules from information in
582  *              the pg_rewrite system catalog.
583  *
584  * Note: The rule parsetrees are potentially very complex node structures.
585  * To allow these trees to be freed when the relcache entry is flushed,
586  * we make a private memory context to hold the RuleLock information for
587  * each relcache entry that has associated rules.  The context is used
588  * just for rule info, not for any other subsidiary data of the relcache
589  * entry, because that keeps the update logic in RelationClearRelation()
590  * manageable.  The other subsidiary data structures are simple enough
591  * to be easy to free explicitly, anyway.
592  */
593 static void
594 RelationBuildRuleLock(Relation relation)
595 {
596         MemoryContext rulescxt;
597         MemoryContext oldcxt;
598         HeapTuple       rewrite_tuple;
599         Relation        rewrite_desc;
600         TupleDesc       rewrite_tupdesc;
601         SysScanDesc rewrite_scan;
602         ScanKeyData key;
603         RuleLock   *rulelock;
604         int                     numlocks;
605         RewriteRule **rules;
606         int                     maxlocks;
607
608         /*
609          * Make the private context.  Parameters are set on the assumption that
610          * it'll probably not contain much data.
611          */
612         rulescxt = AllocSetContextCreate(CacheMemoryContext,
613                                                                          RelationGetRelationName(relation),
614                                                                          ALLOCSET_SMALL_MINSIZE,
615                                                                          ALLOCSET_SMALL_INITSIZE,
616                                                                          ALLOCSET_SMALL_MAXSIZE);
617         relation->rd_rulescxt = rulescxt;
618
619         /*
620          * allocate an array to hold the rewrite rules (the array is extended if
621          * necessary)
622          */
623         maxlocks = 4;
624         rules = (RewriteRule **)
625                 MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
626         numlocks = 0;
627
628         /*
629          * form a scan key
630          */
631         ScanKeyInit(&key,
632                                 Anum_pg_rewrite_ev_class,
633                                 BTEqualStrategyNumber, F_OIDEQ,
634                                 ObjectIdGetDatum(RelationGetRelid(relation)));
635
636         /*
637          * open pg_rewrite and begin a scan
638          *
639          * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
640          * be reading the rules in name order, except possibly during
641          * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
642          * ensures that rules will be fired in name order.
643          */
644         rewrite_desc = heap_open(RewriteRelationId, AccessShareLock);
645         rewrite_tupdesc = RelationGetDescr(rewrite_desc);
646         rewrite_scan = systable_beginscan(rewrite_desc,
647                                                                           RewriteRelRulenameIndexId,
648                                                                           true, SnapshotNow,
649                                                                           1, &key);
650
651         while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
652         {
653                 Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
654                 bool            isnull;
655                 Datum           rule_datum;
656                 char       *rule_str;
657                 RewriteRule *rule;
658
659                 rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
660                                                                                                   sizeof(RewriteRule));
661
662                 rule->ruleId = HeapTupleGetOid(rewrite_tuple);
663
664                 rule->event = rewrite_form->ev_type - '0';
665                 rule->attrno = rewrite_form->ev_attr;
666                 rule->enabled = rewrite_form->ev_enabled;
667                 rule->isInstead = rewrite_form->is_instead;
668
669                 /*
670                  * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
671                  * rule strings are often large enough to be toasted.  To avoid
672                  * leaking memory in the caller's context, do the detoasting here so
673                  * we can free the detoasted version.
674                  */
675                 rule_datum = heap_getattr(rewrite_tuple,
676                                                                   Anum_pg_rewrite_ev_action,
677                                                                   rewrite_tupdesc,
678                                                                   &isnull);
679                 Assert(!isnull);
680                 rule_str = TextDatumGetCString(rule_datum);
681                 oldcxt = MemoryContextSwitchTo(rulescxt);
682                 rule->actions = (List *) stringToNode(rule_str);
683                 MemoryContextSwitchTo(oldcxt);
684                 pfree(rule_str);
685
686                 rule_datum = heap_getattr(rewrite_tuple,
687                                                                   Anum_pg_rewrite_ev_qual,
688                                                                   rewrite_tupdesc,
689                                                                   &isnull);
690                 Assert(!isnull);
691                 rule_str = TextDatumGetCString(rule_datum);
692                 oldcxt = MemoryContextSwitchTo(rulescxt);
693                 rule->qual = (Node *) stringToNode(rule_str);
694                 MemoryContextSwitchTo(oldcxt);
695                 pfree(rule_str);
696
697                 /*
698                  * We want the rule's table references to be checked as though by the
699                  * table owner, not the user referencing the rule.      Therefore, scan
700                  * through the rule's actions and set the checkAsUser field on all
701                  * rtable entries.      We have to look at the qual as well, in case it
702                  * contains sublinks.
703                  *
704                  * The reason for doing this when the rule is loaded, rather than when
705                  * it is stored, is that otherwise ALTER TABLE OWNER would have to
706                  * grovel through stored rules to update checkAsUser fields. Scanning
707                  * the rule tree during load is relatively cheap (compared to
708                  * constructing it in the first place), so we do it here.
709                  */
710                 setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
711                 setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);
712
713                 if (numlocks >= maxlocks)
714                 {
715                         maxlocks *= 2;
716                         rules = (RewriteRule **)
717                                 repalloc(rules, sizeof(RewriteRule *) * maxlocks);
718                 }
719                 rules[numlocks++] = rule;
720         }
721
722         /*
723          * end the scan and close the attribute relation
724          */
725         systable_endscan(rewrite_scan);
726         heap_close(rewrite_desc, AccessShareLock);
727
728         /*
729          * there might not be any rules (if relhasrules is out-of-date)
730          */
731         if (numlocks == 0)
732         {
733                 relation->rd_rules = NULL;
734                 relation->rd_rulescxt = NULL;
735                 MemoryContextDelete(rulescxt);
736                 return;
737         }
738
739         /*
740          * form a RuleLock and insert into relation
741          */
742         rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
743         rulelock->numLocks = numlocks;
744         rulelock->rules = rules;
745
746         relation->rd_rules = rulelock;
747 }
748
749 /*
750  *              equalRuleLocks
751  *
752  *              Determine whether two RuleLocks are equivalent
753  *
754  *              Probably this should be in the rules code someplace...
755  */
756 static bool
757 equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
758 {
759         int                     i;
760
761         /*
762          * As of 7.3 we assume the rule ordering is repeatable, because
763          * RelationBuildRuleLock should read 'em in a consistent order.  So just
764          * compare corresponding slots.
765          */
766         if (rlock1 != NULL)
767         {
768                 if (rlock2 == NULL)
769                         return false;
770                 if (rlock1->numLocks != rlock2->numLocks)
771                         return false;
772                 for (i = 0; i < rlock1->numLocks; i++)
773                 {
774                         RewriteRule *rule1 = rlock1->rules[i];
775                         RewriteRule *rule2 = rlock2->rules[i];
776
777                         if (rule1->ruleId != rule2->ruleId)
778                                 return false;
779                         if (rule1->event != rule2->event)
780                                 return false;
781                         if (rule1->attrno != rule2->attrno)
782                                 return false;
783                         if (rule1->enabled != rule2->enabled)
784                                 return false;
785                         if (rule1->isInstead != rule2->isInstead)
786                                 return false;
787                         if (!equal(rule1->qual, rule2->qual))
788                                 return false;
789                         if (!equal(rule1->actions, rule2->actions))
790                                 return false;
791                 }
792         }
793         else if (rlock2 != NULL)
794                 return false;
795         return true;
796 }
797
798
799 /*
800  *              RelationBuildDesc
801  *
802  *              Build a relation descriptor --- either a new one, or by
803  *              recycling the given old relation object.  The latter case
804  *              supports rebuilding a relcache entry without invalidating
805  *              pointers to it.  The caller must hold at least
806  *              AccessShareLock on the target relid.
807  *
808  *              Returns NULL if no pg_class row could be found for the given relid
809  *              (suggesting we are trying to access a just-deleted relation).
810  *              Any other error is reported via elog.
811  */
812 static Relation
813 RelationBuildDesc(Oid targetRelId, Relation oldrelation)
814 {
815         Relation        relation;
816         Oid                     relid;
817         HeapTuple       pg_class_tuple;
818         Form_pg_class relp;
819         MemoryContext oldcxt;
820
821         /*
822          * find the tuple in pg_class corresponding to the given relation id
823          */
824         pg_class_tuple = ScanPgRelation(targetRelId, true);
825
826         /*
827          * if no such tuple exists, return NULL
828          */
829         if (!HeapTupleIsValid(pg_class_tuple))
830                 return NULL;
831
832         /*
833          * get information from the pg_class_tuple
834          */
835         relid = HeapTupleGetOid(pg_class_tuple);
836         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
837
838         /*
839          * allocate storage for the relation descriptor, and copy pg_class_tuple
840          * to relation->rd_rel.
841          */
842         relation = AllocateRelationDesc(oldrelation, relp);
843
844         /*
845          * initialize the relation's relation id (relation->rd_id)
846          */
847         RelationGetRelid(relation) = relid;
848
849         /*
850          * normal relations are not nailed into the cache; nor can a pre-existing
851          * relation be new.  It could be temp though.  (Actually, it could be new
852          * too, but it's okay to forget that fact if forced to flush the entry.)
853          */
854         relation->rd_refcnt = 0;
855         relation->rd_isnailed = false;
856         relation->rd_createSubid = InvalidSubTransactionId;
857         relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
858         relation->rd_istemp = relation->rd_rel->relistemp;
859         if (relation->rd_istemp)
860                 relation->rd_islocaltemp = isTempOrToastNamespace(relation->rd_rel->relnamespace);
861         else
862                 relation->rd_islocaltemp = false;
863
864         /*
865          * initialize the tuple descriptor (relation->rd_att).
866          */
867         RelationBuildTupleDesc(relation);
868
869         /*
870          * Fetch rules and triggers that affect this relation
871          */
872         if (relation->rd_rel->relhasrules)
873                 RelationBuildRuleLock(relation);
874         else
875         {
876                 relation->rd_rules = NULL;
877                 relation->rd_rulescxt = NULL;
878         }
879
880         if (relation->rd_rel->relhastriggers)
881                 RelationBuildTriggers(relation);
882         else
883                 relation->trigdesc = NULL;
884
885         /*
886          * if it's an index, initialize index-related information
887          */
888         if (OidIsValid(relation->rd_rel->relam))
889                 RelationInitIndexAccessInfo(relation);
890
891         /* extract reloptions if any */
892         RelationParseRelOptions(relation, pg_class_tuple);
893
894         /*
895          * initialize the relation lock manager information
896          */
897         RelationInitLockInfo(relation);         /* see lmgr.c */
898
899         /*
900          * initialize physical addressing information for the relation
901          */
902         RelationInitPhysicalAddr(relation);
903
904         /* make sure relation is marked as having no open file yet */
905         relation->rd_smgr = NULL;
906
907         /*
908          * now we can free the memory allocated for pg_class_tuple
909          */
910         heap_freetuple(pg_class_tuple);
911
912         /*
913          * Insert newly created relation into relcache hash tables.
914          */
915         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
916         RelationCacheInsert(relation);
917         MemoryContextSwitchTo(oldcxt);
918
919         /* It's fully valid */
920         relation->rd_isvalid = true;
921
922         return relation;
923 }
924
925 /*
926  * Initialize the physical addressing info (RelFileNode) for a relcache entry
927  */
928 static void
929 RelationInitPhysicalAddr(Relation relation)
930 {
931         if (relation->rd_rel->reltablespace)
932                 relation->rd_node.spcNode = relation->rd_rel->reltablespace;
933         else
934                 relation->rd_node.spcNode = MyDatabaseTableSpace;
935         if (relation->rd_rel->relisshared)
936                 relation->rd_node.dbNode = InvalidOid;
937         else
938                 relation->rd_node.dbNode = MyDatabaseId;
939         relation->rd_node.relNode = relation->rd_rel->relfilenode;
940 }
941
942 /*
943  * Initialize index-access-method support data for an index relation
944  */
945 void
946 RelationInitIndexAccessInfo(Relation relation)
947 {
948         HeapTuple       tuple;
949         Form_pg_am      aform;
950         Datum           indclassDatum;
951         Datum           indoptionDatum;
952         bool            isnull;
953         oidvector  *indclass;
954         int2vector *indoption;
955         MemoryContext indexcxt;
956         MemoryContext oldcontext;
957         int                     natts;
958         uint16          amstrategies;
959         uint16          amsupport;
960
961         /*
962          * Make a copy of the pg_index entry for the index.  Since pg_index
963          * contains variable-length and possibly-null fields, we have to do this
964          * honestly rather than just treating it as a Form_pg_index struct.
965          */
966         tuple = SearchSysCache(INDEXRELID,
967                                                    ObjectIdGetDatum(RelationGetRelid(relation)),
968                                                    0, 0, 0);
969         if (!HeapTupleIsValid(tuple))
970                 elog(ERROR, "cache lookup failed for index %u",
971                          RelationGetRelid(relation));
972         oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
973         relation->rd_indextuple = heap_copytuple(tuple);
974         relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
975         MemoryContextSwitchTo(oldcontext);
976         ReleaseSysCache(tuple);
977
978         /*
979          * Make a copy of the pg_am entry for the index's access method
980          */
981         tuple = SearchSysCache(AMOID,
982                                                    ObjectIdGetDatum(relation->rd_rel->relam),
983                                                    0, 0, 0);
984         if (!HeapTupleIsValid(tuple))
985                 elog(ERROR, "cache lookup failed for access method %u",
986                          relation->rd_rel->relam);
987         aform = (Form_pg_am) MemoryContextAlloc(CacheMemoryContext, sizeof *aform);
988         memcpy(aform, GETSTRUCT(tuple), sizeof *aform);
989         ReleaseSysCache(tuple);
990         relation->rd_am = aform;
991
992         natts = relation->rd_rel->relnatts;
993         if (natts != relation->rd_index->indnatts)
994                 elog(ERROR, "relnatts disagrees with indnatts for index %u",
995                          RelationGetRelid(relation));
996         amstrategies = aform->amstrategies;
997         amsupport = aform->amsupport;
998
999         /*
1000          * Make the private context to hold index access info.  The reason we need
1001          * a context, and not just a couple of pallocs, is so that we won't leak
1002          * any subsidiary info attached to fmgr lookup records.
1003          *
1004          * Context parameters are set on the assumption that it'll probably not
1005          * contain much data.
1006          */
1007         indexcxt = AllocSetContextCreate(CacheMemoryContext,
1008                                                                          RelationGetRelationName(relation),
1009                                                                          ALLOCSET_SMALL_MINSIZE,
1010                                                                          ALLOCSET_SMALL_INITSIZE,
1011                                                                          ALLOCSET_SMALL_MAXSIZE);
1012         relation->rd_indexcxt = indexcxt;
1013
1014         /*
1015          * Allocate arrays to hold data
1016          */
1017         relation->rd_aminfo = (RelationAmInfo *)
1018                 MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
1019
1020         relation->rd_opfamily = (Oid *)
1021                 MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
1022         relation->rd_opcintype = (Oid *)
1023                 MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
1024
1025         if (amstrategies > 0)
1026                 relation->rd_operator = (Oid *)
1027                         MemoryContextAllocZero(indexcxt,
1028                                                                    natts * amstrategies * sizeof(Oid));
1029         else
1030                 relation->rd_operator = NULL;
1031
1032         if (amsupport > 0)
1033         {
1034                 int                     nsupport = natts * amsupport;
1035
1036                 relation->rd_support = (RegProcedure *)
1037                         MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
1038                 relation->rd_supportinfo = (FmgrInfo *)
1039                         MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
1040         }
1041         else
1042         {
1043                 relation->rd_support = NULL;
1044                 relation->rd_supportinfo = NULL;
1045         }
1046
1047         relation->rd_indoption = (int16 *)
1048                 MemoryContextAllocZero(indexcxt, natts * sizeof(int16));
1049
1050         /*
1051          * indclass cannot be referenced directly through the C struct, because it
1052          * comes after the variable-width indkey field.  Must extract the datum
1053          * the hard way...
1054          */
1055         indclassDatum = fastgetattr(relation->rd_indextuple,
1056                                                                 Anum_pg_index_indclass,
1057                                                                 GetPgIndexDescriptor(),
1058                                                                 &isnull);
1059         Assert(!isnull);
1060         indclass = (oidvector *) DatumGetPointer(indclassDatum);
1061
1062         /*
1063          * Fill the operator and support procedure OID arrays, as well as the info
1064          * about opfamilies and opclass input types.  (aminfo and supportinfo are
1065          * left as zeroes, and are filled on-the-fly when used)
1066          */
1067         IndexSupportInitialize(indclass,
1068                                                    relation->rd_operator, relation->rd_support,
1069                                                    relation->rd_opfamily, relation->rd_opcintype,
1070                                                    amstrategies, amsupport, natts);
1071
1072         /*
1073          * Similarly extract indoption and copy it to the cache entry
1074          */
1075         indoptionDatum = fastgetattr(relation->rd_indextuple,
1076                                                                  Anum_pg_index_indoption,
1077                                                                  GetPgIndexDescriptor(),
1078                                                                  &isnull);
1079         Assert(!isnull);
1080         indoption = (int2vector *) DatumGetPointer(indoptionDatum);
1081         memcpy(relation->rd_indoption, indoption->values, natts * sizeof(int16));
1082
1083         /*
1084          * expressions, predicate, exclusion caches will be filled later
1085          */
1086         relation->rd_indexprs = NIL;
1087         relation->rd_indpred = NIL;
1088         relation->rd_exclops = NULL;
1089         relation->rd_exclprocs = NULL;
1090         relation->rd_exclstrats = NULL;
1091         relation->rd_amcache = NULL;
1092 }
1093
1094 /*
1095  * IndexSupportInitialize
1096  *              Initializes an index's cached opclass information,
1097  *              given the index's pg_index.indclass entry.
1098  *
1099  * Data is returned into *indexOperator, *indexSupport, *opFamily, and
1100  * *opcInType, which are arrays allocated by the caller.
1101  *
1102  * The caller also passes maxStrategyNumber, maxSupportNumber, and
1103  * maxAttributeNumber, since these indicate the size of the arrays
1104  * it has allocated --- but in practice these numbers must always match
1105  * those obtainable from the system catalog entries for the index and
1106  * access method.
1107  */
1108 static void
1109 IndexSupportInitialize(oidvector *indclass,
1110                                            Oid *indexOperator,
1111                                            RegProcedure *indexSupport,
1112                                            Oid *opFamily,
1113                                            Oid *opcInType,
1114                                            StrategyNumber maxStrategyNumber,
1115                                            StrategyNumber maxSupportNumber,
1116                                            AttrNumber maxAttributeNumber)
1117 {
1118         int                     attIndex;
1119
1120         for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
1121         {
1122                 OpClassCacheEnt *opcentry;
1123
1124                 if (!OidIsValid(indclass->values[attIndex]))
1125                         elog(ERROR, "bogus pg_index tuple");
1126
1127                 /* look up the info for this opclass, using a cache */
1128                 opcentry = LookupOpclassInfo(indclass->values[attIndex],
1129                                                                          maxStrategyNumber,
1130                                                                          maxSupportNumber);
1131
1132                 /* copy cached data into relcache entry */
1133                 opFamily[attIndex] = opcentry->opcfamily;
1134                 opcInType[attIndex] = opcentry->opcintype;
1135                 if (maxStrategyNumber > 0)
1136                         memcpy(&indexOperator[attIndex * maxStrategyNumber],
1137                                    opcentry->operatorOids,
1138                                    maxStrategyNumber * sizeof(Oid));
1139                 if (maxSupportNumber > 0)
1140                         memcpy(&indexSupport[attIndex * maxSupportNumber],
1141                                    opcentry->supportProcs,
1142                                    maxSupportNumber * sizeof(RegProcedure));
1143         }
1144 }
1145
1146 /*
1147  * LookupOpclassInfo
1148  *
1149  * This routine maintains a per-opclass cache of the information needed
1150  * by IndexSupportInitialize().  This is more efficient than relying on
1151  * the catalog cache, because we can load all the info about a particular
1152  * opclass in a single indexscan of pg_amproc or pg_amop.
1153  *
1154  * The information from pg_am about expected range of strategy and support
1155  * numbers is passed in, rather than being looked up, mainly because the
1156  * caller will have it already.
1157  *
1158  * Note there is no provision for flushing the cache.  This is OK at the
1159  * moment because there is no way to ALTER any interesting properties of an
1160  * existing opclass --- all you can do is drop it, which will result in
1161  * a useless but harmless dead entry in the cache.      To support altering
1162  * opclass membership (not the same as opfamily membership!), we'd need to
1163  * be able to flush this cache as well as the contents of relcache entries
1164  * for indexes.
1165  */
1166 static OpClassCacheEnt *
1167 LookupOpclassInfo(Oid operatorClassOid,
1168                                   StrategyNumber numStrats,
1169                                   StrategyNumber numSupport)
1170 {
1171         OpClassCacheEnt *opcentry;
1172         bool            found;
1173         Relation        rel;
1174         SysScanDesc scan;
1175         ScanKeyData skey[3];
1176         HeapTuple       htup;
1177         bool            indexOK;
1178
1179         if (OpClassCache == NULL)
1180         {
1181                 /* First time through: initialize the opclass cache */
1182                 HASHCTL         ctl;
1183
1184                 MemSet(&ctl, 0, sizeof(ctl));
1185                 ctl.keysize = sizeof(Oid);
1186                 ctl.entrysize = sizeof(OpClassCacheEnt);
1187                 ctl.hash = oid_hash;
1188                 OpClassCache = hash_create("Operator class cache", 64,
1189                                                                    &ctl, HASH_ELEM | HASH_FUNCTION);
1190
1191                 /* Also make sure CacheMemoryContext exists */
1192                 if (!CacheMemoryContext)
1193                         CreateCacheMemoryContext();
1194         }
1195
1196         opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
1197                                                                                            (void *) &operatorClassOid,
1198                                                                                            HASH_ENTER, &found);
1199
1200         if (!found)
1201         {
1202                 /* Need to allocate memory for new entry */
1203                 opcentry->valid = false;        /* until known OK */
1204                 opcentry->numStrats = numStrats;
1205                 opcentry->numSupport = numSupport;
1206
1207                 if (numStrats > 0)
1208                         opcentry->operatorOids = (Oid *)
1209                                 MemoryContextAllocZero(CacheMemoryContext,
1210                                                                            numStrats * sizeof(Oid));
1211                 else
1212                         opcentry->operatorOids = NULL;
1213
1214                 if (numSupport > 0)
1215                         opcentry->supportProcs = (RegProcedure *)
1216                                 MemoryContextAllocZero(CacheMemoryContext,
1217                                                                            numSupport * sizeof(RegProcedure));
1218                 else
1219                         opcentry->supportProcs = NULL;
1220         }
1221         else
1222         {
1223                 Assert(numStrats == opcentry->numStrats);
1224                 Assert(numSupport == opcentry->numSupport);
1225         }
1226
1227         /*
1228          * When testing for cache-flush hazards, we intentionally disable the
1229          * operator class cache and force reloading of the info on each call. This
1230          * is helpful because we want to test the case where a cache flush occurs
1231          * while we are loading the info, and it's very hard to provoke that if
1232          * this happens only once per opclass per backend.
1233          */
1234 #if defined(CLOBBER_CACHE_ALWAYS)
1235         opcentry->valid = false;
1236 #endif
1237
1238         if (opcentry->valid)
1239                 return opcentry;
1240
1241         /*
1242          * Need to fill in new entry.
1243          *
1244          * To avoid infinite recursion during startup, force heap scans if we're
1245          * looking up info for the opclasses used by the indexes we would like to
1246          * reference here.
1247          */
1248         indexOK = criticalRelcachesBuilt ||
1249                 (operatorClassOid != OID_BTREE_OPS_OID &&
1250                  operatorClassOid != INT2_BTREE_OPS_OID);
1251
1252         /*
1253          * We have to fetch the pg_opclass row to determine its opfamily and
1254          * opcintype, which are needed to look up the operators and functions.
1255          * It'd be convenient to use the syscache here, but that probably doesn't
1256          * work while bootstrapping.
1257          */
1258         ScanKeyInit(&skey[0],
1259                                 ObjectIdAttributeNumber,
1260                                 BTEqualStrategyNumber, F_OIDEQ,
1261                                 ObjectIdGetDatum(operatorClassOid));
1262         rel = heap_open(OperatorClassRelationId, AccessShareLock);
1263         scan = systable_beginscan(rel, OpclassOidIndexId, indexOK,
1264                                                           SnapshotNow, 1, skey);
1265
1266         if (HeapTupleIsValid(htup = systable_getnext(scan)))
1267         {
1268                 Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(htup);
1269
1270                 opcentry->opcfamily = opclassform->opcfamily;
1271                 opcentry->opcintype = opclassform->opcintype;
1272         }
1273         else
1274                 elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);
1275
1276         systable_endscan(scan);
1277         heap_close(rel, AccessShareLock);
1278
1279
1280         /*
1281          * Scan pg_amop to obtain operators for the opclass.  We only fetch the
1282          * default ones (those with lefttype = righttype = opcintype).
1283          */
1284         if (numStrats > 0)
1285         {
1286                 ScanKeyInit(&skey[0],
1287                                         Anum_pg_amop_amopfamily,
1288                                         BTEqualStrategyNumber, F_OIDEQ,
1289                                         ObjectIdGetDatum(opcentry->opcfamily));
1290                 ScanKeyInit(&skey[1],
1291                                         Anum_pg_amop_amoplefttype,
1292                                         BTEqualStrategyNumber, F_OIDEQ,
1293                                         ObjectIdGetDatum(opcentry->opcintype));
1294                 ScanKeyInit(&skey[2],
1295                                         Anum_pg_amop_amoprighttype,
1296                                         BTEqualStrategyNumber, F_OIDEQ,
1297                                         ObjectIdGetDatum(opcentry->opcintype));
1298                 rel = heap_open(AccessMethodOperatorRelationId, AccessShareLock);
1299                 scan = systable_beginscan(rel, AccessMethodStrategyIndexId, indexOK,
1300                                                                   SnapshotNow, 3, skey);
1301
1302                 while (HeapTupleIsValid(htup = systable_getnext(scan)))
1303                 {
1304                         Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(htup);
1305
1306                         if (amopform->amopstrategy <= 0 ||
1307                                 (StrategyNumber) amopform->amopstrategy > numStrats)
1308                                 elog(ERROR, "invalid amopstrategy number %d for opclass %u",
1309                                          amopform->amopstrategy, operatorClassOid);
1310                         opcentry->operatorOids[amopform->amopstrategy - 1] =
1311                                 amopform->amopopr;
1312                 }
1313
1314                 systable_endscan(scan);
1315                 heap_close(rel, AccessShareLock);
1316         }
1317
1318         /*
1319          * Scan pg_amproc to obtain support procs for the opclass.      We only fetch
1320          * the default ones (those with lefttype = righttype = opcintype).
1321          */
1322         if (numSupport > 0)
1323         {
1324                 ScanKeyInit(&skey[0],
1325                                         Anum_pg_amproc_amprocfamily,
1326                                         BTEqualStrategyNumber, F_OIDEQ,
1327                                         ObjectIdGetDatum(opcentry->opcfamily));
1328                 ScanKeyInit(&skey[1],
1329                                         Anum_pg_amproc_amproclefttype,
1330                                         BTEqualStrategyNumber, F_OIDEQ,
1331                                         ObjectIdGetDatum(opcentry->opcintype));
1332                 ScanKeyInit(&skey[2],
1333                                         Anum_pg_amproc_amprocrighttype,
1334                                         BTEqualStrategyNumber, F_OIDEQ,
1335                                         ObjectIdGetDatum(opcentry->opcintype));
1336                 rel = heap_open(AccessMethodProcedureRelationId, AccessShareLock);
1337                 scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
1338                                                                   SnapshotNow, 3, skey);
1339
1340                 while (HeapTupleIsValid(htup = systable_getnext(scan)))
1341                 {
1342                         Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);
1343
1344                         if (amprocform->amprocnum <= 0 ||
1345                                 (StrategyNumber) amprocform->amprocnum > numSupport)
1346                                 elog(ERROR, "invalid amproc number %d for opclass %u",
1347                                          amprocform->amprocnum, operatorClassOid);
1348
1349                         opcentry->supportProcs[amprocform->amprocnum - 1] =
1350                                 amprocform->amproc;
1351                 }
1352
1353                 systable_endscan(scan);
1354                 heap_close(rel, AccessShareLock);
1355         }
1356
1357         opcentry->valid = true;
1358         return opcentry;
1359 }
1360
1361
1362 /*
1363  *              formrdesc
1364  *
1365  *              This is a special cut-down version of RelationBuildDesc(),
1366  *              used while initializing the relcache.
1367  *              The relation descriptor is built just from the supplied parameters,
1368  *              without actually looking at any system table entries.  We cheat
1369  *              quite a lot since we only need to work for a few basic system
1370  *              catalogs.
1371  *
1372  * formrdesc is currently used for: pg_database, pg_class, pg_attribute,
1373  * pg_proc, and pg_type (see RelationCacheInitializePhase2/3).
1374  *
1375  * Note that these catalogs can't have constraints (except attnotnull),
1376  * default values, rules, or triggers, since we don't cope with any of that.
1377  * (Well, actually, this only matters for properties that need to be valid
1378  * during bootstrap or before RelationCacheInitializePhase3 runs, and none of
1379  * these properties matter then...)
1380  *
1381  * NOTE: we assume we are already switched into CacheMemoryContext.
1382  */
1383 static void
1384 formrdesc(const char *relationName, Oid relationReltype,
1385                   bool isshared, bool hasoids,
1386                   int natts, const FormData_pg_attribute *attrs)
1387 {
1388         Relation        relation;
1389         int                     i;
1390         bool            has_not_null;
1391
1392         /*
1393          * allocate new relation desc, clear all fields of reldesc
1394          */
1395         relation = (Relation) palloc0(sizeof(RelationData));
1396         relation->rd_targblock = InvalidBlockNumber;
1397         relation->rd_fsm_nblocks = InvalidBlockNumber;
1398         relation->rd_vm_nblocks = InvalidBlockNumber;
1399
1400         /* make sure relation is marked as having no open file yet */
1401         relation->rd_smgr = NULL;
1402
1403         /*
1404          * initialize reference count: 1 because it is nailed in cache
1405          */
1406         relation->rd_refcnt = 1;
1407
1408         /*
1409          * all entries built with this routine are nailed-in-cache; none are for
1410          * new or temp relations.
1411          */
1412         relation->rd_isnailed = true;
1413         relation->rd_createSubid = InvalidSubTransactionId;
1414         relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
1415         relation->rd_istemp = false;
1416         relation->rd_islocaltemp = false;
1417
1418         /*
1419          * initialize relation tuple form
1420          *
1421          * The data we insert here is pretty incomplete/bogus, but it'll serve to
1422          * get us launched.  RelationCacheInitializePhase3() will read the real
1423          * data from pg_class and replace what we've done here.  Note in particular
1424          * that relowner is left as zero; this cues RelationCacheInitializePhase3
1425          * that the real data isn't there yet.
1426          */
1427         relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
1428
1429         namestrcpy(&relation->rd_rel->relname, relationName);
1430         relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
1431         relation->rd_rel->reltype = relationReltype;
1432
1433         /*
1434          * It's important to distinguish between shared and non-shared relations,
1435          * even at bootstrap time, to make sure we know where they are stored.
1436          */
1437         relation->rd_rel->relisshared = isshared;
1438         if (isshared)
1439                 relation->rd_rel->reltablespace = GLOBALTABLESPACE_OID;
1440
1441         /*
1442          * Likewise, we must know if a relation is temp ... but formrdesc is not
1443          * used for any temp relations.
1444          */
1445         relation->rd_rel->relistemp = false;
1446
1447         relation->rd_rel->relpages = 1;
1448         relation->rd_rel->reltuples = 1;
1449         relation->rd_rel->relkind = RELKIND_RELATION;
1450         relation->rd_rel->relhasoids = hasoids;
1451         relation->rd_rel->relnatts = (int16) natts;
1452
1453         /*
1454          * initialize attribute tuple form
1455          *
1456          * Unlike the case with the relation tuple, this data had better be right
1457          * because it will never be replaced.  The input values must be correctly
1458          * defined by macros in src/include/catalog/ headers.
1459          */
1460         relation->rd_att = CreateTemplateTupleDesc(natts, hasoids);
1461         relation->rd_att->tdrefcount = 1;       /* mark as refcounted */
1462
1463         relation->rd_att->tdtypeid = relationReltype;
1464         relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
1465
1466         /*
1467          * initialize tuple desc info
1468          */
1469         has_not_null = false;
1470         for (i = 0; i < natts; i++)
1471         {
1472                 memcpy(relation->rd_att->attrs[i],
1473                            &attrs[i],
1474                            ATTRIBUTE_FIXED_PART_SIZE);
1475                 has_not_null |= attrs[i].attnotnull;
1476                 /* make sure attcacheoff is valid */
1477                 relation->rd_att->attrs[i]->attcacheoff = -1;
1478         }
1479
1480         /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
1481         relation->rd_att->attrs[0]->attcacheoff = 0;
1482
1483         /* mark not-null status */
1484         if (has_not_null)
1485         {
1486                 TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
1487
1488                 constr->has_not_null = true;
1489                 relation->rd_att->constr = constr;
1490         }
1491
1492         /*
1493          * initialize relation id from info in att array (my, this is ugly)
1494          */
1495         RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
1496         relation->rd_rel->relfilenode = RelationGetRelid(relation);
1497
1498         /*
1499          * initialize the relation lock manager information
1500          */
1501         RelationInitLockInfo(relation);         /* see lmgr.c */
1502
1503         /*
1504          * initialize physical addressing information for the relation
1505          */
1506         RelationInitPhysicalAddr(relation);
1507
1508         /*
1509          * initialize the rel-has-index flag, using hardwired knowledge
1510          */
1511         if (IsBootstrapProcessingMode())
1512         {
1513                 /* In bootstrap mode, we have no indexes */
1514                 relation->rd_rel->relhasindex = false;
1515         }
1516         else
1517         {
1518                 /* Otherwise, all the rels formrdesc is used for have indexes */
1519                 relation->rd_rel->relhasindex = true;
1520         }
1521
1522         /*
1523          * add new reldesc to relcache
1524          */
1525         RelationCacheInsert(relation);
1526
1527         /* It's fully valid */
1528         relation->rd_isvalid = true;
1529 }
1530
1531
1532 /* ----------------------------------------------------------------
1533  *                               Relation Descriptor Lookup Interface
1534  * ----------------------------------------------------------------
1535  */
1536
1537 /*
1538  *              RelationIdGetRelation
1539  *
1540  *              Lookup a reldesc by OID; make one if not already in cache.
1541  *
1542  *              Returns NULL if no pg_class row could be found for the given relid
1543  *              (suggesting we are trying to access a just-deleted relation).
1544  *              Any other error is reported via elog.
1545  *
1546  *              NB: caller should already have at least AccessShareLock on the
1547  *              relation ID, else there are nasty race conditions.
1548  *
1549  *              NB: relation ref count is incremented, or set to 1 if new entry.
1550  *              Caller should eventually decrement count.  (Usually,
1551  *              that happens by calling RelationClose().)
1552  */
1553 Relation
1554 RelationIdGetRelation(Oid relationId)
1555 {
1556         Relation        rd;
1557
1558         /*
1559          * first try to find reldesc in the cache
1560          */
1561         RelationIdCacheLookup(relationId, rd);
1562
1563         if (RelationIsValid(rd))
1564         {
1565                 RelationIncrementReferenceCount(rd);
1566                 /* revalidate nailed index if necessary */
1567                 if (!rd->rd_isvalid)
1568                         RelationReloadIndexInfo(rd);
1569                 return rd;
1570         }
1571
1572         /*
1573          * no reldesc in the cache, so have RelationBuildDesc() build one and add
1574          * it.
1575          */
1576         rd = RelationBuildDesc(relationId, NULL);
1577         if (RelationIsValid(rd))
1578                 RelationIncrementReferenceCount(rd);
1579         return rd;
1580 }
1581
1582 /* ----------------------------------------------------------------
1583  *                              cache invalidation support routines
1584  * ----------------------------------------------------------------
1585  */
1586
1587 /*
1588  * RelationIncrementReferenceCount
1589  *              Increments relation reference count.
1590  *
1591  * Note: bootstrap mode has its own weird ideas about relation refcount
1592  * behavior; we ought to fix it someday, but for now, just disable
1593  * reference count ownership tracking in bootstrap mode.
1594  */
1595 void
1596 RelationIncrementReferenceCount(Relation rel)
1597 {
1598         ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
1599         rel->rd_refcnt += 1;
1600         if (!IsBootstrapProcessingMode())
1601                 ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
1602 }
1603
1604 /*
1605  * RelationDecrementReferenceCount
1606  *              Decrements relation reference count.
1607  */
1608 void
1609 RelationDecrementReferenceCount(Relation rel)
1610 {
1611         Assert(rel->rd_refcnt > 0);
1612         rel->rd_refcnt -= 1;
1613         if (!IsBootstrapProcessingMode())
1614                 ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
1615 }
1616
1617 /*
1618  * RelationClose - close an open relation
1619  *
1620  *      Actually, we just decrement the refcount.
1621  *
1622  *      NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
1623  *      will be freed as soon as their refcount goes to zero.  In combination
1624  *      with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
1625  *      to catch references to already-released relcache entries.  It slows
1626  *      things down quite a bit, however.
1627  */
1628 void
1629 RelationClose(Relation relation)
1630 {
1631         /* Note: no locking manipulations needed */
1632         RelationDecrementReferenceCount(relation);
1633
1634 #ifdef RELCACHE_FORCE_RELEASE
1635         if (RelationHasReferenceCountZero(relation) &&
1636                 relation->rd_createSubid == InvalidSubTransactionId &&
1637                 relation->rd_newRelfilenodeSubid == InvalidSubTransactionId)
1638                 RelationClearRelation(relation, false);
1639 #endif
1640 }
1641
1642 /*
1643  * RelationReloadIndexInfo - reload minimal information for an open index
1644  *
1645  *      This function is used only for indexes.  A relcache inval on an index
1646  *      can mean that its pg_class or pg_index row changed.  There are only
1647  *      very limited changes that are allowed to an existing index's schema,
1648  *      so we can update the relcache entry without a complete rebuild; which
1649  *      is fortunate because we can't rebuild an index entry that is "nailed"
1650  *      and/or in active use.  We support full replacement of the pg_class row,
1651  *      as well as updates of a few simple fields of the pg_index row.
1652  *
1653  *      We can't necessarily reread the catalog rows right away; we might be
1654  *      in a failed transaction when we receive the SI notification.  If so,
1655  *      RelationClearRelation just marks the entry as invalid by setting
1656  *      rd_isvalid to false.  This routine is called to fix the entry when it
1657  *      is next needed.
1658  *
1659  *      We assume that at the time we are called, we have at least AccessShareLock
1660  *      on the target index.  (Note: in the calls from RelationClearRelation,
1661  *      this is legitimate because we know the rel has positive refcount.)
1662  */
1663 static void
1664 RelationReloadIndexInfo(Relation relation)
1665 {
1666         bool            indexOK;
1667         HeapTuple       pg_class_tuple;
1668         Form_pg_class relp;
1669
1670         /* Should be called only for invalidated indexes */
1671         Assert(relation->rd_rel->relkind == RELKIND_INDEX &&
1672                    !relation->rd_isvalid);
1673         /* Should be closed at smgr level */
1674         Assert(relation->rd_smgr == NULL);
1675
1676         /*
1677          * Must reset targblock, fsm_nblocks and vm_nblocks in case rel was
1678          * truncated
1679          */
1680         relation->rd_targblock = InvalidBlockNumber;
1681         relation->rd_fsm_nblocks = InvalidBlockNumber;
1682         relation->rd_vm_nblocks = InvalidBlockNumber;
1683         /* Must free any AM cached data, too */
1684         if (relation->rd_amcache)
1685                 pfree(relation->rd_amcache);
1686         relation->rd_amcache = NULL;
1687
1688         /*
1689          * If it's a shared index, we might be called before backend startup
1690          * has finished selecting a database, in which case we have no way to
1691          * read pg_class yet.  However, a shared index can never have any
1692          * significant schema updates, so it's okay to ignore the invalidation
1693          * signal.  Just mark it valid and return without doing anything more.
1694          */
1695         if (relation->rd_rel->relisshared && !criticalRelcachesBuilt)
1696         {
1697                 relation->rd_isvalid = true;
1698                 return;
1699         }
1700
1701         /*
1702          * Read the pg_class row
1703          *
1704          * Don't try to use an indexscan of pg_class_oid_index to reload the info
1705          * for pg_class_oid_index ...
1706          */
1707         indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
1708         pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK);
1709         if (!HeapTupleIsValid(pg_class_tuple))
1710                 elog(ERROR, "could not find pg_class tuple for index %u",
1711                          RelationGetRelid(relation));
1712         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
1713         memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
1714         /* Reload reloptions in case they changed */
1715         if (relation->rd_options)
1716                 pfree(relation->rd_options);
1717         RelationParseRelOptions(relation, pg_class_tuple);
1718         /* done with pg_class tuple */
1719         heap_freetuple(pg_class_tuple);
1720         /* We must recalculate physical address in case it changed */
1721         RelationInitPhysicalAddr(relation);
1722
1723         /*
1724          * For a non-system index, there are fields of the pg_index row that are
1725          * allowed to change, so re-read that row and update the relcache entry.
1726          * Most of the info derived from pg_index (such as support function lookup
1727          * info) cannot change, and indeed the whole point of this routine is to
1728          * update the relcache entry without clobbering that data; so wholesale
1729          * replacement is not appropriate.
1730          */
1731         if (!IsSystemRelation(relation))
1732         {
1733                 HeapTuple       tuple;
1734                 Form_pg_index index;
1735
1736                 tuple = SearchSysCache(INDEXRELID,
1737                                                            ObjectIdGetDatum(RelationGetRelid(relation)),
1738                                                            0, 0, 0);
1739                 if (!HeapTupleIsValid(tuple))
1740                         elog(ERROR, "cache lookup failed for index %u",
1741                                  RelationGetRelid(relation));
1742                 index = (Form_pg_index) GETSTRUCT(tuple);
1743
1744                 relation->rd_index->indisvalid = index->indisvalid;
1745                 relation->rd_index->indcheckxmin = index->indcheckxmin;
1746                 relation->rd_index->indisready = index->indisready;
1747                 HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
1748                                                            HeapTupleHeaderGetXmin(tuple->t_data));
1749
1750                 ReleaseSysCache(tuple);
1751         }
1752
1753         /* Okay, now it's valid again */
1754         relation->rd_isvalid = true;
1755 }
1756
1757 /*
1758  * RelationClearRelation
1759  *
1760  *       Physically blow away a relation cache entry, or reset it and rebuild
1761  *       it from scratch (that is, from catalog entries).  The latter path is
1762  *       usually used when we are notified of a change to an open relation
1763  *       (one with refcount > 0).  However, this routine just does whichever
1764  *       it's told to do; callers must determine which they want.
1765  *
1766  *       NB: when rebuilding, we'd better hold some lock on the relation.
1767  *       In current usages this is presumed true because it has refcnt > 0.
1768  */
1769 static void
1770 RelationClearRelation(Relation relation, bool rebuild)
1771 {
1772         Oid                     old_reltype = relation->rd_rel->reltype;
1773         MemoryContext oldcxt;
1774
1775         /*
1776          * Make sure smgr and lower levels close the relation's files, if they
1777          * weren't closed already.  If the relation is not getting deleted, the
1778          * next smgr access should reopen the files automatically.      This ensures
1779          * that the low-level file access state is updated after, say, a vacuum
1780          * truncation.
1781          */
1782         RelationCloseSmgr(relation);
1783
1784         /*
1785          * Never, never ever blow away a nailed-in system relation, because we'd
1786          * be unable to recover.  However, we must reset rd_targblock, in case we
1787          * got called because of a relation cache flush that was triggered by
1788          * VACUUM.
1789          *
1790          * If it's a nailed index, then we need to re-read the pg_class row to see
1791          * if its relfilenode changed.  We can't necessarily do that here, because
1792          * we might be in a failed transaction.  We assume it's okay to do it if
1793          * there are open references to the relcache entry (cf notes for
1794          * AtEOXact_RelationCache).  Otherwise just mark the entry as possibly
1795          * invalid, and it'll be fixed when next opened.
1796          */
1797         if (relation->rd_isnailed)
1798         {
1799                 relation->rd_targblock = InvalidBlockNumber;
1800                 relation->rd_fsm_nblocks = InvalidBlockNumber;
1801                 relation->rd_vm_nblocks = InvalidBlockNumber;
1802                 if (relation->rd_rel->relkind == RELKIND_INDEX)
1803                 {
1804                         relation->rd_isvalid = false;           /* needs to be revalidated */
1805                         if (relation->rd_refcnt > 1)
1806                                 RelationReloadIndexInfo(relation);
1807                 }
1808                 return;
1809         }
1810
1811         /*
1812          * Even non-system indexes should not be blown away if they are open and
1813          * have valid index support information.  This avoids problems with active
1814          * use of the index support information.  As with nailed indexes, we
1815          * re-read the pg_class row to handle possible physical relocation of the
1816          * index, and we check for pg_index updates too.
1817          */
1818         if (relation->rd_rel->relkind == RELKIND_INDEX &&
1819                 relation->rd_refcnt > 0 &&
1820                 relation->rd_indexcxt != NULL)
1821         {
1822                 relation->rd_isvalid = false;   /* needs to be revalidated */
1823                 RelationReloadIndexInfo(relation);
1824                 return;
1825         }
1826
1827         /*
1828          * Remove relation from hash tables
1829          *
1830          * Note: we might be reinserting it momentarily, but we must not have it
1831          * visible in the hash tables until it's valid again, so don't try to
1832          * optimize this away...
1833          */
1834         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
1835         RelationCacheDelete(relation);
1836         MemoryContextSwitchTo(oldcxt);
1837
1838         /* Clear out catcache's entries for this relation */
1839         CatalogCacheFlushRelation(RelationGetRelid(relation));
1840
1841         /*
1842          * Free all the subsidiary data structures of the relcache entry. We
1843          * cannot free rd_att if we are trying to rebuild the entry, however,
1844          * because pointers to it may be cached in various places. The rule
1845          * manager might also have pointers into the rewrite rules. So to begin
1846          * with, we can only get rid of these fields:
1847          */
1848         FreeTriggerDesc(relation->trigdesc);
1849         if (relation->rd_indextuple)
1850                 pfree(relation->rd_indextuple);
1851         if (relation->rd_am)
1852                 pfree(relation->rd_am);
1853         if (relation->rd_rel)
1854                 pfree(relation->rd_rel);
1855         if (relation->rd_options)
1856                 pfree(relation->rd_options);
1857         list_free(relation->rd_indexlist);
1858         bms_free(relation->rd_indexattr);
1859         if (relation->rd_indexcxt)
1860                 MemoryContextDelete(relation->rd_indexcxt);
1861
1862         /*
1863          * If we're really done with the relcache entry, blow it away. But if
1864          * someone is still using it, reconstruct the whole deal without moving
1865          * the physical RelationData record (so that the someone's pointer is
1866          * still valid).
1867          */
1868         if (!rebuild)
1869         {
1870                 /* ok to zap remaining substructure */
1871                 flush_rowtype_cache(old_reltype);
1872                 /* can't use DecrTupleDescRefCount here */
1873                 Assert(relation->rd_att->tdrefcount > 0);
1874                 if (--relation->rd_att->tdrefcount == 0)
1875                         FreeTupleDesc(relation->rd_att);
1876                 if (relation->rd_rulescxt)
1877                         MemoryContextDelete(relation->rd_rulescxt);
1878                 pfree(relation);
1879         }
1880         else
1881         {
1882                 /*
1883                  * When rebuilding an open relcache entry, must preserve ref count and
1884                  * rd_createSubid/rd_newRelfilenodeSubid state.  Also attempt to
1885                  * preserve the tupledesc and rewrite-rule substructures in place.
1886                  * (Note: the refcount mechanism for tupledescs may eventually ensure
1887                  * that we don't really need to preserve the tupledesc in-place, but
1888                  * for now there are still a lot of places that assume an open rel's
1889                  * tupledesc won't move.)
1890                  *
1891                  * Note that this process does not touch CurrentResourceOwner; which
1892                  * is good because whatever ref counts the entry may have do not
1893                  * necessarily belong to that resource owner.
1894                  */
1895                 Oid                     save_relid = RelationGetRelid(relation);
1896                 int                     old_refcnt = relation->rd_refcnt;
1897                 SubTransactionId old_createSubid = relation->rd_createSubid;
1898                 SubTransactionId old_newRelfilenodeSubid = relation->rd_newRelfilenodeSubid;
1899                 struct PgStat_TableStatus *old_pgstat_info = relation->pgstat_info;
1900                 TupleDesc       old_att = relation->rd_att;
1901                 RuleLock   *old_rules = relation->rd_rules;
1902                 MemoryContext old_rulescxt = relation->rd_rulescxt;
1903
1904                 if (RelationBuildDesc(save_relid, relation) != relation)
1905                 {
1906                         /* Should only get here if relation was deleted */
1907                         flush_rowtype_cache(old_reltype);
1908                         Assert(old_att->tdrefcount > 0);
1909                         if (--old_att->tdrefcount == 0)
1910                                 FreeTupleDesc(old_att);
1911                         if (old_rulescxt)
1912                                 MemoryContextDelete(old_rulescxt);
1913                         pfree(relation);
1914                         elog(ERROR, "relation %u deleted while still in use", save_relid);
1915                 }
1916                 relation->rd_refcnt = old_refcnt;
1917                 relation->rd_createSubid = old_createSubid;
1918                 relation->rd_newRelfilenodeSubid = old_newRelfilenodeSubid;
1919                 relation->pgstat_info = old_pgstat_info;
1920
1921                 if (equalTupleDescs(old_att, relation->rd_att))
1922                 {
1923                         /* needn't flush typcache here */
1924                         Assert(relation->rd_att->tdrefcount == 1);
1925                         if (--relation->rd_att->tdrefcount == 0)
1926                                 FreeTupleDesc(relation->rd_att);
1927                         relation->rd_att = old_att;
1928                 }
1929                 else
1930                 {
1931                         flush_rowtype_cache(old_reltype);
1932                         Assert(old_att->tdrefcount > 0);
1933                         if (--old_att->tdrefcount == 0)
1934                                 FreeTupleDesc(old_att);
1935                 }
1936                 if (equalRuleLocks(old_rules, relation->rd_rules))
1937                 {
1938                         if (relation->rd_rulescxt)
1939                                 MemoryContextDelete(relation->rd_rulescxt);
1940                         relation->rd_rules = old_rules;
1941                         relation->rd_rulescxt = old_rulescxt;
1942                 }
1943                 else
1944                 {
1945                         if (old_rulescxt)
1946                                 MemoryContextDelete(old_rulescxt);
1947                 }
1948         }
1949 }
1950
1951 /*
1952  * RelationFlushRelation
1953  *
1954  *       Rebuild the relation if it is open (refcount > 0), else blow it away.
1955  */
1956 static void
1957 RelationFlushRelation(Relation relation)
1958 {
1959         bool            rebuild;
1960
1961         if (relation->rd_createSubid != InvalidSubTransactionId ||
1962                 relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
1963         {
1964                 /*
1965                  * New relcache entries are always rebuilt, not flushed; else we'd
1966                  * forget the "new" status of the relation, which is a useful
1967                  * optimization to have.  Ditto for the new-relfilenode status.
1968                  */
1969                 rebuild = true;
1970         }
1971         else
1972         {
1973                 /*
1974                  * Pre-existing rels can be dropped from the relcache if not open.
1975                  */
1976                 rebuild = !RelationHasReferenceCountZero(relation);
1977         }
1978
1979         RelationClearRelation(relation, rebuild);
1980 }
1981
1982 /*
1983  * RelationForgetRelation - unconditionally remove a relcache entry
1984  *
1985  *                 External interface for destroying a relcache entry when we
1986  *                 drop the relation.
1987  */
1988 void
1989 RelationForgetRelation(Oid rid)
1990 {
1991         Relation        relation;
1992
1993         RelationIdCacheLookup(rid, relation);
1994
1995         if (!PointerIsValid(relation))
1996                 return;                                 /* not in cache, nothing to do */
1997
1998         if (!RelationHasReferenceCountZero(relation))
1999                 elog(ERROR, "relation %u is still open", rid);
2000
2001         /* Unconditionally destroy the relcache entry */
2002         RelationClearRelation(relation, false);
2003 }
2004
2005 /*
2006  *              RelationCacheInvalidateEntry
2007  *
2008  *              This routine is invoked for SI cache flush messages.
2009  *
2010  * Any relcache entry matching the relid must be flushed.  (Note: caller has
2011  * already determined that the relid belongs to our database or is a shared
2012  * relation.)
2013  *
2014  * We used to skip local relations, on the grounds that they could
2015  * not be targets of cross-backend SI update messages; but it seems
2016  * safer to process them, so that our *own* SI update messages will
2017  * have the same effects during CommandCounterIncrement for both
2018  * local and nonlocal relations.
2019  */
2020 void
2021 RelationCacheInvalidateEntry(Oid relationId)
2022 {
2023         Relation        relation;
2024
2025         RelationIdCacheLookup(relationId, relation);
2026
2027         if (PointerIsValid(relation))
2028         {
2029                 relcacheInvalsReceived++;
2030                 RelationFlushRelation(relation);
2031         }
2032 }
2033
2034 /*
2035  * RelationCacheInvalidate
2036  *       Blow away cached relation descriptors that have zero reference counts,
2037  *       and rebuild those with positive reference counts.      Also reset the smgr
2038  *       relation cache.
2039  *
2040  *       This is currently used only to recover from SI message buffer overflow,
2041  *       so we do not touch new-in-transaction relations; they cannot be targets
2042  *       of cross-backend SI updates (and our own updates now go through a
2043  *       separate linked list that isn't limited by the SI message buffer size).
2044  *       Likewise, we need not discard new-relfilenode-in-transaction hints,
2045  *       since any invalidation of those would be a local event.
2046  *
2047  *       We do this in two phases: the first pass deletes deletable items, and
2048  *       the second one rebuilds the rebuildable items.  This is essential for
2049  *       safety, because hash_seq_search only copes with concurrent deletion of
2050  *       the element it is currently visiting.  If a second SI overflow were to
2051  *       occur while we are walking the table, resulting in recursive entry to
2052  *       this routine, we could crash because the inner invocation blows away
2053  *       the entry next to be visited by the outer scan.  But this way is OK,
2054  *       because (a) during the first pass we won't process any more SI messages,
2055  *       so hash_seq_search will complete safely; (b) during the second pass we
2056  *       only hold onto pointers to nondeletable entries.
2057  *
2058  *       The two-phase approach also makes it easy to ensure that we process
2059  *       nailed-in-cache indexes before other nondeletable items, and that we
2060  *       process pg_class_oid_index first of all.  In scenarios where a nailed
2061  *       index has been given a new relfilenode, we have to detect that update
2062  *       before the nailed index is used in reloading any other relcache entry.
2063  */
2064 void
2065 RelationCacheInvalidate(void)
2066 {
2067         HASH_SEQ_STATUS status;
2068         RelIdCacheEnt *idhentry;
2069         Relation        relation;
2070         List       *rebuildFirstList = NIL;
2071         List       *rebuildList = NIL;
2072         ListCell   *l;
2073
2074         /* Phase 1 */
2075         hash_seq_init(&status, RelationIdCache);
2076
2077         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2078         {
2079                 relation = idhentry->reldesc;
2080
2081                 /* Must close all smgr references to avoid leaving dangling ptrs */
2082                 RelationCloseSmgr(relation);
2083
2084                 /* Ignore new relations, since they are never SI targets */
2085                 if (relation->rd_createSubid != InvalidSubTransactionId)
2086                         continue;
2087
2088                 relcacheInvalsReceived++;
2089
2090                 if (RelationHasReferenceCountZero(relation))
2091                 {
2092                         /* Delete this entry immediately */
2093                         Assert(!relation->rd_isnailed);
2094                         RelationClearRelation(relation, false);
2095                 }
2096                 else
2097                 {
2098                         /*
2099                          * Add this entry to list of stuff to rebuild in second pass.
2100                          * pg_class_oid_index goes on the front of rebuildFirstList, other
2101                          * nailed indexes on the back, and everything else into
2102                          * rebuildList (in no particular order).
2103                          */
2104                         if (relation->rd_isnailed &&
2105                                 relation->rd_rel->relkind == RELKIND_INDEX)
2106                         {
2107                                 if (RelationGetRelid(relation) == ClassOidIndexId)
2108                                         rebuildFirstList = lcons(relation, rebuildFirstList);
2109                                 else
2110                                         rebuildFirstList = lappend(rebuildFirstList, relation);
2111                         }
2112                         else
2113                                 rebuildList = lcons(relation, rebuildList);
2114                 }
2115         }
2116
2117         /*
2118          * Now zap any remaining smgr cache entries.  This must happen before we
2119          * start to rebuild entries, since that may involve catalog fetches which
2120          * will re-open catalog files.
2121          */
2122         smgrcloseall();
2123
2124         /* Phase 2: rebuild the items found to need rebuild in phase 1 */
2125         foreach(l, rebuildFirstList)
2126         {
2127                 relation = (Relation) lfirst(l);
2128                 RelationClearRelation(relation, true);
2129         }
2130         list_free(rebuildFirstList);
2131         foreach(l, rebuildList)
2132         {
2133                 relation = (Relation) lfirst(l);
2134                 RelationClearRelation(relation, true);
2135         }
2136         list_free(rebuildList);
2137 }
2138
2139 /*
2140  * AtEOXact_RelationCache
2141  *
2142  *      Clean up the relcache at main-transaction commit or abort.
2143  *
2144  * Note: this must be called *before* processing invalidation messages.
2145  * In the case of abort, we don't want to try to rebuild any invalidated
2146  * cache entries (since we can't safely do database accesses).  Therefore
2147  * we must reset refcnts before handling pending invalidations.
2148  *
2149  * As of PostgreSQL 8.1, relcache refcnts should get released by the
2150  * ResourceOwner mechanism.  This routine just does a debugging
2151  * cross-check that no pins remain.  However, we also need to do special
2152  * cleanup when the current transaction created any relations or made use
2153  * of forced index lists.
2154  */
2155 void
2156 AtEOXact_RelationCache(bool isCommit)
2157 {
2158         HASH_SEQ_STATUS status;
2159         RelIdCacheEnt *idhentry;
2160
2161         /*
2162          * To speed up transaction exit, we want to avoid scanning the relcache
2163          * unless there is actually something for this routine to do.  Other than
2164          * the debug-only Assert checks, most transactions don't create any work
2165          * for us to do here, so we keep a static flag that gets set if there is
2166          * anything to do.      (Currently, this means either a relation is created in
2167          * the current xact, or one is given a new relfilenode, or an index list
2168          * is forced.)  For simplicity, the flag remains set till end of top-level
2169          * transaction, even though we could clear it at subtransaction end in
2170          * some cases.
2171          */
2172         if (!need_eoxact_work
2173 #ifdef USE_ASSERT_CHECKING
2174                 && !assert_enabled
2175 #endif
2176                 )
2177                 return;
2178
2179         hash_seq_init(&status, RelationIdCache);
2180
2181         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2182         {
2183                 Relation        relation = idhentry->reldesc;
2184
2185                 /*
2186                  * The relcache entry's ref count should be back to its normal
2187                  * not-in-a-transaction state: 0 unless it's nailed in cache.
2188                  *
2189                  * In bootstrap mode, this is NOT true, so don't check it --- the
2190                  * bootstrap code expects relations to stay open across start/commit
2191                  * transaction calls.  (That seems bogus, but it's not worth fixing.)
2192                  */
2193 #ifdef USE_ASSERT_CHECKING
2194                 if (!IsBootstrapProcessingMode())
2195                 {
2196                         int                     expected_refcnt;
2197
2198                         expected_refcnt = relation->rd_isnailed ? 1 : 0;
2199                         Assert(relation->rd_refcnt == expected_refcnt);
2200                 }
2201 #endif
2202
2203                 /*
2204                  * Is it a relation created in the current transaction?
2205                  *
2206                  * During commit, reset the flag to zero, since we are now out of the
2207                  * creating transaction.  During abort, simply delete the relcache
2208                  * entry --- it isn't interesting any longer.  (NOTE: if we have
2209                  * forgotten the new-ness of a new relation due to a forced cache
2210                  * flush, the entry will get deleted anyway by shared-cache-inval
2211                  * processing of the aborted pg_class insertion.)
2212                  */
2213                 if (relation->rd_createSubid != InvalidSubTransactionId)
2214                 {
2215                         if (isCommit)
2216                                 relation->rd_createSubid = InvalidSubTransactionId;
2217                         else
2218                         {
2219                                 RelationClearRelation(relation, false);
2220                                 continue;
2221                         }
2222                 }
2223
2224                 /*
2225                  * Likewise, reset the hint about the relfilenode being new.
2226                  */
2227                 relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
2228
2229                 /*
2230                  * Flush any temporary index list.
2231                  */
2232                 if (relation->rd_indexvalid == 2)
2233                 {
2234                         list_free(relation->rd_indexlist);
2235                         relation->rd_indexlist = NIL;
2236                         relation->rd_oidindex = InvalidOid;
2237                         relation->rd_indexvalid = 0;
2238                 }
2239         }
2240
2241         /* Once done with the transaction, we can reset need_eoxact_work */
2242         need_eoxact_work = false;
2243 }
2244
2245 /*
2246  * AtEOSubXact_RelationCache
2247  *
2248  *      Clean up the relcache at sub-transaction commit or abort.
2249  *
2250  * Note: this must be called *before* processing invalidation messages.
2251  */
2252 void
2253 AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
2254                                                   SubTransactionId parentSubid)
2255 {
2256         HASH_SEQ_STATUS status;
2257         RelIdCacheEnt *idhentry;
2258
2259         /*
2260          * Skip the relcache scan if nothing to do --- see notes for
2261          * AtEOXact_RelationCache.
2262          */
2263         if (!need_eoxact_work)
2264                 return;
2265
2266         hash_seq_init(&status, RelationIdCache);
2267
2268         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2269         {
2270                 Relation        relation = idhentry->reldesc;
2271
2272                 /*
2273                  * Is it a relation created in the current subtransaction?
2274                  *
2275                  * During subcommit, mark it as belonging to the parent, instead.
2276                  * During subabort, simply delete the relcache entry.
2277                  */
2278                 if (relation->rd_createSubid == mySubid)
2279                 {
2280                         if (isCommit)
2281                                 relation->rd_createSubid = parentSubid;
2282                         else
2283                         {
2284                                 Assert(RelationHasReferenceCountZero(relation));
2285                                 RelationClearRelation(relation, false);
2286                                 continue;
2287                         }
2288                 }
2289
2290                 /*
2291                  * Likewise, update or drop any new-relfilenode-in-subtransaction
2292                  * hint.
2293                  */
2294                 if (relation->rd_newRelfilenodeSubid == mySubid)
2295                 {
2296                         if (isCommit)
2297                                 relation->rd_newRelfilenodeSubid = parentSubid;
2298                         else
2299                                 relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
2300                 }
2301
2302                 /*
2303                  * Flush any temporary index list.
2304                  */
2305                 if (relation->rd_indexvalid == 2)
2306                 {
2307                         list_free(relation->rd_indexlist);
2308                         relation->rd_indexlist = NIL;
2309                         relation->rd_oidindex = InvalidOid;
2310                         relation->rd_indexvalid = 0;
2311                 }
2312         }
2313 }
2314
2315 /*
2316  * RelationCacheMarkNewRelfilenode
2317  *
2318  *      Mark the rel as having been given a new relfilenode in the current
2319  *      (sub) transaction.      This is a hint that can be used to optimize
2320  *      later operations on the rel in the same transaction.
2321  */
2322 void
2323 RelationCacheMarkNewRelfilenode(Relation rel)
2324 {
2325         /* Mark it... */
2326         rel->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
2327         /* ... and now we have eoxact cleanup work to do */
2328         need_eoxact_work = true;
2329 }
2330
2331
2332 /*
2333  *              RelationBuildLocalRelation
2334  *                      Build a relcache entry for an about-to-be-created relation,
2335  *                      and enter it into the relcache.
2336  */
2337 Relation
2338 RelationBuildLocalRelation(const char *relname,
2339                                                    Oid relnamespace,
2340                                                    TupleDesc tupDesc,
2341                                                    Oid relid,
2342                                                    Oid reltablespace,
2343                                                    bool shared_relation)
2344 {
2345         Relation        rel;
2346         MemoryContext oldcxt;
2347         int                     natts = tupDesc->natts;
2348         int                     i;
2349         bool            has_not_null;
2350         bool            nailit;
2351
2352         AssertArg(natts >= 0);
2353
2354         /*
2355          * check for creation of a rel that must be nailed in cache.
2356          *
2357          * XXX this list had better match the relations specially handled in
2358          * RelationCacheInitializePhase2/3.
2359          */
2360         switch (relid)
2361         {
2362                 case DatabaseRelationId:
2363                 case RelationRelationId:
2364                 case AttributeRelationId:
2365                 case ProcedureRelationId:
2366                 case TypeRelationId:
2367                         nailit = true;
2368                         break;
2369                 default:
2370                         nailit = false;
2371                         break;
2372         }
2373
2374         /*
2375          * check that hardwired list of shared rels matches what's in the
2376          * bootstrap .bki file.  If you get a failure here during initdb, you
2377          * probably need to fix IsSharedRelation() to match whatever you've done
2378          * to the set of shared relations.
2379          */
2380         if (shared_relation != IsSharedRelation(relid))
2381                 elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
2382                          relname, relid);
2383
2384         /*
2385          * switch to the cache context to create the relcache entry.
2386          */
2387         if (!CacheMemoryContext)
2388                 CreateCacheMemoryContext();
2389
2390         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2391
2392         /*
2393          * allocate a new relation descriptor and fill in basic state fields.
2394          */
2395         rel = (Relation) palloc0(sizeof(RelationData));
2396
2397         rel->rd_targblock = InvalidBlockNumber;
2398         rel->rd_fsm_nblocks = InvalidBlockNumber;
2399         rel->rd_vm_nblocks = InvalidBlockNumber;
2400
2401         /* make sure relation is marked as having no open file yet */
2402         rel->rd_smgr = NULL;
2403
2404         /* mark it nailed if appropriate */
2405         rel->rd_isnailed = nailit;
2406
2407         rel->rd_refcnt = nailit ? 1 : 0;
2408
2409         /* it's being created in this transaction */
2410         rel->rd_createSubid = GetCurrentSubTransactionId();
2411         rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
2412
2413         /* must flag that we have rels created in this transaction */
2414         need_eoxact_work = true;
2415
2416         /* it is temporary if and only if it is in my temp-table namespace */
2417         rel->rd_istemp = isTempOrToastNamespace(relnamespace);
2418         rel->rd_islocaltemp = rel->rd_istemp;
2419
2420         /*
2421          * create a new tuple descriptor from the one passed in.  We do this
2422          * partly to copy it into the cache context, and partly because the new
2423          * relation can't have any defaults or constraints yet; they have to be
2424          * added in later steps, because they require additions to multiple system
2425          * catalogs.  We can copy attnotnull constraints here, however.
2426          */
2427         rel->rd_att = CreateTupleDescCopy(tupDesc);
2428         rel->rd_att->tdrefcount = 1;    /* mark as refcounted */
2429         has_not_null = false;
2430         for (i = 0; i < natts; i++)
2431         {
2432                 rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull;
2433                 has_not_null |= tupDesc->attrs[i]->attnotnull;
2434         }
2435
2436         if (has_not_null)
2437         {
2438                 TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
2439
2440                 constr->has_not_null = true;
2441                 rel->rd_att->constr = constr;
2442         }
2443
2444         /*
2445          * initialize relation tuple form (caller may add/override data later)
2446          */
2447         rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
2448
2449         namestrcpy(&rel->rd_rel->relname, relname);
2450         rel->rd_rel->relnamespace = relnamespace;
2451
2452         rel->rd_rel->relkind = RELKIND_UNCATALOGED;
2453         rel->rd_rel->relhasoids = rel->rd_att->tdhasoid;
2454         rel->rd_rel->relnatts = natts;
2455         rel->rd_rel->reltype = InvalidOid;
2456         /* needed when bootstrapping: */
2457         rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;
2458
2459         /*
2460          * Insert relation physical and logical identifiers (OIDs) into the right
2461          * places.      Note that the physical ID (relfilenode) is initially the same
2462          * as the logical ID (OID).
2463          */
2464         rel->rd_rel->relisshared = shared_relation;
2465         rel->rd_rel->relistemp = rel->rd_istemp;
2466
2467         RelationGetRelid(rel) = relid;
2468
2469         for (i = 0; i < natts; i++)
2470                 rel->rd_att->attrs[i]->attrelid = relid;
2471
2472         rel->rd_rel->relfilenode = relid;
2473         rel->rd_rel->reltablespace = reltablespace;
2474
2475         RelationInitLockInfo(rel);      /* see lmgr.c */
2476
2477         RelationInitPhysicalAddr(rel);
2478
2479         /*
2480          * Okay to insert into the relcache hash tables.
2481          */
2482         RelationCacheInsert(rel);
2483
2484         /*
2485          * done building relcache entry.
2486          */
2487         MemoryContextSwitchTo(oldcxt);
2488
2489         /* It's fully valid */
2490         rel->rd_isvalid = true;
2491
2492         /*
2493          * Caller expects us to pin the returned entry.
2494          */
2495         RelationIncrementReferenceCount(rel);
2496
2497         return rel;
2498 }
2499
2500 /*
2501  *              RelationCacheInitialize
2502  *
2503  *              This initializes the relation descriptor cache.  At the time
2504  *              that this is invoked, we can't do database access yet (mainly
2505  *              because the transaction subsystem is not up); all we are doing
2506  *              is making an empty cache hashtable.  This must be done before
2507  *              starting the initialization transaction, because otherwise
2508  *              AtEOXact_RelationCache would crash if that transaction aborts
2509  *              before we can get the relcache set up.
2510  */
2511
2512 #define INITRELCACHESIZE                400
2513
2514 void
2515 RelationCacheInitialize(void)
2516 {
2517         HASHCTL         ctl;
2518
2519         /*
2520          * make sure cache memory context exists
2521          */
2522         if (!CacheMemoryContext)
2523                 CreateCacheMemoryContext();
2524
2525         /*
2526          * create hashtable that indexes the relcache
2527          */
2528         MemSet(&ctl, 0, sizeof(ctl));
2529         ctl.keysize = sizeof(Oid);
2530         ctl.entrysize = sizeof(RelIdCacheEnt);
2531         ctl.hash = oid_hash;
2532         RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
2533                                                                   &ctl, HASH_ELEM | HASH_FUNCTION);
2534 }
2535
2536 /*
2537  *              RelationCacheInitializePhase2
2538  *
2539  *              This is called to prepare for access to pg_database during startup.
2540  *              We must at least set up a nailed reldesc for pg_database.  Ideally
2541  *              we'd like to have reldescs for its indexes, too.  We attempt to
2542  *              load this information from the shared relcache init file.  If that's
2543  *              missing or broken, just make a phony entry for pg_database.
2544  *              RelationCacheInitializePhase3 will clean up as needed.
2545  */
2546 void
2547 RelationCacheInitializePhase2(void)
2548 {
2549         MemoryContext oldcxt;
2550
2551         /*
2552          * In bootstrap mode, pg_database isn't there yet anyway, so do nothing.
2553          */
2554         if (IsBootstrapProcessingMode())
2555                 return;
2556
2557         /*
2558          * switch to cache memory context
2559          */
2560         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2561
2562         /*
2563          * Try to load the shared relcache cache file.  If unsuccessful,
2564          * bootstrap the cache with a pre-made descriptor for pg_database.
2565          */
2566         if (!load_relcache_init_file(true))
2567         {
2568                 formrdesc("pg_database", DatabaseRelation_Rowtype_Id, true,
2569                                   true, Natts_pg_database, Desc_pg_database);
2570
2571 #define NUM_CRITICAL_SHARED_RELS        1       /* fix if you change list above */
2572         }
2573
2574         MemoryContextSwitchTo(oldcxt);
2575 }
2576
2577 /*
2578  *              RelationCacheInitializePhase3
2579  *
2580  *              This is called as soon as the catcache and transaction system
2581  *              are functional and we have determined MyDatabaseId.  At this point
2582  *              we can actually read data from the database's system catalogs.
2583  *              We first try to read pre-computed relcache entries from the local
2584  *              relcache init file.  If that's missing or broken, make phony entries
2585  *              for the minimum set of nailed-in-cache relations.  Then (unless
2586  *              bootstrapping) make sure we have entries for the critical system
2587  *              indexes.  Once we've done all this, we have enough infrastructure to
2588  *              open any system catalog or use any catcache.  The last step is to
2589  *              rewrite the cache files if needed.
2590  */
2591 void
2592 RelationCacheInitializePhase3(void)
2593 {
2594         HASH_SEQ_STATUS status;
2595         RelIdCacheEnt *idhentry;
2596         MemoryContext oldcxt;
2597         bool            needNewCacheFile = !criticalSharedRelcachesBuilt;
2598
2599         /*
2600          * switch to cache memory context
2601          */
2602         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2603
2604         /*
2605          * Try to load the local relcache cache file.  If unsuccessful,
2606          * bootstrap the cache with pre-made descriptors for the critical
2607          * "nailed-in" system catalogs.
2608          */
2609         if (IsBootstrapProcessingMode() ||
2610                 !load_relcache_init_file(false))
2611         {
2612                 needNewCacheFile = true;
2613
2614                 formrdesc("pg_class", RelationRelation_Rowtype_Id, false,
2615                                   true, Natts_pg_class, Desc_pg_class);
2616                 formrdesc("pg_attribute", AttributeRelation_Rowtype_Id, false,
2617                                   false, Natts_pg_attribute, Desc_pg_attribute);
2618                 formrdesc("pg_proc", ProcedureRelation_Rowtype_Id, false,
2619                                   true, Natts_pg_proc, Desc_pg_proc);
2620                 formrdesc("pg_type", TypeRelation_Rowtype_Id, false,
2621                                   true, Natts_pg_type, Desc_pg_type);
2622
2623 #define NUM_CRITICAL_LOCAL_RELS 4       /* fix if you change list above */
2624         }
2625
2626         MemoryContextSwitchTo(oldcxt);
2627
2628         /* In bootstrap mode, the faked-up formrdesc info is all we'll have */
2629         if (IsBootstrapProcessingMode())
2630                 return;
2631
2632         /*
2633          * If we didn't get the critical system indexes loaded into relcache, do
2634          * so now.      These are critical because the catcache and/or opclass cache
2635          * depend on them for fetches done during relcache load.  Thus, we have an
2636          * infinite-recursion problem.  We can break the recursion by doing
2637          * heapscans instead of indexscans at certain key spots. To avoid hobbling
2638          * performance, we only want to do that until we have the critical indexes
2639          * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
2640          * decide whether to do heapscan or indexscan at the key spots, and we set
2641          * it true after we've loaded the critical indexes.
2642          *
2643          * The critical indexes are marked as "nailed in cache", partly to make it
2644          * easy for load_relcache_init_file to count them, but mainly because we
2645          * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
2646          * true.  (NOTE: perhaps it would be possible to reload them by
2647          * temporarily setting criticalRelcachesBuilt to false again.  For now,
2648          * though, we just nail 'em in.)
2649          *
2650          * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
2651          * in the same way as the others, because the critical catalogs don't
2652          * (currently) have any rules or triggers, and so these indexes can be
2653          * rebuilt without inducing recursion.  However they are used during
2654          * relcache load when a rel does have rules or triggers, so we choose to
2655          * nail them for performance reasons.
2656          */
2657         if (!criticalRelcachesBuilt)
2658         {
2659                 load_critical_index(ClassOidIndexId);
2660                 load_critical_index(AttributeRelidNumIndexId);
2661                 load_critical_index(IndexRelidIndexId);
2662                 load_critical_index(OpclassOidIndexId);
2663                 load_critical_index(AccessMethodStrategyIndexId);
2664                 load_critical_index(AccessMethodProcedureIndexId);
2665                 load_critical_index(OperatorOidIndexId);
2666                 load_critical_index(RewriteRelRulenameIndexId);
2667                 load_critical_index(TriggerRelidNameIndexId);
2668
2669 #define NUM_CRITICAL_LOCAL_INDEXES      9               /* fix if you change list above */
2670
2671                 criticalRelcachesBuilt = true;
2672         }
2673
2674         /*
2675          * Process critical shared indexes too.
2676          *
2677          * DatabaseNameIndexId isn't critical for relcache loading, but rather
2678          * for initial lookup of MyDatabaseId, without which we'll never find
2679          * any non-shared catalogs at all.  Autovacuum calls InitPostgres with
2680          * a database OID, so it instead depends on DatabaseOidIndexId.
2681          */
2682         if (!criticalSharedRelcachesBuilt)
2683         {
2684                 load_critical_index(DatabaseNameIndexId);
2685                 load_critical_index(DatabaseOidIndexId);
2686
2687 #define NUM_CRITICAL_SHARED_INDEXES     2               /* fix if you change list above */
2688
2689                 criticalSharedRelcachesBuilt = true;
2690         }
2691
2692         /*
2693          * Now, scan all the relcache entries and update anything that might be
2694          * wrong in the results from formrdesc or the relcache cache file. If we
2695          * faked up relcache entries using formrdesc, then read the real pg_class
2696          * rows and replace the fake entries with them. Also, if any of the
2697          * relcache entries have rules or triggers, load that info the hard way
2698          * since it isn't recorded in the cache file.
2699          *
2700          * Whenever we access the catalogs to read data, there is a possibility
2701          * of a shared-inval cache flush causing relcache entries to be removed.
2702          * Since hash_seq_search only guarantees to still work after the *current*
2703          * entry is removed, it's unsafe to continue the hashtable scan afterward.
2704          * We handle this by restarting the scan from scratch after each access.
2705          * This is theoretically O(N^2), but the number of entries that actually
2706          * need to be fixed is small enough that it doesn't matter.
2707          */
2708         hash_seq_init(&status, RelationIdCache);
2709
2710         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2711         {
2712                 Relation        relation = idhentry->reldesc;
2713                 bool            restart = false;
2714
2715                 /*
2716                  * Make sure *this* entry doesn't get flushed while we work with it.
2717                  */
2718                 RelationIncrementReferenceCount(relation);
2719
2720                 /*
2721                  * If it's a faked-up entry, read the real pg_class tuple.
2722                  */
2723                 if (relation->rd_rel->relowner == InvalidOid)
2724                 {
2725                         HeapTuple       htup;
2726                         Form_pg_class relp;
2727
2728                         htup = SearchSysCache(RELOID,
2729                                                                 ObjectIdGetDatum(RelationGetRelid(relation)),
2730                                                                   0, 0, 0);
2731                         if (!HeapTupleIsValid(htup))
2732                                 elog(FATAL, "cache lookup failed for relation %u",
2733                                          RelationGetRelid(relation));
2734                         relp = (Form_pg_class) GETSTRUCT(htup);
2735
2736                         /*
2737                          * Copy tuple to relation->rd_rel. (See notes in
2738                          * AllocateRelationDesc())
2739                          */
2740                         memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
2741
2742                         /* Update rd_options while we have the tuple */
2743                         if (relation->rd_options)
2744                                 pfree(relation->rd_options);
2745                         RelationParseRelOptions(relation, htup);
2746
2747                         /*
2748                          * Check the values in rd_att were set up correctly.  (We cannot
2749                          * just copy them over now: formrdesc must have set up the
2750                          * rd_att data correctly to start with, because it may already
2751                          * have been copied into one or more catcache entries.)
2752                          */
2753                         Assert(relation->rd_att->tdtypeid == relp->reltype);
2754                         Assert(relation->rd_att->tdtypmod == -1);
2755                         Assert(relation->rd_att->tdhasoid == relp->relhasoids);
2756
2757                         ReleaseSysCache(htup);
2758
2759                         /* relowner had better be OK now, else we'll loop forever */
2760                         if (relation->rd_rel->relowner == InvalidOid)
2761                                 elog(ERROR, "invalid relowner in pg_class entry for \"%s\"",
2762                                          RelationGetRelationName(relation));
2763
2764                         restart = true;
2765                 }
2766
2767                 /*
2768                  * Fix data that isn't saved in relcache cache file.
2769                  *
2770                  * relhasrules or relhastriggers could possibly be wrong or out of
2771                  * date.  If we don't actually find any rules or triggers, clear the
2772                  * local copy of the flag so that we don't get into an infinite loop
2773                  * here.  We don't make any attempt to fix the pg_class entry, though.
2774                  */
2775                 if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
2776                 {
2777                         RelationBuildRuleLock(relation);
2778                         if (relation->rd_rules == NULL)
2779                                 relation->rd_rel->relhasrules = false;
2780                         restart = true;
2781                 }
2782                 if (relation->rd_rel->relhastriggers && relation->trigdesc == NULL)
2783                 {
2784                         RelationBuildTriggers(relation);
2785                         if (relation->trigdesc == NULL)
2786                                 relation->rd_rel->relhastriggers = false;
2787                         restart = true;
2788                 }
2789
2790                 /* Release hold on the relation */
2791                 RelationDecrementReferenceCount(relation);
2792
2793                 /* Now, restart the hashtable scan if needed */
2794                 if (restart)
2795                 {
2796                         hash_seq_term(&status);
2797                         hash_seq_init(&status, RelationIdCache);
2798                 }
2799         }
2800
2801         /*
2802          * Lastly, write out new relcache cache files if needed.  We don't bother
2803          * to distinguish cases where only one of the two needs an update.
2804          */
2805         if (needNewCacheFile)
2806         {
2807                 /*
2808                  * Force all the catcaches to finish initializing and thereby open the
2809                  * catalogs and indexes they use.  This will preload the relcache with
2810                  * entries for all the most important system catalogs and indexes, so
2811                  * that the init files will be most useful for future backends.
2812                  */
2813                 InitCatalogCachePhase2();
2814
2815                 /* reset initFileRelationIds list; we'll fill it during write */
2816                 initFileRelationIds = NIL;
2817
2818                 /* now write the files */
2819                 write_relcache_init_file(true);
2820                 write_relcache_init_file(false);
2821         }
2822 }
2823
2824 /*
2825  * Load one critical system index into the relcache
2826  */
2827 static void
2828 load_critical_index(Oid indexoid)
2829 {
2830         Relation        ird;
2831
2832         LockRelationOid(indexoid, AccessShareLock);
2833         ird = RelationBuildDesc(indexoid, NULL);
2834         if (ird == NULL)
2835                 elog(PANIC, "could not open critical system index %u", indexoid);
2836         ird->rd_isnailed = true;
2837         ird->rd_refcnt = 1;
2838         UnlockRelationOid(indexoid, AccessShareLock);
2839 }
2840
2841 /*
2842  * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
2843  * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
2844  *
2845  * We need this kluge because we have to be able to access non-fixed-width
2846  * fields of pg_class and pg_index before we have the standard catalog caches
2847  * available.  We use predefined data that's set up in just the same way as
2848  * the bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
2849  * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
2850  * does it have a TupleConstr field.  But it's good enough for the purpose of
2851  * extracting fields.
2852  */
2853 static TupleDesc
2854 BuildHardcodedDescriptor(int natts, const FormData_pg_attribute *attrs,
2855                                                  bool hasoids)
2856 {
2857         TupleDesc       result;
2858         MemoryContext oldcxt;
2859         int                     i;
2860
2861         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2862
2863         result = CreateTemplateTupleDesc(natts, hasoids);
2864         result->tdtypeid = RECORDOID;           /* not right, but we don't care */
2865         result->tdtypmod = -1;
2866
2867         for (i = 0; i < natts; i++)
2868         {
2869                 memcpy(result->attrs[i], &attrs[i], ATTRIBUTE_FIXED_PART_SIZE);
2870                 /* make sure attcacheoff is valid */
2871                 result->attrs[i]->attcacheoff = -1;
2872         }
2873
2874         /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
2875         result->attrs[0]->attcacheoff = 0;
2876
2877         /* Note: we don't bother to set up a TupleConstr entry */
2878
2879         MemoryContextSwitchTo(oldcxt);
2880
2881         return result;
2882 }
2883
2884 static TupleDesc
2885 GetPgClassDescriptor(void)
2886 {
2887         static TupleDesc pgclassdesc = NULL;
2888
2889         /* Already done? */
2890         if (pgclassdesc == NULL)
2891                 pgclassdesc = BuildHardcodedDescriptor(Natts_pg_class,
2892                                                                                            Desc_pg_class,
2893                                                                                            true);
2894
2895         return pgclassdesc;
2896 }
2897
2898 static TupleDesc
2899 GetPgIndexDescriptor(void)
2900 {
2901         static TupleDesc pgindexdesc = NULL;
2902
2903         /* Already done? */
2904         if (pgindexdesc == NULL)
2905                 pgindexdesc = BuildHardcodedDescriptor(Natts_pg_index,
2906                                                                                            Desc_pg_index,
2907                                                                                            false);
2908
2909         return pgindexdesc;
2910 }
2911
2912 /*
2913  * Load any default attribute value definitions for the relation.
2914  */
2915 static void
2916 AttrDefaultFetch(Relation relation)
2917 {
2918         AttrDefault *attrdef = relation->rd_att->constr->defval;
2919         int                     ndef = relation->rd_att->constr->num_defval;
2920         Relation        adrel;
2921         SysScanDesc adscan;
2922         ScanKeyData skey;
2923         HeapTuple       htup;
2924         Datum           val;
2925         bool            isnull;
2926         int                     found;
2927         int                     i;
2928
2929         ScanKeyInit(&skey,
2930                                 Anum_pg_attrdef_adrelid,
2931                                 BTEqualStrategyNumber, F_OIDEQ,
2932                                 ObjectIdGetDatum(RelationGetRelid(relation)));
2933
2934         adrel = heap_open(AttrDefaultRelationId, AccessShareLock);
2935         adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
2936                                                                 SnapshotNow, 1, &skey);
2937         found = 0;
2938
2939         while (HeapTupleIsValid(htup = systable_getnext(adscan)))
2940         {
2941                 Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
2942
2943                 for (i = 0; i < ndef; i++)
2944                 {
2945                         if (adform->adnum != attrdef[i].adnum)
2946                                 continue;
2947                         if (attrdef[i].adbin != NULL)
2948                                 elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
2949                                 NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
2950                                          RelationGetRelationName(relation));
2951                         else
2952                                 found++;
2953
2954                         val = fastgetattr(htup,
2955                                                           Anum_pg_attrdef_adbin,
2956                                                           adrel->rd_att, &isnull);
2957                         if (isnull)
2958                                 elog(WARNING, "null adbin for attr %s of rel %s",
2959                                 NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
2960                                          RelationGetRelationName(relation));
2961                         else
2962                                 attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
2963                                                                                                    TextDatumGetCString(val));
2964                         break;
2965                 }
2966
2967                 if (i >= ndef)
2968                         elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
2969                                  adform->adnum, RelationGetRelationName(relation));
2970         }
2971
2972         systable_endscan(adscan);
2973         heap_close(adrel, AccessShareLock);
2974
2975         if (found != ndef)
2976                 elog(WARNING, "%d attrdef record(s) missing for rel %s",
2977                          ndef - found, RelationGetRelationName(relation));
2978 }
2979
2980 /*
2981  * Load any check constraints for the relation.
2982  */
2983 static void
2984 CheckConstraintFetch(Relation relation)
2985 {
2986         ConstrCheck *check = relation->rd_att->constr->check;
2987         int                     ncheck = relation->rd_att->constr->num_check;
2988         Relation        conrel;
2989         SysScanDesc conscan;
2990         ScanKeyData skey[1];
2991         HeapTuple       htup;
2992         Datum           val;
2993         bool            isnull;
2994         int                     found = 0;
2995
2996         ScanKeyInit(&skey[0],
2997                                 Anum_pg_constraint_conrelid,
2998                                 BTEqualStrategyNumber, F_OIDEQ,
2999                                 ObjectIdGetDatum(RelationGetRelid(relation)));
3000
3001         conrel = heap_open(ConstraintRelationId, AccessShareLock);
3002         conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
3003                                                                  SnapshotNow, 1, skey);
3004
3005         while (HeapTupleIsValid(htup = systable_getnext(conscan)))
3006         {
3007                 Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
3008
3009                 /* We want check constraints only */
3010                 if (conform->contype != CONSTRAINT_CHECK)
3011                         continue;
3012
3013                 if (found >= ncheck)
3014                         elog(ERROR, "unexpected constraint record found for rel %s",
3015                                  RelationGetRelationName(relation));
3016
3017                 check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
3018                                                                                                   NameStr(conform->conname));
3019
3020                 /* Grab and test conbin is actually set */
3021                 val = fastgetattr(htup,
3022                                                   Anum_pg_constraint_conbin,
3023                                                   conrel->rd_att, &isnull);
3024                 if (isnull)
3025                         elog(ERROR, "null conbin for rel %s",
3026                                  RelationGetRelationName(relation));
3027
3028                 check[found].ccbin = MemoryContextStrdup(CacheMemoryContext,
3029                                                                                                  TextDatumGetCString(val));
3030                 found++;
3031         }
3032
3033         systable_endscan(conscan);
3034         heap_close(conrel, AccessShareLock);
3035
3036         if (found != ncheck)
3037                 elog(ERROR, "%d constraint record(s) missing for rel %s",
3038                          ncheck - found, RelationGetRelationName(relation));
3039 }
3040
3041 /*
3042  * RelationGetIndexList -- get a list of OIDs of indexes on this relation
3043  *
3044  * The index list is created only if someone requests it.  We scan pg_index
3045  * to find relevant indexes, and add the list to the relcache entry so that
3046  * we won't have to compute it again.  Note that shared cache inval of a
3047  * relcache entry will delete the old list and set rd_indexvalid to 0,
3048  * so that we must recompute the index list on next request.  This handles
3049  * creation or deletion of an index.
3050  *
3051  * The returned list is guaranteed to be sorted in order by OID.  This is
3052  * needed by the executor, since for index types that we obtain exclusive
3053  * locks on when updating the index, all backends must lock the indexes in
3054  * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
3055  * consistent ordering would do, but ordering by OID is easy.
3056  *
3057  * Since shared cache inval causes the relcache's copy of the list to go away,
3058  * we return a copy of the list palloc'd in the caller's context.  The caller
3059  * may list_free() the returned list after scanning it. This is necessary
3060  * since the caller will typically be doing syscache lookups on the relevant
3061  * indexes, and syscache lookup could cause SI messages to be processed!
3062  *
3063  * We also update rd_oidindex, which this module treats as effectively part
3064  * of the index list.  rd_oidindex is valid when rd_indexvalid isn't zero;
3065  * it is the pg_class OID of a unique index on OID when the relation has one,
3066  * and InvalidOid if there is no such index.
3067  */
3068 List *
3069 RelationGetIndexList(Relation relation)
3070 {
3071         Relation        indrel;
3072         SysScanDesc indscan;
3073         ScanKeyData skey;
3074         HeapTuple       htup;
3075         List       *result;
3076         Oid                     oidIndex;
3077         MemoryContext oldcxt;
3078
3079         /* Quick exit if we already computed the list. */
3080         if (relation->rd_indexvalid != 0)
3081                 return list_copy(relation->rd_indexlist);
3082
3083         /*
3084          * We build the list we intend to return (in the caller's context) while
3085          * doing the scan.      After successfully completing the scan, we copy that
3086          * list into the relcache entry.  This avoids cache-context memory leakage
3087          * if we get some sort of error partway through.
3088          */
3089         result = NIL;
3090         oidIndex = InvalidOid;
3091
3092         /* Prepare to scan pg_index for entries having indrelid = this rel. */
3093         ScanKeyInit(&skey,
3094                                 Anum_pg_index_indrelid,
3095                                 BTEqualStrategyNumber, F_OIDEQ,
3096                                 ObjectIdGetDatum(RelationGetRelid(relation)));
3097
3098         indrel = heap_open(IndexRelationId, AccessShareLock);
3099         indscan = systable_beginscan(indrel, IndexIndrelidIndexId, true,
3100                                                                  SnapshotNow, 1, &skey);
3101
3102         while (HeapTupleIsValid(htup = systable_getnext(indscan)))
3103         {
3104                 Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);
3105
3106                 /* Add index's OID to result list in the proper order */
3107                 result = insert_ordered_oid(result, index->indexrelid);
3108
3109                 /* Check to see if it is a unique, non-partial btree index on OID */
3110                 if (index->indnatts == 1 &&
3111                         index->indisunique && index->indimmediate &&
3112                         index->indkey.values[0] == ObjectIdAttributeNumber &&
3113                         index->indclass.values[0] == OID_BTREE_OPS_OID &&
3114                         heap_attisnull(htup, Anum_pg_index_indpred))
3115                         oidIndex = index->indexrelid;
3116         }
3117
3118         systable_endscan(indscan);
3119         heap_close(indrel, AccessShareLock);
3120
3121         /* Now save a copy of the completed list in the relcache entry. */
3122         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3123         relation->rd_indexlist = list_copy(result);
3124         relation->rd_oidindex = oidIndex;
3125         relation->rd_indexvalid = 1;
3126         MemoryContextSwitchTo(oldcxt);
3127
3128         return result;
3129 }
3130
3131 /*
3132  * insert_ordered_oid
3133  *              Insert a new Oid into a sorted list of Oids, preserving ordering
3134  *
3135  * Building the ordered list this way is O(N^2), but with a pretty small
3136  * constant, so for the number of entries we expect it will probably be
3137  * faster than trying to apply qsort().  Most tables don't have very many
3138  * indexes...
3139  */
3140 static List *
3141 insert_ordered_oid(List *list, Oid datum)
3142 {
3143         ListCell   *prev;
3144
3145         /* Does the datum belong at the front? */
3146         if (list == NIL || datum < linitial_oid(list))
3147                 return lcons_oid(datum, list);
3148         /* No, so find the entry it belongs after */
3149         prev = list_head(list);
3150         for (;;)
3151         {
3152                 ListCell   *curr = lnext(prev);
3153
3154                 if (curr == NULL || datum < lfirst_oid(curr))
3155                         break;                          /* it belongs after 'prev', before 'curr' */
3156
3157                 prev = curr;
3158         }
3159         /* Insert datum into list after 'prev' */
3160         lappend_cell_oid(list, prev, datum);
3161         return list;
3162 }
3163
3164 /*
3165  * RelationSetIndexList -- externally force the index list contents
3166  *
3167  * This is used to temporarily override what we think the set of valid
3168  * indexes is (including the presence or absence of an OID index).
3169  * The forcing will be valid only until transaction commit or abort.
3170  *
3171  * This should only be applied to nailed relations, because in a non-nailed
3172  * relation the hacked index list could be lost at any time due to SI
3173  * messages.  In practice it is only used on pg_class (see REINDEX).
3174  *
3175  * It is up to the caller to make sure the given list is correctly ordered.
3176  *
3177  * We deliberately do not change rd_indexattr here: even when operating
3178  * with a temporary partial index list, HOT-update decisions must be made
3179  * correctly with respect to the full index set.  It is up to the caller
3180  * to ensure that a correct rd_indexattr set has been cached before first
3181  * calling RelationSetIndexList; else a subsequent inquiry might cause a
3182  * wrong rd_indexattr set to get computed and cached.
3183  */
3184 void
3185 RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex)
3186 {
3187         MemoryContext oldcxt;
3188
3189         Assert(relation->rd_isnailed);
3190         /* Copy the list into the cache context (could fail for lack of mem) */
3191         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3192         indexIds = list_copy(indexIds);
3193         MemoryContextSwitchTo(oldcxt);
3194         /* Okay to replace old list */
3195         list_free(relation->rd_indexlist);
3196         relation->rd_indexlist = indexIds;
3197         relation->rd_oidindex = oidIndex;
3198         relation->rd_indexvalid = 2;    /* mark list as forced */
3199         /* must flag that we have a forced index list */
3200         need_eoxact_work = true;
3201 }
3202
3203 /*
3204  * RelationGetOidIndex -- get the pg_class OID of the relation's OID index
3205  *
3206  * Returns InvalidOid if there is no such index.
3207  */
3208 Oid
3209 RelationGetOidIndex(Relation relation)
3210 {
3211         List       *ilist;
3212
3213         /*
3214          * If relation doesn't have OIDs at all, caller is probably confused. (We
3215          * could just silently return InvalidOid, but it seems better to throw an
3216          * assertion.)
3217          */
3218         Assert(relation->rd_rel->relhasoids);
3219
3220         if (relation->rd_indexvalid == 0)
3221         {
3222                 /* RelationGetIndexList does the heavy lifting. */
3223                 ilist = RelationGetIndexList(relation);
3224                 list_free(ilist);
3225                 Assert(relation->rd_indexvalid != 0);
3226         }
3227
3228         return relation->rd_oidindex;
3229 }
3230
3231 /*
3232  * RelationGetIndexExpressions -- get the index expressions for an index
3233  *
3234  * We cache the result of transforming pg_index.indexprs into a node tree.
3235  * If the rel is not an index or has no expressional columns, we return NIL.
3236  * Otherwise, the returned tree is copied into the caller's memory context.
3237  * (We don't want to return a pointer to the relcache copy, since it could
3238  * disappear due to relcache invalidation.)
3239  */
3240 List *
3241 RelationGetIndexExpressions(Relation relation)
3242 {
3243         List       *result;
3244         Datum           exprsDatum;
3245         bool            isnull;
3246         char       *exprsString;
3247         MemoryContext oldcxt;
3248
3249         /* Quick exit if we already computed the result. */
3250         if (relation->rd_indexprs)
3251                 return (List *) copyObject(relation->rd_indexprs);
3252
3253         /* Quick exit if there is nothing to do. */
3254         if (relation->rd_indextuple == NULL ||
3255                 heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs))
3256                 return NIL;
3257
3258         /*
3259          * We build the tree we intend to return in the caller's context. After
3260          * successfully completing the work, we copy it into the relcache entry.
3261          * This avoids problems if we get some sort of error partway through.
3262          */
3263         exprsDatum = heap_getattr(relation->rd_indextuple,
3264                                                           Anum_pg_index_indexprs,
3265                                                           GetPgIndexDescriptor(),
3266                                                           &isnull);
3267         Assert(!isnull);
3268         exprsString = TextDatumGetCString(exprsDatum);
3269         result = (List *) stringToNode(exprsString);
3270         pfree(exprsString);
3271
3272         /*
3273          * Run the expressions through eval_const_expressions. This is not just an
3274          * optimization, but is necessary, because the planner will be comparing
3275          * them to similarly-processed qual clauses, and may fail to detect valid
3276          * matches without this.  We don't bother with canonicalize_qual, however.
3277          */
3278         result = (List *) eval_const_expressions(NULL, (Node *) result);
3279
3280         /*
3281          * Also mark any coercion format fields as "don't care", so that the
3282          * planner can match to both explicit and implicit coercions.
3283          */
3284         set_coercionform_dontcare((Node *) result);
3285
3286         /* May as well fix opfuncids too */
3287         fix_opfuncids((Node *) result);
3288
3289         /* Now save a copy of the completed tree in the relcache entry. */
3290         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3291         relation->rd_indexprs = (List *) copyObject(result);
3292         MemoryContextSwitchTo(oldcxt);
3293
3294         return result;
3295 }
3296
3297 /*
3298  * RelationGetIndexPredicate -- get the index predicate for an index
3299  *
3300  * We cache the result of transforming pg_index.indpred into an implicit-AND
3301  * node tree (suitable for ExecQual).
3302  * If the rel is not an index or has no predicate, we return NIL.
3303  * Otherwise, the returned tree is copied into the caller's memory context.
3304  * (We don't want to return a pointer to the relcache copy, since it could
3305  * disappear due to relcache invalidation.)
3306  */
3307 List *
3308 RelationGetIndexPredicate(Relation relation)
3309 {
3310         List       *result;
3311         Datum           predDatum;
3312         bool            isnull;
3313         char       *predString;
3314         MemoryContext oldcxt;
3315
3316         /* Quick exit if we already computed the result. */
3317         if (relation->rd_indpred)
3318                 return (List *) copyObject(relation->rd_indpred);
3319
3320         /* Quick exit if there is nothing to do. */
3321         if (relation->rd_indextuple == NULL ||
3322                 heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred))
3323                 return NIL;
3324
3325         /*
3326          * We build the tree we intend to return in the caller's context. After
3327          * successfully completing the work, we copy it into the relcache entry.
3328          * This avoids problems if we get some sort of error partway through.
3329          */
3330         predDatum = heap_getattr(relation->rd_indextuple,
3331                                                          Anum_pg_index_indpred,
3332                                                          GetPgIndexDescriptor(),
3333                                                          &isnull);
3334         Assert(!isnull);
3335         predString = TextDatumGetCString(predDatum);
3336         result = (List *) stringToNode(predString);
3337         pfree(predString);
3338
3339         /*
3340          * Run the expression through const-simplification and canonicalization.
3341          * This is not just an optimization, but is necessary, because the planner
3342          * will be comparing it to similarly-processed qual clauses, and may fail
3343          * to detect valid matches without this.  This must match the processing
3344          * done to qual clauses in preprocess_expression()!  (We can skip the
3345          * stuff involving subqueries, however, since we don't allow any in index
3346          * predicates.)
3347          */
3348         result = (List *) eval_const_expressions(NULL, (Node *) result);
3349
3350         result = (List *) canonicalize_qual((Expr *) result);
3351
3352         /*
3353          * Also mark any coercion format fields as "don't care", so that the
3354          * planner can match to both explicit and implicit coercions.
3355          */
3356         set_coercionform_dontcare((Node *) result);
3357
3358         /* Also convert to implicit-AND format */
3359         result = make_ands_implicit((Expr *) result);
3360
3361         /* May as well fix opfuncids too */
3362         fix_opfuncids((Node *) result);
3363
3364         /* Now save a copy of the completed tree in the relcache entry. */
3365         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3366         relation->rd_indpred = (List *) copyObject(result);
3367         MemoryContextSwitchTo(oldcxt);
3368
3369         return result;
3370 }
3371
3372 /*
3373  * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
3374  *
3375  * The result has a bit set for each attribute used anywhere in the index
3376  * definitions of all the indexes on this relation.  (This includes not only
3377  * simple index keys, but attributes used in expressions and partial-index
3378  * predicates.)
3379  *
3380  * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
3381  * we can include system attributes (e.g., OID) in the bitmap representation.
3382  *
3383  * The returned result is palloc'd in the caller's memory context and should
3384  * be bms_free'd when not needed anymore.
3385  */
3386 Bitmapset *
3387 RelationGetIndexAttrBitmap(Relation relation)
3388 {
3389         Bitmapset  *indexattrs;
3390         List       *indexoidlist;
3391         ListCell   *l;
3392         MemoryContext oldcxt;
3393
3394         /* Quick exit if we already computed the result. */
3395         if (relation->rd_indexattr != NULL)
3396                 return bms_copy(relation->rd_indexattr);
3397
3398         /* Fast path if definitely no indexes */
3399         if (!RelationGetForm(relation)->relhasindex)
3400                 return NULL;
3401
3402         /*
3403          * Get cached list of index OIDs
3404          */
3405         indexoidlist = RelationGetIndexList(relation);
3406
3407         /* Fall out if no indexes (but relhasindex was set) */
3408         if (indexoidlist == NIL)
3409                 return NULL;
3410
3411         /*
3412          * For each index, add referenced attributes to indexattrs.
3413          */
3414         indexattrs = NULL;
3415         foreach(l, indexoidlist)
3416         {
3417                 Oid                     indexOid = lfirst_oid(l);
3418                 Relation        indexDesc;
3419                 IndexInfo  *indexInfo;
3420                 int                     i;
3421
3422                 indexDesc = index_open(indexOid, AccessShareLock);
3423
3424                 /* Extract index key information from the index's pg_index row */
3425                 indexInfo = BuildIndexInfo(indexDesc);
3426
3427                 /* Collect simple attribute references */
3428                 for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
3429                 {
3430                         int                     attrnum = indexInfo->ii_KeyAttrNumbers[i];
3431
3432                         if (attrnum != 0)
3433                                 indexattrs = bms_add_member(indexattrs,
3434                                                            attrnum - FirstLowInvalidHeapAttributeNumber);
3435                 }
3436
3437                 /* Collect all attributes used in expressions, too */
3438                 pull_varattnos((Node *) indexInfo->ii_Expressions, &indexattrs);
3439
3440                 /* Collect all attributes in the index predicate, too */
3441                 pull_varattnos((Node *) indexInfo->ii_Predicate, &indexattrs);
3442
3443                 index_close(indexDesc, AccessShareLock);
3444         }
3445
3446         list_free(indexoidlist);
3447
3448         /* Now save a copy of the bitmap in the relcache entry. */
3449         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3450         relation->rd_indexattr = bms_copy(indexattrs);
3451         MemoryContextSwitchTo(oldcxt);
3452
3453         /* We return our original working copy for caller to play with */
3454         return indexattrs;
3455 }
3456
3457 /*
3458  * RelationGetExclusionInfo -- get info about index's exclusion constraint
3459  *
3460  * This should be called only for an index that is known to have an
3461  * associated exclusion constraint.  It returns arrays (palloc'd in caller's
3462  * context) of the exclusion operator OIDs, their underlying functions'
3463  * OIDs, and their strategy numbers in the index's opclasses.  We cache
3464  * all this information since it requires a fair amount of work to get.
3465  */
3466 void
3467 RelationGetExclusionInfo(Relation indexRelation,
3468                                                  Oid **operators,
3469                                                  Oid **procs,
3470                                                  uint16 **strategies)
3471 {
3472         int                     ncols = indexRelation->rd_rel->relnatts;
3473         Oid                *ops;
3474         Oid                *funcs;
3475         uint16     *strats;
3476         Relation        conrel;
3477         SysScanDesc     conscan;
3478         ScanKeyData     skey[1];
3479         HeapTuple       htup;
3480         bool            found;
3481         MemoryContext oldcxt;
3482         int                     i;
3483
3484         /* Allocate result space in caller context */
3485         *operators = ops = (Oid *) palloc(sizeof(Oid) * ncols);
3486         *procs = funcs = (Oid *) palloc(sizeof(Oid) * ncols);
3487         *strategies = strats = (uint16 *) palloc(sizeof(uint16) * ncols);
3488
3489         /* Quick exit if we have the data cached already */
3490         if (indexRelation->rd_exclstrats != NULL)
3491         {
3492                 memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * ncols);
3493                 memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * ncols);
3494                 memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * ncols);
3495                 return;
3496         }
3497
3498         /*
3499          * Search pg_constraint for the constraint associated with the index.
3500          * To make this not too painfully slow, we use the index on conrelid;
3501          * that will hold the parent relation's OID not the index's own OID.
3502          */
3503         ScanKeyInit(&skey[0],
3504                                 Anum_pg_constraint_conrelid,
3505                                 BTEqualStrategyNumber, F_OIDEQ,
3506                                 ObjectIdGetDatum(indexRelation->rd_index->indrelid));
3507
3508         conrel = heap_open(ConstraintRelationId, AccessShareLock);
3509         conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
3510                                                                  SnapshotNow, 1, skey);
3511         found = false;
3512
3513         while (HeapTupleIsValid(htup = systable_getnext(conscan)))
3514         {
3515                 Form_pg_constraint       conform = (Form_pg_constraint) GETSTRUCT(htup);
3516                 Datum           val;
3517                 bool            isnull;
3518                 ArrayType  *arr;
3519                 int                     nelem;
3520
3521                 /* We want the exclusion constraint owning the index */
3522                 if (conform->contype != CONSTRAINT_EXCLUSION ||
3523                         conform->conindid != RelationGetRelid(indexRelation))
3524                         continue;
3525
3526                 /* There should be only one */
3527                 if (found)
3528                         elog(ERROR, "unexpected exclusion constraint record found for rel %s",
3529                                  RelationGetRelationName(indexRelation));
3530                 found = true;
3531
3532                 /* Extract the operator OIDS from conexclop */
3533                 val = fastgetattr(htup,
3534                                                   Anum_pg_constraint_conexclop,
3535                                                   conrel->rd_att, &isnull);
3536                 if (isnull)
3537                         elog(ERROR, "null conexclop for rel %s",
3538                                  RelationGetRelationName(indexRelation));
3539
3540                 arr = DatumGetArrayTypeP(val);  /* ensure not toasted */
3541                 nelem = ARR_DIMS(arr)[0];
3542                 if (ARR_NDIM(arr) != 1 ||
3543                         nelem != ncols ||
3544                         ARR_HASNULL(arr) ||
3545                         ARR_ELEMTYPE(arr) != OIDOID)
3546                         elog(ERROR, "conexclop is not a 1-D Oid array");
3547
3548                 memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * ncols);
3549         }
3550
3551         systable_endscan(conscan);
3552         heap_close(conrel, AccessShareLock);
3553
3554         if (!found)
3555                 elog(ERROR, "exclusion constraint record missing for rel %s",
3556                          RelationGetRelationName(indexRelation));
3557
3558         /* We need the func OIDs and strategy numbers too */
3559         for (i = 0; i < ncols; i++)
3560         {
3561                 funcs[i] = get_opcode(ops[i]);
3562                 strats[i] = get_op_opfamily_strategy(ops[i],
3563                                                                                          indexRelation->rd_opfamily[i]);
3564                 /* shouldn't fail, since it was checked at index creation */
3565                 if (strats[i] == InvalidStrategy)
3566                         elog(ERROR, "could not find strategy for operator %u in family %u",
3567                                  ops[i], indexRelation->rd_opfamily[i]);
3568         }
3569
3570         /* Save a copy of the results in the relcache entry. */
3571         oldcxt = MemoryContextSwitchTo(indexRelation->rd_indexcxt);
3572         indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * ncols);
3573         indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * ncols);
3574         indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * ncols);
3575         memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * ncols);
3576         memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * ncols);
3577         memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * ncols);
3578         MemoryContextSwitchTo(oldcxt);
3579 }
3580
3581
3582 /*
3583  *      load_relcache_init_file, write_relcache_init_file
3584  *
3585  *              In late 1992, we started regularly having databases with more than
3586  *              a thousand classes in them.  With this number of classes, it became
3587  *              critical to do indexed lookups on the system catalogs.
3588  *
3589  *              Bootstrapping these lookups is very hard.  We want to be able to
3590  *              use an index on pg_attribute, for example, but in order to do so,
3591  *              we must have read pg_attribute for the attributes in the index,
3592  *              which implies that we need to use the index.
3593  *
3594  *              In order to get around the problem, we do the following:
3595  *
3596  *                 +  When the database system is initialized (at initdb time), we
3597  *                        don't use indexes.  We do sequential scans.
3598  *
3599  *                 +  When the backend is started up in normal mode, we load an image
3600  *                        of the appropriate relation descriptors, in internal format,
3601  *                        from an initialization file in the data/base/... directory.
3602  *
3603  *                 +  If the initialization file isn't there, then we create the
3604  *                        relation descriptors using sequential scans and write 'em to
3605  *                        the initialization file for use by subsequent backends.
3606  *
3607  *              As of Postgres 9.0, there is one local initialization file in each
3608  *              database, plus one shared initialization file for shared catalogs.
3609  *
3610  *              We could dispense with the initialization files and just build the
3611  *              critical reldescs the hard way on every backend startup, but that
3612  *              slows down backend startup noticeably.
3613  *
3614  *              We can in fact go further, and save more relcache entries than
3615  *              just the ones that are absolutely critical; this allows us to speed
3616  *              up backend startup by not having to build such entries the hard way.
3617  *              Presently, all the catalog and index entries that are referred to
3618  *              by catcaches are stored in the initialization files.
3619  *
3620  *              The same mechanism that detects when catcache and relcache entries
3621  *              need to be invalidated (due to catalog updates) also arranges to
3622  *              unlink the initialization files when the contents may be out of date.
3623  *              The files will then be rebuilt during the next backend startup.
3624  */
3625
3626 /*
3627  * load_relcache_init_file -- attempt to load cache from the shared
3628  * or local cache init file
3629  *
3630  * If successful, return TRUE and set criticalRelcachesBuilt or
3631  * criticalSharedRelcachesBuilt to true.
3632  * If not successful, return FALSE.
3633  *
3634  * NOTE: we assume we are already switched into CacheMemoryContext.
 *
 * "shared" selects the file: true reads global/RELCACHE_INIT_FILENAME,
 * false reads DatabasePath/RELCACHE_INIT_FILENAME.
 *
 * Expected file layout, as consumed below: an int magic number equal to
 * RELCACHE_INIT_FILEMAGIC, then zero or more relation records.  Every item
 * in a record is a Size length followed by that many bytes.  The items are,
 * in order: the RelationData struct, the pg_class tuple form, one
 * pg_attribute fixed part per attribute (relnatts of them), and the
 * rd_options value (length zero when there is none).  A record for an index
 * continues with the pg_index tuple, the pg_am tuple form, and the opfamily,
 * opcintype, operator, support-procedure and indoption vectors.
 *
 * Entries are gathered in a local array and are inserted into the relcache
 * only after the whole file was read without error and the number of nailed
 * rels and nailed indexes matches the expected constants.  On any failure
 * nothing is inserted and FALSE is returned.
3635  */
3636 static bool
3637 load_relcache_init_file(bool shared)
3638 {
3639         FILE       *fp;
3640         char            initfilename[MAXPGPATH];
3641         Relation   *rels;
3642         int                     relno,
3643                                 num_rels,
3644                                 max_rels,
3645                                 nailed_rels,
3646                                 nailed_indexes,
3647                                 magic;
3648         int                     i;
3649
             /* Build the path name of the init file we were asked to read */
3650         if (shared)
3651                 snprintf(initfilename, sizeof(initfilename), "global/%s",
3652                                  RELCACHE_INIT_FILENAME);
3653         else
3654                 snprintf(initfilename, sizeof(initfilename), "%s/%s",
3655                                  DatabasePath, RELCACHE_INIT_FILENAME);
3656
             /* If the file cannot be opened, just report failure to the caller */
3657         fp = AllocateFile(initfilename, PG_BINARY_R);
3658         if (fp == NULL)
3659                 return false;
3660
3661         /*
3662          * Read the relcache entries (index and non-index alike) from the file.  Note we will not enter
3663          * any of them into the cache if the read fails partway through; this
3664          * helps to guard against broken init files.
3665          */
             /* rels[] starts with room for 100 entries and is doubled on demand */
3666         max_rels = 100;
3667         rels = (Relation *) palloc(max_rels * sizeof(Relation));
3668         num_rels = 0;
3669         nailed_rels = nailed_indexes = 0;
3670
3671         /* check for correct magic number (compatible version) */
3672         if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
3673                 goto read_failed;
3674         if (magic != RELCACHE_INIT_FILEMAGIC)
3675                 goto read_failed;
3676
             /* One iteration per relation record; the loop ends at clean EOF */
3677         for (relno = 0;; relno++)
3678         {
3679                 Size            len;
3680                 size_t          nread;
3681                 Relation        rel;
3682                 Form_pg_class relform;
3683                 bool            has_not_null;
3684
3685                 /* first read the relation descriptor length */
3686                 nread = fread(&len, 1, sizeof(len), fp);
3687                 if (nread != sizeof(len))
3688                 {
                             /*
                              * Reading nothing at a record boundary is the normal way out;
                              * a partial length word means the file is truncated.
                              */
3689                         if (nread == 0)
3690                                 break;                  /* end of file */
3691                         goto read_failed;
3692                 }
3693
3694                 /* safety check for incompatible relcache layout */
3695                 if (len != sizeof(RelationData))
3696                         goto read_failed;
3697
3698                 /* allocate another relcache header */
3699                 if (num_rels >= max_rels)
3700                 {
3701                         max_rels *= 2;
3702                         rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
3703                 }
3704
                     /*
                      * The new entry is recorded in rels[] before it is filled in, so
                      * num_rels counts it even if a later read of this record fails.
                      */
3705                 rel = rels[num_rels++] = (Relation) palloc(len);
3706
3707                 /* then, read the Relation structure */
3708                 if (fread(rel, 1, len, fp) != len)
3709                         goto read_failed;
3710
3711                 /* next read the relation tuple form */
3712                 if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
3713                         goto read_failed;
3714
3715                 relform = (Form_pg_class) palloc(len);
3716                 if (fread(relform, 1, len, fp) != len)
3717                         goto read_failed;
3718
                     /* pointer fields read from the file are stale; replace rd_rel */
3719                 rel->rd_rel = relform;
3720
3721                 /* initialize attribute tuple forms */
3722                 rel->rd_att = CreateTemplateTupleDesc(relform->relnatts,
3723                                                                                           relform->relhasoids);
3724                 rel->rd_att->tdrefcount = 1;    /* mark as refcounted */
3725
3726                 rel->rd_att->tdtypeid = relform->reltype;
3727                 rel->rd_att->tdtypmod = -1;             /* unnecessary, but... */
3728
3729                 /* next read all the attribute tuple form data entries */
                     /* has_not_null becomes true if any attribute has attnotnull set */
3730                 has_not_null = false;
3731                 for (i = 0; i < relform->relnatts; i++)
3732                 {
3733                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
3734                                 goto read_failed;
                             /* each attribute item must be exactly the fixed-size part */
3735                         if (len != ATTRIBUTE_FIXED_PART_SIZE)
3736                                 goto read_failed;
3737                         if (fread(rel->rd_att->attrs[i], 1, len, fp) != len)
3738                                 goto read_failed;
3739
3740                         has_not_null |= rel->rd_att->attrs[i]->attnotnull;
3741                 }
3742
3743                 /* next read the access method specific field */
                     /* (this is rd_options; a stored length of zero means it is NULL) */
3744                 if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
3745                         goto read_failed;
3746                 if (len > 0)
3747                 {
3748                         rel->rd_options = palloc(len);
3749                         if (fread(rel->rd_options, 1, len, fp) != len)
3750                                 goto read_failed;
                             /* the item length must agree with the value's own size word */
3751                         if (len != VARSIZE(rel->rd_options))
3752                                 goto read_failed;               /* sanity check */
3753                 }
3754                 else
3755                 {
3756                         rel->rd_options = NULL;
3757                 }
3758
3759                 /* mark not-null status */
3760                 if (has_not_null)
3761                 {
                             /* zero-filled constraint struct with only has_not_null set */
3762                         TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
3763
3764                         constr->has_not_null = true;
3765                         rel->rd_att->constr = constr;
3766                 }
3767
3768                 /* If it's an index, there's more to do */
3769                 if (rel->rd_rel->relkind == RELKIND_INDEX)
3770                 {
3771                         Form_pg_am      am;
3772                         MemoryContext indexcxt;
3773                         Oid                *opfamily;
3774                         Oid                *opcintype;
3775                         Oid                *operator;
3776                         RegProcedure *support;
3777                         int                     nsupport;
3778                         int16      *indoption;
3779
3780                         /* Count nailed indexes to ensure we have 'em all */
3781                         if (rel->rd_isnailed)
3782                                 nailed_indexes++;
3783
3784                         /* next, read the pg_index tuple */
3785                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
3786                                 goto read_failed;
3787
3788                         rel->rd_indextuple = (HeapTuple) palloc(len);
3789                         if (fread(rel->rd_indextuple, 1, len, fp) != len)
3790                                 goto read_failed;
3791
3792                         /* Fix up internal pointers in the tuple -- see heap_copytuple */
3793                         rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
3794                         rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);
3795
3796                         /* next, read the access method tuple form */
3797                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
3798                                 goto read_failed;
3799
3800                         am = (Form_pg_am) palloc(len);
3801                         if (fread(am, 1, len, fp) != len)
3802                                 goto read_failed;
3803                         rel->rd_am = am;
3804
3805                         /*
3806                          * prepare index info context --- parameters should match
3807                          * RelationInitIndexAccessInfo
3808                          */
3809                         indexcxt = AllocSetContextCreate(CacheMemoryContext,
3810                                                                                          RelationGetRelationName(rel),
3811                                                                                          ALLOCSET_SMALL_MINSIZE,
3812                                                                                          ALLOCSET_SMALL_INITSIZE,
3813                                                                                          ALLOCSET_SMALL_MAXSIZE);
3814                         rel->rd_indexcxt = indexcxt;
3815
                             /*
                              * The five vectors below are allocated in the index's own
                              * context, each sized by the length stored in the file.
                              */
3816                         /* next, read the vector of opfamily OIDs */
3817                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
3818                                 goto read_failed;
3819
3820                         opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
3821                         if (fread(opfamily, 1, len, fp) != len)
3822                                 goto read_failed;
3823
3824                         rel->rd_opfamily = opfamily;
3825
3826                         /* next, read the vector of opcintype OIDs */
3827                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
3828                                 goto read_failed;
3829
3830                         opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
3831                         if (fread(opcintype, 1, len, fp) != len)
3832                                 goto read_failed;
3833
3834                         rel->rd_opcintype = opcintype;
3835
3836                         /* next, read the vector of operator OIDs */
3837                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
3838                                 goto read_failed;
3839
3840                         operator = (Oid *) MemoryContextAlloc(indexcxt, len);
3841                         if (fread(operator, 1, len, fp) != len)
3842                                 goto read_failed;
3843
3844                         rel->rd_operator = operator;
3845
3846                         /* next, read the vector of support procedures */
3847                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
3848                                 goto read_failed;
3849                         support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
3850                         if (fread(support, 1, len, fp) != len)
3851                                 goto read_failed;
3852
3853                         rel->rd_support = support;
3854
3855                         /* finally, read the vector of indoption values */
3856                         if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
3857                                 goto read_failed;
3858
3859                         indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
3860                         if (fread(indoption, 1, len, fp) != len)
3861                                 goto read_failed;
3862
3863                         rel->rd_indoption = indoption;
3864
3865                         /* set up zeroed fmgr-info vectors */
3866                         rel->rd_aminfo = (RelationAmInfo *)
3867                                 MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
                             /* one FmgrInfo slot per attribute per AM support procedure */
3868                         nsupport = relform->relnatts * am->amsupport;
3869                         rel->rd_supportinfo = (FmgrInfo *)
3870                                 MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
3871                 }
3872                 else
3873                 {
3874                         /* Count nailed rels to ensure we have 'em all */
3875                         if (rel->rd_isnailed)
3876                                 nailed_rels++;
3877
                             /* a non-index entry must carry no index-only fields */
3878                         Assert(rel->rd_index == NULL);
3879                         Assert(rel->rd_indextuple == NULL);
3880                         Assert(rel->rd_am == NULL);
3881                         Assert(rel->rd_indexcxt == NULL);
3882                         Assert(rel->rd_aminfo == NULL);
3883                         Assert(rel->rd_opfamily == NULL);
3884                         Assert(rel->rd_opcintype == NULL);
3885                         Assert(rel->rd_operator == NULL);
3886                         Assert(rel->rd_support == NULL);
3887                         Assert(rel->rd_supportinfo == NULL);
3888                         Assert(rel->rd_indoption == NULL);
3889                 }
3890
3891                 /*
3892                  * Rules and triggers are not saved (mainly because the internal
3893                  * format is complex and subject to change).  They must be rebuilt if
3894                  * needed by RelationCacheInitializePhase3.  This is not expected to
3895                  * be a big performance hit since few system catalogs have such. Ditto
3896                  * for index expressions, predicates, and exclusion info.
3897                  */
3898                 rel->rd_rules = NULL;
3899                 rel->rd_rulescxt = NULL;
3900                 rel->trigdesc = NULL;
3901                 rel->rd_indexprs = NIL;
3902                 rel->rd_indpred = NIL;
3903                 rel->rd_exclops = NULL;
3904                 rel->rd_exclprocs = NULL;
3905                 rel->rd_exclstrats = NULL;
3906
3907                 /*
3908                  * Reset transient-state fields in the relcache entry
3909                  */
3910                 rel->rd_smgr = NULL;
3911                 rel->rd_targblock = InvalidBlockNumber;
3912                 rel->rd_fsm_nblocks = InvalidBlockNumber;
3913                 rel->rd_vm_nblocks = InvalidBlockNumber;
                     /* nailed entries start with reference count 1, all others with 0 */
3914                 if (rel->rd_isnailed)
3915                         rel->rd_refcnt = 1;
3916                 else
3917                         rel->rd_refcnt = 0;
3918                 rel->rd_indexvalid = 0;
3919                 rel->rd_indexlist = NIL;
3920                 rel->rd_indexattr = NULL;
3921                 rel->rd_oidindex = InvalidOid;
3922                 rel->rd_createSubid = InvalidSubTransactionId;
3923                 rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3924                 rel->rd_amcache = NULL;
3925                 MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
3926
3927                 /*
3928                  * Recompute lock and physical addressing info.  This is needed in
3929                  * case the pg_internal.init file was copied from some other database
3930                  * by CREATE DATABASE.
3931                  */
3932                 RelationInitLockInfo(rel);
3933                 RelationInitPhysicalAddr(rel);
3934         }
3935
3936         /*
3937          * We reached the end of the init file without apparent problem. Did we
3938          * get the right number of nailed items?  (This is a useful crosscheck in
3939          * case the set of critical rels or indexes changes.)
3940          */
3941         if (shared)
3942         {
3943                 if (nailed_rels != NUM_CRITICAL_SHARED_RELS ||
3944                         nailed_indexes != NUM_CRITICAL_SHARED_INDEXES)
3945                         goto read_failed;
3946         }
3947         else
3948         {
3949                 if (nailed_rels != NUM_CRITICAL_LOCAL_RELS ||
3950                         nailed_indexes != NUM_CRITICAL_LOCAL_INDEXES)
3951                         goto read_failed;
3952         }
3953
3954         /*
3955          * OK, all appears well.
3956          *
3957          * Now insert all the new relcache entries into the cache.
3958          */
3959         for (relno = 0; relno < num_rels; relno++)
3960         {
3961                 RelationCacheInsert(rels[relno]);
3962                 /* also make a list of their OIDs, for RelationIdIsInInitFile */
                     /* (only entries from the local file are listed, not shared ones) */
3963                 if (!shared)
3964                         initFileRelationIds = lcons_oid(RelationGetRelid(rels[relno]),
3965                                                                                         initFileRelationIds);
3966         }
3967
             /* the pointer array was only scratch space; the entries themselves stay */
3968         pfree(rels);
3969         FreeFile(fp);
3970
3971         if (shared)
3972                 criticalSharedRelcachesBuilt = true;
3973         else
3974                 criticalRelcachesBuilt = true;
3975         return true;
3976
3977         /*
3978          * init file is broken, so do it the hard way.  We don't bother trying to
3979          * free the clutter we just allocated; it's not in the relcache so it
3980          * won't hurt.
3981          */
3982 read_failed:
             /* only the pointer array and the file handle are released here */
3983         pfree(rels);
3984         FreeFile(fp);
3985
3986         return false;
3987 }
3988
3989 /*
3990  * Write out a new initialization file with the current contents
3991  * of the relcache (either shared rels or local rels, as indicated).
 *
 * "shared" selects both the set of entries dumped (those whose pg_class
 * relisshared flag equals it) and the target file:
 * global/RELCACHE_INIT_FILENAME when true, otherwise
 * DatabasePath/RELCACHE_INIT_FILENAME.
 *
 * The data goes to a temp file whose name ends in this backend's PID.  After
 * the file is closed, and while holding RelCacheInitLock, the temp file is
 * renamed into place only if no relcache invalidation has been received;
 * otherwise it is deleted.
 *
 * Failure to create the temp file produces a WARNING and a plain return.
 * Any failed write, or a failed close, is reported with elog(FATAL).
 *
 * When writing the local file, the OID of every relation written is also
 * prepended to initFileRelationIds.
3992  */
3993 static void
3994 write_relcache_init_file(bool shared)
3995 {
3996         FILE       *fp;
3997         char            tempfilename[MAXPGPATH];
3998         char            finalfilename[MAXPGPATH];
3999         int                     magic;
4000         HASH_SEQ_STATUS status;
4001         RelIdCacheEnt *idhentry;
4002         MemoryContext oldcxt;
4003         int                     i;
4004
4005         /*
4006          * We must write a temporary file and rename it into place. Otherwise,
4007          * another backend starting at about the same time might crash trying to
4008          * read the partially-complete file.
4009          */
4010         if (shared)
4011         {
4012                 snprintf(tempfilename, sizeof(tempfilename), "global/%s.%d",
4013                                  RELCACHE_INIT_FILENAME, MyProcPid);
4014                 snprintf(finalfilename, sizeof(finalfilename), "global/%s",
4015                                  RELCACHE_INIT_FILENAME);
4016         }
4017         else
4018         {
4019                 snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
4020                                  DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
4021                 snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
4022                                  DatabasePath, RELCACHE_INIT_FILENAME);
4023         }
4024
             /* the result of this unlink is deliberately not checked */
4025         unlink(tempfilename);           /* in case it exists w/wrong permissions */
4026
4027         fp = AllocateFile(tempfilename, PG_BINARY_W);
4028         if (fp == NULL)
4029         {
4030                 /*
4031                  * We used to consider this a fatal error, but we might as well
4032                  * continue with backend startup ...
4033                  */
4034                 ereport(WARNING,
4035                                 (errcode_for_file_access(),
4036                                  errmsg("could not create relation-cache initialization file \"%s\": %m",
4037                                                 tempfilename),
4038                           errdetail("Continuing anyway, but there's something wrong.")));
4039                 return;
4040         }
4041
4042         /*
4043          * Write a magic number to serve as a file version identifier.  We can
4044          * change the magic number whenever the relcache layout changes.
4045          */
             /* the magic number is written raw, without a length prefix */
4046         magic = RELCACHE_INIT_FILEMAGIC;
4047         if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
4048                 elog(FATAL, "could not write init file");
4049
4050         /*
4051          * Write all the appropriate reldescs (in no particular order).
4052          */
4053         hash_seq_init(&status, RelationIdCache);
4054
4055         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
4056         {
4057                 Relation        rel = idhentry->reldesc;
4058                 Form_pg_class relform = rel->rd_rel;
4059
4060                 /* ignore if not correct group */
4061                 if (relform->relisshared != shared)
4062                         continue;
4063
                     /*
                      * Each write_item call below emits a length followed by the data.
                      * The order of the items is the order in which
                      * load_relcache_init_file reads them back.
                      */
4064                 /* first write the relcache entry proper */
4065                 write_item(rel, sizeof(RelationData), fp);
4066
4067                 /* next write the relation tuple form */
4068                 write_item(relform, CLASS_TUPLE_SIZE, fp);
4069
4070                 /* next, do all the attribute tuple form data entries */
4071                 for (i = 0; i < relform->relnatts; i++)
4072                 {
4073                         write_item(rel->rd_att->attrs[i], ATTRIBUTE_FIXED_PART_SIZE, fp);
4074                 }
4075
4076                 /* next, do the access method specific field */
                     /* (rd_options; when it is NULL a zero length and no data are written) */
4077                 write_item(rel->rd_options,
4078                                    (rel->rd_options ? VARSIZE(rel->rd_options) : 0),
4079                                    fp);
4080
4081                 /* If it's an index, there's more to do */
4082                 if (rel->rd_rel->relkind == RELKIND_INDEX)
4083                 {
4084                         Form_pg_am      am = rel->rd_am;
4085
4086                         /* write the pg_index tuple */
4087                         /* we assume this was created by heap_copytuple! */
                             /* header struct and tuple data are written as one chunk */
4088                         write_item(rel->rd_indextuple,
4089                                            HEAPTUPLESIZE + rel->rd_indextuple->t_len,
4090                                            fp);
4091
4092                         /* next, write the access method tuple form */
4093                         write_item(am, sizeof(FormData_pg_am), fp);
4094
4095                         /* next, write the vector of opfamily OIDs */
4096                         write_item(rel->rd_opfamily,
4097                                            relform->relnatts * sizeof(Oid),
4098                                            fp);
4099
4100                         /* next, write the vector of opcintype OIDs */
4101                         write_item(rel->rd_opcintype,
4102                                            relform->relnatts * sizeof(Oid),
4103                                            fp);
4104
4105                         /* next, write the vector of operator OIDs */
                             /* (amstrategies entries for each of the relnatts columns) */
4106                         write_item(rel->rd_operator,
4107                                            relform->relnatts * (am->amstrategies * sizeof(Oid)),
4108                                            fp);
4109
4110                         /* next, write the vector of support procedures */
                             /* (amsupport entries for each of the relnatts columns) */
4111                         write_item(rel->rd_support,
4112                                   relform->relnatts * (am->amsupport * sizeof(RegProcedure)),
4113                                            fp);
4114
4115                         /* finally, write the vector of indoption values */
4116                         write_item(rel->rd_indoption,
4117                                            relform->relnatts * sizeof(int16),
4118                                            fp);
4119                 }
4120
4121                 /* also make a list of their OIDs, for RelationIdIsInInitFile */
4122                 if (!shared)
4123                 {
                             /* extend the list while CacheMemoryContext is current */
4124                         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4125                         initFileRelationIds = lcons_oid(RelationGetRelid(rel),
4126                                                                                         initFileRelationIds);
4127                         MemoryContextSwitchTo(oldcxt);
4128                 }
4129         }
4130
             /* a nonzero result from closing the file is treated as a write failure */
4131         if (FreeFile(fp))
4132                 elog(FATAL, "could not write init file");
4133
4134         /*
4135          * Now we have to check whether the data we've so painstakingly
4136          * accumulated is already obsolete due to someone else's just-committed
4137          * catalog changes.  If so, we just delete the temp file and leave it to
4138          * the next backend to try again.  (Our own relcache entries will be
4139          * updated by SI message processing, but we can't be sure whether what we
4140          * wrote out was up-to-date.)
4141          *
4142          * This mustn't run concurrently with RelationCacheInitFileInvalidate, so
4143          * grab a serialization lock for the duration.
4144          */
4145         LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
4146
4147         /* Make sure we have seen all incoming SI messages */
4148         AcceptInvalidationMessages();
4149
4150         /*
4151          * If we have received any SI relcache invals since backend start, assume
4152          * we may have written out-of-date data.
4153          */
4154         if (relcacheInvalsReceived == 0L)
4155         {
4156                 /*
4157                  * OK, rename the temp file to its final name, deleting any
4158                  * previously-existing init file.
4159                  *
4160                  * Note: a failure here is possible under Cygwin, if some other
4161                  * backend is holding open an unlinked-but-not-yet-gone init file. So
4162                  * treat this as a noncritical failure; just remove the useless temp
4163                  * file on failure.
4164                  */
4165                 if (rename(tempfilename, finalfilename) < 0)
4166                         unlink(tempfilename);
4167         }
4168         else
4169         {
4170                 /* Delete the already-obsolete temp file */
4171                 unlink(tempfilename);
4172         }
4173
             /* the lock is held across the rename/unlink decision and released here */
4174         LWLockRelease(RelCacheInitLock);
4175 }
4176
4177 /* write a chunk of data preceded by its length */
4178 static void
4179 write_item(const void *data, Size len, FILE *fp)
4180 {
4181         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
4182                 elog(FATAL, "could not write init file");
4183         if (fwrite(data, 1, len, fp) != len)
4184                 elog(FATAL, "could not write init file");
4185 }
4186
4187 /*
4188  * Detect whether a given relation (identified by OID) is one of the ones
4189  * we store in the local relcache init file.
4190  *
4191  * Note that we effectively assume that all backends running in a database
4192  * would choose to store the same set of relations in the init file;
4193  * otherwise there are cases where we'd fail to detect the need for an init
4194  * file invalidation.  This does not seem likely to be a problem in practice.
4195  */
4196 bool
4197 RelationIdIsInInitFile(Oid relationId)
4198 {
4199         return list_member_oid(initFileRelationIds, relationId);
4200 }
4201
4202 /*
4203  * Invalidate (remove) the init file during commit of a transaction that
4204  * changed one or more of the relation cache entries that are kept in the
4205  * local init file.
4206  *
4207  * We actually need to remove the init file twice: once just before sending
4208  * the SI messages that include relcache inval for such relations, and once
4209  * just after sending them.  The unlink before ensures that a backend that's
4210  * currently starting cannot read the now-obsolete init file and then miss
4211  * the SI messages that will force it to update its relcache entries.  (This
4212  * works because the backend startup sequence gets into the PGPROC array before
4213  * trying to load the init file.)  The unlink after is to synchronize with a
4214  * backend that may currently be trying to write an init file based on data
4215  * that we've just rendered invalid.  Such a backend will see the SI messages,
4216  * but we can't leave the init file sitting around to fool later backends.
4217  *
4218  * Ignore any failure to unlink the file, since it might not be there if
4219  * no backend has been started since the last removal.
4220  *
4221  * Notice this deals only with the local init file, not the shared init file.
4222  * The reason is that there can never be a "significant" change to the
4223  * relcache entry of a shared relation; the most that could happen is
4224  * updates of noncritical fields such as relpages/reltuples.  So, while
4225  * it's worth updating the shared init file from time to time, it can never
4226  * be invalid enough to make it necessary to remove it.
4227  */
4228 void
4229 RelationCacheInitFileInvalidate(bool beforeSend)
4230 {
4231         char            initfilename[MAXPGPATH];
4232
4233         snprintf(initfilename, sizeof(initfilename), "%s/%s",
4234                          DatabasePath, RELCACHE_INIT_FILENAME);
4235
4236         if (beforeSend)
4237         {
4238                 /* no interlock needed here */
4239                 unlink(initfilename);
4240         }
4241         else
4242         {
4243                 /*
4244                  * We need to interlock this against write_relcache_init_file, to
4245                  * guard against possibility that someone renames a new-but-
4246                  * already-obsolete init file into place just after we unlink. With
4247                  * the interlock, it's certain that write_relcache_init_file will
4248                  * notice our SI inval message before renaming into place, or else
4249                  * that we will execute second and successfully unlink the file.
4250                  */
4251                 LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
4252                 unlink(initfilename);
4253                 LWLockRelease(RelCacheInitLock);
4254         }
4255 }
4256
4257 /*
4258  * Remove the init files during postmaster startup.
4259  *
4260  * We used to keep the init files across restarts, but that is unsafe in PITR
4261  * scenarios, and even in simple crash-recovery cases there are windows for
4262  * the init files to become out-of-sync with the database.  So now we just
4263  * remove them during startup and expect the first backend launch to rebuild
4264  * them.  Of course, this has to happen in each database of the cluster.
4265  */
4266 void
4267 RelationCacheInitFileRemove(void)
4268 {
4269         const char *tblspcdir = "pg_tblspc";
4270         DIR                *dir;
4271         struct dirent *de;
4272         char            path[MAXPGPATH];
4273
4274         /*
4275          * We zap the shared cache file too.  In theory it can't get out of sync
4276          * enough to be a problem, but in data-corruption cases, who knows ...
4277          */
4278         snprintf(path, sizeof(path), "global/%s",
4279                          RELCACHE_INIT_FILENAME);
4280         unlink_initfile(path);
4281
4282         /* Scan everything in the default tablespace */
4283         RelationCacheInitFileRemoveInDir("base");
4284
4285         /* Scan the tablespace link directory to find non-default tablespaces */
4286         dir = AllocateDir(tblspcdir);
4287         if (dir == NULL)
4288         {
4289                 elog(LOG, "could not open tablespace link directory \"%s\": %m",
4290                          tblspcdir);
4291                 return;
4292         }
4293
4294         while ((de = ReadDir(dir, tblspcdir)) != NULL)
4295         {
4296                 if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
4297                 {
4298                         /* Scan the tablespace dir for per-database dirs */
4299                         snprintf(path, sizeof(path), "%s/%s",
4300                                          tblspcdir, de->d_name);
4301                         RelationCacheInitFileRemoveInDir(path);
4302                 }
4303         }
4304
4305         FreeDir(dir);
4306 }
4307
4308 /* Process one per-tablespace directory for RelationCacheInitFileRemove */
4309 static void
4310 RelationCacheInitFileRemoveInDir(const char *tblspcpath)
4311 {
4312         DIR                *dir;
4313         struct dirent *de;
4314         char            initfilename[MAXPGPATH];
4315
4316         /* Scan the tablespace directory to find per-database directories */
4317         dir = AllocateDir(tblspcpath);
4318         if (dir == NULL)
4319         {
4320                 elog(LOG, "could not open tablespace directory \"%s\": %m",
4321                          tblspcpath);
4322                 return;
4323         }
4324
4325         while ((de = ReadDir(dir, tblspcpath)) != NULL)
4326         {
4327                 if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
4328                 {
4329                         /* Try to remove the init file in each database */
4330                         snprintf(initfilename, sizeof(initfilename), "%s/%s/%s",
4331                                          tblspcpath, de->d_name, RELCACHE_INIT_FILENAME);
4332                         unlink_initfile(initfilename);
4333                 }
4334         }
4335
4336         FreeDir(dir);
4337 }
4338
4339 static void
4340 unlink_initfile(const char *initfilename)
4341 {
4342         if (unlink(initfilename) < 0)
4343         {
4344                 /* It might not be there, but log any error other than ENOENT */
4345                 if (errno != ENOENT)
4346                         elog(LOG, "could not remove cache file \"%s\": %m", initfilename);
4347         }
4348 }