]> granicus.if.org Git - postgresql/blob - src/backend/utils/cache/relcache.c
Reimplement the linked list data structure used throughout the backend.
[postgresql] / src / backend / utils / cache / relcache.c
1 /*-------------------------------------------------------------------------
2  *
3  * relcache.c
4  *        POSTGRES relation descriptor cache code
5  *
6  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.203 2004/05/26 04:41:40 neilc Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * INTERFACE ROUTINES
17  *              RelationCacheInitialize                 - initialize relcache
18  *              RelationCacheInitializePhase2   - finish initializing relcache
19  *              RelationIdGetRelation                   - get a reldesc by relation id
20  *              RelationSysNameGetRelation              - get a reldesc by system rel name
21  *              RelationIdCacheGetRelation              - get a cached reldesc by relid
22  *              RelationClose                                   - close an open relation
23  *
24  * NOTES
25  *              The following code contains many undocumented hacks.  Please be
26  *              careful....
27  */
28 #include "postgres.h"
29
30 #include <errno.h>
31 #include <sys/file.h>
32 #include <fcntl.h>
33 #include <unistd.h>
34
35 #include "access/genam.h"
36 #include "access/heapam.h"
37 #include "catalog/catalog.h"
38 #include "catalog/catname.h"
39 #include "catalog/indexing.h"
40 #include "catalog/namespace.h"
41 #include "catalog/pg_amop.h"
42 #include "catalog/pg_amproc.h"
43 #include "catalog/pg_attrdef.h"
44 #include "catalog/pg_attribute.h"
45 #include "catalog/pg_constraint.h"
46 #include "catalog/pg_index.h"
47 #include "catalog/pg_namespace.h"
48 #include "catalog/pg_opclass.h"
49 #include "catalog/pg_proc.h"
50 #include "catalog/pg_rewrite.h"
51 #include "catalog/pg_type.h"
52 #include "commands/trigger.h"
53 #include "miscadmin.h"
54 #include "optimizer/clauses.h"
55 #include "optimizer/planmain.h"
56 #include "optimizer/prep.h"
57 #include "storage/fd.h"
58 #include "storage/smgr.h"
59 #include "utils/builtins.h"
60 #include "utils/catcache.h"
61 #include "utils/fmgroids.h"
62 #include "utils/inval.h"
63 #include "utils/lsyscache.h"
64 #include "utils/relcache.h"
65 #include "utils/syscache.h"
66 #include "utils/typcache.h"
67
68
69 /*
70  * name of relcache init file, used to speed up backend startup
71  */
72 #define RELCACHE_INIT_FILENAME  "pg_internal.init"
73
74 #define RELCACHE_INIT_FILEMAGIC         0x573261 /* version ID value */
75
76 /*
77  *              hardcoded tuple descriptors.  see include/catalog/pg_attribute.h
78  */
79 static FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
80 static FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
81 static FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
82 static FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
83
84 /*
85  *              Hash tables that index the relation cache
86  *
87  *              Relations are looked up two ways, by OID and by name,
88  *              thus there are two hash tables for referencing them.
89  *
90  *              The OID index covers all relcache entries.      The name index
91  *              covers *only* system relations (only those in PG_CATALOG_NAMESPACE).
92  */
93 static HTAB *RelationIdCache;
94 static HTAB *RelationSysNameCache;
95
96 /*
97  * This flag is false until we have prepared the critical relcache entries
98  * that are needed to do indexscans on the tables read by relcache building.
99  */
100 bool            criticalRelcachesBuilt = false;
101
102 /*
103  * This flag is set if we discover that we need to write a new relcache
104  * cache file at the end of startup.
105  */
106 static bool needNewCacheFile = false;
107
108 /*
109  * This counter counts relcache inval events received since backend startup
110  * (but only for rels that are actually in cache).      Presently, we use it only
111  * to detect whether data about to be written by write_relcache_init_file()
112  * might already be obsolete.
113  */
114 static long relcacheInvalsReceived = 0L;
115
116 /*
117  * This list remembers the OIDs of the relations cached in the relcache
118  * init file.
119  */
120 static List *initFileRelationIds = NIL;
121
122 /*
123  *              RelationBuildDescInfo exists so code can be shared
124  *              between RelationIdGetRelation() and RelationSysNameGetRelation()
125  */
126 typedef struct RelationBuildDescInfo
127 {
128         int                     infotype;               /* lookup by id or by name */
129 #define INFO_RELID 1
130 #define INFO_RELNAME 2
131         union
132         {
133                 Oid                     info_id;        /* relation object id */
134                 char       *info_name;  /* system relation name */
135         }                       i;
136 } RelationBuildDescInfo;
137
138 typedef struct relidcacheent
139 {
140         Oid                     reloid;
141         Relation        reldesc;
142 } RelIdCacheEnt;
143
144 typedef struct relnamecacheent
145 {
146         NameData        relname;
147         Relation        reldesc;
148 } RelNameCacheEnt;
149
/*
 *              macros to manipulate the lookup hashtables
 */

/*
 * RelationCacheInsert
 *
 * Enter RELATION into RelationIdCache under its rd_id and, when it lives in
 * a system namespace, also into RelationSysNameCache under its name.  An
 * already-existing entry is silently redirected to RELATION.  A NULL result
 * from hash_search is reported as an out-of-memory ERROR.
 *
 * The argument is evaluated exactly once and may be any expression of type
 * Relation.
 */
#define RelationCacheInsert(RELATION)   \
do { \
    Relation rci_rel = (RELATION); \
    RelIdCacheEnt *idhentry; \
    bool found; \
    idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
                                             (void *) &(rci_rel->rd_id), \
                                             HASH_ENTER, \
                                             &found); \
    if (idhentry == NULL) \
        ereport(ERROR, \
                (errcode(ERRCODE_OUT_OF_MEMORY), \
                 errmsg("out of memory"))); \
    /* used to give notice if found -- now just keep quiet */ \
    idhentry->reldesc = rci_rel; \
    if (IsSystemNamespace(RelationGetNamespace(rci_rel))) \
    { \
        char *relname = RelationGetRelationName(rci_rel); \
        RelNameCacheEnt *namehentry; \
        namehentry = (RelNameCacheEnt *) hash_search(RelationSysNameCache, \
                                                     relname, \
                                                     HASH_ENTER, \
                                                     &found); \
        if (namehentry == NULL) \
            ereport(ERROR, \
                    (errcode(ERRCODE_OUT_OF_MEMORY), \
                     errmsg("out of memory"))); \
        /* used to give notice if found -- now just keep quiet */ \
        namehentry->reldesc = rci_rel; \
    } \
} while(0)
182
/*
 * RelationIdCacheLookup
 *
 * Search RelationIdCache for the OID held in ID (which must be an lvalue,
 * since its address is passed to hash_search).  RELATION is assigned the
 * descriptor stored in the matching entry, or NULL when there is none.
 */
#define RelationIdCacheLookup(ID, RELATION) \
do { \
    RelIdCacheEnt *hentry = (RelIdCacheEnt *) \
        hash_search(RelationIdCache, (void *) &(ID), HASH_FIND, NULL); \
    RELATION = (hentry != NULL) ? hentry->reldesc : NULL; \
} while(0)
193
/*
 * RelationSysNameCacheLookup
 *
 * Search RelationSysNameCache using NAME (a pointer to the key).  RELATION
 * is assigned the descriptor stored in the matching entry, or NULL when
 * there is none.
 */
#define RelationSysNameCacheLookup(NAME, RELATION) \
do { \
    RelNameCacheEnt *hentry = (RelNameCacheEnt *) \
        hash_search(RelationSysNameCache, (void *) (NAME), HASH_FIND, NULL); \
    RELATION = (hentry != NULL) ? hentry->reldesc : NULL; \
} while(0)
204
/*
 * RelationCacheDelete
 *
 * Remove RELATION's entry from RelationIdCache and, when the relation lives
 * in a system namespace, from RelationSysNameCache as well.  A missing
 * entry is only reported with a WARNING.
 *
 * The argument is evaluated exactly once and may be any expression of type
 * Relation.
 */
#define RelationCacheDelete(RELATION) \
do { \
    Relation rcd_rel = (RELATION); \
    RelIdCacheEnt *idhentry; \
    idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
                                             (void *) &(rcd_rel->rd_id), \
                                             HASH_REMOVE, NULL); \
    if (idhentry == NULL) \
        elog(WARNING, "trying to delete a rd_id reldesc that does not exist"); \
    if (IsSystemNamespace(RelationGetNamespace(rcd_rel))) \
    { \
        char *relname = RelationGetRelationName(rcd_rel); \
        RelNameCacheEnt *namehentry; \
        namehentry = (RelNameCacheEnt *) hash_search(RelationSysNameCache, \
                                                     relname, \
                                                     HASH_REMOVE, NULL); \
        if (namehentry == NULL) \
            elog(WARNING, "trying to delete a relname reldesc that does not exist"); \
    } \
} while(0)
224
225
226 /*
227  * Special cache for opclass-related information
228  *
229  * Note: only default-subtype operators and support procs get cached
230  */
231 typedef struct opclasscacheent
232 {
233         Oid                     opclassoid;             /* lookup key: OID of opclass */
234         bool            valid;                  /* set TRUE after successful fill-in */
235         StrategyNumber numStrats;       /* max # of strategies (from pg_am) */
236         StrategyNumber numSupport;      /* max # of support procs (from pg_am) */
237         Oid                *operatorOids;       /* strategy operators' OIDs */
238         RegProcedure *supportProcs; /* support procs */
239 } OpClassCacheEnt;
240
241 static HTAB *OpClassCache = NULL;
242
243
244 /* non-export function prototypes */
245
246 static void RelationClearRelation(Relation relation, bool rebuild);
247
248 static void RelationReloadClassinfo(Relation relation);
249 static void RelationFlushRelation(Relation relation);
250 static Relation RelationSysNameCacheGetRelation(const char *relationName);
251 static bool load_relcache_init_file(void);
252 static void write_relcache_init_file(void);
253
254 static void formrdesc(const char *relationName, int natts,
255                   FormData_pg_attribute *att);
256
257 static HeapTuple ScanPgRelation(RelationBuildDescInfo buildinfo, bool indexOK);
258 static Relation AllocateRelationDesc(Relation relation, Form_pg_class relp);
259 static void RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
260                                            Relation relation);
261 static Relation RelationBuildDesc(RelationBuildDescInfo buildinfo,
262                                   Relation oldrelation);
263 static void AttrDefaultFetch(Relation relation);
264 static void CheckConstraintFetch(Relation relation);
265 static List *insert_ordered_oid(List *list, Oid datum);
266 static void IndexSupportInitialize(Form_pg_index iform,
267                                            Oid *indexOperator,
268                                            RegProcedure *indexSupport,
269                                            StrategyNumber maxStrategyNumber,
270                                            StrategyNumber maxSupportNumber,
271                                            AttrNumber maxAttributeNumber);
272 static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
273                                   StrategyNumber numStrats,
274                                   StrategyNumber numSupport);
275
276
277 /*
278  *              ScanPgRelation
279  *
280  *              this is used by RelationBuildDesc to find a pg_class
281  *              tuple matching either a relation name or a relation id
282  *              as specified in buildinfo.
283  *
284  *              NB: the returned tuple has been copied into palloc'd storage
285  *              and must eventually be freed with heap_freetuple.
286  */
287 static HeapTuple
288 ScanPgRelation(RelationBuildDescInfo buildinfo, bool indexOK)
289 {
290         HeapTuple       pg_class_tuple;
291         Relation        pg_class_desc;
292         const char *indexRelname;
293         SysScanDesc pg_class_scan;
294         ScanKeyData key[2];
295         int                     nkeys;
296
297         /*
298          * form a scan key
299          */
300         switch (buildinfo.infotype)
301         {
302                 case INFO_RELID:
303                         ScanKeyInit(&key[0],
304                                                 ObjectIdAttributeNumber,
305                                                 BTEqualStrategyNumber, F_OIDEQ,
306                                                 ObjectIdGetDatum(buildinfo.i.info_id));
307                         nkeys = 1;
308                         indexRelname = ClassOidIndex;
309                         break;
310
311                 case INFO_RELNAME:
312                         ScanKeyInit(&key[0],
313                                                 Anum_pg_class_relname,
314                                                 BTEqualStrategyNumber, F_NAMEEQ,
315                                                 NameGetDatum(buildinfo.i.info_name));
316                         ScanKeyInit(&key[1],
317                                                 Anum_pg_class_relnamespace,
318                                                 BTEqualStrategyNumber, F_OIDEQ,
319                                                 ObjectIdGetDatum(PG_CATALOG_NAMESPACE));
320                         nkeys = 2;
321                         indexRelname = ClassNameNspIndex;
322                         break;
323
324                 default:
325                         elog(ERROR, "unrecognized buildinfo type: %d",
326                                  buildinfo.infotype);
327                         return NULL;            /* keep compiler quiet */
328         }
329
330         /*
331          * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
332          * built the critical relcache entries (this includes initdb and
333          * startup without a pg_internal.init file).  The caller can also
334          * force a heap scan by setting indexOK == false.
335          */
336         pg_class_desc = heap_openr(RelationRelationName, AccessShareLock);
337         pg_class_scan = systable_beginscan(pg_class_desc, indexRelname,
338                                                                            indexOK && criticalRelcachesBuilt,
339                                                                            SnapshotNow,
340                                                                            nkeys, key);
341
342         pg_class_tuple = systable_getnext(pg_class_scan);
343
344         /*
345          * Must copy tuple before releasing buffer.
346          */
347         if (HeapTupleIsValid(pg_class_tuple))
348                 pg_class_tuple = heap_copytuple(pg_class_tuple);
349
350         /* all done */
351         systable_endscan(pg_class_scan);
352         heap_close(pg_class_desc, AccessShareLock);
353
354         return pg_class_tuple;
355 }
356
357 /*
358  *              AllocateRelationDesc
359  *
360  *              This is used to allocate memory for a new relation descriptor
361  *              and initialize the rd_rel field.
362  *
363  *              If 'relation' is NULL, allocate a new RelationData object.
364  *              If not, reuse the given object (that path is taken only when
365  *              we have to rebuild a relcache entry during RelationClearRelation).
366  */
367 static Relation
368 AllocateRelationDesc(Relation relation, Form_pg_class relp)
369 {
370         MemoryContext oldcxt;
371         Form_pg_class relationForm;
372
373         /* Relcache entries must live in CacheMemoryContext */
374         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
375
376         /*
377          * allocate space for new relation descriptor, if needed
378          */
379         if (relation == NULL)
380                 relation = (Relation) palloc(sizeof(RelationData));
381
382         /*
383          * clear all fields of reldesc
384          */
385         MemSet((char *) relation, 0, sizeof(RelationData));
386         relation->rd_targblock = InvalidBlockNumber;
387
388         /* make sure relation is marked as having no open file yet */
389         relation->rd_smgr = NULL;
390
391         /*
392          * Copy the relation tuple form
393          *
394          * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE.
395          * relacl is NOT stored in the relcache --- there'd be little point in
396          * it, since we don't copy the tuple's nullvalues bitmap and hence
397          * wouldn't know if the value is valid ... bottom line is that relacl
398          * *cannot* be retrieved from the relcache.  Get it from the syscache
399          * if you need it.
400          */
401         relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
402
403         memcpy((char *) relationForm, (char *) relp, CLASS_TUPLE_SIZE);
404
405         /* initialize relation tuple form */
406         relation->rd_rel = relationForm;
407
408         /* and allocate attribute tuple form storage */
409         relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts,
410                                                                                            relationForm->relhasoids);
411
412         MemoryContextSwitchTo(oldcxt);
413
414         return relation;
415 }
416
417 /*
418  *              RelationBuildTupleDesc
419  *
420  *              Form the relation's tuple descriptor from information in
421  *              the pg_attribute, pg_attrdef & pg_constraint system catalogs.
422  */
423 static void
424 RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
425                                            Relation relation)
426 {
427         HeapTuple       pg_attribute_tuple;
428         Relation        pg_attribute_desc;
429         SysScanDesc pg_attribute_scan;
430         ScanKeyData skey[2];
431         int                     need;
432         TupleConstr *constr;
433         AttrDefault *attrdef = NULL;
434         int                     ndef = 0;
435
436         /* copy some fields from pg_class row to rd_att */
437         relation->rd_att->tdtypeid = relation->rd_rel->reltype;
438         relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
439         relation->rd_att->tdhasoid = relation->rd_rel->relhasoids;
440
441         constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
442                                                                                                 sizeof(TupleConstr));
443         constr->has_not_null = false;
444
445         /*
446          * Form a scan key that selects only user attributes (attnum > 0).
447          * (Eliminating system attribute rows at the index level is lots
448          * faster than fetching them.)
449          */
450         ScanKeyInit(&skey[0],
451                                 Anum_pg_attribute_attrelid,
452                                 BTEqualStrategyNumber, F_OIDEQ,
453                                 ObjectIdGetDatum(RelationGetRelid(relation)));
454         ScanKeyInit(&skey[1],
455                                 Anum_pg_attribute_attnum,
456                                 BTGreaterStrategyNumber, F_INT2GT,
457                                 Int16GetDatum(0));
458
459         /*
460          * Open pg_attribute and begin a scan.  Force heap scan if we haven't
461          * yet built the critical relcache entries (this includes initdb and
462          * startup without a pg_internal.init file).
463          */
464         pg_attribute_desc = heap_openr(AttributeRelationName, AccessShareLock);
465         pg_attribute_scan = systable_beginscan(pg_attribute_desc,
466                                                                                    AttributeRelidNumIndex,
467                                                                                    criticalRelcachesBuilt,
468                                                                                    SnapshotNow,
469                                                                                    2, skey);
470
471         /*
472          * add attribute data to relation->rd_att
473          */
474         need = relation->rd_rel->relnatts;
475
476         while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
477         {
478                 Form_pg_attribute attp;
479
480                 attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
481
482                 if (attp->attnum <= 0 ||
483                         attp->attnum > relation->rd_rel->relnatts)
484                         elog(ERROR, "invalid attribute number %d for %s",
485                                  attp->attnum, RelationGetRelationName(relation));
486
487                 relation->rd_att->attrs[attp->attnum - 1] =
488                         (Form_pg_attribute) MemoryContextAlloc(CacheMemoryContext,
489                                                                                                    ATTRIBUTE_TUPLE_SIZE);
490
491                 memcpy((char *) (relation->rd_att->attrs[attp->attnum - 1]),
492                            (char *) attp,
493                            ATTRIBUTE_TUPLE_SIZE);
494
495                 /* Update constraint/default info */
496                 if (attp->attnotnull)
497                         constr->has_not_null = true;
498
499                 if (attp->atthasdef)
500                 {
501                         if (attrdef == NULL)
502                                 attrdef = (AttrDefault *)
503                                         MemoryContextAllocZero(CacheMemoryContext,
504                                                                                    relation->rd_rel->relnatts *
505                                                                                    sizeof(AttrDefault));
506                         attrdef[ndef].adnum = attp->attnum;
507                         attrdef[ndef].adbin = NULL;
508                         ndef++;
509                 }
510                 need--;
511                 if (need == 0)
512                         break;
513         }
514
515         /*
516          * end the scan and close the attribute relation
517          */
518         systable_endscan(pg_attribute_scan);
519         heap_close(pg_attribute_desc, AccessShareLock);
520
521         if (need != 0)
522                 elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
523                          need, RelationGetRelid(relation));
524
525         /*
526          * The attcacheoff values we read from pg_attribute should all be -1
527          * ("unknown").  Verify this if assert checking is on.  They will be
528          * computed when and if needed during tuple access.
529          */
530 #ifdef USE_ASSERT_CHECKING
531         {
532                 int                     i;
533
534                 for (i = 0; i < relation->rd_rel->relnatts; i++)
535                         Assert(relation->rd_att->attrs[i]->attcacheoff == -1);
536         }
537 #endif
538
539         /*
540          * However, we can easily set the attcacheoff value for the first
541          * attribute: it must be zero.  This eliminates the need for special
542          * cases for attnum=1 that used to exist in fastgetattr() and
543          * index_getattr().
544          */
545         if (relation->rd_rel->relnatts > 0)
546                 relation->rd_att->attrs[0]->attcacheoff = 0;
547
548         /*
549          * Set up constraint/default info
550          */
551         if (constr->has_not_null || ndef > 0 || relation->rd_rel->relchecks)
552         {
553                 relation->rd_att->constr = constr;
554
555                 if (ndef > 0)                   /* DEFAULTs */
556                 {
557                         if (ndef < relation->rd_rel->relnatts)
558                                 constr->defval = (AttrDefault *)
559                                         repalloc(attrdef, ndef * sizeof(AttrDefault));
560                         else
561                                 constr->defval = attrdef;
562                         constr->num_defval = ndef;
563                         AttrDefaultFetch(relation);
564                 }
565                 else
566                         constr->num_defval = 0;
567
568                 if (relation->rd_rel->relchecks > 0)    /* CHECKs */
569                 {
570                         constr->num_check = relation->rd_rel->relchecks;
571                         constr->check = (ConstrCheck *)
572                                 MemoryContextAllocZero(CacheMemoryContext,
573                                                                         constr->num_check * sizeof(ConstrCheck));
574                         CheckConstraintFetch(relation);
575                 }
576                 else
577                         constr->num_check = 0;
578         }
579         else
580         {
581                 pfree(constr);
582                 relation->rd_att->constr = NULL;
583         }
584 }
585
586 /*
587  *              RelationBuildRuleLock
588  *
589  *              Form the relation's rewrite rules from information in
590  *              the pg_rewrite system catalog.
591  *
592  * Note: The rule parsetrees are potentially very complex node structures.
593  * To allow these trees to be freed when the relcache entry is flushed,
594  * we make a private memory context to hold the RuleLock information for
595  * each relcache entry that has associated rules.  The context is used
596  * just for rule info, not for any other subsidiary data of the relcache
597  * entry, because that keeps the update logic in RelationClearRelation()
598  * manageable.  The other subsidiary data structures are simple enough
599  * to be easy to free explicitly, anyway.
600  */
601 static void
602 RelationBuildRuleLock(Relation relation)
603 {
604         MemoryContext rulescxt;
605         MemoryContext oldcxt;
606         HeapTuple       rewrite_tuple;
607         Relation        rewrite_desc;
608         TupleDesc       rewrite_tupdesc;
609         SysScanDesc rewrite_scan;
610         ScanKeyData key;
611         RuleLock   *rulelock;
612         int                     numlocks;
613         RewriteRule **rules;
614         int                     maxlocks;
615
616         /*
617          * Make the private context.  Parameters are set on the assumption
618          * that it'll probably not contain much data.
619          */
620         rulescxt = AllocSetContextCreate(CacheMemoryContext,
621                                                                          RelationGetRelationName(relation),
622                                                                          ALLOCSET_SMALL_MINSIZE,
623                                                                          ALLOCSET_SMALL_INITSIZE,
624                                                                          ALLOCSET_SMALL_MAXSIZE);
625         relation->rd_rulescxt = rulescxt;
626
627         /*
628          * allocate an array to hold the rewrite rules (the array is extended
629          * if necessary)
630          */
631         maxlocks = 4;
632         rules = (RewriteRule **)
633                 MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
634         numlocks = 0;
635
636         /*
637          * form a scan key
638          */
639         ScanKeyInit(&key,
640                                 Anum_pg_rewrite_ev_class,
641                                 BTEqualStrategyNumber, F_OIDEQ,
642                                 ObjectIdGetDatum(RelationGetRelid(relation)));
643
644         /*
645          * open pg_rewrite and begin a scan
646          *
647          * Note: since we scan the rules using RewriteRelRulenameIndex, we will
648          * be reading the rules in name order, except possibly during
649          * emergency-recovery operations (ie, IsIgnoringSystemIndexes). This
650          * in turn ensures that rules will be fired in name order.
651          */
652         rewrite_desc = heap_openr(RewriteRelationName, AccessShareLock);
653         rewrite_tupdesc = RelationGetDescr(rewrite_desc);
654         rewrite_scan = systable_beginscan(rewrite_desc,
655                                                                           RewriteRelRulenameIndex,
656                                                                           true, SnapshotNow,
657                                                                           1, &key);
658
659         while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
660         {
661                 Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
662                 bool            isnull;
663                 Datum           ruleaction;
664                 Datum           rule_evqual;
665                 char       *ruleaction_str;
666                 char       *rule_evqual_str;
667                 RewriteRule *rule;
668
669                 rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
670                                                                                                   sizeof(RewriteRule));
671
672                 rule->ruleId = HeapTupleGetOid(rewrite_tuple);
673
674                 rule->event = rewrite_form->ev_type - '0';
675                 rule->attrno = rewrite_form->ev_attr;
676                 rule->isInstead = rewrite_form->is_instead;
677
678                 /* Must use heap_getattr to fetch ev_qual and ev_action */
679
680                 ruleaction = heap_getattr(rewrite_tuple,
681                                                                   Anum_pg_rewrite_ev_action,
682                                                                   rewrite_tupdesc,
683                                                                   &isnull);
684                 Assert(!isnull);
685                 ruleaction_str = DatumGetCString(DirectFunctionCall1(textout,
686                                                                                                                          ruleaction));
687                 oldcxt = MemoryContextSwitchTo(rulescxt);
688                 rule->actions = (List *) stringToNode(ruleaction_str);
689                 MemoryContextSwitchTo(oldcxt);
690                 pfree(ruleaction_str);
691
692                 rule_evqual = heap_getattr(rewrite_tuple,
693                                                                    Anum_pg_rewrite_ev_qual,
694                                                                    rewrite_tupdesc,
695                                                                    &isnull);
696                 Assert(!isnull);
697                 rule_evqual_str = DatumGetCString(DirectFunctionCall1(textout,
698                                                                                                                    rule_evqual));
699                 oldcxt = MemoryContextSwitchTo(rulescxt);
700                 rule->qual = (Node *) stringToNode(rule_evqual_str);
701                 MemoryContextSwitchTo(oldcxt);
702                 pfree(rule_evqual_str);
703
704                 if (numlocks >= maxlocks)
705                 {
706                         maxlocks *= 2;
707                         rules = (RewriteRule **)
708                                 repalloc(rules, sizeof(RewriteRule *) * maxlocks);
709                 }
710                 rules[numlocks++] = rule;
711         }
712
713         /*
714          * end the scan and close the attribute relation
715          */
716         systable_endscan(rewrite_scan);
717         heap_close(rewrite_desc, AccessShareLock);
718
719         /*
720          * form a RuleLock and insert into relation
721          */
722         rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
723         rulelock->numLocks = numlocks;
724         rulelock->rules = rules;
725
726         relation->rd_rules = rulelock;
727 }
728
729 /*
730  *              equalRuleLocks
731  *
732  *              Determine whether two RuleLocks are equivalent
733  *
734  *              Probably this should be in the rules code someplace...
735  */
736 static bool
737 equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
738 {
739         int                     i;
740
741         /*
742          * As of 7.3 we assume the rule ordering is repeatable, because
743          * RelationBuildRuleLock should read 'em in a consistent order.  So
744          * just compare corresponding slots.
745          */
746         if (rlock1 != NULL)
747         {
748                 if (rlock2 == NULL)
749                         return false;
750                 if (rlock1->numLocks != rlock2->numLocks)
751                         return false;
752                 for (i = 0; i < rlock1->numLocks; i++)
753                 {
754                         RewriteRule *rule1 = rlock1->rules[i];
755                         RewriteRule *rule2 = rlock2->rules[i];
756
757                         if (rule1->ruleId != rule2->ruleId)
758                                 return false;
759                         if (rule1->event != rule2->event)
760                                 return false;
761                         if (rule1->attrno != rule2->attrno)
762                                 return false;
763                         if (rule1->isInstead != rule2->isInstead)
764                                 return false;
765                         if (!equal(rule1->qual, rule2->qual))
766                                 return false;
767                         if (!equal(rule1->actions, rule2->actions))
768                                 return false;
769                 }
770         }
771         else if (rlock2 != NULL)
772                 return false;
773         return true;
774 }
775
776
777 /* ----------------------------------
778  *              RelationBuildDesc
779  *
780  *              Build a relation descriptor --- either a new one, or by
781  *              recycling the given old relation object.  The latter case
782  *              supports rebuilding a relcache entry without invalidating
783  *              pointers to it.
784  * --------------------------------
785  */
786 static Relation
787 RelationBuildDesc(RelationBuildDescInfo buildinfo,
788                                   Relation oldrelation)
789 {
790         Relation        relation;
791         Oid                     relid;
792         HeapTuple       pg_class_tuple;
793         Form_pg_class relp;
794         MemoryContext oldcxt;
795
796         /*
797          * find the tuple in pg_class corresponding to the given relation id
798          */
799         pg_class_tuple = ScanPgRelation(buildinfo, true);
800
801         /*
802          * if no such tuple exists, return NULL
803          */
804         if (!HeapTupleIsValid(pg_class_tuple))
805                 return NULL;
806
807         /*
808          * get information from the pg_class_tuple
809          */
810         relid = HeapTupleGetOid(pg_class_tuple);
811         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
812
813         /*
814          * allocate storage for the relation descriptor, and copy
815          * pg_class_tuple to relation->rd_rel.
816          */
817         relation = AllocateRelationDesc(oldrelation, relp);
818
819         /*
820          * now we can free the memory allocated for pg_class_tuple
821          */
822         heap_freetuple(pg_class_tuple);
823
824         /*
825          * initialize the relation's relation id (relation->rd_id)
826          */
827         RelationGetRelid(relation) = relid;
828
829         /*
830          * initialize relation->rd_refcnt
831          */
832         RelationSetReferenceCount(relation, 1);
833
834         /*
835          * normal relations are not nailed into the cache; nor can a
836          * pre-existing relation be new.  It could be temp though.      (Actually,
837          * it could be new too, but it's okay to forget that fact if forced to
838          * flush the entry.)
839          */
840         relation->rd_isnailed = 0;
841         relation->rd_isnew = false;
842         relation->rd_istemp = isTempNamespace(relation->rd_rel->relnamespace);
843
844         /*
845          * initialize the tuple descriptor (relation->rd_att).
846          */
847         RelationBuildTupleDesc(buildinfo, relation);
848
849         /*
850          * Fetch rules and triggers that affect this relation
851          */
852         if (relation->rd_rel->relhasrules)
853                 RelationBuildRuleLock(relation);
854         else
855         {
856                 relation->rd_rules = NULL;
857                 relation->rd_rulescxt = NULL;
858         }
859
860         if (relation->rd_rel->reltriggers > 0)
861                 RelationBuildTriggers(relation);
862         else
863                 relation->trigdesc = NULL;
864
865         /*
866          * if it's an index, initialize index-related information
867          */
868         if (OidIsValid(relation->rd_rel->relam))
869                 RelationInitIndexAccessInfo(relation);
870
871         /*
872          * initialize the relation lock manager information
873          */
874         RelationInitLockInfo(relation);         /* see lmgr.c */
875
876         if (relation->rd_rel->relisshared)
877                 relation->rd_node.tblNode = InvalidOid;
878         else
879                 relation->rd_node.tblNode = MyDatabaseId;
880         relation->rd_node.relNode = relation->rd_rel->relfilenode;
881
882         /* make sure relation is marked as having no open file yet */
883         relation->rd_smgr = NULL;
884
885         /*
886          * Insert newly created relation into relcache hash tables.
887          */
888         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
889         RelationCacheInsert(relation);
890         MemoryContextSwitchTo(oldcxt);
891
892         return relation;
893 }
894
895 /*
896  * Initialize index-access-method support data for an index relation
897  */
898 void
899 RelationInitIndexAccessInfo(Relation relation)
900 {
901         HeapTuple       tuple;
902         Form_pg_am      aform;
903         MemoryContext indexcxt;
904         MemoryContext oldcontext;
905         Oid                *operator;
906         RegProcedure *support;
907         FmgrInfo   *supportinfo;
908         int                     natts;
909         uint16          amstrategies;
910         uint16          amsupport;
911
912         /*
913          * Make a copy of the pg_index entry for the index.  Since pg_index
914          * contains variable-length and possibly-null fields, we have to do
915          * this honestly rather than just treating it as a Form_pg_index
916          * struct.
917          */
918         tuple = SearchSysCache(INDEXRELID,
919                                                    ObjectIdGetDatum(RelationGetRelid(relation)),
920                                                    0, 0, 0);
921         if (!HeapTupleIsValid(tuple))
922                 elog(ERROR, "cache lookup failed for index %u",
923                          RelationGetRelid(relation));
924         oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
925         relation->rd_indextuple = heap_copytuple(tuple);
926         relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
927         MemoryContextSwitchTo(oldcontext);
928         ReleaseSysCache(tuple);
929
930         /*
931          * Make a copy of the pg_am entry for the index's access method
932          */
933         tuple = SearchSysCache(AMOID,
934                                                    ObjectIdGetDatum(relation->rd_rel->relam),
935                                                    0, 0, 0);
936         if (!HeapTupleIsValid(tuple))
937                 elog(ERROR, "cache lookup failed for access method %u",
938                          relation->rd_rel->relam);
939         aform = (Form_pg_am) MemoryContextAlloc(CacheMemoryContext, sizeof *aform);
940         memcpy(aform, GETSTRUCT(tuple), sizeof *aform);
941         ReleaseSysCache(tuple);
942         relation->rd_am = aform;
943
944         natts = relation->rd_rel->relnatts;
945         if (natts != relation->rd_index->indnatts)
946                 elog(ERROR, "relnatts disagrees with indnatts for index %u",
947                          RelationGetRelid(relation));
948         amstrategies = aform->amstrategies;
949         amsupport = aform->amsupport;
950
951         /*
952          * Make the private context to hold index access info.  The reason we
953          * need a context, and not just a couple of pallocs, is so that we
954          * won't leak any subsidiary info attached to fmgr lookup records.
955          *
956          * Context parameters are set on the assumption that it'll probably not
957          * contain much data.
958          */
959         indexcxt = AllocSetContextCreate(CacheMemoryContext,
960                                                                          RelationGetRelationName(relation),
961                                                                          ALLOCSET_SMALL_MINSIZE,
962                                                                          ALLOCSET_SMALL_INITSIZE,
963                                                                          ALLOCSET_SMALL_MAXSIZE);
964         relation->rd_indexcxt = indexcxt;
965
966         /*
967          * Allocate arrays to hold data
968          */
969         if (amstrategies > 0)
970                 operator = (Oid *)
971                         MemoryContextAllocZero(indexcxt,
972                                                                    natts * amstrategies * sizeof(Oid));
973         else
974                 operator = NULL;
975
976         if (amsupport > 0)
977         {
978                 int                     nsupport = natts * amsupport;
979
980                 support = (RegProcedure *)
981                         MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
982                 supportinfo = (FmgrInfo *)
983                         MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
984         }
985         else
986         {
987                 support = NULL;
988                 supportinfo = NULL;
989         }
990
991         relation->rd_operator = operator;
992         relation->rd_support = support;
993         relation->rd_supportinfo = supportinfo;
994
995         /*
996          * Fill the operator and support procedure OID arrays.
997          * (supportinfo is left as zeroes, and is filled on-the-fly when used)
998          */
999         IndexSupportInitialize(relation->rd_index,
1000                                                    operator, support,
1001                                                    amstrategies, amsupport, natts);
1002
1003         /*
1004          * expressions and predicate cache will be filled later
1005          */
1006         relation->rd_indexprs = NIL;
1007         relation->rd_indpred = NIL;
1008 }
1009
1010 /*
1011  * IndexSupportInitialize
1012  *              Initializes an index's cached opclass information,
1013  *              given the index's pg_index tuple.
1014  *
1015  * Data is returned into *indexOperator and *indexSupport, which are arrays
1016  * allocated by the caller.
1017  *
1018  * The caller also passes maxStrategyNumber, maxSupportNumber, and
1019  * maxAttributeNumber, since these indicate the size of the arrays
1020  * it has allocated --- but in practice these numbers must always match
1021  * those obtainable from the system catalog entries for the index and
1022  * access method.
1023  */
1024 static void
1025 IndexSupportInitialize(Form_pg_index iform,
1026                                            Oid *indexOperator,
1027                                            RegProcedure *indexSupport,
1028                                            StrategyNumber maxStrategyNumber,
1029                                            StrategyNumber maxSupportNumber,
1030                                            AttrNumber maxAttributeNumber)
1031 {
1032         int                     attIndex;
1033
1034         /*
1035          * XXX note that the following assumes the INDEX tuple is well formed
1036          * and that the *key and *class are 0 terminated.
1037          */
1038         for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
1039         {
1040                 OpClassCacheEnt *opcentry;
1041
1042                 if (!OidIsValid(iform->indclass[attIndex]))
1043                         elog(ERROR, "bogus pg_index tuple");
1044
1045                 /* look up the info for this opclass, using a cache */
1046                 opcentry = LookupOpclassInfo(iform->indclass[attIndex],
1047                                                                          maxStrategyNumber,
1048                                                                          maxSupportNumber);
1049
1050                 /* copy cached data into relcache entry */
1051                 if (maxStrategyNumber > 0)
1052                         memcpy(&indexOperator[attIndex * maxStrategyNumber],
1053                                    opcentry->operatorOids,
1054                                    maxStrategyNumber * sizeof(Oid));
1055                 if (maxSupportNumber > 0)
1056                         memcpy(&indexSupport[attIndex * maxSupportNumber],
1057                                    opcentry->supportProcs,
1058                                    maxSupportNumber * sizeof(RegProcedure));
1059         }
1060 }
1061
1062 /*
1063  * LookupOpclassInfo
1064  *
1065  * This routine maintains a per-opclass cache of the information needed
1066  * by IndexSupportInitialize().  This is more efficient than relying on
1067  * the catalog cache, because we can load all the info about a particular
1068  * opclass in a single indexscan of pg_amproc or pg_amop.
1069  *
1070  * The information from pg_am about expected range of strategy and support
1071  * numbers is passed in, rather than being looked up, mainly because the
1072  * caller will have it already.
1073  *
1074  * XXX There isn't any provision for flushing the cache.  However, there
1075  * isn't any provision for flushing relcache entries when opclass info
1076  * changes, either :-(
1077  */
1078 static OpClassCacheEnt *
1079 LookupOpclassInfo(Oid operatorClassOid,
1080                                   StrategyNumber numStrats,
1081                                   StrategyNumber numSupport)
1082 {
1083         OpClassCacheEnt *opcentry;
1084         bool            found;
1085         Relation        rel;
1086         SysScanDesc scan;
1087         ScanKeyData skey[2];
1088         HeapTuple       htup;
1089         bool            indexOK;
1090
1091         if (OpClassCache == NULL)
1092         {
1093                 /* First time through: initialize the opclass cache */
1094                 HASHCTL         ctl;
1095
1096                 if (!CacheMemoryContext)
1097                         CreateCacheMemoryContext();
1098
1099                 MemSet(&ctl, 0, sizeof(ctl));
1100                 ctl.keysize = sizeof(Oid);
1101                 ctl.entrysize = sizeof(OpClassCacheEnt);
1102                 ctl.hash = tag_hash;
1103                 OpClassCache = hash_create("Operator class cache", 64,
1104                                                                    &ctl, HASH_ELEM | HASH_FUNCTION);
1105         }
1106
1107         opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
1108                                                                                            (void *) &operatorClassOid,
1109                                                                                            HASH_ENTER, &found);
1110         if (opcentry == NULL)
1111                 ereport(ERROR,
1112                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1113                                  errmsg("out of memory")));
1114
1115         if (found && opcentry->valid)
1116         {
1117                 /* Already made an entry for it */
1118                 Assert(numStrats == opcentry->numStrats);
1119                 Assert(numSupport == opcentry->numSupport);
1120                 return opcentry;
1121         }
1122
1123         /* Need to fill in new entry */
1124         opcentry->valid = false;        /* until known OK */
1125         opcentry->numStrats = numStrats;
1126         opcentry->numSupport = numSupport;
1127
1128         if (numStrats > 0)
1129                 opcentry->operatorOids = (Oid *)
1130                         MemoryContextAllocZero(CacheMemoryContext,
1131                                                                    numStrats * sizeof(Oid));
1132         else
1133                 opcentry->operatorOids = NULL;
1134
1135         if (numSupport > 0)
1136                 opcentry->supportProcs = (RegProcedure *)
1137                         MemoryContextAllocZero(CacheMemoryContext,
1138                                                                    numSupport * sizeof(RegProcedure));
1139         else
1140                 opcentry->supportProcs = NULL;
1141
1142         /*
1143          * To avoid infinite recursion during startup, force heap scans if
1144          * we're looking up info for the opclasses used by the indexes we
1145          * would like to reference here.
1146          */
1147         indexOK = criticalRelcachesBuilt ||
1148                 (operatorClassOid != OID_BTREE_OPS_OID &&
1149                  operatorClassOid != INT2_BTREE_OPS_OID);
1150
1151         /*
1152          * Scan pg_amop to obtain operators for the opclass.  We only fetch
1153          * the default ones (those with subtype zero).
1154          */
1155         if (numStrats > 0)
1156         {
1157                 ScanKeyInit(&skey[0],
1158                                         Anum_pg_amop_amopclaid,
1159                                         BTEqualStrategyNumber, F_OIDEQ,
1160                                         ObjectIdGetDatum(operatorClassOid));
1161                 ScanKeyInit(&skey[1],
1162                                         Anum_pg_amop_amopsubtype,
1163                                         BTEqualStrategyNumber, F_OIDEQ,
1164                                         ObjectIdGetDatum(InvalidOid));
1165                 rel = heap_openr(AccessMethodOperatorRelationName,
1166                                                  AccessShareLock);
1167                 scan = systable_beginscan(rel, AccessMethodStrategyIndex, indexOK,
1168                                                                   SnapshotNow, 2, skey);
1169
1170                 while (HeapTupleIsValid(htup = systable_getnext(scan)))
1171                 {
1172                         Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(htup);
1173
1174                         if (amopform->amopstrategy <= 0 ||
1175                                 (StrategyNumber) amopform->amopstrategy > numStrats)
1176                                 elog(ERROR, "invalid amopstrategy number %d for opclass %u",
1177                                          amopform->amopstrategy, operatorClassOid);
1178                         opcentry->operatorOids[amopform->amopstrategy - 1] =
1179                                 amopform->amopopr;
1180                 }
1181
1182                 systable_endscan(scan);
1183                 heap_close(rel, AccessShareLock);
1184         }
1185
1186         /*
1187          * Scan pg_amproc to obtain support procs for the opclass.  We only fetch
1188          * the default ones (those with subtype zero).
1189          */
1190         if (numSupport > 0)
1191         {
1192                 ScanKeyInit(&skey[0],
1193                                         Anum_pg_amproc_amopclaid,
1194                                         BTEqualStrategyNumber, F_OIDEQ,
1195                                         ObjectIdGetDatum(operatorClassOid));
1196                 ScanKeyInit(&skey[1],
1197                                         Anum_pg_amproc_amprocsubtype,
1198                                         BTEqualStrategyNumber, F_OIDEQ,
1199                                         ObjectIdGetDatum(InvalidOid));
1200                 rel = heap_openr(AccessMethodProcedureRelationName,
1201                                                  AccessShareLock);
1202                 scan = systable_beginscan(rel, AccessMethodProcedureIndex, indexOK,
1203                                                                   SnapshotNow, 2, skey);
1204
1205                 while (HeapTupleIsValid(htup = systable_getnext(scan)))
1206                 {
1207                         Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);
1208
1209                         if (amprocform->amprocnum <= 0 ||
1210                                 (StrategyNumber) amprocform->amprocnum > numSupport)
1211                                 elog(ERROR, "invalid amproc number %d for opclass %u",
1212                                          amprocform->amprocnum, operatorClassOid);
1213
1214                         opcentry->supportProcs[amprocform->amprocnum - 1] =
1215                                 amprocform->amproc;
1216                 }
1217
1218                 systable_endscan(scan);
1219                 heap_close(rel, AccessShareLock);
1220         }
1221
1222         opcentry->valid = true;
1223         return opcentry;
1224 }
1225
1226
1227 /*
1228  *              formrdesc
1229  *
1230  *              This is a special cut-down version of RelationBuildDesc()
1231  *              used by RelationCacheInitialize() in initializing the relcache.
1232  *              The relation descriptor is built just from the supplied parameters,
1233  *              without actually looking at any system table entries.  We cheat
1234  *              quite a lot since we only need to work for a few basic system
1235  *              catalogs.
1236  *
1237  * formrdesc is currently used for: pg_class, pg_attribute, pg_proc,
1238  * and pg_type (see RelationCacheInitialize).
1239  *
1240  * Note that these catalogs can't have constraints (except attnotnull),
1241  * default values, rules, or triggers, since we don't cope with any of that.
1242  *
1243  * NOTE: we assume we are already switched into CacheMemoryContext.
1244  */
1245 static void
1246 formrdesc(const char *relationName,
1247                   int natts,
1248                   FormData_pg_attribute *att)
1249 {
1250         Relation        relation;
1251         int                     i;
1252         bool            has_not_null;
1253
1254         /*
1255          * allocate new relation desc clear all fields of reldesc
1256          */
1257         relation = (Relation) palloc0(sizeof(RelationData));
1258         relation->rd_targblock = InvalidBlockNumber;
1259
1260         /* make sure relation is marked as having no open file yet */
1261         relation->rd_smgr = NULL;
1262
1263         /*
1264          * initialize reference count
1265          */
1266         RelationSetReferenceCount(relation, 1);
1267
1268         /*
1269          * all entries built with this routine are nailed-in-cache; none are
1270          * for new or temp relations.
1271          */
1272         relation->rd_isnailed = 1;
1273         relation->rd_isnew = false;
1274         relation->rd_istemp = false;
1275
1276         /*
1277          * initialize relation tuple form
1278          *
1279          * The data we insert here is pretty incomplete/bogus, but it'll serve to
1280          * get us launched.  RelationCacheInitializePhase2() will read the
1281          * real data from pg_class and replace what we've done here.
1282          */
1283         relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
1284
1285         namestrcpy(&relation->rd_rel->relname, relationName);
1286         relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
1287
1288         /*
1289          * It's important to distinguish between shared and non-shared
1290          * relations, even at bootstrap time, to make sure we know where they
1291          * are stored.  At present, all relations that formrdesc is used for
1292          * are not shared.
1293          */
1294         relation->rd_rel->relisshared = false;
1295
1296         relation->rd_rel->relpages = 1;
1297         relation->rd_rel->reltuples = 1;
1298         relation->rd_rel->relkind = RELKIND_RELATION;
1299         relation->rd_rel->relhasoids = true;
1300         relation->rd_rel->relnatts = (int16) natts;
1301
1302         /*
1303          * initialize attribute tuple form
1304          *
1305          * Unlike the case with the relation tuple, this data had better be right
1306          * because it will never be replaced.  The input values must be
1307          * correctly defined by macros in src/include/catalog/ headers.
1308          *
1309          * Note however that rd_att's tdtypeid, tdtypmod, tdhasoid fields are
1310          * not right at this point.  They will be fixed later when the real
1311          * pg_class row is loaded.
1312          */
1313         relation->rd_att = CreateTemplateTupleDesc(natts, false);
1314
1315         /*
1316          * initialize tuple desc info
1317          */
1318         has_not_null = false;
1319         for (i = 0; i < natts; i++)
1320         {
1321                 relation->rd_att->attrs[i] = (Form_pg_attribute) palloc(ATTRIBUTE_TUPLE_SIZE);
1322                 memcpy((char *) relation->rd_att->attrs[i],
1323                            (char *) &att[i],
1324                            ATTRIBUTE_TUPLE_SIZE);
1325                 has_not_null |= att[i].attnotnull;
1326                 /* make sure attcacheoff is valid */
1327                 relation->rd_att->attrs[i]->attcacheoff = -1;
1328         }
1329
1330         /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
1331         relation->rd_att->attrs[0]->attcacheoff = 0;
1332
1333         /* mark not-null status */
1334         if (has_not_null)
1335         {
1336                 TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
1337
1338                 constr->has_not_null = true;
1339                 relation->rd_att->constr = constr;
1340         }
1341
1342         /*
1343          * initialize relation id from info in att array (my, this is ugly)
1344          */
1345         RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
1346
1347         /*
1348          * initialize the relation's lock manager and RelFileNode information
1349          */
1350         RelationInitLockInfo(relation);         /* see lmgr.c */
1351
1352         if (relation->rd_rel->relisshared)
1353                 relation->rd_node.tblNode = InvalidOid;
1354         else
1355                 relation->rd_node.tblNode = MyDatabaseId;
1356         relation->rd_node.relNode =
1357                 relation->rd_rel->relfilenode = RelationGetRelid(relation);
1358
1359         /*
1360          * initialize the rel-has-index flag, using hardwired knowledge
1361          */
1362         relation->rd_rel->relhasindex = false;
1363
1364         /* In bootstrap mode, we have no indexes */
1365         if (!IsBootstrapProcessingMode())
1366         {
1367                 /* Otherwise, all the rels formrdesc is used for have indexes */
1368                 relation->rd_rel->relhasindex = true;
1369         }
1370
1371         /*
1372          * add new reldesc to relcache
1373          */
1374         RelationCacheInsert(relation);
1375 }
1376
1377
1378 /* ----------------------------------------------------------------
1379  *                               Relation Descriptor Lookup Interface
1380  * ----------------------------------------------------------------
1381  */
1382
1383 /*
1384  *              RelationIdCacheGetRelation
1385  *
1386  *              Lookup an existing reldesc by OID.
1387  *
1388  *              Only try to get the reldesc by looking in the cache,
1389  *              do not go to the disk if it's not present.
1390  *
1391  *              NB: relation ref count is incremented if successful.
1392  *              Caller should eventually decrement count.  (Usually,
1393  *              that happens by calling RelationClose().)
1394  */
1395 Relation
1396 RelationIdCacheGetRelation(Oid relationId)
1397 {
1398         Relation        rd;
1399
1400         RelationIdCacheLookup(relationId, rd);
1401
1402         if (RelationIsValid(rd))
1403         {
1404                 RelationIncrementReferenceCount(rd);
1405                 /* revalidate nailed index if necessary */
1406                 if (rd->rd_isnailed == 2)
1407                         RelationReloadClassinfo(rd);
1408         }
1409
1410         return rd;
1411 }
1412
1413 /*
1414  *              RelationSysNameCacheGetRelation
1415  *
1416  *              As above, but lookup by name; only works for system catalogs.
1417  */
1418 static Relation
1419 RelationSysNameCacheGetRelation(const char *relationName)
1420 {
1421         Relation        rd;
1422         NameData        name;
1423
1424         /*
1425          * make sure that the name key used for hash lookup is properly
1426          * null-padded
1427          */
1428         namestrcpy(&name, relationName);
1429         RelationSysNameCacheLookup(NameStr(name), rd);
1430
1431         if (RelationIsValid(rd))
1432         {
1433                 RelationIncrementReferenceCount(rd);
1434                 /* revalidate nailed index if necessary */
1435                 if (rd->rd_isnailed == 2)
1436                         RelationReloadClassinfo(rd);
1437         }
1438
1439         return rd;
1440 }
1441
1442 /*
1443  *              RelationIdGetRelation
1444  *
1445  *              Lookup a reldesc by OID; make one if not already in cache.
1446  *
1447  *              NB: relation ref count is incremented, or set to 1 if new entry.
1448  *              Caller should eventually decrement count.  (Usually,
1449  *              that happens by calling RelationClose().)
1450  */
1451 Relation
1452 RelationIdGetRelation(Oid relationId)
1453 {
1454         Relation        rd;
1455         RelationBuildDescInfo buildinfo;
1456
1457         /*
1458          * first try and get a reldesc from the cache
1459          */
1460         rd = RelationIdCacheGetRelation(relationId);
1461         if (RelationIsValid(rd))
1462                 return rd;
1463
1464         /*
1465          * no reldesc in the cache, so have RelationBuildDesc() build one and
1466          * add it.
1467          */
1468         buildinfo.infotype = INFO_RELID;
1469         buildinfo.i.info_id = relationId;
1470
1471         rd = RelationBuildDesc(buildinfo, NULL);
1472         return rd;
1473 }
1474
1475 /*
1476  *              RelationSysNameGetRelation
1477  *
1478  *              As above, but lookup by name; only works for system catalogs.
1479  */
1480 Relation
1481 RelationSysNameGetRelation(const char *relationName)
1482 {
1483         Relation        rd;
1484         RelationBuildDescInfo buildinfo;
1485
1486         /*
1487          * first try and get a reldesc from the cache
1488          */
1489         rd = RelationSysNameCacheGetRelation(relationName);
1490         if (RelationIsValid(rd))
1491                 return rd;
1492
1493         /*
1494          * no reldesc in the cache, so have RelationBuildDesc() build one and
1495          * add it.
1496          */
1497         buildinfo.infotype = INFO_RELNAME;
1498         buildinfo.i.info_name = (char *) relationName;
1499
1500         rd = RelationBuildDesc(buildinfo, NULL);
1501         return rd;
1502 }
1503
1504 /* ----------------------------------------------------------------
1505  *                              cache invalidation support routines
1506  * ----------------------------------------------------------------
1507  */
1508
1509 /*
1510  * RelationClose - close an open relation
1511  *
1512  *      Actually, we just decrement the refcount.
1513  *
1514  *      NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
1515  *      will be freed as soon as their refcount goes to zero.  In combination
1516  *      with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
1517  *      to catch references to already-released relcache entries.  It slows
1518  *      things down quite a bit, however.
1519  */
1520 void
1521 RelationClose(Relation relation)
1522 {
1523         /* Note: no locking manipulations needed */
1524         RelationDecrementReferenceCount(relation);
1525
1526 #ifdef RELCACHE_FORCE_RELEASE
1527         if (RelationHasReferenceCountZero(relation) &&
1528                 !relation->rd_isnew)
1529                 RelationClearRelation(relation, false);
1530 #endif
1531 }
1532
1533 /*
1534  * RelationReloadClassinfo - reload the pg_class row (only)
1535  *
1536  *      This function is used only for nailed indexes.  Since a REINDEX can
1537  *      change the relfilenode value for a nailed index, we have to reread
1538  *      the pg_class row anytime we get an SI invalidation on a nailed index
1539  *      (without throwing away the whole relcache entry, since we'd be unable
1540  *      to rebuild it).
1541  *
1542  *      We can't necessarily reread the pg_class row right away; we might be
1543  *      in a failed transaction when we receive the SI notification.  If so,
1544  *      RelationClearRelation just marks the entry as invalid by setting
1545  *      rd_isnailed to 2.  This routine is called to fix the entry when it
1546  *      is next needed.
1547  */
1548 static void
1549 RelationReloadClassinfo(Relation relation)
1550 {
1551         RelationBuildDescInfo buildinfo;
1552         bool            indexOK;
1553         HeapTuple       pg_class_tuple;
1554         Form_pg_class relp;
1555
1556         /* Should be called only for invalidated nailed indexes */
1557         Assert(relation->rd_isnailed == 2 &&
1558                    relation->rd_rel->relkind == RELKIND_INDEX);
1559         /* Read the pg_class row */
1560         buildinfo.infotype = INFO_RELID;
1561         buildinfo.i.info_id = relation->rd_id;
1562         /*
1563          * Don't try to use an indexscan of pg_class_oid_index to reload the
1564          * info for pg_class_oid_index ...
1565          */
1566         indexOK = strcmp(RelationGetRelationName(relation), ClassOidIndex) != 0;
1567         pg_class_tuple = ScanPgRelation(buildinfo, indexOK);
1568         if (!HeapTupleIsValid(pg_class_tuple))
1569                 elog(ERROR, "could not find tuple for system relation %u",
1570                          relation->rd_id);
1571         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
1572         memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
1573         relation->rd_node.relNode = relp->relfilenode;
1574         heap_freetuple(pg_class_tuple);
1575         relation->rd_targblock = InvalidBlockNumber;
1576         /* Okay, now it's valid again */
1577         relation->rd_isnailed = 1;
1578 }
1579
1580 /*
1581  * RelationClearRelation
1582  *
1583  *       Physically blow away a relation cache entry, or reset it and rebuild
1584  *       it from scratch (that is, from catalog entries).  The latter path is
1585  *       usually used when we are notified of a change to an open relation
1586  *       (one with refcount > 0).  However, this routine just does whichever
1587  *       it's told to do; callers must determine which they want.
1588  */
1589 static void
1590 RelationClearRelation(Relation relation, bool rebuild)
1591 {
1592         Oid                     old_reltype = relation->rd_rel->reltype;
1593         MemoryContext oldcxt;
1594
1595         /*
1596          * Make sure smgr and lower levels close the relation's files, if they
1597          * weren't closed already.  If the relation is not getting deleted,
1598          * the next smgr access should reopen the files automatically.  This
1599          * ensures that the low-level file access state is updated after, say,
1600          * a vacuum truncation.
1601          */
1602         if (relation->rd_smgr)
1603         {
1604                 smgrclose(relation->rd_smgr);
1605                 relation->rd_smgr = NULL;
1606         }
1607
1608         /*
1609          * Never, never ever blow away a nailed-in system relation, because
1610          * we'd be unable to recover.  However, we must reset rd_targblock, in
1611          * case we got called because of a relation cache flush that was triggered
1612          * by VACUUM.
1613          *
1614          * If it's a nailed index, then we need to re-read the pg_class row to see
1615          * if its relfilenode changed.  We can't necessarily do that here, because
1616          * we might be in a failed transaction.  We assume it's okay to do it if
1617          * there are open references to the relcache entry (cf notes for
1618          * AtEOXact_RelationCache).  Otherwise just mark the entry as possibly
1619          * invalid, and it'll be fixed when next opened.
1620          */
1621         if (relation->rd_isnailed)
1622         {
1623                 relation->rd_targblock = InvalidBlockNumber;
1624                 if (relation->rd_rel->relkind == RELKIND_INDEX)
1625                 {
1626                         relation->rd_isnailed = 2;      /* needs to be revalidated */
1627                         if (relation->rd_refcnt > 1)
1628                                 RelationReloadClassinfo(relation);
1629                 }
1630                 return;
1631         }
1632
1633         /*
1634          * Remove relation from hash tables
1635          *
1636          * Note: we might be reinserting it momentarily, but we must not have it
1637          * visible in the hash tables until it's valid again, so don't try to
1638          * optimize this away...
1639          */
1640         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
1641         RelationCacheDelete(relation);
1642         MemoryContextSwitchTo(oldcxt);
1643
1644         /* Clear out catcache's entries for this relation */
1645         CatalogCacheFlushRelation(RelationGetRelid(relation));
1646
1647         /*
1648          * Free all the subsidiary data structures of the relcache entry. We
1649          * cannot free rd_att if we are trying to rebuild the entry, however,
1650          * because pointers to it may be cached in various places. The rule
1651          * manager might also have pointers into the rewrite rules. So to
1652          * begin with, we can only get rid of these fields:
1653          */
1654         FreeTriggerDesc(relation->trigdesc);
1655         if (relation->rd_indextuple)
1656                 pfree(relation->rd_indextuple);
1657         if (relation->rd_am)
1658                 pfree(relation->rd_am);
1659         if (relation->rd_rel)
1660                 pfree(relation->rd_rel);
1661         freeList(relation->rd_indexlist);
1662         if (relation->rd_indexcxt)
1663                 MemoryContextDelete(relation->rd_indexcxt);
1664
1665         /*
1666          * If we're really done with the relcache entry, blow it away. But if
1667          * someone is still using it, reconstruct the whole deal without
1668          * moving the physical RelationData record (so that the someone's
1669          * pointer is still valid).
1670          */
1671         if (!rebuild)
1672         {
1673                 /* ok to zap remaining substructure */
1674                 flush_rowtype_cache(old_reltype);
1675                 FreeTupleDesc(relation->rd_att);
1676                 if (relation->rd_rulescxt)
1677                         MemoryContextDelete(relation->rd_rulescxt);
1678                 pfree(relation);
1679         }
1680         else
1681         {
1682                 /*
1683                  * When rebuilding an open relcache entry, must preserve ref count
1684                  * and rd_isnew flag.  Also attempt to preserve the tupledesc and
1685                  * rewrite-rule substructures in place.
1686                  */
1687                 int                     old_refcnt = relation->rd_refcnt;
1688                 bool            old_isnew = relation->rd_isnew;
1689                 TupleDesc       old_att = relation->rd_att;
1690                 RuleLock   *old_rules = relation->rd_rules;
1691                 MemoryContext old_rulescxt = relation->rd_rulescxt;
1692                 RelationBuildDescInfo buildinfo;
1693
1694                 buildinfo.infotype = INFO_RELID;
1695                 buildinfo.i.info_id = RelationGetRelid(relation);
1696
1697                 if (RelationBuildDesc(buildinfo, relation) != relation)
1698                 {
1699                         /* Should only get here if relation was deleted */
1700                         flush_rowtype_cache(old_reltype);
1701                         FreeTupleDesc(old_att);
1702                         if (old_rulescxt)
1703                                 MemoryContextDelete(old_rulescxt);
1704                         pfree(relation);
1705                         elog(ERROR, "relation %u deleted while still in use",
1706                                  buildinfo.i.info_id);
1707                 }
1708                 RelationSetReferenceCount(relation, old_refcnt);
1709                 relation->rd_isnew = old_isnew;
1710                 if (equalTupleDescs(old_att, relation->rd_att))
1711                 {
1712                         /* needn't flush typcache here */
1713                         FreeTupleDesc(relation->rd_att);
1714                         relation->rd_att = old_att;
1715                 }
1716                 else
1717                 {
1718                         flush_rowtype_cache(old_reltype);
1719                         FreeTupleDesc(old_att);
1720                 }
1721                 if (equalRuleLocks(old_rules, relation->rd_rules))
1722                 {
1723                         if (relation->rd_rulescxt)
1724                                 MemoryContextDelete(relation->rd_rulescxt);
1725                         relation->rd_rules = old_rules;
1726                         relation->rd_rulescxt = old_rulescxt;
1727                 }
1728                 else
1729                 {
1730                         if (old_rulescxt)
1731                                 MemoryContextDelete(old_rulescxt);
1732                 }
1733         }
1734 }
1735
1736 /*
1737  * RelationFlushRelation
1738  *
1739  *       Rebuild the relation if it is open (refcount > 0), else blow it away.
1740  */
1741 static void
1742 RelationFlushRelation(Relation relation)
1743 {
1744         bool            rebuild;
1745
1746         if (relation->rd_isnew)
1747         {
1748                 /*
1749                  * New relcache entries are always rebuilt, not flushed; else we'd
1750                  * forget the "new" status of the relation, which is a useful
1751                  * optimization to have.
1752                  */
1753                 rebuild = true;
1754         }
1755         else
1756         {
1757                 /*
1758                  * Pre-existing rels can be dropped from the relcache if not open.
1759                  */
1760                 rebuild = !RelationHasReferenceCountZero(relation);
1761         }
1762
1763         RelationClearRelation(relation, rebuild);
1764 }
1765
1766 /*
1767  * RelationForgetRelation - unconditionally remove a relcache entry
1768  *
1769  *                 External interface for destroying a relcache entry when we
1770  *                 drop the relation.
1771  */
1772 void
1773 RelationForgetRelation(Oid rid)
1774 {
1775         Relation        relation;
1776
1777         RelationIdCacheLookup(rid, relation);
1778
1779         if (!PointerIsValid(relation))
1780                 return;                                 /* not in cache, nothing to do */
1781
1782         if (!RelationHasReferenceCountZero(relation))
1783                 elog(ERROR, "relation %u is still open", rid);
1784
1785         /* Unconditionally destroy the relcache entry */
1786         RelationClearRelation(relation, false);
1787 }
1788
1789 /*
1790  *              RelationCacheInvalidateEntry
1791  *
1792  *              This routine is invoked for SI cache flush messages.
1793  *
1794  * Any relcache entry matching the relid must be flushed.  (Note: caller has
1795  * already determined that the relid belongs to our database or is a shared
1796  * relation.)  If rnode isn't NULL, we must also ensure that any smgr cache
1797  * entry matching that rnode is flushed.
1798  *
1799  * Ordinarily, if rnode is supplied then it will match the relfilenode of
1800  * the target relid.  However, it's possible for rnode to be different if
1801  * someone is engaged in a relfilenode change.  In that case we want to
1802  * make sure we clear the right cache entries.  This has to be done here
1803  * to keep things in sync between relcache and smgr cache --- we can't have
1804  * someone flushing an smgr cache entry that a relcache entry still points
1805  * to.
1806  *
1807  * We used to skip local relations, on the grounds that they could
1808  * not be targets of cross-backend SI update messages; but it seems
1809  * safer to process them, so that our *own* SI update messages will
1810  * have the same effects during CommandCounterIncrement for both
1811  * local and nonlocal relations.
1812  */
1813 void
1814 RelationCacheInvalidateEntry(Oid relationId, RelFileNode *rnode)
1815 {
1816         Relation        relation;
1817
1818         RelationIdCacheLookup(relationId, relation);
1819
1820         if (PointerIsValid(relation))
1821         {
1822                 relcacheInvalsReceived++;
1823                 if (rnode)
1824                 {
1825                         /* Need to be sure smgr is flushed, but don't do it twice */
1826                         if (relation->rd_smgr == NULL ||
1827                                 !RelFileNodeEquals(*rnode, relation->rd_node))
1828                                 smgrclosenode(*rnode);
1829                 }
1830                 RelationFlushRelation(relation);
1831         }
1832         else
1833         {
1834                 if (rnode)
1835                         smgrclosenode(*rnode);
1836         }
1837 }
1838
1839 /*
1840  * RelationCacheInvalidate
1841  *       Blow away cached relation descriptors that have zero reference counts,
1842  *       and rebuild those with positive reference counts.  Also reset the smgr
1843  *       relation cache.
1844  *
1845  *       This is currently used only to recover from SI message buffer overflow,
1846  *       so we do not touch new-in-transaction relations; they cannot be targets
1847  *       of cross-backend SI updates (and our own updates now go through a
1848  *       separate linked list that isn't limited by the SI message buffer size).
1849  *
1850  *       We do this in two phases: the first pass deletes deletable items, and
1851  *       the second one rebuilds the rebuildable items.  This is essential for
1852  *       safety, because hash_seq_search only copes with concurrent deletion of
1853  *       the element it is currently visiting.  If a second SI overflow were to
1854  *       occur while we are walking the table, resulting in recursive entry to
1855  *       this routine, we could crash because the inner invocation blows away
1856  *       the entry next to be visited by the outer scan.  But this way is OK,
1857  *       because (a) during the first pass we won't process any more SI messages,
1858  *       so hash_seq_search will complete safely; (b) during the second pass we
1859  *       only hold onto pointers to nondeletable entries.
1860  *
1861  *       The two-phase approach also makes it easy to ensure that we process
1862  *       nailed-in-cache indexes before other nondeletable items, and that we
1863  *       process pg_class_oid_index first of all.  In scenarios where a nailed
1864  *       index has been given a new relfilenode, we have to detect that update
1865  *       before the nailed index is used in reloading any other relcache entry.
1866  */
1867 void
1868 RelationCacheInvalidate(void)
1869 {
1870         HASH_SEQ_STATUS status;
1871         RelIdCacheEnt *idhentry;
1872         Relation        relation;
1873         List       *rebuildFirstList = NIL;
1874         List       *rebuildList = NIL;
1875         ListCell   *l;
1876
1877         /* Phase 1 */
1878         hash_seq_init(&status, RelationIdCache);
1879
1880         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
1881         {
1882                 relation = idhentry->reldesc;
1883
1884                 /* Must close all smgr references to avoid leaving dangling ptrs */
1885                 if (relation->rd_smgr)
1886                 {
1887                         smgrclose(relation->rd_smgr);
1888                         relation->rd_smgr = NULL;
1889                 }
1890
1891                 /* Ignore new relations, since they are never SI targets */
1892                 if (relation->rd_isnew)
1893                         continue;
1894
1895                 relcacheInvalsReceived++;
1896
1897                 if (RelationHasReferenceCountZero(relation))
1898                 {
1899                         /* Delete this entry immediately */
1900                         Assert(!relation->rd_isnailed);
1901                         RelationClearRelation(relation, false);
1902                 }
1903                 else
1904                 {
1905                         /*
1906                          * Add this entry to list of stuff to rebuild in second pass.
1907                          * pg_class_oid_index goes on the front of rebuildFirstList,
1908                          * other nailed indexes on the back, and everything else into
1909                          * rebuildList (in no particular order).
1910                          */
1911                         if (relation->rd_isnailed &&
1912                                 relation->rd_rel->relkind == RELKIND_INDEX)
1913                         {
1914                                 if (strcmp(RelationGetRelationName(relation),
1915                                                    ClassOidIndex) == 0)
1916                                         rebuildFirstList = lcons(relation, rebuildFirstList);
1917                                 else
1918                                         rebuildFirstList = lappend(rebuildFirstList, relation);
1919                         }
1920                         else
1921                                 rebuildList = lcons(relation, rebuildList);
1922                 }
1923         }
1924
1925         rebuildList = nconc(rebuildFirstList, rebuildList);
1926
1927         /*
1928          * Now zap any remaining smgr cache entries.  This must happen before
1929          * we start to rebuild entries, since that may involve catalog fetches
1930          * which will re-open catalog files.
1931          */
1932         smgrcloseall();
1933
1934         /* Phase 2: rebuild the items found to need rebuild in phase 1 */
1935         foreach(l, rebuildList)
1936         {
1937                 relation = (Relation) lfirst(l);
1938                 RelationClearRelation(relation, true);
1939         }
1940         freeList(rebuildList);
1941 }
1942
1943 /*
1944  * AtEOXact_RelationCache
1945  *
1946  *      Clean up the relcache at transaction commit or abort.
1947  *
1948  * Note: this must be called *before* processing invalidation messages.
1949  * In the case of abort, we don't want to try to rebuild any invalidated
1950  * cache entries (since we can't safely do database accesses).  Therefore
1951  * we must reset refcnts before handling pending invalidations.
1952  */
1953 void
1954 AtEOXact_RelationCache(bool commit)
1955 {
1956         HASH_SEQ_STATUS status;
1957         RelIdCacheEnt *idhentry;
1958
1959         hash_seq_init(&status, RelationIdCache);
1960
1961         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
1962         {
1963                 Relation        relation = idhentry->reldesc;
1964                 int                     expected_refcnt;
1965
1966                 /*
1967                  * Is it a relation created in the current transaction?
1968                  *
1969                  * During commit, reset the flag to false, since we are now out of
1970                  * the creating transaction.  During abort, simply delete the
1971                  * relcache entry --- it isn't interesting any longer.  (NOTE: if
1972                  * we have forgotten the isnew state of a new relation due to a
1973                  * forced cache flush, the entry will get deleted anyway by
1974                  * shared-cache-inval processing of the aborted pg_class
1975                  * insertion.)
1976                  */
1977                 if (relation->rd_isnew)
1978                 {
1979                         if (commit)
1980                                 relation->rd_isnew = false;
1981                         else
1982                         {
1983                                 RelationClearRelation(relation, false);
1984                                 continue;
1985                         }
1986                 }
1987
1988                 /*
1989                  * During transaction abort, we must also reset relcache entry ref
1990                  * counts to their normal not-in-a-transaction state.  A ref count
1991                  * may be too high because some routine was exited by ereport()
1992                  * between incrementing and decrementing the count.
1993                  *
1994                  * During commit, we should not have to do this, but it's still
1995                  * useful to check that the counts are correct to catch missed
1996                  * relcache closes.
1997                  *
1998                  * In bootstrap mode, do NOT reset the refcnt nor complain that it's
1999                  * nonzero --- the bootstrap code expects relations to stay open
2000                  * across start/commit transaction calls.  (That seems bogus, but
2001                  * it's not worth fixing.)
2002                  */
2003                 expected_refcnt = relation->rd_isnailed ? 1 : 0;
2004
2005                 if (commit)
2006                 {
2007                         if (relation->rd_refcnt != expected_refcnt &&
2008                                 !IsBootstrapProcessingMode())
2009                         {
2010                                 elog(WARNING, "relcache reference leak: relation \"%s\" has refcnt %d instead of %d",
2011                                          RelationGetRelationName(relation),
2012                                          relation->rd_refcnt, expected_refcnt);
2013                                 RelationSetReferenceCount(relation, expected_refcnt);
2014                         }
2015                 }
2016                 else
2017                 {
2018                         /* abort case, just reset it quietly */
2019                         RelationSetReferenceCount(relation, expected_refcnt);
2020                 }
2021
2022                 /*
2023                  * Flush any temporary index list.
2024                  */
2025                 if (relation->rd_indexvalid == 2)
2026                 {
2027                         freeList(relation->rd_indexlist);
2028                         relation->rd_indexlist = NIL;
2029                         relation->rd_indexvalid = 0;
2030                 }
2031         }
2032 }
2033
2034 /*
2035  *              RelationBuildLocalRelation
2036  *                      Build a relcache entry for an about-to-be-created relation,
2037  *                      and enter it into the relcache.
2038  */
2039 Relation
2040 RelationBuildLocalRelation(const char *relname,
2041                                                    Oid relnamespace,
2042                                                    TupleDesc tupDesc,
2043                                                    Oid relid, Oid dbid,
2044                                                    RelFileNode rnode,
2045                                                    bool nailit)
2046 {
2047         Relation        rel;
2048         MemoryContext oldcxt;
2049         int                     natts = tupDesc->natts;
2050         int                     i;
2051         bool            has_not_null;
2052
2053         AssertArg(natts >= 0);
2054
2055         /*
2056          * switch to the cache context to create the relcache entry.
2057          */
2058         if (!CacheMemoryContext)
2059                 CreateCacheMemoryContext();
2060
2061         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2062
2063         /*
2064          * allocate a new relation descriptor and fill in basic state fields.
2065          */
2066         rel = (Relation) palloc0(sizeof(RelationData));
2067
2068         rel->rd_targblock = InvalidBlockNumber;
2069
2070         /* make sure relation is marked as having no open file yet */
2071         rel->rd_smgr = NULL;
2072
2073         RelationSetReferenceCount(rel, 1);
2074
2075         /* it's being created in this transaction */
2076         rel->rd_isnew = true;
2077
2078         /* is it a temporary relation? */
2079         rel->rd_istemp = isTempNamespace(relnamespace);
2080
2081         /*
2082          * nail the reldesc if this is a bootstrap create reln and we may need
2083          * it in the cache later on in the bootstrap process so we don't ever
2084          * want it kicked out.  e.g. pg_attribute!!!
2085          */
2086         if (nailit)
2087                 rel->rd_isnailed = 1;
2088
2089         /*
2090          * create a new tuple descriptor from the one passed in.  We do this
2091          * partly to copy it into the cache context, and partly because the
2092          * new relation can't have any defaults or constraints yet; they have
2093          * to be added in later steps, because they require additions to
2094          * multiple system catalogs.  We can copy attnotnull constraints here,
2095          * however.
2096          */
2097         rel->rd_att = CreateTupleDescCopy(tupDesc);
2098         has_not_null = false;
2099         for (i = 0; i < natts; i++)
2100         {
2101                 rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull;
2102                 has_not_null |= tupDesc->attrs[i]->attnotnull;
2103         }
2104
2105         if (has_not_null)
2106         {
2107                 TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
2108
2109                 constr->has_not_null = true;
2110                 rel->rd_att->constr = constr;
2111         }
2112
2113         /*
2114          * initialize relation tuple form (caller may add/override data later)
2115          */
2116         rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
2117
2118         namestrcpy(&rel->rd_rel->relname, relname);
2119         rel->rd_rel->relnamespace = relnamespace;
2120
2121         rel->rd_rel->relkind = RELKIND_UNCATALOGED;
2122         rel->rd_rel->relhasoids = rel->rd_att->tdhasoid;
2123         rel->rd_rel->relnatts = natts;
2124         rel->rd_rel->reltype = InvalidOid;
2125
2126         /*
2127          * Insert relation physical and logical identifiers (OIDs) into the
2128          * right places.
2129          */
2130         rel->rd_rel->relisshared = (dbid == InvalidOid);
2131
2132         RelationGetRelid(rel) = relid;
2133
2134         for (i = 0; i < natts; i++)
2135                 rel->rd_att->attrs[i]->attrelid = relid;
2136
2137         rel->rd_node = rnode;
2138         rel->rd_rel->relfilenode = rnode.relNode;
2139
2140         RelationInitLockInfo(rel);      /* see lmgr.c */
2141
2142         /*
2143          * Okay to insert into the relcache hash tables.
2144          */
2145         RelationCacheInsert(rel);
2146
2147         /*
2148          * done building relcache entry.
2149          */
2150         MemoryContextSwitchTo(oldcxt);
2151
2152         return rel;
2153 }
2154
2155 /*
2156  *              RelationCacheInitialize
2157  *
2158  *              This initializes the relation descriptor cache.  At the time
2159  *              that this is invoked, we can't do database access yet (mainly
2160  *              because the transaction subsystem is not up), so we can't get
2161  *              "real" info.  However it's okay to read the pg_internal.init
2162  *              cache file, if one is available.  Otherwise we make phony
2163  *              entries for the minimum set of nailed-in-cache relations.
2164  */
2165
2166 #define INITRELCACHESIZE                400
2167
2168 void
2169 RelationCacheInitialize(void)
2170 {
2171         MemoryContext oldcxt;
2172         HASHCTL         ctl;
2173
2174         /*
2175          * switch to cache memory context
2176          */
2177         if (!CacheMemoryContext)
2178                 CreateCacheMemoryContext();
2179
2180         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2181
2182         /*
2183          * create hashtables that index the relcache
2184          */
2185         MemSet(&ctl, 0, sizeof(ctl));
2186         ctl.keysize = sizeof(NameData);
2187         ctl.entrysize = sizeof(RelNameCacheEnt);
2188         RelationSysNameCache = hash_create("Relcache by name", INITRELCACHESIZE,
2189                                                                            &ctl, HASH_ELEM);
2190
2191         ctl.keysize = sizeof(Oid);
2192         ctl.entrysize = sizeof(RelIdCacheEnt);
2193         ctl.hash = tag_hash;
2194         RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
2195                                                                   &ctl, HASH_ELEM | HASH_FUNCTION);
2196
2197         /*
2198          * Try to load the relcache cache file.  If successful, we're done for
2199          * now.  Otherwise, initialize the cache with pre-made descriptors for
2200          * the critical "nailed-in" system catalogs.
2201          */
2202         if (IsBootstrapProcessingMode() ||
2203                 !load_relcache_init_file())
2204         {
2205                 formrdesc(RelationRelationName,
2206                                   Natts_pg_class, Desc_pg_class);
2207                 formrdesc(AttributeRelationName,
2208                                   Natts_pg_attribute, Desc_pg_attribute);
2209                 formrdesc(ProcedureRelationName,
2210                                   Natts_pg_proc, Desc_pg_proc);
2211                 formrdesc(TypeRelationName,
2212                                   Natts_pg_type, Desc_pg_type);
2213
2214 #define NUM_CRITICAL_RELS       4       /* fix if you change list above */
2215         }
2216
2217         MemoryContextSwitchTo(oldcxt);
2218 }
2219
2220 /*
2221  *              RelationCacheInitializePhase2
2222  *
2223  *              This is called as soon as the catcache and transaction system
2224  *              are functional.  At this point we can actually read data from
2225  *              the system catalogs.  Update the relcache entries made during
2226  *              RelationCacheInitialize, and make sure we have entries for the
2227  *              critical system indexes.
2228  */
2229 void
2230 RelationCacheInitializePhase2(void)
2231 {
2232         HASH_SEQ_STATUS status;
2233         RelIdCacheEnt *idhentry;
2234
2235         if (IsBootstrapProcessingMode())
2236                 return;
2237
2238         /*
2239          * If we didn't get the critical system indexes loaded into relcache,
2240          * do so now.  These are critical because the catcache depends on them
2241          * for catcache fetches that are done during relcache load.  Thus, we
2242          * have an infinite-recursion problem.  We can break the recursion by
2243          * doing heapscans instead of indexscans at certain key spots. To
2244          * avoid hobbling performance, we only want to do that until we have
2245          * the critical indexes loaded into relcache.  Thus, the flag
2246          * criticalRelcachesBuilt is used to decide whether to do heapscan or
2247          * indexscan at the key spots, and we set it true after we've loaded
2248          * the critical indexes.
2249          *
2250          * The critical indexes are marked as "nailed in cache", partly to make
2251          * it easy for load_relcache_init_file to count them, but mainly
2252          * because we cannot flush and rebuild them once we've set
2253          * criticalRelcachesBuilt to true.      (NOTE: perhaps it would be
2254          * possible to reload them by temporarily setting
2255          * criticalRelcachesBuilt to false again.  For now, though, we just
2256          * nail 'em in.)
2257          */
2258         if (!criticalRelcachesBuilt)
2259         {
2260                 RelationBuildDescInfo buildinfo;
2261                 Relation        ird;
2262
2263 #define LOAD_CRIT_INDEX(indname) \
2264                 do { \
2265                         buildinfo.infotype = INFO_RELNAME; \
2266                         buildinfo.i.info_name = (indname); \
2267                         ird = RelationBuildDesc(buildinfo, NULL); \
2268                         ird->rd_isnailed = 1; \
2269                         RelationSetReferenceCount(ird, 1); \
2270                 } while (0)
2271
2272                 LOAD_CRIT_INDEX(ClassNameNspIndex);
2273                 LOAD_CRIT_INDEX(ClassOidIndex);
2274                 LOAD_CRIT_INDEX(AttributeRelidNumIndex);
2275                 LOAD_CRIT_INDEX(IndexRelidIndex);
2276                 LOAD_CRIT_INDEX(AccessMethodStrategyIndex);
2277                 LOAD_CRIT_INDEX(AccessMethodProcedureIndex);
2278                 LOAD_CRIT_INDEX(OperatorOidIndex);
2279
2280 #define NUM_CRITICAL_INDEXES    7               /* fix if you change list above */
2281
2282                 criticalRelcachesBuilt = true;
2283         }
2284
2285         /*
2286          * Now, scan all the relcache entries and update anything that might
2287          * be wrong in the results from formrdesc or the relcache cache file.
2288          * If we faked up relcache entries using formrdesc, then read the real
2289          * pg_class rows and replace the fake entries with them. Also, if any
2290          * of the relcache entries have rules or triggers, load that info the
2291          * hard way since it isn't recorded in the cache file.
2292          */
2293         hash_seq_init(&status, RelationIdCache);
2294
2295         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2296         {
2297                 Relation        relation = idhentry->reldesc;
2298
2299                 /*
2300                  * If it's a faked-up entry, read the real pg_class tuple.
2301                  */
2302                 if (needNewCacheFile && relation->rd_isnailed)
2303                 {
2304                         HeapTuple       htup;
2305                         Form_pg_class relp;
2306
2307                         htup = SearchSysCache(RELOID,
2308                                                         ObjectIdGetDatum(RelationGetRelid(relation)),
2309                                                                   0, 0, 0);
2310                         if (!HeapTupleIsValid(htup))
2311                                 elog(FATAL, "cache lookup failed for relation %u",
2312                                          RelationGetRelid(relation));
2313                         relp = (Form_pg_class) GETSTRUCT(htup);
2314
2315                         /*
2316                          * Copy tuple to relation->rd_rel. (See notes in
2317                          * AllocateRelationDesc())
2318                          */
2319                         Assert(relation->rd_rel != NULL);
2320                         memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
2321
2322                         /*
2323                          * Also update the derived fields in rd_att.
2324                          */
2325                         relation->rd_att->tdtypeid = relp->reltype;
2326                         relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
2327                         relation->rd_att->tdhasoid = relp->relhasoids;
2328
2329                         ReleaseSysCache(htup);
2330                 }
2331
2332                 /*
2333                  * Fix data that isn't saved in relcache cache file.
2334                  */
2335                 if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
2336                         RelationBuildRuleLock(relation);
2337                 if (relation->rd_rel->reltriggers > 0 && relation->trigdesc == NULL)
2338                         RelationBuildTriggers(relation);
2339         }
2340 }
2341
2342 /*
2343  *              RelationCacheInitializePhase3
2344  *
2345  *              Final step of relcache initialization: write out a new relcache
2346  *              cache file if one is needed.
2347  */
2348 void
2349 RelationCacheInitializePhase3(void)
2350 {
2351         if (IsBootstrapProcessingMode())
2352                 return;
2353
2354         if (needNewCacheFile)
2355         {
2356                 /*
2357                  * Force all the catcaches to finish initializing and thereby open
2358                  * the catalogs and indexes they use.  This will preload the
2359                  * relcache with entries for all the most important system
2360                  * catalogs and indexes, so that the init file will be most useful
2361                  * for future backends.
2362                  */
2363                 InitCatalogCachePhase2();
2364
2365                 /* now write the file */
2366                 write_relcache_init_file();
2367         }
2368 }
2369
2370 static void
2371 AttrDefaultFetch(Relation relation)
2372 {
2373         AttrDefault *attrdef = relation->rd_att->constr->defval;
2374         int                     ndef = relation->rd_att->constr->num_defval;
2375         Relation        adrel;
2376         SysScanDesc adscan;
2377         ScanKeyData skey;
2378         HeapTuple       htup;
2379         Datum           val;
2380         bool            isnull;
2381         int                     found;
2382         int                     i;
2383
2384         ScanKeyInit(&skey,
2385                                 Anum_pg_attrdef_adrelid,
2386                                 BTEqualStrategyNumber, F_OIDEQ,
2387                                 ObjectIdGetDatum(RelationGetRelid(relation)));
2388
2389         adrel = heap_openr(AttrDefaultRelationName, AccessShareLock);
2390         adscan = systable_beginscan(adrel, AttrDefaultIndex, true,
2391                                                                 SnapshotNow, 1, &skey);
2392         found = 0;
2393
2394         while (HeapTupleIsValid(htup = systable_getnext(adscan)))
2395         {
2396                 Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
2397
2398                 for (i = 0; i < ndef; i++)
2399                 {
2400                         if (adform->adnum != attrdef[i].adnum)
2401                                 continue;
2402                         if (attrdef[i].adbin != NULL)
2403                                 elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
2404                                          NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
2405                                          RelationGetRelationName(relation));
2406                         else
2407                                 found++;
2408
2409                         val = fastgetattr(htup,
2410                                                           Anum_pg_attrdef_adbin,
2411                                                           adrel->rd_att, &isnull);
2412                         if (isnull)
2413                                 elog(WARNING, "null adbin for attr %s of rel %s",
2414                                          NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
2415                                          RelationGetRelationName(relation));
2416                         else
2417                                 attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
2418                                                          DatumGetCString(DirectFunctionCall1(textout,
2419                                                                                                                                  val)));
2420                         break;
2421                 }
2422
2423                 if (i >= ndef)
2424                         elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
2425                                  adform->adnum, RelationGetRelationName(relation));
2426         }
2427
2428         systable_endscan(adscan);
2429         heap_close(adrel, AccessShareLock);
2430
2431         if (found != ndef)
2432                 elog(WARNING, "%d attrdef record(s) missing for rel %s",
2433                          ndef - found, RelationGetRelationName(relation));
2434 }
2435
2436 static void
2437 CheckConstraintFetch(Relation relation)
2438 {
2439         ConstrCheck *check = relation->rd_att->constr->check;
2440         int                     ncheck = relation->rd_att->constr->num_check;
2441         Relation        conrel;
2442         SysScanDesc conscan;
2443         ScanKeyData skey[1];
2444         HeapTuple       htup;
2445         Datum           val;
2446         bool            isnull;
2447         int                     found = 0;
2448
2449         ScanKeyInit(&skey[0],
2450                                 Anum_pg_constraint_conrelid,
2451                                 BTEqualStrategyNumber, F_OIDEQ,
2452                                 ObjectIdGetDatum(RelationGetRelid(relation)));
2453
2454         conrel = heap_openr(ConstraintRelationName, AccessShareLock);
2455         conscan = systable_beginscan(conrel, ConstraintRelidIndex, true,
2456                                                                  SnapshotNow, 1, skey);
2457
2458         while (HeapTupleIsValid(htup = systable_getnext(conscan)))
2459         {
2460                 Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
2461
2462                 /* We want check constraints only */
2463                 if (conform->contype != CONSTRAINT_CHECK)
2464                         continue;
2465
2466                 if (found >= ncheck)
2467                         elog(ERROR, "unexpected constraint record found for rel %s",
2468                                  RelationGetRelationName(relation));
2469
2470                 check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
2471                                                                                           NameStr(conform->conname));
2472
2473                 /* Grab and test conbin is actually set */
2474                 val = fastgetattr(htup,
2475                                                   Anum_pg_constraint_conbin,
2476                                                   conrel->rd_att, &isnull);
2477                 if (isnull)
2478                         elog(ERROR, "null conbin for rel %s",
2479                                  RelationGetRelationName(relation));
2480
2481                 check[found].ccbin = MemoryContextStrdup(CacheMemoryContext,
2482                                                          DatumGetCString(DirectFunctionCall1(textout,
2483                                                                                                                                  val)));
2484                 found++;
2485         }
2486
2487         systable_endscan(conscan);
2488         heap_close(conrel, AccessShareLock);
2489
2490         if (found != ncheck)
2491                 elog(ERROR, "%d constraint record(s) missing for rel %s",
2492                          ncheck - found, RelationGetRelationName(relation));
2493 }
2494
2495 /*
2496  * RelationGetIndexList -- get a list of OIDs of indexes on this relation
2497  *
2498  * The index list is created only if someone requests it.  We scan pg_index
2499  * to find relevant indexes, and add the list to the relcache entry so that
2500  * we won't have to compute it again.  Note that shared cache inval of a
2501  * relcache entry will delete the old list and set rd_indexvalid to 0,
2502  * so that we must recompute the index list on next request.  This handles
2503  * creation or deletion of an index.
2504  *
2505  * The returned list is guaranteed to be sorted in order by OID.  This is
2506  * needed by the executor, since for index types that we obtain exclusive
2507  * locks on when updating the index, all backends must lock the indexes in
2508  * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
2509  * consistent ordering would do, but ordering by OID is easy.
2510  *
2511  * Since shared cache inval causes the relcache's copy of the list to go away,
2512  * we return a copy of the list palloc'd in the caller's context.  The caller
2513  * may freeList() the returned list after scanning it.  This is necessary
2514  * since the caller will typically be doing syscache lookups on the relevant
2515  * indexes, and syscache lookup could cause SI messages to be processed!
2516  */
2517 List *
2518 RelationGetIndexList(Relation relation)
2519 {
2520         Relation        indrel;
2521         SysScanDesc indscan;
2522         ScanKeyData skey;
2523         HeapTuple       htup;
2524         List       *result;
2525         MemoryContext oldcxt;
2526
2527         /* Quick exit if we already computed the list. */
2528         if (relation->rd_indexvalid != 0)
2529                 return listCopy(relation->rd_indexlist);
2530
2531         /*
2532          * We build the list we intend to return (in the caller's context)
2533          * while doing the scan.  After successfully completing the scan, we
2534          * copy that list into the relcache entry.      This avoids cache-context
2535          * memory leakage if we get some sort of error partway through.
2536          */
2537         result = NIL;
2538
2539         /* Prepare to scan pg_index for entries having indrelid = this rel. */
2540         ScanKeyInit(&skey,
2541                                 Anum_pg_index_indrelid,
2542                                 BTEqualStrategyNumber, F_OIDEQ,
2543                                 ObjectIdGetDatum(RelationGetRelid(relation)));
2544
2545         indrel = heap_openr(IndexRelationName, AccessShareLock);
2546         indscan = systable_beginscan(indrel, IndexIndrelidIndex, true,
2547                                                                  SnapshotNow, 1, &skey);
2548
2549         while (HeapTupleIsValid(htup = systable_getnext(indscan)))
2550         {
2551                 Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);
2552
2553                 result = insert_ordered_oid(result, index->indexrelid);
2554         }
2555
2556         systable_endscan(indscan);
2557         heap_close(indrel, AccessShareLock);
2558
2559         /* Now save a copy of the completed list in the relcache entry. */
2560         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2561         relation->rd_indexlist = listCopy(result);
2562         relation->rd_indexvalid = 1;
2563         MemoryContextSwitchTo(oldcxt);
2564
2565         return result;
2566 }
2567
2568 /*
2569  * insert_ordered_oid
2570  *              Insert a new Oid into a sorted list of Oids, preserving ordering
2571  *
2572  * Building the ordered list this way is O(N^2), but with a pretty small
2573  * constant, so for the number of entries we expect it will probably be
2574  * faster than trying to apply qsort().  Most tables don't have very many
2575  * indexes...
2576  */
2577 static List *
2578 insert_ordered_oid(List *list, Oid datum)
2579 {
2580         ListCell *prev;
2581
2582         /* Does the datum belong at the front? */
2583         if (list == NIL || datum < linitial_oid(list))
2584                 return lcons_oid(datum, list);
2585         /* No, so find the entry it belongs after */
2586         prev = list_head(list);
2587         for (;;)
2588         {
2589                 ListCell *curr = lnext(prev);
2590
2591                 if (curr == NULL || datum < lfirst_oid(curr))
2592                         break;          /* it belongs after 'prev', before 'curr' */
2593
2594                 prev = curr;
2595         }
2596         /* Insert datum into list after 'prev' */
2597         lappend_cell_oid(list, prev, datum);
2598         return list;
2599 }
2600
2601 /*
2602  * RelationSetIndexList -- externally force the index list contents
2603  *
2604  * This is used to temporarily override what we think the set of valid
2605  * indexes is.  The forcing will be valid only until transaction commit
2606  * or abort.
2607  *
2608  * This should only be applied to nailed relations, because in a non-nailed
2609  * relation the hacked index list could be lost at any time due to SI
2610  * messages.  In practice it is only used on pg_class (see REINDEX).
2611  *
2612  * It is up to the caller to make sure the given list is correctly ordered.
2613  */
2614 void
2615 RelationSetIndexList(Relation relation, List *indexIds)
2616 {
2617         MemoryContext oldcxt;
2618
2619         Assert(relation->rd_isnailed == 1);
2620         /* Copy the list into the cache context (could fail for lack of mem) */
2621         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2622         indexIds = listCopy(indexIds);
2623         MemoryContextSwitchTo(oldcxt);
2624         /* Okay to replace old list */
2625         freeList(relation->rd_indexlist);
2626         relation->rd_indexlist = indexIds;
2627         relation->rd_indexvalid = 2;            /* mark list as forced */
2628 }
2629
2630 /*
2631  * RelationGetIndexExpressions -- get the index expressions for an index
2632  *
2633  * We cache the result of transforming pg_index.indexprs into a node tree.
2634  * If the rel is not an index or has no expressional columns, we return NIL.
2635  * Otherwise, the returned tree is copied into the caller's memory context.
2636  * (We don't want to return a pointer to the relcache copy, since it could
2637  * disappear due to relcache invalidation.)
2638  */
2639 List *
2640 RelationGetIndexExpressions(Relation relation)
2641 {
2642         List       *result;
2643         Datum           exprsDatum;
2644         bool            isnull;
2645         char       *exprsString;
2646         MemoryContext oldcxt;
2647
2648         /* Quick exit if we already computed the result. */
2649         if (relation->rd_indexprs)
2650                 return (List *) copyObject(relation->rd_indexprs);
2651
2652         /* Quick exit if there is nothing to do. */
2653         if (relation->rd_indextuple == NULL ||
2654                 heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs))
2655                 return NIL;
2656
2657         /*
2658          * We build the tree we intend to return in the caller's context.
2659          * After successfully completing the work, we copy it into the
2660          * relcache entry.      This avoids problems if we get some sort of error
2661          * partway through.
2662          *
2663          * We make use of the syscache's copy of pg_index's tupledesc to access
2664          * the non-fixed fields of the tuple.  We assume that the syscache
2665          * will be initialized before any access of a partial index could
2666          * occur.  (This would probably fail if we were to allow partial
2667          * indexes on system catalogs.)
2668          */
2669         exprsDatum = SysCacheGetAttr(INDEXRELID, relation->rd_indextuple,
2670                                                                  Anum_pg_index_indexprs, &isnull);
2671         Assert(!isnull);
2672         exprsString = DatumGetCString(DirectFunctionCall1(textout, exprsDatum));
2673         result = (List *) stringToNode(exprsString);
2674         pfree(exprsString);
2675
2676         /*
2677          * Run the expressions through flatten_andors and eval_const_expressions.
2678          * This is not just an optimization, but is necessary, because the planner
2679          * will be comparing them to similarly-processed qual clauses, and may
2680          * fail to detect valid matches without this.
2681          */
2682         result = (List *) flatten_andors((Node *) result);
2683
2684         result = (List *) eval_const_expressions((Node *) result);
2685
2686         /*
2687          * Also mark any coercion format fields as "don't care", so that the
2688          * planner can match to both explicit and implicit coercions.
2689          */
2690         set_coercionform_dontcare((Node *) result);
2691
2692         /* May as well fix opfuncids too */
2693         fix_opfuncids((Node *) result);
2694
2695         /* Now save a copy of the completed tree in the relcache entry. */
2696         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2697         relation->rd_indexprs = (List *) copyObject(result);
2698         MemoryContextSwitchTo(oldcxt);
2699
2700         return result;
2701 }
2702
2703 /*
2704  * RelationGetIndexPredicate -- get the index predicate for an index
2705  *
2706  * We cache the result of transforming pg_index.indpred into an implicit-AND
2707  * node tree (suitable for ExecQual).
2708  * If the rel is not an index or has no predicate, we return NIL.
2709  * Otherwise, the returned tree is copied into the caller's memory context.
2710  * (We don't want to return a pointer to the relcache copy, since it could
2711  * disappear due to relcache invalidation.)
2712  */
2713 List *
2714 RelationGetIndexPredicate(Relation relation)
2715 {
2716         List       *result;
2717         Datum           predDatum;
2718         bool            isnull;
2719         char       *predString;
2720         MemoryContext oldcxt;
2721
2722         /* Quick exit if we already computed the result. */
2723         if (relation->rd_indpred)
2724                 return (List *) copyObject(relation->rd_indpred);
2725
2726         /* Quick exit if there is nothing to do. */
2727         if (relation->rd_indextuple == NULL ||
2728                 heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred))
2729                 return NIL;
2730
2731         /*
2732          * We build the tree we intend to return in the caller's context.
2733          * After successfully completing the work, we copy it into the
2734          * relcache entry.      This avoids problems if we get some sort of error
2735          * partway through.
2736          *
2737          * We make use of the syscache's copy of pg_index's tupledesc to access
2738          * the non-fixed fields of the tuple.  We assume that the syscache
2739          * will be initialized before any access of a partial index could
2740          * occur.  (This would probably fail if we were to allow partial
2741          * indexes on system catalogs.)
2742          */
2743         predDatum = SysCacheGetAttr(INDEXRELID, relation->rd_indextuple,
2744                                                                 Anum_pg_index_indpred, &isnull);
2745         Assert(!isnull);
2746         predString = DatumGetCString(DirectFunctionCall1(textout, predDatum));
2747         result = (List *) stringToNode(predString);
2748         pfree(predString);
2749
2750         /*
2751          * Run the expression through canonicalize_qual and eval_const_expressions.
2752          * This is not just an optimization, but is necessary, because the planner
2753          * will be comparing it to similarly-processed qual clauses, and may fail
2754          * to detect valid matches without this.
2755          */
2756         result = (List *) canonicalize_qual((Expr *) result);
2757
2758         result = (List *) eval_const_expressions((Node *) result);
2759
2760         /*
2761          * Also mark any coercion format fields as "don't care", so that the
2762          * planner can match to both explicit and implicit coercions.
2763          */
2764         set_coercionform_dontcare((Node *) result);
2765
2766         /* Also convert to implicit-AND format */
2767         result = make_ands_implicit((Expr *) result);
2768
2769         /* May as well fix opfuncids too */
2770         fix_opfuncids((Node *) result);
2771
2772         /* Now save a copy of the completed tree in the relcache entry. */
2773         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2774         relation->rd_indpred = (List *) copyObject(result);
2775         MemoryContextSwitchTo(oldcxt);
2776
2777         return result;
2778 }
2779
2780
2781 /*
2782  *      load_relcache_init_file, write_relcache_init_file
2783  *
2784  *              In late 1992, we started regularly having databases with more than
2785  *              a thousand classes in them.  With this number of classes, it became
2786  *              critical to do indexed lookups on the system catalogs.
2787  *
2788  *              Bootstrapping these lookups is very hard.  We want to be able to
2789  *              use an index on pg_attribute, for example, but in order to do so,
2790  *              we must have read pg_attribute for the attributes in the index,
2791  *              which implies that we need to use the index.
2792  *
2793  *              In order to get around the problem, we do the following:
2794  *
2795  *                 +  When the database system is initialized (at initdb time), we
2796  *                        don't use indexes.  We do sequential scans.
2797  *
2798  *                 +  When the backend is started up in normal mode, we load an image
2799  *                        of the appropriate relation descriptors, in internal format,
2800  *                        from an initialization file in the data/base/... directory.
2801  *
2802  *                 +  If the initialization file isn't there, then we create the
2803  *                        relation descriptors using sequential scans and write 'em to
2804  *                        the initialization file for use by subsequent backends.
2805  *
2806  *              We could dispense with the initialization file and just build the
2807  *              critical reldescs the hard way on every backend startup, but that
2808  *              slows down backend startup noticeably.
2809  *
2810  *              We can in fact go further, and save more relcache entries than
2811  *              just the ones that are absolutely critical; this allows us to speed
2812  *              up backend startup by not having to build such entries the hard way.
2813  *              Presently, all the catalog and index entries that are referred to
2814  *              by catcaches are stored in the initialization file.
2815  *
2816  *              The same mechanism that detects when catcache and relcache entries
2817  *              need to be invalidated (due to catalog updates) also arranges to
2818  *              unlink the initialization file when its contents may be out of date.
2819  *              The file will then be rebuilt during the next backend startup.
2820  */
2821
2822 /*
2823  * load_relcache_init_file -- attempt to load cache from the init file
2824  *
2825  * If successful, return TRUE and set criticalRelcachesBuilt to true.
2826  * If not successful, return FALSE and set needNewCacheFile to true.
2827  *
2828  * NOTE: we assume we are already switched into CacheMemoryContext.
2829  */
2830 static bool
2831 load_relcache_init_file(void)
2832 {
2833         FILE       *fp;
2834         char            initfilename[MAXPGPATH];
2835         Relation   *rels;
2836         int                     relno,
2837                                 num_rels,
2838                                 max_rels,
2839                                 nailed_rels,
2840                                 nailed_indexes,
2841                                 magic;
2842         int                     i;
2843
2844         snprintf(initfilename, sizeof(initfilename), "%s/%s",
2845                          DatabasePath, RELCACHE_INIT_FILENAME);
2846
2847         fp = AllocateFile(initfilename, PG_BINARY_R);
2848         if (fp == NULL)
2849         {
2850                 needNewCacheFile = true;
2851                 return false;
2852         }
2853
2854         /*
2855          * Read the index relcache entries from the file.  Note we will not
2856          * enter any of them into the cache if the read fails partway through;
2857          * this helps to guard against broken init files.
2858          */
2859         max_rels = 100;
2860         rels = (Relation *) palloc(max_rels * sizeof(Relation));
2861         num_rels = 0;
2862         nailed_rels = nailed_indexes = 0;
2863         initFileRelationIds = NIL;
2864
2865         /* check for correct magic number (compatible version) */
2866         if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
2867                 goto read_failed;
2868         if (magic != RELCACHE_INIT_FILEMAGIC)
2869                 goto read_failed;
2870
2871         for (relno = 0;; relno++)
2872         {
2873                 Size            len;
2874                 size_t          nread;
2875                 Relation        rel;
2876                 Form_pg_class relform;
2877                 bool            has_not_null;
2878
2879                 /* first read the relation descriptor length */
2880                 if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2881                 {
2882                         if (nread == 0)
2883                                 break;                  /* end of file */
2884                         goto read_failed;
2885                 }
2886
2887                 /* safety check for incompatible relcache layout */
2888                 if (len != sizeof(RelationData))
2889                         goto read_failed;
2890
2891                 /* allocate another relcache header */
2892                 if (num_rels >= max_rels)
2893                 {
2894                         max_rels *= 2;
2895                         rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
2896                 }
2897
2898                 rel = rels[num_rels++] = (Relation) palloc(len);
2899
2900                 /* then, read the Relation structure */
2901                 if ((nread = fread(rel, 1, len, fp)) != len)
2902                         goto read_failed;
2903
2904                 /* next read the relation tuple form */
2905                 if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2906                         goto read_failed;
2907
2908                 relform = (Form_pg_class) palloc(len);
2909                 if ((nread = fread(relform, 1, len, fp)) != len)
2910                         goto read_failed;
2911
2912                 rel->rd_rel = relform;
2913
2914                 /* initialize attribute tuple forms */
2915                 rel->rd_att = CreateTemplateTupleDesc(relform->relnatts,
2916                                                                                           relform->relhasoids);
2917                 rel->rd_att->tdtypeid = relform->reltype;
2918                 rel->rd_att->tdtypmod = -1;                     /* unnecessary, but... */
2919
2920                 /* next read all the attribute tuple form data entries */
2921                 has_not_null = false;
2922                 for (i = 0; i < relform->relnatts; i++)
2923                 {
2924                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2925                                 goto read_failed;
2926
2927                         rel->rd_att->attrs[i] = (Form_pg_attribute) palloc(len);
2928
2929                         if ((nread = fread(rel->rd_att->attrs[i], 1, len, fp)) != len)
2930                                 goto read_failed;
2931
2932                         has_not_null |= rel->rd_att->attrs[i]->attnotnull;
2933                 }
2934
2935                 /* mark not-null status */
2936                 if (has_not_null)
2937                 {
2938                         TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
2939
2940                         constr->has_not_null = true;
2941                         rel->rd_att->constr = constr;
2942                 }
2943
2944                 /* If it's an index, there's more to do */
2945                 if (rel->rd_rel->relkind == RELKIND_INDEX)
2946                 {
2947                         Form_pg_am      am;
2948                         MemoryContext indexcxt;
2949                         Oid                *operator;
2950                         RegProcedure *support;
2951                         int                     nsupport;
2952
2953                         /* Count nailed indexes to ensure we have 'em all */
2954                         if (rel->rd_isnailed)
2955                                 nailed_indexes++;
2956
2957                         /* next, read the pg_index tuple */
2958                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2959                                 goto read_failed;
2960
2961                         rel->rd_indextuple = (HeapTuple) palloc(len);
2962                         if ((nread = fread(rel->rd_indextuple, 1, len, fp)) != len)
2963                                 goto read_failed;
2964
2965                         /* Fix up internal pointers in the tuple -- see heap_copytuple */
2966                         rel->rd_indextuple->t_datamcxt = CurrentMemoryContext;
2967                         rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
2968                         rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);
2969
2970                         /* next, read the access method tuple form */
2971                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2972                                 goto read_failed;
2973
2974                         am = (Form_pg_am) palloc(len);
2975                         if ((nread = fread(am, 1, len, fp)) != len)
2976                                 goto read_failed;
2977                         rel->rd_am = am;
2978
2979                         /*
2980                          * prepare index info context --- parameters should match
2981                          * RelationInitIndexAccessInfo
2982                          */
2983                         indexcxt = AllocSetContextCreate(CacheMemoryContext,
2984                                                                                          RelationGetRelationName(rel),
2985                                                                                          ALLOCSET_SMALL_MINSIZE,
2986                                                                                          ALLOCSET_SMALL_INITSIZE,
2987                                                                                          ALLOCSET_SMALL_MAXSIZE);
2988                         rel->rd_indexcxt = indexcxt;
2989
2990                         /* next, read the vector of operator OIDs */
2991                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2992                                 goto read_failed;
2993
2994                         operator = (Oid *) MemoryContextAlloc(indexcxt, len);
2995                         if ((nread = fread(operator, 1, len, fp)) != len)
2996                                 goto read_failed;
2997
2998                         rel->rd_operator = operator;
2999
3000                         /* finally, read the vector of support procedures */
3001                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3002                                 goto read_failed;
3003                         support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
3004                         if ((nread = fread(support, 1, len, fp)) != len)
3005                                 goto read_failed;
3006
3007                         rel->rd_support = support;
3008
3009                         /* add a zeroed support-fmgr-info vector */
3010                         nsupport = relform->relnatts * am->amsupport;
3011                         rel->rd_supportinfo = (FmgrInfo *)
3012                                 MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
3013                 }
3014                 else
3015                 {
3016                         /* Count nailed rels to ensure we have 'em all */
3017                         if (rel->rd_isnailed)
3018                                 nailed_rels++;
3019
3020                         Assert(rel->rd_index == NULL);
3021                         Assert(rel->rd_indextuple == NULL);
3022                         Assert(rel->rd_am == NULL);
3023                         Assert(rel->rd_indexcxt == NULL);
3024                         Assert(rel->rd_operator == NULL);
3025                         Assert(rel->rd_support == NULL);
3026                         Assert(rel->rd_supportinfo == NULL);
3027                 }
3028
3029                 /*
3030                  * Rules and triggers are not saved (mainly because the internal
3031                  * format is complex and subject to change).  They must be rebuilt
3032                  * if needed by RelationCacheInitializePhase2.  This is not
3033                  * expected to be a big performance hit since few system catalogs
3034                  * have such.  Ditto for index expressions and predicates.
3035                  */
3036                 rel->rd_rules = NULL;
3037                 rel->rd_rulescxt = NULL;
3038                 rel->trigdesc = NULL;
3039                 rel->rd_indexprs = NIL;
3040                 rel->rd_indpred = NIL;
3041
3042                 /*
3043                  * Reset transient-state fields in the relcache entry
3044                  */
3045                 rel->rd_smgr = NULL;
3046                 rel->rd_targblock = InvalidBlockNumber;
3047                 if (rel->rd_isnailed)
3048                         RelationSetReferenceCount(rel, 1);
3049                 else
3050                         RelationSetReferenceCount(rel, 0);
3051                 rel->rd_indexvalid = 0;
3052                 rel->rd_indexlist = NIL;
3053                 MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
3054
3055                 /*
3056                  * Make sure database ID is correct.  This is needed in case the
3057                  * pg_internal.init file was copied from some other database by
3058                  * CREATE DATABASE.
3059                  */
3060                 if (rel->rd_rel->relisshared)
3061                         rel->rd_node.tblNode = InvalidOid;
3062                 else
3063                         rel->rd_node.tblNode = MyDatabaseId;
3064
3065                 RelationInitLockInfo(rel);
3066         }
3067
3068         /*
3069          * We reached the end of the init file without apparent problem. Did
3070          * we get the right number of nailed items?  (This is a useful
3071          * crosscheck in case the set of critical rels or indexes changes.)
3072          */
3073         if (nailed_rels != NUM_CRITICAL_RELS ||
3074                 nailed_indexes != NUM_CRITICAL_INDEXES)
3075                 goto read_failed;
3076
3077         /*
3078          * OK, all appears well.
3079          *
3080          * Now insert all the new relcache entries into the cache.
3081          */
3082         for (relno = 0; relno < num_rels; relno++)
3083         {
3084                 RelationCacheInsert(rels[relno]);
3085                 /* also make a list of their OIDs, for RelationIdIsInInitFile */
3086                 initFileRelationIds = lconso(RelationGetRelid(rels[relno]),
3087                                                                          initFileRelationIds);
3088         }
3089
3090         pfree(rels);
3091         FreeFile(fp);
3092
3093         criticalRelcachesBuilt = true;
3094         return true;
3095
3096         /*
3097          * init file is broken, so do it the hard way.  We don't bother trying
3098          * to free the clutter we just allocated; it's not in the relcache so
3099          * it won't hurt.
3100          */
3101 read_failed:
3102         pfree(rels);
3103         FreeFile(fp);
3104
3105         needNewCacheFile = true;
3106         return false;
3107 }
3108
3109 /*
3110  * Write out a new initialization file with the current contents
3111  * of the relcache.
3112  */
3113 static void
3114 write_relcache_init_file(void)
3115 {
3116         FILE       *fp;
3117         char            tempfilename[MAXPGPATH];
3118         char            finalfilename[MAXPGPATH];
3119         int                     magic;
3120         HASH_SEQ_STATUS status;
3121         RelIdCacheEnt *idhentry;
3122         MemoryContext oldcxt;
3123         int                     i;
3124
3125         /*
3126          * We must write a temporary file and rename it into place. Otherwise,
3127          * another backend starting at about the same time might crash trying
3128          * to read the partially-complete file.
3129          */
3130         snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
3131                          DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
3132         snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
3133                          DatabasePath, RELCACHE_INIT_FILENAME);
3134
3135         unlink(tempfilename);           /* in case it exists w/wrong permissions */
3136
3137         fp = AllocateFile(tempfilename, PG_BINARY_W);
3138         if (fp == NULL)
3139         {
3140                 /*
3141                  * We used to consider this a fatal error, but we might as well
3142                  * continue with backend startup ...
3143                  */
3144                 ereport(WARNING,
3145                                 (errcode_for_file_access(),
3146                                  errmsg("could not create relation-cache initialization file \"%s\": %m",
3147                                                 tempfilename),
3148                   errdetail("Continuing anyway, but there's something wrong.")));
3149                 return;
3150         }
3151
3152         /*
3153          * Write a magic number to serve as a file version identifier.  We can
3154          * change the magic number whenever the relcache layout changes.
3155          */
3156         magic = RELCACHE_INIT_FILEMAGIC;
3157         if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
3158                 elog(FATAL, "could not write init file");
3159
3160         /*
3161          * Write all the reldescs (in no particular order).
3162          */
3163         hash_seq_init(&status, RelationIdCache);
3164
3165         initFileRelationIds = NIL;
3166
3167         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3168         {
3169                 Relation        rel = idhentry->reldesc;
3170                 Form_pg_class relform = rel->rd_rel;
3171                 Size            len;
3172
3173                 /*
3174                  * first write the relcache entry proper
3175                  */
3176                 len = sizeof(RelationData);
3177
3178                 /* first, write the relation descriptor length */
3179                 if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3180                         elog(FATAL, "could not write init file");
3181
3182                 /* next, write out the Relation structure */
3183                 if (fwrite(rel, 1, len, fp) != len)
3184                         elog(FATAL, "could not write init file");
3185
3186                 /* next write the relation tuple form */
3187                 len = sizeof(FormData_pg_class);
3188                 if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3189                         elog(FATAL, "could not write init file");
3190
3191                 if (fwrite(relform, 1, len, fp) != len)
3192                         elog(FATAL, "could not write init file");
3193
3194                 /* next, do all the attribute tuple form data entries */
3195                 for (i = 0; i < relform->relnatts; i++)
3196                 {
3197                         len = ATTRIBUTE_TUPLE_SIZE;
3198                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3199                                 elog(FATAL, "could not write init file");
3200                         if (fwrite(rel->rd_att->attrs[i], 1, len, fp) != len)
3201                                 elog(FATAL, "could not write init file");
3202                 }
3203
3204                 /* If it's an index, there's more to do */
3205                 if (rel->rd_rel->relkind == RELKIND_INDEX)
3206                 {
3207                         Form_pg_am      am = rel->rd_am;
3208
3209                         /* write the pg_index tuple */
3210                         /* we assume this was created by heap_copytuple! */
3211                         len = HEAPTUPLESIZE + rel->rd_indextuple->t_len;
3212                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3213                                 elog(FATAL, "could not write init file");
3214
3215                         if (fwrite(rel->rd_indextuple, 1, len, fp) != len)
3216                                 elog(FATAL, "could not write init file");
3217
3218                         /* next, write the access method tuple form */
3219                         len = sizeof(FormData_pg_am);
3220                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3221                                 elog(FATAL, "could not write init file");
3222
3223                         if (fwrite(am, 1, len, fp) != len)
3224                                 elog(FATAL, "could not write init file");
3225
3226                         /* next, write the vector of operator OIDs */
3227                         len = relform->relnatts * (am->amstrategies * sizeof(Oid));
3228                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3229                                 elog(FATAL, "could not write init file");
3230
3231                         if (fwrite(rel->rd_operator, 1, len, fp) != len)
3232                                 elog(FATAL, "could not write init file");
3233
3234                         /* finally, write the vector of support procedures */
3235                         len = relform->relnatts * (am->amsupport * sizeof(RegProcedure));
3236                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3237                                 elog(FATAL, "could not write init file");
3238
3239                         if (fwrite(rel->rd_support, 1, len, fp) != len)
3240                                 elog(FATAL, "could not write init file");
3241                 }
3242
3243                 /* also make a list of their OIDs, for RelationIdIsInInitFile */
3244                 oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3245                 initFileRelationIds = lconso(RelationGetRelid(rel),
3246                                                                          initFileRelationIds);
3247                 MemoryContextSwitchTo(oldcxt);
3248         }
3249
3250         if (FreeFile(fp))
3251                 elog(FATAL, "could not write init file");
3252
3253         /*
3254          * Now we have to check whether the data we've so painstakingly
3255          * accumulated is already obsolete due to someone else's
3256          * just-committed catalog changes.      If so, we just delete the temp
3257          * file and leave it to the next backend to try again.  (Our own
3258          * relcache entries will be updated by SI message processing, but we
3259          * can't be sure whether what we wrote out was up-to-date.)
3260          *
3261          * This mustn't run concurrently with RelationCacheInitFileInvalidate, so
3262          * grab a serialization lock for the duration.
3263          */
3264         LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
3265
3266         /* Make sure we have seen all incoming SI messages */
3267         AcceptInvalidationMessages();
3268
3269         /*
3270          * If we have received any SI relcache invals since backend start,
3271          * assume we may have written out-of-date data.
3272          */
3273         if (relcacheInvalsReceived == 0L)
3274         {
3275                 /*
3276                  * OK, rename the temp file to its final name, deleting any
3277                  * previously-existing init file.
3278                  */
3279                 rename(tempfilename, finalfilename);
3280                 LWLockRelease(RelCacheInitLock);
3281         }
3282         else
3283         {
3284                 /* Delete the already-obsolete temp file */
3285                 unlink(tempfilename);
3286                 LWLockRelease(RelCacheInitLock);
3287         }
3288 }
3289
3290 /*
3291  * Detect whether a given relation (identified by OID) is one of the ones
3292  * we store in the init file.
3293  *
3294  * Note that we effectively assume that all backends running in a database
3295  * would choose to store the same set of relations in the init file;
3296  * otherwise there are cases where we'd fail to detect the need for an init
3297  * file invalidation.  This does not seem likely to be a problem in practice.
3298  */
3299 bool
3300 RelationIdIsInInitFile(Oid relationId)
3301 {
3302         return oidMember(relationId, initFileRelationIds);
3303 }
3304
3305 /*
3306  * Invalidate (remove) the init file during commit of a transaction that
3307  * changed one or more of the relation cache entries that are kept in the
3308  * init file.
3309  *
3310  * We actually need to remove the init file twice: once just before sending
3311  * the SI messages that include relcache inval for such relations, and once
3312  * just after sending them.  The unlink before ensures that a backend that's
3313  * currently starting cannot read the now-obsolete init file and then miss
3314  * the SI messages that will force it to update its relcache entries.  (This
3315  * works because the backend startup sequence gets into the PROC array before
3316  * trying to load the init file.)  The unlink after is to synchronize with a
3317  * backend that may currently be trying to write an init file based on data
3318  * that we've just rendered invalid.  Such a backend will see the SI messages,
3319  * but we can't leave the init file sitting around to fool later backends.
3320  *
3321  * Ignore any failure to unlink the file, since it might not be there if
3322  * no backend has been started since the last removal.
3323  */
3324 void
3325 RelationCacheInitFileInvalidate(bool beforeSend)
3326 {
3327         char            initfilename[MAXPGPATH];
3328
3329         snprintf(initfilename, sizeof(initfilename), "%s/%s",
3330                          DatabasePath, RELCACHE_INIT_FILENAME);
3331
3332         if (beforeSend)
3333         {
3334                 /* no interlock needed here */
3335                 unlink(initfilename);
3336         }
3337         else
3338         {
3339                 /*
3340                  * We need to interlock this against write_relcache_init_file, to
3341                  * guard against possibility that someone renames a new-but-
3342                  * already-obsolete init file into place just after we unlink.
3343                  * With the interlock, it's certain that write_relcache_init_file
3344                  * will notice our SI inval message before renaming into place, or
3345                  * else that we will execute second and successfully unlink the
3346                  * file.
3347                  */
3348                 LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
3349                 unlink(initfilename);
3350                 LWLockRelease(RelCacheInitLock);
3351         }
3352 }