]> granicus.if.org Git - postgresql/blob - src/backend/utils/cache/relcache.c
c930acc8bf4ba27d3a750a278c6ed31be3f956f1
[postgresql] / src / backend / utils / cache / relcache.c
1 /*-------------------------------------------------------------------------
2  *
3  * relcache.c--
4  *        POSTGRES relation descriptor cache code
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/utils/cache/relcache.c,v 1.25 1997/09/18 20:22:24 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14 /*
15  * INTERFACE ROUTINES
16  *              RelationInitialize                              - initialize relcache
17  *              RelationIdCacheGetRelation              - get a reldesc from the cache (id)
18  *              RelationNameCacheGetRelation    - get a reldesc from the cache (name)
19  *              RelationIdGetRelation                   - get a reldesc by relation id
20  *              RelationNameGetRelation                 - get a reldesc by relation name
21  *              RelationClose                                   - close an open relation
22  *              RelationFlushRelation                   - flush relation information
23  *
24  * NOTES
25  *              This file is in the process of being cleaned up
26  *              before I add system attribute indexing.  -cim 1/13/91
27  *
28  *              The following code contains many undocumented hacks.  Please be
29  *              careful....
30  *
31  */
32 #include <sys/types.h>
33 #include <stdio.h>                              /* for sprintf() */
34 #include <errno.h>
35 #include <sys/file.h>
36 #include <fcntl.h>
37 #include <string.h>
38
39 #include "postgres.h"
40 #include "miscadmin.h"
41
42 #include <storage/smgr.h>
43
44 #include "access/genam.h"
45 #include "access/heapam.h"
46 #include "access/htup.h"
47 #include "access/istrat.h"
48 #include "access/itup.h"
49 #include "access/skey.h"
50 #include "utils/builtins.h"
51 #include "access/tupdesc.h"
52 #include "access/tupmacs.h"
53 #include "access/xact.h"
54
55 #include "storage/buf.h"
56 #include "storage/fd.h"                 /* for SEEK_ */
57 #include "storage/lmgr.h"
58 #include "storage/bufmgr.h"
59
60 #include "lib/hasht.h"
61
62 #include "utils/memutils.h"
63 #include "utils/mcxt.h"
64 #include "utils/rel.h"
65 #include "utils/relcache.h"
66 #include "utils/hsearch.h"
67 #include "utils/relcache.h"
68
69 #include "catalog/catname.h"
70 #include "catalog/catalog.h"
71 #include "utils/syscache.h"
72
73 #include "catalog/pg_attribute.h"
74 #include "catalog/pg_aggregate.h"
75 #include "catalog/pg_index.h"
76 #include "catalog/pg_proc.h"
77 #include "catalog/pg_class.h"
78 #include "catalog/pg_rewrite.h"
79 #include "catalog/pg_type.h"
80
81 #include "catalog/pg_variable.h"
82 #include "catalog/pg_log.h"
83 #include "catalog/pg_time.h"
84 #include "catalog/pg_attrdef.h"
85 #include "catalog/pg_relcheck.h"
86 #include "catalog/indexing.h"
87 #include "catalog/index.h"
88 #include "fmgr.h"
89
90 static void
91 RelationFlushRelation(Relation *relationPtr,
92                                           bool onlyFlushReferenceCountZero);
93 static Relation RelationNameCacheGetRelation(char *relationName);
94 static void init_irels(void);
95 static void write_irels(void);
96
97 /* ----------------
98  *              defines
99  * ----------------
100  */
101 #define private static
102 #define INIT_FILENAME   "pg_internal.init"
103
104 /* ----------------
105  *              externs
106  * ----------------
107  */
108 extern bool AMI_OVERRIDE;               /* XXX style */
109 extern GlobalMemory CacheCxt;   /* from utils/cache/catcache.c */
110
111 /* ----------------
112  *              hardcoded tuple descriptors.  see lib/backend/catalog/pg_attribute.h
113  * ----------------
114  */
115 FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
116 FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
117 FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
118 FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
119 FormData_pg_attribute Desc_pg_variable[Natts_pg_variable] = {Schema_pg_variable};
120 FormData_pg_attribute Desc_pg_log[Natts_pg_log] = {Schema_pg_log};
121 FormData_pg_attribute Desc_pg_time[Natts_pg_time] = {Schema_pg_time};
122
123 /* ----------------
124  *              global variables
125  *
126  *              Relations are cached two ways, by name and by id,
127  *              thus there are two hash tables for referencing them.
128  * ----------------
129  */
130 HTAB       *RelationNameCache;
131 HTAB       *RelationIdCache;
132
133 /* ----------------
134  *              RelationBuildDescInfo exists so code can be shared
135  *              between RelationIdGetRelation() and RelationNameGetRelation()
136  * ----------------
137  */
138 typedef struct RelationBuildDescInfo
139 {
140         int                     infotype;               /* lookup by id or by name */
141 #define INFO_RELID 1
142 #define INFO_RELNAME 2
143         union
144         {
145                 Oid                     info_id;        /* relation object id */
146                 char       *info_name;  /* relation name */
147         }                       i;
148 } RelationBuildDescInfo;
149
150 typedef struct relidcacheent
151 {
152         Oid                     reloid;
153         Relation        reldesc;
154 } RelIdCacheEnt;
155
156 typedef struct relnamecacheent
157 {
158         NameData        relname;
159         Relation        reldesc;
160 } RelNameCacheEnt;
161
162 /* -----------------
163  *              macros to manipulate name cache and id cache
164  * -----------------
165  */
166 #define RelationCacheInsert(RELATION)   \
167         {       RelIdCacheEnt *idhentry; RelNameCacheEnt *namehentry; \
168                 char *relname; Oid reloid; bool found; \
169                 relname = (RELATION->rd_rel->relname).data; \
170                 namehentry = (RelNameCacheEnt*)hash_search(RelationNameCache, \
171                                                                                                    relname, \
172                                                                                                    HASH_ENTER, \
173                                                                                                    &found); \
174                 if (namehentry == NULL) { \
175                         elog(FATAL, "can't insert into relation descriptor cache"); \
176                   } \
177                 if (found && !IsBootstrapProcessingMode()) { \
178                         /* used to give notice -- now just keep quiet */ ; \
179                   } \
180                 namehentry->reldesc = RELATION; \
181                 reloid = RELATION->rd_id; \
182                 idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
183                                                                                            (char *)&reloid, \
184                                                                                            HASH_ENTER, \
185                                                                                            &found); \
186                 if (idhentry == NULL) { \
187                         elog(FATAL, "can't insert into relation descriptor cache"); \
188                   } \
189                 if (found && !IsBootstrapProcessingMode()) { \
190                         /* used to give notice -- now just keep quiet */ ; \
191                   } \
192                 idhentry->reldesc = RELATION; \
193         }
194 #define RelationNameCacheLookup(NAME, RELATION) \
195         {       RelNameCacheEnt *hentry; bool found; \
196                 hentry = (RelNameCacheEnt*)hash_search(RelationNameCache, \
197                                                                                            (char *)NAME,HASH_FIND,&found); \
198                 if (hentry == NULL) { \
199                         elog(FATAL, "error in CACHE"); \
200                   } \
201                 if (found) { \
202                         RELATION = hentry->reldesc; \
203                   } \
204                 else { \
205                         RELATION = NULL; \
206                   } \
207         }
208 #define RelationIdCacheLookup(ID, RELATION)             \
209         {       RelIdCacheEnt *hentry; bool found; \
210                 hentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
211                                                                                          (char *)&(ID),HASH_FIND, &found); \
212                 if (hentry == NULL) { \
213                         elog(FATAL, "error in CACHE"); \
214                   } \
215                 if (found) { \
216                         RELATION = hentry->reldesc; \
217                   } \
218                 else { \
219                         RELATION = NULL; \
220                   } \
221         }
222 #define RelationCacheDelete(RELATION)   \
223         {       RelNameCacheEnt *namehentry; RelIdCacheEnt *idhentry; \
224                 char *relname; Oid reloid; bool found; \
225                 relname = (RELATION->rd_rel->relname).data; \
226                 namehentry = (RelNameCacheEnt*)hash_search(RelationNameCache, \
227                                                                                                    relname, \
228                                                                                                    HASH_REMOVE, \
229                                                                                                    &found); \
230                 if (namehentry == NULL) { \
231                         elog(FATAL, "can't delete from relation descriptor cache"); \
232                   } \
233                 if (!found) { \
234                         elog(NOTICE, "trying to delete a reldesc that does not exist."); \
235                   } \
236                 reloid = RELATION->rd_id; \
237                 idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
238                                                                                            (char *)&reloid, \
239                                                                                            HASH_REMOVE, &found); \
240                 if (idhentry == NULL) { \
241                         elog(FATAL, "can't delete from relation descriptor cache"); \
242                   } \
243                 if (!found) { \
244                         elog(NOTICE, "trying to delete a reldesc that does not exist."); \
245                   } \
246         }
247
248 /* non-export function prototypes */
249 static void
250 formrdesc(char *relationName, u_int natts,
251                   FormData_pg_attribute att[]);
252
253 #if 0                                                   /* See comments at line 1304 */
254 static void RelationFlushIndexes(Relation *r, Oid accessMethodId);
255
256 #endif
257
258 static HeapTuple ScanPgRelation(RelationBuildDescInfo buildinfo);
259 static HeapTuple scan_pg_rel_seq(RelationBuildDescInfo buildinfo);
260 static HeapTuple scan_pg_rel_ind(RelationBuildDescInfo buildinfo);
261 static Relation AllocateRelationDesc(u_int natts, Form_pg_class relp);
262 static void
263 RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
264                                            Relation relation, u_int natts);
265 static void
266 build_tupdesc_seq(RelationBuildDescInfo buildinfo,
267                                   Relation relation, u_int natts);
268 static void
269 build_tupdesc_ind(RelationBuildDescInfo buildinfo,
270                                   Relation relation, u_int natts);
271 static Relation RelationBuildDesc(RelationBuildDescInfo buildinfo);
272 static void IndexedAccessMethodInitialize(Relation relation);
273 static void AttrDefaultFetch(Relation relation);
274 static void RelCheckFetch(Relation relation);
275
276 extern void RelationBuildTriggers(Relation relation);
277 extern void FreeTriggerDesc(Relation relation);
278
279 /*
280  * newlyCreatedRelns -
281  *        relations created during this transaction. We need to keep track of
282  *        these.
283  */
284 static List *newlyCreatedRelns = NULL;
285
286 /* ----------------------------------------------------------------
287  *              RelationIdGetRelation() and RelationNameGetRelation()
288  *                                              support functions
289  * ----------------------------------------------------------------
290  */
291
292
293 #if NOT_USED                                    /* XXX This doesn't seem to be used
294                                                                  * anywhere */
295 /* --------------------------------
296  *              BuildDescInfoError returns a string appropriate to
297  *              the buildinfo passed to it
298  * --------------------------------
299  */
300 static char *
301 BuildDescInfoError(RelationBuildDescInfo buildinfo)
302 {
303         static char errBuf[64];
304
305         MemSet(errBuf, 0, (int) sizeof(errBuf));
306         switch (buildinfo.infotype)
307         {
308                 case INFO_RELID:
309                         sprintf(errBuf, "(relation id %d)", buildinfo.i.info_id);
310                         break;
311                 case INFO_RELNAME:
312                         sprintf(errBuf, "(relation name %s)", buildinfo.i.info_name);
313                         break;
314         }
315
316         return errBuf;
317 }
318
319 #endif
320
321 /* --------------------------------
322  *              ScanPgRelation
323  *
324  *              this is used by RelationBuildDesc to find a pg_class
325  *              tuple matching either a relation name or a relation id
326  *              as specified in buildinfo.
327  * --------------------------------
328  */
329 static HeapTuple
330 ScanPgRelation(RelationBuildDescInfo buildinfo)
331 {
332
333         /*
334          * If this is bootstrap time (initdb), then we can't use the system
335          * catalog indices, because they may not exist yet.  Otherwise, we
336          * can, and do.
337          */
338
339         if (IsBootstrapProcessingMode())
340                 return (scan_pg_rel_seq(buildinfo));
341         else
342                 return (scan_pg_rel_ind(buildinfo));
343 }
344
345 static HeapTuple
346 scan_pg_rel_seq(RelationBuildDescInfo buildinfo)
347 {
348         HeapTuple       pg_class_tuple;
349         HeapTuple       return_tuple;
350         Relation        pg_class_desc;
351         HeapScanDesc pg_class_scan;
352         ScanKeyData key;
353         Buffer          buf;
354
355         /* ----------------
356          *      form a scan key
357          * ----------------
358          */
359         switch (buildinfo.infotype)
360         {
361                 case INFO_RELID:
362                         ScanKeyEntryInitialize(&key, 0,
363                                                                    ObjectIdAttributeNumber,
364                                                                    ObjectIdEqualRegProcedure,
365                                                                    ObjectIdGetDatum(buildinfo.i.info_id));
366                         break;
367
368                 case INFO_RELNAME:
369                         ScanKeyEntryInitialize(&key, 0,
370                                                                    Anum_pg_class_relname,
371                                                                    Character16EqualRegProcedure,
372                                                                    NameGetDatum(buildinfo.i.info_name));
373                         break;
374
375                 default:
376                         elog(WARN, "ScanPgRelation: bad buildinfo");
377                         return NULL;
378         }
379
380         /* ----------------
381          *      open pg_class and fetch a tuple
382          * ----------------
383          */
384         pg_class_desc = heap_openr(RelationRelationName);
385         if (!IsInitProcessingMode())
386                 RelationSetLockForRead(pg_class_desc);
387         pg_class_scan =
388                 heap_beginscan(pg_class_desc, 0, NowTimeQual, 1, &key);
389         pg_class_tuple = heap_getnext(pg_class_scan, 0, &buf);
390
391         /* ----------------
392          *      get set to return tuple
393          * ----------------
394          */
395         if (!HeapTupleIsValid(pg_class_tuple))
396         {
397                 return_tuple = pg_class_tuple;
398         }
399         else
400         {
401                 /* ------------------
402                  *      a satanic bug used to live here: pg_class_tuple used to be
403                  *      returned here without having the corresponding buffer pinned.
404                  *      so when the buffer gets replaced, all hell breaks loose.
405                  *      this bug is discovered and killed by wei on 9/27/91.
406                  * -------------------
407                  */
408                 return_tuple = (HeapTuple) palloc((Size) pg_class_tuple->t_len);
409                 memmove((char *) return_tuple,
410                                 (char *) pg_class_tuple,
411                                 (int) pg_class_tuple->t_len);
412                 ReleaseBuffer(buf);
413         }
414
415         /* all done */
416         heap_endscan(pg_class_scan);
417         if (!IsInitProcessingMode())
418                 RelationUnsetLockForRead(pg_class_desc);
419         heap_close(pg_class_desc);
420
421         return return_tuple;
422 }
423
424 static HeapTuple
425 scan_pg_rel_ind(RelationBuildDescInfo buildinfo)
426 {
427         Relation        pg_class_desc;
428         HeapTuple       return_tuple;
429
430         pg_class_desc = heap_openr(RelationRelationName);
431         if (!IsInitProcessingMode())
432                 RelationSetLockForRead(pg_class_desc);
433
434         switch (buildinfo.infotype)
435         {
436                 case INFO_RELID:
437                         return_tuple = ClassOidIndexScan(pg_class_desc, buildinfo.i.info_id);
438                         break;
439
440                 case INFO_RELNAME:
441                         return_tuple = ClassNameIndexScan(pg_class_desc,
442                                                                                           buildinfo.i.info_name);
443                         break;
444
445                 default:
446                         elog(WARN, "ScanPgRelation: bad buildinfo");
447
448                         /*
449                          * XXX I hope this is right.  It seems better than returning
450                          * an uninitialized value
451                          */
452                         return_tuple = NULL;
453         }
454
455         /* all done */
456         if (!IsInitProcessingMode())
457                 RelationUnsetLockForRead(pg_class_desc);
458         heap_close(pg_class_desc);
459
460         return return_tuple;
461 }
462
463 /* ----------------
464  *              AllocateRelationDesc
465  *
466  *              This is used to allocate memory for a new relation descriptor
467  *              and initialize the rd_rel field.
468  * ----------------
469  */
470 static Relation
471 AllocateRelationDesc(u_int natts, Form_pg_class relp)
472 {
473         Relation        relation;
474         Size            len;
475         Form_pg_class relationTupleForm;
476
477         /* ----------------
478          *      allocate space for the relation tuple form
479          * ----------------
480          */
481         relationTupleForm = (Form_pg_class)
482                 palloc((Size) (sizeof(FormData_pg_class)));
483
484         memmove((char *) relationTupleForm, (char *) relp, CLASS_TUPLE_SIZE);
485
486         /* ----------------
487          *      allocate space for new relation descriptor
488          */
489         len = sizeof(RelationData) + 10;        /* + 10 is voodoo XXX mao */
490
491         relation = (Relation) palloc(len);
492
493         /* ----------------
494          *      clear new reldesc
495          * ----------------
496          */
497         MemSet((char *) relation, 0, len);
498
499         /* initialize attribute tuple form */
500         relation->rd_att = CreateTemplateTupleDesc(natts);
501
502         /* and initialize relation tuple form */
503         relation->rd_rel = relationTupleForm;
504
505         return relation;
506 }
507
508 /* --------------------------------
509  *              RelationBuildTupleDesc
510  *
511  *              Form the relation's tuple descriptor from information in
512  *              the pg_attribute, pg_attrdef & pg_relcheck system cataloges.
513  * --------------------------------
514  */
515 static void
516 RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
517                                            Relation relation,
518                                            u_int natts)
519 {
520
521         /*
522          * If this is bootstrap time (initdb), then we can't use the system
523          * catalog indices, because they may not exist yet.  Otherwise, we
524          * can, and do.
525          */
526
527         if (IsBootstrapProcessingMode())
528                 build_tupdesc_seq(buildinfo, relation, natts);
529         else
530                 build_tupdesc_ind(buildinfo, relation, natts);
531 }
532
533 static void
534 build_tupdesc_seq(RelationBuildDescInfo buildinfo,
535                                   Relation relation,
536                                   u_int natts)
537 {
538         HeapTuple       pg_attribute_tuple;
539         Relation        pg_attribute_desc;
540         HeapScanDesc pg_attribute_scan;
541         AttributeTupleForm attp;
542         ScanKeyData key;
543         int                     need;
544
545         /* ----------------
546          *      form a scan key
547          * ----------------
548          */
549         ScanKeyEntryInitialize(&key, 0,
550                                                    Anum_pg_attribute_attrelid,
551                                                    ObjectIdEqualRegProcedure,
552                                                    ObjectIdGetDatum(relation->rd_id));
553
554         /* ----------------
555          *      open pg_attribute and begin a scan
556          * ----------------
557          */
558         pg_attribute_desc = heap_openr(AttributeRelationName);
559         pg_attribute_scan =
560                 heap_beginscan(pg_attribute_desc, 0, NowTimeQual, 1, &key);
561
562         /* ----------------
563          *      add attribute data to relation->rd_att
564          * ----------------
565          */
566         need = natts;
567
568         pg_attribute_tuple = heap_getnext(pg_attribute_scan, 0, (Buffer *) NULL);
569         while (HeapTupleIsValid(pg_attribute_tuple) && need > 0)
570         {
571                 attp = (AttributeTupleForm) GETSTRUCT(pg_attribute_tuple);
572
573                 if (attp->attnum > 0)
574                 {
575                         relation->rd_att->attrs[attp->attnum - 1] =
576                                 (AttributeTupleForm) palloc(ATTRIBUTE_TUPLE_SIZE);
577
578                         memmove((char *) (relation->rd_att->attrs[attp->attnum - 1]),
579                                         (char *) attp,
580                                         ATTRIBUTE_TUPLE_SIZE);
581                         need--;
582                 }
583                 pg_attribute_tuple = heap_getnext(pg_attribute_scan,
584                                                                                   0, (Buffer *) NULL);
585         }
586
587         if (need > 0)
588                 elog(WARN, "catalog is missing %d attribute%s for relid %d",
589                          need, (need == 1 ? "" : "s"), relation->rd_id);
590
591         /* ----------------
592          *      end the scan and close the attribute relation
593          * ----------------
594          */
595         heap_endscan(pg_attribute_scan);
596         heap_close(pg_attribute_desc);
597 }
598
599 static void
600 build_tupdesc_ind(RelationBuildDescInfo buildinfo,
601                                   Relation relation,
602                                   u_int natts)
603 {
604         Relation        attrel;
605         HeapTuple       atttup;
606         AttributeTupleForm attp;
607         TupleConstr *constr = (TupleConstr *) palloc(sizeof(TupleConstr));
608         AttrDefault *attrdef = NULL;
609         int                     ndef = 0;
610         int                     i;
611
612         constr->has_not_null = false;
613
614         attrel = heap_openr(AttributeRelationName);
615
616         for (i = 1; i <= relation->rd_rel->relnatts; i++)
617         {
618
619                 atttup = (HeapTuple) AttributeNumIndexScan(attrel, relation->rd_id, i);
620
621                 if (!HeapTupleIsValid(atttup))
622                         elog(WARN, "cannot find attribute %d of relation %.*s", i,
623                                  NAMEDATALEN, &(relation->rd_rel->relname.data[0]));
624                 attp = (AttributeTupleForm) GETSTRUCT(atttup);
625
626                 relation->rd_att->attrs[i - 1] =
627                         (AttributeTupleForm) palloc(ATTRIBUTE_TUPLE_SIZE);
628
629                 memmove((char *) (relation->rd_att->attrs[i - 1]),
630                                 (char *) attp,
631                                 ATTRIBUTE_TUPLE_SIZE);
632
633                 /* Update if this attribute have a constraint */
634                 if (attp->attnotnull)
635                         constr->has_not_null = true;
636
637                 if (attp->atthasdef)
638                 {
639                         if (attrdef == NULL)
640                                 attrdef = (AttrDefault *) palloc(relation->rd_rel->relnatts *
641                                                                                                  sizeof(AttrDefault));
642                         attrdef[ndef].adnum = i;
643                         attrdef[ndef].adbin = NULL;
644                         attrdef[ndef].adsrc = NULL;
645                         ndef++;
646                 }
647         }
648
649         heap_close(attrel);
650
651         if (constr->has_not_null || ndef > 0 || relation->rd_rel->relchecks)
652         {
653                 relation->rd_att->constr = constr;
654
655                 if (ndef > 0)                   /* DEFAULTs */
656                 {
657                         if (ndef < relation->rd_rel->relnatts)
658                                 constr->defval = (AttrDefault *)
659                                         repalloc(attrdef, ndef * sizeof(AttrDefault));
660                         else
661                                 constr->defval = attrdef;
662                         constr->num_defval = ndef;
663                         AttrDefaultFetch(relation);
664                 }
665                 else
666                         constr->num_defval = 0;
667
668                 if (relation->rd_rel->relchecks > 0)    /* CHECKs */
669                 {
670                         constr->num_check = relation->rd_rel->relchecks;
671                         constr->check = (ConstrCheck *) palloc(constr->num_check *
672                                                                                                    sizeof(ConstrCheck));
673                         MemSet(constr->check, 0, constr->num_check * sizeof(ConstrCheck));
674                         RelCheckFetch(relation);
675                 }
676                 else
677                         constr->num_check = 0;
678         }
679         else
680         {
681                 pfree(constr);
682                 relation->rd_att->constr = NULL;
683         }
684
685 }
686
687 /* --------------------------------
688  *              RelationBuildRuleLock
689  *
690  *              Form the relation's rewrite rules from information in
691  *              the pg_rewrite system catalog.
692  * --------------------------------
693  */
694 static void
695 RelationBuildRuleLock(Relation relation)
696 {
697         HeapTuple       pg_rewrite_tuple;
698         Relation        pg_rewrite_desc;
699         TupleDesc       pg_rewrite_tupdesc;
700         HeapScanDesc pg_rewrite_scan;
701         ScanKeyData key;
702         RuleLock   *rulelock;
703         int                     numlocks;
704         RewriteRule **rules;
705         int                     maxlocks;
706
707         /* ----------------
708          *      form an array to hold the rewrite rules (the array is extended if
709          *      necessary)
710          * ----------------
711          */
712         maxlocks = 4;
713         rules = (RewriteRule **) palloc(sizeof(RewriteRule *) * maxlocks);
714         numlocks = 0;
715
716         /* ----------------
717          *      form a scan key
718          * ----------------
719          */
720         ScanKeyEntryInitialize(&key, 0,
721                                                    Anum_pg_rewrite_ev_class,
722                                                    ObjectIdEqualRegProcedure,
723                                                    ObjectIdGetDatum(relation->rd_id));
724
725         /* ----------------
726          *      open pg_attribute and begin a scan
727          * ----------------
728          */
729         pg_rewrite_desc = heap_openr(RewriteRelationName);
730         pg_rewrite_scan =
731                 heap_beginscan(pg_rewrite_desc, 0, NowTimeQual, 1, &key);
732         pg_rewrite_tupdesc =
733                 RelationGetTupleDescriptor(pg_rewrite_desc);
734
735         /* ----------------
736          *      add attribute data to relation->rd_att
737          * ----------------
738          */
739         while ((pg_rewrite_tuple = heap_getnext(pg_rewrite_scan, 0,
740                                                                                         (Buffer *) NULL)) != NULL)
741         {
742                 bool            isnull;
743                 Datum           ruleaction;
744                 Datum           rule_evqual_string;
745                 RewriteRule *rule;
746
747                 rule = (RewriteRule *) palloc(sizeof(RewriteRule));
748
749                 rule->ruleId = pg_rewrite_tuple->t_oid;
750
751                 rule->event =
752                         (int) heap_getattr(pg_rewrite_tuple, InvalidBuffer,
753                                                          Anum_pg_rewrite_ev_type, pg_rewrite_tupdesc,
754                                                            &isnull) - 48;
755                 rule->attrno =
756                         (int) heap_getattr(pg_rewrite_tuple, InvalidBuffer,
757                                                          Anum_pg_rewrite_ev_attr, pg_rewrite_tupdesc,
758                                                            &isnull);
759                 rule->isInstead =
760                         !!heap_getattr(pg_rewrite_tuple, InvalidBuffer,
761                                                    Anum_pg_rewrite_is_instead, pg_rewrite_tupdesc,
762                                                    &isnull);
763
764                 ruleaction =
765                         heap_getattr(pg_rewrite_tuple, InvalidBuffer,
766                                                  Anum_pg_rewrite_action, pg_rewrite_tupdesc,
767                                                  &isnull);
768                 rule_evqual_string =
769                         heap_getattr(pg_rewrite_tuple, InvalidBuffer,
770                                                  Anum_pg_rewrite_ev_qual, pg_rewrite_tupdesc,
771                                                  &isnull);
772
773                 ruleaction = PointerGetDatum (textout((struct varlena *) DatumGetPointer (ruleaction)));
774                 rule_evqual_string = PointerGetDatum (textout((struct varlena *) DatumGetPointer (rule_evqual_string)));
775
776                 rule->actions = (List *) stringToNode(DatumGetPointer (ruleaction));
777                 rule->qual = (Node *) stringToNode(DatumGetPointer (rule_evqual_string));
778
779                 rules[numlocks++] = rule;
780                 if (numlocks == maxlocks)
781                 {
782                         maxlocks *= 2;
783                         rules =
784                                 (RewriteRule **) repalloc(rules, sizeof(RewriteRule *) * maxlocks);
785                 }
786         }
787
788         /* ----------------
789          *      end the scan and close the attribute relation
790          * ----------------
791          */
792         heap_endscan(pg_rewrite_scan);
793         heap_close(pg_rewrite_desc);
794
795         /* ----------------
796          *      form a RuleLock and insert into relation
797          * ----------------
798          */
799         rulelock = (RuleLock *) palloc(sizeof(RuleLock));
800         rulelock->numLocks = numlocks;
801         rulelock->rules = rules;
802
803         relation->rd_rules = rulelock;
804         return;
805 }
806
807
808 /* --------------------------------
809  *              RelationBuildDesc
810  *
811  *              To build a relation descriptor, we have to allocate space,
812  *              open the underlying unix file and initialize the following
813  *              fields:
814  *
815  *      File                               rd_fd;                open file descriptor
816  *      int                                        rd_nblocks;   number of blocks in rel
817  *                                                                               it will be set in ambeginscan()
818  *      uint16                             rd_refcnt;    reference count
819  *      Form_pg_am                         rd_am;                AM tuple
820  *      Form_pg_class              rd_rel;               RELATION tuple
821  *      Oid                                        rd_id;                relations's object id
822  *      Pointer                            lockInfo;     ptr. to misc. info.
823  *      TupleDesc                          rd_att;               tuple desciptor
824  *
825  *              Note: rd_ismem (rel is in-memory only) is currently unused
826  *              by any part of the system.      someday this will indicate that
827  *              the relation lives only in the main-memory buffer pool
828  *              -cim 2/4/91
829  * --------------------------------
830  */
831 static Relation
832 RelationBuildDesc(RelationBuildDescInfo buildinfo)
833 {
834         File            fd;
835         Relation        relation;
836         u_int           natts;
837         Oid                     relid;
838         Oid                     relam;
839         Form_pg_class relp;
840
841         MemoryContext oldcxt;
842
843         HeapTuple       pg_class_tuple;
844
845         oldcxt = MemoryContextSwitchTo((MemoryContext) CacheCxt);
846
847         /* ----------------
848          *      find the tuple in pg_class corresponding to the given relation id
849          * ----------------
850          */
851         pg_class_tuple = ScanPgRelation(buildinfo);
852
853         /* ----------------
854          *      if no such tuple exists, return NULL
855          * ----------------
856          */
857         if (!HeapTupleIsValid(pg_class_tuple))
858         {
859
860                 MemoryContextSwitchTo(oldcxt);
861
862                 return NULL;
863         }
864
865         /* ----------------
866          *      get information from the pg_class_tuple
867          * ----------------
868          */
869         relid = pg_class_tuple->t_oid;
870         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
871         natts = relp->relnatts;
872
873         /* ----------------
874          *      allocate storage for the relation descriptor,
875          *      initialize relation->rd_rel and get the access method id.
876          * ----------------
877          */
878         relation = AllocateRelationDesc(natts, relp);
879         relam = relation->rd_rel->relam;
880
881         /* ----------------
882          *      initialize the relation's relation id (relation->rd_id)
883          * ----------------
884          */
885         relation->rd_id = relid;
886
887         /* ----------------
888          *      initialize relation->rd_refcnt
889          * ----------------
890          */
891         RelationSetReferenceCount(relation, 1);
892
893         /* ----------------
894          *       normal relations are not nailed into the cache
895          * ----------------
896          */
897         relation->rd_isnailed = false;
898
899         /* ----------------
900          *      initialize the access method information (relation->rd_am)
901          * ----------------
902          */
903         if (OidIsValid(relam))
904         {
905                 relation->rd_am = (Form_pg_am)
906                         AccessMethodObjectIdGetAccessMethodTupleForm(relam);
907         }
908
909         /* ----------------
910          *      initialize the tuple descriptor (relation->rd_att).
911          *      remember, rd_att is an array of attribute pointers that lives
912          *      off the end of the relation descriptor structure so space was
913          *      already allocated for it by AllocateRelationDesc.
914          * ----------------
915          */
916         RelationBuildTupleDesc(buildinfo, relation, natts);
917
918         /* ----------------
919          *      initialize rules that affect this relation
920          * ----------------
921          */
922         if (relp->relhasrules)
923         {
924                 RelationBuildRuleLock(relation);
925         }
926         else
927         {
928                 relation->rd_rules = NULL;
929         }
930
931         /* Triggers */
932         if (relp->reltriggers > 0)
933                 RelationBuildTriggers(relation);
934         else
935                 relation->trigdesc = NULL;
936
937         /* ----------------
938          *      initialize index strategy and support information for this relation
939          * ----------------
940          */
941         if (OidIsValid(relam))
942         {
943                 IndexedAccessMethodInitialize(relation);
944         }
945
946         /* ----------------
947          *      initialize the relation lock manager information
948          * ----------------
949          */
950         RelationInitLockInfo(relation);         /* see lmgr.c */
951
952         /* ----------------
953          *      open the relation and assign the file descriptor returned
954          *      by the storage manager code to rd_fd.
955          * ----------------
956          */
957         fd = smgropen(relp->relsmgr, relation);
958
959         Assert(fd >= -1);
960         if (fd == -1)
961                 elog(NOTICE, "RelationIdBuildRelation: smgropen(%s): %m",
962                          &relp->relname);
963
964         relation->rd_fd = fd;
965
966         /* ----------------
967          *      insert newly created relation into proper relcaches,
968          *      restore memory context and return the new reldesc.
969          * ----------------
970          */
971
972         RelationCacheInsert(relation);
973
974         /* -------------------
975          *      free the memory allocated for pg_class_tuple
976          *      and for lock data pointed to by pg_class_tuple
977          * -------------------
978          */
979         pfree(pg_class_tuple);
980
981         MemoryContextSwitchTo(oldcxt);
982
983         return relation;
984 }
985
986 static void
987 IndexedAccessMethodInitialize(Relation relation)
988 {
989         IndexStrategy strategy;
990         RegProcedure *support;
991         int                     natts;
992         Size            stratSize;
993         Size            supportSize;
994         uint16          relamstrategies;
995         uint16          relamsupport;
996
997         natts = relation->rd_rel->relnatts;
998         relamstrategies = relation->rd_am->amstrategies;
999         stratSize = AttributeNumberGetIndexStrategySize(natts, relamstrategies);
1000         strategy = (IndexStrategy) palloc(stratSize);
1001         relamsupport = relation->rd_am->amsupport;
1002
1003         if (relamsupport > 0)
1004         {
1005                 supportSize = natts * (relamsupport * sizeof(RegProcedure));
1006                 support = (RegProcedure *) palloc(supportSize);
1007         }
1008         else
1009         {
1010                 support = (RegProcedure *) NULL;
1011         }
1012
1013         IndexSupportInitialize(strategy, support,
1014                                                    relation->rd_att->attrs[0]->attrelid,
1015                                                    relation->rd_rel->relam,
1016                                                    relamstrategies, relamsupport, natts);
1017
1018         RelationSetIndexSupport(relation, strategy, support);
1019 }
1020
1021 /* --------------------------------
1022  *              formrdesc
1023  *
1024  *              This is a special version of RelationBuildDesc()
1025  *              used by RelationInitialize() in initializing the
1026  *              relcache.  The system relation descriptors built
1027  *              here are all nailed in the descriptor caches, for
1028  *              bootstrapping.
1029  * --------------------------------
1030  */
1031 static void
1032 formrdesc(char *relationName,
1033                   u_int natts,
1034                   FormData_pg_attribute att[])
1035 {
1036         Relation        relation;
1037         Size            len;
1038         int                     i;
1039
1040         /* ----------------
1041          *      allocate new relation desc
1042          * ----------------
1043          */
1044         len = sizeof(RelationData);
1045         relation = (Relation) palloc(len);
1046         MemSet((char *) relation, 0, len);
1047
1048         /* ----------------
1049          *      don't open the unix file yet..
1050          * ----------------
1051          */
1052         relation->rd_fd = -1;
1053
1054         /* ----------------
1055          *      initialize reference count
1056          * ----------------
1057          */
1058         RelationSetReferenceCount(relation, 1);
1059
1060         /* ----------------
1061          *      initialize relation tuple form
1062          * ----------------
1063          */
1064         relation->rd_rel = (Form_pg_class)
1065                 palloc((Size) (sizeof(*relation->rd_rel)));
1066         MemSet(relation->rd_rel, 0, sizeof(FormData_pg_class));
1067         namestrcpy(&relation->rd_rel->relname, relationName);
1068
1069         /* ----------------
1070            initialize attribute tuple form
1071         */
1072         relation->rd_att = CreateTemplateTupleDesc(natts);
1073
1074         /*
1075          * For debugging purposes, it's important to distinguish between
1076          * shared and non-shared relations, even at bootstrap time.  There's
1077          * code in the buffer manager that traces allocations that has to know
1078          * about this.
1079          */
1080
1081         if (IsSystemRelationName(relationName))
1082         {
1083                 relation->rd_rel->relowner = 6; /* XXX use sym const */
1084                 relation->rd_rel->relisshared =
1085                         IsSharedSystemRelationName(relationName);
1086         }
1087         else
1088         {
1089                 relation->rd_rel->relowner = InvalidOid;                /* XXX incorrect */
1090                 relation->rd_rel->relisshared = false;
1091         }
1092
1093         relation->rd_rel->relpages = 1;         /* XXX */
1094         relation->rd_rel->reltuples = 1;        /* XXX */
1095         relation->rd_rel->relkind = RELKIND_RELATION;
1096         relation->rd_rel->relarch = 'n';
1097         relation->rd_rel->relnatts = (uint16) natts;
1098         relation->rd_isnailed = true;
1099
1100         /* ----------------
1101          *      initialize tuple desc info
1102          * ----------------
1103          */
1104         for (i = 0; i < natts; i++)
1105         {
1106                 relation->rd_att->attrs[i] =
1107                         (AttributeTupleForm) palloc(ATTRIBUTE_TUPLE_SIZE);
1108
1109                 MemSet((char *) relation->rd_att->attrs[i], 0,
1110                            ATTRIBUTE_TUPLE_SIZE);
1111                 memmove((char *) relation->rd_att->attrs[i],
1112                                 (char *) &att[i],
1113                                 ATTRIBUTE_TUPLE_SIZE);
1114         }
1115
1116         /* ----------------
1117          *      initialize relation id
1118          * ----------------
1119          */
1120         relation->rd_id = relation->rd_att->attrs[0]->attrelid;
1121
1122         /* ----------------
1123          *      add new reldesc to relcache
1124          * ----------------
1125          */
1126         RelationCacheInsert(relation);
1127
1128         /*
1129          * Determining this requires a scan on pg_class, but to do the scan
1130          * the rdesc for pg_class must already exist.  Therefore we must do
1131          * the check (and possible set) after cache insertion.
1132          */
1133         relation->rd_rel->relhasindex =
1134                 CatalogHasIndex(relationName, relation->rd_id);
1135 }
1136
1137
1138 /* ----------------------------------------------------------------
1139  *                               Relation Descriptor Lookup Interface
1140  * ----------------------------------------------------------------
1141  */
1142
1143 /* --------------------------------
1144  *              RelationIdCacheGetRelation
1145  *
1146  *              only try to get the reldesc by looking up the cache
1147  *              do not go to the disk.  this is used by BlockPrepareFile()
1148  *              and RelationIdGetRelation below.
1149  * --------------------------------
1150  */
1151 Relation
1152 RelationIdCacheGetRelation(Oid relationId)
1153 {
1154         Relation        rd;
1155
1156         RelationIdCacheLookup(relationId, rd);
1157
1158         if (RelationIsValid(rd))
1159         {
1160                 if (rd->rd_fd == -1)
1161                 {
1162                         rd->rd_fd = smgropen(rd->rd_rel->relsmgr, rd);
1163                         Assert(rd->rd_fd != -1);
1164                 }
1165
1166                 RelationIncrementReferenceCount(rd);
1167                 RelationSetLockForDescriptorOpen(rd);
1168
1169         }
1170
1171         return (rd);
1172 }
1173
1174 /* --------------------------------
1175  *              RelationNameCacheGetRelation
1176  * --------------------------------
1177  */
1178 static Relation
1179 RelationNameCacheGetRelation(char *relationName)
1180 {
1181         Relation        rd;
1182         NameData        name;
1183
1184         /*
1185          * make sure that the name key used for hash lookup is properly
1186          * null-padded
1187          */
1188         namestrcpy(&name, relationName);
1189         RelationNameCacheLookup(name.data, rd);
1190
1191         if (RelationIsValid(rd))
1192         {
1193                 if (rd->rd_fd == -1)
1194                 {
1195                         rd->rd_fd = smgropen(rd->rd_rel->relsmgr, rd);
1196                         Assert(rd->rd_fd != -1);
1197                 }
1198
1199                 RelationIncrementReferenceCount(rd);
1200                 RelationSetLockForDescriptorOpen(rd);
1201
1202         }
1203
1204         return (rd);
1205 }
1206
1207 /* --------------------------------
1208  *              RelationIdGetRelation
1209  *
1210  *              return a relation descriptor based on its id.
1211  *              return a cached value if possible
1212  * --------------------------------
1213  */
1214 Relation
1215 RelationIdGetRelation(Oid relationId)
1216 {
1217         Relation        rd;
1218         RelationBuildDescInfo buildinfo;
1219
1220         /* ----------------
1221          *      increment access statistics
1222          * ----------------
1223          */
1224         IncrHeapAccessStat(local_RelationIdGetRelation);
1225         IncrHeapAccessStat(global_RelationIdGetRelation);
1226
1227         /* ----------------
1228          *      first try and get a reldesc from the cache
1229          * ----------------
1230          */
1231         rd = RelationIdCacheGetRelation(relationId);
1232         if (RelationIsValid(rd))
1233                 return rd;
1234
1235         /* ----------------
1236          *      no reldesc in the cache, so have RelationBuildDesc()
1237          *      build one and add it.
1238          * ----------------
1239          */
1240         buildinfo.infotype = INFO_RELID;
1241         buildinfo.i.info_id = relationId;
1242
1243         rd = RelationBuildDesc(buildinfo);
1244         return
1245                 rd;
1246 }
1247
1248 /* --------------------------------
1249  *              RelationNameGetRelation
1250  *
1251  *              return a relation descriptor based on its name.
1252  *              return a cached value if possible
1253  * --------------------------------
1254  */
1255 Relation
1256 RelationNameGetRelation(char *relationName)
1257 {
1258         Relation        rd;
1259         RelationBuildDescInfo buildinfo;
1260
1261         /* ----------------
1262          *      increment access statistics
1263          * ----------------
1264          */
1265         IncrHeapAccessStat(local_RelationNameGetRelation);
1266         IncrHeapAccessStat(global_RelationNameGetRelation);
1267
1268         /* ----------------
1269          *      first try and get a reldesc from the cache
1270          * ----------------
1271          */
1272         rd = RelationNameCacheGetRelation(relationName);
1273         if (RelationIsValid(rd))
1274                 return rd;
1275
1276         /* ----------------
1277          *      no reldesc in the cache, so have RelationBuildDesc()
1278          *      build one and add it.
1279          * ----------------
1280          */
1281         buildinfo.infotype = INFO_RELNAME;
1282         buildinfo.i.info_name = relationName;
1283
1284         rd = RelationBuildDesc(buildinfo);
1285         return rd;
1286 }
1287
1288 /* ----------------
1289  *              old "getreldesc" interface.
1290  * ----------------
1291  */
1292 #ifdef NOT_USED
1293 Relation
1294 getreldesc(char *relationName)
1295 {
1296         /* ----------------
1297          *      increment access statistics
1298          * ----------------
1299          */
1300         IncrHeapAccessStat(local_getreldesc);
1301         IncrHeapAccessStat(global_getreldesc);
1302
1303         return RelationNameGetRelation(relationName);
1304 }
1305
1306 #endif
1307
1308 /* ----------------------------------------------------------------
1309  *                              cache invalidation support routines
1310  * ----------------------------------------------------------------
1311  */
1312
1313 /* --------------------------------
1314  *              RelationClose - close an open relation
1315  * --------------------------------
1316  */
1317 void
1318 RelationClose(Relation relation)
1319 {
1320         /* Note: no locking manipulations needed */
1321         RelationDecrementReferenceCount(relation);
1322 }
1323
1324 /* --------------------------------
1325  * RelationFlushRelation
1326  *
1327  *       Actually blows away a relation... RelationFree doesn't do
1328  *       anything anymore.
1329  * --------------------------------
1330  */
1331 static void
1332 RelationFlushRelation(Relation *relationPtr,
1333                                           bool onlyFlushReferenceCountZero)
1334 {
1335         MemoryContext oldcxt;
1336         Relation        relation = *relationPtr;
1337
1338         if (relation->rd_isnailed)
1339         {
1340                 /* this is a nailed special relation for bootstraping */
1341                 return;
1342         }
1343
1344         if (!onlyFlushReferenceCountZero ||
1345                 RelationHasReferenceCountZero(relation))
1346         {
1347
1348                 oldcxt = MemoryContextSwitchTo((MemoryContext) CacheCxt);
1349
1350                 RelationCacheDelete(relation);
1351
1352                 FreeTupleDesc(relation->rd_att);
1353
1354                 FreeTriggerDesc(relation);
1355
1356 #if 0
1357                 if (relation->rd_rules)
1358                 {
1359                         int                     j;
1360
1361                         for (j = 0; j < relation->rd_rules->numLocks; j++)
1362                         {
1363                                 pfree(relation->rd_rules->rules[j]);
1364                         }
1365                         pfree(relation->rd_rules->rules);
1366                         pfree(relation->rd_rules);
1367                 }
1368 #endif
1369
1370                 pfree(RelationGetLockInfo(relation));
1371                 pfree(RelationGetRelationTupleForm(relation));
1372                 pfree(relation);
1373
1374                 MemoryContextSwitchTo(oldcxt);
1375         }
1376 }
1377
1378 /* --------------------------------
1379  *              RelationForgetRelation -
1380  *                 RelationFlushRelation + if the relation is local then get rid of
1381  *                 the relation descriptor from the newly created relation list.
1382  * --------------------------------
1383  */
1384 void
1385 RelationForgetRelation(Oid rid)
1386 {
1387         Relation        relation;
1388
1389         RelationIdCacheLookup(rid, relation);
1390         Assert(PointerIsValid(relation));
1391
1392         if (relation->rd_islocal)
1393         {
1394                 MemoryContext oldcxt;
1395                 List       *curr;
1396                 List       *prev = NIL;
1397
1398                 oldcxt = MemoryContextSwitchTo((MemoryContext) CacheCxt);
1399
1400                 foreach(curr, newlyCreatedRelns)
1401                 {
1402                         Relation        reln = lfirst(curr);
1403
1404                         Assert(reln != NULL && reln->rd_islocal);
1405                         if (reln->rd_id == rid)
1406                                 break;
1407                         prev = curr;
1408                 }
1409                 if (curr == NIL)
1410                         elog(FATAL, "Local relation %s not found in list",
1411                                  (RelationGetRelationName(relation))->data);
1412                 if (prev == NIL)
1413                         newlyCreatedRelns = lnext(newlyCreatedRelns);
1414                 else
1415                         lnext(prev) = lnext(curr);
1416                 pfree(curr);
1417                 MemoryContextSwitchTo(oldcxt);
1418         }
1419
1420         RelationFlushRelation(&relation, false);
1421 }
1422
1423 /* --------------------------------
1424  *              RelationIdInvalidateRelationCacheByRelationId
1425  * --------------------------------
1426  */
1427 void
1428 RelationIdInvalidateRelationCacheByRelationId(Oid relationId)
1429 {
1430         Relation        relation;
1431
1432         RelationIdCacheLookup(relationId, relation);
1433
1434         /*
1435          * "local" relations are invalidated by RelationPurgeLocalRelation.
1436          * (This is to make LocalBufferSync's life easier: want the descriptor
1437          * to hang around for a while. In fact, won't we want this for
1438          * BufferSync also? But I'll leave it for now since I don't want to
1439          * break anything.) - ay 3/95
1440          */
1441         if (PointerIsValid(relation) && !relation->rd_islocal)
1442         {
1443
1444                 /*
1445                  * The boolean onlyFlushReferenceCountZero in RelationFlushReln()
1446                  * should be set to true when we are incrementing the command
1447                  * counter and to false when we are starting a new xaction.  This
1448                  * can be determined by checking the current xaction status.
1449                  */
1450                 RelationFlushRelation(&relation, CurrentXactInProgress());
1451         }
1452 }
1453
1454 #if NOT_USED                                    /* See comments at line 1304 */
1455 /* --------------------------------
1456  *              RelationIdInvalidateRelationCacheByAccessMethodId
1457  *
1458  *              RelationFlushIndexes is needed for use with HashTableWalk..
1459  * --------------------------------
1460  */
1461 static void
1462 RelationFlushIndexes(Relation *r,
1463                                          Oid accessMethodId)
1464 {
1465         Relation        relation = *r;
1466
1467         if (!RelationIsValid(relation))
1468         {
1469                 elog(NOTICE, "inval call to RFI");
1470                 return;
1471         }
1472
1473         if (relation->rd_rel->relkind == RELKIND_INDEX &&       /* XXX style */
1474                 (!OidIsValid(accessMethodId) ||
1475                  relation->rd_rel->relam == accessMethodId))
1476         {
1477                 RelationFlushRelation(&relation, false);
1478         }
1479 }
1480
1481 #endif
1482
1483
1484 void
1485 RelationIdInvalidateRelationCacheByAccessMethodId(Oid accessMethodId)
1486 {
1487 #if 0
1488
1489         /*
1490          * 25 aug 1992:  mao commented out the ht walk below.  it should be
1491          * doing the right thing, in theory, but flushing reldescs for index
1492          * relations apparently doesn't work.  we want to cut 4.0.1, and i
1493          * don't want to introduce new bugs.  this code never executed before,
1494          * so i'm turning it off for now.  after the release is cut, i'll fix
1495          * this up.
1496          */
1497
1498         HashTableWalk(RelationNameCache, (HashtFunc) RelationFlushIndexes,
1499                                   accessMethodId);
1500 #else
1501         return;
1502 #endif
1503 }
1504
1505 /*
1506  * RelationCacheInvalidate
1507  *
1508  *       Will blow away either all the cached relation descriptors or
1509  *       those that have a zero reference count.
1510  *
1511  */
1512 void
1513 RelationCacheInvalidate(bool onlyFlushReferenceCountZero)
1514 {
1515         HashTableWalk(RelationNameCache, (HashtFunc) RelationFlushRelation,
1516                                   onlyFlushReferenceCountZero);
1517
1518         /*
1519          * nailed-in reldescs will still be in the cache... 7 hardwired heaps
1520          * + 3 hardwired indices == 10 total.
1521          */
1522         if (!onlyFlushReferenceCountZero)
1523         {
1524                 Assert(RelationNameCache->hctl->nkeys == 10);
1525                 Assert(RelationIdCache->hctl->nkeys == 10);
1526         }
1527 }
1528
1529
1530 /* --------------------------------
1531  *              RelationRegisterRelation -
1532  *                 register the Relation descriptor of a newly created relation
1533  *                 with the relation descriptor Cache.
1534  * --------------------------------
1535  */
1536 void
1537 RelationRegisterRelation(Relation relation)
1538 {
1539         MemoryContext oldcxt;
1540
1541         oldcxt = MemoryContextSwitchTo((MemoryContext) CacheCxt);
1542
1543         if (oldcxt != (MemoryContext) CacheCxt)
1544                 elog(NOIND, "RelationRegisterRelation: WARNING: Context != CacheCxt");
1545
1546         RelationCacheInsert(relation);
1547
1548         RelationInitLockInfo(relation);
1549
1550         /*
1551          * we've just created the relation. It is invisible to anyone else
1552          * before the transaction is committed. Setting rd_islocal allows us
1553          * to use the local buffer manager for select/insert/etc before the
1554          * end of transaction. (We also need to keep track of relations
1555          * created during a transaction and does the necessary clean up at the
1556          * end of the transaction.)                             - ay 3/95
1557          */
1558         relation->rd_islocal = TRUE;
1559         newlyCreatedRelns = lcons(relation, newlyCreatedRelns);
1560
1561         MemoryContextSwitchTo(oldcxt);
1562 }
1563
1564 /*
1565  * RelationPurgeLocalRelation -
1566  *        find all the Relation descriptors marked rd_islocal and reset them.
1567  *        This should be called at the end of a transaction (commit/abort) when
1568  *        the "local" relations will become visible to others and the multi-user
1569  *        buffer pool should be used.
1570  */
1571 void
1572 RelationPurgeLocalRelation(bool xactCommitted)
1573 {
1574         MemoryContext oldcxt;
1575
1576         if (newlyCreatedRelns == NULL)
1577                 return;
1578
1579         oldcxt = MemoryContextSwitchTo((MemoryContext) CacheCxt);
1580
1581         while (newlyCreatedRelns)
1582         {
1583                 List       *l = newlyCreatedRelns;
1584                 Relation        reln = lfirst(l);
1585
1586                 Assert(reln != NULL && reln->rd_islocal);
1587
1588                 if (!xactCommitted)
1589                 {
1590
1591                         /*
1592                          * remove the file if we abort. This is so that files for
1593                          * tables created inside a transaction block get removed.
1594                          */
1595                         if (reln->rd_istemp)
1596                         {
1597                                 if (!(reln->rd_tmpunlinked))
1598                                 {
1599                                         smgrunlink(reln->rd_rel->relsmgr, reln);
1600                                         reln->rd_tmpunlinked = TRUE;
1601                                 }
1602                         }
1603                         else
1604                         {
1605                                 smgrunlink(reln->rd_rel->relsmgr, reln);
1606                         }
1607                 }
1608                 else if (!IsBootstrapProcessingMode() && !(reln->rd_istemp))
1609
1610                         /*
1611                          * RelationFlushRelation () below will flush relation
1612                          * information from the cache. We must call smgrclose to flush
1613                          * relation information from SMGR & FMGR, too. We assume that
1614                          * for temp relations smgrunlink is already called by
1615                          * heap_destroyr and we skip smgrclose for them.                  -
1616                          * vadim 05/22/97
1617                          */
1618                         smgrclose(reln->rd_rel->relsmgr, reln);
1619
1620                 reln->rd_islocal = FALSE;
1621
1622                 if (!IsBootstrapProcessingMode())
1623                         RelationFlushRelation(&reln, FALSE);
1624
1625                 newlyCreatedRelns = lnext(newlyCreatedRelns);
1626                 pfree(l);
1627         }
1628
1629         MemoryContextSwitchTo(oldcxt);
1630 }
1631
1632 /* --------------------------------
1633  *              RelationInitialize
1634  *
1635  *              This initializes the relation descriptor cache.
1636  * --------------------------------
1637  */
1638
1639 #define INITRELCACHESIZE                400
1640
1641 void
1642 RelationInitialize(void)
1643 {
1644         MemoryContext oldcxt;
1645         HASHCTL         ctl;
1646
1647         /* ----------------
1648          *      switch to cache memory context
1649          * ----------------
1650          */
1651         if (!CacheCxt)
1652                 CacheCxt = CreateGlobalMemory("Cache");
1653
1654         oldcxt = MemoryContextSwitchTo((MemoryContext) CacheCxt);
1655
1656         /* ----------------
1657          *      create global caches
1658          * ----------------
1659          */
1660         MemSet(&ctl, 0, (int) sizeof(ctl));
1661         ctl.keysize = sizeof(NameData);
1662         ctl.datasize = sizeof(Relation);
1663         RelationNameCache = hash_create(INITRELCACHESIZE, &ctl, HASH_ELEM);
1664
1665         ctl.keysize = sizeof(Oid);
1666         ctl.hash = tag_hash;
1667         RelationIdCache = hash_create(INITRELCACHESIZE, &ctl,
1668                                                                   HASH_ELEM | HASH_FUNCTION);
1669
1670         /* ----------------
1671          *      initialize the cache with pre-made relation descriptors
1672          *      for some of the more important system relations.  These
1673          *      relations should always be in the cache.
1674          * ----------------
1675          */
1676         formrdesc(RelationRelationName, Natts_pg_class, Desc_pg_class);
1677         formrdesc(AttributeRelationName, Natts_pg_attribute, Desc_pg_attribute);
1678         formrdesc(ProcedureRelationName, Natts_pg_proc, Desc_pg_proc);
1679         formrdesc(TypeRelationName, Natts_pg_type, Desc_pg_type);
1680         formrdesc(VariableRelationName, Natts_pg_variable, Desc_pg_variable);
1681         formrdesc(LogRelationName, Natts_pg_log, Desc_pg_log);
1682         formrdesc(TimeRelationName, Natts_pg_time, Desc_pg_time);
1683
1684         /*
1685          * If this isn't initdb time, then we want to initialize some index
1686          * relation descriptors, as well.  The descriptors are for
1687          * pg_attnumind (to make building relation descriptors fast) and
1688          * possibly others, as they're added.
1689          */
1690
1691         if (!IsBootstrapProcessingMode())
1692                 init_irels();
1693
1694         MemoryContextSwitchTo(oldcxt);
1695 }
1696
1697 static void
1698 AttrDefaultFetch(Relation relation)
1699 {
1700         AttrDefault *attrdef = relation->rd_att->constr->defval;
1701         int                     ndef = relation->rd_att->constr->num_defval;
1702         Relation        adrel;
1703         Relation        irel;
1704         ScanKeyData skey;
1705         HeapTuple       tuple;
1706         Form_pg_attrdef adform;
1707         IndexScanDesc sd;
1708         RetrieveIndexResult indexRes;
1709         Buffer          buffer;
1710         ItemPointer iptr;
1711         struct varlena *val;
1712         bool            isnull;
1713         int                     found;
1714         int                     i;
1715
1716         ScanKeyEntryInitialize(&skey,
1717                                                    (bits16) 0x0,
1718                                                    (AttrNumber) 1,
1719                                                    (RegProcedure) ObjectIdEqualRegProcedure,
1720                                                    ObjectIdGetDatum(relation->rd_id));
1721
1722         adrel = heap_openr(AttrDefaultRelationName);
1723         irel = index_openr(AttrDefaultIndex);
1724         sd = index_beginscan(irel, false, 1, &skey);
1725         tuple = (HeapTuple) NULL;
1726
1727         for (found = 0;;)
1728         {
1729                 indexRes = index_getnext(sd, ForwardScanDirection);
1730                 if (!indexRes)
1731                         break;
1732
1733                 iptr = &indexRes->heap_iptr;
1734                 tuple = heap_fetch(adrel, NowTimeQual, iptr, &buffer);
1735                 pfree(indexRes);
1736                 if (!HeapTupleIsValid(tuple))
1737                         continue;
1738                 found++;
1739                 adform = (Form_pg_attrdef) GETSTRUCT(tuple);
1740                 for (i = 0; i < ndef; i++)
1741                 {
1742                         if (adform->adnum != attrdef[i].adnum)
1743                                 continue;
1744                         if (attrdef[i].adsrc != NULL)
1745                                 elog(WARN, "AttrDefaultFetch: second record found for attr %.*s in rel %.*s",
1746                                          NAMEDATALEN, relation->rd_att->attrs[adform->adnum - 1]->attname.data,
1747                                          NAMEDATALEN, relation->rd_rel->relname.data);
1748
1749                         val = (struct varlena *) fastgetattr(tuple,
1750                                                                                                  Anum_pg_attrdef_adbin,
1751                                                                                                  adrel->rd_att, &isnull);
1752                         if (isnull)
1753                                 elog(WARN, "AttrDefaultFetch: adbin IS NULL for attr %.*s in rel %.*s",
1754                                          NAMEDATALEN, relation->rd_att->attrs[adform->adnum - 1]->attname.data,
1755                                          NAMEDATALEN, relation->rd_rel->relname.data);
1756                         attrdef[i].adbin = textout(val);
1757                         val = (struct varlena *) fastgetattr(tuple,
1758                                                                                                  Anum_pg_attrdef_adsrc,
1759                                                                                                  adrel->rd_att, &isnull);
1760                         if (isnull)
1761                                 elog(WARN, "AttrDefaultFetch: adsrc IS NULL for attr %.*s in rel %.*s",
1762                                          NAMEDATALEN, relation->rd_att->attrs[adform->adnum - 1]->attname.data,
1763                                          NAMEDATALEN, relation->rd_rel->relname.data);
1764                         attrdef[i].adsrc = textout(val);
1765                         break;
1766                 }
1767
1768                 if (i >= ndef)
1769                         elog(WARN, "AttrDefaultFetch: unexpected record found for attr %d in rel %.*s",
1770                                  adform->adnum,
1771                                  NAMEDATALEN, relation->rd_rel->relname.data);
1772                 ReleaseBuffer(buffer);
1773         }
1774
1775         if (found < ndef)
1776                 elog(WARN, "AttrDefaultFetch: %d record not found for rel %.*s",
1777                          ndef - found,
1778                          NAMEDATALEN, relation->rd_rel->relname.data);
1779
1780         index_endscan(sd);
1781         pfree(sd);
1782         index_close(irel);
1783         heap_close(adrel);
1784
1785 }
1786
1787 static void
1788 RelCheckFetch(Relation relation)
1789 {
1790         ConstrCheck *check = relation->rd_att->constr->check;
1791         int                     ncheck = relation->rd_att->constr->num_check;
1792         Relation        rcrel;
1793         Relation        irel;
1794         ScanKeyData skey;
1795         HeapTuple       tuple;
1796         IndexScanDesc sd;
1797         RetrieveIndexResult indexRes;
1798         Buffer          buffer;
1799         ItemPointer iptr;
1800         Name            rcname;
1801         struct varlena *val;
1802         bool            isnull;
1803         int                     found;
1804
1805         ScanKeyEntryInitialize(&skey,
1806                                                    (bits16) 0x0,
1807                                                    (AttrNumber) 1,
1808                                                    (RegProcedure) ObjectIdEqualRegProcedure,
1809                                                    ObjectIdGetDatum(relation->rd_id));
1810
1811         rcrel = heap_openr(RelCheckRelationName);
1812         irel = index_openr(RelCheckIndex);
1813         sd = index_beginscan(irel, false, 1, &skey);
1814         tuple = (HeapTuple) NULL;
1815
1816         for (found = 0;;)
1817         {
1818                 indexRes = index_getnext(sd, ForwardScanDirection);
1819                 if (!indexRes)
1820                         break;
1821
1822                 iptr = &indexRes->heap_iptr;
1823                 tuple = heap_fetch(rcrel, NowTimeQual, iptr, &buffer);
1824                 pfree(indexRes);
1825                 if (!HeapTupleIsValid(tuple))
1826                         continue;
1827                 if (found == ncheck)
1828                         elog(WARN, "RelCheckFetch: unexpected record found for rel %.*s",
1829                                  NAMEDATALEN, relation->rd_rel->relname.data);
1830
1831                 rcname = (Name) fastgetattr(tuple,
1832                                                                         Anum_pg_relcheck_rcname,
1833                                                                         rcrel->rd_att, &isnull);
1834                 if (isnull)
1835                         elog(WARN, "RelCheckFetch: rcname IS NULL for rel %.*s",
1836                                  NAMEDATALEN, relation->rd_rel->relname.data);
1837                 check[found].ccname = nameout(rcname);
1838                 val = (struct varlena *) fastgetattr(tuple,
1839                                                                                          Anum_pg_relcheck_rcbin,
1840                                                                                          rcrel->rd_att, &isnull);
1841                 if (isnull)
1842                         elog(WARN, "RelCheckFetch: rcbin IS NULL for rel %.*s",
1843                                  NAMEDATALEN, relation->rd_rel->relname.data);
1844                 check[found].ccbin = textout(val);
1845                 val = (struct varlena *) fastgetattr(tuple,
1846                                                                                          Anum_pg_relcheck_rcsrc,
1847                                                                                          rcrel->rd_att, &isnull);
1848                 if (isnull)
1849                         elog(WARN, "RelCheckFetch: rcsrc IS NULL for rel %.*s",
1850                                  NAMEDATALEN, relation->rd_rel->relname.data);
1851                 check[found].ccsrc = textout(val);
1852                 found++;
1853
1854                 ReleaseBuffer(buffer);
1855         }
1856
1857         if (found < ncheck)
1858                 elog(WARN, "RelCheckFetch: %d record not found for rel %.*s",
1859                          ncheck - found,
1860                          NAMEDATALEN, relation->rd_rel->relname.data);
1861
1862         index_endscan(sd);
1863         pfree(sd);
1864         index_close(irel);
1865         heap_close(rcrel);
1866
1867 }
1868
1869 /*
1870  *      init_irels(), write_irels() -- handle special-case initialization of
1871  *                                                                 index relation descriptors.
1872  *
1873  *              In late 1992, we started regularly having databases with more than
1874  *              a thousand classes in them.  With this number of classes, it became
1875  *              critical to do indexed lookups on the system catalogs.
1876  *
1877  *              Bootstrapping these lookups is very hard.  We want to be able to
1878  *              use an index on pg_attribute, for example, but in order to do so,
1879  *              we must have read pg_attribute for the attributes in the index,
1880  *              which implies that we need to use the index.
1881  *
1882  *              In order to get around the problem, we do the following:
1883  *
1884  *                 +  When the database system is initialized (at initdb time), we
1885  *                        don't use indices on pg_attribute.  We do sequential scans.
1886  *
1887  *                 +  When the backend is started up in normal mode, we load an image
1888  *                        of the appropriate relation descriptors, in internal format,
1889  *                        from an initialization file in the data/base/... directory.
1890  *
1891  *                 +  If the initialization file isn't there, then we create the
1892  *                        relation descriptor using sequential scans and write it to
1893  *                        the initialization file for use by subsequent backends.
1894  *
1895  *              This is complicated and interferes with system changes, but
1896  *              performance is so bad that we're willing to pay the tax.
1897  */
1898
1899 /* pg_attnumind, pg_classnameind, pg_classoidind */
1900 #define Num_indices_bootstrap   3
1901
1902 static void
1903 init_irels(void)
1904 {
1905         Size            len;
1906         int                     nread;
1907         File            fd;
1908         Relation        irel[Num_indices_bootstrap];
1909         Relation        ird;
1910         Form_pg_am      am;
1911         Form_pg_class relform;
1912         IndexStrategy strat;
1913         RegProcedure *support;
1914         int                     i;
1915         int                     relno;
1916
1917         if ((fd = FileNameOpenFile(INIT_FILENAME, O_RDONLY, 0600)) < 0)
1918         {
1919                 write_irels();
1920                 return;
1921         }
1922
1923         FileSeek(fd, 0L, SEEK_SET);
1924
1925         for (relno = 0; relno < Num_indices_bootstrap; relno++)
1926         {
1927                 /* first read the relation descriptor length */
1928                 if ((nread = FileRead(fd, (char *) &len, sizeof(int))) != sizeof(int))
1929                 {
1930                         write_irels();
1931                         return;
1932                 }
1933
1934                 ird = irel[relno] = (Relation) palloc(len);
1935                 MemSet(ird, 0, len);
1936
1937                 /* then, read the Relation structure */
1938                 if ((nread = FileRead(fd, (char *) ird, len)) != len)
1939                 {
1940                         write_irels();
1941                         return;
1942                 }
1943
1944                 /* the file descriptor is not yet opened */
1945                 ird->rd_fd = -1;
1946
1947                 /* lock info is not initialized */
1948                 ird->lockInfo = (char *) NULL;
1949
1950                 /* next, read the access method tuple form */
1951                 if ((nread = FileRead(fd, (char *) &len, sizeof(int))) != sizeof(int))
1952                 {
1953                         write_irels();
1954                         return;
1955                 }
1956
1957                 am = (Form_pg_am) palloc(len);
1958                 if ((nread = FileRead(fd, (char *) am, len)) != len)
1959                 {
1960                         write_irels();
1961                         return;
1962                 }
1963
1964                 ird->rd_am = am;
1965
1966                 /* next read the relation tuple form */
1967                 if ((nread = FileRead(fd, (char *) &len, sizeof(int))) != sizeof(int))
1968                 {
1969                         write_irels();
1970                         return;
1971                 }
1972
1973                 relform = (Form_pg_class) palloc(len);
1974                 if ((nread = FileRead(fd, (char *) relform, len)) != len)
1975                 {
1976                         write_irels();
1977                         return;
1978                 }
1979
1980                 ird->rd_rel = relform;
1981
1982                 /* initialize attribute tuple forms */
1983                 ird->rd_att = CreateTemplateTupleDesc(relform->relnatts);
1984
1985                 /* next read all the attribute tuple form data entries */
1986                 len = ATTRIBUTE_TUPLE_SIZE;
1987                 for (i = 0; i < relform->relnatts; i++)
1988                 {
1989                         if ((nread = FileRead(fd, (char *) &len, sizeof(int))) != sizeof(int))
1990                         {
1991                                 write_irels();
1992                                 return;
1993                         }
1994
1995                         ird->rd_att->attrs[i] = (AttributeTupleForm) palloc(len);
1996
1997                         if ((nread = FileRead(fd, (char *) ird->rd_att->attrs[i], len)) != len)
1998                         {
1999                                 write_irels();
2000                                 return;
2001                         }
2002                 }
2003
2004                 /* next, read the index strategy map */
2005                 if ((nread = FileRead(fd, (char *) &len, sizeof(int))) != sizeof(int))
2006                 {
2007                         write_irels();
2008                         return;
2009                 }
2010
2011                 strat = (IndexStrategy) palloc(len);
2012                 if ((nread = FileRead(fd, (char *) strat, len)) != len)
2013                 {
2014                         write_irels();
2015                         return;
2016                 }
2017
2018                 /* oh, for god's sake... */
2019 #define SMD(i)  strat[0].strategyMapData[i].entry[0]
2020
2021                 /* have to reinit the function pointers in the strategy maps */
2022                 for (i = 0; i < am->amstrategies; i++)
2023                         fmgr_info(SMD(i).sk_procedure,
2024                                           &(SMD(i).sk_func), &(SMD(i).sk_nargs));
2025
2026
2027                 /*
2028                  * use a real field called rd_istrat instead of the bogosity of
2029                  * hanging invisible fields off the end of a structure - jolly
2030                  */
2031                 ird->rd_istrat = strat;
2032
2033                 /* finally, read the vector of support procedures */
2034                 if ((nread = FileRead(fd, (char *) &len, sizeof(int))) != sizeof(int))
2035                 {
2036                         write_irels();
2037                         return;
2038                 }
2039
2040                 support = (RegProcedure *) palloc(len);
2041                 if ((nread = FileRead(fd, (char *) support, len)) != len)
2042                 {
2043                         write_irels();
2044                         return;
2045                 }
2046
2047                 /*
2048                  * p += sizeof(IndexStrategy); ((RegProcedure **) p) = support;
2049                  */
2050
2051                 ird->rd_support = support;
2052
2053                 RelationCacheInsert(ird);
2054         }
2055 }
2056
2057 static void
2058 write_irels(void)
2059 {
2060         int                     len;
2061         int                     nwritten;
2062         File            fd;
2063         Relation        irel[Num_indices_bootstrap];
2064         Relation        ird;
2065         Form_pg_am      am;
2066         Form_pg_class relform;
2067         IndexStrategy strat;
2068         RegProcedure *support;
2069         ProcessingMode oldmode;
2070         int                     i;
2071         int                     relno;
2072         RelationBuildDescInfo bi;
2073
2074         fd = FileNameOpenFile(INIT_FILENAME, O_WRONLY | O_CREAT | O_TRUNC, 0600);
2075         if (fd < 0)
2076                 elog(FATAL, "cannot create init file %s", INIT_FILENAME);
2077
2078         FileSeek(fd, 0L, SEEK_SET);
2079
2080         /*
2081          * Build a relation descriptor for pg_attnumind without resort to the
2082          * descriptor cache.  In order to do this, we set ProcessingMode to
2083          * Bootstrap.  The effect of this is to disable indexed relation
2084          * searches -- a necessary step, since we're trying to instantiate the
2085          * index relation descriptors here.
2086          */
2087
2088         oldmode = GetProcessingMode();
2089         SetProcessingMode(BootstrapProcessing);
2090
2091         bi.infotype = INFO_RELNAME;
2092         bi.i.info_name = AttributeNumIndex;
2093         irel[0] = RelationBuildDesc(bi);
2094         irel[0]->rd_isnailed = true;
2095
2096         bi.i.info_name = ClassNameIndex;
2097         irel[1] = RelationBuildDesc(bi);
2098         irel[1]->rd_isnailed = true;
2099
2100         bi.i.info_name = ClassOidIndex;
2101         irel[2] = RelationBuildDesc(bi);
2102         irel[2]->rd_isnailed = true;
2103
2104         SetProcessingMode(oldmode);
2105
2106         /* nail the descriptor in the cache */
2107         for (relno = 0; relno < Num_indices_bootstrap; relno++)
2108         {
2109                 ird = irel[relno];
2110
2111                 /* save the volatile fields in the relation descriptor */
2112                 am = ird->rd_am;
2113                 ird->rd_am = (Form_pg_am) NULL;
2114                 relform = ird->rd_rel;
2115                 ird->rd_rel = (Form_pg_class) NULL;
2116                 strat = ird->rd_istrat;
2117                 support = ird->rd_support;
2118
2119                 /*
2120                  * first write the relation descriptor , excluding strategy and
2121                  * support
2122                  */
2123                 len = sizeof(RelationData);
2124
2125                 /* first, write the relation descriptor length */
2126                 if ((nwritten = FileWrite(fd, (char *) &len, sizeof(int)))
2127                         != sizeof(int))
2128                         elog(FATAL, "cannot write init file -- descriptor length");
2129
2130                 /* next, write out the Relation structure */
2131                 if ((nwritten = FileWrite(fd, (char *) ird, len)) != len)
2132                         elog(FATAL, "cannot write init file -- reldesc");
2133
2134                 /* next, write the access method tuple form */
2135                 len = sizeof(FormData_pg_am);
2136                 if ((nwritten = FileWrite(fd, (char *) &len, sizeof(int)))
2137                         != sizeof(int))
2138                         elog(FATAL, "cannot write init file -- am tuple form length");
2139
2140                 if ((nwritten = FileWrite(fd, (char *) am, len)) != len)
2141                         elog(FATAL, "cannot write init file -- am tuple form");
2142
2143                 /* next write the relation tuple form */
2144                 len = sizeof(FormData_pg_class);
2145                 if ((nwritten = FileWrite(fd, (char *) &len, sizeof(int)))
2146                         != sizeof(int))
2147                         elog(FATAL, "cannot write init file -- relation tuple form length");
2148
2149                 if ((nwritten = FileWrite(fd, (char *) relform, len)) != len)
2150                         elog(FATAL, "cannot write init file -- relation tuple form");
2151
2152                 /* next, do all the attribute tuple form data entries */
2153                 len = ATTRIBUTE_TUPLE_SIZE;
2154                 for (i = 0; i < relform->relnatts; i++)
2155                 {
2156                         if ((nwritten = FileWrite(fd, (char *) &len, sizeof(int)))
2157                                 != sizeof(int))
2158                                 elog(FATAL, "cannot write init file -- length of attdesc %d", i);
2159                         if ((nwritten = FileWrite(fd, (char *) ird->rd_att->attrs[i], len))
2160                                 != len)
2161                                 elog(FATAL, "cannot write init file -- attdesc %d", i);
2162                 }
2163
2164                 /* next, write the index strategy map */
2165                 len = AttributeNumberGetIndexStrategySize(relform->relnatts,
2166                                                                                                   am->amstrategies);
2167                 if ((nwritten = FileWrite(fd, (char *) &len, sizeof(int)))
2168                         != sizeof(int))
2169                         elog(FATAL, "cannot write init file -- strategy map length");
2170
2171                 if ((nwritten = FileWrite(fd, (char *) strat, len)) != len)
2172                         elog(FATAL, "cannot write init file -- strategy map");
2173
2174                 /* finally, write the vector of support procedures */
2175                 len = relform->relnatts * (am->amsupport * sizeof(RegProcedure));
2176                 if ((nwritten = FileWrite(fd, (char *) &len, sizeof(int)))
2177                         != sizeof(int))
2178                         elog(FATAL, "cannot write init file -- support vector length");
2179
2180                 if ((nwritten = FileWrite(fd, (char *) support, len)) != len)
2181                         elog(FATAL, "cannot write init file -- support vector");
2182
2183                 /* restore volatile fields */
2184                 ird->rd_am = am;
2185                 ird->rd_rel = relform;
2186         }
2187
2188         FileClose(fd);
2189 }