]> granicus.if.org Git - postgresql/blob - src/backend/utils/cache/relcache.c
Code review for HeapTupleHeader changes. Add version number to page headers
[postgresql] / src / backend / utils / cache / relcache.c
1 /*-------------------------------------------------------------------------
2  *
3  * relcache.c
4  *        POSTGRES relation descriptor cache code
5  *
6  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/utils/cache/relcache.c,v 1.173 2002/09/02 01:05:06 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * INTERFACE ROUTINES
17  *              RelationCacheInitialize                 - initialize relcache
18  *              RelationCacheInitializePhase2   - finish initializing relcache
19  *              RelationIdGetRelation                   - get a reldesc by relation id
20  *              RelationSysNameGetRelation              - get a reldesc by system rel name
21  *              RelationIdCacheGetRelation              - get a cached reldesc by relid
22  *              RelationClose                                   - close an open relation
23  *
24  * NOTES
25  *              The following code contains many undocumented hacks.  Please be
26  *              careful....
27  */
28 #include "postgres.h"
29
30 #include <sys/types.h>
31 #include <errno.h>
32 #include <sys/file.h>
33 #include <fcntl.h>
34 #include <unistd.h>
35
36 #include "access/genam.h"
37 #include "access/heapam.h"
38 #include "access/istrat.h"
39 #include "catalog/catalog.h"
40 #include "catalog/catname.h"
41 #include "catalog/indexing.h"
42 #include "catalog/namespace.h"
43 #include "catalog/pg_amop.h"
44 #include "catalog/pg_amproc.h"
45 #include "catalog/pg_attrdef.h"
46 #include "catalog/pg_attribute.h"
47 #include "catalog/pg_constraint.h"
48 #include "catalog/pg_index.h"
49 #include "catalog/pg_namespace.h"
50 #include "catalog/pg_opclass.h"
51 #include "catalog/pg_proc.h"
52 #include "catalog/pg_rewrite.h"
53 #include "catalog/pg_type.h"
54 #include "commands/trigger.h"
55 #include "miscadmin.h"
56 #include "storage/smgr.h"
57 #include "utils/builtins.h"
58 #include "utils/catcache.h"
59 #include "utils/fmgroids.h"
60 #include "utils/inval.h"
61 #include "utils/lsyscache.h"
62 #include "utils/relcache.h"
63 #include "utils/syscache.h"
64
65
/*
 * Name of the relcache init file, used to speed up backend startup.
 */
#define RELCACHE_INIT_FILENAME  "pg_internal.init"

/*
 *              Hardcoded tuple descriptors for pg_class, pg_attribute, pg_proc
 *              and pg_type.  Each array has one FormData_pg_attribute entry per
 *              catalog column (Natts_*) and is initialized from the matching
 *              Schema_* macro; see include/catalog/pg_attribute.h.
 */
static FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
static FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
static FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
static FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
78
/*
 *              Hash tables that index the relation cache
 *
 *              Relations are looked up two ways, by OID and by name,
 *              thus there are two hash tables for referencing them.
 *
 *              The OID index covers all relcache entries.  The name index
 *              covers *only* system relations (only those in PG_CATALOG_NAMESPACE).
 */
static HTAB *RelationIdCache;
static HTAB *RelationSysNameCache;

/*
 * Bufmgr uses RelFileNode for lookup, so a third hash table indexes the
 * cache by RelFileNode.  Actually, I would prefer not to pass Relation to
 * bufmgr & beyond at all, and to keep some cache in smgr instead, but there
 * is no time to do it the right way now.       -- vadim 10/22/2000
 */
static HTAB *RelationNodeCache;

/*
 * This flag is false until we have prepared the critical relcache entries
 * that are needed to do indexscans on the tables read by relcache building.
 * While it is false, the catalog scans in this file are forced to use
 * heap scans instead of index scans.
 */
bool criticalRelcachesBuilt = false;

/*
 * This flag is set if we discover that we need to write a new relcache
 * cache file at the end of startup.
 */
static bool needNewCacheFile = false;

/*
 * This counter counts relcache inval events received since backend startup
 * (but only for rels that are actually in cache).  Presently, we use it only
 * to detect whether data about to be written by write_relcache_init_file()
 * might already be obsolete.
 */
static long relcacheInvalsReceived = 0L;

/*
 * This list remembers the OIDs of the relations cached in the relcache
 * init file.
 */
static List *initFileRelationIds = NIL;
123
/*
 *              RelationBuildDescInfo exists so code can be shared
 *              between RelationIdGetRelation() and RelationSysNameGetRelation().
 *
 *              It is a tagged union: infotype says which member of "i" is
 *              meaningful (INFO_RELID -> i.info_id, INFO_RELNAME -> i.info_name).
 */
typedef struct RelationBuildDescInfo
{
        int                     infotype;               /* lookup by id or by name */
#define INFO_RELID 1            /* i.info_id holds the relation's OID */
#define INFO_RELNAME 2          /* i.info_name holds a system relation name */
        union
        {
                Oid                     info_id;        /* relation object id */
                char       *info_name;  /* system relation name */
        }                       i;
} RelationBuildDescInfo;
139
/*
 * Entry of the RelationIdCache hash table: maps a relation OID (the
 * lookup key, taken from rd_id by the macros below) to its descriptor.
 */
typedef struct relidcacheent
{
        Oid                     reloid;         /* lookup key: relation OID */
        Relation        reldesc;        /* the cached relation descriptor */
} RelIdCacheEnt;
145
/*
 * Entry of the RelationSysNameCache hash table: maps a system relation's
 * name (the lookup key) to its descriptor.
 */
typedef struct relnamecacheent
{
        NameData        relname;        /* lookup key: relation name */
        Relation        reldesc;        /* the cached relation descriptor */
} RelNameCacheEnt;
151
/*
 * Entry of the RelationNodeCache hash table: maps a RelFileNode (the
 * lookup key, taken from rd_node by the macros below) to its descriptor.
 */
typedef struct relnodecacheent
{
        RelFileNode relnode;            /* lookup key: physical file identity */
        Relation        reldesc;        /* the cached relation descriptor */
} RelNodeCacheEnt;
157
158 /*
159  *              macros to manipulate the lookup hashtables
160  */
/*
 *              RelationCacheInsert
 *
 *              Enter RELATION into the OID-keyed and RelFileNode-keyed hash
 *              tables, and additionally into the name-keyed table when the
 *              relation belongs to the system namespace.  If an entry with the
 *              same key already exists, its reldesc is silently overwritten.
 *              A NULL result from hash_search is reported via elog(ERROR).
 *
 *              RELATION is parenthesized wherever it is dereferenced or assigned,
 *              so any pointer-valued expression may be passed.  It is still
 *              evaluated several times, so it must be free of side effects.
 */
#define RelationCacheInsert(RELATION)   \
do { \
        RelIdCacheEnt *idhentry; RelNodeCacheEnt *nodentry; bool found; \
        idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
                                                 (void *) &((RELATION)->rd_id), \
                                                 HASH_ENTER, \
                                                 &found); \
        if (idhentry == NULL) \
                elog(ERROR, "out of memory for relation descriptor cache"); \
        /* used to give notice if found -- now just keep quiet */ \
        idhentry->reldesc = (RELATION); \
        nodentry = (RelNodeCacheEnt *) hash_search(RelationNodeCache, \
                                                   (void *) &((RELATION)->rd_node), \
                                                   HASH_ENTER, \
                                                   &found); \
        if (nodentry == NULL) \
                elog(ERROR, "out of memory for relation descriptor cache"); \
        /* used to give notice if found -- now just keep quiet */ \
        nodentry->reldesc = (RELATION); \
        /* only system-namespace relations are indexed by name */ \
        if (IsSystemNamespace(RelationGetNamespace(RELATION))) \
        { \
                char *relname = RelationGetRelationName(RELATION); \
                RelNameCacheEnt *namehentry; \
                namehentry = (RelNameCacheEnt *) hash_search(RelationSysNameCache, \
                                                             relname, \
                                                             HASH_ENTER, \
                                                             &found); \
                if (namehentry == NULL) \
                        elog(ERROR, "out of memory for relation descriptor cache"); \
                /* used to give notice if found -- now just keep quiet */ \
                namehentry->reldesc = (RELATION); \
        } \
} while(0)
194
/*
 *              RelationIdCacheLookup
 *
 *              Probe the OID-keyed hash table for ID (which must be an lvalue,
 *              since its address is taken).  RELATION receives the cached
 *              descriptor, or NULL when no entry exists.
 */
#define RelationIdCacheLookup(ID, RELATION) \
do { \
        RelIdCacheEnt *idhit = (RelIdCacheEnt *) \
                hash_search(RelationIdCache, (void *) &(ID), HASH_FIND, NULL); \
        RELATION = (idhit != NULL) ? idhit->reldesc : NULL; \
} while(0)
205
/*
 *              RelationSysNameCacheLookup
 *
 *              Probe the name-keyed hash table using NAME as the key pointer.
 *              RELATION receives the cached descriptor, or NULL when no entry
 *              exists.
 */
#define RelationSysNameCacheLookup(NAME, RELATION) \
do { \
        RelNameCacheEnt *namehit = (RelNameCacheEnt *) \
                hash_search(RelationSysNameCache, (void *) (NAME), HASH_FIND, NULL); \
        RELATION = (namehit != NULL) ? namehit->reldesc : NULL; \
} while(0)
216
/*
 *              RelationNodeCacheLookup
 *
 *              Probe the RelFileNode-keyed hash table for NODE (which must be an
 *              lvalue, since its address is taken).  RELATION receives the cached
 *              descriptor, or NULL when no entry exists.
 */
#define RelationNodeCacheLookup(NODE, RELATION) \
do { \
        RelNodeCacheEnt *nodehit = (RelNodeCacheEnt *) \
                hash_search(RelationNodeCache, (void *) &(NODE), HASH_FIND, NULL); \
        RELATION = (nodehit != NULL) ? nodehit->reldesc : NULL; \
} while(0)
227
/*
 *              RelationCacheDelete
 *
 *              Remove RELATION from the OID-keyed and RelFileNode-keyed hash
 *              tables, and from the name-keyed table when the relation belongs
 *              to the system namespace.  A missing entry is only reported with
 *              elog(WARNING); the remaining removals are still attempted.
 *
 *              RELATION is parenthesized wherever it is dereferenced, so any
 *              pointer-valued expression may be passed.  It is still evaluated
 *              several times, so it must be free of side effects.
 */
#define RelationCacheDelete(RELATION) \
do { \
        RelIdCacheEnt *idhentry; RelNodeCacheEnt *nodentry; \
        idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
                                                 (void *) &((RELATION)->rd_id), \
                                                 HASH_REMOVE, NULL); \
        if (idhentry == NULL) \
                elog(WARNING, "trying to delete a rd_id reldesc that does not exist."); \
        nodentry = (RelNodeCacheEnt *) hash_search(RelationNodeCache, \
                                                   (void *) &((RELATION)->rd_node), \
                                                   HASH_REMOVE, NULL); \
        if (nodentry == NULL) \
                elog(WARNING, "trying to delete a rd_node reldesc that does not exist."); \
        /* only system-namespace relations are indexed by name */ \
        if (IsSystemNamespace(RelationGetNamespace(RELATION))) \
        { \
                char *relname = RelationGetRelationName(RELATION); \
                RelNameCacheEnt *namehentry; \
                namehentry = (RelNameCacheEnt *) hash_search(RelationSysNameCache, \
                                                             relname, \
                                                             HASH_REMOVE, NULL); \
                if (namehentry == NULL) \
                        elog(WARNING, "trying to delete a relname reldesc that does not exist."); \
        } \
} while(0)
252
253
/*
 * Special cache for opclass-related information.
 *
 * One entry per operator class, keyed by the opclass OID.  The three
 * pointer members refer to arrays; "valid" tells whether the entry has
 * been completely filled in.
 */
typedef struct opclasscacheent
{
        Oid                     opclassoid;             /* lookup key: OID of opclass */
        bool            valid;                  /* set TRUE after successful fill-in */
        StrategyNumber numStrats;       /* max # of strategies (from pg_am) */
        StrategyNumber numSupport;      /* max # of support procs (from pg_am) */
        Oid                *operatorOids;       /* strategy operators' OIDs */
        RegProcedure *operatorProcs; /* strategy operators' procs */
        RegProcedure *supportProcs;     /* support procs */
} OpClassCacheEnt;

/* hash table of OpClassCacheEnt; NULL until it has been created */
static HTAB *OpClassCache = NULL;
269
270
271 /* non-export function prototypes */
272
273 static void RelationClearRelation(Relation relation, bool rebuild);
274
275 #ifdef  ENABLE_REINDEX_NAILED_RELATIONS
276 static void RelationReloadClassinfo(Relation relation);
277 #endif   /* ENABLE_REINDEX_NAILED_RELATIONS */
278 static void RelationFlushRelation(Relation relation);
279 static Relation RelationSysNameCacheGetRelation(const char *relationName);
280 static bool load_relcache_init_file(void);
281 static void write_relcache_init_file(void);
282
283 static void formrdesc(const char *relationName, int natts,
284                   FormData_pg_attribute *att);
285
286 static HeapTuple ScanPgRelation(RelationBuildDescInfo buildinfo);
287 static Relation AllocateRelationDesc(Relation relation, Form_pg_class relp);
288 static void RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
289                                            Relation relation);
290 static Relation RelationBuildDesc(RelationBuildDescInfo buildinfo,
291                                   Relation oldrelation);
292 static void AttrDefaultFetch(Relation relation);
293 static void CheckConstraintFetch(Relation relation);
294 static List *insert_ordered_oid(List *list, Oid datum);
295 static void IndexSupportInitialize(Form_pg_index iform,
296                                                                    IndexStrategy indexStrategy,
297                                                                    Oid *indexOperator,
298                                                                    RegProcedure *indexSupport,
299                                                                    StrategyNumber maxStrategyNumber,
300                                                                    StrategyNumber maxSupportNumber,
301                                                                    AttrNumber maxAttributeNumber);
302 static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
303                                                                                   StrategyNumber numStrats,
304                                                                                   StrategyNumber numSupport);
305
306
307 /*
308  *              ScanPgRelation
309  *
310  *              this is used by RelationBuildDesc to find a pg_class
311  *              tuple matching either a relation name or a relation id
312  *              as specified in buildinfo.
313  *
314  *              NB: the returned tuple has been copied into palloc'd storage
315  *              and must eventually be freed with heap_freetuple.
316  */
317 static HeapTuple
318 ScanPgRelation(RelationBuildDescInfo buildinfo)
319 {
320         HeapTuple       pg_class_tuple;
321         Relation        pg_class_desc;
322         const char *indexRelname;
323         SysScanDesc pg_class_scan;
324         ScanKeyData key[2];
325         int                     nkeys;
326
327         /*
328          * form a scan key
329          */
330         switch (buildinfo.infotype)
331         {
332                 case INFO_RELID:
333                         ScanKeyEntryInitialize(&key[0], 0,
334                                                                    ObjectIdAttributeNumber,
335                                                                    F_OIDEQ,
336                                                                    ObjectIdGetDatum(buildinfo.i.info_id));
337                         nkeys = 1;
338                         indexRelname = ClassOidIndex;
339                         break;
340
341                 case INFO_RELNAME:
342                         ScanKeyEntryInitialize(&key[0], 0,
343                                                                    Anum_pg_class_relname,
344                                                                    F_NAMEEQ,
345                                                                    NameGetDatum(buildinfo.i.info_name));
346                         ScanKeyEntryInitialize(&key[1], 0,
347                                                                    Anum_pg_class_relnamespace,
348                                                                    F_OIDEQ,
349                                                                    ObjectIdGetDatum(PG_CATALOG_NAMESPACE));
350                         nkeys = 2;
351                         indexRelname = ClassNameNspIndex;
352                         break;
353
354                 default:
355                         elog(ERROR, "ScanPgRelation: bad buildinfo");
356                         return NULL;            /* keep compiler quiet */
357         }
358
359         /*
360          * Open pg_class and fetch a tuple.  Force heap scan if we haven't
361          * yet built the critical relcache entries (this includes initdb
362          * and startup without a pg_internal.init file).
363          */
364         pg_class_desc = heap_openr(RelationRelationName, AccessShareLock);
365         pg_class_scan = systable_beginscan(pg_class_desc, indexRelname,
366                                                                            criticalRelcachesBuilt,
367                                                                            SnapshotNow,
368                                                                            nkeys, key);
369
370         pg_class_tuple = systable_getnext(pg_class_scan);
371
372         /*
373          * Must copy tuple before releasing buffer.
374          */
375         if (HeapTupleIsValid(pg_class_tuple))
376                 pg_class_tuple = heap_copytuple(pg_class_tuple);
377
378         /* all done */
379         systable_endscan(pg_class_scan);
380         heap_close(pg_class_desc, AccessShareLock);
381
382         return pg_class_tuple;
383 }
384
385 /*
386  *              AllocateRelationDesc
387  *
388  *              This is used to allocate memory for a new relation descriptor
389  *              and initialize the rd_rel field.
390  *
391  *              If 'relation' is NULL, allocate a new RelationData object.
392  *              If not, reuse the given object (that path is taken only when
393  *              we have to rebuild a relcache entry during RelationClearRelation).
394  */
395 static Relation
396 AllocateRelationDesc(Relation relation, Form_pg_class relp)
397 {
398         MemoryContext oldcxt;
399         Form_pg_class relationForm;
400
401         /* Relcache entries must live in CacheMemoryContext */
402         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
403
404         /*
405          * allocate space for new relation descriptor, if needed
406          */
407         if (relation == NULL)
408                 relation = (Relation) palloc(sizeof(RelationData));
409
410         /*
411          * clear all fields of reldesc
412          */
413         MemSet((char *) relation, 0, sizeof(RelationData));
414         relation->rd_targblock = InvalidBlockNumber;
415
416         /* make sure relation is marked as having no open file yet */
417         relation->rd_fd = -1;
418
419         /*
420          * Copy the relation tuple form
421          *
422          * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE.
423          * relacl is NOT stored in the relcache --- there'd be little point in
424          * it, since we don't copy the tuple's nullvalues bitmap and hence
425          * wouldn't know if the value is valid ... bottom line is that relacl
426          * *cannot* be retrieved from the relcache.  Get it from the syscache
427          * if you need it.
428          */
429         relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
430
431         memcpy((char *) relationForm, (char *) relp, CLASS_TUPLE_SIZE);
432
433         /* initialize relation tuple form */
434         relation->rd_rel = relationForm;
435
436         /* and allocate attribute tuple form storage */
437         relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts,
438                                                                                            relationForm->relhasoids);
439
440         MemoryContextSwitchTo(oldcxt);
441
442         return relation;
443 }
444
445 /*
446  *              RelationBuildTupleDesc
447  *
448  *              Form the relation's tuple descriptor from information in
449  *              the pg_attribute, pg_attrdef & pg_constraint system catalogs.
450  */
451 static void
452 RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
453                                            Relation relation)
454 {
455         HeapTuple       pg_attribute_tuple;
456         Relation        pg_attribute_desc;
457         SysScanDesc pg_attribute_scan;
458         ScanKeyData skey[2];
459         int                     need;
460         TupleConstr *constr;
461         AttrDefault *attrdef = NULL;
462         int                     ndef = 0;
463
464         relation->rd_att->tdhasoid = RelationGetForm(relation)->relhasoids;
465
466         constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
467                                                                                                 sizeof(TupleConstr));
468         constr->has_not_null = false;
469
470         /*
471          * Form a scan key that selects only user attributes (attnum > 0).
472          * (Eliminating system attribute rows at the index level is lots
473          * faster than fetching them.)
474          */
475         ScanKeyEntryInitialize(&skey[0], 0,
476                                                    Anum_pg_attribute_attrelid,
477                                                    F_OIDEQ,
478                                                    ObjectIdGetDatum(RelationGetRelid(relation)));
479         ScanKeyEntryInitialize(&skey[1], 0,
480                                                    Anum_pg_attribute_attnum,
481                                                    F_INT2GT,
482                                                    Int16GetDatum(0));
483
484         /*
485          * Open pg_attribute and begin a scan.  Force heap scan if we haven't
486          * yet built the critical relcache entries (this includes initdb
487          * and startup without a pg_internal.init file).
488          */
489         pg_attribute_desc = heap_openr(AttributeRelationName, AccessShareLock);
490         pg_attribute_scan = systable_beginscan(pg_attribute_desc,
491                                                                                    AttributeRelidNumIndex,
492                                                                                    criticalRelcachesBuilt,
493                                                                                    SnapshotNow,
494                                                                                    2, skey);
495
496         /*
497          * add attribute data to relation->rd_att
498          */
499         need = relation->rd_rel->relnatts;
500
501         while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
502         {
503                 Form_pg_attribute attp;
504
505                 attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
506
507                 if (attp->attnum <= 0 ||
508                         attp->attnum > relation->rd_rel->relnatts)
509                         elog(ERROR, "Bogus attribute number %d for %s",
510                                  attp->attnum, RelationGetRelationName(relation));
511
512                 relation->rd_att->attrs[attp->attnum - 1] =
513                         (Form_pg_attribute) MemoryContextAlloc(CacheMemoryContext,
514                                                                                                    ATTRIBUTE_TUPLE_SIZE);
515
516                 memcpy((char *) (relation->rd_att->attrs[attp->attnum - 1]),
517                            (char *) attp,
518                            ATTRIBUTE_TUPLE_SIZE);
519
520                 /* Update constraint/default info */
521                 if (attp->attnotnull)
522                         constr->has_not_null = true;
523
524                 if (attp->atthasdef)
525                 {
526                         if (attrdef == NULL)
527                         {
528                                 attrdef = (AttrDefault *)
529                                         MemoryContextAlloc(CacheMemoryContext,
530                                                                            relation->rd_rel->relnatts *
531                                                                            sizeof(AttrDefault));
532                                 MemSet(attrdef, 0,
533                                            relation->rd_rel->relnatts * sizeof(AttrDefault));
534                         }
535                         attrdef[ndef].adnum = attp->attnum;
536                         attrdef[ndef].adbin = NULL;
537                         ndef++;
538                 }
539                 need--;
540                 if (need == 0)
541                         break;
542         }
543
544         /*
545          * end the scan and close the attribute relation
546          */
547         systable_endscan(pg_attribute_scan);
548         heap_close(pg_attribute_desc, AccessShareLock);
549
550         if (need != 0)
551                 elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
552                          need, RelationGetRelid(relation));
553
554         /*
555          * The attcacheoff values we read from pg_attribute should all be -1
556          * ("unknown").  Verify this if assert checking is on.  They will be
557          * computed when and if needed during tuple access.
558          */
559 #ifdef USE_ASSERT_CHECKING
560         {
561                 int                     i;
562
563                 for (i = 0; i < relation->rd_rel->relnatts; i++)
564                         Assert(relation->rd_att->attrs[i]->attcacheoff == -1);
565         }
566 #endif
567
568         /*
569          * However, we can easily set the attcacheoff value for the first
570          * attribute: it must be zero.  This eliminates the need for special
571          * cases for attnum=1 that used to exist in fastgetattr() and
572          * index_getattr().
573          */
574         relation->rd_att->attrs[0]->attcacheoff = 0;
575
576         /*
577          * Set up constraint/default info
578          */
579         if (constr->has_not_null || ndef > 0 || relation->rd_rel->relchecks)
580         {
581                 relation->rd_att->constr = constr;
582
583                 if (ndef > 0)                   /* DEFAULTs */
584                 {
585                         if (ndef < relation->rd_rel->relnatts)
586                                 constr->defval = (AttrDefault *)
587                                         repalloc(attrdef, ndef * sizeof(AttrDefault));
588                         else
589                                 constr->defval = attrdef;
590                         constr->num_defval = ndef;
591                         AttrDefaultFetch(relation);
592                 }
593                 else
594                         constr->num_defval = 0;
595
596                 if (relation->rd_rel->relchecks > 0)    /* CHECKs */
597                 {
598                         constr->num_check = relation->rd_rel->relchecks;
599                         constr->check = (ConstrCheck *)
600                                 MemoryContextAlloc(CacheMemoryContext,
601                                                                 constr->num_check * sizeof(ConstrCheck));
602                         MemSet(constr->check, 0, constr->num_check * sizeof(ConstrCheck));
603                         CheckConstraintFetch(relation);
604                 }
605                 else
606                         constr->num_check = 0;
607         }
608         else
609         {
610                 pfree(constr);
611                 relation->rd_att->constr = NULL;
612         }
613 }
614
615 /*
616  *              RelationBuildRuleLock
617  *
618  *              Form the relation's rewrite rules from information in
619  *              the pg_rewrite system catalog.
620  *
621  * Note: The rule parsetrees are potentially very complex node structures.
622  * To allow these trees to be freed when the relcache entry is flushed,
623  * we make a private memory context to hold the RuleLock information for
624  * each relcache entry that has associated rules.  The context is used
625  * just for rule info, not for any other subsidiary data of the relcache
626  * entry, because that keeps the update logic in RelationClearRelation()
627  * manageable.  The other subsidiary data structures are simple enough
628  * to be easy to free explicitly, anyway.
629  */
630 static void
631 RelationBuildRuleLock(Relation relation)
632 {
633         MemoryContext rulescxt;
634         MemoryContext oldcxt;
635         HeapTuple       rewrite_tuple;
636         Relation        rewrite_desc;
637         TupleDesc       rewrite_tupdesc;
638         SysScanDesc rewrite_scan;
639         ScanKeyData key;
640         RuleLock   *rulelock;
641         int                     numlocks;
642         RewriteRule **rules;
643         int                     maxlocks;
644
645         /*
646          * Make the private context.  Parameters are set on the assumption
647          * that it'll probably not contain much data.
648          */
649         rulescxt = AllocSetContextCreate(CacheMemoryContext,
650                                                                          RelationGetRelationName(relation),
651                                                                          0, /* minsize */
652                                                                          1024,          /* initsize */
653                                                                          1024);         /* maxsize */
654         relation->rd_rulescxt = rulescxt;
655
656         /*
657          * allocate an array to hold the rewrite rules (the array is extended if
658          * necessary)
659          */
660         maxlocks = 4;
661         rules = (RewriteRule **)
662                 MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
663         numlocks = 0;
664
665         /*
666          * form a scan key
667          */
668         ScanKeyEntryInitialize(&key, 0,
669                                                    Anum_pg_rewrite_ev_class,
670                                                    F_OIDEQ,
671                                                    ObjectIdGetDatum(RelationGetRelid(relation)));
672
673         /*
674          * open pg_rewrite and begin a scan
675          *
676          * Note: since we scan the rules using RewriteRelRulenameIndex,
677          * we will be reading the rules in name order, except possibly
678          * during emergency-recovery operations (ie, IsIgnoringSystemIndexes).
679          * This in turn ensures that rules will be fired in name order.
680          */
681         rewrite_desc = heap_openr(RewriteRelationName, AccessShareLock);
682         rewrite_tupdesc = RelationGetDescr(rewrite_desc);
683         rewrite_scan = systable_beginscan(rewrite_desc, 
684                                                                           RewriteRelRulenameIndex,
685                                                                           true, SnapshotNow,
686                                                                           1, &key);
687
688         while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
689         {
690                 Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
691                 bool            isnull;
692                 Datum           ruleaction;
693                 Datum           rule_evqual;
694                 char       *ruleaction_str;
695                 char       *rule_evqual_str;
696                 RewriteRule *rule;
697
698                 rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
699                                                                                                   sizeof(RewriteRule));
700
701                 rule->ruleId = HeapTupleGetOid(rewrite_tuple);
702
703                 rule->event = rewrite_form->ev_type - '0';
704                 rule->attrno = rewrite_form->ev_attr;
705                 rule->isInstead = rewrite_form->is_instead;
706
707                 /* Must use heap_getattr to fetch ev_qual and ev_action */
708
709                 ruleaction = heap_getattr(rewrite_tuple,
710                                                                   Anum_pg_rewrite_ev_action,
711                                                                   rewrite_tupdesc,
712                                                                   &isnull);
713                 Assert(!isnull);
714                 ruleaction_str = DatumGetCString(DirectFunctionCall1(textout,
715                                                                                                                          ruleaction));
716                 oldcxt = MemoryContextSwitchTo(rulescxt);
717                 rule->actions = (List *) stringToNode(ruleaction_str);
718                 MemoryContextSwitchTo(oldcxt);
719                 pfree(ruleaction_str);
720
721                 rule_evqual = heap_getattr(rewrite_tuple,
722                                                                    Anum_pg_rewrite_ev_qual,
723                                                                    rewrite_tupdesc,
724                                                                    &isnull);
725                 Assert(!isnull);
726                 rule_evqual_str = DatumGetCString(DirectFunctionCall1(textout,
727                                                                                                                           rule_evqual));
728                 oldcxt = MemoryContextSwitchTo(rulescxt);
729                 rule->qual = (Node *) stringToNode(rule_evqual_str);
730                 MemoryContextSwitchTo(oldcxt);
731                 pfree(rule_evqual_str);
732
733                 if (numlocks >= maxlocks)
734                 {
735                         maxlocks *= 2;
736                         rules = (RewriteRule **)
737                                 repalloc(rules, sizeof(RewriteRule *) * maxlocks);
738                 }
739                 rules[numlocks++] = rule;
740         }
741
742         /*
743          * end the scan and close the pg_rewrite relation
744          */
745         systable_endscan(rewrite_scan);
746         heap_close(rewrite_desc, AccessShareLock);
747
748         /*
749          * form a RuleLock and insert into relation
750          */
751         rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
752         rulelock->numLocks = numlocks;
753         rulelock->rules = rules;
754
755         relation->rd_rules = rulelock;
756 }
757
758 /*
759  *              equalRuleLocks
760  *
761  *              Determine whether two RuleLocks are equivalent
762  *
763  *              Probably this should be in the rules code someplace...
764  */
765 static bool
766 equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
767 {
768         int                     i;
769
770         /*
771          * As of 7.3 we assume the rule ordering is repeatable,
772          * because RelationBuildRuleLock should read 'em in a
773          * consistent order.  So just compare corresponding slots.
774          */
775         if (rlock1 != NULL)
776         {
777                 if (rlock2 == NULL)
778                         return false;
779                 if (rlock1->numLocks != rlock2->numLocks)
780                         return false;
781                 for (i = 0; i < rlock1->numLocks; i++)
782                 {
783                         RewriteRule *rule1 = rlock1->rules[i];
784                         RewriteRule *rule2 = rlock2->rules[i];
785
786                         if (rule1->ruleId != rule2->ruleId)
787                                 return false;
788                         if (rule1->event != rule2->event)
789                                 return false;
790                         if (rule1->attrno != rule2->attrno)
791                                 return false;
792                         if (rule1->isInstead != rule2->isInstead)
793                                 return false;
794                         if (!equal(rule1->qual, rule2->qual))
795                                 return false;
796                         if (!equal(rule1->actions, rule2->actions))
797                                 return false;
798                 }
799         }
800         else if (rlock2 != NULL)
801                 return false;
802         return true;
803 }
804
805
806 /* ----------------------------------
807  *              RelationBuildDesc
808  *
809  *              Build a relation descriptor --- either a new one, or by
810  *              recycling the given old relation object.  The latter case
811  *              supports rebuilding a relcache entry without invalidating
812  *              pointers to it.
813  * --------------------------------
814  */
815 static Relation
816 RelationBuildDesc(RelationBuildDescInfo buildinfo,
817                                   Relation oldrelation)
818 {
819         Relation        relation;
820         Oid                     relid;
821         HeapTuple       pg_class_tuple;
822         Form_pg_class relp;
823         MemoryContext oldcxt;
824
825         /*
826          * find the tuple in pg_class corresponding to the given relation id
827          */
828         pg_class_tuple = ScanPgRelation(buildinfo);
829
830         /*
831          * if no such tuple exists, return NULL
832          */
833         if (!HeapTupleIsValid(pg_class_tuple))
834                 return NULL;
835
836         /*
837          * get information from the pg_class_tuple
838          */
839         relid = HeapTupleGetOid(pg_class_tuple);
840         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
841
842         /*
843          * allocate storage for the relation descriptor, and copy
844          * pg_class_tuple to relation->rd_rel.
845          */
846         relation = AllocateRelationDesc(oldrelation, relp);
847
848         /*
849          * now we can free the memory allocated for pg_class_tuple
850          */
851         heap_freetuple(pg_class_tuple);
852
853         /*
854          * initialize the relation's relation id (relation->rd_id)
855          */
856         RelationGetRelid(relation) = relid;
857
858         /*
859          * initialize relation->rd_refcnt
860          */
861         RelationSetReferenceCount(relation, 1);
862
863         /*
864          * normal relations are not nailed into the cache; nor can a pre-existing
865          * relation be new.  It could be temp though.  (Actually, it could be new
866          * too, but it's okay to forget that fact if forced to flush the entry.)
867          */
868         relation->rd_isnailed = false;
869         relation->rd_isnew = false;
870         relation->rd_istemp = isTempNamespace(relation->rd_rel->relnamespace);
871
872         /*
873          * initialize the tuple descriptor (relation->rd_att).
874          */
875         RelationBuildTupleDesc(buildinfo, relation);
876
877         /*
878          * Fetch rules and triggers that affect this relation
879          */
880         if (relation->rd_rel->relhasrules)
881                 RelationBuildRuleLock(relation);
882         else
883         {
884                 relation->rd_rules = NULL;
885                 relation->rd_rulescxt = NULL;
886         }
887
888         if (relation->rd_rel->reltriggers > 0)
889                 RelationBuildTriggers(relation);
890         else
891                 relation->trigdesc = NULL;
892
893         /*
894          * if it's an index, initialize index-related information
895          */
896         if (OidIsValid(relation->rd_rel->relam))
897                 RelationInitIndexAccessInfo(relation);
898
899         /*
900          * initialize the relation lock manager information
901          */
902         RelationInitLockInfo(relation);         /* see lmgr.c */
903
904         if (relation->rd_rel->relisshared)
905                 relation->rd_node.tblNode = InvalidOid;
906         else
907                 relation->rd_node.tblNode = MyDatabaseId;
908         relation->rd_node.relNode = relation->rd_rel->relfilenode;
909
910         /* make sure relation is marked as having no open file yet */
911         relation->rd_fd = -1;
912
913         /*
914          * Insert newly created relation into relcache hash tables.
915          */
916         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
917         RelationCacheInsert(relation);
918         MemoryContextSwitchTo(oldcxt);
919
920         /*
921          * If it's a temp rel, RelationGetNumberOfBlocks will assume that
922          * rd_nblocks is correct.  Must forcibly update the block count when
923          * creating the relcache entry.  But if we are doing a rebuild, don't
924          * do this yet; leave it to RelationClearRelation to do at the end.
925          * (Otherwise, an elog in RelationUpdateNumberOfBlocks would leave us
926          * with inconsistent relcache state.)
927          */
928         if (relation->rd_istemp && oldrelation == NULL)
929                 RelationUpdateNumberOfBlocks(relation);
930
931         return relation;
932 }
933
934 /*
935  * Initialize index-access-method support data for an index relation
936  */
937 void
938 RelationInitIndexAccessInfo(Relation relation)
939 {
940         HeapTuple       tuple;
941         Size            iformsize;
942         Form_pg_index iform;
943         Form_pg_am      aform;
944         MemoryContext indexcxt;
945         IndexStrategy strategy;
946         Oid                *operator;
947         RegProcedure *support;
948         FmgrInfo   *supportinfo;
949         int                     natts;
950         uint16          amstrategies;
951         uint16          amsupport;
952
953         /*
954          * Make a copy of the pg_index entry for the index.  Note that this
955          * is a variable-length tuple.
956          */
957         tuple = SearchSysCache(INDEXRELID,
958                                                    ObjectIdGetDatum(RelationGetRelid(relation)),
959                                                    0, 0, 0);
960         if (!HeapTupleIsValid(tuple))
961                 elog(ERROR, "RelationInitIndexAccessInfo: no pg_index entry for index %u",
962                          RelationGetRelid(relation));
963         iformsize = tuple->t_len - tuple->t_data->t_hoff;
964         iform = (Form_pg_index) MemoryContextAlloc(CacheMemoryContext, iformsize);
965         memcpy(iform, GETSTRUCT(tuple), iformsize);
966         ReleaseSysCache(tuple);
967         relation->rd_index = iform;
968
969         /*
970          * Make a copy of the pg_am entry for the index's access method
971          */
972         tuple = SearchSysCache(AMOID,
973                                                    ObjectIdGetDatum(relation->rd_rel->relam),
974                                                    0, 0, 0);
975         if (!HeapTupleIsValid(tuple))
976                 elog(ERROR, "RelationInitIndexAccessInfo: cache lookup failed for AM %u",
977                          relation->rd_rel->relam);
978         aform = (Form_pg_am) MemoryContextAlloc(CacheMemoryContext, sizeof *aform);
979         memcpy(aform, GETSTRUCT(tuple), sizeof *aform);
980         ReleaseSysCache(tuple);
981         relation->rd_am = aform;
982
983         natts = relation->rd_rel->relnatts;
984         amstrategies = aform->amstrategies;
985         amsupport = aform->amsupport;
986
987         /*
988          * Make the private context to hold index access info.  The reason we
989          * need a context, and not just a couple of pallocs, is so that we
990          * won't leak any subsidiary info attached to fmgr lookup records.
991          *
992          * Context parameters are set on the assumption that it'll probably not
993          * contain much data.
994          */
995         indexcxt = AllocSetContextCreate(CacheMemoryContext,
996                                                                          RelationGetRelationName(relation),
997                                                                          0, /* minsize */
998                                                                          512,           /* initsize */
999                                                                          1024);         /* maxsize */
1000         relation->rd_indexcxt = indexcxt;
1001
1002         /*
1003          * Allocate arrays to hold data
1004          */
1005         if (amstrategies > 0)
1006         {
1007                 int                     noperators = natts * amstrategies;
1008                 Size            stratSize;
1009
1010                 stratSize = AttributeNumberGetIndexStrategySize(natts, amstrategies);
1011                 strategy = (IndexStrategy) MemoryContextAlloc(indexcxt, stratSize);
1012                 MemSet(strategy, 0, stratSize);
1013                 operator = (Oid *)
1014                         MemoryContextAlloc(indexcxt, noperators * sizeof(Oid));
1015                 MemSet(operator, 0, noperators * sizeof(Oid));
1016         }
1017         else
1018         {
1019                 strategy = NULL;
1020                 operator = NULL;
1021         }
1022
1023         if (amsupport > 0)
1024         {
1025                 int                     nsupport = natts * amsupport;
1026
1027                 support = (RegProcedure *)
1028                         MemoryContextAlloc(indexcxt, nsupport * sizeof(RegProcedure));
1029                 MemSet(support, 0, nsupport * sizeof(RegProcedure));
1030                 supportinfo = (FmgrInfo *)
1031                         MemoryContextAlloc(indexcxt, nsupport * sizeof(FmgrInfo));
1032                 MemSet(supportinfo, 0, nsupport * sizeof(FmgrInfo));
1033         }
1034         else
1035         {
1036                 support = NULL;
1037                 supportinfo = NULL;
1038         }
1039
1040         relation->rd_istrat = strategy;
1041         relation->rd_operator = operator;
1042         relation->rd_support = support;
1043         relation->rd_supportinfo = supportinfo;
1044
1045         /*
1046          * Fill the strategy map and the support RegProcedure arrays.
1047          * (supportinfo is left as zeroes, and is filled on-the-fly when used)
1048          */
1049         IndexSupportInitialize(iform,
1050                                                    strategy, operator, support,
1051                                                    amstrategies, amsupport, natts);
1052 }
1053
1054 /*
1055  * IndexSupportInitialize
1056  *              Initializes an index strategy and associated support procedures,
1057  *              given the index's pg_index tuple.
1058  *
1059  * Data is returned into *indexStrategy, *indexOperator, and *indexSupport,
1060  * all of which are objects allocated by the caller.
1061  *
1062  * The caller also passes maxStrategyNumber, maxSupportNumber, and
1063  * maxAttributeNumber, since these indicate the size of the arrays
1064  * it has allocated --- but in practice these numbers must always match
1065  * those obtainable from the system catalog entries for the index and
1066  * access method.
1067  */
1068 static void
1069 IndexSupportInitialize(Form_pg_index iform,
1070                                            IndexStrategy indexStrategy,
1071                                            Oid *indexOperator,
1072                                            RegProcedure *indexSupport,
1073                                            StrategyNumber maxStrategyNumber,
1074                                            StrategyNumber maxSupportNumber,
1075                                            AttrNumber maxAttributeNumber)
1076 {
1077         int                     attIndex;
1078
1079         maxStrategyNumber = AMStrategies(maxStrategyNumber);
1080
1081         /*
1082          * XXX note that the following assumes the INDEX tuple is well formed
1083          * and that the *key and *class are 0 terminated.
1084          */
1085         for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
1086         {
1087                 OpClassCacheEnt *opcentry;
1088
1089                 if (iform->indkey[attIndex] == InvalidAttrNumber ||
1090                         !OidIsValid(iform->indclass[attIndex]))
1091                         elog(ERROR, "IndexSupportInitialize: bogus pg_index tuple");
1092
1093                 /* look up the info for this opclass, using a cache */
1094                 opcentry = LookupOpclassInfo(iform->indclass[attIndex],
1095                                                                          maxStrategyNumber,
1096                                                                          maxSupportNumber);
1097
1098                 /* load the strategy information for the index operators */
1099                 if (maxStrategyNumber > 0)
1100                 {
1101                         StrategyMap map;
1102                         Oid                *opers;
1103                         StrategyNumber strategy;
1104
1105                         map = IndexStrategyGetStrategyMap(indexStrategy,
1106                                                                                           maxStrategyNumber,
1107                                                                                           attIndex + 1);
1108                         opers = &indexOperator[attIndex * maxStrategyNumber];
1109
1110                         for (strategy = 0; strategy < maxStrategyNumber; strategy++)
1111                         {
1112                                 ScanKey         mapentry;
1113
1114                                 mapentry = StrategyMapGetScanKeyEntry(map, strategy + 1);
1115                                 if (RegProcedureIsValid(opcentry->operatorProcs[strategy]))
1116                                 {
1117                                         MemSet(mapentry, 0, sizeof(*mapentry));
1118                                         mapentry->sk_flags = 0;
1119                                         mapentry->sk_procedure = opcentry->operatorProcs[strategy];
1120                                         /*
1121                                          * Mark mapentry->sk_func invalid, until and unless
1122                                          * someone sets it up.
1123                                          */
1124                                         mapentry->sk_func.fn_oid = InvalidOid;
1125                                 }
1126                                 else
1127                                         ScanKeyEntrySetIllegal(mapentry);
1128                                 opers[strategy] = opcentry->operatorOids[strategy];
1129                         }
1130                 }
1131
1132                 /* if support routines exist for this access method, load them */
1133                 if (maxSupportNumber > 0)
1134                 {
1135                         RegProcedure *procs;
1136                         StrategyNumber support;
1137
1138                         procs = &indexSupport[attIndex * maxSupportNumber];
1139
1140                         for (support = 0; support < maxSupportNumber; ++support)
1141                                 procs[support] = opcentry->supportProcs[support];
1142                 }
1143         }
1144 }
1145
1146 /*
1147  * LookupOpclassInfo
1148  *
1149  * This routine maintains a per-opclass cache of the information needed
1150  * by IndexSupportInitialize().  This is more efficient than relying on
1151  * the catalog cache, because we can load all the info about a particular
1152  * opclass in a single indexscan of pg_amproc or pg_amop.
1153  *
1154  * The information from pg_am about expected range of strategy and support
1155  * numbers is passed in, rather than being looked up, mainly because the
1156  * caller will have it already.
1157  *
1158  * XXX There isn't any provision for flushing the cache.  However, there
1159  * isn't any provision for flushing relcache entries when opclass info
1160  * changes, either :-(
1161  */
1162 static OpClassCacheEnt *
1163 LookupOpclassInfo(Oid operatorClassOid,
1164                                   StrategyNumber numStrats,
1165                                   StrategyNumber numSupport)
1166 {
1167         OpClassCacheEnt *opcentry;
1168         bool            found;
1169         Relation        pg_amop_desc;
1170         Relation        pg_amproc_desc;
1171         SysScanDesc pg_amop_scan;
1172         SysScanDesc pg_amproc_scan;
1173         ScanKeyData key;
1174         HeapTuple       htup;
1175         bool            indexOK;
1176
1177         if (OpClassCache == NULL)
1178         {
1179                 /* First time through: initialize the opclass cache */
1180                 HASHCTL         ctl;
1181
1182                 if (!CacheMemoryContext)
1183                         CreateCacheMemoryContext();
1184
1185                 MemSet(&ctl, 0, sizeof(ctl));
1186                 ctl.keysize = sizeof(Oid);
1187                 ctl.entrysize = sizeof(OpClassCacheEnt);
1188                 ctl.hash = tag_hash;
1189                 OpClassCache = hash_create("Operator class cache", 64,
1190                                                                    &ctl, HASH_ELEM | HASH_FUNCTION);
1191         }
1192
1193         opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
1194                                                                                            (void *) &operatorClassOid,
1195                                                                                            HASH_ENTER, &found);
1196         if (opcentry == NULL)
1197                 elog(ERROR, "out of memory for operator class cache");
1198
1199         if (found && opcentry->valid)
1200         {
1201                 /* Already made an entry for it */
1202                 Assert(numStrats == opcentry->numStrats);
1203                 Assert(numSupport == opcentry->numSupport);
1204                 return opcentry;
1205         }
1206
1207         /* Need to fill in new entry */
1208         opcentry->valid = false;        /* until known OK */
1209         opcentry->numStrats = numStrats;
1210         opcentry->numSupport = numSupport;
1211
1212         if (numStrats > 0)
1213         {
1214                 opcentry->operatorOids = (Oid *)
1215                         MemoryContextAlloc(CacheMemoryContext,
1216                                                            numStrats * sizeof(Oid));
1217                 MemSet(opcentry->operatorOids, 0, numStrats * sizeof(Oid));
1218                 opcentry->operatorProcs = (RegProcedure *)
1219                         MemoryContextAlloc(CacheMemoryContext,
1220                                                            numStrats * sizeof(RegProcedure));
1221                 MemSet(opcentry->operatorProcs, 0, numStrats * sizeof(RegProcedure));
1222         }
1223         else
1224         {
1225                 opcentry->operatorOids = NULL;
1226                 opcentry->operatorProcs = NULL;
1227         }
1228
1229         if (numSupport > 0)
1230         {
1231                 opcentry->supportProcs = (RegProcedure *)
1232                         MemoryContextAlloc(CacheMemoryContext,
1233                                                            numSupport * sizeof(RegProcedure));
1234                 MemSet(opcentry->supportProcs, 0, numSupport * sizeof(RegProcedure));
1235         }
1236         else
1237                 opcentry->supportProcs = NULL;
1238
1239         /*
1240          * To avoid infinite recursion during startup, force a heap scan if
1241          * we're looking up info for the opclasses used by the indexes we
1242          * would like to reference here.
1243          */
1244         indexOK = criticalRelcachesBuilt ||
1245                 (operatorClassOid != OID_BTREE_OPS_OID &&
1246                  operatorClassOid != INT2_BTREE_OPS_OID);
1247
1248         /*
1249          * Scan pg_amop to obtain operators for the opclass
1250          */
1251         if (numStrats > 0)
1252         {
1253                 ScanKeyEntryInitialize(&key, 0,
1254                                                            Anum_pg_amop_amopclaid,
1255                                                            F_OIDEQ,
1256                                                            ObjectIdGetDatum(operatorClassOid));
1257                 pg_amop_desc = heap_openr(AccessMethodOperatorRelationName,
1258                                                                   AccessShareLock);
1259                 pg_amop_scan = systable_beginscan(pg_amop_desc,
1260                                                                                   AccessMethodStrategyIndex,
1261                                                                                   indexOK,
1262                                                                                   SnapshotNow,
1263                                                                                   1, &key);
1264
1265                 while (HeapTupleIsValid(htup = systable_getnext(pg_amop_scan)))
1266                 {
1267                         Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(htup);
1268
1269                         if (amopform->amopstrategy <= 0 ||
1270                                 (StrategyNumber) amopform->amopstrategy > numStrats)
1271                                 elog(ERROR, "Bogus amopstrategy number %d for opclass %u",
1272                                          amopform->amopstrategy, operatorClassOid);
1273                         opcentry->operatorOids[amopform->amopstrategy - 1] =
1274                                 amopform->amopopr;
1275                         opcentry->operatorProcs[amopform->amopstrategy - 1] =
1276                                 get_opcode(amopform->amopopr);
1277                 }
1278
1279                 systable_endscan(pg_amop_scan);
1280                 heap_close(pg_amop_desc, AccessShareLock);
1281         }
1282
1283         /*
1284          * Scan pg_amproc to obtain support procs for the opclass
1285          */
1286         if (numSupport > 0)
1287         {
1288                 ScanKeyEntryInitialize(&key, 0,
1289                                                            Anum_pg_amproc_amopclaid,
1290                                                            F_OIDEQ,
1291                                                            ObjectIdGetDatum(operatorClassOid));
1292                 pg_amproc_desc = heap_openr(AccessMethodProcedureRelationName,
1293                                                                         AccessShareLock);
1294                 pg_amproc_scan = systable_beginscan(pg_amproc_desc,
1295                                                                                         AccessMethodProcedureIndex,
1296                                                                                         indexOK,
1297                                                                                         SnapshotNow,
1298                                                                                         1, &key);
1299
1300                 while (HeapTupleIsValid(htup = systable_getnext(pg_amproc_scan)))
1301                 {
1302                         Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);
1303
1304                         if (amprocform->amprocnum <= 0 ||
1305                                 (StrategyNumber) amprocform->amprocnum > numSupport)
1306                                 elog(ERROR, "Bogus amproc number %d for opclass %u",
1307                                          amprocform->amprocnum, operatorClassOid);
1308
1309                         opcentry->supportProcs[amprocform->amprocnum - 1] =
1310                                 amprocform->amproc;
1311                 }
1312
1313                 systable_endscan(pg_amproc_scan);
1314                 heap_close(pg_amproc_desc, AccessShareLock);
1315         }
1316
1317         opcentry->valid = true;
1318         return opcentry;
1319 }
1320
1321
1322 /*
1323  *              formrdesc
1324  *
1325  *              This is a special cut-down version of RelationBuildDesc()
1326  *              used by RelationCacheInitialize() in initializing the relcache.
1327  *              The relation descriptor is built just from the supplied parameters,
1328  *              without actually looking at any system table entries.  We cheat
1329  *              quite a lot since we only need to work for a few basic system
1330  *              catalogs.
1331  *
1332  * formrdesc is currently used for: pg_class, pg_attribute, pg_proc,
1333  * and pg_type (see RelationCacheInitialize).
1334  *
1335  * Note that these catalogs can't have constraints, default values,
1336  * rules, or triggers, since we don't cope with any of that.
1337  *
1338  * NOTE: we assume we are already switched into CacheMemoryContext.
1339  */
1340 static void
1341 formrdesc(const char *relationName,
1342                   int natts,
1343                   FormData_pg_attribute *att)
1344 {
1345         Relation        relation;
1346         int                     i;
1347
1348         /*
1349          * allocate new relation desc
1350          */
1351         relation = (Relation) palloc(sizeof(RelationData));
1352
1353         /*
1354          * clear all fields of reldesc
1355          */
1356         MemSet((char *) relation, 0, sizeof(RelationData));
1357         relation->rd_targblock = InvalidBlockNumber;
1358
1359         /* make sure relation is marked as having no open file yet */
1360         relation->rd_fd = -1;
1361
1362         /*
1363          * initialize reference count
1364          */
1365         RelationSetReferenceCount(relation, 1);
1366
1367         /*
1368          * all entries built with this routine are nailed-in-cache; none are
1369          * for new or temp relations.
1370          */
1371         relation->rd_isnailed = true;
1372         relation->rd_isnew = false;
1373         relation->rd_istemp = false;
1374
1375         /*
1376          * initialize relation tuple form
1377          *
1378          * The data we insert here is pretty incomplete/bogus, but it'll serve to
1379          * get us launched.  RelationCacheInitializePhase2() will read the
1380          * real data from pg_class and replace what we've done here.
1381          */
1382         relation->rd_rel = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
1383         MemSet(relation->rd_rel, 0, CLASS_TUPLE_SIZE);
1384
1385         namestrcpy(&relation->rd_rel->relname, relationName);
1386         relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
1387
1388         /*
1389          * It's important to distinguish between shared and non-shared
1390          * relations, even at bootstrap time, to make sure we know where they
1391          * are stored.  At present, all relations that formrdesc is used for
1392          * are not shared.
1393          */
1394         relation->rd_rel->relisshared = false;
1395
1396         relation->rd_rel->relpages = 1;
1397         relation->rd_rel->reltuples = 1;
1398         relation->rd_rel->relkind = RELKIND_RELATION;
1399         relation->rd_rel->relhasoids = true;
1400         relation->rd_rel->relnatts = (int16) natts;
1401
1402         /*
1403          * initialize attribute tuple form
1404          *
1405          * Unlike the case with the relation tuple, this data had better be
1406          * right because it will never be replaced.  The input values must be
1407          * correctly defined by macros in src/include/catalog/ headers.
1408          */
1409         relation->rd_att = CreateTemplateTupleDesc(natts,
1410                                                                                            relation->rd_rel->relhasoids);
1411
1412         /*
1413          * initialize tuple desc info
1414          */
1415         for (i = 0; i < natts; i++)
1416         {
1417                 relation->rd_att->attrs[i] = (Form_pg_attribute) palloc(ATTRIBUTE_TUPLE_SIZE);
1418                 memcpy((char *) relation->rd_att->attrs[i],
1419                            (char *) &att[i],
1420                            ATTRIBUTE_TUPLE_SIZE);
1421                 /* make sure attcacheoff is valid */
1422                 relation->rd_att->attrs[i]->attcacheoff = -1;
1423         }
1424
1425         /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
1426         relation->rd_att->attrs[0]->attcacheoff = 0;
1427
1428         /*
1429          * initialize relation id from info in att array (my, this is ugly)
1430          */
1431         RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
1432
1433         /*
1434          * initialize the relation's lock manager and RelFileNode information
1435          */
1436         RelationInitLockInfo(relation);         /* see lmgr.c */
1437
1438         if (relation->rd_rel->relisshared)
1439                 relation->rd_node.tblNode = InvalidOid;
1440         else
1441                 relation->rd_node.tblNode = MyDatabaseId;
1442         relation->rd_node.relNode =
1443                 relation->rd_rel->relfilenode = RelationGetRelid(relation);
1444
1445         /*
1446          * initialize the rel-has-index flag, using hardwired knowledge
1447          */
1448         relation->rd_rel->relhasindex = false;
1449
1450         /* In bootstrap mode, we have no indexes */
1451         if (!IsBootstrapProcessingMode())
1452         {
1453                 /* Otherwise, all the rels formrdesc is used for have indexes */
1454                 relation->rd_rel->relhasindex = true;
1455         }
1456
1457         /*
1458          * add new reldesc to relcache
1459          */
1460         RelationCacheInsert(relation);
1461 }
1462
1463
1464 /* ----------------------------------------------------------------
1465  *                               Relation Descriptor Lookup Interface
1466  * ----------------------------------------------------------------
1467  */
1468
1469 /*
1470  *              RelationIdCacheGetRelation
1471  *
1472  *              Lookup an existing reldesc by OID.
1473  *
1474  *              Only try to get the reldesc by looking in the cache,
1475  *              do not go to the disk.
1476  *
1477  *              NB: relation ref count is incremented if successful.
1478  *              Caller should eventually decrement count.  (Usually,
1479  *              that happens by calling RelationClose().)
1480  */
1481 Relation
1482 RelationIdCacheGetRelation(Oid relationId)
1483 {
1484         Relation        rd;
1485
1486         RelationIdCacheLookup(relationId, rd);
1487
1488         if (RelationIsValid(rd))
1489                 RelationIncrementReferenceCount(rd);
1490
1491         return rd;
1492 }
1493
1494 /*
1495  *              RelationSysNameCacheGetRelation
1496  *
1497  *              As above, but lookup by name; only works for system catalogs.
1498  */
1499 static Relation
1500 RelationSysNameCacheGetRelation(const char *relationName)
1501 {
1502         Relation        rd;
1503         NameData        name;
1504
1505         /*
1506          * make sure that the name key used for hash lookup is properly
1507          * null-padded
1508          */
1509         namestrcpy(&name, relationName);
1510         RelationSysNameCacheLookup(NameStr(name), rd);
1511
1512         if (RelationIsValid(rd))
1513                 RelationIncrementReferenceCount(rd);
1514
1515         return rd;
1516 }
1517
1518 Relation
1519 RelationNodeCacheGetRelation(RelFileNode rnode)
1520 {
1521         Relation        rd;
1522
1523         RelationNodeCacheLookup(rnode, rd);
1524
1525         if (RelationIsValid(rd))
1526                 RelationIncrementReferenceCount(rd);
1527
1528         return rd;
1529 }
1530
1531 /*
1532  *              RelationIdGetRelation
1533  *
1534  *              Lookup a reldesc by OID; make one if not already in cache.
1535  *
1536  *              NB: relation ref count is incremented, or set to 1 if new entry.
1537  *              Caller should eventually decrement count.  (Usually,
1538  *              that happens by calling RelationClose().)
1539  */
1540 Relation
1541 RelationIdGetRelation(Oid relationId)
1542 {
1543         Relation        rd;
1544         RelationBuildDescInfo buildinfo;
1545
1546         /*
1547          * first try and get a reldesc from the cache
1548          */
1549         rd = RelationIdCacheGetRelation(relationId);
1550         if (RelationIsValid(rd))
1551                 return rd;
1552
1553         /*
1554          * no reldesc in the cache, so have RelationBuildDesc() build one and
1555          * add it.
1556          */
1557         buildinfo.infotype = INFO_RELID;
1558         buildinfo.i.info_id = relationId;
1559
1560         rd = RelationBuildDesc(buildinfo, NULL);
1561         return rd;
1562 }
1563
1564 /*
1565  *              RelationSysNameGetRelation
1566  *
1567  *              As above, but lookup by name; only works for system catalogs.
1568  */
1569 Relation
1570 RelationSysNameGetRelation(const char *relationName)
1571 {
1572         Relation        rd;
1573         RelationBuildDescInfo buildinfo;
1574
1575         /*
1576          * first try and get a reldesc from the cache
1577          */
1578         rd = RelationSysNameCacheGetRelation(relationName);
1579         if (RelationIsValid(rd))
1580                 return rd;
1581
1582         /*
1583          * no reldesc in the cache, so have RelationBuildDesc() build one and
1584          * add it.
1585          */
1586         buildinfo.infotype = INFO_RELNAME;
1587         buildinfo.i.info_name = (char *) relationName;
1588
1589         rd = RelationBuildDesc(buildinfo, NULL);
1590         return rd;
1591 }
1592
1593 /* ----------------------------------------------------------------
1594  *                              cache invalidation support routines
1595  * ----------------------------------------------------------------
1596  */
1597
1598 /*
1599  * RelationClose - close an open relation
1600  *
1601  *      Actually, we just decrement the refcount.
1602  *
1603  *      NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
1604  *      will be freed as soon as their refcount goes to zero.  In combination
1605  *      with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
1606  *      to catch references to already-released relcache entries.  It slows
1607  *      things down quite a bit, however.
1608  */
1609 void
1610 RelationClose(Relation relation)
1611 {
1612         /* Note: no locking manipulations needed */
1613         RelationDecrementReferenceCount(relation);
1614
1615 #ifdef RELCACHE_FORCE_RELEASE
1616         if (RelationHasReferenceCountZero(relation) &&
1617                 !relation->rd_isnew)
1618                 RelationClearRelation(relation, false);
1619 #endif
1620 }
1621
#ifdef	ENABLE_REINDEX_NAILED_RELATIONS
/*
 * RelationReloadClassinfo
 *
 *	Refresh the pg_class data of a relcache entry in place.
 *
 *	This function is especially for nailed relations: their
 *	relhasindex/relfilenode could be changed even though the entry
 *	itself is never thrown away.
 *
 *	The relation's pg_class tuple is re-read via ScanPgRelation(), copied
 *	over relation->rd_rel, and rd_node.relNode is updated from the fresh
 *	relfilenode.  The entry is removed from the relcache hash tables
 *	before the update and re-inserted afterwards.
 *
 *	Does nothing if the entry has no rd_rel.  Reports an ERROR if the
 *	pg_class tuple cannot be found.
 */
static void
RelationReloadClassinfo(Relation relation)
{
	RelationBuildDescInfo buildinfo;
	HeapTuple	pg_class_tuple;
	Form_pg_class relp;

	if (!relation->rd_rel)
		return;

	/* look the relation up in pg_class by its OID */
	buildinfo.infotype = INFO_RELID;
	buildinfo.i.info_id = relation->rd_id;
	pg_class_tuple = ScanPgRelation(buildinfo);
	if (!HeapTupleIsValid(pg_class_tuple))
	{
		/*
		 * rd_id is an OID, so print it with %u (as the other messages in
		 * this file do); %d would show large OIDs as negative numbers.
		 */
		elog(ERROR, "RelationReloadClassinfo system relation id=%u doesn't exist",
			 relation->rd_id);
		return;					/* not expected to be reached */
	}

	/*
	 * Take the entry out of the hash tables while its pg_class image and
	 * relfilenode are being replaced, then put it back.
	 */
	RelationCacheDelete(relation);
	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
	memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
	relation->rd_node.relNode = relp->relfilenode;
	RelationCacheInsert(relation);

	heap_freetuple(pg_class_tuple);
}
#endif   /* ENABLE_REINDEX_NAILED_RELATIONS */
1657
1658 /*
1659  * RelationClearRelation
1660  *
1661  *       Physically blow away a relation cache entry, or reset it and rebuild
1662  *       it from scratch (that is, from catalog entries).  The latter path is
1663  *       usually used when we are notified of a change to an open relation
1664  *       (one with refcount > 0).  However, this routine just does whichever
1665  *       it's told to do; callers must determine which they want.
1666  */
1667 static void
1668 RelationClearRelation(Relation relation, bool rebuild)
1669 {
1670         MemoryContext oldcxt;
1671
1672         /*
1673          * Make sure smgr and lower levels close the relation's files, if they
1674          * weren't closed already.  If the relation is not getting deleted,
1675          * the next smgr access should reopen the files automatically.  This
1676          * ensures that the low-level file access state is updated after, say,
1677          * a vacuum truncation.
1678          */
1679         if (relation->rd_fd >= 0)
1680         {
1681                 smgrclose(DEFAULT_SMGR, relation);
1682                 relation->rd_fd = -1;
1683         }
1684
1685         /*
1686          * Never, never ever blow away a nailed-in system relation, because
1687          * we'd be unable to recover.
1688          */
1689         if (relation->rd_isnailed)
1690         {
1691 #ifdef  ENABLE_REINDEX_NAILED_RELATIONS
1692                 RelationReloadClassinfo(relation);
1693 #endif   /* ENABLE_REINDEX_NAILED_RELATIONS */
1694                 return;
1695         }
1696
1697         /*
1698          * Remove relation from hash tables
1699          *
1700          * Note: we might be reinserting it momentarily, but we must not have it
1701          * visible in the hash tables until it's valid again, so don't try to
1702          * optimize this away...
1703          */
1704         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
1705         RelationCacheDelete(relation);
1706         MemoryContextSwitchTo(oldcxt);
1707
1708         /* Clear out catcache's entries for this relation */
1709         CatalogCacheFlushRelation(RelationGetRelid(relation));
1710
1711         /*
1712          * Free all the subsidiary data structures of the relcache entry. We
1713          * cannot free rd_att if we are trying to rebuild the entry, however,
1714          * because pointers to it may be cached in various places. The trigger
1715          * manager might also have pointers into the trigdesc, and the rule
1716          * manager might have pointers into the rewrite rules. So to begin
1717          * with, we can only get rid of these fields:
1718          */
1719         if (relation->rd_index)
1720                 pfree(relation->rd_index);
1721         if (relation->rd_am)
1722                 pfree(relation->rd_am);
1723         if (relation->rd_rel)
1724                 pfree(relation->rd_rel);
1725         freeList(relation->rd_indexlist);
1726         if (relation->rd_indexcxt)
1727                 MemoryContextDelete(relation->rd_indexcxt);
1728
1729         /*
1730          * If we're really done with the relcache entry, blow it away. But if
1731          * someone is still using it, reconstruct the whole deal without
1732          * moving the physical RelationData record (so that the someone's
1733          * pointer is still valid).
1734          */
1735         if (!rebuild)
1736         {
1737                 /* ok to zap remaining substructure */
1738                 FreeTupleDesc(relation->rd_att);
1739                 if (relation->rd_rulescxt)
1740                         MemoryContextDelete(relation->rd_rulescxt);
1741                 FreeTriggerDesc(relation->trigdesc);
1742                 pfree(relation);
1743         }
1744         else
1745         {
1746                 /*
1747                  * When rebuilding an open relcache entry, must preserve ref count
1748                  * and rd_isnew flag.  Also attempt to preserve the tupledesc,
1749                  * rewrite rules, and trigger substructures in place.
1750                  */
1751                 int                     old_refcnt = relation->rd_refcnt;
1752                 bool            old_isnew = relation->rd_isnew;
1753                 TupleDesc       old_att = relation->rd_att;
1754                 RuleLock   *old_rules = relation->rd_rules;
1755                 MemoryContext old_rulescxt = relation->rd_rulescxt;
1756                 TriggerDesc *old_trigdesc = relation->trigdesc;
1757                 RelationBuildDescInfo buildinfo;
1758
1759                 buildinfo.infotype = INFO_RELID;
1760                 buildinfo.i.info_id = RelationGetRelid(relation);
1761
1762                 if (RelationBuildDesc(buildinfo, relation) != relation)
1763                 {
1764                         /* Should only get here if relation was deleted */
1765                         FreeTupleDesc(old_att);
1766                         if (old_rulescxt)
1767                                 MemoryContextDelete(old_rulescxt);
1768                         FreeTriggerDesc(old_trigdesc);
1769                         pfree(relation);
1770                         elog(ERROR, "RelationClearRelation: relation %u deleted while still in use",
1771                                  buildinfo.i.info_id);
1772                 }
1773                 RelationSetReferenceCount(relation, old_refcnt);
1774                 relation->rd_isnew = old_isnew;
1775                 if (equalTupleDescs(old_att, relation->rd_att))
1776                 {
1777                         FreeTupleDesc(relation->rd_att);
1778                         relation->rd_att = old_att;
1779                 }
1780                 else
1781                         FreeTupleDesc(old_att);
1782                 if (equalRuleLocks(old_rules, relation->rd_rules))
1783                 {
1784                         if (relation->rd_rulescxt)
1785                                 MemoryContextDelete(relation->rd_rulescxt);
1786                         relation->rd_rules = old_rules;
1787                         relation->rd_rulescxt = old_rulescxt;
1788                 }
1789                 else
1790                 {
1791                         if (old_rulescxt)
1792                                 MemoryContextDelete(old_rulescxt);
1793                 }
1794                 if (equalTriggerDescs(old_trigdesc, relation->trigdesc))
1795                 {
1796                         FreeTriggerDesc(relation->trigdesc);
1797                         relation->trigdesc = old_trigdesc;
1798                 }
1799                 else
1800                         FreeTriggerDesc(old_trigdesc);
1801
1802                 /*
1803                  * Update rd_nblocks.  This is kind of expensive, but I think we must
1804                  * do it in case relation has been truncated... we definitely must
1805                  * do it if the rel is new or temp, since RelationGetNumberOfBlocks
1806                  * will subsequently assume that the block count is correct.
1807                  */
1808                 RelationUpdateNumberOfBlocks(relation);
1809         }
1810 }
1811
1812 /*
1813  * RelationFlushRelation
1814  *
1815  *       Rebuild the relation if it is open (refcount > 0), else blow it away.
1816  */
1817 static void
1818 RelationFlushRelation(Relation relation)
1819 {
1820         bool            rebuild;
1821
1822         if (relation->rd_isnew)
1823         {
1824                 /*
1825                  * New relcache entries are always rebuilt, not flushed; else we'd
1826                  * forget the "new" status of the relation, which is a useful
1827                  * optimization to have.
1828                  */
1829                 rebuild = true;
1830         }
1831         else
1832         {
1833                 /*
1834                  * Pre-existing rels can be dropped from the relcache if not open.
1835                  */
1836                 rebuild = !RelationHasReferenceCountZero(relation);
1837         }
1838
1839         RelationClearRelation(relation, rebuild);
1840 }
1841
1842 /*
1843  * RelationForgetRelation - unconditionally remove a relcache entry
1844  *
1845  *                 External interface for destroying a relcache entry when we
1846  *                 drop the relation.
1847  */
1848 void
1849 RelationForgetRelation(Oid rid)
1850 {
1851         Relation        relation;
1852
1853         RelationIdCacheLookup(rid, relation);
1854
1855         if (!PointerIsValid(relation))
1856                 return;                                 /* not in cache, nothing to do */
1857
1858         if (!RelationHasReferenceCountZero(relation))
1859                 elog(ERROR, "RelationForgetRelation: relation %u is still open", rid);
1860
1861         /* Unconditionally destroy the relcache entry */
1862         RelationClearRelation(relation, false);
1863 }
1864
1865 /*
1866  *              RelationIdInvalidateRelationCacheByRelationId
1867  *
1868  *              This routine is invoked for SI cache flush messages.
1869  *
1870  *              We used to skip local relations, on the grounds that they could
1871  *              not be targets of cross-backend SI update messages; but it seems
1872  *              safer to process them, so that our *own* SI update messages will
1873  *              have the same effects during CommandCounterIncrement for both
1874  *              local and nonlocal relations.
1875  */
1876 void
1877 RelationIdInvalidateRelationCacheByRelationId(Oid relationId)
1878 {
1879         Relation        relation;
1880
1881         RelationIdCacheLookup(relationId, relation);
1882
1883         if (PointerIsValid(relation))
1884         {
1885                 relcacheInvalsReceived++;
1886                 RelationFlushRelation(relation);
1887         }
1888 }
1889
1890 /*
1891  * RelationCacheInvalidate
1892  *       Blow away cached relation descriptors that have zero reference counts,
1893  *       and rebuild those with positive reference counts.
1894  *
1895  *       This is currently used only to recover from SI message buffer overflow,
1896  *       so we do not touch new-in-transaction relations; they cannot be targets
1897  *       of cross-backend SI updates (and our own updates now go through a
1898  *       separate linked list that isn't limited by the SI message buffer size).
1899  *
1900  *       We do this in two phases: the first pass deletes deletable items, and
1901  *       the second one rebuilds the rebuildable items.  This is essential for
1902  *       safety, because hash_seq_search only copes with concurrent deletion of
1903  *       the element it is currently visiting.  If a second SI overflow were to
1904  *       occur while we are walking the table, resulting in recursive entry to
1905  *       this routine, we could crash because the inner invocation blows away
1906  *       the entry next to be visited by the outer scan.  But this way is OK,
1907  *       because (a) during the first pass we won't process any more SI messages,
1908  *       so hash_seq_search will complete safely; (b) during the second pass we
1909  *       only hold onto pointers to nondeletable entries.
1910  */
1911 void
1912 RelationCacheInvalidate(void)
1913 {
1914         HASH_SEQ_STATUS status;
1915         RelIdCacheEnt *idhentry;
1916         Relation        relation;
1917         List       *rebuildList = NIL;
1918         List       *l;
1919
1920         /* Phase 1 */
1921         hash_seq_init(&status, RelationIdCache);
1922
1923         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
1924         {
1925                 relation = idhentry->reldesc;
1926
1927                 /* Ignore new relations, since they are never SI targets */
1928                 if (relation->rd_isnew)
1929                         continue;
1930
1931                 relcacheInvalsReceived++;
1932
1933                 if (RelationHasReferenceCountZero(relation))
1934                 {
1935                         /* Delete this entry immediately */
1936                         RelationClearRelation(relation, false);
1937                 }
1938                 else
1939                 {
1940                         /* Add entry to list of stuff to rebuild in second pass */
1941                         rebuildList = lcons(relation, rebuildList);
1942                 }
1943         }
1944
1945         /* Phase 2: rebuild the items found to need rebuild in phase 1 */
1946         foreach(l, rebuildList)
1947         {
1948                 relation = (Relation) lfirst(l);
1949                 RelationClearRelation(relation, true);
1950         }
1951         freeList(rebuildList);
1952 }
1953
1954 /*
1955  * AtEOXact_RelationCache
1956  *
1957  *      Clean up the relcache at transaction commit or abort.
1958  */
1959 void
1960 AtEOXact_RelationCache(bool commit)
1961 {
1962         HASH_SEQ_STATUS status;
1963         RelIdCacheEnt *idhentry;
1964
1965         hash_seq_init(&status, RelationIdCache);
1966
1967         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
1968         {
1969                 Relation        relation = idhentry->reldesc;
1970                 int                     expected_refcnt;
1971
1972                 /*
1973                  * Is it a relation created in the current transaction?
1974                  *
1975                  * During commit, reset the flag to false, since we are now out of the
1976                  * creating transaction.  During abort, simply delete the relcache
1977                  * entry --- it isn't interesting any longer.  (NOTE: if we have
1978                  * forgotten the isnew state of a new relation due to a forced cache
1979                  * flush, the entry will get deleted anyway by shared-cache-inval
1980                  * processing of the aborted pg_class insertion.)
1981                  */
1982                 if (relation->rd_isnew)
1983                 {
1984                         if (commit)
1985                                 relation->rd_isnew = false;
1986                         else
1987                         {
1988                                 RelationClearRelation(relation, false);
1989                                 continue;
1990                         }
1991                 }
1992
1993                 /*
1994                  * During transaction abort, we must also reset relcache entry ref
1995                  * counts to their normal not-in-a-transaction state.  A ref count may
1996                  * be too high because some routine was exited by elog() between
1997                  * incrementing and decrementing the count.
1998                  *
1999                  * During commit, we should not have to do this, but it's still useful
2000                  * to check that the counts are correct to catch missed relcache
2001                  * closes.
2002                  *
2003                  * In bootstrap mode, do NOT reset the refcnt nor complain that it's
2004                  * nonzero --- the bootstrap code expects relations to stay open
2005                  * across start/commit transaction calls.  (That seems bogus, but it's
2006                  * not worth fixing.)
2007                  */
2008                 expected_refcnt = relation->rd_isnailed ? 1 : 0;
2009
2010                 if (commit)
2011                 {
2012                         if (relation->rd_refcnt != expected_refcnt &&
2013                                 !IsBootstrapProcessingMode())
2014                         {
2015                                 elog(WARNING, "Relcache reference leak: relation \"%s\" has refcnt %d instead of %d",
2016                                          RelationGetRelationName(relation),
2017                                          relation->rd_refcnt, expected_refcnt);
2018                                 RelationSetReferenceCount(relation, expected_refcnt);
2019                         }
2020                 }
2021                 else
2022                 {
2023                         /* abort case, just reset it quietly */
2024                         RelationSetReferenceCount(relation, expected_refcnt);
2025                 }
2026         }
2027 }
2028
2029 /*
2030  *              RelationBuildLocalRelation
2031  *                      Build a relcache entry for an about-to-be-created relation,
2032  *                      and enter it into the relcache.
2033  */
2034 Relation
2035 RelationBuildLocalRelation(const char *relname,
2036                                                    Oid relnamespace,
2037                                                    TupleDesc tupDesc,
2038                                                    Oid relid, Oid dbid,
2039                                                    RelFileNode rnode,
2040                                                    bool nailit)
2041 {
2042         Relation        rel;
2043         MemoryContext oldcxt;
2044         int                     natts = tupDesc->natts;
2045         int                     i;
2046
2047         AssertArg(natts > 0);
2048
2049         /*
2050          * switch to the cache context to create the relcache entry.
2051          */
2052         if (!CacheMemoryContext)
2053                 CreateCacheMemoryContext();
2054
2055         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2056
2057         /*
2058          * allocate a new relation descriptor and fill in basic state fields.
2059          */
2060         rel = (Relation) palloc(sizeof(RelationData));
2061         MemSet((char *) rel, 0, sizeof(RelationData));
2062
2063         rel->rd_targblock = InvalidBlockNumber;
2064
2065         /* make sure relation is marked as having no open file yet */
2066         rel->rd_fd = -1;
2067
2068         RelationSetReferenceCount(rel, 1);
2069
2070         /* it's being created in this transaction */
2071         rel->rd_isnew = true;
2072
2073         /* is it a temporary relation? */
2074         rel->rd_istemp = isTempNamespace(relnamespace);
2075
2076         /*
2077          * nail the reldesc if this is a bootstrap create reln and we may need
2078          * it in the cache later on in the bootstrap process so we don't ever
2079          * want it kicked out.  e.g. pg_attribute!!!
2080          */
2081         if (nailit)
2082                 rel->rd_isnailed = true;
2083
2084         /*
2085          * create a new tuple descriptor from the one passed in.  We do this
2086          * partly to copy it into the cache context, and partly because the
2087          * new relation can't have any defaults or constraints yet; they
2088          * have to be added in later steps, because they require additions
2089          * to multiple system catalogs.  We can copy attnotnull constraints
2090          * here, however.
2091          */
2092         rel->rd_att = CreateTupleDescCopy(tupDesc);
2093         for (i = 0; i < natts; i++)
2094                 rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull;
2095
2096         /*
2097          * initialize relation tuple form (caller may add/override data later)
2098          */
2099         rel->rd_rel = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
2100         MemSet((char *) rel->rd_rel, 0, CLASS_TUPLE_SIZE);
2101
2102         namestrcpy(&rel->rd_rel->relname, relname);
2103         rel->rd_rel->relnamespace = relnamespace;
2104
2105         rel->rd_rel->relkind = RELKIND_UNCATALOGED;
2106         rel->rd_rel->relhasoids = rel->rd_att->tdhasoid;
2107         rel->rd_rel->relnatts = natts;
2108         rel->rd_rel->reltype = InvalidOid;
2109
2110         /*
2111          * Insert relation physical and logical identifiers (OIDs) into the
2112          * right places.
2113          */
2114         rel->rd_rel->relisshared = (dbid == InvalidOid);
2115
2116         RelationGetRelid(rel) = relid;
2117
2118         for (i = 0; i < natts; i++)
2119                 rel->rd_att->attrs[i]->attrelid = relid;
2120
2121         rel->rd_node = rnode;
2122         rel->rd_rel->relfilenode = rnode.relNode;
2123
2124         RelationInitLockInfo(rel);      /* see lmgr.c */
2125
2126         /*
2127          * Okay to insert into the relcache hash tables.
2128          */
2129         RelationCacheInsert(rel);
2130
2131         /*
2132          * done building relcache entry.
2133          */
2134         MemoryContextSwitchTo(oldcxt);
2135
2136         return rel;
2137 }
2138
2139 /*
2140  *              RelationCacheInitialize
2141  *
2142  *              This initializes the relation descriptor cache.  At the time
2143  *              that this is invoked, we can't do database access yet (mainly
2144  *              because the transaction subsystem is not up), so we can't get
2145  *              "real" info.  However it's okay to read the pg_internal.init
2146  *              cache file, if one is available.  Otherwise we make phony
2147  *              entries for the minimum set of nailed-in-cache relations.
2148  */
2149
2150 #define INITRELCACHESIZE                400
2151
2152 void
2153 RelationCacheInitialize(void)
2154 {
2155         MemoryContext oldcxt;
2156         HASHCTL         ctl;
2157
2158         /*
2159          * switch to cache memory context
2160          */
2161         if (!CacheMemoryContext)
2162                 CreateCacheMemoryContext();
2163
2164         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2165
2166         /*
2167          * create hashtables that index the relcache
2168          */
2169         MemSet(&ctl, 0, sizeof(ctl));
2170         ctl.keysize = sizeof(NameData);
2171         ctl.entrysize = sizeof(RelNameCacheEnt);
2172         RelationSysNameCache = hash_create("Relcache by name", INITRELCACHESIZE,
2173                                                                            &ctl, HASH_ELEM);
2174
2175         ctl.keysize = sizeof(Oid);
2176         ctl.entrysize = sizeof(RelIdCacheEnt);
2177         ctl.hash = tag_hash;
2178         RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
2179                                                                   &ctl, HASH_ELEM | HASH_FUNCTION);
2180
2181         ctl.keysize = sizeof(RelFileNode);
2182         ctl.entrysize = sizeof(RelNodeCacheEnt);
2183         ctl.hash = tag_hash;
2184         RelationNodeCache = hash_create("Relcache by rnode", INITRELCACHESIZE,
2185                                                                         &ctl, HASH_ELEM | HASH_FUNCTION);
2186
2187         /*
2188          * Try to load the relcache cache file.  If successful, we're done
2189          * for now.  Otherwise, initialize the cache with pre-made descriptors
2190          * for the critical "nailed-in" system catalogs.
2191          */
2192         if (IsBootstrapProcessingMode() ||
2193                 ! load_relcache_init_file())
2194         {
2195                 formrdesc(RelationRelationName,
2196                                   Natts_pg_class, Desc_pg_class);
2197                 formrdesc(AttributeRelationName,
2198                                   Natts_pg_attribute, Desc_pg_attribute);
2199                 formrdesc(ProcedureRelationName,
2200                                   Natts_pg_proc, Desc_pg_proc);
2201                 formrdesc(TypeRelationName,
2202                                   Natts_pg_type, Desc_pg_type);
2203
2204 #define NUM_CRITICAL_RELS       4       /* fix if you change list above */
2205         }
2206
2207         MemoryContextSwitchTo(oldcxt);
2208 }
2209
2210 /*
2211  *              RelationCacheInitializePhase2
2212  *
2213  *              This is called as soon as the catcache and transaction system
2214  *              are functional.  At this point we can actually read data from
2215  *              the system catalogs.  Update the relcache entries made during
2216  *              RelationCacheInitialize, and make sure we have entries for the
2217  *              critical system indexes.
2218  */
2219 void
2220 RelationCacheInitializePhase2(void)
2221 {
2222         HASH_SEQ_STATUS status;
2223         RelIdCacheEnt *idhentry;
2224
2225         if (IsBootstrapProcessingMode())
2226                 return;
2227
2228         /*
2229          * If we didn't get the critical system indexes loaded into relcache,
2230          * do so now.  These are critical because the catcache depends on them
2231          * for catcache fetches that are done during relcache load.  Thus, we
2232          * have an infinite-recursion problem.  We can break the recursion
2233          * by doing heapscans instead of indexscans at certain key spots.
2234          * To avoid hobbling performance, we only want to do that until we
2235          * have the critical indexes loaded into relcache.  Thus, the flag
2236          * criticalRelcachesBuilt is used to decide whether to do heapscan
2237          * or indexscan at the key spots, and we set it true after we've loaded
2238          * the critical indexes.
2239          *
2240          * The critical indexes are marked as "nailed in cache", partly to make
2241          * it easy for load_relcache_init_file to count them, but mainly
2242          * because we cannot flush and rebuild them once we've set
2243          * criticalRelcachesBuilt to true.  (NOTE: perhaps it would be possible
2244          * to reload them by temporarily setting criticalRelcachesBuilt to
2245          * false again.  For now, though, we just nail 'em in.)
2246          */
2247         if (! criticalRelcachesBuilt)
2248         {
2249                 RelationBuildDescInfo buildinfo;
2250                 Relation        ird;
2251
2252 #define LOAD_CRIT_INDEX(indname) \
2253                 do { \
2254                         buildinfo.infotype = INFO_RELNAME; \
2255                         buildinfo.i.info_name = (indname); \
2256                         ird = RelationBuildDesc(buildinfo, NULL); \
2257                         ird->rd_isnailed = true; \
2258                         RelationSetReferenceCount(ird, 1); \
2259                 } while (0)
2260
2261                 LOAD_CRIT_INDEX(ClassNameNspIndex);
2262                 LOAD_CRIT_INDEX(ClassOidIndex);
2263                 LOAD_CRIT_INDEX(AttributeRelidNumIndex);
2264                 LOAD_CRIT_INDEX(IndexRelidIndex);
2265                 LOAD_CRIT_INDEX(AccessMethodStrategyIndex);
2266                 LOAD_CRIT_INDEX(AccessMethodProcedureIndex);
2267                 LOAD_CRIT_INDEX(OperatorOidIndex);
2268
2269 #define NUM_CRITICAL_INDEXES    7       /* fix if you change list above */
2270
2271                 criticalRelcachesBuilt = true;
2272         }
2273
2274         /*
2275          * Now, scan all the relcache entries and update anything that might
2276          * be wrong in the results from formrdesc or the relcache cache file.
2277          * If we faked up relcache entries using formrdesc, then read
2278          * the real pg_class rows and replace the fake entries with them.
2279          * Also, if any of the relcache entries have rules or triggers,
2280          * load that info the hard way since it isn't recorded in the cache file.
2281          */
2282         hash_seq_init(&status, RelationIdCache);
2283
2284         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2285         {
2286                 Relation        relation = idhentry->reldesc;
2287
2288                 /*
2289                  * If it's a faked-up entry, read the real pg_class tuple.
2290                  */
2291                 if (needNewCacheFile && relation->rd_isnailed)
2292                 {
2293                         HeapTuple       htup;
2294                         Form_pg_class relp;
2295
2296                         htup = SearchSysCache(RELOID,
2297                                                                   ObjectIdGetDatum(RelationGetRelid(relation)),
2298                                                                   0, 0, 0);
2299                         if (!HeapTupleIsValid(htup))
2300                                 elog(FATAL, "RelationCacheInitializePhase2: no pg_class entry for %s",
2301                                          RelationGetRelationName(relation));
2302                         relp = (Form_pg_class) GETSTRUCT(htup);
2303                         /*
2304                          * Copy tuple to relation->rd_rel. (See notes in
2305                          * AllocateRelationDesc())
2306                          */
2307                         Assert(relation->rd_rel != NULL);
2308                         memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
2309                         relation->rd_att->tdhasoid = relp->relhasoids;
2310
2311                         ReleaseSysCache(htup);
2312                 }
2313
2314                 /*
2315                  * Fix data that isn't saved in relcache cache file.
2316                  */
2317                 if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
2318                         RelationBuildRuleLock(relation);
2319                 if (relation->rd_rel->reltriggers > 0 && relation->trigdesc == NULL)
2320                         RelationBuildTriggers(relation);
2321         }
2322 }
2323
2324 /*
2325  *              RelationCacheInitializePhase3
2326  *
2327  *              Final step of relcache initialization: write out a new relcache
2328  *              cache file if one is needed.
2329  */
2330 void
2331 RelationCacheInitializePhase3(void)
2332 {
2333         if (IsBootstrapProcessingMode())
2334                 return;
2335
2336         if (needNewCacheFile)
2337         {
2338                 /*
2339                  * Force all the catcaches to finish initializing and thereby
2340                  * open the catalogs and indexes they use.  This will preload
2341                  * the relcache with entries for all the most important system
2342                  * catalogs and indexes, so that the init file will be most
2343                  * useful for future backends.
2344                  */
2345                 InitCatalogCachePhase2();
2346
2347                 /* now write the file */
2348                 write_relcache_init_file();
2349         }
2350 }
2351
2352
2353 /* used by XLogInitCache */
2354 void            CreateDummyCaches(void);
2355 void            DestroyDummyCaches(void);
2356
2357 void
2358 CreateDummyCaches(void)
2359 {
2360         MemoryContext oldcxt;
2361         HASHCTL         ctl;
2362
2363         if (!CacheMemoryContext)
2364                 CreateCacheMemoryContext();
2365
2366         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2367
2368         MemSet(&ctl, 0, sizeof(ctl));
2369         ctl.keysize = sizeof(NameData);
2370         ctl.entrysize = sizeof(RelNameCacheEnt);
2371         RelationSysNameCache = hash_create("Relcache by name", INITRELCACHESIZE,
2372                                                                            &ctl, HASH_ELEM);
2373
2374         ctl.keysize = sizeof(Oid);
2375         ctl.entrysize = sizeof(RelIdCacheEnt);
2376         ctl.hash = tag_hash;
2377         RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
2378                                                                   &ctl, HASH_ELEM | HASH_FUNCTION);
2379
2380         ctl.keysize = sizeof(RelFileNode);
2381         ctl.entrysize = sizeof(RelNodeCacheEnt);
2382         ctl.hash = tag_hash;
2383         RelationNodeCache = hash_create("Relcache by rnode", INITRELCACHESIZE,
2384                                                                         &ctl, HASH_ELEM | HASH_FUNCTION);
2385
2386         MemoryContextSwitchTo(oldcxt);
2387 }
2388
2389 void
2390 DestroyDummyCaches(void)
2391 {
2392         MemoryContext oldcxt;
2393
2394         if (!CacheMemoryContext)
2395                 return;
2396
2397         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2398
2399         if (RelationIdCache)
2400                 hash_destroy(RelationIdCache);
2401         if (RelationSysNameCache)
2402                 hash_destroy(RelationSysNameCache);
2403         if (RelationNodeCache)
2404                 hash_destroy(RelationNodeCache);
2405
2406         RelationIdCache = RelationSysNameCache = RelationNodeCache = NULL;
2407
2408         MemoryContextSwitchTo(oldcxt);
2409 }
2410
2411 static void
2412 AttrDefaultFetch(Relation relation)
2413 {
2414         AttrDefault *attrdef = relation->rd_att->constr->defval;
2415         int                     ndef = relation->rd_att->constr->num_defval;
2416         Relation        adrel;
2417         SysScanDesc adscan;
2418         ScanKeyData skey;
2419         HeapTuple       htup;
2420         Datum           val;
2421         bool            isnull;
2422         int                     found;
2423         int                     i;
2424
2425         ScanKeyEntryInitialize(&skey,
2426                                                    (bits16) 0x0,
2427                                                    (AttrNumber) Anum_pg_attrdef_adrelid,
2428                                                    (RegProcedure) F_OIDEQ,
2429                                                    ObjectIdGetDatum(RelationGetRelid(relation)));
2430
2431         adrel = heap_openr(AttrDefaultRelationName, AccessShareLock);
2432         adscan = systable_beginscan(adrel, AttrDefaultIndex, true,
2433                                                                 SnapshotNow,
2434                                                                 1, &skey);
2435         found = 0;
2436
2437         while (HeapTupleIsValid(htup = systable_getnext(adscan)))
2438         {
2439                 Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
2440
2441                 found++;
2442                 for (i = 0; i < ndef; i++)
2443                 {
2444                         if (adform->adnum != attrdef[i].adnum)
2445                                 continue;
2446                         if (attrdef[i].adbin != NULL)
2447                                 elog(WARNING, "AttrDefaultFetch: second record found for attr %s in rel %s",
2448                                          NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
2449                                          RelationGetRelationName(relation));
2450
2451                         val = fastgetattr(htup,
2452                                                           Anum_pg_attrdef_adbin,
2453                                                           adrel->rd_att, &isnull);
2454                         if (isnull)
2455                                 elog(WARNING, "AttrDefaultFetch: adbin IS NULL for attr %s in rel %s",
2456                                          NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
2457                                          RelationGetRelationName(relation));
2458                         else
2459                                 attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
2460                                                          DatumGetCString(DirectFunctionCall1(textout,
2461                                                                                                                                  val)));
2462                         break;
2463                 }
2464
2465                 if (i >= ndef)
2466                         elog(WARNING, "AttrDefaultFetch: unexpected record found for attr %d in rel %s",
2467                                  adform->adnum,
2468                                  RelationGetRelationName(relation));
2469         }
2470
2471         systable_endscan(adscan);
2472         heap_close(adrel, AccessShareLock);
2473
2474         if (found != ndef)
2475                 elog(WARNING, "AttrDefaultFetch: %d record(s) not found for rel %s",
2476                          ndef - found, RelationGetRelationName(relation));
2477 }
2478
2479 static void
2480 CheckConstraintFetch(Relation relation)
2481 {
2482         ConstrCheck *check = relation->rd_att->constr->check;
2483         int                     ncheck = relation->rd_att->constr->num_check;
2484         Relation        conrel;
2485         SysScanDesc conscan;
2486         ScanKeyData skey[1];
2487         HeapTuple       htup;
2488         Datum           val;
2489         bool            isnull;
2490         int                     found = 0;
2491
2492         ScanKeyEntryInitialize(&skey[0], 0x0,
2493                                                    Anum_pg_constraint_conrelid, F_OIDEQ,
2494                                                    ObjectIdGetDatum(RelationGetRelid(relation)));
2495
2496         conrel = heap_openr(ConstraintRelationName, AccessShareLock);
2497         conscan = systable_beginscan(conrel, ConstraintRelidIndex, true,
2498                                                                  SnapshotNow, 1, skey);
2499
2500         while (HeapTupleIsValid(htup = systable_getnext(conscan)))
2501         {
2502                 Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
2503
2504                 /* We want check constraints only */
2505                 if (conform->contype != CONSTRAINT_CHECK)
2506                         continue;
2507
2508                 if (found == ncheck)
2509                         elog(ERROR, "CheckConstraintFetch: unexpected record found for rel %s",
2510                                  RelationGetRelationName(relation));
2511
2512                 check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
2513                                                                                                   NameStr(conform->conname));
2514
2515                 /* Grab and test conbin is actually set */
2516                 val = fastgetattr(htup,
2517                                                   Anum_pg_constraint_conbin,
2518                                                   conrel->rd_att, &isnull);
2519                 if (isnull)
2520                         elog(ERROR, "CheckConstraintFetch: conbin IS NULL for rel %s",
2521                                  RelationGetRelationName(relation));
2522
2523                 check[found].ccbin = MemoryContextStrdup(CacheMemoryContext,
2524                                                          DatumGetCString(DirectFunctionCall1(textout,
2525                                                                                                                                  val)));
2526                 found++;
2527         }
2528
2529         systable_endscan(conscan);
2530         heap_close(conrel, AccessShareLock);
2531
2532         if (found != ncheck)
2533                 elog(ERROR, "CheckConstraintFetch: %d record(s) not found for rel %s",
2534                          ncheck - found, RelationGetRelationName(relation));
2535 }
2536
2537 /*
2538  * RelationGetIndexList -- get a list of OIDs of indexes on this relation
2539  *
2540  * The index list is created only if someone requests it.  We scan pg_index
2541  * to find relevant indexes, and add the list to the relcache entry so that
2542  * we won't have to compute it again.  Note that shared cache inval of a
2543  * relcache entry will delete the old list and set rd_indexfound to false,
2544  * so that we must recompute the index list on next request.  This handles
2545  * creation or deletion of an index.
2546  *
2547  * The returned list is guaranteed to be sorted in order by OID.  This is
2548  * needed by the executor, since for index types that we obtain exclusive
2549  * locks on when updating the index, all backends must lock the indexes in
2550  * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
2551  * consistent ordering would do, but ordering by OID is easy.
2552  *
2553  * Since shared cache inval causes the relcache's copy of the list to go away,
2554  * we return a copy of the list palloc'd in the caller's context.  The caller
2555  * may freeList() the returned list after scanning it.  This is necessary
2556  * since the caller will typically be doing syscache lookups on the relevant
2557  * indexes, and syscache lookup could cause SI messages to be processed!
2558  */
2559 List *
2560 RelationGetIndexList(Relation relation)
2561 {
2562         Relation        indrel;
2563         SysScanDesc     indscan;
2564         ScanKeyData skey;
2565         HeapTuple       htup;
2566         List       *result;
2567         MemoryContext oldcxt;
2568
2569         /* Quick exit if we already computed the list. */
2570         if (relation->rd_indexfound)
2571                 return listCopy(relation->rd_indexlist);
2572
2573         /*
2574          * We build the list we intend to return (in the caller's context)
2575          * while doing the scan.  After successfully completing the scan, we
2576          * copy that list into the relcache entry.      This avoids cache-context
2577          * memory leakage if we get some sort of error partway through.
2578          */
2579         result = NIL;
2580
2581         /* Prepare to scan pg_index for entries having indrelid = this rel. */
2582         ScanKeyEntryInitialize(&skey,
2583                                                    (bits16) 0x0,
2584                                                    (AttrNumber) Anum_pg_index_indrelid,
2585                                                    (RegProcedure) F_OIDEQ,
2586                                                    ObjectIdGetDatum(RelationGetRelid(relation)));
2587
2588         indrel = heap_openr(IndexRelationName, AccessShareLock);
2589         indscan = systable_beginscan(indrel, IndexIndrelidIndex, true,
2590                                                                  SnapshotNow,
2591                                                                  1, &skey);
2592
2593         while (HeapTupleIsValid(htup = systable_getnext(indscan)))
2594         {
2595                 Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);
2596
2597                 result = insert_ordered_oid(result, index->indexrelid);
2598         }
2599
2600         systable_endscan(indscan);
2601         heap_close(indrel, AccessShareLock);
2602
2603         /* Now save a copy of the completed list in the relcache entry. */
2604         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2605         relation->rd_indexlist = listCopy(result);
2606         relation->rd_indexfound = true;
2607         MemoryContextSwitchTo(oldcxt);
2608
2609         return result;
2610 }
2611
2612 /*
2613  * insert_ordered_oid
2614  *              Insert a new Oid into a sorted list of Oids, preserving ordering
2615  *
2616  * Building the ordered list this way is O(N^2), but with a pretty small
2617  * constant, so for the number of entries we expect it will probably be
2618  * faster than trying to apply qsort().  Most tables don't have very many
2619  * indexes...
2620  */
2621 static List *
2622 insert_ordered_oid(List *list, Oid datum)
2623 {
2624         List       *l;
2625
2626         /* Does the datum belong at the front? */
2627         if (list == NIL || datum < (Oid) lfirsti(list))
2628                 return lconsi(datum, list);
2629         /* No, so find the entry it belongs after */
2630         l = list;
2631         for (;;)
2632         {
2633                 List       *n = lnext(l);
2634
2635                 if (n == NIL || datum < (Oid) lfirsti(n))
2636                         break;                          /* it belongs before n */
2637                 l = n;
2638         }
2639         /* Insert datum into list after item l */
2640         lnext(l) = lconsi(datum, lnext(l));
2641         return list;
2642 }
2643
2644
2645 /*
2646  *      load_relcache_init_file, write_relcache_init_file
2647  *
2648  *              In late 1992, we started regularly having databases with more than
2649  *              a thousand classes in them.  With this number of classes, it became
2650  *              critical to do indexed lookups on the system catalogs.
2651  *
2652  *              Bootstrapping these lookups is very hard.  We want to be able to
2653  *              use an index on pg_attribute, for example, but in order to do so,
2654  *              we must have read pg_attribute for the attributes in the index,
2655  *              which implies that we need to use the index.
2656  *
2657  *              In order to get around the problem, we do the following:
2658  *
2659  *                 +  When the database system is initialized (at initdb time), we
2660  *                        don't use indexes.  We do sequential scans.
2661  *
2662  *                 +  When the backend is started up in normal mode, we load an image
2663  *                        of the appropriate relation descriptors, in internal format,
2664  *                        from an initialization file in the data/base/... directory.
2665  *
2666  *                 +  If the initialization file isn't there, then we create the
2667  *                        relation descriptors using sequential scans and write 'em to
2668  *                        the initialization file for use by subsequent backends.
2669  *
2670  *              We could dispense with the initialization file and just build the
2671  *              critical reldescs the hard way on every backend startup, but that
2672  *              slows down backend startup noticeably.
2673  *
2674  *              We can in fact go further, and save more relcache entries than
2675  *              just the ones that are absolutely critical; this allows us to speed
2676  *              up backend startup by not having to build such entries the hard way.
2677  *              Presently, all the catalog and index entries that are referred to
2678  *              by catcaches are stored in the initialization file.
2679  *
2680  *              The same mechanism that detects when catcache and relcache entries
2681  *              need to be invalidated (due to catalog updates) also arranges to
2682  *              unlink the initialization file when its contents may be out of date.
2683  *              The file will then be rebuilt during the next backend startup.
2684  */
2685
2686 /*
2687  * load_relcache_init_file -- attempt to load cache from the init file
2688  *
2689  * If successful, return TRUE and set criticalRelcachesBuilt to true.
2690  * If not successful, return FALSE and set needNewCacheFile to true.
2691  *
2692  * NOTE: we assume we are already switched into CacheMemoryContext.
2693  */
2694 static bool
2695 load_relcache_init_file(void)
2696 {
2697         FILE       *fp;
2698         char            initfilename[MAXPGPATH];
2699         Relation   *rels;
2700         int                     relno,
2701                                 num_rels,
2702                                 max_rels,
2703                                 nailed_rels,
2704                                 nailed_indexes;
2705         int                     i;
2706
2707         snprintf(initfilename, sizeof(initfilename), "%s/%s",
2708                          DatabasePath, RELCACHE_INIT_FILENAME);
2709
2710         fp = AllocateFile(initfilename, PG_BINARY_R);
2711         if (fp == NULL)
2712         {
2713                 needNewCacheFile = true;
2714                 return false;
2715         }
2716
2717         /*
2718          * Read the index relcache entries from the file.  Note we will not
2719          * enter any of them into the cache if the read fails partway through;
2720          * this helps to guard against broken init files.
2721          */
2722         max_rels = 100;
2723         rels = (Relation *) palloc(max_rels * sizeof(Relation));
2724         num_rels = 0;
2725         nailed_rels = nailed_indexes = 0;
2726         initFileRelationIds = NIL;
2727
2728         for (relno = 0; ; relno++)
2729         {
2730                 Size            len;
2731                 size_t          nread;
2732                 Relation        rel;
2733                 Form_pg_class relform;
2734
2735                 /* first read the relation descriptor length */
2736                 if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2737                 {
2738                         if (nread == 0)
2739                                 break;                  /* end of file */
2740                         goto read_failed;
2741                 }
2742
2743                 /* safety check for incompatible relcache layout */
2744                 if (len != sizeof(RelationData))
2745                         goto read_failed;
2746
2747                 /* allocate another relcache header */
2748                 if (num_rels >= max_rels)
2749                 {
2750                         max_rels *= 2;
2751                         rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
2752                 }
2753
2754                 rel = rels[num_rels++] = (Relation) palloc(len);
2755
2756                 /* then, read the Relation structure */
2757                 if ((nread = fread(rel, 1, len, fp)) != len)
2758                         goto read_failed;
2759
2760                 /* next read the relation tuple form */
2761                 if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2762                         goto read_failed;
2763
2764                 relform = (Form_pg_class) palloc(len);
2765                 if ((nread = fread(relform, 1, len, fp)) != len)
2766                         goto read_failed;
2767
2768                 rel->rd_rel = relform;
2769
2770                 /* initialize attribute tuple forms */
2771                 rel->rd_att = CreateTemplateTupleDesc(relform->relnatts,
2772                                                                                           relform->relhasoids);
2773
2774                 /* next read all the attribute tuple form data entries */
2775                 for (i = 0; i < relform->relnatts; i++)
2776                 {
2777                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2778                                 goto read_failed;
2779
2780                         rel->rd_att->attrs[i] = (Form_pg_attribute) palloc(len);
2781
2782                         if ((nread = fread(rel->rd_att->attrs[i], 1, len, fp)) != len)
2783                                 goto read_failed;
2784                 }
2785
2786                 /* If it's an index, there's more to do */
2787                 if (rel->rd_rel->relkind == RELKIND_INDEX)
2788                 {
2789                         Form_pg_am      am;
2790                         MemoryContext indexcxt;
2791                         IndexStrategy strat;
2792                         Oid                *operator;
2793                         RegProcedure *support;
2794                         int                     nstrategies,
2795                                                 nsupport;
2796
2797                         /* Count nailed indexes to ensure we have 'em all */
2798                         if (rel->rd_isnailed)
2799                                 nailed_indexes++;
2800
2801                         /* next, read the pg_index tuple form */
2802                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2803                                 goto read_failed;
2804
2805                         rel->rd_index = (Form_pg_index) palloc(len);
2806                         if ((nread = fread(rel->rd_index, 1, len, fp)) != len)
2807                                 goto read_failed;
2808
2809                         /* next, read the access method tuple form */
2810                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2811                                 goto read_failed;
2812
2813                         am = (Form_pg_am) palloc(len);
2814                         if ((nread = fread(am, 1, len, fp)) != len)
2815                                 goto read_failed;
2816                         rel->rd_am = am;
2817
2818                         /*
2819                          * prepare index info context --- parameters should match
2820                          * RelationInitIndexAccessInfo
2821                          */
2822                         indexcxt = AllocSetContextCreate(CacheMemoryContext,
2823                                                                                          RelationGetRelationName(rel),
2824                                                                                          0,             /* minsize */
2825                                                                                          512,   /* initsize */
2826                                                                                          1024); /* maxsize */
2827                         rel->rd_indexcxt = indexcxt;
2828
2829                         /* next, read the index strategy map */
2830                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2831                                 goto read_failed;
2832
2833                         strat = (IndexStrategy) MemoryContextAlloc(indexcxt, len);
2834                         if ((nread = fread(strat, 1, len, fp)) != len)
2835                                 goto read_failed;
2836
2837                         /* have to invalidate any FmgrInfo data in the strategy maps */
2838                         nstrategies = am->amstrategies * relform->relnatts;
2839                         for (i = 0; i < nstrategies; i++)
2840                                 strat->strategyMapData[i].entry[0].sk_func.fn_oid = InvalidOid;
2841
2842                         rel->rd_istrat = strat;
2843
2844                         /* next, read the vector of operator OIDs */
2845                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2846                                 goto read_failed;
2847
2848                         operator = (Oid *) MemoryContextAlloc(indexcxt, len);
2849                         if ((nread = fread(operator, 1, len, fp)) != len)
2850                                 goto read_failed;
2851
2852                         rel->rd_operator = operator;
2853
2854                         /* finally, read the vector of support procedures */
2855                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2856                                 goto read_failed;
2857                         support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
2858                         if ((nread = fread(support, 1, len, fp)) != len)
2859                                 goto read_failed;
2860
2861                         rel->rd_support = support;
2862
2863                         /* add a zeroed support-fmgr-info vector */
2864                         nsupport = relform->relnatts * am->amsupport;
2865                         rel->rd_supportinfo = (FmgrInfo *)
2866                                 MemoryContextAlloc(indexcxt, nsupport * sizeof(FmgrInfo));
2867                         MemSet(rel->rd_supportinfo, 0, nsupport * sizeof(FmgrInfo));
2868                 }
2869                 else
2870                 {
2871                         /* Count nailed rels to ensure we have 'em all */
2872                         if (rel->rd_isnailed)
2873                                 nailed_rels++;
2874
2875                         Assert(rel->rd_index == NULL);
2876                         Assert(rel->rd_am == NULL);
2877                         Assert(rel->rd_indexcxt == NULL);
2878                         Assert(rel->rd_istrat == NULL);
2879                         Assert(rel->rd_operator == NULL);
2880                         Assert(rel->rd_support == NULL);
2881                         Assert(rel->rd_supportinfo == NULL);
2882                 }
2883
2884                 /*
2885                  * Rules and triggers are not saved (mainly because the internal
2886                  * format is complex and subject to change).  They must be rebuilt
2887                  * if needed by RelationCacheInitializePhase2.  This is not expected
2888                  * to be a big performance hit since few system catalogs have such.
2889                  */
2890                 rel->rd_rules = NULL;
2891                 rel->rd_rulescxt = NULL;
2892                 rel->trigdesc = NULL;
2893
2894                 /*
2895                  * Reset transient-state fields in the relcache entry
2896                  */
2897                 rel->rd_fd = -1;
2898                 rel->rd_targblock = InvalidBlockNumber;
2899                 if (rel->rd_isnailed)
2900                         RelationSetReferenceCount(rel, 1);
2901                 else
2902                         RelationSetReferenceCount(rel, 0);
2903                 rel->rd_indexfound = false;
2904                 rel->rd_indexlist = NIL;
2905                 MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
2906
2907                 /*
2908                  * Make sure database ID is correct.  This is needed in case the
2909                  * pg_internal.init file was copied from some other database by
2910                  * CREATE DATABASE.
2911                  */
2912                 if (rel->rd_rel->relisshared)
2913                         rel->rd_node.tblNode = InvalidOid;
2914                 else
2915                         rel->rd_node.tblNode = MyDatabaseId;
2916
2917                 RelationInitLockInfo(rel);
2918         }
2919
2920         /*
2921          * We reached the end of the init file without apparent problem.
2922          * Did we get the right number of nailed items?  (This is a useful
2923          * crosscheck in case the set of critical rels or indexes changes.)
2924          */
2925         if (nailed_rels != NUM_CRITICAL_RELS ||
2926                 nailed_indexes != NUM_CRITICAL_INDEXES)
2927                 goto read_failed;
2928
2929         /*
2930          * OK, all appears well.
2931          *
2932          * Now insert all the new relcache entries into the cache.
2933          */
2934         for (relno = 0; relno < num_rels; relno++)
2935         {
2936                 RelationCacheInsert(rels[relno]);
2937                 /* also make a list of their OIDs, for RelationIdIsInInitFile */
2938                 initFileRelationIds = lconsi((int) RelationGetRelid(rels[relno]),
2939                                                                          initFileRelationIds);
2940         }
2941
2942         pfree(rels);
2943         FreeFile(fp);
2944
2945         criticalRelcachesBuilt = true;
2946         return true;
2947
2948         /*
2949          * init file is broken, so do it the hard way.  We don't bother
2950          * trying to free the clutter we just allocated; it's not in the
2951          * relcache so it won't hurt.
2952          */
2953 read_failed:
2954         pfree(rels);
2955         FreeFile(fp);
2956
2957         needNewCacheFile = true;
2958         return false;
2959 }
2960
2961 /*
2962  * Write out a new initialization file with the current contents
2963  * of the relcache.
2964  */
2965 static void
2966 write_relcache_init_file(void)
2967 {
2968         FILE       *fp;
2969         char            tempfilename[MAXPGPATH];
2970         char            finalfilename[MAXPGPATH];
2971         HASH_SEQ_STATUS status;
2972         RelIdCacheEnt *idhentry;
2973         MemoryContext oldcxt;
2974         int                     i;
2975
2976         /*
2977          * We must write a temporary file and rename it into place. Otherwise,
2978          * another backend starting at about the same time might crash trying
2979          * to read the partially-complete file.
2980          */
2981         snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
2982                          DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
2983         snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
2984                          DatabasePath, RELCACHE_INIT_FILENAME);
2985
2986         unlink(tempfilename);           /* in case it exists w/wrong permissions */
2987
2988         fp = AllocateFile(tempfilename, PG_BINARY_W);
2989         if (fp == NULL)
2990         {
2991                 /*
2992                  * We used to consider this a fatal error, but we might as well
2993                  * continue with backend startup ...
2994                  */
2995                 elog(WARNING, "Cannot create init file %s: %m\n\tContinuing anyway, but there's something wrong.", tempfilename);
2996                 return;
2997         }
2998
2999         /*
3000          * Write all the reldescs (in no particular order).
3001          */
3002         hash_seq_init(&status, RelationIdCache);
3003
3004         initFileRelationIds = NIL;
3005
3006         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3007         {
3008                 Relation        rel = idhentry->reldesc;
3009                 Form_pg_class relform = rel->rd_rel;
3010                 Size            len;
3011
3012                 /*
3013                  * first write the relcache entry proper
3014                  */
3015                 len = sizeof(RelationData);
3016
3017                 /* first, write the relation descriptor length */
3018                 if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3019                         elog(FATAL, "cannot write init file -- descriptor length");
3020
3021                 /* next, write out the Relation structure */
3022                 if (fwrite(rel, 1, len, fp) != len)
3023                         elog(FATAL, "cannot write init file -- reldesc");
3024
3025                 /* next write the relation tuple form */
3026                 len = sizeof(FormData_pg_class);
3027                 if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3028                         elog(FATAL, "cannot write init file -- relation tuple form length");
3029
3030                 if (fwrite(relform, 1, len, fp) != len)
3031                         elog(FATAL, "cannot write init file -- relation tuple form");
3032
3033                 /* next, do all the attribute tuple form data entries */
3034                 for (i = 0; i < relform->relnatts; i++)
3035                 {
3036                         len = ATTRIBUTE_TUPLE_SIZE;
3037                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3038                                 elog(FATAL, "cannot write init file -- length of attdesc %d", i);
3039                         if (fwrite(rel->rd_att->attrs[i], 1, len, fp) != len)
3040                                 elog(FATAL, "cannot write init file -- attdesc %d", i);
3041                 }
3042
3043                 /* If it's an index, there's more to do */
3044                 if (rel->rd_rel->relkind == RELKIND_INDEX)
3045                 {
3046                         Form_pg_am      am = rel->rd_am;
3047                         HeapTuple       tuple;
3048
3049                         /*
3050                          * We need to write the index tuple form, but this is a bit
3051                          * tricky since it's a variable-length struct.  Rather than
3052                          * hoping to intuit the length, fetch the pg_index tuple
3053                          * afresh using the syscache, and write that.
3054                          */
3055                         tuple = SearchSysCache(INDEXRELID,
3056                                                                    ObjectIdGetDatum(RelationGetRelid(rel)),
3057                                                                    0, 0, 0);
3058                         if (!HeapTupleIsValid(tuple))
3059                                 elog(ERROR, "write_relcache_init_file: no pg_index entry for index %u",
3060                                          RelationGetRelid(rel));
3061                         len = tuple->t_len - tuple->t_data->t_hoff;
3062                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3063                                 elog(FATAL, "cannot write init file -- index tuple form length");
3064                         if (fwrite(GETSTRUCT(tuple), 1, len, fp) != len)
3065                                 elog(FATAL, "cannot write init file -- index tuple form");
3066                         ReleaseSysCache(tuple);
3067
3068                         /* next, write the access method tuple form */
3069                         len = sizeof(FormData_pg_am);
3070                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3071                                 elog(FATAL, "cannot write init file -- am tuple form length");
3072
3073                         if (fwrite(am, 1, len, fp) != len)
3074                                 elog(FATAL, "cannot write init file -- am tuple form");
3075
3076                         /* next, write the index strategy map */
3077                         len = AttributeNumberGetIndexStrategySize(relform->relnatts,
3078                                                                                                           am->amstrategies);
3079                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3080                                 elog(FATAL, "cannot write init file -- strategy map length");
3081
3082                         if (fwrite(rel->rd_istrat, 1, len, fp) != len)
3083                                 elog(FATAL, "cannot write init file -- strategy map");
3084
3085                         /* next, write the vector of operator OIDs */
3086                         len = relform->relnatts * (am->amstrategies * sizeof(Oid));
3087                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3088                                 elog(FATAL, "cannot write init file -- operator vector length");
3089
3090                         if (fwrite(rel->rd_operator, 1, len, fp) != len)
3091                                 elog(FATAL, "cannot write init file -- operator vector");
3092
3093                         /* finally, write the vector of support procedures */
3094                         len = relform->relnatts * (am->amsupport * sizeof(RegProcedure));
3095                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3096                                 elog(FATAL, "cannot write init file -- support vector length");
3097
3098                         if (fwrite(rel->rd_support, 1, len, fp) != len)
3099                                 elog(FATAL, "cannot write init file -- support vector");
3100                 }
3101
3102                 /* also make a list of their OIDs, for RelationIdIsInInitFile */
3103                 oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3104                 initFileRelationIds = lconsi((int) RelationGetRelid(rel),
3105                                                                          initFileRelationIds);
3106                 MemoryContextSwitchTo(oldcxt);
3107         }
3108
3109         FreeFile(fp);
3110
3111         /*
3112          * Now we have to check whether the data we've so painstakingly
3113          * accumulated is already obsolete due to someone else's just-committed
3114          * catalog changes.  If so, we just delete the temp file and leave it
3115          * to the next backend to try again.  (Our own relcache entries will be
3116          * updated by SI message processing, but we can't be sure whether what
3117          * we wrote out was up-to-date.)
3118          *
3119          * This mustn't run concurrently with RelationCacheInitFileInvalidate,
3120          * so grab a serialization lock for the duration.
3121          */
3122         LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
3123
3124         /* Make sure we have seen all incoming SI messages */
3125         AcceptInvalidationMessages();
3126
3127         /*
3128          * If we have received any SI relcache invals since backend start,
3129          * assume we may have written out-of-date data.
3130          */
3131         if (relcacheInvalsReceived == 0L)
3132         {
3133                 /*
3134                  * OK, rename the temp file to its final name, deleting any
3135                  * previously-existing init file.
3136                  *
3137                  * Note: a failure here is possible under Cygwin, if some other
3138                  * backend is holding open an unlinked-but-not-yet-gone init file.
3139                  * So treat this as a noncritical failure.
3140                  */
3141                 if (rename(tempfilename, finalfilename) < 0)
3142                 {
3143                         elog(WARNING, "Cannot rename init file %s to %s: %m\n\tContinuing anyway, but there's something wrong.", tempfilename, finalfilename);
3144                         /*
3145                          * If we fail, try to clean up the useless temp file; don't bother
3146                          * to complain if this fails too.
3147                          */
3148                         unlink(tempfilename);
3149                 }
3150         }
3151         else
3152         {
3153                 /* Delete the already-obsolete temp file */
3154                 unlink(tempfilename);
3155         }
3156
3157         LWLockRelease(RelCacheInitLock);
3158 }
3159
3160 /*
3161  * Detect whether a given relation (identified by OID) is one of the ones
3162  * we store in the init file.
3163  *
3164  * Note that we effectively assume that all backends running in a database
3165  * would choose to store the same set of relations in the init file;
3166  * otherwise there are cases where we'd fail to detect the need for an init
3167  * file invalidation.  This does not seem likely to be a problem in practice.
3168  */
3169 bool
3170 RelationIdIsInInitFile(Oid relationId)
3171 {
3172         return intMember((int) relationId, initFileRelationIds);
3173 }
3174
3175 /*
3176  * Invalidate (remove) the init file during commit of a transaction that
3177  * changed one or more of the relation cache entries that are kept in the
3178  * init file.
3179  *
3180  * We actually need to remove the init file twice: once just before sending
3181  * the SI messages that include relcache inval for such relations, and once
3182  * just after sending them.  The unlink before ensures that a backend that's
3183  * currently starting cannot read the now-obsolete init file and then miss
3184  * the SI messages that will force it to update its relcache entries.  (This
3185  * works because the backend startup sequence gets into the PROC array before
3186  * trying to load the init file.)  The unlink after is to synchronize with a
3187  * backend that may currently be trying to write an init file based on data
3188  * that we've just rendered invalid.  Such a backend will see the SI messages,
3189  * but we can't leave the init file sitting around to fool later backends.
3190  *
3191  * Ignore any failure to unlink the file, since it might not be there if
3192  * no backend has been started since the last removal.
3193  */
3194 void
3195 RelationCacheInitFileInvalidate(bool beforeSend)
3196 {
3197         char            initfilename[MAXPGPATH];
3198
3199         snprintf(initfilename, sizeof(initfilename), "%s/%s",
3200                          DatabasePath, RELCACHE_INIT_FILENAME);
3201
3202         if (beforeSend)
3203         {
3204                 /* no interlock needed here */
3205                 unlink(initfilename);
3206         }
3207         else
3208         {
3209                 /*
3210                  * We need to interlock this against write_relcache_init_file,
3211                  * to guard against possibility that someone renames a new-but-
3212                  * already-obsolete init file into place just after we unlink.
3213                  * With the interlock, it's certain that write_relcache_init_file
3214                  * will notice our SI inval message before renaming into place,
3215                  * or else that we will execute second and successfully unlink
3216                  * the file.
3217                  */
3218                 LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
3219                 unlink(initfilename);
3220                 LWLockRelease(RelCacheInitLock);
3221         }
3222 }