]> granicus.if.org Git - postgresql/blob - src/backend/utils/cache/relcache.c
Code review of CLUSTER patch. Clean up problems with relcache getting
[postgresql] / src / backend / utils / cache / relcache.c
1 /*-------------------------------------------------------------------------
2  *
3  * relcache.c
4  *        POSTGRES relation descriptor cache code
5  *
6  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/utils/cache/relcache.c,v 1.172 2002/08/11 21:17:35 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * INTERFACE ROUTINES
17  *              RelationCacheInitialize                 - initialize relcache
18  *              RelationCacheInitializePhase2   - finish initializing relcache
19  *              RelationIdGetRelation                   - get a reldesc by relation id
20  *              RelationSysNameGetRelation              - get a reldesc by system rel name
21  *              RelationIdCacheGetRelation              - get a cached reldesc by relid
22  *              RelationClose                                   - close an open relation
23  *
24  * NOTES
25  *              The following code contains many undocumented hacks.  Please be
26  *              careful....
27  */
28 #include "postgres.h"
29
30 #include <sys/types.h>
31 #include <errno.h>
32 #include <sys/file.h>
33 #include <fcntl.h>
34 #include <unistd.h>
35
36 #include "access/genam.h"
37 #include "access/heapam.h"
38 #include "access/istrat.h"
39 #include "catalog/catalog.h"
40 #include "catalog/catname.h"
41 #include "catalog/indexing.h"
42 #include "catalog/namespace.h"
43 #include "catalog/pg_amop.h"
44 #include "catalog/pg_amproc.h"
45 #include "catalog/pg_attrdef.h"
46 #include "catalog/pg_attribute.h"
47 #include "catalog/pg_constraint.h"
48 #include "catalog/pg_index.h"
49 #include "catalog/pg_namespace.h"
50 #include "catalog/pg_opclass.h"
51 #include "catalog/pg_proc.h"
52 #include "catalog/pg_rewrite.h"
53 #include "catalog/pg_type.h"
54 #include "commands/trigger.h"
55 #include "miscadmin.h"
56 #include "storage/smgr.h"
57 #include "utils/builtins.h"
58 #include "utils/catcache.h"
59 #include "utils/fmgroids.h"
60 #include "utils/inval.h"
61 #include "utils/lsyscache.h"
62 #include "utils/relcache.h"
63 #include "utils/syscache.h"
64
65
66 /*
67  * name of relcache init file, used to speed up backend startup
68  */
69 #define RELCACHE_INIT_FILENAME  "pg_internal.init"
70
71 /*
72  *              hardcoded tuple descriptors.  see include/catalog/pg_attribute.h
73  */
74 static FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
75 static FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
76 static FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
77 static FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
78
79 /*
80  *              Hash tables that index the relation cache
81  *
82  *              Relations are looked up two ways, by OID and by name,
83  *              thus there are two hash tables for referencing them.
84  *
85  *              The OID index covers all relcache entries.  The name index
86  *              covers *only* system relations (only those in PG_CATALOG_NAMESPACE).
87  */
88 static HTAB *RelationIdCache;
89 static HTAB *RelationSysNameCache;
90
91 /*
92  * Bufmgr uses RelFileNode for lookup. Actually, I would like to do
93  * not pass Relation to bufmgr & beyond at all and keep some cache
94  * in smgr, but no time to do it right way now.         -- vadim 10/22/2000
95  */
96 static HTAB *RelationNodeCache;
97
98 /*
99  * This flag is false until we have prepared the critical relcache entries
100  * that are needed to do indexscans on the tables read by relcache building.
101  */
102 bool criticalRelcachesBuilt = false;
103
104 /*
105  * This flag is set if we discover that we need to write a new relcache
106  * cache file at the end of startup.
107  */
108 static bool needNewCacheFile = false;
109
110 /*
111  * This counter counts relcache inval events received since backend startup
112  * (but only for rels that are actually in cache).  Presently, we use it only
113  * to detect whether data about to be written by write_relcache_init_file()
114  * might already be obsolete.
115  */
116 static long relcacheInvalsReceived = 0L;
117
118 /*
119  * This list remembers the OIDs of the relations cached in the relcache
120  * init file.
121  */
122 static List *initFileRelationIds = NIL;
123
124 /*
125  *              RelationBuildDescInfo exists so code can be shared
126  *              between RelationIdGetRelation() and RelationSysNameGetRelation()
127  */
128 typedef struct RelationBuildDescInfo
129 {
130         int                     infotype;               /* lookup by id or by name */
131 #define INFO_RELID 1
132 #define INFO_RELNAME 2
133         union
134         {
135                 Oid                     info_id;        /* relation object id */
136                 char       *info_name;  /* system relation name */
137         }                       i;
138 } RelationBuildDescInfo;
139
140 typedef struct relidcacheent
141 {
142         Oid                     reloid;
143         Relation        reldesc;
144 } RelIdCacheEnt;
145
146 typedef struct relnamecacheent
147 {
148         NameData        relname;
149         Relation        reldesc;
150 } RelNameCacheEnt;
151
152 typedef struct relnodecacheent
153 {
154         RelFileNode relnode;
155         Relation        reldesc;
156 } RelNodeCacheEnt;
157
158 /*
159  *              macros to manipulate the lookup hashtables
160  */
161 #define RelationCacheInsert(RELATION)   \
162 do { \
163         RelIdCacheEnt *idhentry; RelNodeCacheEnt *nodentry; bool found; \
164         idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
165                                                                                    (void *) &(RELATION->rd_id), \
166                                                                                    HASH_ENTER, \
167                                                                                    &found); \
168         if (idhentry == NULL) \
169                 elog(ERROR, "out of memory for relation descriptor cache"); \
170         /* used to give notice if found -- now just keep quiet */ \
171         idhentry->reldesc = RELATION; \
172         nodentry = (RelNodeCacheEnt*)hash_search(RelationNodeCache, \
173                                                                                    (void *) &(RELATION->rd_node), \
174                                                                                    HASH_ENTER, \
175                                                                                    &found); \
176         if (nodentry == NULL) \
177                 elog(ERROR, "out of memory for relation descriptor cache"); \
178         /* used to give notice if found -- now just keep quiet */ \
179         nodentry->reldesc = RELATION; \
180         if (IsSystemNamespace(RelationGetNamespace(RELATION))) \
181         { \
182                 char *relname = RelationGetRelationName(RELATION); \
183                 RelNameCacheEnt *namehentry; \
184                 namehentry = (RelNameCacheEnt*)hash_search(RelationSysNameCache, \
185                                                                                                    relname, \
186                                                                                                    HASH_ENTER, \
187                                                                                                    &found); \
188                 if (namehentry == NULL) \
189                         elog(ERROR, "out of memory for relation descriptor cache"); \
190                 /* used to give notice if found -- now just keep quiet */ \
191                 namehentry->reldesc = RELATION; \
192         } \
193 } while(0)
194
195 #define RelationIdCacheLookup(ID, RELATION) \
196 do { \
197         RelIdCacheEnt *hentry; \
198         hentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
199                                                                                  (void *)&(ID), HASH_FIND,NULL); \
200         if (hentry) \
201                 RELATION = hentry->reldesc; \
202         else \
203                 RELATION = NULL; \
204 } while(0)
205
206 #define RelationSysNameCacheLookup(NAME, RELATION) \
207 do { \
208         RelNameCacheEnt *hentry; \
209         hentry = (RelNameCacheEnt*)hash_search(RelationSysNameCache, \
210                                                                                    (void *) (NAME), HASH_FIND,NULL); \
211         if (hentry) \
212                 RELATION = hentry->reldesc; \
213         else \
214                 RELATION = NULL; \
215 } while(0)
216
217 #define RelationNodeCacheLookup(NODE, RELATION) \
218 do { \
219         RelNodeCacheEnt *hentry; \
220         hentry = (RelNodeCacheEnt*)hash_search(RelationNodeCache, \
221                                                                                    (void *)&(NODE), HASH_FIND,NULL); \
222         if (hentry) \
223                 RELATION = hentry->reldesc; \
224         else \
225                 RELATION = NULL; \
226 } while(0)
227
228 #define RelationCacheDelete(RELATION) \
229 do { \
230         RelIdCacheEnt *idhentry; RelNodeCacheEnt *nodentry; \
231         idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
232                                                                                    (void *)&(RELATION->rd_id), \
233                                                                                    HASH_REMOVE, NULL); \
234         if (idhentry == NULL) \
235                 elog(WARNING, "trying to delete a rd_id reldesc that does not exist."); \
236         nodentry = (RelNodeCacheEnt*)hash_search(RelationNodeCache, \
237                                                                                    (void *)&(RELATION->rd_node), \
238                                                                                    HASH_REMOVE, NULL); \
239         if (nodentry == NULL) \
240                 elog(WARNING, "trying to delete a rd_node reldesc that does not exist."); \
241         if (IsSystemNamespace(RelationGetNamespace(RELATION))) \
242         { \
243                 char *relname = RelationGetRelationName(RELATION); \
244                 RelNameCacheEnt *namehentry; \
245                 namehentry = (RelNameCacheEnt*)hash_search(RelationSysNameCache, \
246                                                                                                    relname, \
247                                                                                                    HASH_REMOVE, NULL); \
248                 if (namehentry == NULL) \
249                         elog(WARNING, "trying to delete a relname reldesc that does not exist."); \
250         } \
251 } while(0)
252
253
254 /*
255  * Special cache for opclass-related information
256  */
257 typedef struct opclasscacheent
258 {
259         Oid                     opclassoid;             /* lookup key: OID of opclass */
260         bool            valid;                  /* set TRUE after successful fill-in */
261         StrategyNumber numStrats;       /* max # of strategies (from pg_am) */
262         StrategyNumber numSupport;      /* max # of support procs (from pg_am) */
263         Oid                *operatorOids;       /* strategy operators' OIDs */
264         RegProcedure *operatorProcs; /* strategy operators' procs */
265         RegProcedure *supportProcs;     /* support procs */
266 } OpClassCacheEnt;
267
268 static HTAB *OpClassCache = NULL;
269
270
271 /* non-export function prototypes */
272
273 static void RelationClearRelation(Relation relation, bool rebuild);
274
275 #ifdef  ENABLE_REINDEX_NAILED_RELATIONS
276 static void RelationReloadClassinfo(Relation relation);
277 #endif   /* ENABLE_REINDEX_NAILED_RELATIONS */
278 static void RelationFlushRelation(Relation relation);
279 static Relation RelationSysNameCacheGetRelation(const char *relationName);
280 static bool load_relcache_init_file(void);
281 static void write_relcache_init_file(void);
282
283 static void formrdesc(const char *relationName, int natts,
284                   FormData_pg_attribute *att);
285
286 static HeapTuple ScanPgRelation(RelationBuildDescInfo buildinfo);
287 static Relation AllocateRelationDesc(Relation relation, Form_pg_class relp);
288 static void RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
289                                            Relation relation);
290 static Relation RelationBuildDesc(RelationBuildDescInfo buildinfo,
291                                   Relation oldrelation);
292 static void AttrDefaultFetch(Relation relation);
293 static void CheckConstraintFetch(Relation relation);
294 static List *insert_ordered_oid(List *list, Oid datum);
295 static void IndexSupportInitialize(Form_pg_index iform,
296                                                                    IndexStrategy indexStrategy,
297                                                                    Oid *indexOperator,
298                                                                    RegProcedure *indexSupport,
299                                                                    StrategyNumber maxStrategyNumber,
300                                                                    StrategyNumber maxSupportNumber,
301                                                                    AttrNumber maxAttributeNumber);
302 static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
303                                                                                   StrategyNumber numStrats,
304                                                                                   StrategyNumber numSupport);
305
306
307 /*
308  *              ScanPgRelation
309  *
310  *              this is used by RelationBuildDesc to find a pg_class
311  *              tuple matching either a relation name or a relation id
312  *              as specified in buildinfo.
313  *
314  *              NB: the returned tuple has been copied into palloc'd storage
315  *              and must eventually be freed with heap_freetuple.
316  */
317 static HeapTuple
318 ScanPgRelation(RelationBuildDescInfo buildinfo)
319 {
320         HeapTuple       pg_class_tuple;
321         Relation        pg_class_desc;
322         const char *indexRelname;
323         SysScanDesc pg_class_scan;
324         ScanKeyData key[2];
325         int                     nkeys;
326
327         /*
328          * form a scan key
329          */
330         switch (buildinfo.infotype)
331         {
332                 case INFO_RELID:
333                         ScanKeyEntryInitialize(&key[0], 0,
334                                                                    ObjectIdAttributeNumber,
335                                                                    F_OIDEQ,
336                                                                    ObjectIdGetDatum(buildinfo.i.info_id));
337                         nkeys = 1;
338                         indexRelname = ClassOidIndex;
339                         break;
340
341                 case INFO_RELNAME:
342                         ScanKeyEntryInitialize(&key[0], 0,
343                                                                    Anum_pg_class_relname,
344                                                                    F_NAMEEQ,
345                                                                    NameGetDatum(buildinfo.i.info_name));
346                         ScanKeyEntryInitialize(&key[1], 0,
347                                                                    Anum_pg_class_relnamespace,
348                                                                    F_OIDEQ,
349                                                                    ObjectIdGetDatum(PG_CATALOG_NAMESPACE));
350                         nkeys = 2;
351                         indexRelname = ClassNameNspIndex;
352                         break;
353
354                 default:
355                         elog(ERROR, "ScanPgRelation: bad buildinfo");
356                         return NULL;            /* keep compiler quiet */
357         }
358
359         /*
360          * Open pg_class and fetch a tuple.  Force heap scan if we haven't
361          * yet built the critical relcache entries (this includes initdb
362          * and startup without a pg_internal.init file).
363          */
364         pg_class_desc = heap_openr(RelationRelationName, AccessShareLock);
365         pg_class_scan = systable_beginscan(pg_class_desc, indexRelname,
366                                                                            criticalRelcachesBuilt,
367                                                                            SnapshotNow,
368                                                                            nkeys, key);
369
370         pg_class_tuple = systable_getnext(pg_class_scan);
371
372         /*
373          * Must copy tuple before releasing buffer.
374          */
375         if (HeapTupleIsValid(pg_class_tuple))
376                 pg_class_tuple = heap_copytuple(pg_class_tuple);
377
378         /* all done */
379         systable_endscan(pg_class_scan);
380         heap_close(pg_class_desc, AccessShareLock);
381
382         return pg_class_tuple;
383 }
384
385 /*
386  *              AllocateRelationDesc
387  *
388  *              This is used to allocate memory for a new relation descriptor
389  *              and initialize the rd_rel field.
390  *
391  *              If 'relation' is NULL, allocate a new RelationData object.
392  *              If not, reuse the given object (that path is taken only when
393  *              we have to rebuild a relcache entry during RelationClearRelation).
394  */
395 static Relation
396 AllocateRelationDesc(Relation relation, Form_pg_class relp)
397 {
398         MemoryContext oldcxt;
399         Form_pg_class relationForm;
400
401         /* Relcache entries must live in CacheMemoryContext */
402         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
403
404         /*
405          * allocate space for new relation descriptor, if needed
406          */
407         if (relation == NULL)
408                 relation = (Relation) palloc(sizeof(RelationData));
409
410         /*
411          * clear all fields of reldesc
412          */
413         MemSet((char *) relation, 0, sizeof(RelationData));
414         relation->rd_targblock = InvalidBlockNumber;
415
416         /* make sure relation is marked as having no open file yet */
417         relation->rd_fd = -1;
418
419         /*
420          * Copy the relation tuple form
421          *
422          * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE.
423          * relacl is NOT stored in the relcache --- there'd be little point in
424          * it, since we don't copy the tuple's nullvalues bitmap and hence
425          * wouldn't know if the value is valid ... bottom line is that relacl
426          * *cannot* be retrieved from the relcache.  Get it from the syscache
427          * if you need it.
428          */
429         relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
430
431         memcpy((char *) relationForm, (char *) relp, CLASS_TUPLE_SIZE);
432
433         /* initialize relation tuple form */
434         relation->rd_rel = relationForm;
435
436         /* and allocate attribute tuple form storage */
437         relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts, BoolToHasOid(relationForm->relhasoids));
438
439         MemoryContextSwitchTo(oldcxt);
440
441         return relation;
442 }
443
444 /*
445  *              RelationBuildTupleDesc
446  *
447  *              Form the relation's tuple descriptor from information in
448  *              the pg_attribute, pg_attrdef & pg_constraint system catalogs.
449  */
450 static void
451 RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
452                                            Relation relation)
453 {
454         HeapTuple       pg_attribute_tuple;
455         Relation        pg_attribute_desc;
456         SysScanDesc pg_attribute_scan;
457         ScanKeyData skey[2];
458         int                     need;
459         TupleConstr *constr;
460         AttrDefault *attrdef = NULL;
461         int                     ndef = 0;
462
463         constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
464                                                                                                 sizeof(TupleConstr));
465         constr->has_not_null = false;
466
467         /*
468          * Form a scan key that selects only user attributes (attnum > 0).
469          * (Eliminating system attribute rows at the index level is lots
470          * faster than fetching them.)
471          */
472         ScanKeyEntryInitialize(&skey[0], 0,
473                                                    Anum_pg_attribute_attrelid,
474                                                    F_OIDEQ,
475                                                    ObjectIdGetDatum(RelationGetRelid(relation)));
476         ScanKeyEntryInitialize(&skey[1], 0,
477                                                    Anum_pg_attribute_attnum,
478                                                    F_INT2GT,
479                                                    Int16GetDatum(0));
480
481         /*
482          * Open pg_attribute and begin a scan.  Force heap scan if we haven't
483          * yet built the critical relcache entries (this includes initdb
484          * and startup without a pg_internal.init file).
485          */
486         pg_attribute_desc = heap_openr(AttributeRelationName, AccessShareLock);
487         pg_attribute_scan = systable_beginscan(pg_attribute_desc,
488                                                                                    AttributeRelidNumIndex,
489                                                                                    criticalRelcachesBuilt,
490                                                                                    SnapshotNow,
491                                                                                    2, skey);
492
493         /*
494          * add attribute data to relation->rd_att
495          */
496         need = relation->rd_rel->relnatts;
497
498         while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
499         {
500                 Form_pg_attribute attp;
501
502                 attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
503
504                 if (attp->attnum <= 0 ||
505                         attp->attnum > relation->rd_rel->relnatts)
506                         elog(ERROR, "Bogus attribute number %d for %s",
507                                  attp->attnum, RelationGetRelationName(relation));
508
509                 relation->rd_att->attrs[attp->attnum - 1] =
510                         (Form_pg_attribute) MemoryContextAlloc(CacheMemoryContext,
511                                                                                                    ATTRIBUTE_TUPLE_SIZE);
512
513                 memcpy((char *) (relation->rd_att->attrs[attp->attnum - 1]),
514                            (char *) attp,
515                            ATTRIBUTE_TUPLE_SIZE);
516
517                 /* Update constraint/default info */
518                 if (attp->attnotnull)
519                         constr->has_not_null = true;
520
521                 if (attp->atthasdef)
522                 {
523                         if (attrdef == NULL)
524                         {
525                                 attrdef = (AttrDefault *)
526                                         MemoryContextAlloc(CacheMemoryContext,
527                                                                            relation->rd_rel->relnatts *
528                                                                            sizeof(AttrDefault));
529                                 MemSet(attrdef, 0,
530                                            relation->rd_rel->relnatts * sizeof(AttrDefault));
531                         }
532                         attrdef[ndef].adnum = attp->attnum;
533                         attrdef[ndef].adbin = NULL;
534                         ndef++;
535                 }
536                 need--;
537                 if (need == 0)
538                         break;
539         }
540
541         /*
542          * end the scan and close the attribute relation
543          */
544         systable_endscan(pg_attribute_scan);
545         heap_close(pg_attribute_desc, AccessShareLock);
546
547         if (need != 0)
548                 elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
549                          need, RelationGetRelid(relation));
550
551         /*
552          * The attcacheoff values we read from pg_attribute should all be -1
553          * ("unknown").  Verify this if assert checking is on.  They will be
554          * computed when and if needed during tuple access.
555          */
556 #ifdef USE_ASSERT_CHECKING
557         {
558                 int                     i;
559
560                 for (i = 0; i < relation->rd_rel->relnatts; i++)
561                         Assert(relation->rd_att->attrs[i]->attcacheoff == -1);
562         }
563 #endif
564
565         /*
566          * However, we can easily set the attcacheoff value for the first
567          * attribute: it must be zero.  This eliminates the need for special
568          * cases for attnum=1 that used to exist in fastgetattr() and
569          * index_getattr().
570          */
571         relation->rd_att->attrs[0]->attcacheoff = 0;
572
573         /*
574          * Set up constraint/default info
575          */
576         if (constr->has_not_null || ndef > 0 || relation->rd_rel->relchecks)
577         {
578                 relation->rd_att->constr = constr;
579
580                 if (ndef > 0)                   /* DEFAULTs */
581                 {
582                         if (ndef < relation->rd_rel->relnatts)
583                                 constr->defval = (AttrDefault *)
584                                         repalloc(attrdef, ndef * sizeof(AttrDefault));
585                         else
586                                 constr->defval = attrdef;
587                         constr->num_defval = ndef;
588                         AttrDefaultFetch(relation);
589                 }
590                 else
591                         constr->num_defval = 0;
592
593                 if (relation->rd_rel->relchecks > 0)    /* CHECKs */
594                 {
595                         constr->num_check = relation->rd_rel->relchecks;
596                         constr->check = (ConstrCheck *)
597                                 MemoryContextAlloc(CacheMemoryContext,
598                                                                 constr->num_check * sizeof(ConstrCheck));
599                         MemSet(constr->check, 0, constr->num_check * sizeof(ConstrCheck));
600                         CheckConstraintFetch(relation);
601                 }
602                 else
603                         constr->num_check = 0;
604         }
605         else
606         {
607                 pfree(constr);
608                 relation->rd_att->constr = NULL;
609         }
610 }
611
612 /*
613  *              RelationBuildRuleLock
614  *
615  *              Form the relation's rewrite rules from information in
616  *              the pg_rewrite system catalog.
617  *
618  * Note: The rule parsetrees are potentially very complex node structures.
619  * To allow these trees to be freed when the relcache entry is flushed,
620  * we make a private memory context to hold the RuleLock information for
621  * each relcache entry that has associated rules.  The context is used
622  * just for rule info, not for any other subsidiary data of the relcache
623  * entry, because that keeps the update logic in RelationClearRelation()
624  * manageable.  The other subsidiary data structures are simple enough
625  * to be easy to free explicitly, anyway.
626  */
627 static void
628 RelationBuildRuleLock(Relation relation)
629 {
630         MemoryContext rulescxt;
631         MemoryContext oldcxt;
632         HeapTuple       rewrite_tuple;
633         Relation        rewrite_desc;
634         TupleDesc       rewrite_tupdesc;
635         SysScanDesc rewrite_scan;
636         ScanKeyData key;
637         RuleLock   *rulelock;
638         int                     numlocks;
639         RewriteRule **rules;
640         int                     maxlocks;
641
642         /*
643          * Make the private context.  Parameters are set on the assumption
644          * that it'll probably not contain much data.
645          */
646         rulescxt = AllocSetContextCreate(CacheMemoryContext,
647                                                                          RelationGetRelationName(relation),
648                                                                          0, /* minsize */
649                                                                          1024,          /* initsize */
650                                                                          1024);         /* maxsize */
651         relation->rd_rulescxt = rulescxt;
652
653         /*
654          * allocate an array to hold the rewrite rules (the array is extended if
655          * necessary)
656          */
657         maxlocks = 4;
658         rules = (RewriteRule **)
659                 MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
660         numlocks = 0;
661
662         /*
663          * form a scan key
664          */
665         ScanKeyEntryInitialize(&key, 0,
666                                                    Anum_pg_rewrite_ev_class,
667                                                    F_OIDEQ,
668                                                    ObjectIdGetDatum(RelationGetRelid(relation)));
669
670         /*
671          * open pg_rewrite and begin a scan
672          *
673          * Note: since we scan the rules using RewriteRelRulenameIndex,
674          * we will be reading the rules in name order, except possibly
675          * during emergency-recovery operations (ie, IsIgnoringSystemIndexes).
676          * This in turn ensures that rules will be fired in name order.
677          */
678         rewrite_desc = heap_openr(RewriteRelationName, AccessShareLock);
679         rewrite_tupdesc = RelationGetDescr(rewrite_desc);
680         rewrite_scan = systable_beginscan(rewrite_desc, 
681                                                                           RewriteRelRulenameIndex,
682                                                                           true, SnapshotNow,
683                                                                           1, &key);
684
685         while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
686         {
687                 Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
688                 bool            isnull;
689                 Datum           ruleaction;
690                 Datum           rule_evqual;
691                 char       *ruleaction_str;
692                 char       *rule_evqual_str;
693                 RewriteRule *rule;
694
695                 rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
696                                                                                                   sizeof(RewriteRule));
697
698                 AssertTupleDescHasOid(rewrite_tupdesc);
699                 rule->ruleId = HeapTupleGetOid(rewrite_tuple);
700
701                 rule->event = rewrite_form->ev_type - '0';
702                 rule->attrno = rewrite_form->ev_attr;
703                 rule->isInstead = rewrite_form->is_instead;
704
705                 /* Must use heap_getattr to fetch ev_qual and ev_action */
706
707                 ruleaction = heap_getattr(rewrite_tuple,
708                                                                   Anum_pg_rewrite_ev_action,
709                                                                   rewrite_tupdesc,
710                                                                   &isnull);
711                 Assert(!isnull);
712                 ruleaction_str = DatumGetCString(DirectFunctionCall1(textout,
713                                                                                                                          ruleaction));
714                 oldcxt = MemoryContextSwitchTo(rulescxt);
715                 rule->actions = (List *) stringToNode(ruleaction_str);
716                 MemoryContextSwitchTo(oldcxt);
717                 pfree(ruleaction_str);
718
719                 rule_evqual = heap_getattr(rewrite_tuple,
720                                                                    Anum_pg_rewrite_ev_qual,
721                                                                    rewrite_tupdesc,
722                                                                    &isnull);
723                 Assert(!isnull);
724                 rule_evqual_str = DatumGetCString(DirectFunctionCall1(textout,
725                                                                                                                           rule_evqual));
726                 oldcxt = MemoryContextSwitchTo(rulescxt);
727                 rule->qual = (Node *) stringToNode(rule_evqual_str);
728                 MemoryContextSwitchTo(oldcxt);
729                 pfree(rule_evqual_str);
730
731                 if (numlocks >= maxlocks)
732                 {
733                         maxlocks *= 2;
734                         rules = (RewriteRule **)
735                                 repalloc(rules, sizeof(RewriteRule *) * maxlocks);
736                 }
737                 rules[numlocks++] = rule;
738         }
739
740         /*
741          * end the scan and close the attribute relation
742          */
743         systable_endscan(rewrite_scan);
744         heap_close(rewrite_desc, AccessShareLock);
745
746         /*
747          * form a RuleLock and insert into relation
748          */
749         rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
750         rulelock->numLocks = numlocks;
751         rulelock->rules = rules;
752
753         relation->rd_rules = rulelock;
754 }
755
756 /*
757  *              equalRuleLocks
758  *
759  *              Determine whether two RuleLocks are equivalent
760  *
761  *              Probably this should be in the rules code someplace...
762  */
763 static bool
764 equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
765 {
766         int                     i;
767
768         /*
769          * As of 7.3 we assume the rule ordering is repeatable,
770          * because RelationBuildRuleLock should read 'em in a
771          * consistent order.  So just compare corresponding slots.
772          */
773         if (rlock1 != NULL)
774         {
775                 if (rlock2 == NULL)
776                         return false;
777                 if (rlock1->numLocks != rlock2->numLocks)
778                         return false;
779                 for (i = 0; i < rlock1->numLocks; i++)
780                 {
781                         RewriteRule *rule1 = rlock1->rules[i];
782                         RewriteRule *rule2 = rlock2->rules[i];
783
784                         if (rule1->ruleId != rule2->ruleId)
785                                 return false;
786                         if (rule1->event != rule2->event)
787                                 return false;
788                         if (rule1->attrno != rule2->attrno)
789                                 return false;
790                         if (rule1->isInstead != rule2->isInstead)
791                                 return false;
792                         if (!equal(rule1->qual, rule2->qual))
793                                 return false;
794                         if (!equal(rule1->actions, rule2->actions))
795                                 return false;
796                 }
797         }
798         else if (rlock2 != NULL)
799                 return false;
800         return true;
801 }
802
803
804 /* ----------------------------------
805  *              RelationBuildDesc
806  *
807  *              Build a relation descriptor --- either a new one, or by
808  *              recycling the given old relation object.  The latter case
809  *              supports rebuilding a relcache entry without invalidating
810  *              pointers to it.
811  * --------------------------------
812  */
813 static Relation
814 RelationBuildDesc(RelationBuildDescInfo buildinfo,
815                                   Relation oldrelation)
816 {
817         Relation        relation;
818         Oid                     relid;
819         HeapTuple       pg_class_tuple;
820         Form_pg_class relp;
821         MemoryContext oldcxt;
822
823         /*
824          * find the tuple in pg_class corresponding to the given relation id
825          */
826         pg_class_tuple = ScanPgRelation(buildinfo);
827
828         /*
829          * if no such tuple exists, return NULL
830          */
831         if (!HeapTupleIsValid(pg_class_tuple))
832                 return NULL;
833
834         /*
835          * get information from the pg_class_tuple
836          */
837         relid = HeapTupleGetOid(pg_class_tuple);
838         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
839
840         /*
841          * allocate storage for the relation descriptor, and copy
842          * pg_class_tuple to relation->rd_rel.
843          */
844         relation = AllocateRelationDesc(oldrelation, relp);
845
846         /*
847          * now we can free the memory allocated for pg_class_tuple
848          */
849         heap_freetuple(pg_class_tuple);
850
851         /*
852          * initialize the relation's relation id (relation->rd_id)
853          */
854         RelationGetRelid(relation) = relid;
855
856         /*
857          * initialize relation->rd_refcnt
858          */
859         RelationSetReferenceCount(relation, 1);
860
861         /*
862          * normal relations are not nailed into the cache; nor can a pre-existing
863          * relation be new.  It could be temp though.  (Actually, it could be new
864          * too, but it's okay to forget that fact if forced to flush the entry.)
865          */
866         relation->rd_isnailed = false;
867         relation->rd_isnew = false;
868         relation->rd_istemp = isTempNamespace(relation->rd_rel->relnamespace);
869
870         /*
871          * initialize the tuple descriptor (relation->rd_att).
872          */
873         RelationBuildTupleDesc(buildinfo, relation);
874         RelationGetDescr(relation)->tdhasoid = BoolToHasOid(RelationGetForm(relation)->relhasoids);
875
876         /*
877          * Fetch rules and triggers that affect this relation
878          */
879         if (relation->rd_rel->relhasrules)
880                 RelationBuildRuleLock(relation);
881         else
882         {
883                 relation->rd_rules = NULL;
884                 relation->rd_rulescxt = NULL;
885         }
886
887         if (relation->rd_rel->reltriggers > 0)
888                 RelationBuildTriggers(relation);
889         else
890                 relation->trigdesc = NULL;
891
892         /*
893          * if it's an index, initialize index-related information
894          */
895         if (OidIsValid(relation->rd_rel->relam))
896                 RelationInitIndexAccessInfo(relation);
897
898         /*
899          * initialize the relation lock manager information
900          */
901         RelationInitLockInfo(relation);         /* see lmgr.c */
902
903         if (relation->rd_rel->relisshared)
904                 relation->rd_node.tblNode = InvalidOid;
905         else
906                 relation->rd_node.tblNode = MyDatabaseId;
907         relation->rd_node.relNode = relation->rd_rel->relfilenode;
908
909         /* make sure relation is marked as having no open file yet */
910         relation->rd_fd = -1;
911
912         /*
913          * Insert newly created relation into relcache hash tables.
914          */
915         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
916         RelationCacheInsert(relation);
917         MemoryContextSwitchTo(oldcxt);
918
919         /*
920          * If it's a temp rel, RelationGetNumberOfBlocks will assume that
921          * rd_nblocks is correct.  Must forcibly update the block count when
922          * creating the relcache entry.  But if we are doing a rebuild, don't
923          * do this yet; leave it to RelationClearRelation to do at the end.
924          * (Otherwise, an elog in RelationUpdateNumberOfBlocks would leave us
925          * with inconsistent relcache state.)
926          */
927         if (relation->rd_istemp && oldrelation == NULL)
928                 RelationUpdateNumberOfBlocks(relation);
929
930         return relation;
931 }
932
933 /*
934  * Initialize index-access-method support data for an index relation
935  */
936 void
937 RelationInitIndexAccessInfo(Relation relation)
938 {
939         HeapTuple       tuple;
940         Size            iformsize;
941         Form_pg_index iform;
942         Form_pg_am      aform;
943         MemoryContext indexcxt;
944         IndexStrategy strategy;
945         Oid                *operator;
946         RegProcedure *support;
947         FmgrInfo   *supportinfo;
948         int                     natts;
949         uint16          amstrategies;
950         uint16          amsupport;
951
952         /*
953          * Make a copy of the pg_index entry for the index.  Note that this
954          * is a variable-length tuple.
955          */
956         tuple = SearchSysCache(INDEXRELID,
957                                                    ObjectIdGetDatum(RelationGetRelid(relation)),
958                                                    0, 0, 0);
959         if (!HeapTupleIsValid(tuple))
960                 elog(ERROR, "RelationInitIndexAccessInfo: no pg_index entry for index %u",
961                          RelationGetRelid(relation));
962         iformsize = tuple->t_len - tuple->t_data->t_hoff;
963         iform = (Form_pg_index) MemoryContextAlloc(CacheMemoryContext, iformsize);
964         memcpy(iform, GETSTRUCT(tuple), iformsize);
965         ReleaseSysCache(tuple);
966         relation->rd_index = iform;
967
968         /*
969          * Make a copy of the pg_am entry for the index's access method
970          */
971         tuple = SearchSysCache(AMOID,
972                                                    ObjectIdGetDatum(relation->rd_rel->relam),
973                                                    0, 0, 0);
974         if (!HeapTupleIsValid(tuple))
975                 elog(ERROR, "RelationInitIndexAccessInfo: cache lookup failed for AM %u",
976                          relation->rd_rel->relam);
977         aform = (Form_pg_am) MemoryContextAlloc(CacheMemoryContext, sizeof *aform);
978         memcpy(aform, GETSTRUCT(tuple), sizeof *aform);
979         ReleaseSysCache(tuple);
980         relation->rd_am = aform;
981
982         natts = relation->rd_rel->relnatts;
983         amstrategies = aform->amstrategies;
984         amsupport = aform->amsupport;
985
986         /*
987          * Make the private context to hold index access info.  The reason we
988          * need a context, and not just a couple of pallocs, is so that we
989          * won't leak any subsidiary info attached to fmgr lookup records.
990          *
991          * Context parameters are set on the assumption that it'll probably not
992          * contain much data.
993          */
994         indexcxt = AllocSetContextCreate(CacheMemoryContext,
995                                                                          RelationGetRelationName(relation),
996                                                                          0, /* minsize */
997                                                                          512,           /* initsize */
998                                                                          1024);         /* maxsize */
999         relation->rd_indexcxt = indexcxt;
1000
1001         /*
1002          * Allocate arrays to hold data
1003          */
1004         if (amstrategies > 0)
1005         {
1006                 int                     noperators = natts * amstrategies;
1007                 Size            stratSize;
1008
1009                 stratSize = AttributeNumberGetIndexStrategySize(natts, amstrategies);
1010                 strategy = (IndexStrategy) MemoryContextAlloc(indexcxt, stratSize);
1011                 MemSet(strategy, 0, stratSize);
1012                 operator = (Oid *)
1013                         MemoryContextAlloc(indexcxt, noperators * sizeof(Oid));
1014                 MemSet(operator, 0, noperators * sizeof(Oid));
1015         }
1016         else
1017         {
1018                 strategy = NULL;
1019                 operator = NULL;
1020         }
1021
1022         if (amsupport > 0)
1023         {
1024                 int                     nsupport = natts * amsupport;
1025
1026                 support = (RegProcedure *)
1027                         MemoryContextAlloc(indexcxt, nsupport * sizeof(RegProcedure));
1028                 MemSet(support, 0, nsupport * sizeof(RegProcedure));
1029                 supportinfo = (FmgrInfo *)
1030                         MemoryContextAlloc(indexcxt, nsupport * sizeof(FmgrInfo));
1031                 MemSet(supportinfo, 0, nsupport * sizeof(FmgrInfo));
1032         }
1033         else
1034         {
1035                 support = NULL;
1036                 supportinfo = NULL;
1037         }
1038
1039         relation->rd_istrat = strategy;
1040         relation->rd_operator = operator;
1041         relation->rd_support = support;
1042         relation->rd_supportinfo = supportinfo;
1043
1044         /*
1045          * Fill the strategy map and the support RegProcedure arrays.
1046          * (supportinfo is left as zeroes, and is filled on-the-fly when used)
1047          */
1048         IndexSupportInitialize(iform,
1049                                                    strategy, operator, support,
1050                                                    amstrategies, amsupport, natts);
1051 }
1052
1053 /*
1054  * IndexSupportInitialize
1055  *              Initializes an index strategy and associated support procedures,
1056  *              given the index's pg_index tuple.
1057  *
1058  * Data is returned into *indexStrategy, *indexOperator, and *indexSupport,
1059  * all of which are objects allocated by the caller.
1060  *
1061  * The caller also passes maxStrategyNumber, maxSupportNumber, and
1062  * maxAttributeNumber, since these indicate the size of the arrays
1063  * it has allocated --- but in practice these numbers must always match
1064  * those obtainable from the system catalog entries for the index and
1065  * access method.
1066  */
1067 static void
1068 IndexSupportInitialize(Form_pg_index iform,
1069                                            IndexStrategy indexStrategy,
1070                                            Oid *indexOperator,
1071                                            RegProcedure *indexSupport,
1072                                            StrategyNumber maxStrategyNumber,
1073                                            StrategyNumber maxSupportNumber,
1074                                            AttrNumber maxAttributeNumber)
1075 {
1076         int                     attIndex;
1077
1078         maxStrategyNumber = AMStrategies(maxStrategyNumber);
1079
1080         /*
1081          * XXX note that the following assumes the INDEX tuple is well formed
1082          * and that the *key and *class are 0 terminated.
1083          */
1084         for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
1085         {
1086                 OpClassCacheEnt *opcentry;
1087
1088                 if (iform->indkey[attIndex] == InvalidAttrNumber ||
1089                         !OidIsValid(iform->indclass[attIndex]))
1090                         elog(ERROR, "IndexSupportInitialize: bogus pg_index tuple");
1091
1092                 /* look up the info for this opclass, using a cache */
1093                 opcentry = LookupOpclassInfo(iform->indclass[attIndex],
1094                                                                          maxStrategyNumber,
1095                                                                          maxSupportNumber);
1096
1097                 /* load the strategy information for the index operators */
1098                 if (maxStrategyNumber > 0)
1099                 {
1100                         StrategyMap map;
1101                         Oid                *opers;
1102                         StrategyNumber strategy;
1103
1104                         map = IndexStrategyGetStrategyMap(indexStrategy,
1105                                                                                           maxStrategyNumber,
1106                                                                                           attIndex + 1);
1107                         opers = &indexOperator[attIndex * maxStrategyNumber];
1108
1109                         for (strategy = 0; strategy < maxStrategyNumber; strategy++)
1110                         {
1111                                 ScanKey         mapentry;
1112
1113                                 mapentry = StrategyMapGetScanKeyEntry(map, strategy + 1);
1114                                 if (RegProcedureIsValid(opcentry->operatorProcs[strategy]))
1115                                 {
1116                                         MemSet(mapentry, 0, sizeof(*mapentry));
1117                                         mapentry->sk_flags = 0;
1118                                         mapentry->sk_procedure = opcentry->operatorProcs[strategy];
1119                                         /*
1120                                          * Mark mapentry->sk_func invalid, until and unless
1121                                          * someone sets it up.
1122                                          */
1123                                         mapentry->sk_func.fn_oid = InvalidOid;
1124                                 }
1125                                 else
1126                                         ScanKeyEntrySetIllegal(mapentry);
1127                                 opers[strategy] = opcentry->operatorOids[strategy];
1128                         }
1129                 }
1130
1131                 /* if support routines exist for this access method, load them */
1132                 if (maxSupportNumber > 0)
1133                 {
1134                         RegProcedure *procs;
1135                         StrategyNumber support;
1136
1137                         procs = &indexSupport[attIndex * maxSupportNumber];
1138
1139                         for (support = 0; support < maxSupportNumber; ++support)
1140                                 procs[support] = opcentry->supportProcs[support];
1141                 }
1142         }
1143 }
1144
1145 /*
1146  * LookupOpclassInfo
1147  *
1148  * This routine maintains a per-opclass cache of the information needed
1149  * by IndexSupportInitialize().  This is more efficient than relying on
1150  * the catalog cache, because we can load all the info about a particular
1151  * opclass in a single indexscan of pg_amproc or pg_amop.
1152  *
1153  * The information from pg_am about expected range of strategy and support
1154  * numbers is passed in, rather than being looked up, mainly because the
1155  * caller will have it already.
1156  *
1157  * XXX There isn't any provision for flushing the cache.  However, there
1158  * isn't any provision for flushing relcache entries when opclass info
1159  * changes, either :-(
1160  */
1161 static OpClassCacheEnt *
1162 LookupOpclassInfo(Oid operatorClassOid,
1163                                   StrategyNumber numStrats,
1164                                   StrategyNumber numSupport)
1165 {
1166         OpClassCacheEnt *opcentry;
1167         bool            found;
1168         Relation        pg_amop_desc;
1169         Relation        pg_amproc_desc;
1170         SysScanDesc pg_amop_scan;
1171         SysScanDesc pg_amproc_scan;
1172         ScanKeyData key;
1173         HeapTuple       htup;
1174         bool            indexOK;
1175
1176         if (OpClassCache == NULL)
1177         {
1178                 /* First time through: initialize the opclass cache */
1179                 HASHCTL         ctl;
1180
1181                 if (!CacheMemoryContext)
1182                         CreateCacheMemoryContext();
1183
1184                 MemSet(&ctl, 0, sizeof(ctl));
1185                 ctl.keysize = sizeof(Oid);
1186                 ctl.entrysize = sizeof(OpClassCacheEnt);
1187                 ctl.hash = tag_hash;
1188                 OpClassCache = hash_create("Operator class cache", 64,
1189                                                                    &ctl, HASH_ELEM | HASH_FUNCTION);
1190         }
1191
1192         opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
1193                                                                                            (void *) &operatorClassOid,
1194                                                                                            HASH_ENTER, &found);
1195         if (opcentry == NULL)
1196                 elog(ERROR, "out of memory for operator class cache");
1197
1198         if (found && opcentry->valid)
1199         {
1200                 /* Already made an entry for it */
1201                 Assert(numStrats == opcentry->numStrats);
1202                 Assert(numSupport == opcentry->numSupport);
1203                 return opcentry;
1204         }
1205
1206         /* Need to fill in new entry */
1207         opcentry->valid = false;        /* until known OK */
1208         opcentry->numStrats = numStrats;
1209         opcentry->numSupport = numSupport;
1210
1211         if (numStrats > 0)
1212         {
1213                 opcentry->operatorOids = (Oid *)
1214                         MemoryContextAlloc(CacheMemoryContext,
1215                                                            numStrats * sizeof(Oid));
1216                 MemSet(opcentry->operatorOids, 0, numStrats * sizeof(Oid));
1217                 opcentry->operatorProcs = (RegProcedure *)
1218                         MemoryContextAlloc(CacheMemoryContext,
1219                                                            numStrats * sizeof(RegProcedure));
1220                 MemSet(opcentry->operatorProcs, 0, numStrats * sizeof(RegProcedure));
1221         }
1222         else
1223         {
1224                 opcentry->operatorOids = NULL;
1225                 opcentry->operatorProcs = NULL;
1226         }
1227
1228         if (numSupport > 0)
1229         {
1230                 opcentry->supportProcs = (RegProcedure *)
1231                         MemoryContextAlloc(CacheMemoryContext,
1232                                                            numSupport * sizeof(RegProcedure));
1233                 MemSet(opcentry->supportProcs, 0, numSupport * sizeof(RegProcedure));
1234         }
1235         else
1236                 opcentry->supportProcs = NULL;
1237
1238         /*
1239          * To avoid infinite recursion during startup, force a heap scan if
1240          * we're looking up info for the opclasses used by the indexes we
1241          * would like to reference here.
1242          */
1243         indexOK = criticalRelcachesBuilt ||
1244                 (operatorClassOid != OID_BTREE_OPS_OID &&
1245                  operatorClassOid != INT2_BTREE_OPS_OID);
1246
1247         /*
1248          * Scan pg_amop to obtain operators for the opclass
1249          */
1250         if (numStrats > 0)
1251         {
1252                 ScanKeyEntryInitialize(&key, 0,
1253                                                            Anum_pg_amop_amopclaid,
1254                                                            F_OIDEQ,
1255                                                            ObjectIdGetDatum(operatorClassOid));
1256                 pg_amop_desc = heap_openr(AccessMethodOperatorRelationName,
1257                                                                   AccessShareLock);
1258                 pg_amop_scan = systable_beginscan(pg_amop_desc,
1259                                                                                   AccessMethodStrategyIndex,
1260                                                                                   indexOK,
1261                                                                                   SnapshotNow,
1262                                                                                   1, &key);
1263
1264                 while (HeapTupleIsValid(htup = systable_getnext(pg_amop_scan)))
1265                 {
1266                         Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(htup);
1267
1268                         if (amopform->amopstrategy <= 0 ||
1269                                 (StrategyNumber) amopform->amopstrategy > numStrats)
1270                                 elog(ERROR, "Bogus amopstrategy number %d for opclass %u",
1271                                          amopform->amopstrategy, operatorClassOid);
1272                         opcentry->operatorOids[amopform->amopstrategy - 1] =
1273                                 amopform->amopopr;
1274                         opcentry->operatorProcs[amopform->amopstrategy - 1] =
1275                                 get_opcode(amopform->amopopr);
1276                 }
1277
1278                 systable_endscan(pg_amop_scan);
1279                 heap_close(pg_amop_desc, AccessShareLock);
1280         }
1281
1282         /*
1283          * Scan pg_amproc to obtain support procs for the opclass
1284          */
1285         if (numSupport > 0)
1286         {
1287                 ScanKeyEntryInitialize(&key, 0,
1288                                                            Anum_pg_amproc_amopclaid,
1289                                                            F_OIDEQ,
1290                                                            ObjectIdGetDatum(operatorClassOid));
1291                 pg_amproc_desc = heap_openr(AccessMethodProcedureRelationName,
1292                                                                         AccessShareLock);
1293                 pg_amproc_scan = systable_beginscan(pg_amproc_desc,
1294                                                                                         AccessMethodProcedureIndex,
1295                                                                                         indexOK,
1296                                                                                         SnapshotNow,
1297                                                                                         1, &key);
1298
1299                 while (HeapTupleIsValid(htup = systable_getnext(pg_amproc_scan)))
1300                 {
1301                         Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);
1302
1303                         if (amprocform->amprocnum <= 0 ||
1304                                 (StrategyNumber) amprocform->amprocnum > numSupport)
1305                                 elog(ERROR, "Bogus amproc number %d for opclass %u",
1306                                          amprocform->amprocnum, operatorClassOid);
1307
1308                         opcentry->supportProcs[amprocform->amprocnum - 1] =
1309                                 amprocform->amproc;
1310                 }
1311
1312                 systable_endscan(pg_amproc_scan);
1313                 heap_close(pg_amproc_desc, AccessShareLock);
1314         }
1315
1316         opcentry->valid = true;
1317         return opcentry;
1318 }
1319
1320
1321 /*
1322  *              formrdesc
1323  *
1324  *              This is a special cut-down version of RelationBuildDesc()
1325  *              used by RelationCacheInitialize() in initializing the relcache.
1326  *              The relation descriptor is built just from the supplied parameters,
1327  *              without actually looking at any system table entries.  We cheat
1328  *              quite a lot since we only need to work for a few basic system
1329  *              catalogs.
1330  *
1331  * formrdesc is currently used for: pg_class, pg_attribute, pg_proc,
1332  * and pg_type (see RelationCacheInitialize).
1333  *
1334  * Note that these catalogs can't have constraints, default values,
1335  * rules, or triggers, since we don't cope with any of that.
1336  *
1337  * NOTE: we assume we are already switched into CacheMemoryContext.
1338  */
1339 static void
1340 formrdesc(const char *relationName,
1341                   int natts,
1342                   FormData_pg_attribute *att)
1343 {
1344         Relation        relation;
1345         int                     i;
1346
1347         /*
1348          * allocate new relation desc
1349          */
1350         relation = (Relation) palloc(sizeof(RelationData));
1351
1352         /*
1353          * clear all fields of reldesc
1354          */
1355         MemSet((char *) relation, 0, sizeof(RelationData));
1356         relation->rd_targblock = InvalidBlockNumber;
1357
1358         /* make sure relation is marked as having no open file yet */
1359         relation->rd_fd = -1;
1360
1361         /*
1362          * initialize reference count
1363          */
1364         RelationSetReferenceCount(relation, 1);
1365
1366         /*
1367          * all entries built with this routine are nailed-in-cache; none are
1368          * for new or temp relations.
1369          */
1370         relation->rd_isnailed = true;
1371         relation->rd_isnew = false;
1372         relation->rd_istemp = false;
1373
1374         /*
1375          * initialize relation tuple form
1376          *
1377          * The data we insert here is pretty incomplete/bogus, but it'll serve to
1378          * get us launched.  RelationCacheInitializePhase2() will read the
1379          * real data from pg_class and replace what we've done here.
1380          */
1381         relation->rd_rel = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
1382         MemSet(relation->rd_rel, 0, CLASS_TUPLE_SIZE);
1383
1384         namestrcpy(&relation->rd_rel->relname, relationName);
1385         relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
1386
1387         /*
1388          * It's important to distinguish between shared and non-shared
1389          * relations, even at bootstrap time, to make sure we know where they
1390          * are stored.  At present, all relations that formrdesc is used for
1391          * are not shared.
1392          */
1393         relation->rd_rel->relisshared = false;
1394
1395         relation->rd_rel->relpages = 1;
1396         relation->rd_rel->reltuples = 1;
1397         relation->rd_rel->relkind = RELKIND_RELATION;
1398         relation->rd_rel->relhasoids = true;
1399         relation->rd_rel->relnatts = (int16) natts;
1400
1401         /*
1402          * initialize attribute tuple form
1403          *
1404          * Unlike the case with the relation tuple, this data had better be
1405          * right because it will never be replaced.  The input values must be
1406          * correctly defined by macros in src/include/catalog/ headers.
1407          */
1408         relation->rd_att = CreateTemplateTupleDesc(natts, BoolToHasOid(relation->rd_rel->relhasoids));
1409
1410         /*
1411          * initialize tuple desc info
1412          */
1413         for (i = 0; i < natts; i++)
1414         {
1415                 relation->rd_att->attrs[i] = (Form_pg_attribute) palloc(ATTRIBUTE_TUPLE_SIZE);
1416                 memcpy((char *) relation->rd_att->attrs[i],
1417                            (char *) &att[i],
1418                            ATTRIBUTE_TUPLE_SIZE);
1419                 /* make sure attcacheoff is valid */
1420                 relation->rd_att->attrs[i]->attcacheoff = -1;
1421         }
1422
1423         /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
1424         relation->rd_att->attrs[0]->attcacheoff = 0;
1425
1426         /*
1427          * initialize relation id from info in att array (my, this is ugly)
1428          */
1429         RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
1430
1431         /*
1432          * initialize the relation's lock manager and RelFileNode information
1433          */
1434         RelationInitLockInfo(relation);         /* see lmgr.c */
1435
1436         if (relation->rd_rel->relisshared)
1437                 relation->rd_node.tblNode = InvalidOid;
1438         else
1439                 relation->rd_node.tblNode = MyDatabaseId;
1440         relation->rd_node.relNode =
1441                 relation->rd_rel->relfilenode = RelationGetRelid(relation);
1442
1443         /*
1444          * initialize the rel-has-index flag, using hardwired knowledge
1445          */
1446         relation->rd_rel->relhasindex = false;
1447
1448         /* In bootstrap mode, we have no indexes */
1449         if (!IsBootstrapProcessingMode())
1450         {
1451                 /* Otherwise, all the rels formrdesc is used for have indexes */
1452                 relation->rd_rel->relhasindex = true;
1453         }
1454
1455         /*
1456          * add new reldesc to relcache
1457          */
1458         RelationCacheInsert(relation);
1459 }
1460
1461
1462 /* ----------------------------------------------------------------
1463  *                               Relation Descriptor Lookup Interface
1464  * ----------------------------------------------------------------
1465  */
1466
1467 /*
1468  *              RelationIdCacheGetRelation
1469  *
1470  *              Lookup an existing reldesc by OID.
1471  *
1472  *              Only try to get the reldesc by looking in the cache,
1473  *              do not go to the disk.
1474  *
1475  *              NB: relation ref count is incremented if successful.
1476  *              Caller should eventually decrement count.  (Usually,
1477  *              that happens by calling RelationClose().)
1478  */
1479 Relation
1480 RelationIdCacheGetRelation(Oid relationId)
1481 {
1482         Relation        rd;
1483
1484         RelationIdCacheLookup(relationId, rd);
1485
1486         if (RelationIsValid(rd))
1487                 RelationIncrementReferenceCount(rd);
1488
1489         return rd;
1490 }
1491
1492 /*
1493  *              RelationSysNameCacheGetRelation
1494  *
1495  *              As above, but lookup by name; only works for system catalogs.
1496  */
1497 static Relation
1498 RelationSysNameCacheGetRelation(const char *relationName)
1499 {
1500         Relation        rd;
1501         NameData        name;
1502
1503         /*
1504          * make sure that the name key used for hash lookup is properly
1505          * null-padded
1506          */
1507         namestrcpy(&name, relationName);
1508         RelationSysNameCacheLookup(NameStr(name), rd);
1509
1510         if (RelationIsValid(rd))
1511                 RelationIncrementReferenceCount(rd);
1512
1513         return rd;
1514 }
1515
1516 Relation
1517 RelationNodeCacheGetRelation(RelFileNode rnode)
1518 {
1519         Relation        rd;
1520
1521         RelationNodeCacheLookup(rnode, rd);
1522
1523         if (RelationIsValid(rd))
1524                 RelationIncrementReferenceCount(rd);
1525
1526         return rd;
1527 }
1528
1529 /*
1530  *              RelationIdGetRelation
1531  *
1532  *              Lookup a reldesc by OID; make one if not already in cache.
1533  *
1534  *              NB: relation ref count is incremented, or set to 1 if new entry.
1535  *              Caller should eventually decrement count.  (Usually,
1536  *              that happens by calling RelationClose().)
1537  */
1538 Relation
1539 RelationIdGetRelation(Oid relationId)
1540 {
1541         Relation        rd;
1542         RelationBuildDescInfo buildinfo;
1543
1544         /*
1545          * first try and get a reldesc from the cache
1546          */
1547         rd = RelationIdCacheGetRelation(relationId);
1548         if (RelationIsValid(rd))
1549                 return rd;
1550
1551         /*
1552          * no reldesc in the cache, so have RelationBuildDesc() build one and
1553          * add it.
1554          */
1555         buildinfo.infotype = INFO_RELID;
1556         buildinfo.i.info_id = relationId;
1557
1558         rd = RelationBuildDesc(buildinfo, NULL);
1559         return rd;
1560 }
1561
1562 /*
1563  *              RelationSysNameGetRelation
1564  *
1565  *              As above, but lookup by name; only works for system catalogs.
1566  */
1567 Relation
1568 RelationSysNameGetRelation(const char *relationName)
1569 {
1570         Relation        rd;
1571         RelationBuildDescInfo buildinfo;
1572
1573         /*
1574          * first try and get a reldesc from the cache
1575          */
1576         rd = RelationSysNameCacheGetRelation(relationName);
1577         if (RelationIsValid(rd))
1578                 return rd;
1579
1580         /*
1581          * no reldesc in the cache, so have RelationBuildDesc() build one and
1582          * add it.
1583          */
1584         buildinfo.infotype = INFO_RELNAME;
1585         buildinfo.i.info_name = (char *) relationName;
1586
1587         rd = RelationBuildDesc(buildinfo, NULL);
1588         return rd;
1589 }
1590
1591 /* ----------------------------------------------------------------
1592  *                              cache invalidation support routines
1593  * ----------------------------------------------------------------
1594  */
1595
1596 /*
1597  * RelationClose - close an open relation
1598  *
1599  *      Actually, we just decrement the refcount.
1600  *
1601  *      NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
1602  *      will be freed as soon as their refcount goes to zero.  In combination
1603  *      with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
1604  *      to catch references to already-released relcache entries.  It slows
1605  *      things down quite a bit, however.
1606  */
1607 void
1608 RelationClose(Relation relation)
1609 {
1610         /* Note: no locking manipulations needed */
1611         RelationDecrementReferenceCount(relation);
1612
1613 #ifdef RELCACHE_FORCE_RELEASE
1614         if (RelationHasReferenceCountZero(relation) &&
1615                 !relation->rd_isnew)
1616                 RelationClearRelation(relation, false);
1617 #endif
1618 }
1619
1620 #ifdef  ENABLE_REINDEX_NAILED_RELATIONS
1621 /*
1622  * RelationReloadClassinfo
1623  *
1624  *      This function is especially for nailed relations.
1625  *      relhasindex/relfilenode could be changed even for
1626  *      nailed relations.
1627  */
1628 static void
1629 RelationReloadClassinfo(Relation relation)
1630 {
1631         RelationBuildDescInfo buildinfo;
1632         HeapTuple       pg_class_tuple;
1633         Form_pg_class relp;
1634
1635         if (!relation->rd_rel)
1636                 return;
1637         buildinfo.infotype = INFO_RELID;
1638         buildinfo.i.info_id = relation->rd_id;
1639         pg_class_tuple = ScanPgRelation(buildinfo);
1640         if (!HeapTupleIsValid(pg_class_tuple))
1641         {
1642                 elog(ERROR, "RelationReloadClassinfo system relation id=%d doesn't exist", relation->rd_id);
1643                 return;
1644         }
1645         RelationCacheDelete(relation);
1646         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
1647         memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
1648         relation->rd_node.relNode = relp->relfilenode;
1649         RelationCacheInsert(relation);
1650         heap_freetuple(pg_class_tuple);
1651
1652         return;
1653 }
1654 #endif   /* ENABLE_REINDEX_NAILED_RELATIONS */
1655
1656 /*
1657  * RelationClearRelation
1658  *
1659  *       Physically blow away a relation cache entry, or reset it and rebuild
1660  *       it from scratch (that is, from catalog entries).  The latter path is
1661  *       usually used when we are notified of a change to an open relation
1662  *       (one with refcount > 0).  However, this routine just does whichever
1663  *       it's told to do; callers must determine which they want.
1664  */
1665 static void
1666 RelationClearRelation(Relation relation, bool rebuild)
1667 {
1668         MemoryContext oldcxt;
1669
1670         /*
1671          * Make sure smgr and lower levels close the relation's files, if they
1672          * weren't closed already.  If the relation is not getting deleted,
1673          * the next smgr access should reopen the files automatically.  This
1674          * ensures that the low-level file access state is updated after, say,
1675          * a vacuum truncation.
1676          */
1677         if (relation->rd_fd >= 0)
1678         {
1679                 smgrclose(DEFAULT_SMGR, relation);
1680                 relation->rd_fd = -1;
1681         }
1682
1683         /*
1684          * Never, never ever blow away a nailed-in system relation, because
1685          * we'd be unable to recover.
1686          */
1687         if (relation->rd_isnailed)
1688         {
1689 #ifdef  ENABLE_REINDEX_NAILED_RELATIONS
1690                 RelationReloadClassinfo(relation);
1691 #endif   /* ENABLE_REINDEX_NAILED_RELATIONS */
1692                 return;
1693         }
1694
1695         /*
1696          * Remove relation from hash tables
1697          *
1698          * Note: we might be reinserting it momentarily, but we must not have it
1699          * visible in the hash tables until it's valid again, so don't try to
1700          * optimize this away...
1701          */
1702         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
1703         RelationCacheDelete(relation);
1704         MemoryContextSwitchTo(oldcxt);
1705
1706         /* Clear out catcache's entries for this relation */
1707         CatalogCacheFlushRelation(RelationGetRelid(relation));
1708
1709         /*
1710          * Free all the subsidiary data structures of the relcache entry. We
1711          * cannot free rd_att if we are trying to rebuild the entry, however,
1712          * because pointers to it may be cached in various places. The trigger
1713          * manager might also have pointers into the trigdesc, and the rule
1714          * manager might have pointers into the rewrite rules. So to begin
1715          * with, we can only get rid of these fields:
1716          */
1717         if (relation->rd_index)
1718                 pfree(relation->rd_index);
1719         if (relation->rd_am)
1720                 pfree(relation->rd_am);
1721         if (relation->rd_rel)
1722                 pfree(relation->rd_rel);
1723         freeList(relation->rd_indexlist);
1724         if (relation->rd_indexcxt)
1725                 MemoryContextDelete(relation->rd_indexcxt);
1726
1727         /*
1728          * If we're really done with the relcache entry, blow it away. But if
1729          * someone is still using it, reconstruct the whole deal without
1730          * moving the physical RelationData record (so that the someone's
1731          * pointer is still valid).
1732          */
1733         if (!rebuild)
1734         {
1735                 /* ok to zap remaining substructure */
1736                 FreeTupleDesc(relation->rd_att);
1737                 if (relation->rd_rulescxt)
1738                         MemoryContextDelete(relation->rd_rulescxt);
1739                 FreeTriggerDesc(relation->trigdesc);
1740                 pfree(relation);
1741         }
1742         else
1743         {
1744                 /*
1745                  * When rebuilding an open relcache entry, must preserve ref count
1746                  * and rd_isnew flag.  Also attempt to preserve the tupledesc,
1747                  * rewrite rules, and trigger substructures in place.
1748                  */
1749                 int                     old_refcnt = relation->rd_refcnt;
1750                 bool            old_isnew = relation->rd_isnew;
1751                 TupleDesc       old_att = relation->rd_att;
1752                 RuleLock   *old_rules = relation->rd_rules;
1753                 MemoryContext old_rulescxt = relation->rd_rulescxt;
1754                 TriggerDesc *old_trigdesc = relation->trigdesc;
1755                 RelationBuildDescInfo buildinfo;
1756
1757                 buildinfo.infotype = INFO_RELID;
1758                 buildinfo.i.info_id = RelationGetRelid(relation);
1759
1760                 if (RelationBuildDesc(buildinfo, relation) != relation)
1761                 {
1762                         /* Should only get here if relation was deleted */
1763                         FreeTupleDesc(old_att);
1764                         if (old_rulescxt)
1765                                 MemoryContextDelete(old_rulescxt);
1766                         FreeTriggerDesc(old_trigdesc);
1767                         pfree(relation);
1768                         elog(ERROR, "RelationClearRelation: relation %u deleted while still in use",
1769                                  buildinfo.i.info_id);
1770                 }
1771                 RelationSetReferenceCount(relation, old_refcnt);
1772                 relation->rd_isnew = old_isnew;
1773                 if (equalTupleDescs(old_att, relation->rd_att))
1774                 {
1775                         FreeTupleDesc(relation->rd_att);
1776                         relation->rd_att = old_att;
1777                 }
1778                 else
1779                         FreeTupleDesc(old_att);
1780                 if (equalRuleLocks(old_rules, relation->rd_rules))
1781                 {
1782                         if (relation->rd_rulescxt)
1783                                 MemoryContextDelete(relation->rd_rulescxt);
1784                         relation->rd_rules = old_rules;
1785                         relation->rd_rulescxt = old_rulescxt;
1786                 }
1787                 else
1788                 {
1789                         if (old_rulescxt)
1790                                 MemoryContextDelete(old_rulescxt);
1791                 }
1792                 if (equalTriggerDescs(old_trigdesc, relation->trigdesc))
1793                 {
1794                         FreeTriggerDesc(relation->trigdesc);
1795                         relation->trigdesc = old_trigdesc;
1796                 }
1797                 else
1798                         FreeTriggerDesc(old_trigdesc);
1799
1800                 /*
1801                  * Update rd_nblocks.  This is kind of expensive, but I think we must
1802                  * do it in case relation has been truncated... we definitely must
1803                  * do it if the rel is new or temp, since RelationGetNumberOfBlocks
1804                  * will subsequently assume that the block count is correct.
1805                  */
1806                 RelationUpdateNumberOfBlocks(relation);
1807         }
1808 }
1809
1810 /*
1811  * RelationFlushRelation
1812  *
1813  *       Rebuild the relation if it is open (refcount > 0), else blow it away.
1814  */
1815 static void
1816 RelationFlushRelation(Relation relation)
1817 {
1818         bool            rebuild;
1819
1820         if (relation->rd_isnew)
1821         {
1822                 /*
1823                  * New relcache entries are always rebuilt, not flushed; else we'd
1824                  * forget the "new" status of the relation, which is a useful
1825                  * optimization to have.
1826                  */
1827                 rebuild = true;
1828         }
1829         else
1830         {
1831                 /*
1832                  * Pre-existing rels can be dropped from the relcache if not open.
1833                  */
1834                 rebuild = !RelationHasReferenceCountZero(relation);
1835         }
1836
1837         RelationClearRelation(relation, rebuild);
1838 }
1839
1840 /*
1841  * RelationForgetRelation - unconditionally remove a relcache entry
1842  *
1843  *                 External interface for destroying a relcache entry when we
1844  *                 drop the relation.
1845  */
1846 void
1847 RelationForgetRelation(Oid rid)
1848 {
1849         Relation        relation;
1850
1851         RelationIdCacheLookup(rid, relation);
1852
1853         if (!PointerIsValid(relation))
1854                 return;                                 /* not in cache, nothing to do */
1855
1856         if (!RelationHasReferenceCountZero(relation))
1857                 elog(ERROR, "RelationForgetRelation: relation %u is still open", rid);
1858
1859         /* Unconditionally destroy the relcache entry */
1860         RelationClearRelation(relation, false);
1861 }
1862
1863 /*
1864  *              RelationIdInvalidateRelationCacheByRelationId
1865  *
1866  *              This routine is invoked for SI cache flush messages.
1867  *
1868  *              We used to skip local relations, on the grounds that they could
1869  *              not be targets of cross-backend SI update messages; but it seems
1870  *              safer to process them, so that our *own* SI update messages will
1871  *              have the same effects during CommandCounterIncrement for both
1872  *              local and nonlocal relations.
1873  */
1874 void
1875 RelationIdInvalidateRelationCacheByRelationId(Oid relationId)
1876 {
1877         Relation        relation;
1878
1879         RelationIdCacheLookup(relationId, relation);
1880
1881         if (PointerIsValid(relation))
1882         {
1883                 relcacheInvalsReceived++;
1884                 RelationFlushRelation(relation);
1885         }
1886 }
1887
1888 /*
1889  * RelationCacheInvalidate
1890  *       Blow away cached relation descriptors that have zero reference counts,
1891  *       and rebuild those with positive reference counts.
1892  *
1893  *       This is currently used only to recover from SI message buffer overflow,
1894  *       so we do not touch new-in-transaction relations; they cannot be targets
1895  *       of cross-backend SI updates (and our own updates now go through a
1896  *       separate linked list that isn't limited by the SI message buffer size).
1897  *
1898  *       We do this in two phases: the first pass deletes deletable items, and
1899  *       the second one rebuilds the rebuildable items.  This is essential for
1900  *       safety, because hash_seq_search only copes with concurrent deletion of
1901  *       the element it is currently visiting.  If a second SI overflow were to
1902  *       occur while we are walking the table, resulting in recursive entry to
1903  *       this routine, we could crash because the inner invocation blows away
1904  *       the entry next to be visited by the outer scan.  But this way is OK,
1905  *       because (a) during the first pass we won't process any more SI messages,
1906  *       so hash_seq_search will complete safely; (b) during the second pass we
1907  *       only hold onto pointers to nondeletable entries.
1908  */
1909 void
1910 RelationCacheInvalidate(void)
1911 {
1912         HASH_SEQ_STATUS status;
1913         RelIdCacheEnt *idhentry;
1914         Relation        relation;
1915         List       *rebuildList = NIL;
1916         List       *l;
1917
1918         /* Phase 1 */
1919         hash_seq_init(&status, RelationIdCache);
1920
1921         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
1922         {
1923                 relation = idhentry->reldesc;
1924
1925                 /* Ignore new relations, since they are never SI targets */
1926                 if (relation->rd_isnew)
1927                         continue;
1928
1929                 relcacheInvalsReceived++;
1930
1931                 if (RelationHasReferenceCountZero(relation))
1932                 {
1933                         /* Delete this entry immediately */
1934                         RelationClearRelation(relation, false);
1935                 }
1936                 else
1937                 {
1938                         /* Add entry to list of stuff to rebuild in second pass */
1939                         rebuildList = lcons(relation, rebuildList);
1940                 }
1941         }
1942
1943         /* Phase 2: rebuild the items found to need rebuild in phase 1 */
1944         foreach(l, rebuildList)
1945         {
1946                 relation = (Relation) lfirst(l);
1947                 RelationClearRelation(relation, true);
1948         }
1949         freeList(rebuildList);
1950 }
1951
1952 /*
1953  * AtEOXact_RelationCache
1954  *
1955  *      Clean up the relcache at transaction commit or abort.
1956  */
1957 void
1958 AtEOXact_RelationCache(bool commit)
1959 {
1960         HASH_SEQ_STATUS status;
1961         RelIdCacheEnt *idhentry;
1962
1963         hash_seq_init(&status, RelationIdCache);
1964
1965         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
1966         {
1967                 Relation        relation = idhentry->reldesc;
1968                 int                     expected_refcnt;
1969
1970                 /*
1971                  * Is it a relation created in the current transaction?
1972                  *
1973                  * During commit, reset the flag to false, since we are now out of the
1974                  * creating transaction.  During abort, simply delete the relcache
1975                  * entry --- it isn't interesting any longer.  (NOTE: if we have
1976                  * forgotten the isnew state of a new relation due to a forced cache
1977                  * flush, the entry will get deleted anyway by shared-cache-inval
1978                  * processing of the aborted pg_class insertion.)
1979                  */
1980                 if (relation->rd_isnew)
1981                 {
1982                         if (commit)
1983                                 relation->rd_isnew = false;
1984                         else
1985                         {
1986                                 RelationClearRelation(relation, false);
1987                                 continue;
1988                         }
1989                 }
1990
1991                 /*
1992                  * During transaction abort, we must also reset relcache entry ref
1993                  * counts to their normal not-in-a-transaction state.  A ref count may
1994                  * be too high because some routine was exited by elog() between
1995                  * incrementing and decrementing the count.
1996                  *
1997                  * During commit, we should not have to do this, but it's still useful
1998                  * to check that the counts are correct to catch missed relcache
1999                  * closes.
2000                  *
2001                  * In bootstrap mode, do NOT reset the refcnt nor complain that it's
2002                  * nonzero --- the bootstrap code expects relations to stay open
2003                  * across start/commit transaction calls.  (That seems bogus, but it's
2004                  * not worth fixing.)
2005                  */
2006                 expected_refcnt = relation->rd_isnailed ? 1 : 0;
2007
2008                 if (commit)
2009                 {
2010                         if (relation->rd_refcnt != expected_refcnt &&
2011                                 !IsBootstrapProcessingMode())
2012                         {
2013                                 elog(WARNING, "Relcache reference leak: relation \"%s\" has refcnt %d instead of %d",
2014                                          RelationGetRelationName(relation),
2015                                          relation->rd_refcnt, expected_refcnt);
2016                                 RelationSetReferenceCount(relation, expected_refcnt);
2017                         }
2018                 }
2019                 else
2020                 {
2021                         /* abort case, just reset it quietly */
2022                         RelationSetReferenceCount(relation, expected_refcnt);
2023                 }
2024         }
2025 }
2026
2027 /*
2028  *              RelationBuildLocalRelation
2029  *                      Build a relcache entry for an about-to-be-created relation,
2030  *                      and enter it into the relcache.
2031  */
2032 Relation
2033 RelationBuildLocalRelation(const char *relname,
2034                                                    Oid relnamespace,
2035                                                    TupleDesc tupDesc,
2036                                                    Oid relid, Oid dbid,
2037                                                    RelFileNode rnode,
2038                                                    bool nailit)
2039 {
2040         Relation        rel;
2041         MemoryContext oldcxt;
2042         int                     natts = tupDesc->natts;
2043         int                     i;
2044
2045         AssertArg(natts > 0);
2046
2047         /*
2048          * switch to the cache context to create the relcache entry.
2049          */
2050         if (!CacheMemoryContext)
2051                 CreateCacheMemoryContext();
2052
2053         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2054
2055         /*
2056          * allocate a new relation descriptor and fill in basic state fields.
2057          */
2058         rel = (Relation) palloc(sizeof(RelationData));
2059         MemSet((char *) rel, 0, sizeof(RelationData));
2060
2061         rel->rd_targblock = InvalidBlockNumber;
2062
2063         /* make sure relation is marked as having no open file yet */
2064         rel->rd_fd = -1;
2065
2066         RelationSetReferenceCount(rel, 1);
2067
2068         /* it's being created in this transaction */
2069         rel->rd_isnew = true;
2070
2071         /* is it a temporary relation? */
2072         rel->rd_istemp = isTempNamespace(relnamespace);
2073
2074         /*
2075          * nail the reldesc if this is a bootstrap create reln and we may need
2076          * it in the cache later on in the bootstrap process so we don't ever
2077          * want it kicked out.  e.g. pg_attribute!!!
2078          */
2079         if (nailit)
2080                 rel->rd_isnailed = true;
2081
2082         /*
2083          * create a new tuple descriptor from the one passed in.  We do this
2084          * partly to copy it into the cache context, and partly because the
2085          * new relation can't have any defaults or constraints yet; they
2086          * have to be added in later steps, because they require additions
2087          * to multiple system catalogs.  We can copy attnotnull constraints
2088          * here, however.
2089          */
2090         rel->rd_att = CreateTupleDescCopy(tupDesc);
2091         for (i = 0; i < natts; i++)
2092                 rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull;
2093
2094         /*
2095          * initialize relation tuple form (caller may add/override data later)
2096          */
2097         rel->rd_rel = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
2098         MemSet((char *) rel->rd_rel, 0, CLASS_TUPLE_SIZE);
2099
2100         namestrcpy(&rel->rd_rel->relname, relname);
2101         rel->rd_rel->relnamespace = relnamespace;
2102
2103         rel->rd_rel->relkind = RELKIND_UNCATALOGED;
2104         rel->rd_rel->relhasoids = (rel->rd_att->tdhasoid == WITHOID);
2105         rel->rd_rel->relnatts = natts;
2106         rel->rd_rel->reltype = InvalidOid;
2107
2108         /*
2109          * Insert relation physical and logical identifiers (OIDs) into the
2110          * right places.
2111          */
2112         rel->rd_rel->relisshared = (dbid == InvalidOid);
2113
2114         RelationGetRelid(rel) = relid;
2115
2116         for (i = 0; i < natts; i++)
2117                 rel->rd_att->attrs[i]->attrelid = relid;
2118
2119         rel->rd_node = rnode;
2120         rel->rd_rel->relfilenode = rnode.relNode;
2121
2122         RelationInitLockInfo(rel);      /* see lmgr.c */
2123
2124         /*
2125          * Okay to insert into the relcache hash tables.
2126          */
2127         RelationCacheInsert(rel);
2128
2129         /*
2130          * done building relcache entry.
2131          */
2132         MemoryContextSwitchTo(oldcxt);
2133
2134         return rel;
2135 }
2136
2137 /*
2138  *              RelationCacheInitialize
2139  *
2140  *              This initializes the relation descriptor cache.  At the time
2141  *              that this is invoked, we can't do database access yet (mainly
2142  *              because the transaction subsystem is not up), so we can't get
2143  *              "real" info.  However it's okay to read the pg_internal.init
2144  *              cache file, if one is available.  Otherwise we make phony
2145  *              entries for the minimum set of nailed-in-cache relations.
2146  */
2147
2148 #define INITRELCACHESIZE                400
2149
2150 void
2151 RelationCacheInitialize(void)
2152 {
2153         MemoryContext oldcxt;
2154         HASHCTL         ctl;
2155
2156         /*
2157          * switch to cache memory context
2158          */
2159         if (!CacheMemoryContext)
2160                 CreateCacheMemoryContext();
2161
2162         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2163
2164         /*
2165          * create hashtables that index the relcache
2166          */
2167         MemSet(&ctl, 0, sizeof(ctl));
2168         ctl.keysize = sizeof(NameData);
2169         ctl.entrysize = sizeof(RelNameCacheEnt);
2170         RelationSysNameCache = hash_create("Relcache by name", INITRELCACHESIZE,
2171                                                                            &ctl, HASH_ELEM);
2172
2173         ctl.keysize = sizeof(Oid);
2174         ctl.entrysize = sizeof(RelIdCacheEnt);
2175         ctl.hash = tag_hash;
2176         RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
2177                                                                   &ctl, HASH_ELEM | HASH_FUNCTION);
2178
2179         ctl.keysize = sizeof(RelFileNode);
2180         ctl.entrysize = sizeof(RelNodeCacheEnt);
2181         ctl.hash = tag_hash;
2182         RelationNodeCache = hash_create("Relcache by rnode", INITRELCACHESIZE,
2183                                                                         &ctl, HASH_ELEM | HASH_FUNCTION);
2184
2185         /*
2186          * Try to load the relcache cache file.  If successful, we're done
2187          * for now.  Otherwise, initialize the cache with pre-made descriptors
2188          * for the critical "nailed-in" system catalogs.
2189          */
2190         if (IsBootstrapProcessingMode() ||
2191                 ! load_relcache_init_file())
2192         {
2193                 formrdesc(RelationRelationName,
2194                                   Natts_pg_class, Desc_pg_class);
2195                 formrdesc(AttributeRelationName,
2196                                   Natts_pg_attribute, Desc_pg_attribute);
2197                 formrdesc(ProcedureRelationName,
2198                                   Natts_pg_proc, Desc_pg_proc);
2199                 formrdesc(TypeRelationName,
2200                                   Natts_pg_type, Desc_pg_type);
2201
2202 #define NUM_CRITICAL_RELS       4       /* fix if you change list above */
2203         }
2204
2205         MemoryContextSwitchTo(oldcxt);
2206 }
2207
2208 /*
2209  *              RelationCacheInitializePhase2
2210  *
2211  *              This is called as soon as the catcache and transaction system
2212  *              are functional.  At this point we can actually read data from
2213  *              the system catalogs.  Update the relcache entries made during
2214  *              RelationCacheInitialize, and make sure we have entries for the
2215  *              critical system indexes.
2216  */
2217 void
2218 RelationCacheInitializePhase2(void)
2219 {
2220         HASH_SEQ_STATUS status;
2221         RelIdCacheEnt *idhentry;
2222
2223         if (IsBootstrapProcessingMode())
2224                 return;
2225
2226         /*
2227          * If we didn't get the critical system indexes loaded into relcache,
2228          * do so now.  These are critical because the catcache depends on them
2229          * for catcache fetches that are done during relcache load.  Thus, we
2230          * have an infinite-recursion problem.  We can break the recursion
2231          * by doing heapscans instead of indexscans at certain key spots.
2232          * To avoid hobbling performance, we only want to do that until we
2233          * have the critical indexes loaded into relcache.  Thus, the flag
2234          * criticalRelcachesBuilt is used to decide whether to do heapscan
2235          * or indexscan at the key spots, and we set it true after we've loaded
2236          * the critical indexes.
2237          *
2238          * The critical indexes are marked as "nailed in cache", partly to make
2239          * it easy for load_relcache_init_file to count them, but mainly
2240          * because we cannot flush and rebuild them once we've set
2241          * criticalRelcachesBuilt to true.  (NOTE: perhaps it would be possible
2242          * to reload them by temporarily setting criticalRelcachesBuilt to
2243          * false again.  For now, though, we just nail 'em in.)
2244          */
2245         if (! criticalRelcachesBuilt)
2246         {
2247                 RelationBuildDescInfo buildinfo;
2248                 Relation        ird;
2249
2250 #define LOAD_CRIT_INDEX(indname) \
2251                 do { \
2252                         buildinfo.infotype = INFO_RELNAME; \
2253                         buildinfo.i.info_name = (indname); \
2254                         ird = RelationBuildDesc(buildinfo, NULL); \
2255                         ird->rd_isnailed = true; \
2256                         RelationSetReferenceCount(ird, 1); \
2257                 } while (0)
2258
2259                 LOAD_CRIT_INDEX(ClassNameNspIndex);
2260                 LOAD_CRIT_INDEX(ClassOidIndex);
2261                 LOAD_CRIT_INDEX(AttributeRelidNumIndex);
2262                 LOAD_CRIT_INDEX(IndexRelidIndex);
2263                 LOAD_CRIT_INDEX(AccessMethodStrategyIndex);
2264                 LOAD_CRIT_INDEX(AccessMethodProcedureIndex);
2265                 LOAD_CRIT_INDEX(OperatorOidIndex);
2266
2267 #define NUM_CRITICAL_INDEXES    7       /* fix if you change list above */
2268
2269                 criticalRelcachesBuilt = true;
2270         }
2271
2272         /*
2273          * Now, scan all the relcache entries and update anything that might
2274          * be wrong in the results from formrdesc or the relcache cache file.
2275          * If we faked up relcache entries using formrdesc, then read
2276          * the real pg_class rows and replace the fake entries with them.
2277          * Also, if any of the relcache entries have rules or triggers,
2278          * load that info the hard way since it isn't recorded in the cache file.
2279          */
2280         hash_seq_init(&status, RelationIdCache);
2281
2282         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2283         {
2284                 Relation        relation = idhentry->reldesc;
2285
2286                 /*
2287                  * If it's a faked-up entry, read the real pg_class tuple.
2288                  */
2289                 if (needNewCacheFile && relation->rd_isnailed)
2290                 {
2291                         HeapTuple       htup;
2292                         Form_pg_class relp;
2293
2294                         htup = SearchSysCache(RELOID,
2295                                                                   ObjectIdGetDatum(RelationGetRelid(relation)),
2296                                                                   0, 0, 0);
2297                         if (!HeapTupleIsValid(htup))
2298                                 elog(FATAL, "RelationCacheInitializePhase2: no pg_class entry for %s",
2299                                          RelationGetRelationName(relation));
2300                         relp = (Form_pg_class) GETSTRUCT(htup);
2301                         /*
2302                          * Copy tuple to relation->rd_rel. (See notes in
2303                          * AllocateRelationDesc())
2304                          */
2305                         Assert(relation->rd_rel != NULL);
2306                         memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
2307                         relation->rd_att->tdhasoid = BoolToHasOid(relp->relhasoids);
2308
2309                         ReleaseSysCache(htup);
2310                 }
2311
2312                 /*
2313                  * Fix data that isn't saved in relcache cache file.
2314                  */
2315                 if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
2316                         RelationBuildRuleLock(relation);
2317                 if (relation->rd_rel->reltriggers > 0 && relation->trigdesc == NULL)
2318                         RelationBuildTriggers(relation);
2319         }
2320 }
2321
2322 /*
2323  *              RelationCacheInitializePhase3
2324  *
2325  *              Final step of relcache initialization: write out a new relcache
2326  *              cache file if one is needed.
2327  */
2328 void
2329 RelationCacheInitializePhase3(void)
2330 {
2331         if (IsBootstrapProcessingMode())
2332                 return;
2333
2334         if (needNewCacheFile)
2335         {
2336                 /*
2337                  * Force all the catcaches to finish initializing and thereby
2338                  * open the catalogs and indexes they use.  This will preload
2339                  * the relcache with entries for all the most important system
2340                  * catalogs and indexes, so that the init file will be most
2341                  * useful for future backends.
2342                  */
2343                 InitCatalogCachePhase2();
2344
2345                 /* now write the file */
2346                 write_relcache_init_file();
2347         }
2348 }
2349
2350
2351 /* used by XLogInitCache */
2352 void            CreateDummyCaches(void);
2353 void            DestroyDummyCaches(void);
2354
2355 void
2356 CreateDummyCaches(void)
2357 {
2358         MemoryContext oldcxt;
2359         HASHCTL         ctl;
2360
2361         if (!CacheMemoryContext)
2362                 CreateCacheMemoryContext();
2363
2364         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2365
2366         MemSet(&ctl, 0, sizeof(ctl));
2367         ctl.keysize = sizeof(NameData);
2368         ctl.entrysize = sizeof(RelNameCacheEnt);
2369         RelationSysNameCache = hash_create("Relcache by name", INITRELCACHESIZE,
2370                                                                            &ctl, HASH_ELEM);
2371
2372         ctl.keysize = sizeof(Oid);
2373         ctl.entrysize = sizeof(RelIdCacheEnt);
2374         ctl.hash = tag_hash;
2375         RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
2376                                                                   &ctl, HASH_ELEM | HASH_FUNCTION);
2377
2378         ctl.keysize = sizeof(RelFileNode);
2379         ctl.entrysize = sizeof(RelNodeCacheEnt);
2380         ctl.hash = tag_hash;
2381         RelationNodeCache = hash_create("Relcache by rnode", INITRELCACHESIZE,
2382                                                                         &ctl, HASH_ELEM | HASH_FUNCTION);
2383
2384         MemoryContextSwitchTo(oldcxt);
2385 }
2386
2387 void
2388 DestroyDummyCaches(void)
2389 {
2390         MemoryContext oldcxt;
2391
2392         if (!CacheMemoryContext)
2393                 return;
2394
2395         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2396
2397         if (RelationIdCache)
2398                 hash_destroy(RelationIdCache);
2399         if (RelationSysNameCache)
2400                 hash_destroy(RelationSysNameCache);
2401         if (RelationNodeCache)
2402                 hash_destroy(RelationNodeCache);
2403
2404         RelationIdCache = RelationSysNameCache = RelationNodeCache = NULL;
2405
2406         MemoryContextSwitchTo(oldcxt);
2407 }
2408
2409 static void
2410 AttrDefaultFetch(Relation relation)
2411 {
2412         AttrDefault *attrdef = relation->rd_att->constr->defval;
2413         int                     ndef = relation->rd_att->constr->num_defval;
2414         Relation        adrel;
2415         SysScanDesc adscan;
2416         ScanKeyData skey;
2417         HeapTuple       htup;
2418         Datum           val;
2419         bool            isnull;
2420         int                     found;
2421         int                     i;
2422
2423         ScanKeyEntryInitialize(&skey,
2424                                                    (bits16) 0x0,
2425                                                    (AttrNumber) Anum_pg_attrdef_adrelid,
2426                                                    (RegProcedure) F_OIDEQ,
2427                                                    ObjectIdGetDatum(RelationGetRelid(relation)));
2428
2429         adrel = heap_openr(AttrDefaultRelationName, AccessShareLock);
2430         adscan = systable_beginscan(adrel, AttrDefaultIndex, true,
2431                                                                 SnapshotNow,
2432                                                                 1, &skey);
2433         found = 0;
2434
2435         while (HeapTupleIsValid(htup = systable_getnext(adscan)))
2436         {
2437                 Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
2438
2439                 found++;
2440                 for (i = 0; i < ndef; i++)
2441                 {
2442                         if (adform->adnum != attrdef[i].adnum)
2443                                 continue;
2444                         if (attrdef[i].adbin != NULL)
2445                                 elog(WARNING, "AttrDefaultFetch: second record found for attr %s in rel %s",
2446                                          NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
2447                                          RelationGetRelationName(relation));
2448
2449                         val = fastgetattr(htup,
2450                                                           Anum_pg_attrdef_adbin,
2451                                                           adrel->rd_att, &isnull);
2452                         if (isnull)
2453                                 elog(WARNING, "AttrDefaultFetch: adbin IS NULL for attr %s in rel %s",
2454                                          NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
2455                                          RelationGetRelationName(relation));
2456                         else
2457                                 attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
2458                                                          DatumGetCString(DirectFunctionCall1(textout,
2459                                                                                                                                  val)));
2460                         break;
2461                 }
2462
2463                 if (i >= ndef)
2464                         elog(WARNING, "AttrDefaultFetch: unexpected record found for attr %d in rel %s",
2465                                  adform->adnum,
2466                                  RelationGetRelationName(relation));
2467         }
2468
2469         systable_endscan(adscan);
2470         heap_close(adrel, AccessShareLock);
2471
2472         if (found != ndef)
2473                 elog(WARNING, "AttrDefaultFetch: %d record(s) not found for rel %s",
2474                          ndef - found, RelationGetRelationName(relation));
2475 }
2476
2477 static void
2478 CheckConstraintFetch(Relation relation)
2479 {
2480         ConstrCheck *check = relation->rd_att->constr->check;
2481         int                     ncheck = relation->rd_att->constr->num_check;
2482         Relation        conrel;
2483         SysScanDesc conscan;
2484         ScanKeyData skey[1];
2485         HeapTuple       htup;
2486         Datum           val;
2487         bool            isnull;
2488         int                     found = 0;
2489
2490         ScanKeyEntryInitialize(&skey[0], 0x0,
2491                                                    Anum_pg_constraint_conrelid, F_OIDEQ,
2492                                                    ObjectIdGetDatum(RelationGetRelid(relation)));
2493
2494         conrel = heap_openr(ConstraintRelationName, AccessShareLock);
2495         conscan = systable_beginscan(conrel, ConstraintRelidIndex, true,
2496                                                                  SnapshotNow, 1, skey);
2497
2498         while (HeapTupleIsValid(htup = systable_getnext(conscan)))
2499         {
2500                 Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
2501
2502                 /* We want check constraints only */
2503                 if (conform->contype != CONSTRAINT_CHECK)
2504                         continue;
2505
2506                 if (found == ncheck)
2507                         elog(ERROR, "CheckConstraintFetch: unexpected record found for rel %s",
2508                                  RelationGetRelationName(relation));
2509
2510                 check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
2511                                                                                                   NameStr(conform->conname));
2512
2513                 /* Grab and test conbin is actually set */
2514                 val = fastgetattr(htup,
2515                                                   Anum_pg_constraint_conbin,
2516                                                   conrel->rd_att, &isnull);
2517                 if (isnull)
2518                         elog(ERROR, "CheckConstraintFetch: conbin IS NULL for rel %s",
2519                                  RelationGetRelationName(relation));
2520
2521                 check[found].ccbin = MemoryContextStrdup(CacheMemoryContext,
2522                                                          DatumGetCString(DirectFunctionCall1(textout,
2523                                                                                                                                  val)));
2524                 found++;
2525         }
2526
2527         systable_endscan(conscan);
2528         heap_close(conrel, AccessShareLock);
2529
2530         if (found != ncheck)
2531                 elog(ERROR, "CheckConstraintFetch: %d record(s) not found for rel %s",
2532                          ncheck - found, RelationGetRelationName(relation));
2533 }
2534
2535 /*
2536  * RelationGetIndexList -- get a list of OIDs of indexes on this relation
2537  *
2538  * The index list is created only if someone requests it.  We scan pg_index
2539  * to find relevant indexes, and add the list to the relcache entry so that
2540  * we won't have to compute it again.  Note that shared cache inval of a
2541  * relcache entry will delete the old list and set rd_indexfound to false,
2542  * so that we must recompute the index list on next request.  This handles
2543  * creation or deletion of an index.
2544  *
2545  * The returned list is guaranteed to be sorted in order by OID.  This is
2546  * needed by the executor, since for index types that we obtain exclusive
2547  * locks on when updating the index, all backends must lock the indexes in
2548  * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
2549  * consistent ordering would do, but ordering by OID is easy.
2550  *
2551  * Since shared cache inval causes the relcache's copy of the list to go away,
2552  * we return a copy of the list palloc'd in the caller's context.  The caller
2553  * may freeList() the returned list after scanning it.  This is necessary
2554  * since the caller will typically be doing syscache lookups on the relevant
2555  * indexes, and syscache lookup could cause SI messages to be processed!
2556  */
2557 List *
2558 RelationGetIndexList(Relation relation)
2559 {
2560         Relation        indrel;
2561         SysScanDesc     indscan;
2562         ScanKeyData skey;
2563         HeapTuple       htup;
2564         List       *result;
2565         MemoryContext oldcxt;
2566
2567         /* Quick exit if we already computed the list. */
2568         if (relation->rd_indexfound)
2569                 return listCopy(relation->rd_indexlist);
2570
2571         /*
2572          * We build the list we intend to return (in the caller's context)
2573          * while doing the scan.  After successfully completing the scan, we
2574          * copy that list into the relcache entry.      This avoids cache-context
2575          * memory leakage if we get some sort of error partway through.
2576          */
2577         result = NIL;
2578
2579         /* Prepare to scan pg_index for entries having indrelid = this rel. */
2580         ScanKeyEntryInitialize(&skey,
2581                                                    (bits16) 0x0,
2582                                                    (AttrNumber) Anum_pg_index_indrelid,
2583                                                    (RegProcedure) F_OIDEQ,
2584                                                    ObjectIdGetDatum(RelationGetRelid(relation)));
2585
2586         indrel = heap_openr(IndexRelationName, AccessShareLock);
2587         indscan = systable_beginscan(indrel, IndexIndrelidIndex, true,
2588                                                                  SnapshotNow,
2589                                                                  1, &skey);
2590
2591         while (HeapTupleIsValid(htup = systable_getnext(indscan)))
2592         {
2593                 Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);
2594
2595                 result = insert_ordered_oid(result, index->indexrelid);
2596         }
2597
2598         systable_endscan(indscan);
2599         heap_close(indrel, AccessShareLock);
2600
2601         /* Now save a copy of the completed list in the relcache entry. */
2602         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2603         relation->rd_indexlist = listCopy(result);
2604         relation->rd_indexfound = true;
2605         MemoryContextSwitchTo(oldcxt);
2606
2607         return result;
2608 }
2609
2610 /*
2611  * insert_ordered_oid
2612  *              Insert a new Oid into a sorted list of Oids, preserving ordering
2613  *
2614  * Building the ordered list this way is O(N^2), but with a pretty small
2615  * constant, so for the number of entries we expect it will probably be
2616  * faster than trying to apply qsort().  Most tables don't have very many
2617  * indexes...
2618  */
2619 static List *
2620 insert_ordered_oid(List *list, Oid datum)
2621 {
2622         List       *l;
2623
2624         /* Does the datum belong at the front? */
2625         if (list == NIL || datum < (Oid) lfirsti(list))
2626                 return lconsi(datum, list);
2627         /* No, so find the entry it belongs after */
2628         l = list;
2629         for (;;)
2630         {
2631                 List       *n = lnext(l);
2632
2633                 if (n == NIL || datum < (Oid) lfirsti(n))
2634                         break;                          /* it belongs before n */
2635                 l = n;
2636         }
2637         /* Insert datum into list after item l */
2638         lnext(l) = lconsi(datum, lnext(l));
2639         return list;
2640 }
2641
2642
2643 /*
2644  *      load_relcache_init_file, write_relcache_init_file
2645  *
2646  *              In late 1992, we started regularly having databases with more than
2647  *              a thousand classes in them.  With this number of classes, it became
2648  *              critical to do indexed lookups on the system catalogs.
2649  *
2650  *              Bootstrapping these lookups is very hard.  We want to be able to
2651  *              use an index on pg_attribute, for example, but in order to do so,
2652  *              we must have read pg_attribute for the attributes in the index,
2653  *              which implies that we need to use the index.
2654  *
2655  *              In order to get around the problem, we do the following:
2656  *
2657  *                 +  When the database system is initialized (at initdb time), we
2658  *                        don't use indexes.  We do sequential scans.
2659  *
2660  *                 +  When the backend is started up in normal mode, we load an image
2661  *                        of the appropriate relation descriptors, in internal format,
2662  *                        from an initialization file in the data/base/... directory.
2663  *
2664  *                 +  If the initialization file isn't there, then we create the
2665  *                        relation descriptors using sequential scans and write 'em to
2666  *                        the initialization file for use by subsequent backends.
2667  *
2668  *              We could dispense with the initialization file and just build the
2669  *              critical reldescs the hard way on every backend startup, but that
2670  *              slows down backend startup noticeably.
2671  *
2672  *              We can in fact go further, and save more relcache entries than
2673  *              just the ones that are absolutely critical; this allows us to speed
2674  *              up backend startup by not having to build such entries the hard way.
2675  *              Presently, all the catalog and index entries that are referred to
2676  *              by catcaches are stored in the initialization file.
2677  *
2678  *              The same mechanism that detects when catcache and relcache entries
2679  *              need to be invalidated (due to catalog updates) also arranges to
2680  *              unlink the initialization file when its contents may be out of date.
2681  *              The file will then be rebuilt during the next backend startup.
2682  */
2683
2684 /*
2685  * load_relcache_init_file -- attempt to load cache from the init file
2686  *
2687  * If successful, return TRUE and set criticalRelcachesBuilt to true.
2688  * If not successful, return FALSE and set needNewCacheFile to true.
2689  *
2690  * NOTE: we assume we are already switched into CacheMemoryContext.
2691  */
2692 static bool
2693 load_relcache_init_file(void)
2694 {
2695         FILE       *fp;
2696         char            initfilename[MAXPGPATH];
2697         Relation   *rels;
2698         int                     relno,
2699                                 num_rels,
2700                                 max_rels,
2701                                 nailed_rels,
2702                                 nailed_indexes;
2703         int                     i;
2704
2705         snprintf(initfilename, sizeof(initfilename), "%s/%s",
2706                          DatabasePath, RELCACHE_INIT_FILENAME);
2707
2708         fp = AllocateFile(initfilename, PG_BINARY_R);
2709         if (fp == NULL)
2710         {
2711                 needNewCacheFile = true;
2712                 return false;
2713         }
2714
2715         /*
2716          * Read the index relcache entries from the file.  Note we will not
2717          * enter any of them into the cache if the read fails partway through;
2718          * this helps to guard against broken init files.
2719          */
2720         max_rels = 100;
2721         rels = (Relation *) palloc(max_rels * sizeof(Relation));
2722         num_rels = 0;
2723         nailed_rels = nailed_indexes = 0;
2724         initFileRelationIds = NIL;
2725
2726         for (relno = 0; ; relno++)
2727         {
2728                 Size            len;
2729                 size_t          nread;
2730                 Relation        rel;
2731                 Form_pg_class relform;
2732
2733                 /* first read the relation descriptor length */
2734                 if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2735                 {
2736                         if (nread == 0)
2737                                 break;                  /* end of file */
2738                         goto read_failed;
2739                 }
2740
2741                 /* safety check for incompatible relcache layout */
2742                 if (len != sizeof(RelationData))
2743                         goto read_failed;
2744
2745                 /* allocate another relcache header */
2746                 if (num_rels >= max_rels)
2747                 {
2748                         max_rels *= 2;
2749                         rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
2750                 }
2751
2752                 rel = rels[num_rels++] = (Relation) palloc(len);
2753
2754                 /* then, read the Relation structure */
2755                 if ((nread = fread(rel, 1, len, fp)) != len)
2756                         goto read_failed;
2757
2758                 /* next read the relation tuple form */
2759                 if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2760                         goto read_failed;
2761
2762                 relform = (Form_pg_class) palloc(len);
2763                 if ((nread = fread(relform, 1, len, fp)) != len)
2764                         goto read_failed;
2765
2766                 rel->rd_rel = relform;
2767
2768                 /* initialize attribute tuple forms */
2769                 rel->rd_att = CreateTemplateTupleDesc(relform->relnatts, BoolToHasOid(relform->relhasoids));
2770
2771                 /* next read all the attribute tuple form data entries */
2772                 for (i = 0; i < relform->relnatts; i++)
2773                 {
2774                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2775                                 goto read_failed;
2776
2777                         rel->rd_att->attrs[i] = (Form_pg_attribute) palloc(len);
2778
2779                         if ((nread = fread(rel->rd_att->attrs[i], 1, len, fp)) != len)
2780                                 goto read_failed;
2781                 }
2782
2783                 /* If it's an index, there's more to do */
2784                 if (rel->rd_rel->relkind == RELKIND_INDEX)
2785                 {
2786                         Form_pg_am      am;
2787                         MemoryContext indexcxt;
2788                         IndexStrategy strat;
2789                         Oid                *operator;
2790                         RegProcedure *support;
2791                         int                     nstrategies,
2792                                                 nsupport;
2793
2794                         /* Count nailed indexes to ensure we have 'em all */
2795                         if (rel->rd_isnailed)
2796                                 nailed_indexes++;
2797
2798                         /* next, read the pg_index tuple form */
2799                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2800                                 goto read_failed;
2801
2802                         rel->rd_index = (Form_pg_index) palloc(len);
2803                         if ((nread = fread(rel->rd_index, 1, len, fp)) != len)
2804                                 goto read_failed;
2805
2806                         /* next, read the access method tuple form */
2807                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2808                                 goto read_failed;
2809
2810                         am = (Form_pg_am) palloc(len);
2811                         if ((nread = fread(am, 1, len, fp)) != len)
2812                                 goto read_failed;
2813                         rel->rd_am = am;
2814
2815                         /*
2816                          * prepare index info context --- parameters should match
2817                          * RelationInitIndexAccessInfo
2818                          */
2819                         indexcxt = AllocSetContextCreate(CacheMemoryContext,
2820                                                                                          RelationGetRelationName(rel),
2821                                                                                          0,             /* minsize */
2822                                                                                          512,   /* initsize */
2823                                                                                          1024); /* maxsize */
2824                         rel->rd_indexcxt = indexcxt;
2825
2826                         /* next, read the index strategy map */
2827                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2828                                 goto read_failed;
2829
2830                         strat = (IndexStrategy) MemoryContextAlloc(indexcxt, len);
2831                         if ((nread = fread(strat, 1, len, fp)) != len)
2832                                 goto read_failed;
2833
2834                         /* have to invalidate any FmgrInfo data in the strategy maps */
2835                         nstrategies = am->amstrategies * relform->relnatts;
2836                         for (i = 0; i < nstrategies; i++)
2837                                 strat->strategyMapData[i].entry[0].sk_func.fn_oid = InvalidOid;
2838
2839                         rel->rd_istrat = strat;
2840
2841                         /* next, read the vector of operator OIDs */
2842                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2843                                 goto read_failed;
2844
2845                         operator = (Oid *) MemoryContextAlloc(indexcxt, len);
2846                         if ((nread = fread(operator, 1, len, fp)) != len)
2847                                 goto read_failed;
2848
2849                         rel->rd_operator = operator;
2850
2851                         /* finally, read the vector of support procedures */
2852                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2853                                 goto read_failed;
2854                         support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
2855                         if ((nread = fread(support, 1, len, fp)) != len)
2856                                 goto read_failed;
2857
2858                         rel->rd_support = support;
2859
2860                         /* add a zeroed support-fmgr-info vector */
2861                         nsupport = relform->relnatts * am->amsupport;
2862                         rel->rd_supportinfo = (FmgrInfo *)
2863                                 MemoryContextAlloc(indexcxt, nsupport * sizeof(FmgrInfo));
2864                         MemSet(rel->rd_supportinfo, 0, nsupport * sizeof(FmgrInfo));
2865                 }
2866                 else
2867                 {
2868                         /* Count nailed rels to ensure we have 'em all */
2869                         if (rel->rd_isnailed)
2870                                 nailed_rels++;
2871
2872                         Assert(rel->rd_index == NULL);
2873                         Assert(rel->rd_am == NULL);
2874                         Assert(rel->rd_indexcxt == NULL);
2875                         Assert(rel->rd_istrat == NULL);
2876                         Assert(rel->rd_operator == NULL);
2877                         Assert(rel->rd_support == NULL);
2878                         Assert(rel->rd_supportinfo == NULL);
2879                 }
2880
2881                 /*
2882                  * Rules and triggers are not saved (mainly because the internal
2883                  * format is complex and subject to change).  They must be rebuilt
2884                  * if needed by RelationCacheInitializePhase2.  This is not expected
2885                  * to be a big performance hit since few system catalogs have such.
2886                  */
2887                 rel->rd_rules = NULL;
2888                 rel->rd_rulescxt = NULL;
2889                 rel->trigdesc = NULL;
2890
2891                 /*
2892                  * Reset transient-state fields in the relcache entry
2893                  */
2894                 rel->rd_fd = -1;
2895                 rel->rd_targblock = InvalidBlockNumber;
2896                 if (rel->rd_isnailed)
2897                         RelationSetReferenceCount(rel, 1);
2898                 else
2899                         RelationSetReferenceCount(rel, 0);
2900                 rel->rd_indexfound = false;
2901                 rel->rd_indexlist = NIL;
2902                 MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
2903
2904                 /*
2905                  * Make sure database ID is correct.  This is needed in case the
2906                  * pg_internal.init file was copied from some other database by
2907                  * CREATE DATABASE.
2908                  */
2909                 if (rel->rd_rel->relisshared)
2910                         rel->rd_node.tblNode = InvalidOid;
2911                 else
2912                         rel->rd_node.tblNode = MyDatabaseId;
2913
2914                 RelationInitLockInfo(rel);
2915         }
2916
2917         /*
2918          * We reached the end of the init file without apparent problem.
2919          * Did we get the right number of nailed items?  (This is a useful
2920          * crosscheck in case the set of critical rels or indexes changes.)
2921          */
2922         if (nailed_rels != NUM_CRITICAL_RELS ||
2923                 nailed_indexes != NUM_CRITICAL_INDEXES)
2924                 goto read_failed;
2925
2926         /*
2927          * OK, all appears well.
2928          *
2929          * Now insert all the new relcache entries into the cache.
2930          */
2931         for (relno = 0; relno < num_rels; relno++)
2932         {
2933                 RelationCacheInsert(rels[relno]);
2934                 /* also make a list of their OIDs, for RelationIdIsInInitFile */
2935                 initFileRelationIds = lconsi((int) RelationGetRelid(rels[relno]),
2936                                                                          initFileRelationIds);
2937         }
2938
2939         pfree(rels);
2940         FreeFile(fp);
2941
2942         criticalRelcachesBuilt = true;
2943         return true;
2944
2945         /*
2946          * init file is broken, so do it the hard way.  We don't bother
2947          * trying to free the clutter we just allocated; it's not in the
2948          * relcache so it won't hurt.
2949          */
2950 read_failed:
2951         pfree(rels);
2952         FreeFile(fp);
2953
2954         needNewCacheFile = true;
2955         return false;
2956 }
2957
2958 /*
2959  * Write out a new initialization file with the current contents
2960  * of the relcache.
2961  */
2962 static void
2963 write_relcache_init_file(void)
2964 {
2965         FILE       *fp;
2966         char            tempfilename[MAXPGPATH];
2967         char            finalfilename[MAXPGPATH];
2968         HASH_SEQ_STATUS status;
2969         RelIdCacheEnt *idhentry;
2970         MemoryContext oldcxt;
2971         int                     i;
2972
2973         /*
2974          * We must write a temporary file and rename it into place. Otherwise,
2975          * another backend starting at about the same time might crash trying
2976          * to read the partially-complete file.
2977          */
2978         snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
2979                          DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
2980         snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
2981                          DatabasePath, RELCACHE_INIT_FILENAME);
2982
2983         unlink(tempfilename);           /* in case it exists w/wrong permissions */
2984
2985         fp = AllocateFile(tempfilename, PG_BINARY_W);
2986         if (fp == NULL)
2987         {
2988                 /*
2989                  * We used to consider this a fatal error, but we might as well
2990                  * continue with backend startup ...
2991                  */
2992                 elog(WARNING, "Cannot create init file %s: %m\n\tContinuing anyway, but there's something wrong.", tempfilename);
2993                 return;
2994         }
2995
2996         /*
2997          * Write all the reldescs (in no particular order).
2998          */
2999         hash_seq_init(&status, RelationIdCache);
3000
3001         initFileRelationIds = NIL;
3002
3003         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3004         {
3005                 Relation        rel = idhentry->reldesc;
3006                 Form_pg_class relform = rel->rd_rel;
3007                 Size            len;
3008
3009                 /*
3010                  * first write the relcache entry proper
3011                  */
3012                 len = sizeof(RelationData);
3013
3014                 /* first, write the relation descriptor length */
3015                 if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3016                         elog(FATAL, "cannot write init file -- descriptor length");
3017
3018                 /* next, write out the Relation structure */
3019                 if (fwrite(rel, 1, len, fp) != len)
3020                         elog(FATAL, "cannot write init file -- reldesc");
3021
3022                 /* next write the relation tuple form */
3023                 len = sizeof(FormData_pg_class);
3024                 if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3025                         elog(FATAL, "cannot write init file -- relation tuple form length");
3026
3027                 if (fwrite(relform, 1, len, fp) != len)
3028                         elog(FATAL, "cannot write init file -- relation tuple form");
3029
3030                 /* next, do all the attribute tuple form data entries */
3031                 for (i = 0; i < relform->relnatts; i++)
3032                 {
3033                         len = ATTRIBUTE_TUPLE_SIZE;
3034                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3035                                 elog(FATAL, "cannot write init file -- length of attdesc %d", i);
3036                         if (fwrite(rel->rd_att->attrs[i], 1, len, fp) != len)
3037                                 elog(FATAL, "cannot write init file -- attdesc %d", i);
3038                 }
3039
3040                 /* If it's an index, there's more to do */
3041                 if (rel->rd_rel->relkind == RELKIND_INDEX)
3042                 {
3043                         Form_pg_am      am = rel->rd_am;
3044                         HeapTuple       tuple;
3045
3046                         /*
3047                          * We need to write the index tuple form, but this is a bit
3048                          * tricky since it's a variable-length struct.  Rather than
3049                          * hoping to intuit the length, fetch the pg_index tuple
3050                          * afresh using the syscache, and write that.
3051                          */
3052                         tuple = SearchSysCache(INDEXRELID,
3053                                                                    ObjectIdGetDatum(RelationGetRelid(rel)),
3054                                                                    0, 0, 0);
3055                         if (!HeapTupleIsValid(tuple))
3056                                 elog(ERROR, "write_relcache_init_file: no pg_index entry for index %u",
3057                                          RelationGetRelid(rel));
3058                         len = tuple->t_len - tuple->t_data->t_hoff;
3059                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3060                                 elog(FATAL, "cannot write init file -- index tuple form length");
3061                         if (fwrite(GETSTRUCT(tuple), 1, len, fp) != len)
3062                                 elog(FATAL, "cannot write init file -- index tuple form");
3063                         ReleaseSysCache(tuple);
3064
3065                         /* next, write the access method tuple form */
3066                         len = sizeof(FormData_pg_am);
3067                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3068                                 elog(FATAL, "cannot write init file -- am tuple form length");
3069
3070                         if (fwrite(am, 1, len, fp) != len)
3071                                 elog(FATAL, "cannot write init file -- am tuple form");
3072
3073                         /* next, write the index strategy map */
3074                         len = AttributeNumberGetIndexStrategySize(relform->relnatts,
3075                                                                                                           am->amstrategies);
3076                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3077                                 elog(FATAL, "cannot write init file -- strategy map length");
3078
3079                         if (fwrite(rel->rd_istrat, 1, len, fp) != len)
3080                                 elog(FATAL, "cannot write init file -- strategy map");
3081
3082                         /* next, write the vector of operator OIDs */
3083                         len = relform->relnatts * (am->amstrategies * sizeof(Oid));
3084                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3085                                 elog(FATAL, "cannot write init file -- operator vector length");
3086
3087                         if (fwrite(rel->rd_operator, 1, len, fp) != len)
3088                                 elog(FATAL, "cannot write init file -- operator vector");
3089
3090                         /* finally, write the vector of support procedures */
3091                         len = relform->relnatts * (am->amsupport * sizeof(RegProcedure));
3092                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3093                                 elog(FATAL, "cannot write init file -- support vector length");
3094
3095                         if (fwrite(rel->rd_support, 1, len, fp) != len)
3096                                 elog(FATAL, "cannot write init file -- support vector");
3097                 }
3098
3099                 /* also make a list of their OIDs, for RelationIdIsInInitFile */
3100                 oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3101                 initFileRelationIds = lconsi((int) RelationGetRelid(rel),
3102                                                                          initFileRelationIds);
3103                 MemoryContextSwitchTo(oldcxt);
3104         }
3105
3106         FreeFile(fp);
3107
3108         /*
3109          * Now we have to check whether the data we've so painstakingly
3110          * accumulated is already obsolete due to someone else's just-committed
3111          * catalog changes.  If so, we just delete the temp file and leave it
3112          * to the next backend to try again.  (Our own relcache entries will be
3113          * updated by SI message processing, but we can't be sure whether what
3114          * we wrote out was up-to-date.)
3115          *
3116          * This mustn't run concurrently with RelationCacheInitFileInvalidate,
3117          * so grab a serialization lock for the duration.
3118          */
3119         LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
3120
3121         /* Make sure we have seen all incoming SI messages */
3122         AcceptInvalidationMessages();
3123
3124         /*
3125          * If we have received any SI relcache invals since backend start,
3126          * assume we may have written out-of-date data.
3127          */
3128         if (relcacheInvalsReceived == 0L)
3129         {
3130                 /*
3131                  * OK, rename the temp file to its final name, deleting any
3132                  * previously-existing init file.
3133                  *
3134                  * Note: a failure here is possible under Cygwin, if some other
3135                  * backend is holding open an unlinked-but-not-yet-gone init file.
3136                  * So treat this as a noncritical failure.
3137                  */
3138                 if (rename(tempfilename, finalfilename) < 0)
3139                 {
3140                         elog(WARNING, "Cannot rename init file %s to %s: %m\n\tContinuing anyway, but there's something wrong.", tempfilename, finalfilename);
3141                         /*
3142                          * If we fail, try to clean up the useless temp file; don't bother
3143                          * to complain if this fails too.
3144                          */
3145                         unlink(tempfilename);
3146                 }
3147         }
3148         else
3149         {
3150                 /* Delete the already-obsolete temp file */
3151                 unlink(tempfilename);
3152         }
3153
3154         LWLockRelease(RelCacheInitLock);
3155 }
3156
3157 /*
3158  * Detect whether a given relation (identified by OID) is one of the ones
3159  * we store in the init file.
3160  *
3161  * Note that we effectively assume that all backends running in a database
3162  * would choose to store the same set of relations in the init file;
3163  * otherwise there are cases where we'd fail to detect the need for an init
3164  * file invalidation.  This does not seem likely to be a problem in practice.
3165  */
3166 bool
3167 RelationIdIsInInitFile(Oid relationId)
3168 {
3169         return intMember((int) relationId, initFileRelationIds);
3170 }
3171
3172 /*
3173  * Invalidate (remove) the init file during commit of a transaction that
3174  * changed one or more of the relation cache entries that are kept in the
3175  * init file.
3176  *
3177  * We actually need to remove the init file twice: once just before sending
3178  * the SI messages that include relcache inval for such relations, and once
3179  * just after sending them.  The unlink before ensures that a backend that's
3180  * currently starting cannot read the now-obsolete init file and then miss
3181  * the SI messages that will force it to update its relcache entries.  (This
3182  * works because the backend startup sequence gets into the PROC array before
3183  * trying to load the init file.)  The unlink after is to synchronize with a
3184  * backend that may currently be trying to write an init file based on data
3185  * that we've just rendered invalid.  Such a backend will see the SI messages,
3186  * but we can't leave the init file sitting around to fool later backends.
3187  *
3188  * Ignore any failure to unlink the file, since it might not be there if
3189  * no backend has been started since the last removal.
3190  */
3191 void
3192 RelationCacheInitFileInvalidate(bool beforeSend)
3193 {
3194         char            initfilename[MAXPGPATH];
3195
3196         snprintf(initfilename, sizeof(initfilename), "%s/%s",
3197                          DatabasePath, RELCACHE_INIT_FILENAME);
3198
3199         if (beforeSend)
3200         {
3201                 /* no interlock needed here */
3202                 unlink(initfilename);
3203         }
3204         else
3205         {
3206                 /*
3207                  * We need to interlock this against write_relcache_init_file,
3208                  * to guard against possibility that someone renames a new-but-
3209                  * already-obsolete init file into place just after we unlink.
3210                  * With the interlock, it's certain that write_relcache_init_file
3211                  * will notice our SI inval message before renaming into place,
3212                  * or else that we will execute second and successfully unlink
3213                  * the file.
3214                  */
3215                 LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
3216                 unlink(initfilename);
3217                 LWLockRelease(RelCacheInitLock);
3218         }
3219 }