]> granicus.if.org Git - postgresql/blob - src/backend/utils/cache/relcache.c
Tweak default memory context allocation policy so that a context is not
[postgresql] / src / backend / utils / cache / relcache.c
1 /*-------------------------------------------------------------------------
2  *
3  * relcache.c
4  *        POSTGRES relation descriptor cache code
5  *
6  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/utils/cache/relcache.c,v 1.182 2002/12/15 21:01:34 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * INTERFACE ROUTINES
17  *              RelationCacheInitialize                 - initialize relcache
18  *              RelationCacheInitializePhase2   - finish initializing relcache
19  *              RelationIdGetRelation                   - get a reldesc by relation id
20  *              RelationSysNameGetRelation              - get a reldesc by system rel name
21  *              RelationIdCacheGetRelation              - get a cached reldesc by relid
22  *              RelationClose                                   - close an open relation
23  *
24  * NOTES
25  *              The following code contains many undocumented hacks.  Please be
26  *              careful....
27  */
28 #include "postgres.h"
29
30 #include <errno.h>
31 #include <sys/file.h>
32 #include <fcntl.h>
33 #include <unistd.h>
34
35 #include "access/genam.h"
36 #include "access/heapam.h"
37 #include "access/istrat.h"
38 #include "catalog/catalog.h"
39 #include "catalog/catname.h"
40 #include "catalog/indexing.h"
41 #include "catalog/namespace.h"
42 #include "catalog/pg_amop.h"
43 #include "catalog/pg_amproc.h"
44 #include "catalog/pg_attrdef.h"
45 #include "catalog/pg_attribute.h"
46 #include "catalog/pg_constraint.h"
47 #include "catalog/pg_index.h"
48 #include "catalog/pg_namespace.h"
49 #include "catalog/pg_opclass.h"
50 #include "catalog/pg_proc.h"
51 #include "catalog/pg_rewrite.h"
52 #include "catalog/pg_type.h"
53 #include "commands/trigger.h"
54 #include "miscadmin.h"
55 #include "storage/smgr.h"
56 #include "utils/builtins.h"
57 #include "utils/catcache.h"
58 #include "utils/fmgroids.h"
59 #include "utils/inval.h"
60 #include "utils/lsyscache.h"
61 #include "utils/relcache.h"
62 #include "utils/syscache.h"
63
64
65 /*
66  * name of relcache init file, used to speed up backend startup
67  */
68 #define RELCACHE_INIT_FILENAME  "pg_internal.init"
69
70 /*
71  *              hardcoded tuple descriptors.  see include/catalog/pg_attribute.h
72  */
73 static FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
74 static FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
75 static FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
76 static FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
77
78 /*
79  *              Hash tables that index the relation cache
80  *
81  *              Relations are looked up two ways, by OID and by name,
82  *              thus there are two hash tables for referencing them.
83  *
84  *              The OID index covers all relcache entries.      The name index
85  *              covers *only* system relations (only those in PG_CATALOG_NAMESPACE).
86  */
87 static HTAB *RelationIdCache;
88 static HTAB *RelationSysNameCache;
89
90 /*
91  * Bufmgr uses RelFileNode for lookup. Actually, I would like to do
92  * not pass Relation to bufmgr & beyond at all and keep some cache
93  * in smgr, but no time to do it right way now.         -- vadim 10/22/2000
94  */
95 static HTAB *RelationNodeCache;
96
97 /*
98  * This flag is false until we have prepared the critical relcache entries
99  * that are needed to do indexscans on the tables read by relcache building.
100  */
101 bool            criticalRelcachesBuilt = false;
102
103 /*
104  * This flag is set if we discover that we need to write a new relcache
105  * cache file at the end of startup.
106  */
107 static bool needNewCacheFile = false;
108
109 /*
110  * This counter counts relcache inval events received since backend startup
111  * (but only for rels that are actually in cache).      Presently, we use it only
112  * to detect whether data about to be written by write_relcache_init_file()
113  * might already be obsolete.
114  */
115 static long relcacheInvalsReceived = 0L;
116
117 /*
118  * This list remembers the OIDs of the relations cached in the relcache
119  * init file.
120  */
121 static List *initFileRelationIds = NIL;
122
123 /*
124  *              RelationBuildDescInfo exists so code can be shared
125  *              between RelationIdGetRelation() and RelationSysNameGetRelation()
126  */
127 typedef struct RelationBuildDescInfo
128 {
129         int                     infotype;               /* lookup by id or by name */
130 #define INFO_RELID 1
131 #define INFO_RELNAME 2
132         union
133         {
134                 Oid                     info_id;        /* relation object id */
135                 char       *info_name;  /* system relation name */
136         }                       i;
137 } RelationBuildDescInfo;
138
139 typedef struct relidcacheent
140 {
141         Oid                     reloid;
142         Relation        reldesc;
143 } RelIdCacheEnt;
144
145 typedef struct relnamecacheent
146 {
147         NameData        relname;
148         Relation        reldesc;
149 } RelNameCacheEnt;
150
151 typedef struct relnodecacheent
152 {
153         RelFileNode relnode;
154         Relation        reldesc;
155 } RelNodeCacheEnt;
156
157 /*
158  *              macros to manipulate the lookup hashtables
159  */
160 #define RelationCacheInsert(RELATION)   \
161 do { \
162         RelIdCacheEnt *idhentry; RelNodeCacheEnt *nodentry; bool found; \
163         idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
164                                                                                    (void *) &(RELATION->rd_id), \
165                                                                                    HASH_ENTER, \
166                                                                                    &found); \
167         if (idhentry == NULL) \
168                 elog(ERROR, "out of memory for relation descriptor cache"); \
169         /* used to give notice if found -- now just keep quiet */ \
170         idhentry->reldesc = RELATION; \
171         nodentry = (RelNodeCacheEnt*)hash_search(RelationNodeCache, \
172                                                                                    (void *) &(RELATION->rd_node), \
173                                                                                    HASH_ENTER, \
174                                                                                    &found); \
175         if (nodentry == NULL) \
176                 elog(ERROR, "out of memory for relation descriptor cache"); \
177         /* used to give notice if found -- now just keep quiet */ \
178         nodentry->reldesc = RELATION; \
179         if (IsSystemNamespace(RelationGetNamespace(RELATION))) \
180         { \
181                 char *relname = RelationGetRelationName(RELATION); \
182                 RelNameCacheEnt *namehentry; \
183                 namehentry = (RelNameCacheEnt*)hash_search(RelationSysNameCache, \
184                                                                                                    relname, \
185                                                                                                    HASH_ENTER, \
186                                                                                                    &found); \
187                 if (namehentry == NULL) \
188                         elog(ERROR, "out of memory for relation descriptor cache"); \
189                 /* used to give notice if found -- now just keep quiet */ \
190                 namehentry->reldesc = RELATION; \
191         } \
192 } while(0)
193
194 #define RelationIdCacheLookup(ID, RELATION) \
195 do { \
196         RelIdCacheEnt *hentry; \
197         hentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
198                                                                                  (void *)&(ID), HASH_FIND,NULL); \
199         if (hentry) \
200                 RELATION = hentry->reldesc; \
201         else \
202                 RELATION = NULL; \
203 } while(0)
204
205 #define RelationSysNameCacheLookup(NAME, RELATION) \
206 do { \
207         RelNameCacheEnt *hentry; \
208         hentry = (RelNameCacheEnt*)hash_search(RelationSysNameCache, \
209                                                                                    (void *) (NAME), HASH_FIND,NULL); \
210         if (hentry) \
211                 RELATION = hentry->reldesc; \
212         else \
213                 RELATION = NULL; \
214 } while(0)
215
216 #define RelationNodeCacheLookup(NODE, RELATION) \
217 do { \
218         RelNodeCacheEnt *hentry; \
219         hentry = (RelNodeCacheEnt*)hash_search(RelationNodeCache, \
220                                                                                    (void *)&(NODE), HASH_FIND,NULL); \
221         if (hentry) \
222                 RELATION = hentry->reldesc; \
223         else \
224                 RELATION = NULL; \
225 } while(0)
226
227 #define RelationCacheDelete(RELATION) \
228 do { \
229         RelIdCacheEnt *idhentry; RelNodeCacheEnt *nodentry; \
230         idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
231                                                                                    (void *)&(RELATION->rd_id), \
232                                                                                    HASH_REMOVE, NULL); \
233         if (idhentry == NULL) \
234                 elog(WARNING, "trying to delete a rd_id reldesc that does not exist."); \
235         nodentry = (RelNodeCacheEnt*)hash_search(RelationNodeCache, \
236                                                                                    (void *)&(RELATION->rd_node), \
237                                                                                    HASH_REMOVE, NULL); \
238         if (nodentry == NULL) \
239                 elog(WARNING, "trying to delete a rd_node reldesc that does not exist."); \
240         if (IsSystemNamespace(RelationGetNamespace(RELATION))) \
241         { \
242                 char *relname = RelationGetRelationName(RELATION); \
243                 RelNameCacheEnt *namehentry; \
244                 namehentry = (RelNameCacheEnt*)hash_search(RelationSysNameCache, \
245                                                                                                    relname, \
246                                                                                                    HASH_REMOVE, NULL); \
247                 if (namehentry == NULL) \
248                         elog(WARNING, "trying to delete a relname reldesc that does not exist."); \
249         } \
250 } while(0)
251
252
253 /*
254  * Special cache for opclass-related information
255  */
256 typedef struct opclasscacheent
257 {
258         Oid                     opclassoid;             /* lookup key: OID of opclass */
259         bool            valid;                  /* set TRUE after successful fill-in */
260         StrategyNumber numStrats;       /* max # of strategies (from pg_am) */
261         StrategyNumber numSupport;      /* max # of support procs (from pg_am) */
262         Oid                *operatorOids;       /* strategy operators' OIDs */
263         RegProcedure *operatorProcs;    /* strategy operators' procs */
264         RegProcedure *supportProcs; /* support procs */
265 } OpClassCacheEnt;
266
267 static HTAB *OpClassCache = NULL;
268
269
270 /* non-export function prototypes */
271
272 static void RelationClearRelation(Relation relation, bool rebuild);
273
274 #ifdef  ENABLE_REINDEX_NAILED_RELATIONS
275 static void RelationReloadClassinfo(Relation relation);
276 #endif   /* ENABLE_REINDEX_NAILED_RELATIONS */
277 static void RelationFlushRelation(Relation relation);
278 static Relation RelationSysNameCacheGetRelation(const char *relationName);
279 static bool load_relcache_init_file(void);
280 static void write_relcache_init_file(void);
281
282 static void formrdesc(const char *relationName, int natts,
283                   FormData_pg_attribute *att);
284
285 static HeapTuple ScanPgRelation(RelationBuildDescInfo buildinfo);
286 static Relation AllocateRelationDesc(Relation relation, Form_pg_class relp);
287 static void RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
288                                            Relation relation);
289 static Relation RelationBuildDesc(RelationBuildDescInfo buildinfo,
290                                   Relation oldrelation);
291 static void AttrDefaultFetch(Relation relation);
292 static void CheckConstraintFetch(Relation relation);
293 static List *insert_ordered_oid(List *list, Oid datum);
294 static void IndexSupportInitialize(Form_pg_index iform,
295                                            IndexStrategy indexStrategy,
296                                            Oid *indexOperator,
297                                            RegProcedure *indexSupport,
298                                            StrategyNumber maxStrategyNumber,
299                                            StrategyNumber maxSupportNumber,
300                                            AttrNumber maxAttributeNumber);
301 static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
302                                   StrategyNumber numStrats,
303                                   StrategyNumber numSupport);
304
305
306 /*
307  *              ScanPgRelation
308  *
309  *              this is used by RelationBuildDesc to find a pg_class
310  *              tuple matching either a relation name or a relation id
311  *              as specified in buildinfo.
312  *
313  *              NB: the returned tuple has been copied into palloc'd storage
314  *              and must eventually be freed with heap_freetuple.
315  */
316 static HeapTuple
317 ScanPgRelation(RelationBuildDescInfo buildinfo)
318 {
319         HeapTuple       pg_class_tuple;
320         Relation        pg_class_desc;
321         const char *indexRelname;
322         SysScanDesc pg_class_scan;
323         ScanKeyData key[2];
324         int                     nkeys;
325
326         /*
327          * form a scan key
328          */
329         switch (buildinfo.infotype)
330         {
331                 case INFO_RELID:
332                         ScanKeyEntryInitialize(&key[0], 0,
333                                                                    ObjectIdAttributeNumber,
334                                                                    F_OIDEQ,
335                                                                    ObjectIdGetDatum(buildinfo.i.info_id));
336                         nkeys = 1;
337                         indexRelname = ClassOidIndex;
338                         break;
339
340                 case INFO_RELNAME:
341                         ScanKeyEntryInitialize(&key[0], 0,
342                                                                    Anum_pg_class_relname,
343                                                                    F_NAMEEQ,
344                                                                    NameGetDatum(buildinfo.i.info_name));
345                         ScanKeyEntryInitialize(&key[1], 0,
346                                                                    Anum_pg_class_relnamespace,
347                                                                    F_OIDEQ,
348                                                                  ObjectIdGetDatum(PG_CATALOG_NAMESPACE));
349                         nkeys = 2;
350                         indexRelname = ClassNameNspIndex;
351                         break;
352
353                 default:
354                         elog(ERROR, "ScanPgRelation: bad buildinfo");
355                         return NULL;            /* keep compiler quiet */
356         }
357
358         /*
359          * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
360          * built the critical relcache entries (this includes initdb and
361          * startup without a pg_internal.init file).
362          */
363         pg_class_desc = heap_openr(RelationRelationName, AccessShareLock);
364         pg_class_scan = systable_beginscan(pg_class_desc, indexRelname,
365                                                                            criticalRelcachesBuilt,
366                                                                            SnapshotNow,
367                                                                            nkeys, key);
368
369         pg_class_tuple = systable_getnext(pg_class_scan);
370
371         /*
372          * Must copy tuple before releasing buffer.
373          */
374         if (HeapTupleIsValid(pg_class_tuple))
375                 pg_class_tuple = heap_copytuple(pg_class_tuple);
376
377         /* all done */
378         systable_endscan(pg_class_scan);
379         heap_close(pg_class_desc, AccessShareLock);
380
381         return pg_class_tuple;
382 }
383
384 /*
385  *              AllocateRelationDesc
386  *
387  *              This is used to allocate memory for a new relation descriptor
388  *              and initialize the rd_rel field.
389  *
390  *              If 'relation' is NULL, allocate a new RelationData object.
391  *              If not, reuse the given object (that path is taken only when
392  *              we have to rebuild a relcache entry during RelationClearRelation).
393  */
394 static Relation
395 AllocateRelationDesc(Relation relation, Form_pg_class relp)
396 {
397         MemoryContext oldcxt;
398         Form_pg_class relationForm;
399
400         /* Relcache entries must live in CacheMemoryContext */
401         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
402
403         /*
404          * allocate space for new relation descriptor, if needed
405          */
406         if (relation == NULL)
407                 relation = (Relation) palloc(sizeof(RelationData));
408
409         /*
410          * clear all fields of reldesc
411          */
412         MemSet((char *) relation, 0, sizeof(RelationData));
413         relation->rd_targblock = InvalidBlockNumber;
414
415         /* make sure relation is marked as having no open file yet */
416         relation->rd_fd = -1;
417
418         /*
419          * Copy the relation tuple form
420          *
421          * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE.
422          * relacl is NOT stored in the relcache --- there'd be little point in
423          * it, since we don't copy the tuple's nullvalues bitmap and hence
424          * wouldn't know if the value is valid ... bottom line is that relacl
425          * *cannot* be retrieved from the relcache.  Get it from the syscache
426          * if you need it.
427          */
428         relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
429
430         memcpy((char *) relationForm, (char *) relp, CLASS_TUPLE_SIZE);
431
432         /* initialize relation tuple form */
433         relation->rd_rel = relationForm;
434
435         /* and allocate attribute tuple form storage */
436         relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts,
437                                                                                            relationForm->relhasoids);
438
439         MemoryContextSwitchTo(oldcxt);
440
441         return relation;
442 }
443
444 /*
445  *              RelationBuildTupleDesc
446  *
447  *              Form the relation's tuple descriptor from information in
448  *              the pg_attribute, pg_attrdef & pg_constraint system catalogs.
449  */
450 static void
451 RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
452                                            Relation relation)
453 {
454         HeapTuple       pg_attribute_tuple;
455         Relation        pg_attribute_desc;
456         SysScanDesc pg_attribute_scan;
457         ScanKeyData skey[2];
458         int                     need;
459         TupleConstr *constr;
460         AttrDefault *attrdef = NULL;
461         int                     ndef = 0;
462
463         relation->rd_att->tdhasoid = RelationGetForm(relation)->relhasoids;
464
465         constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
466                                                                                                 sizeof(TupleConstr));
467         constr->has_not_null = false;
468
469         /*
470          * Form a scan key that selects only user attributes (attnum > 0).
471          * (Eliminating system attribute rows at the index level is lots
472          * faster than fetching them.)
473          */
474         ScanKeyEntryInitialize(&skey[0], 0,
475                                                    Anum_pg_attribute_attrelid,
476                                                    F_OIDEQ,
477                                                    ObjectIdGetDatum(RelationGetRelid(relation)));
478         ScanKeyEntryInitialize(&skey[1], 0,
479                                                    Anum_pg_attribute_attnum,
480                                                    F_INT2GT,
481                                                    Int16GetDatum(0));
482
483         /*
484          * Open pg_attribute and begin a scan.  Force heap scan if we haven't
485          * yet built the critical relcache entries (this includes initdb and
486          * startup without a pg_internal.init file).
487          */
488         pg_attribute_desc = heap_openr(AttributeRelationName, AccessShareLock);
489         pg_attribute_scan = systable_beginscan(pg_attribute_desc,
490                                                                                    AttributeRelidNumIndex,
491                                                                                    criticalRelcachesBuilt,
492                                                                                    SnapshotNow,
493                                                                                    2, skey);
494
495         /*
496          * add attribute data to relation->rd_att
497          */
498         need = relation->rd_rel->relnatts;
499
500         while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
501         {
502                 Form_pg_attribute attp;
503
504                 attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
505
506                 if (attp->attnum <= 0 ||
507                         attp->attnum > relation->rd_rel->relnatts)
508                         elog(ERROR, "Bogus attribute number %d for %s",
509                                  attp->attnum, RelationGetRelationName(relation));
510
511                 relation->rd_att->attrs[attp->attnum - 1] =
512                         (Form_pg_attribute) MemoryContextAlloc(CacheMemoryContext,
513                                                                                                    ATTRIBUTE_TUPLE_SIZE);
514
515                 memcpy((char *) (relation->rd_att->attrs[attp->attnum - 1]),
516                            (char *) attp,
517                            ATTRIBUTE_TUPLE_SIZE);
518
519                 /* Update constraint/default info */
520                 if (attp->attnotnull)
521                         constr->has_not_null = true;
522
523                 if (attp->atthasdef)
524                 {
525                         if (attrdef == NULL)
526                         {
527                                 attrdef = (AttrDefault *)
528                                         MemoryContextAlloc(CacheMemoryContext,
529                                                                            relation->rd_rel->relnatts *
530                                                                            sizeof(AttrDefault));
531                                 MemSet(attrdef, 0,
532                                            relation->rd_rel->relnatts * sizeof(AttrDefault));
533                         }
534                         attrdef[ndef].adnum = attp->attnum;
535                         attrdef[ndef].adbin = NULL;
536                         ndef++;
537                 }
538                 need--;
539                 if (need == 0)
540                         break;
541         }
542
543         /*
544          * end the scan and close the attribute relation
545          */
546         systable_endscan(pg_attribute_scan);
547         heap_close(pg_attribute_desc, AccessShareLock);
548
549         if (need != 0)
550                 elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
551                          need, RelationGetRelid(relation));
552
553         /*
554          * The attcacheoff values we read from pg_attribute should all be -1
555          * ("unknown").  Verify this if assert checking is on.  They will be
556          * computed when and if needed during tuple access.
557          */
558 #ifdef USE_ASSERT_CHECKING
559         {
560                 int                     i;
561
562                 for (i = 0; i < relation->rd_rel->relnatts; i++)
563                         Assert(relation->rd_att->attrs[i]->attcacheoff == -1);
564         }
565 #endif
566
567         /*
568          * However, we can easily set the attcacheoff value for the first
569          * attribute: it must be zero.  This eliminates the need for special
570          * cases for attnum=1 that used to exist in fastgetattr() and
571          * index_getattr().
572          */
573         relation->rd_att->attrs[0]->attcacheoff = 0;
574
575         /*
576          * Set up constraint/default info
577          */
578         if (constr->has_not_null || ndef > 0 || relation->rd_rel->relchecks)
579         {
580                 relation->rd_att->constr = constr;
581
582                 if (ndef > 0)                   /* DEFAULTs */
583                 {
584                         if (ndef < relation->rd_rel->relnatts)
585                                 constr->defval = (AttrDefault *)
586                                         repalloc(attrdef, ndef * sizeof(AttrDefault));
587                         else
588                                 constr->defval = attrdef;
589                         constr->num_defval = ndef;
590                         AttrDefaultFetch(relation);
591                 }
592                 else
593                         constr->num_defval = 0;
594
595                 if (relation->rd_rel->relchecks > 0)    /* CHECKs */
596                 {
597                         constr->num_check = relation->rd_rel->relchecks;
598                         constr->check = (ConstrCheck *)
599                                 MemoryContextAlloc(CacheMemoryContext,
600                                                                 constr->num_check * sizeof(ConstrCheck));
601                         MemSet(constr->check, 0, constr->num_check * sizeof(ConstrCheck));
602                         CheckConstraintFetch(relation);
603                 }
604                 else
605                         constr->num_check = 0;
606         }
607         else
608         {
609                 pfree(constr);
610                 relation->rd_att->constr = NULL;
611         }
612 }
613
614 /*
615  *              RelationBuildRuleLock
616  *
617  *              Form the relation's rewrite rules from information in
618  *              the pg_rewrite system catalog.
619  *
620  * Note: The rule parsetrees are potentially very complex node structures.
621  * To allow these trees to be freed when the relcache entry is flushed,
622  * we make a private memory context to hold the RuleLock information for
623  * each relcache entry that has associated rules.  The context is used
624  * just for rule info, not for any other subsidiary data of the relcache
625  * entry, because that keeps the update logic in RelationClearRelation()
626  * manageable.  The other subsidiary data structures are simple enough
627  * to be easy to free explicitly, anyway.
628  */
629 static void
630 RelationBuildRuleLock(Relation relation)
631 {
632         MemoryContext rulescxt;
633         MemoryContext oldcxt;
634         HeapTuple       rewrite_tuple;
635         Relation        rewrite_desc;
636         TupleDesc       rewrite_tupdesc;
637         SysScanDesc rewrite_scan;
638         ScanKeyData key;
639         RuleLock   *rulelock;
640         int                     numlocks;
641         RewriteRule **rules;
642         int                     maxlocks;
643
644         /*
645          * Make the private context.  Parameters are set on the assumption
646          * that it'll probably not contain much data.
647          */
648         rulescxt = AllocSetContextCreate(CacheMemoryContext,
649                                                                          RelationGetRelationName(relation),
650                                                                          ALLOCSET_SMALL_MINSIZE,
651                                                                          ALLOCSET_SMALL_INITSIZE,
652                                                                          ALLOCSET_SMALL_MAXSIZE);
653         relation->rd_rulescxt = rulescxt;
654
655         /*
656          * allocate an array to hold the rewrite rules (the array is extended
657          * if necessary)
658          */
659         maxlocks = 4;
660         rules = (RewriteRule **)
661                 MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
662         numlocks = 0;
663
664         /*
665          * form a scan key
666          */
667         ScanKeyEntryInitialize(&key, 0,
668                                                    Anum_pg_rewrite_ev_class,
669                                                    F_OIDEQ,
670                                                    ObjectIdGetDatum(RelationGetRelid(relation)));
671
672         /*
673          * open pg_rewrite and begin a scan
674          *
675          * Note: since we scan the rules using RewriteRelRulenameIndex, we will
676          * be reading the rules in name order, except possibly during
677          * emergency-recovery operations (ie, IsIgnoringSystemIndexes). This
678          * in turn ensures that rules will be fired in name order.
679          */
680         rewrite_desc = heap_openr(RewriteRelationName, AccessShareLock);
681         rewrite_tupdesc = RelationGetDescr(rewrite_desc);
682         rewrite_scan = systable_beginscan(rewrite_desc,
683                                                                           RewriteRelRulenameIndex,
684                                                                           true, SnapshotNow,
685                                                                           1, &key);
686
687         while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
688         {
689                 Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
690                 bool            isnull;
691                 Datum           ruleaction;
692                 Datum           rule_evqual;
693                 char       *ruleaction_str;
694                 char       *rule_evqual_str;
695                 RewriteRule *rule;
696
697                 rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
698                                                                                                   sizeof(RewriteRule));
699
700                 rule->ruleId = HeapTupleGetOid(rewrite_tuple);
701
702                 rule->event = rewrite_form->ev_type - '0';
703                 rule->attrno = rewrite_form->ev_attr;
704                 rule->isInstead = rewrite_form->is_instead;
705
706                 /* Must use heap_getattr to fetch ev_qual and ev_action */
707
708                 ruleaction = heap_getattr(rewrite_tuple,
709                                                                   Anum_pg_rewrite_ev_action,
710                                                                   rewrite_tupdesc,
711                                                                   &isnull);
712                 Assert(!isnull);
713                 ruleaction_str = DatumGetCString(DirectFunctionCall1(textout,
714                                                                                                                          ruleaction));
715                 oldcxt = MemoryContextSwitchTo(rulescxt);
716                 rule->actions = (List *) stringToNode(ruleaction_str);
717                 MemoryContextSwitchTo(oldcxt);
718                 pfree(ruleaction_str);
719
720                 rule_evqual = heap_getattr(rewrite_tuple,
721                                                                    Anum_pg_rewrite_ev_qual,
722                                                                    rewrite_tupdesc,
723                                                                    &isnull);
724                 Assert(!isnull);
725                 rule_evqual_str = DatumGetCString(DirectFunctionCall1(textout,
726                                                                                                                    rule_evqual));
727                 oldcxt = MemoryContextSwitchTo(rulescxt);
728                 rule->qual = (Node *) stringToNode(rule_evqual_str);
729                 MemoryContextSwitchTo(oldcxt);
730                 pfree(rule_evqual_str);
731
732                 if (numlocks >= maxlocks)
733                 {
734                         maxlocks *= 2;
735                         rules = (RewriteRule **)
736                                 repalloc(rules, sizeof(RewriteRule *) * maxlocks);
737                 }
738                 rules[numlocks++] = rule;
739         }
740
741         /*
742          * end the scan and close the attribute relation
743          */
744         systable_endscan(rewrite_scan);
745         heap_close(rewrite_desc, AccessShareLock);
746
747         /*
748          * form a RuleLock and insert into relation
749          */
750         rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
751         rulelock->numLocks = numlocks;
752         rulelock->rules = rules;
753
754         relation->rd_rules = rulelock;
755 }
756
757 /*
758  *              equalRuleLocks
759  *
760  *              Determine whether two RuleLocks are equivalent
761  *
762  *              Probably this should be in the rules code someplace...
763  */
764 static bool
765 equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
766 {
767         int                     i;
768
769         /*
770          * As of 7.3 we assume the rule ordering is repeatable, because
771          * RelationBuildRuleLock should read 'em in a consistent order.  So
772          * just compare corresponding slots.
773          */
774         if (rlock1 != NULL)
775         {
776                 if (rlock2 == NULL)
777                         return false;
778                 if (rlock1->numLocks != rlock2->numLocks)
779                         return false;
780                 for (i = 0; i < rlock1->numLocks; i++)
781                 {
782                         RewriteRule *rule1 = rlock1->rules[i];
783                         RewriteRule *rule2 = rlock2->rules[i];
784
785                         if (rule1->ruleId != rule2->ruleId)
786                                 return false;
787                         if (rule1->event != rule2->event)
788                                 return false;
789                         if (rule1->attrno != rule2->attrno)
790                                 return false;
791                         if (rule1->isInstead != rule2->isInstead)
792                                 return false;
793                         if (!equal(rule1->qual, rule2->qual))
794                                 return false;
795                         if (!equal(rule1->actions, rule2->actions))
796                                 return false;
797                 }
798         }
799         else if (rlock2 != NULL)
800                 return false;
801         return true;
802 }
803
804
805 /* ----------------------------------
806  *              RelationBuildDesc
807  *
808  *              Build a relation descriptor --- either a new one, or by
809  *              recycling the given old relation object.  The latter case
810  *              supports rebuilding a relcache entry without invalidating
811  *              pointers to it.
812  * --------------------------------
813  */
814 static Relation
815 RelationBuildDesc(RelationBuildDescInfo buildinfo,
816                                   Relation oldrelation)
817 {
818         Relation        relation;
819         Oid                     relid;
820         HeapTuple       pg_class_tuple;
821         Form_pg_class relp;
822         MemoryContext oldcxt;
823
824         /*
825          * find the tuple in pg_class corresponding to the given relation id
826          */
827         pg_class_tuple = ScanPgRelation(buildinfo);
828
829         /*
830          * if no such tuple exists, return NULL
831          */
832         if (!HeapTupleIsValid(pg_class_tuple))
833                 return NULL;
834
835         /*
836          * get information from the pg_class_tuple
837          */
838         relid = HeapTupleGetOid(pg_class_tuple);
839         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
840
841         /*
842          * allocate storage for the relation descriptor, and copy
843          * pg_class_tuple to relation->rd_rel.
844          */
845         relation = AllocateRelationDesc(oldrelation, relp);
846
847         /*
848          * now we can free the memory allocated for pg_class_tuple
849          */
850         heap_freetuple(pg_class_tuple);
851
852         /*
853          * initialize the relation's relation id (relation->rd_id)
854          */
855         RelationGetRelid(relation) = relid;
856
857         /*
858          * initialize relation->rd_refcnt
859          */
860         RelationSetReferenceCount(relation, 1);
861
862         /*
863          * normal relations are not nailed into the cache; nor can a
864          * pre-existing relation be new.  It could be temp though.      (Actually,
865          * it could be new too, but it's okay to forget that fact if forced to
866          * flush the entry.)
867          */
868         relation->rd_isnailed = false;
869         relation->rd_isnew = false;
870         relation->rd_istemp = isTempNamespace(relation->rd_rel->relnamespace);
871
872         /*
873          * initialize the tuple descriptor (relation->rd_att).
874          */
875         RelationBuildTupleDesc(buildinfo, relation);
876
877         /*
878          * Fetch rules and triggers that affect this relation
879          */
880         if (relation->rd_rel->relhasrules)
881                 RelationBuildRuleLock(relation);
882         else
883         {
884                 relation->rd_rules = NULL;
885                 relation->rd_rulescxt = NULL;
886         }
887
888         if (relation->rd_rel->reltriggers > 0)
889                 RelationBuildTriggers(relation);
890         else
891                 relation->trigdesc = NULL;
892
893         /*
894          * if it's an index, initialize index-related information
895          */
896         if (OidIsValid(relation->rd_rel->relam))
897                 RelationInitIndexAccessInfo(relation);
898
899         /*
900          * initialize the relation lock manager information
901          */
902         RelationInitLockInfo(relation);         /* see lmgr.c */
903
904         if (relation->rd_rel->relisshared)
905                 relation->rd_node.tblNode = InvalidOid;
906         else
907                 relation->rd_node.tblNode = MyDatabaseId;
908         relation->rd_node.relNode = relation->rd_rel->relfilenode;
909
910         /* make sure relation is marked as having no open file yet */
911         relation->rd_fd = -1;
912
913         /*
914          * Insert newly created relation into relcache hash tables.
915          */
916         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
917         RelationCacheInsert(relation);
918         MemoryContextSwitchTo(oldcxt);
919
920         /*
921          * If it's a temp rel, RelationGetNumberOfBlocks will assume that
922          * rd_nblocks is correct.  Must forcibly update the block count when
923          * creating the relcache entry.  But if we are doing a rebuild, don't
924          * do this yet; leave it to RelationClearRelation to do at the end.
925          * (Otherwise, an elog in RelationUpdateNumberOfBlocks would leave us
926          * with inconsistent relcache state.)
927          */
928         if (relation->rd_istemp && oldrelation == NULL)
929                 RelationUpdateNumberOfBlocks(relation);
930
931         return relation;
932 }
933
934 /*
935  * Initialize index-access-method support data for an index relation
936  */
937 void
938 RelationInitIndexAccessInfo(Relation relation)
939 {
940         HeapTuple       tuple;
941         Size            iformsize;
942         Form_pg_index iform;
943         Form_pg_am      aform;
944         MemoryContext indexcxt;
945         IndexStrategy strategy;
946         Oid                *operator;
947         RegProcedure *support;
948         FmgrInfo   *supportinfo;
949         int                     natts;
950         uint16          amstrategies;
951         uint16          amsupport;
952
953         /*
954          * Make a copy of the pg_index entry for the index.  Note that this is
955          * a variable-length tuple.
956          */
957         tuple = SearchSysCache(INDEXRELID,
958                                                    ObjectIdGetDatum(RelationGetRelid(relation)),
959                                                    0, 0, 0);
960         if (!HeapTupleIsValid(tuple))
961                 elog(ERROR, "RelationInitIndexAccessInfo: no pg_index entry for index %u",
962                          RelationGetRelid(relation));
963         iformsize = tuple->t_len - tuple->t_data->t_hoff;
964         iform = (Form_pg_index) MemoryContextAlloc(CacheMemoryContext, iformsize);
965         memcpy(iform, GETSTRUCT(tuple), iformsize);
966         ReleaseSysCache(tuple);
967         relation->rd_index = iform;
968
969         /*
970          * Make a copy of the pg_am entry for the index's access method
971          */
972         tuple = SearchSysCache(AMOID,
973                                                    ObjectIdGetDatum(relation->rd_rel->relam),
974                                                    0, 0, 0);
975         if (!HeapTupleIsValid(tuple))
976                 elog(ERROR, "RelationInitIndexAccessInfo: cache lookup failed for AM %u",
977                          relation->rd_rel->relam);
978         aform = (Form_pg_am) MemoryContextAlloc(CacheMemoryContext, sizeof *aform);
979         memcpy(aform, GETSTRUCT(tuple), sizeof *aform);
980         ReleaseSysCache(tuple);
981         relation->rd_am = aform;
982
983         natts = relation->rd_rel->relnatts;
984         amstrategies = aform->amstrategies;
985         amsupport = aform->amsupport;
986
987         /*
988          * Make the private context to hold index access info.  The reason we
989          * need a context, and not just a couple of pallocs, is so that we
990          * won't leak any subsidiary info attached to fmgr lookup records.
991          *
992          * Context parameters are set on the assumption that it'll probably not
993          * contain much data.
994          */
995         indexcxt = AllocSetContextCreate(CacheMemoryContext,
996                                                                          RelationGetRelationName(relation),
997                                                                          ALLOCSET_SMALL_MINSIZE,
998                                                                          ALLOCSET_SMALL_INITSIZE,
999                                                                          ALLOCSET_SMALL_MAXSIZE);
1000         relation->rd_indexcxt = indexcxt;
1001
1002         /*
1003          * Allocate arrays to hold data
1004          */
1005         if (amstrategies > 0)
1006         {
1007                 int                     noperators = natts * amstrategies;
1008                 Size            stratSize;
1009
1010                 stratSize = AttributeNumberGetIndexStrategySize(natts, amstrategies);
1011                 strategy = (IndexStrategy) MemoryContextAlloc(indexcxt, stratSize);
1012                 MemSet(strategy, 0, stratSize);
1013                 operator = (Oid *)
1014                         MemoryContextAlloc(indexcxt, noperators * sizeof(Oid));
1015                 MemSet(operator, 0, noperators * sizeof(Oid));
1016         }
1017         else
1018         {
1019                 strategy = NULL;
1020                 operator = NULL;
1021         }
1022
1023         if (amsupport > 0)
1024         {
1025                 int                     nsupport = natts * amsupport;
1026
1027                 support = (RegProcedure *)
1028                         MemoryContextAlloc(indexcxt, nsupport * sizeof(RegProcedure));
1029                 MemSet(support, 0, nsupport * sizeof(RegProcedure));
1030                 supportinfo = (FmgrInfo *)
1031                         MemoryContextAlloc(indexcxt, nsupport * sizeof(FmgrInfo));
1032                 MemSet(supportinfo, 0, nsupport * sizeof(FmgrInfo));
1033         }
1034         else
1035         {
1036                 support = NULL;
1037                 supportinfo = NULL;
1038         }
1039
1040         relation->rd_istrat = strategy;
1041         relation->rd_operator = operator;
1042         relation->rd_support = support;
1043         relation->rd_supportinfo = supportinfo;
1044
1045         /*
1046          * Fill the strategy map and the support RegProcedure arrays.
1047          * (supportinfo is left as zeroes, and is filled on-the-fly when used)
1048          */
1049         IndexSupportInitialize(iform,
1050                                                    strategy, operator, support,
1051                                                    amstrategies, amsupport, natts);
1052 }
1053
1054 /*
1055  * IndexSupportInitialize
1056  *              Initializes an index strategy and associated support procedures,
1057  *              given the index's pg_index tuple.
1058  *
1059  * Data is returned into *indexStrategy, *indexOperator, and *indexSupport,
1060  * all of which are objects allocated by the caller.
1061  *
1062  * The caller also passes maxStrategyNumber, maxSupportNumber, and
1063  * maxAttributeNumber, since these indicate the size of the arrays
1064  * it has allocated --- but in practice these numbers must always match
1065  * those obtainable from the system catalog entries for the index and
1066  * access method.
1067  */
1068 static void
1069 IndexSupportInitialize(Form_pg_index iform,
1070                                            IndexStrategy indexStrategy,
1071                                            Oid *indexOperator,
1072                                            RegProcedure *indexSupport,
1073                                            StrategyNumber maxStrategyNumber,
1074                                            StrategyNumber maxSupportNumber,
1075                                            AttrNumber maxAttributeNumber)
1076 {
1077         int                     attIndex;
1078
1079         maxStrategyNumber = AMStrategies(maxStrategyNumber);
1080
1081         /*
1082          * XXX note that the following assumes the INDEX tuple is well formed
1083          * and that the *key and *class are 0 terminated.
1084          */
1085         for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
1086         {
1087                 OpClassCacheEnt *opcentry;
1088
1089                 if (iform->indkey[attIndex] == InvalidAttrNumber ||
1090                         !OidIsValid(iform->indclass[attIndex]))
1091                         elog(ERROR, "IndexSupportInitialize: bogus pg_index tuple");
1092
1093                 /* look up the info for this opclass, using a cache */
1094                 opcentry = LookupOpclassInfo(iform->indclass[attIndex],
1095                                                                          maxStrategyNumber,
1096                                                                          maxSupportNumber);
1097
1098                 /* load the strategy information for the index operators */
1099                 if (maxStrategyNumber > 0)
1100                 {
1101                         StrategyMap map;
1102                         Oid                *opers;
1103                         StrategyNumber strategy;
1104
1105                         map = IndexStrategyGetStrategyMap(indexStrategy,
1106                                                                                           maxStrategyNumber,
1107                                                                                           attIndex + 1);
1108                         opers = &indexOperator[attIndex * maxStrategyNumber];
1109
1110                         for (strategy = 0; strategy < maxStrategyNumber; strategy++)
1111                         {
1112                                 ScanKey         mapentry;
1113
1114                                 mapentry = StrategyMapGetScanKeyEntry(map, strategy + 1);
1115                                 if (RegProcedureIsValid(opcentry->operatorProcs[strategy]))
1116                                 {
1117                                         MemSet(mapentry, 0, sizeof(*mapentry));
1118                                         mapentry->sk_flags = 0;
1119                                         mapentry->sk_procedure = opcentry->operatorProcs[strategy];
1120
1121                                         /*
1122                                          * Mark mapentry->sk_func invalid, until and unless
1123                                          * someone sets it up.
1124                                          */
1125                                         mapentry->sk_func.fn_oid = InvalidOid;
1126                                 }
1127                                 else
1128                                         ScanKeyEntrySetIllegal(mapentry);
1129                                 opers[strategy] = opcentry->operatorOids[strategy];
1130                         }
1131                 }
1132
1133                 /* if support routines exist for this access method, load them */
1134                 if (maxSupportNumber > 0)
1135                 {
1136                         RegProcedure *procs;
1137                         StrategyNumber support;
1138
1139                         procs = &indexSupport[attIndex * maxSupportNumber];
1140
1141                         for (support = 0; support < maxSupportNumber; ++support)
1142                                 procs[support] = opcentry->supportProcs[support];
1143                 }
1144         }
1145 }
1146
1147 /*
1148  * LookupOpclassInfo
1149  *
1150  * This routine maintains a per-opclass cache of the information needed
1151  * by IndexSupportInitialize().  This is more efficient than relying on
1152  * the catalog cache, because we can load all the info about a particular
1153  * opclass in a single indexscan of pg_amproc or pg_amop.
1154  *
1155  * The information from pg_am about expected range of strategy and support
1156  * numbers is passed in, rather than being looked up, mainly because the
1157  * caller will have it already.
1158  *
1159  * XXX There isn't any provision for flushing the cache.  However, there
1160  * isn't any provision for flushing relcache entries when opclass info
1161  * changes, either :-(
1162  */
1163 static OpClassCacheEnt *
1164 LookupOpclassInfo(Oid operatorClassOid,
1165                                   StrategyNumber numStrats,
1166                                   StrategyNumber numSupport)
1167 {
1168         OpClassCacheEnt *opcentry;
1169         bool            found;
1170         Relation        pg_amop_desc;
1171         Relation        pg_amproc_desc;
1172         SysScanDesc pg_amop_scan;
1173         SysScanDesc pg_amproc_scan;
1174         ScanKeyData key;
1175         HeapTuple       htup;
1176         bool            indexOK;
1177
1178         if (OpClassCache == NULL)
1179         {
1180                 /* First time through: initialize the opclass cache */
1181                 HASHCTL         ctl;
1182
1183                 if (!CacheMemoryContext)
1184                         CreateCacheMemoryContext();
1185
1186                 MemSet(&ctl, 0, sizeof(ctl));
1187                 ctl.keysize = sizeof(Oid);
1188                 ctl.entrysize = sizeof(OpClassCacheEnt);
1189                 ctl.hash = tag_hash;
1190                 OpClassCache = hash_create("Operator class cache", 64,
1191                                                                    &ctl, HASH_ELEM | HASH_FUNCTION);
1192         }
1193
1194         opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
1195                                                                                            (void *) &operatorClassOid,
1196                                                                                            HASH_ENTER, &found);
1197         if (opcentry == NULL)
1198                 elog(ERROR, "out of memory for operator class cache");
1199
1200         if (found && opcentry->valid)
1201         {
1202                 /* Already made an entry for it */
1203                 Assert(numStrats == opcentry->numStrats);
1204                 Assert(numSupport == opcentry->numSupport);
1205                 return opcentry;
1206         }
1207
1208         /* Need to fill in new entry */
1209         opcentry->valid = false;        /* until known OK */
1210         opcentry->numStrats = numStrats;
1211         opcentry->numSupport = numSupport;
1212
1213         if (numStrats > 0)
1214         {
1215                 opcentry->operatorOids = (Oid *)
1216                         MemoryContextAlloc(CacheMemoryContext,
1217                                                            numStrats * sizeof(Oid));
1218                 MemSet(opcentry->operatorOids, 0, numStrats * sizeof(Oid));
1219                 opcentry->operatorProcs = (RegProcedure *)
1220                         MemoryContextAlloc(CacheMemoryContext,
1221                                                            numStrats * sizeof(RegProcedure));
1222                 MemSet(opcentry->operatorProcs, 0, numStrats * sizeof(RegProcedure));
1223         }
1224         else
1225         {
1226                 opcentry->operatorOids = NULL;
1227                 opcentry->operatorProcs = NULL;
1228         }
1229
1230         if (numSupport > 0)
1231         {
1232                 opcentry->supportProcs = (RegProcedure *)
1233                         MemoryContextAlloc(CacheMemoryContext,
1234                                                            numSupport * sizeof(RegProcedure));
1235                 MemSet(opcentry->supportProcs, 0, numSupport * sizeof(RegProcedure));
1236         }
1237         else
1238                 opcentry->supportProcs = NULL;
1239
1240         /*
1241          * To avoid infinite recursion during startup, force a heap scan if
1242          * we're looking up info for the opclasses used by the indexes we
1243          * would like to reference here.
1244          */
1245         indexOK = criticalRelcachesBuilt ||
1246                 (operatorClassOid != OID_BTREE_OPS_OID &&
1247                  operatorClassOid != INT2_BTREE_OPS_OID);
1248
1249         /*
1250          * Scan pg_amop to obtain operators for the opclass
1251          */
1252         if (numStrats > 0)
1253         {
1254                 ScanKeyEntryInitialize(&key, 0,
1255                                                            Anum_pg_amop_amopclaid,
1256                                                            F_OIDEQ,
1257                                                            ObjectIdGetDatum(operatorClassOid));
1258                 pg_amop_desc = heap_openr(AccessMethodOperatorRelationName,
1259                                                                   AccessShareLock);
1260                 pg_amop_scan = systable_beginscan(pg_amop_desc,
1261                                                                                   AccessMethodStrategyIndex,
1262                                                                                   indexOK,
1263                                                                                   SnapshotNow,
1264                                                                                   1, &key);
1265
1266                 while (HeapTupleIsValid(htup = systable_getnext(pg_amop_scan)))
1267                 {
1268                         Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(htup);
1269
1270                         if (amopform->amopstrategy <= 0 ||
1271                                 (StrategyNumber) amopform->amopstrategy > numStrats)
1272                                 elog(ERROR, "Bogus amopstrategy number %d for opclass %u",
1273                                          amopform->amopstrategy, operatorClassOid);
1274                         opcentry->operatorOids[amopform->amopstrategy - 1] =
1275                                 amopform->amopopr;
1276                         opcentry->operatorProcs[amopform->amopstrategy - 1] =
1277                                 get_opcode(amopform->amopopr);
1278                 }
1279
1280                 systable_endscan(pg_amop_scan);
1281                 heap_close(pg_amop_desc, AccessShareLock);
1282         }
1283
1284         /*
1285          * Scan pg_amproc to obtain support procs for the opclass
1286          */
1287         if (numSupport > 0)
1288         {
1289                 ScanKeyEntryInitialize(&key, 0,
1290                                                            Anum_pg_amproc_amopclaid,
1291                                                            F_OIDEQ,
1292                                                            ObjectIdGetDatum(operatorClassOid));
1293                 pg_amproc_desc = heap_openr(AccessMethodProcedureRelationName,
1294                                                                         AccessShareLock);
1295                 pg_amproc_scan = systable_beginscan(pg_amproc_desc,
1296                                                                                         AccessMethodProcedureIndex,
1297                                                                                         indexOK,
1298                                                                                         SnapshotNow,
1299                                                                                         1, &key);
1300
1301                 while (HeapTupleIsValid(htup = systable_getnext(pg_amproc_scan)))
1302                 {
1303                         Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);
1304
1305                         if (amprocform->amprocnum <= 0 ||
1306                                 (StrategyNumber) amprocform->amprocnum > numSupport)
1307                                 elog(ERROR, "Bogus amproc number %d for opclass %u",
1308                                          amprocform->amprocnum, operatorClassOid);
1309
1310                         opcentry->supportProcs[amprocform->amprocnum - 1] =
1311                                 amprocform->amproc;
1312                 }
1313
1314                 systable_endscan(pg_amproc_scan);
1315                 heap_close(pg_amproc_desc, AccessShareLock);
1316         }
1317
1318         opcentry->valid = true;
1319         return opcentry;
1320 }
1321
1322
1323 /*
1324  *              formrdesc
1325  *
1326  *              This is a special cut-down version of RelationBuildDesc()
1327  *              used by RelationCacheInitialize() in initializing the relcache.
1328  *              The relation descriptor is built just from the supplied parameters,
1329  *              without actually looking at any system table entries.  We cheat
1330  *              quite a lot since we only need to work for a few basic system
1331  *              catalogs.
1332  *
1333  * formrdesc is currently used for: pg_class, pg_attribute, pg_proc,
1334  * and pg_type (see RelationCacheInitialize).
1335  *
1336  * Note that these catalogs can't have constraints (except attnotnull),
1337  * default values, rules, or triggers, since we don't cope with any of that.
1338  *
1339  * NOTE: we assume we are already switched into CacheMemoryContext.
1340  */
1341 static void
1342 formrdesc(const char *relationName,
1343                   int natts,
1344                   FormData_pg_attribute *att)
1345 {
1346         Relation        relation;
1347         int                     i;
1348         bool            has_not_null;
1349
1350         /*
1351          * allocate new relation desc
1352          * clear all fields of reldesc
1353          */
1354         relation = (Relation) palloc0(sizeof(RelationData));
1355         relation->rd_targblock = InvalidBlockNumber;
1356
1357         /* make sure relation is marked as having no open file yet */
1358         relation->rd_fd = -1;
1359
1360         /*
1361          * initialize reference count
1362          */
1363         RelationSetReferenceCount(relation, 1);
1364
1365         /*
1366          * all entries built with this routine are nailed-in-cache; none are
1367          * for new or temp relations.
1368          */
1369         relation->rd_isnailed = true;
1370         relation->rd_isnew = false;
1371         relation->rd_istemp = false;
1372
1373         /*
1374          * initialize relation tuple form
1375          *
1376          * The data we insert here is pretty incomplete/bogus, but it'll serve to
1377          * get us launched.  RelationCacheInitializePhase2() will read the
1378          * real data from pg_class and replace what we've done here.
1379          */
1380         relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
1381
1382         namestrcpy(&relation->rd_rel->relname, relationName);
1383         relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
1384
1385         /*
1386          * It's important to distinguish between shared and non-shared
1387          * relations, even at bootstrap time, to make sure we know where they
1388          * are stored.  At present, all relations that formrdesc is used for
1389          * are not shared.
1390          */
1391         relation->rd_rel->relisshared = false;
1392
1393         relation->rd_rel->relpages = 1;
1394         relation->rd_rel->reltuples = 1;
1395         relation->rd_rel->relkind = RELKIND_RELATION;
1396         relation->rd_rel->relhasoids = true;
1397         relation->rd_rel->relnatts = (int16) natts;
1398
1399         /*
1400          * initialize attribute tuple form
1401          *
1402          * Unlike the case with the relation tuple, this data had better be right
1403          * because it will never be replaced.  The input values must be
1404          * correctly defined by macros in src/include/catalog/ headers.
1405          */
1406         relation->rd_att = CreateTemplateTupleDesc(natts,
1407                                                                                    relation->rd_rel->relhasoids);
1408
1409         /*
1410          * initialize tuple desc info
1411          */
1412         has_not_null = false;
1413         for (i = 0; i < natts; i++)
1414         {
1415                 relation->rd_att->attrs[i] = (Form_pg_attribute) palloc(ATTRIBUTE_TUPLE_SIZE);
1416                 memcpy((char *) relation->rd_att->attrs[i],
1417                            (char *) &att[i],
1418                            ATTRIBUTE_TUPLE_SIZE);
1419                 has_not_null |= att[i].attnotnull;
1420                 /* make sure attcacheoff is valid */
1421                 relation->rd_att->attrs[i]->attcacheoff = -1;
1422         }
1423
1424         /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
1425         relation->rd_att->attrs[0]->attcacheoff = 0;
1426
1427         /* mark not-null status */
1428         if (has_not_null)
1429         {
1430                 TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
1431
1432                 constr->has_not_null = true;
1433                 relation->rd_att->constr = constr;
1434         }
1435
1436         /*
1437          * initialize relation id from info in att array (my, this is ugly)
1438          */
1439         RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
1440
1441         /*
1442          * initialize the relation's lock manager and RelFileNode information
1443          */
1444         RelationInitLockInfo(relation);         /* see lmgr.c */
1445
1446         if (relation->rd_rel->relisshared)
1447                 relation->rd_node.tblNode = InvalidOid;
1448         else
1449                 relation->rd_node.tblNode = MyDatabaseId;
1450         relation->rd_node.relNode =
1451                 relation->rd_rel->relfilenode = RelationGetRelid(relation);
1452
1453         /*
1454          * initialize the rel-has-index flag, using hardwired knowledge
1455          */
1456         relation->rd_rel->relhasindex = false;
1457
1458         /* In bootstrap mode, we have no indexes */
1459         if (!IsBootstrapProcessingMode())
1460         {
1461                 /* Otherwise, all the rels formrdesc is used for have indexes */
1462                 relation->rd_rel->relhasindex = true;
1463         }
1464
1465         /*
1466          * add new reldesc to relcache
1467          */
1468         RelationCacheInsert(relation);
1469 }
1470
1471
1472 /* ----------------------------------------------------------------
1473  *                               Relation Descriptor Lookup Interface
1474  * ----------------------------------------------------------------
1475  */
1476
1477 /*
1478  *              RelationIdCacheGetRelation
1479  *
1480  *              Lookup an existing reldesc by OID.
1481  *
1482  *              Only try to get the reldesc by looking in the cache,
1483  *              do not go to the disk.
1484  *
1485  *              NB: relation ref count is incremented if successful.
1486  *              Caller should eventually decrement count.  (Usually,
1487  *              that happens by calling RelationClose().)
1488  */
1489 Relation
1490 RelationIdCacheGetRelation(Oid relationId)
1491 {
1492         Relation        rd;
1493
1494         RelationIdCacheLookup(relationId, rd);
1495
1496         if (RelationIsValid(rd))
1497                 RelationIncrementReferenceCount(rd);
1498
1499         return rd;
1500 }
1501
1502 /*
1503  *              RelationSysNameCacheGetRelation
1504  *
1505  *              As above, but lookup by name; only works for system catalogs.
1506  */
1507 static Relation
1508 RelationSysNameCacheGetRelation(const char *relationName)
1509 {
1510         Relation        rd;
1511         NameData        name;
1512
1513         /*
1514          * make sure that the name key used for hash lookup is properly
1515          * null-padded
1516          */
1517         namestrcpy(&name, relationName);
1518         RelationSysNameCacheLookup(NameStr(name), rd);
1519
1520         if (RelationIsValid(rd))
1521                 RelationIncrementReferenceCount(rd);
1522
1523         return rd;
1524 }
1525
1526 Relation
1527 RelationNodeCacheGetRelation(RelFileNode rnode)
1528 {
1529         Relation        rd;
1530
1531         RelationNodeCacheLookup(rnode, rd);
1532
1533         if (RelationIsValid(rd))
1534                 RelationIncrementReferenceCount(rd);
1535
1536         return rd;
1537 }
1538
1539 /*
1540  *              RelationIdGetRelation
1541  *
1542  *              Lookup a reldesc by OID; make one if not already in cache.
1543  *
1544  *              NB: relation ref count is incremented, or set to 1 if new entry.
1545  *              Caller should eventually decrement count.  (Usually,
1546  *              that happens by calling RelationClose().)
1547  */
1548 Relation
1549 RelationIdGetRelation(Oid relationId)
1550 {
1551         Relation        rd;
1552         RelationBuildDescInfo buildinfo;
1553
1554         /*
1555          * first try and get a reldesc from the cache
1556          */
1557         rd = RelationIdCacheGetRelation(relationId);
1558         if (RelationIsValid(rd))
1559                 return rd;
1560
1561         /*
1562          * no reldesc in the cache, so have RelationBuildDesc() build one and
1563          * add it.
1564          */
1565         buildinfo.infotype = INFO_RELID;
1566         buildinfo.i.info_id = relationId;
1567
1568         rd = RelationBuildDesc(buildinfo, NULL);
1569         return rd;
1570 }
1571
1572 /*
1573  *              RelationSysNameGetRelation
1574  *
1575  *              As above, but lookup by name; only works for system catalogs.
1576  */
1577 Relation
1578 RelationSysNameGetRelation(const char *relationName)
1579 {
1580         Relation        rd;
1581         RelationBuildDescInfo buildinfo;
1582
1583         /*
1584          * first try and get a reldesc from the cache
1585          */
1586         rd = RelationSysNameCacheGetRelation(relationName);
1587         if (RelationIsValid(rd))
1588                 return rd;
1589
1590         /*
1591          * no reldesc in the cache, so have RelationBuildDesc() build one and
1592          * add it.
1593          */
1594         buildinfo.infotype = INFO_RELNAME;
1595         buildinfo.i.info_name = (char *) relationName;
1596
1597         rd = RelationBuildDesc(buildinfo, NULL);
1598         return rd;
1599 }
1600
1601 /* ----------------------------------------------------------------
1602  *                              cache invalidation support routines
1603  * ----------------------------------------------------------------
1604  */
1605
1606 /*
1607  * RelationClose - close an open relation
1608  *
1609  *      Actually, we just decrement the refcount.
1610  *
1611  *      NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
1612  *      will be freed as soon as their refcount goes to zero.  In combination
1613  *      with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
1614  *      to catch references to already-released relcache entries.  It slows
1615  *      things down quite a bit, however.
1616  */
1617 void
1618 RelationClose(Relation relation)
1619 {
1620         /* Note: no locking manipulations needed */
1621         RelationDecrementReferenceCount(relation);
1622
1623 #ifdef RELCACHE_FORCE_RELEASE
1624         if (RelationHasReferenceCountZero(relation) &&
1625                 !relation->rd_isnew)
1626                 RelationClearRelation(relation, false);
1627 #endif
1628 }
1629
1630 #ifdef  ENABLE_REINDEX_NAILED_RELATIONS
1631 /*
1632  * RelationReloadClassinfo
1633  *
1634  *      This function is especially for nailed relations.
1635  *      relhasindex/relfilenode could be changed even for
1636  *      nailed relations.
1637  */
1638 static void
1639 RelationReloadClassinfo(Relation relation)
1640 {
1641         RelationBuildDescInfo buildinfo;
1642         HeapTuple       pg_class_tuple;
1643         Form_pg_class relp;
1644
1645         if (!relation->rd_rel)
1646                 return;
1647         buildinfo.infotype = INFO_RELID;
1648         buildinfo.i.info_id = relation->rd_id;
1649         pg_class_tuple = ScanPgRelation(buildinfo);
1650         if (!HeapTupleIsValid(pg_class_tuple))
1651         {
1652                 elog(ERROR, "RelationReloadClassinfo system relation id=%d doesn't exist", relation->rd_id);
1653                 return;
1654         }
1655         RelationCacheDelete(relation);
1656         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
1657         memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
1658         relation->rd_node.relNode = relp->relfilenode;
1659         RelationCacheInsert(relation);
1660         heap_freetuple(pg_class_tuple);
1661
1662         return;
1663 }
1664 #endif   /* ENABLE_REINDEX_NAILED_RELATIONS */
1665
1666 /*
1667  * RelationClearRelation
1668  *
1669  *       Physically blow away a relation cache entry, or reset it and rebuild
1670  *       it from scratch (that is, from catalog entries).  The latter path is
1671  *       usually used when we are notified of a change to an open relation
1672  *       (one with refcount > 0).  However, this routine just does whichever
1673  *       it's told to do; callers must determine which they want.
1674  */
1675 static void
1676 RelationClearRelation(Relation relation, bool rebuild)
1677 {
1678         MemoryContext oldcxt;
1679
1680         /*
1681          * Make sure smgr and lower levels close the relation's files, if they
1682          * weren't closed already.  If the relation is not getting deleted,
1683          * the next smgr access should reopen the files automatically.  This
1684          * ensures that the low-level file access state is updated after, say,
1685          * a vacuum truncation.
1686          */
1687         if (relation->rd_fd >= 0)
1688         {
1689                 smgrclose(DEFAULT_SMGR, relation);
1690                 relation->rd_fd = -1;
1691         }
1692
1693         /*
1694          * Never, never ever blow away a nailed-in system relation, because
1695          * we'd be unable to recover.  However, we must update rd_nblocks
1696          * and reset rd_targblock, in case we got called because of a relation
1697          * cache flush that was triggered by VACUUM.
1698          */
1699         if (relation->rd_isnailed)
1700         {
1701                 relation->rd_targblock = InvalidBlockNumber;
1702                 RelationUpdateNumberOfBlocks(relation);
1703 #ifdef  ENABLE_REINDEX_NAILED_RELATIONS
1704                 RelationReloadClassinfo(relation);
1705 #endif   /* ENABLE_REINDEX_NAILED_RELATIONS */
1706                 return;
1707         }
1708
1709         /*
1710          * Remove relation from hash tables
1711          *
1712          * Note: we might be reinserting it momentarily, but we must not have it
1713          * visible in the hash tables until it's valid again, so don't try to
1714          * optimize this away...
1715          */
1716         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
1717         RelationCacheDelete(relation);
1718         MemoryContextSwitchTo(oldcxt);
1719
1720         /* Clear out catcache's entries for this relation */
1721         CatalogCacheFlushRelation(RelationGetRelid(relation));
1722
1723         /*
1724          * Free all the subsidiary data structures of the relcache entry. We
1725          * cannot free rd_att if we are trying to rebuild the entry, however,
1726          * because pointers to it may be cached in various places. The rule
1727          * manager might also have pointers into the rewrite rules. So to begin
1728          * with, we can only get rid of these fields:
1729          */
1730         FreeTriggerDesc(relation->trigdesc);
1731         if (relation->rd_index)
1732                 pfree(relation->rd_index);
1733         if (relation->rd_am)
1734                 pfree(relation->rd_am);
1735         if (relation->rd_rel)
1736                 pfree(relation->rd_rel);
1737         freeList(relation->rd_indexlist);
1738         if (relation->rd_indexcxt)
1739                 MemoryContextDelete(relation->rd_indexcxt);
1740
1741         /*
1742          * If we're really done with the relcache entry, blow it away. But if
1743          * someone is still using it, reconstruct the whole deal without
1744          * moving the physical RelationData record (so that the someone's
1745          * pointer is still valid).
1746          */
1747         if (!rebuild)
1748         {
1749                 /* ok to zap remaining substructure */
1750                 FreeTupleDesc(relation->rd_att);
1751                 if (relation->rd_rulescxt)
1752                         MemoryContextDelete(relation->rd_rulescxt);
1753                 pfree(relation);
1754         }
1755         else
1756         {
1757                 /*
1758                  * When rebuilding an open relcache entry, must preserve ref count
1759                  * and rd_isnew flag.  Also attempt to preserve the tupledesc and
1760                  * rewrite-rule substructures in place.
1761                  */
1762                 int                     old_refcnt = relation->rd_refcnt;
1763                 bool            old_isnew = relation->rd_isnew;
1764                 TupleDesc       old_att = relation->rd_att;
1765                 RuleLock   *old_rules = relation->rd_rules;
1766                 MemoryContext old_rulescxt = relation->rd_rulescxt;
1767                 RelationBuildDescInfo buildinfo;
1768
1769                 buildinfo.infotype = INFO_RELID;
1770                 buildinfo.i.info_id = RelationGetRelid(relation);
1771
1772                 if (RelationBuildDesc(buildinfo, relation) != relation)
1773                 {
1774                         /* Should only get here if relation was deleted */
1775                         FreeTupleDesc(old_att);
1776                         if (old_rulescxt)
1777                                 MemoryContextDelete(old_rulescxt);
1778                         pfree(relation);
1779                         elog(ERROR, "RelationClearRelation: relation %u deleted while still in use",
1780                                  buildinfo.i.info_id);
1781                 }
1782                 RelationSetReferenceCount(relation, old_refcnt);
1783                 relation->rd_isnew = old_isnew;
1784                 if (equalTupleDescs(old_att, relation->rd_att))
1785                 {
1786                         FreeTupleDesc(relation->rd_att);
1787                         relation->rd_att = old_att;
1788                 }
1789                 else
1790                         FreeTupleDesc(old_att);
1791                 if (equalRuleLocks(old_rules, relation->rd_rules))
1792                 {
1793                         if (relation->rd_rulescxt)
1794                                 MemoryContextDelete(relation->rd_rulescxt);
1795                         relation->rd_rules = old_rules;
1796                         relation->rd_rulescxt = old_rulescxt;
1797                 }
1798                 else
1799                 {
1800                         if (old_rulescxt)
1801                                 MemoryContextDelete(old_rulescxt);
1802                 }
1803
1804                 /*
1805                  * Update rd_nblocks.  This is kind of expensive, but I think we
1806                  * must do it in case relation has been truncated... we definitely
1807                  * must do it if the rel is new or temp, since
1808                  * RelationGetNumberOfBlocks will subsequently assume that the
1809                  * block count is correct.
1810                  */
1811                 RelationUpdateNumberOfBlocks(relation);
1812         }
1813 }
1814
1815 /*
1816  * RelationFlushRelation
1817  *
1818  *       Rebuild the relation if it is open (refcount > 0), else blow it away.
1819  */
1820 static void
1821 RelationFlushRelation(Relation relation)
1822 {
1823         bool            rebuild;
1824
1825         if (relation->rd_isnew)
1826         {
1827                 /*
1828                  * New relcache entries are always rebuilt, not flushed; else we'd
1829                  * forget the "new" status of the relation, which is a useful
1830                  * optimization to have.
1831                  */
1832                 rebuild = true;
1833         }
1834         else
1835         {
1836                 /*
1837                  * Pre-existing rels can be dropped from the relcache if not open.
1838                  */
1839                 rebuild = !RelationHasReferenceCountZero(relation);
1840         }
1841
1842         RelationClearRelation(relation, rebuild);
1843 }
1844
1845 /*
1846  * RelationForgetRelation - unconditionally remove a relcache entry
1847  *
1848  *                 External interface for destroying a relcache entry when we
1849  *                 drop the relation.
1850  */
1851 void
1852 RelationForgetRelation(Oid rid)
1853 {
1854         Relation        relation;
1855
1856         RelationIdCacheLookup(rid, relation);
1857
1858         if (!PointerIsValid(relation))
1859                 return;                                 /* not in cache, nothing to do */
1860
1861         if (!RelationHasReferenceCountZero(relation))
1862                 elog(ERROR, "RelationForgetRelation: relation %u is still open", rid);
1863
1864         /* Unconditionally destroy the relcache entry */
1865         RelationClearRelation(relation, false);
1866 }
1867
1868 /*
1869  *              RelationIdInvalidateRelationCacheByRelationId
1870  *
1871  *              This routine is invoked for SI cache flush messages.
1872  *
1873  *              We used to skip local relations, on the grounds that they could
1874  *              not be targets of cross-backend SI update messages; but it seems
1875  *              safer to process them, so that our *own* SI update messages will
1876  *              have the same effects during CommandCounterIncrement for both
1877  *              local and nonlocal relations.
1878  */
1879 void
1880 RelationIdInvalidateRelationCacheByRelationId(Oid relationId)
1881 {
1882         Relation        relation;
1883
1884         RelationIdCacheLookup(relationId, relation);
1885
1886         if (PointerIsValid(relation))
1887         {
1888                 relcacheInvalsReceived++;
1889                 RelationFlushRelation(relation);
1890         }
1891 }
1892
1893 /*
1894  * RelationCacheInvalidate
1895  *       Blow away cached relation descriptors that have zero reference counts,
1896  *       and rebuild those with positive reference counts.
1897  *
1898  *       This is currently used only to recover from SI message buffer overflow,
1899  *       so we do not touch new-in-transaction relations; they cannot be targets
1900  *       of cross-backend SI updates (and our own updates now go through a
1901  *       separate linked list that isn't limited by the SI message buffer size).
1902  *
1903  *       We do this in two phases: the first pass deletes deletable items, and
1904  *       the second one rebuilds the rebuildable items.  This is essential for
1905  *       safety, because hash_seq_search only copes with concurrent deletion of
1906  *       the element it is currently visiting.  If a second SI overflow were to
1907  *       occur while we are walking the table, resulting in recursive entry to
1908  *       this routine, we could crash because the inner invocation blows away
1909  *       the entry next to be visited by the outer scan.  But this way is OK,
1910  *       because (a) during the first pass we won't process any more SI messages,
1911  *       so hash_seq_search will complete safely; (b) during the second pass we
1912  *       only hold onto pointers to nondeletable entries.
1913  */
1914 void
1915 RelationCacheInvalidate(void)
1916 {
1917         HASH_SEQ_STATUS status;
1918         RelIdCacheEnt *idhentry;
1919         Relation        relation;
1920         List       *rebuildList = NIL;
1921         List       *l;
1922
1923         /* Phase 1 */
1924         hash_seq_init(&status, RelationIdCache);
1925
1926         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
1927         {
1928                 relation = idhentry->reldesc;
1929
1930                 /* Ignore new relations, since they are never SI targets */
1931                 if (relation->rd_isnew)
1932                         continue;
1933
1934                 relcacheInvalsReceived++;
1935
1936                 if (RelationHasReferenceCountZero(relation))
1937                 {
1938                         /* Delete this entry immediately */
1939                         RelationClearRelation(relation, false);
1940                 }
1941                 else
1942                 {
1943                         /* Add entry to list of stuff to rebuild in second pass */
1944                         rebuildList = lcons(relation, rebuildList);
1945                 }
1946         }
1947
1948         /* Phase 2: rebuild the items found to need rebuild in phase 1 */
1949         foreach(l, rebuildList)
1950         {
1951                 relation = (Relation) lfirst(l);
1952                 RelationClearRelation(relation, true);
1953         }
1954         freeList(rebuildList);
1955 }
1956
1957 /*
1958  * AtEOXact_RelationCache
1959  *
1960  *      Clean up the relcache at transaction commit or abort.
1961  */
1962 void
1963 AtEOXact_RelationCache(bool commit)
1964 {
1965         HASH_SEQ_STATUS status;
1966         RelIdCacheEnt *idhentry;
1967
1968         hash_seq_init(&status, RelationIdCache);
1969
1970         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
1971         {
1972                 Relation        relation = idhentry->reldesc;
1973                 int                     expected_refcnt;
1974
1975                 /*
1976                  * Is it a relation created in the current transaction?
1977                  *
1978                  * During commit, reset the flag to false, since we are now out of
1979                  * the creating transaction.  During abort, simply delete the
1980                  * relcache entry --- it isn't interesting any longer.  (NOTE: if
1981                  * we have forgotten the isnew state of a new relation due to a
1982                  * forced cache flush, the entry will get deleted anyway by
1983                  * shared-cache-inval processing of the aborted pg_class
1984                  * insertion.)
1985                  */
1986                 if (relation->rd_isnew)
1987                 {
1988                         if (commit)
1989                                 relation->rd_isnew = false;
1990                         else
1991                         {
1992                                 RelationClearRelation(relation, false);
1993                                 continue;
1994                         }
1995                 }
1996
1997                 /*
1998                  * During transaction abort, we must also reset relcache entry ref
1999                  * counts to their normal not-in-a-transaction state.  A ref count
2000                  * may be too high because some routine was exited by elog()
2001                  * between incrementing and decrementing the count.
2002                  *
2003                  * During commit, we should not have to do this, but it's still
2004                  * useful to check that the counts are correct to catch missed
2005                  * relcache closes.
2006                  *
2007                  * In bootstrap mode, do NOT reset the refcnt nor complain that it's
2008                  * nonzero --- the bootstrap code expects relations to stay open
2009                  * across start/commit transaction calls.  (That seems bogus, but
2010                  * it's not worth fixing.)
2011                  */
2012                 expected_refcnt = relation->rd_isnailed ? 1 : 0;
2013
2014                 if (commit)
2015                 {
2016                         if (relation->rd_refcnt != expected_refcnt &&
2017                                 !IsBootstrapProcessingMode())
2018                         {
2019                                 elog(WARNING, "Relcache reference leak: relation \"%s\" has refcnt %d instead of %d",
2020                                          RelationGetRelationName(relation),
2021                                          relation->rd_refcnt, expected_refcnt);
2022                                 RelationSetReferenceCount(relation, expected_refcnt);
2023                         }
2024                 }
2025                 else
2026                 {
2027                         /* abort case, just reset it quietly */
2028                         RelationSetReferenceCount(relation, expected_refcnt);
2029                 }
2030         }
2031 }
2032
2033 /*
2034  *              RelationBuildLocalRelation
2035  *                      Build a relcache entry for an about-to-be-created relation,
2036  *                      and enter it into the relcache.
2037  */
2038 Relation
2039 RelationBuildLocalRelation(const char *relname,
2040                                                    Oid relnamespace,
2041                                                    TupleDesc tupDesc,
2042                                                    Oid relid, Oid dbid,
2043                                                    RelFileNode rnode,
2044                                                    bool nailit)
2045 {
2046         Relation        rel;
2047         MemoryContext oldcxt;
2048         int                     natts = tupDesc->natts;
2049         int                     i;
2050         bool            has_not_null;
2051
2052         AssertArg(natts > 0);
2053
2054         /*
2055          * switch to the cache context to create the relcache entry.
2056          */
2057         if (!CacheMemoryContext)
2058                 CreateCacheMemoryContext();
2059
2060         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2061
2062         /*
2063          * allocate a new relation descriptor and fill in basic state fields.
2064          */
2065         rel = (Relation) palloc0(sizeof(RelationData));
2066
2067         rel->rd_targblock = InvalidBlockNumber;
2068
2069         /* make sure relation is marked as having no open file yet */
2070         rel->rd_fd = -1;
2071
2072         RelationSetReferenceCount(rel, 1);
2073
2074         /* it's being created in this transaction */
2075         rel->rd_isnew = true;
2076
2077         /* is it a temporary relation? */
2078         rel->rd_istemp = isTempNamespace(relnamespace);
2079
2080         /*
2081          * nail the reldesc if this is a bootstrap create reln and we may need
2082          * it in the cache later on in the bootstrap process so we don't ever
2083          * want it kicked out.  e.g. pg_attribute!!!
2084          */
2085         if (nailit)
2086                 rel->rd_isnailed = true;
2087
2088         /*
2089          * create a new tuple descriptor from the one passed in.  We do this
2090          * partly to copy it into the cache context, and partly because the
2091          * new relation can't have any defaults or constraints yet; they have
2092          * to be added in later steps, because they require additions to
2093          * multiple system catalogs.  We can copy attnotnull constraints here,
2094          * however.
2095          */
2096         rel->rd_att = CreateTupleDescCopy(tupDesc);
2097         has_not_null = false;
2098         for (i = 0; i < natts; i++)
2099         {
2100                 rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull;
2101                 has_not_null |= tupDesc->attrs[i]->attnotnull;
2102         }
2103
2104         if (has_not_null)
2105         {
2106                 TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
2107
2108                 constr->has_not_null = true;
2109                 rel->rd_att->constr = constr;
2110         }
2111
2112         /*
2113          * initialize relation tuple form (caller may add/override data later)
2114          */
2115         rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
2116
2117         namestrcpy(&rel->rd_rel->relname, relname);
2118         rel->rd_rel->relnamespace = relnamespace;
2119
2120         rel->rd_rel->relkind = RELKIND_UNCATALOGED;
2121         rel->rd_rel->relhasoids = rel->rd_att->tdhasoid;
2122         rel->rd_rel->relnatts = natts;
2123         rel->rd_rel->reltype = InvalidOid;
2124
2125         /*
2126          * Insert relation physical and logical identifiers (OIDs) into the
2127          * right places.
2128          */
2129         rel->rd_rel->relisshared = (dbid == InvalidOid);
2130
2131         RelationGetRelid(rel) = relid;
2132
2133         for (i = 0; i < natts; i++)
2134                 rel->rd_att->attrs[i]->attrelid = relid;
2135
2136         rel->rd_node = rnode;
2137         rel->rd_rel->relfilenode = rnode.relNode;
2138
2139         RelationInitLockInfo(rel);      /* see lmgr.c */
2140
2141         /*
2142          * Okay to insert into the relcache hash tables.
2143          */
2144         RelationCacheInsert(rel);
2145
2146         /*
2147          * done building relcache entry.
2148          */
2149         MemoryContextSwitchTo(oldcxt);
2150
2151         return rel;
2152 }
2153
2154 /*
2155  *              RelationCacheInitialize
2156  *
2157  *              This initializes the relation descriptor cache.  At the time
2158  *              that this is invoked, we can't do database access yet (mainly
2159  *              because the transaction subsystem is not up), so we can't get
2160  *              "real" info.  However it's okay to read the pg_internal.init
2161  *              cache file, if one is available.  Otherwise we make phony
2162  *              entries for the minimum set of nailed-in-cache relations.
2163  */
2164
2165 #define INITRELCACHESIZE                400
2166
2167 void
2168 RelationCacheInitialize(void)
2169 {
2170         MemoryContext oldcxt;
2171         HASHCTL         ctl;
2172
2173         /*
2174          * switch to cache memory context
2175          */
2176         if (!CacheMemoryContext)
2177                 CreateCacheMemoryContext();
2178
2179         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2180
2181         /*
2182          * create hashtables that index the relcache
2183          */
2184         MemSet(&ctl, 0, sizeof(ctl));
2185         ctl.keysize = sizeof(NameData);
2186         ctl.entrysize = sizeof(RelNameCacheEnt);
2187         RelationSysNameCache = hash_create("Relcache by name", INITRELCACHESIZE,
2188                                                                            &ctl, HASH_ELEM);
2189
2190         ctl.keysize = sizeof(Oid);
2191         ctl.entrysize = sizeof(RelIdCacheEnt);
2192         ctl.hash = tag_hash;
2193         RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
2194                                                                   &ctl, HASH_ELEM | HASH_FUNCTION);
2195
2196         ctl.keysize = sizeof(RelFileNode);
2197         ctl.entrysize = sizeof(RelNodeCacheEnt);
2198         ctl.hash = tag_hash;
2199         RelationNodeCache = hash_create("Relcache by rnode", INITRELCACHESIZE,
2200                                                                         &ctl, HASH_ELEM | HASH_FUNCTION);
2201
2202         /*
2203          * Try to load the relcache cache file.  If successful, we're done for
2204          * now.  Otherwise, initialize the cache with pre-made descriptors for
2205          * the critical "nailed-in" system catalogs.
2206          */
2207         if (IsBootstrapProcessingMode() ||
2208                 !load_relcache_init_file())
2209         {
2210                 formrdesc(RelationRelationName,
2211                                   Natts_pg_class, Desc_pg_class);
2212                 formrdesc(AttributeRelationName,
2213                                   Natts_pg_attribute, Desc_pg_attribute);
2214                 formrdesc(ProcedureRelationName,
2215                                   Natts_pg_proc, Desc_pg_proc);
2216                 formrdesc(TypeRelationName,
2217                                   Natts_pg_type, Desc_pg_type);
2218
2219 #define NUM_CRITICAL_RELS       4       /* fix if you change list above */
2220         }
2221
2222         MemoryContextSwitchTo(oldcxt);
2223 }
2224
2225 /*
2226  *              RelationCacheInitializePhase2
2227  *
2228  *              This is called as soon as the catcache and transaction system
2229  *              are functional.  At this point we can actually read data from
2230  *              the system catalogs.  Update the relcache entries made during
2231  *              RelationCacheInitialize, and make sure we have entries for the
2232  *              critical system indexes.
2233  */
2234 void
2235 RelationCacheInitializePhase2(void)
2236 {
2237         HASH_SEQ_STATUS status;
2238         RelIdCacheEnt *idhentry;
2239
2240         if (IsBootstrapProcessingMode())
2241                 return;
2242
2243         /*
2244          * If we didn't get the critical system indexes loaded into relcache,
2245          * do so now.  These are critical because the catcache depends on them
2246          * for catcache fetches that are done during relcache load.  Thus, we
2247          * have an infinite-recursion problem.  We can break the recursion by
2248          * doing heapscans instead of indexscans at certain key spots. To
2249          * avoid hobbling performance, we only want to do that until we have
2250          * the critical indexes loaded into relcache.  Thus, the flag
2251          * criticalRelcachesBuilt is used to decide whether to do heapscan or
2252          * indexscan at the key spots, and we set it true after we've loaded
2253          * the critical indexes.
2254          *
2255          * The critical indexes are marked as "nailed in cache", partly to make
2256          * it easy for load_relcache_init_file to count them, but mainly
2257          * because we cannot flush and rebuild them once we've set
2258          * criticalRelcachesBuilt to true.      (NOTE: perhaps it would be
2259          * possible to reload them by temporarily setting
2260          * criticalRelcachesBuilt to false again.  For now, though, we just
2261          * nail 'em in.)
2262          */
2263         if (!criticalRelcachesBuilt)
2264         {
2265                 RelationBuildDescInfo buildinfo;
2266                 Relation        ird;
2267
2268 #define LOAD_CRIT_INDEX(indname) \
2269                 do { \
2270                         buildinfo.infotype = INFO_RELNAME; \
2271                         buildinfo.i.info_name = (indname); \
2272                         ird = RelationBuildDesc(buildinfo, NULL); \
2273                         ird->rd_isnailed = true; \
2274                         RelationSetReferenceCount(ird, 1); \
2275                 } while (0)
2276
2277                 LOAD_CRIT_INDEX(ClassNameNspIndex);
2278                 LOAD_CRIT_INDEX(ClassOidIndex);
2279                 LOAD_CRIT_INDEX(AttributeRelidNumIndex);
2280                 LOAD_CRIT_INDEX(IndexRelidIndex);
2281                 LOAD_CRIT_INDEX(AccessMethodStrategyIndex);
2282                 LOAD_CRIT_INDEX(AccessMethodProcedureIndex);
2283                 LOAD_CRIT_INDEX(OperatorOidIndex);
2284
2285 #define NUM_CRITICAL_INDEXES    7               /* fix if you change list above */
2286
2287                 criticalRelcachesBuilt = true;
2288         }
2289
2290         /*
2291          * Now, scan all the relcache entries and update anything that might
2292          * be wrong in the results from formrdesc or the relcache cache file.
2293          * If we faked up relcache entries using formrdesc, then read the real
2294          * pg_class rows and replace the fake entries with them. Also, if any
2295          * of the relcache entries have rules or triggers, load that info the
2296          * hard way since it isn't recorded in the cache file.
2297          */
2298         hash_seq_init(&status, RelationIdCache);
2299
2300         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2301         {
2302                 Relation        relation = idhentry->reldesc;
2303
2304                 /*
2305                  * If it's a faked-up entry, read the real pg_class tuple.
2306                  */
2307                 if (needNewCacheFile && relation->rd_isnailed)
2308                 {
2309                         HeapTuple       htup;
2310                         Form_pg_class relp;
2311
2312                         htup = SearchSysCache(RELOID,
2313                                                         ObjectIdGetDatum(RelationGetRelid(relation)),
2314                                                                   0, 0, 0);
2315                         if (!HeapTupleIsValid(htup))
2316                                 elog(FATAL, "RelationCacheInitializePhase2: no pg_class entry for %s",
2317                                          RelationGetRelationName(relation));
2318                         relp = (Form_pg_class) GETSTRUCT(htup);
2319
2320                         /*
2321                          * Copy tuple to relation->rd_rel. (See notes in
2322                          * AllocateRelationDesc())
2323                          */
2324                         Assert(relation->rd_rel != NULL);
2325                         memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
2326                         relation->rd_att->tdhasoid = relp->relhasoids;
2327
2328                         ReleaseSysCache(htup);
2329                 }
2330
2331                 /*
2332                  * Fix data that isn't saved in relcache cache file.
2333                  */
2334                 if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
2335                         RelationBuildRuleLock(relation);
2336                 if (relation->rd_rel->reltriggers > 0 && relation->trigdesc == NULL)
2337                         RelationBuildTriggers(relation);
2338         }
2339 }
2340
2341 /*
2342  *              RelationCacheInitializePhase3
2343  *
2344  *              Final step of relcache initialization: write out a new relcache
2345  *              cache file if one is needed.
2346  */
2347 void
2348 RelationCacheInitializePhase3(void)
2349 {
2350         if (IsBootstrapProcessingMode())
2351                 return;
2352
2353         if (needNewCacheFile)
2354         {
2355                 /*
2356                  * Force all the catcaches to finish initializing and thereby open
2357                  * the catalogs and indexes they use.  This will preload the
2358                  * relcache with entries for all the most important system
2359                  * catalogs and indexes, so that the init file will be most useful
2360                  * for future backends.
2361                  */
2362                 InitCatalogCachePhase2();
2363
2364                 /* now write the file */
2365                 write_relcache_init_file();
2366         }
2367 }
2368
2369
2370 /* used by XLogInitCache */
2371 void            CreateDummyCaches(void);
2372 void            DestroyDummyCaches(void);
2373
2374 void
2375 CreateDummyCaches(void)
2376 {
2377         MemoryContext oldcxt;
2378         HASHCTL         ctl;
2379
2380         if (!CacheMemoryContext)
2381                 CreateCacheMemoryContext();
2382
2383         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2384
2385         MemSet(&ctl, 0, sizeof(ctl));
2386         ctl.keysize = sizeof(NameData);
2387         ctl.entrysize = sizeof(RelNameCacheEnt);
2388         RelationSysNameCache = hash_create("Relcache by name", INITRELCACHESIZE,
2389                                                                            &ctl, HASH_ELEM);
2390
2391         ctl.keysize = sizeof(Oid);
2392         ctl.entrysize = sizeof(RelIdCacheEnt);
2393         ctl.hash = tag_hash;
2394         RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
2395                                                                   &ctl, HASH_ELEM | HASH_FUNCTION);
2396
2397         ctl.keysize = sizeof(RelFileNode);
2398         ctl.entrysize = sizeof(RelNodeCacheEnt);
2399         ctl.hash = tag_hash;
2400         RelationNodeCache = hash_create("Relcache by rnode", INITRELCACHESIZE,
2401                                                                         &ctl, HASH_ELEM | HASH_FUNCTION);
2402
2403         MemoryContextSwitchTo(oldcxt);
2404 }
2405
2406 void
2407 DestroyDummyCaches(void)
2408 {
2409         MemoryContext oldcxt;
2410
2411         if (!CacheMemoryContext)
2412                 return;
2413
2414         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2415
2416         if (RelationIdCache)
2417                 hash_destroy(RelationIdCache);
2418         if (RelationSysNameCache)
2419                 hash_destroy(RelationSysNameCache);
2420         if (RelationNodeCache)
2421                 hash_destroy(RelationNodeCache);
2422
2423         RelationIdCache = RelationSysNameCache = RelationNodeCache = NULL;
2424
2425         MemoryContextSwitchTo(oldcxt);
2426 }
2427
2428 static void
2429 AttrDefaultFetch(Relation relation)
2430 {
2431         AttrDefault *attrdef = relation->rd_att->constr->defval;
2432         int                     ndef = relation->rd_att->constr->num_defval;
2433         Relation        adrel;
2434         SysScanDesc adscan;
2435         ScanKeyData skey;
2436         HeapTuple       htup;
2437         Datum           val;
2438         bool            isnull;
2439         int                     found;
2440         int                     i;
2441
2442         ScanKeyEntryInitialize(&skey,
2443                                                    (bits16) 0x0,
2444                                                    (AttrNumber) Anum_pg_attrdef_adrelid,
2445                                                    (RegProcedure) F_OIDEQ,
2446                                                    ObjectIdGetDatum(RelationGetRelid(relation)));
2447
2448         adrel = heap_openr(AttrDefaultRelationName, AccessShareLock);
2449         adscan = systable_beginscan(adrel, AttrDefaultIndex, true,
2450                                                                 SnapshotNow,
2451                                                                 1, &skey);
2452         found = 0;
2453
2454         while (HeapTupleIsValid(htup = systable_getnext(adscan)))
2455         {
2456                 Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
2457
2458                 found++;
2459                 for (i = 0; i < ndef; i++)
2460                 {
2461                         if (adform->adnum != attrdef[i].adnum)
2462                                 continue;
2463                         if (attrdef[i].adbin != NULL)
2464                                 elog(WARNING, "AttrDefaultFetch: second record found for attr %s in rel %s",
2465                                          NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
2466                                          RelationGetRelationName(relation));
2467
2468                         val = fastgetattr(htup,
2469                                                           Anum_pg_attrdef_adbin,
2470                                                           adrel->rd_att, &isnull);
2471                         if (isnull)
2472                                 elog(WARNING, "AttrDefaultFetch: adbin IS NULL for attr %s in rel %s",
2473                                          NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
2474                                          RelationGetRelationName(relation));
2475                         else
2476                                 attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
2477                                                          DatumGetCString(DirectFunctionCall1(textout,
2478                                                                                                                                  val)));
2479                         break;
2480                 }
2481
2482                 if (i >= ndef)
2483                         elog(WARNING, "AttrDefaultFetch: unexpected record found for attr %d in rel %s",
2484                                  adform->adnum,
2485                                  RelationGetRelationName(relation));
2486         }
2487
2488         systable_endscan(adscan);
2489         heap_close(adrel, AccessShareLock);
2490
2491         if (found != ndef)
2492                 elog(WARNING, "AttrDefaultFetch: %d record(s) not found for rel %s",
2493                          ndef - found, RelationGetRelationName(relation));
2494 }
2495
2496 static void
2497 CheckConstraintFetch(Relation relation)
2498 {
2499         ConstrCheck *check = relation->rd_att->constr->check;
2500         int                     ncheck = relation->rd_att->constr->num_check;
2501         Relation        conrel;
2502         SysScanDesc conscan;
2503         ScanKeyData skey[1];
2504         HeapTuple       htup;
2505         Datum           val;
2506         bool            isnull;
2507         int                     found = 0;
2508
2509         ScanKeyEntryInitialize(&skey[0], 0x0,
2510                                                    Anum_pg_constraint_conrelid, F_OIDEQ,
2511                                                    ObjectIdGetDatum(RelationGetRelid(relation)));
2512
2513         conrel = heap_openr(ConstraintRelationName, AccessShareLock);
2514         conscan = systable_beginscan(conrel, ConstraintRelidIndex, true,
2515                                                                  SnapshotNow, 1, skey);
2516
2517         while (HeapTupleIsValid(htup = systable_getnext(conscan)))
2518         {
2519                 Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
2520
2521                 /* We want check constraints only */
2522                 if (conform->contype != CONSTRAINT_CHECK)
2523                         continue;
2524
2525                 if (found == ncheck)
2526                         elog(ERROR, "CheckConstraintFetch: unexpected record found for rel %s",
2527                                  RelationGetRelationName(relation));
2528
2529                 check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
2530                                                                                           NameStr(conform->conname));
2531
2532                 /* Grab and test conbin is actually set */
2533                 val = fastgetattr(htup,
2534                                                   Anum_pg_constraint_conbin,
2535                                                   conrel->rd_att, &isnull);
2536                 if (isnull)
2537                         elog(ERROR, "CheckConstraintFetch: conbin IS NULL for rel %s",
2538                                  RelationGetRelationName(relation));
2539
2540                 check[found].ccbin = MemoryContextStrdup(CacheMemoryContext,
2541                                                          DatumGetCString(DirectFunctionCall1(textout,
2542                                                                                                                                  val)));
2543                 found++;
2544         }
2545
2546         systable_endscan(conscan);
2547         heap_close(conrel, AccessShareLock);
2548
2549         if (found != ncheck)
2550                 elog(ERROR, "CheckConstraintFetch: %d record(s) not found for rel %s",
2551                          ncheck - found, RelationGetRelationName(relation));
2552 }
2553
2554 /*
2555  * RelationGetIndexList -- get a list of OIDs of indexes on this relation
2556  *
2557  * The index list is created only if someone requests it.  We scan pg_index
2558  * to find relevant indexes, and add the list to the relcache entry so that
2559  * we won't have to compute it again.  Note that shared cache inval of a
2560  * relcache entry will delete the old list and set rd_indexfound to false,
2561  * so that we must recompute the index list on next request.  This handles
2562  * creation or deletion of an index.
2563  *
2564  * The returned list is guaranteed to be sorted in order by OID.  This is
2565  * needed by the executor, since for index types that we obtain exclusive
2566  * locks on when updating the index, all backends must lock the indexes in
2567  * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
2568  * consistent ordering would do, but ordering by OID is easy.
2569  *
2570  * Since shared cache inval causes the relcache's copy of the list to go away,
2571  * we return a copy of the list palloc'd in the caller's context.  The caller
2572  * may freeList() the returned list after scanning it.  This is necessary
2573  * since the caller will typically be doing syscache lookups on the relevant
2574  * indexes, and syscache lookup could cause SI messages to be processed!
2575  */
2576 List *
2577 RelationGetIndexList(Relation relation)
2578 {
2579         Relation        indrel;
2580         SysScanDesc indscan;
2581         ScanKeyData skey;
2582         HeapTuple       htup;
2583         List       *result;
2584         MemoryContext oldcxt;
2585
2586         /* Quick exit if we already computed the list. */
2587         if (relation->rd_indexfound)
2588                 return listCopy(relation->rd_indexlist);
2589
2590         /*
2591          * We build the list we intend to return (in the caller's context)
2592          * while doing the scan.  After successfully completing the scan, we
2593          * copy that list into the relcache entry.      This avoids cache-context
2594          * memory leakage if we get some sort of error partway through.
2595          */
2596         result = NIL;
2597
2598         /* Prepare to scan pg_index for entries having indrelid = this rel. */
2599         ScanKeyEntryInitialize(&skey,
2600                                                    (bits16) 0x0,
2601                                                    (AttrNumber) Anum_pg_index_indrelid,
2602                                                    (RegProcedure) F_OIDEQ,
2603                                                    ObjectIdGetDatum(RelationGetRelid(relation)));
2604
2605         indrel = heap_openr(IndexRelationName, AccessShareLock);
2606         indscan = systable_beginscan(indrel, IndexIndrelidIndex, true,
2607                                                                  SnapshotNow,
2608                                                                  1, &skey);
2609
2610         while (HeapTupleIsValid(htup = systable_getnext(indscan)))
2611         {
2612                 Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);
2613
2614                 result = insert_ordered_oid(result, index->indexrelid);
2615         }
2616
2617         systable_endscan(indscan);
2618         heap_close(indrel, AccessShareLock);
2619
2620         /* Now save a copy of the completed list in the relcache entry. */
2621         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2622         relation->rd_indexlist = listCopy(result);
2623         relation->rd_indexfound = true;
2624         MemoryContextSwitchTo(oldcxt);
2625
2626         return result;
2627 }
2628
2629 /*
2630  * insert_ordered_oid
2631  *              Insert a new Oid into a sorted list of Oids, preserving ordering
2632  *
2633  * Building the ordered list this way is O(N^2), but with a pretty small
2634  * constant, so for the number of entries we expect it will probably be
2635  * faster than trying to apply qsort().  Most tables don't have very many
2636  * indexes...
2637  */
2638 static List *
2639 insert_ordered_oid(List *list, Oid datum)
2640 {
2641         List       *l;
2642
2643         /* Does the datum belong at the front? */
2644         if (list == NIL || datum < (Oid) lfirsti(list))
2645                 return lconsi(datum, list);
2646         /* No, so find the entry it belongs after */
2647         l = list;
2648         for (;;)
2649         {
2650                 List       *n = lnext(l);
2651
2652                 if (n == NIL || datum < (Oid) lfirsti(n))
2653                         break;                          /* it belongs before n */
2654                 l = n;
2655         }
2656         /* Insert datum into list after item l */
2657         lnext(l) = lconsi(datum, lnext(l));
2658         return list;
2659 }
2660
2661
2662 /*
2663  *      load_relcache_init_file, write_relcache_init_file
2664  *
2665  *              In late 1992, we started regularly having databases with more than
2666  *              a thousand classes in them.  With this number of classes, it became
2667  *              critical to do indexed lookups on the system catalogs.
2668  *
2669  *              Bootstrapping these lookups is very hard.  We want to be able to
2670  *              use an index on pg_attribute, for example, but in order to do so,
2671  *              we must have read pg_attribute for the attributes in the index,
2672  *              which implies that we need to use the index.
2673  *
2674  *              In order to get around the problem, we do the following:
2675  *
2676  *                 +  When the database system is initialized (at initdb time), we
2677  *                        don't use indexes.  We do sequential scans.
2678  *
2679  *                 +  When the backend is started up in normal mode, we load an image
2680  *                        of the appropriate relation descriptors, in internal format,
2681  *                        from an initialization file in the data/base/... directory.
2682  *
2683  *                 +  If the initialization file isn't there, then we create the
2684  *                        relation descriptors using sequential scans and write 'em to
2685  *                        the initialization file for use by subsequent backends.
2686  *
2687  *              We could dispense with the initialization file and just build the
2688  *              critical reldescs the hard way on every backend startup, but that
2689  *              slows down backend startup noticeably.
2690  *
2691  *              We can in fact go further, and save more relcache entries than
2692  *              just the ones that are absolutely critical; this allows us to speed
2693  *              up backend startup by not having to build such entries the hard way.
2694  *              Presently, all the catalog and index entries that are referred to
2695  *              by catcaches are stored in the initialization file.
2696  *
2697  *              The same mechanism that detects when catcache and relcache entries
2698  *              need to be invalidated (due to catalog updates) also arranges to
2699  *              unlink the initialization file when its contents may be out of date.
2700  *              The file will then be rebuilt during the next backend startup.
2701  */
2702
2703 /*
2704  * load_relcache_init_file -- attempt to load cache from the init file
2705  *
2706  * If successful, return TRUE and set criticalRelcachesBuilt to true.
2707  * If not successful, return FALSE and set needNewCacheFile to true.
2708  *
2709  * NOTE: we assume we are already switched into CacheMemoryContext.
2710  */
2711 static bool
2712 load_relcache_init_file(void)
2713 {
2714         FILE       *fp;
2715         char            initfilename[MAXPGPATH];
2716         Relation   *rels;
2717         int                     relno,
2718                                 num_rels,
2719                                 max_rels,
2720                                 nailed_rels,
2721                                 nailed_indexes;
2722         int                     i;
2723
2724         snprintf(initfilename, sizeof(initfilename), "%s/%s",
2725                          DatabasePath, RELCACHE_INIT_FILENAME);
2726
2727         fp = AllocateFile(initfilename, PG_BINARY_R);
2728         if (fp == NULL)
2729         {
2730                 needNewCacheFile = true;
2731                 return false;
2732         }
2733
2734         /*
2735          * Read the index relcache entries from the file.  Note we will not
2736          * enter any of them into the cache if the read fails partway through;
2737          * this helps to guard against broken init files.
2738          */
2739         max_rels = 100;
2740         rels = (Relation *) palloc(max_rels * sizeof(Relation));
2741         num_rels = 0;
2742         nailed_rels = nailed_indexes = 0;
2743         initFileRelationIds = NIL;
2744
2745         for (relno = 0;; relno++)
2746         {
2747                 Size            len;
2748                 size_t          nread;
2749                 Relation        rel;
2750                 Form_pg_class relform;
2751                 bool            has_not_null;
2752
2753                 /* first read the relation descriptor length */
2754                 if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2755                 {
2756                         if (nread == 0)
2757                                 break;                  /* end of file */
2758                         goto read_failed;
2759                 }
2760
2761                 /* safety check for incompatible relcache layout */
2762                 if (len != sizeof(RelationData))
2763                         goto read_failed;
2764
2765                 /* allocate another relcache header */
2766                 if (num_rels >= max_rels)
2767                 {
2768                         max_rels *= 2;
2769                         rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
2770                 }
2771
2772                 rel = rels[num_rels++] = (Relation) palloc(len);
2773
2774                 /* then, read the Relation structure */
2775                 if ((nread = fread(rel, 1, len, fp)) != len)
2776                         goto read_failed;
2777
2778                 /* next read the relation tuple form */
2779                 if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2780                         goto read_failed;
2781
2782                 relform = (Form_pg_class) palloc(len);
2783                 if ((nread = fread(relform, 1, len, fp)) != len)
2784                         goto read_failed;
2785
2786                 rel->rd_rel = relform;
2787
2788                 /* initialize attribute tuple forms */
2789                 rel->rd_att = CreateTemplateTupleDesc(relform->relnatts,
2790                                                                                           relform->relhasoids);
2791
2792                 /* next read all the attribute tuple form data entries */
2793                 has_not_null = false;
2794                 for (i = 0; i < relform->relnatts; i++)
2795                 {
2796                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2797                                 goto read_failed;
2798
2799                         rel->rd_att->attrs[i] = (Form_pg_attribute) palloc(len);
2800
2801                         if ((nread = fread(rel->rd_att->attrs[i], 1, len, fp)) != len)
2802                                 goto read_failed;
2803
2804                         has_not_null |= rel->rd_att->attrs[i]->attnotnull;
2805                 }
2806
2807                 /* mark not-null status */
2808                 if (has_not_null)
2809                 {
2810                         TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
2811
2812                         constr->has_not_null = true;
2813                         rel->rd_att->constr = constr;
2814                 }
2815
2816                 /* If it's an index, there's more to do */
2817                 if (rel->rd_rel->relkind == RELKIND_INDEX)
2818                 {
2819                         Form_pg_am      am;
2820                         MemoryContext indexcxt;
2821                         IndexStrategy strat;
2822                         Oid                *operator;
2823                         RegProcedure *support;
2824                         int                     nstrategies,
2825                                                 nsupport;
2826
2827                         /* Count nailed indexes to ensure we have 'em all */
2828                         if (rel->rd_isnailed)
2829                                 nailed_indexes++;
2830
2831                         /* next, read the pg_index tuple form */
2832                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2833                                 goto read_failed;
2834
2835                         rel->rd_index = (Form_pg_index) palloc(len);
2836                         if ((nread = fread(rel->rd_index, 1, len, fp)) != len)
2837                                 goto read_failed;
2838
2839                         /* next, read the access method tuple form */
2840                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2841                                 goto read_failed;
2842
2843                         am = (Form_pg_am) palloc(len);
2844                         if ((nread = fread(am, 1, len, fp)) != len)
2845                                 goto read_failed;
2846                         rel->rd_am = am;
2847
2848                         /*
2849                          * prepare index info context --- parameters should match
2850                          * RelationInitIndexAccessInfo
2851                          */
2852                         indexcxt = AllocSetContextCreate(CacheMemoryContext,
2853                                                                                          RelationGetRelationName(rel),
2854                                                                                          ALLOCSET_SMALL_MINSIZE,
2855                                                                                          ALLOCSET_SMALL_INITSIZE,
2856                                                                                          ALLOCSET_SMALL_MAXSIZE);
2857                         rel->rd_indexcxt = indexcxt;
2858
2859                         /* next, read the index strategy map */
2860                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2861                                 goto read_failed;
2862
2863                         strat = (IndexStrategy) MemoryContextAlloc(indexcxt, len);
2864                         if ((nread = fread(strat, 1, len, fp)) != len)
2865                                 goto read_failed;
2866
2867                         /* have to invalidate any FmgrInfo data in the strategy maps */
2868                         nstrategies = am->amstrategies * relform->relnatts;
2869                         for (i = 0; i < nstrategies; i++)
2870                                 strat->strategyMapData[i].entry[0].sk_func.fn_oid = InvalidOid;
2871
2872                         rel->rd_istrat = strat;
2873
2874                         /* next, read the vector of operator OIDs */
2875                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2876                                 goto read_failed;
2877
2878                         operator = (Oid *) MemoryContextAlloc(indexcxt, len);
2879                         if ((nread = fread(operator, 1, len, fp)) != len)
2880                                 goto read_failed;
2881
2882                         rel->rd_operator = operator;
2883
2884                         /* finally, read the vector of support procedures */
2885                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2886                                 goto read_failed;
2887                         support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
2888                         if ((nread = fread(support, 1, len, fp)) != len)
2889                                 goto read_failed;
2890
2891                         rel->rd_support = support;
2892
2893                         /* add a zeroed support-fmgr-info vector */
2894                         nsupport = relform->relnatts * am->amsupport;
2895                         rel->rd_supportinfo = (FmgrInfo *)
2896                                 MemoryContextAlloc(indexcxt, nsupport * sizeof(FmgrInfo));
2897                         MemSet(rel->rd_supportinfo, 0, nsupport * sizeof(FmgrInfo));
2898                 }
2899                 else
2900                 {
2901                         /* Count nailed rels to ensure we have 'em all */
2902                         if (rel->rd_isnailed)
2903                                 nailed_rels++;
2904
2905                         Assert(rel->rd_index == NULL);
2906                         Assert(rel->rd_am == NULL);
2907                         Assert(rel->rd_indexcxt == NULL);
2908                         Assert(rel->rd_istrat == NULL);
2909                         Assert(rel->rd_operator == NULL);
2910                         Assert(rel->rd_support == NULL);
2911                         Assert(rel->rd_supportinfo == NULL);
2912                 }
2913
2914                 /*
2915                  * Rules and triggers are not saved (mainly because the internal
2916                  * format is complex and subject to change).  They must be rebuilt
2917                  * if needed by RelationCacheInitializePhase2.  This is not
2918                  * expected to be a big performance hit since few system catalogs
2919                  * have such.
2920                  */
2921                 rel->rd_rules = NULL;
2922                 rel->rd_rulescxt = NULL;
2923                 rel->trigdesc = NULL;
2924
2925                 /*
2926                  * Reset transient-state fields in the relcache entry
2927                  */
2928                 rel->rd_fd = -1;
2929                 rel->rd_targblock = InvalidBlockNumber;
2930                 if (rel->rd_isnailed)
2931                         RelationSetReferenceCount(rel, 1);
2932                 else
2933                         RelationSetReferenceCount(rel, 0);
2934                 rel->rd_indexfound = false;
2935                 rel->rd_indexlist = NIL;
2936                 MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
2937
2938                 /*
2939                  * Make sure database ID is correct.  This is needed in case the
2940                  * pg_internal.init file was copied from some other database by
2941                  * CREATE DATABASE.
2942                  */
2943                 if (rel->rd_rel->relisshared)
2944                         rel->rd_node.tblNode = InvalidOid;
2945                 else
2946                         rel->rd_node.tblNode = MyDatabaseId;
2947
2948                 RelationInitLockInfo(rel);
2949         }
2950
2951         /*
2952          * We reached the end of the init file without apparent problem. Did
2953          * we get the right number of nailed items?  (This is a useful
2954          * crosscheck in case the set of critical rels or indexes changes.)
2955          */
2956         if (nailed_rels != NUM_CRITICAL_RELS ||
2957                 nailed_indexes != NUM_CRITICAL_INDEXES)
2958                 goto read_failed;
2959
2960         /*
2961          * OK, all appears well.
2962          *
2963          * Now insert all the new relcache entries into the cache.
2964          */
2965         for (relno = 0; relno < num_rels; relno++)
2966         {
2967                 RelationCacheInsert(rels[relno]);
2968                 /* also make a list of their OIDs, for RelationIdIsInInitFile */
2969                 initFileRelationIds = lconsi((int) RelationGetRelid(rels[relno]),
2970                                                                          initFileRelationIds);
2971         }
2972
2973         pfree(rels);
2974         FreeFile(fp);
2975
2976         criticalRelcachesBuilt = true;
2977         return true;
2978
2979         /*
2980          * init file is broken, so do it the hard way.  We don't bother trying
2981          * to free the clutter we just allocated; it's not in the relcache so
2982          * it won't hurt.
2983          */
2984 read_failed:
2985         pfree(rels);
2986         FreeFile(fp);
2987
2988         needNewCacheFile = true;
2989         return false;
2990 }
2991
2992 /*
2993  * Write out a new initialization file with the current contents
2994  * of the relcache.
2995  */
2996 static void
2997 write_relcache_init_file(void)
2998 {
2999         FILE       *fp;
3000         char            tempfilename[MAXPGPATH];
3001         char            finalfilename[MAXPGPATH];
3002         HASH_SEQ_STATUS status;
3003         RelIdCacheEnt *idhentry;
3004         MemoryContext oldcxt;
3005         int                     i;
3006
3007         /*
3008          * We must write a temporary file and rename it into place. Otherwise,
3009          * another backend starting at about the same time might crash trying
3010          * to read the partially-complete file.
3011          */
3012         snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
3013                          DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
3014         snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
3015                          DatabasePath, RELCACHE_INIT_FILENAME);
3016
3017         unlink(tempfilename);           /* in case it exists w/wrong permissions */
3018
3019         fp = AllocateFile(tempfilename, PG_BINARY_W);
3020         if (fp == NULL)
3021         {
3022                 /*
3023                  * We used to consider this a fatal error, but we might as well
3024                  * continue with backend startup ...
3025                  */
3026                 elog(WARNING, "Cannot create init file %s: %m\n\tContinuing anyway, but there's something wrong.", tempfilename);
3027                 return;
3028         }
3029
3030         /*
3031          * Write all the reldescs (in no particular order).
3032          */
3033         hash_seq_init(&status, RelationIdCache);
3034
3035         initFileRelationIds = NIL;
3036
3037         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3038         {
3039                 Relation        rel = idhentry->reldesc;
3040                 Form_pg_class relform = rel->rd_rel;
3041                 Size            len;
3042
3043                 /*
3044                  * first write the relcache entry proper
3045                  */
3046                 len = sizeof(RelationData);
3047
3048                 /* first, write the relation descriptor length */
3049                 if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3050                         elog(FATAL, "cannot write init file -- descriptor length");
3051
3052                 /* next, write out the Relation structure */
3053                 if (fwrite(rel, 1, len, fp) != len)
3054                         elog(FATAL, "cannot write init file -- reldesc");
3055
3056                 /* next write the relation tuple form */
3057                 len = sizeof(FormData_pg_class);
3058                 if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3059                         elog(FATAL, "cannot write init file -- relation tuple form length");
3060
3061                 if (fwrite(relform, 1, len, fp) != len)
3062                         elog(FATAL, "cannot write init file -- relation tuple form");
3063
3064                 /* next, do all the attribute tuple form data entries */
3065                 for (i = 0; i < relform->relnatts; i++)
3066                 {
3067                         len = ATTRIBUTE_TUPLE_SIZE;
3068                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3069                                 elog(FATAL, "cannot write init file -- length of attdesc %d", i);
3070                         if (fwrite(rel->rd_att->attrs[i], 1, len, fp) != len)
3071                                 elog(FATAL, "cannot write init file -- attdesc %d", i);
3072                 }
3073
3074                 /* If it's an index, there's more to do */
3075                 if (rel->rd_rel->relkind == RELKIND_INDEX)
3076                 {
3077                         Form_pg_am      am = rel->rd_am;
3078                         HeapTuple       tuple;
3079
3080                         /*
3081                          * We need to write the index tuple form, but this is a bit
3082                          * tricky since it's a variable-length struct.  Rather than
3083                          * hoping to intuit the length, fetch the pg_index tuple
3084                          * afresh using the syscache, and write that.
3085                          */
3086                         tuple = SearchSysCache(INDEXRELID,
3087                                                                  ObjectIdGetDatum(RelationGetRelid(rel)),
3088                                                                    0, 0, 0);
3089                         if (!HeapTupleIsValid(tuple))
3090                                 elog(ERROR, "write_relcache_init_file: no pg_index entry for index %u",
3091                                          RelationGetRelid(rel));
3092                         len = tuple->t_len - tuple->t_data->t_hoff;
3093                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3094                                 elog(FATAL, "cannot write init file -- index tuple form length");
3095                         if (fwrite(GETSTRUCT(tuple), 1, len, fp) != len)
3096                                 elog(FATAL, "cannot write init file -- index tuple form");
3097                         ReleaseSysCache(tuple);
3098
3099                         /* next, write the access method tuple form */
3100                         len = sizeof(FormData_pg_am);
3101                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3102                                 elog(FATAL, "cannot write init file -- am tuple form length");
3103
3104                         if (fwrite(am, 1, len, fp) != len)
3105                                 elog(FATAL, "cannot write init file -- am tuple form");
3106
3107                         /* next, write the index strategy map */
3108                         len = AttributeNumberGetIndexStrategySize(relform->relnatts,
3109                                                                                                           am->amstrategies);
3110                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3111                                 elog(FATAL, "cannot write init file -- strategy map length");
3112
3113                         if (fwrite(rel->rd_istrat, 1, len, fp) != len)
3114                                 elog(FATAL, "cannot write init file -- strategy map");
3115
3116                         /* next, write the vector of operator OIDs */
3117                         len = relform->relnatts * (am->amstrategies * sizeof(Oid));
3118                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3119                                 elog(FATAL, "cannot write init file -- operator vector length");
3120
3121                         if (fwrite(rel->rd_operator, 1, len, fp) != len)
3122                                 elog(FATAL, "cannot write init file -- operator vector");
3123
3124                         /* finally, write the vector of support procedures */
3125                         len = relform->relnatts * (am->amsupport * sizeof(RegProcedure));
3126                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3127                                 elog(FATAL, "cannot write init file -- support vector length");
3128
3129                         if (fwrite(rel->rd_support, 1, len, fp) != len)
3130                                 elog(FATAL, "cannot write init file -- support vector");
3131                 }
3132
3133                 /* also make a list of their OIDs, for RelationIdIsInInitFile */
3134                 oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3135                 initFileRelationIds = lconsi((int) RelationGetRelid(rel),
3136                                                                          initFileRelationIds);
3137                 MemoryContextSwitchTo(oldcxt);
3138         }
3139
3140         FreeFile(fp);
3141
3142         /*
3143          * Now we have to check whether the data we've so painstakingly
3144          * accumulated is already obsolete due to someone else's
3145          * just-committed catalog changes.      If so, we just delete the temp
3146          * file and leave it to the next backend to try again.  (Our own
3147          * relcache entries will be updated by SI message processing, but we
3148          * can't be sure whether what we wrote out was up-to-date.)
3149          *
3150          * This mustn't run concurrently with RelationCacheInitFileInvalidate, so
3151          * grab a serialization lock for the duration.
3152          */
3153         LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
3154
3155         /* Make sure we have seen all incoming SI messages */
3156         AcceptInvalidationMessages();
3157
3158         /*
3159          * If we have received any SI relcache invals since backend start,
3160          * assume we may have written out-of-date data.
3161          */
3162         if (relcacheInvalsReceived == 0L)
3163         {
3164                 /*
3165                  * OK, rename the temp file to its final name, deleting any
3166                  * previously-existing init file.
3167                  *
3168                  * Note: a failure here is possible under Cygwin, if some other
3169                  * backend is holding open an unlinked-but-not-yet-gone init file.
3170                  * So treat this as a noncritical failure.
3171                  */
3172                 if (rename(tempfilename, finalfilename) < 0)
3173                 {
3174                         elog(WARNING, "Cannot rename init file %s to %s: %m\n\tContinuing anyway, but there's something wrong.", tempfilename, finalfilename);
3175
3176                         /*
3177                          * If we fail, try to clean up the useless temp file; don't
3178                          * bother to complain if this fails too.
3179                          */
3180                         unlink(tempfilename);
3181                 }
3182         }
3183         else
3184         {
3185                 /* Delete the already-obsolete temp file */
3186                 unlink(tempfilename);
3187         }
3188
3189         LWLockRelease(RelCacheInitLock);
3190 }
3191
3192 /*
3193  * Detect whether a given relation (identified by OID) is one of the ones
3194  * we store in the init file.
3195  *
3196  * Note that we effectively assume that all backends running in a database
3197  * would choose to store the same set of relations in the init file;
3198  * otherwise there are cases where we'd fail to detect the need for an init
3199  * file invalidation.  This does not seem likely to be a problem in practice.
3200  */
3201 bool
3202 RelationIdIsInInitFile(Oid relationId)
3203 {
3204         return intMember((int) relationId, initFileRelationIds);
3205 }
3206
3207 /*
3208  * Invalidate (remove) the init file during commit of a transaction that
3209  * changed one or more of the relation cache entries that are kept in the
3210  * init file.
3211  *
3212  * We actually need to remove the init file twice: once just before sending
3213  * the SI messages that include relcache inval for such relations, and once
3214  * just after sending them.  The unlink before ensures that a backend that's
3215  * currently starting cannot read the now-obsolete init file and then miss
3216  * the SI messages that will force it to update its relcache entries.  (This
3217  * works because the backend startup sequence gets into the PROC array before
3218  * trying to load the init file.)  The unlink after is to synchronize with a
3219  * backend that may currently be trying to write an init file based on data
3220  * that we've just rendered invalid.  Such a backend will see the SI messages,
3221  * but we can't leave the init file sitting around to fool later backends.
3222  *
3223  * Ignore any failure to unlink the file, since it might not be there if
3224  * no backend has been started since the last removal.
3225  */
3226 void
3227 RelationCacheInitFileInvalidate(bool beforeSend)
3228 {
3229         char            initfilename[MAXPGPATH];
3230
3231         snprintf(initfilename, sizeof(initfilename), "%s/%s",
3232                          DatabasePath, RELCACHE_INIT_FILENAME);
3233
3234         if (beforeSend)
3235         {
3236                 /* no interlock needed here */
3237                 unlink(initfilename);
3238         }
3239         else
3240         {
3241                 /*
3242                  * We need to interlock this against write_relcache_init_file, to
3243                  * guard against possibility that someone renames a new-but-
3244                  * already-obsolete init file into place just after we unlink.
3245                  * With the interlock, it's certain that write_relcache_init_file
3246                  * will notice our SI inval message before renaming into place, or
3247                  * else that we will execute second and successfully unlink the
3248                  * file.
3249                  */
3250                 LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
3251                 unlink(initfilename);
3252                 LWLockRelease(RelCacheInitLock);
3253         }
3254 }