granicus.if.org Git - postgresql/blobdiff - src/backend/utils/cache/relcache.c
Fix an oversight I made in a cleanup patch over a year ago:
[postgresql] / src / backend / utils / cache / relcache.c
index 28431b5c5cc383cd51cd62844b425b3007f60018..9a32fd5845e77556076ee63aa39545622fb351bb 100644 (file)
@@ -3,22 +3,20 @@
  * relcache.c
  *       POSTGRES relation descriptor cache code
  *
- * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/utils/cache/relcache.c,v 1.178 2002/11/10 07:25:14 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.270 2008/04/01 00:48:33 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 /*
  * INTERFACE ROUTINES
- *             RelationCacheInitialize                 - initialize relcache
+ *             RelationCacheInitialize                 - initialize relcache (to empty)
  *             RelationCacheInitializePhase2   - finish initializing relcache
  *             RelationIdGetRelation                   - get a reldesc by relation id
- *             RelationSysNameGetRelation              - get a reldesc by system rel name
- *             RelationIdCacheGetRelation              - get a cached reldesc by relid
  *             RelationClose                                   - close an open relation
  *
  * NOTES
  */
 #include "postgres.h"
 
-#include <errno.h>
 #include <sys/file.h>
 #include <fcntl.h>
 #include <unistd.h>
 
 #include "access/genam.h"
 #include "access/heapam.h"
-#include "access/istrat.h"
+#include "access/reloptions.h"
+#include "access/xact.h"
 #include "catalog/catalog.h"
-#include "catalog/catname.h"
+#include "catalog/index.h"
 #include "catalog/indexing.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_amop.h"
 #include "catalog/pg_amproc.h"
 #include "catalog/pg_attrdef.h"
-#include "catalog/pg_attribute.h"
+#include "catalog/pg_authid.h"
 #include "catalog/pg_constraint.h"
-#include "catalog/pg_index.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_opclass.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
 #include "commands/trigger.h"
 #include "miscadmin.h"
+#include "optimizer/clauses.h"
+#include "optimizer/planmain.h"
+#include "optimizer/prep.h"
+#include "optimizer/var.h"
+#include "rewrite/rewriteDefine.h"
+#include "storage/fd.h"
 #include "storage/smgr.h"
 #include "utils/builtins.h"
-#include "utils/catcache.h"
 #include "utils/fmgroids.h"
 #include "utils/inval.h"
-#include "utils/lsyscache.h"
+#include "utils/memutils.h"
 #include "utils/relcache.h"
+#include "utils/resowner.h"
 #include "utils/syscache.h"
+#include "utils/tqual.h"
+#include "utils/typcache.h"
 
 
 /*
@@ -67,6 +72,8 @@
  */
 #define RELCACHE_INIT_FILENAME "pg_internal.init"
 
+#define RELCACHE_INIT_FILEMAGIC                0x573264        /* version ID value */
+
 /*
  *             hardcoded tuple descriptors.  see include/catalog/pg_attribute.h
  */
@@ -74,25 +81,21 @@ static FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
 static FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
 static FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
 static FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
+static FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
 
 /*
  *             Hash tables that index the relation cache
  *
- *             Relations are looked up two ways, by OID and by name,
- *             thus there are two hash tables for referencing them.
- *
- *             The OID index covers all relcache entries.      The name index
- *             covers *only* system relations (only those in PG_CATALOG_NAMESPACE).
+ *             We used to index the cache by both name and OID, but now there
+ *             is only an index by OID.
  */
-static HTAB *RelationIdCache;
-static HTAB *RelationSysNameCache;
+typedef struct relidcacheent   /* one entry of the RelationIdCache hash table */
+{
+       Oid                     reloid;         /* relation OID; lookups pass a pointer to rd_id as the key */
+       Relation        reldesc;        /* relation descriptor stored for that OID */
+} RelIdCacheEnt;
 
-/*
- * Bufmgr uses RelFileNode for lookup. Actually, I would like to do
- * not pass Relation to bufmgr & beyond at all and keep some cache
- * in smgr, but no time to do it right way now.                -- vadim 10/22/2000
- */
-static HTAB *RelationNodeCache;
+static HTAB *RelationIdCache;
 
 /*
  * This flag is false until we have prepared the critical relcache entries
@@ -100,12 +103,6 @@ static HTAB *RelationNodeCache;
  */
 bool           criticalRelcachesBuilt = false;
 
-/*
- * This flag is set if we discover that we need to write a new relcache
- * cache file at the end of startup.
- */
-static bool needNewCacheFile = false;
-
 /*
  * This counter counts relcache inval events received since backend startup
  * (but only for rels that are actually in cache).     Presently, we use it only
@@ -121,103 +118,30 @@ static long relcacheInvalsReceived = 0L;
 static List *initFileRelationIds = NIL;
 
 /*
- *             RelationBuildDescInfo exists so code can be shared
- *             between RelationIdGetRelation() and RelationSysNameGetRelation()
+ * This flag lets us optimize away work in AtEO(Sub)Xact_RelationCache().
  */
-typedef struct RelationBuildDescInfo
-{
-       int                     infotype;               /* lookup by id or by name */
-#define INFO_RELID 1
-#define INFO_RELNAME 2
-       union
-       {
-               Oid                     info_id;        /* relation object id */
-               char       *info_name;  /* system relation name */
-       }                       i;
-} RelationBuildDescInfo;
+static bool need_eoxact_work = false;
 
-typedef struct relidcacheent
-{
-       Oid                     reloid;
-       Relation        reldesc;
-} RelIdCacheEnt;
-
-typedef struct relnamecacheent
-{
-       NameData        relname;
-       Relation        reldesc;
-} RelNameCacheEnt;
-
-typedef struct relnodecacheent
-{
-       RelFileNode relnode;
-       Relation        reldesc;
-} RelNodeCacheEnt;
 
 /*
  *             macros to manipulate the lookup hashtables
  */
 #define RelationCacheInsert(RELATION)  \
 do { \
-       RelIdCacheEnt *idhentry; RelNodeCacheEnt *nodentry; bool found; \
+       RelIdCacheEnt *idhentry; bool found; \
        idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
                                                                                   (void *) &(RELATION->rd_id), \
                                                                                   HASH_ENTER, \
                                                                                   &found); \
-       if (idhentry == NULL) \
-               elog(ERROR, "out of memory for relation descriptor cache"); \
        /* used to give notice if found -- now just keep quiet */ \
        idhentry->reldesc = RELATION; \
-       nodentry = (RelNodeCacheEnt*)hash_search(RelationNodeCache, \
-                                                                                  (void *) &(RELATION->rd_node), \
-                                                                                  HASH_ENTER, \
-                                                                                  &found); \
-       if (nodentry == NULL) \
-               elog(ERROR, "out of memory for relation descriptor cache"); \
-       /* used to give notice if found -- now just keep quiet */ \
-       nodentry->reldesc = RELATION; \
-       if (IsSystemNamespace(RelationGetNamespace(RELATION))) \
-       { \
-               char *relname = RelationGetRelationName(RELATION); \
-               RelNameCacheEnt *namehentry; \
-               namehentry = (RelNameCacheEnt*)hash_search(RelationSysNameCache, \
-                                                                                                  relname, \
-                                                                                                  HASH_ENTER, \
-                                                                                                  &found); \
-               if (namehentry == NULL) \
-                       elog(ERROR, "out of memory for relation descriptor cache"); \
-               /* used to give notice if found -- now just keep quiet */ \
-               namehentry->reldesc = RELATION; \
-       } \
 } while(0)
 
 #define RelationIdCacheLookup(ID, RELATION) \
 do { \
        RelIdCacheEnt *hentry; \
        hentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
-                                                                                (void *)&(ID), HASH_FIND,NULL); \
-       if (hentry) \
-               RELATION = hentry->reldesc; \
-       else \
-               RELATION = NULL; \
-} while(0)
-
-#define RelationSysNameCacheLookup(NAME, RELATION) \
-do { \
-       RelNameCacheEnt *hentry; \
-       hentry = (RelNameCacheEnt*)hash_search(RelationSysNameCache, \
-                                                                                  (void *) (NAME), HASH_FIND,NULL); \
-       if (hentry) \
-               RELATION = hentry->reldesc; \
-       else \
-               RELATION = NULL; \
-} while(0)
-
-#define RelationNodeCacheLookup(NODE, RELATION) \
-do { \
-       RelNodeCacheEnt *hentry; \
-       hentry = (RelNodeCacheEnt*)hash_search(RelationNodeCache, \
-                                                                                  (void *)&(NODE), HASH_FIND,NULL); \
+                                                                                (void *) &(ID), HASH_FIND,NULL); \
        if (hentry) \
                RELATION = hentry->reldesc; \
        else \
@@ -226,32 +150,20 @@ do { \
 
 #define RelationCacheDelete(RELATION) \
 do { \
-       RelIdCacheEnt *idhentry; RelNodeCacheEnt *nodentry; \
+       RelIdCacheEnt *idhentry; \
        idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
-                                                                                  (void *)&(RELATION->rd_id), \
+                                                                                  (void *) &(RELATION->rd_id), \
                                                                                   HASH_REMOVE, NULL); \
        if (idhentry == NULL) \
-               elog(WARNING, "trying to delete a rd_id reldesc that does not exist."); \
-       nodentry = (RelNodeCacheEnt*)hash_search(RelationNodeCache, \
-                                                                                  (void *)&(RELATION->rd_node), \
-                                                                                  HASH_REMOVE, NULL); \
-       if (nodentry == NULL) \
-               elog(WARNING, "trying to delete a rd_node reldesc that does not exist."); \
-       if (IsSystemNamespace(RelationGetNamespace(RELATION))) \
-       { \
-               char *relname = RelationGetRelationName(RELATION); \
-               RelNameCacheEnt *namehentry; \
-               namehentry = (RelNameCacheEnt*)hash_search(RelationSysNameCache, \
-                                                                                                  relname, \
-                                                                                                  HASH_REMOVE, NULL); \
-               if (namehentry == NULL) \
-                       elog(WARNING, "trying to delete a relname reldesc that does not exist."); \
-       } \
+               elog(WARNING, "trying to delete a rd_id reldesc that does not exist"); \
 } while(0)
 
 
 /*
  * Special cache for opclass-related information
+ *
+ * Note: only default operators and support procs get cached, ie, those with
+ * lefttype = righttype = opcintype.
  */
 typedef struct opclasscacheent
 {
@@ -259,8 +171,9 @@ typedef struct opclasscacheent
        bool            valid;                  /* set TRUE after successful fill-in */
        StrategyNumber numStrats;       /* max # of strategies (from pg_am) */
        StrategyNumber numSupport;      /* max # of support procs (from pg_am) */
+       Oid                     opcfamily;              /* OID of opclass's family */
+       Oid                     opcintype;              /* OID of opclass's declared input type */
        Oid                *operatorOids;       /* strategy operators' OIDs */
-       RegProcedure *operatorProcs;    /* strategy operators' procs */
        RegProcedure *supportProcs; /* support procs */
 } OpClassCacheEnt;
 
@@ -271,30 +184,31 @@ static HTAB *OpClassCache = NULL;
 
 static void RelationClearRelation(Relation relation, bool rebuild);
 
-#ifdef ENABLE_REINDEX_NAILED_RELATIONS
-static void RelationReloadClassinfo(Relation relation);
-#endif   /* ENABLE_REINDEX_NAILED_RELATIONS */
+static void RelationReloadIndexInfo(Relation relation);
 static void RelationFlushRelation(Relation relation);
-static Relation RelationSysNameCacheGetRelation(const char *relationName);
 static bool load_relcache_init_file(void);
 static void write_relcache_init_file(void);
+static void write_item(const void *data, Size len, FILE *fp);
 
-static void formrdesc(const char *relationName, int natts,
-                 FormData_pg_attribute *att);
+static void formrdesc(const char *relationName, Oid relationReltype,
+                 bool hasoids, int natts, FormData_pg_attribute *att);
 
-static HeapTuple ScanPgRelation(RelationBuildDescInfo buildinfo);
+static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK);
 static Relation AllocateRelationDesc(Relation relation, Form_pg_class relp);
-static void RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
-                                          Relation relation);
-static Relation RelationBuildDesc(RelationBuildDescInfo buildinfo,
-                                 Relation oldrelation);
+static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
+static void RelationBuildTupleDesc(Relation relation);
+static Relation RelationBuildDesc(Oid targetRelId, Relation oldrelation);
+static void RelationInitPhysicalAddr(Relation relation);
+static TupleDesc GetPgClassDescriptor(void);
+static TupleDesc GetPgIndexDescriptor(void);
 static void AttrDefaultFetch(Relation relation);
 static void CheckConstraintFetch(Relation relation);
 static List *insert_ordered_oid(List *list, Oid datum);
-static void IndexSupportInitialize(Form_pg_index iform,
-                                          IndexStrategy indexStrategy,
+static void IndexSupportInitialize(oidvector *indclass,
                                           Oid *indexOperator,
                                           RegProcedure *indexSupport,
+                                          Oid *opFamily,
+                                          Oid *opcInType,
                                           StrategyNumber maxStrategyNumber,
                                           StrategyNumber maxSupportNumber,
                                           AttrNumber maxAttributeNumber);
@@ -307,64 +221,38 @@ static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
  *             ScanPgRelation
  *
  *             this is used by RelationBuildDesc to find a pg_class
- *             tuple matching either a relation name or a relation id
- *             as specified in buildinfo.
+ *             tuple matching targetRelId.
  *
  *             NB: the returned tuple has been copied into palloc'd storage
  *             and must eventually be freed with heap_freetuple.
  */
 static HeapTuple
-ScanPgRelation(RelationBuildDescInfo buildinfo)
+ScanPgRelation(Oid targetRelId, bool indexOK)
 {
        HeapTuple       pg_class_tuple;
        Relation        pg_class_desc;
-       const char *indexRelname;
        SysScanDesc pg_class_scan;
-       ScanKeyData key[2];
-       int                     nkeys;
+       ScanKeyData key[1];
 
        /*
         * form a scan key
         */
-       switch (buildinfo.infotype)
-       {
-               case INFO_RELID:
-                       ScanKeyEntryInitialize(&key[0], 0,
-                                                                  ObjectIdAttributeNumber,
-                                                                  F_OIDEQ,
-                                                                  ObjectIdGetDatum(buildinfo.i.info_id));
-                       nkeys = 1;
-                       indexRelname = ClassOidIndex;
-                       break;
-
-               case INFO_RELNAME:
-                       ScanKeyEntryInitialize(&key[0], 0,
-                                                                  Anum_pg_class_relname,
-                                                                  F_NAMEEQ,
-                                                                  NameGetDatum(buildinfo.i.info_name));
-                       ScanKeyEntryInitialize(&key[1], 0,
-                                                                  Anum_pg_class_relnamespace,
-                                                                  F_OIDEQ,
-                                                                ObjectIdGetDatum(PG_CATALOG_NAMESPACE));
-                       nkeys = 2;
-                       indexRelname = ClassNameNspIndex;
-                       break;
-
-               default:
-                       elog(ERROR, "ScanPgRelation: bad buildinfo");
-                       return NULL;            /* keep compiler quiet */
-       }
+       ScanKeyInit(&key[0],
+                               ObjectIdAttributeNumber,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(targetRelId));
 
        /*
         * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
-        * built the critical relcache entries (this includes initdb and
-        * startup without a pg_internal.init file).
+        * built the critical relcache entries (this includes initdb and startup
+        * without a pg_internal.init file).  The caller can also force a heap
+        * scan by setting indexOK == false.
         */
-       pg_class_desc = heap_openr(RelationRelationName, AccessShareLock);
-       pg_class_scan = systable_beginscan(pg_class_desc, indexRelname,
-                                                                          criticalRelcachesBuilt,
+       pg_class_desc = heap_open(RelationRelationId, AccessShareLock);
+       pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
+                                                                          indexOK && criticalRelcachesBuilt,
                                                                           SnapshotNow,
-                                                                          nkeys, key);
+                                                                          1, key);
 
        pg_class_tuple = systable_getnext(pg_class_scan);
 
@@ -409,25 +297,27 @@ AllocateRelationDesc(Relation relation, Form_pg_class relp)
        /*
         * clear all fields of reldesc
         */
-       MemSet((char *) relation, 0, sizeof(RelationData));
+       MemSet(relation, 0, sizeof(RelationData));
        relation->rd_targblock = InvalidBlockNumber;
 
        /* make sure relation is marked as having no open file yet */
-       relation->rd_fd = -1;
+       relation->rd_smgr = NULL;
 
        /*
         * Copy the relation tuple form
         *
-        * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE.
-        * relacl is NOT stored in the relcache --- there'd be little point in
-        * it, since we don't copy the tuple's nullvalues bitmap and hence
-        * wouldn't know if the value is valid ... bottom line is that relacl
-        * *cannot* be retrieved from the relcache.  Get it from the syscache
-        * if you need it.
+        * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE. The
+        * variable-length fields (relacl, reloptions) are NOT stored in the
+        * relcache --- there'd be little point in it, since we don't copy the
+        * tuple's nulls bitmap and hence wouldn't know if the values are valid.
+        * Bottom line is that relacl *cannot* be retrieved from the relcache. Get
+        * it from the syscache if you need it.  The same goes for the original
+        * form of reloptions (however, we do store the parsed form of reloptions
+        * in rd_options).
         */
        relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
 
-       memcpy((char *) relationForm, (char *) relp, CLASS_TUPLE_SIZE);
+       memcpy(relationForm, relp, CLASS_TUPLE_SIZE);
 
        /* initialize relation tuple form */
        relation->rd_rel = relationForm;
@@ -435,12 +325,83 @@ AllocateRelationDesc(Relation relation, Form_pg_class relp)
        /* and allocate attribute tuple form storage */
        relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts,
                                                                                           relationForm->relhasoids);
+       /* which we mark as a reference-counted tupdesc */
+       relation->rd_att->tdrefcount = 1;
 
        MemoryContextSwitchTo(oldcxt);
 
        return relation;
 }
 
+/*
+ * RelationParseRelOptions
+ *             Convert pg_class.reloptions into pre-parsed rd_options
+ *
+ * tuple is the real pg_class tuple (not rd_rel!) for relation.  rd_options
+ * is reset to NULL first and stays NULL unless a parsed value is produced.
+ * Note: rd_rel and (if an index) rd_am must be valid already
+ */
+static void
+RelationParseRelOptions(Relation relation, HeapTuple tuple)
+{
+       Datum           datum;          /* raw reloptions attribute fetched from tuple */
+       bool            isnull;         /* set by fastgetattr when the attribute is null */
+       bytea      *options;    /* parsed form returned by heap_/index_reloptions */
+
+       relation->rd_options = NULL;    /* every early return below leaves this NULL */
+
+       /* Fall out if relkind should not have options */
+       switch (relation->rd_rel->relkind)
+       {
+               case RELKIND_RELATION:
+               case RELKIND_TOASTVALUE:
+               case RELKIND_UNCATALOGED:
+               case RELKIND_INDEX:
+                       break;
+               default:
+                       return;
+       }
+
+       /*
+        * Fetch reloptions from tuple; have to use a hardwired descriptor because
+        * we might not have any other for pg_class yet (consider executing this
+        * code for pg_class itself)
+        */
+       datum = fastgetattr(tuple,
+                                               Anum_pg_class_reloptions,
+                                               GetPgClassDescriptor(),
+                                               &isnull);
+       if (isnull)
+               return;         /* no reloptions stored in this pg_class row */
+
+       /* Parse into appropriate format; don't error out here */
+       switch (relation->rd_rel->relkind)
+       {
+               case RELKIND_RELATION:
+               case RELKIND_TOASTVALUE:
+               case RELKIND_UNCATALOGED:
+                       options = heap_reloptions(relation->rd_rel->relkind, datum,
+                                                                         false);
+                       break;
+               case RELKIND_INDEX:             /* parsed via the index AM's amoptions (rd_am) */
+                       options = index_reloptions(relation->rd_am->amoptions, datum,
+                                                                          false);
+                       break;
+               default:
+                       Assert(false);          /* can't get here */
+                       options = NULL;         /* keep compiler quiet */
+                       break;
+       }
+
+       /* Copy parsed data into CacheMemoryContext. NOTE(review): "options" itself is not freed here -- confirm intended */
+       if (options)
+       {
+               relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
+                                                                                                 VARSIZE(options));
+               memcpy(relation->rd_options, options, VARSIZE(options));
+       }
+}
+
 /*
  *             RelationBuildTupleDesc
  *
@@ -448,8 +409,7 @@ AllocateRelationDesc(Relation relation, Form_pg_class relp)
  *             the pg_attribute, pg_attrdef & pg_constraint system catalogs.
  */
 static void
-RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
-                                          Relation relation)
+RelationBuildTupleDesc(Relation relation)
 {
        HeapTuple       pg_attribute_tuple;
        Relation        pg_attribute_desc;
@@ -460,7 +420,10 @@ RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
        AttrDefault *attrdef = NULL;
        int                     ndef = 0;
 
-       relation->rd_att->tdhasoid = RelationGetForm(relation)->relhasoids;
+       /* copy some fields from pg_class row to rd_att */
+       relation->rd_att->tdtypeid = relation->rd_rel->reltype;
+       relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
+       relation->rd_att->tdhasoid = relation->rd_rel->relhasoids;
 
        constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
                                                                                                sizeof(TupleConstr));
@@ -468,26 +431,26 @@ RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
 
        /*
         * Form a scan key that selects only user attributes (attnum > 0).
-        * (Eliminating system attribute rows at the index level is lots
-        * faster than fetching them.)
+        * (Eliminating system attribute rows at the index level is lots faster
+        * than fetching them.)
         */
-       ScanKeyEntryInitialize(&skey[0], 0,
-                                                  Anum_pg_attribute_attrelid,
-                                                  F_OIDEQ,
-                                                  ObjectIdGetDatum(RelationGetRelid(relation)));
-       ScanKeyEntryInitialize(&skey[1], 0,
-                                                  Anum_pg_attribute_attnum,
-                                                  F_INT2GT,
-                                                  Int16GetDatum(0));
+       ScanKeyInit(&skey[0],
+                               Anum_pg_attribute_attrelid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(RelationGetRelid(relation)));
+       ScanKeyInit(&skey[1],
+                               Anum_pg_attribute_attnum,
+                               BTGreaterStrategyNumber, F_INT2GT,
+                               Int16GetDatum(0));
 
        /*
-        * Open pg_attribute and begin a scan.  Force heap scan if we haven't
-        * yet built the critical relcache entries (this includes initdb and
-        * startup without a pg_internal.init file).
+        * Open pg_attribute and begin a scan.  Force heap scan if we haven't yet
+        * built the critical relcache entries (this includes initdb and startup
+        * without a pg_internal.init file).
         */
-       pg_attribute_desc = heap_openr(AttributeRelationName, AccessShareLock);
+       pg_attribute_desc = heap_open(AttributeRelationId, AccessShareLock);
        pg_attribute_scan = systable_beginscan(pg_attribute_desc,
-                                                                                  AttributeRelidNumIndex,
+                                                                                  AttributeRelidNumIndexId,
                                                                                   criticalRelcachesBuilt,
                                                                                   SnapshotNow,
                                                                                   2, skey);
@@ -505,15 +468,11 @@ RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
 
                if (attp->attnum <= 0 ||
                        attp->attnum > relation->rd_rel->relnatts)
-                       elog(ERROR, "Bogus attribute number %d for %s",
+                       elog(ERROR, "invalid attribute number %d for %s",
                                 attp->attnum, RelationGetRelationName(relation));
 
-               relation->rd_att->attrs[attp->attnum - 1] =
-                       (Form_pg_attribute) MemoryContextAlloc(CacheMemoryContext,
-                                                                                                  ATTRIBUTE_TUPLE_SIZE);
-
-               memcpy((char *) (relation->rd_att->attrs[attp->attnum - 1]),
-                          (char *) attp,
+               memcpy(relation->rd_att->attrs[attp->attnum - 1],
+                          attp,
                           ATTRIBUTE_TUPLE_SIZE);
 
                /* Update constraint/default info */
@@ -523,14 +482,10 @@ RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
                if (attp->atthasdef)
                {
                        if (attrdef == NULL)
-                       {
                                attrdef = (AttrDefault *)
-                                       MemoryContextAlloc(CacheMemoryContext,
-                                                                          relation->rd_rel->relnatts *
-                                                                          sizeof(AttrDefault));
-                               MemSet(attrdef, 0,
-                                          relation->rd_rel->relnatts * sizeof(AttrDefault));
-                       }
+                                       MemoryContextAllocZero(CacheMemoryContext,
+                                                                                  relation->rd_rel->relnatts *
+                                                                                  sizeof(AttrDefault));
                        attrdef[ndef].adnum = attp->attnum;
                        attrdef[ndef].adbin = NULL;
                        ndef++;
@@ -566,11 +521,11 @@ RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
 
        /*
         * However, we can easily set the attcacheoff value for the first
-        * attribute: it must be zero.  This eliminates the need for special
-        * cases for attnum=1 that used to exist in fastgetattr() and
-        * index_getattr().
+        * attribute: it must be zero.  This eliminates the need for special cases
+        * for attnum=1 that used to exist in fastgetattr() and index_getattr().
         */
-       relation->rd_att->attrs[0]->attcacheoff = 0;
+       if (relation->rd_rel->relnatts > 0)
+               relation->rd_att->attrs[0]->attcacheoff = 0;
 
        /*
         * Set up constraint/default info
@@ -596,9 +551,8 @@ RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
                {
                        constr->num_check = relation->rd_rel->relchecks;
                        constr->check = (ConstrCheck *)
-                               MemoryContextAlloc(CacheMemoryContext,
-                                                               constr->num_check * sizeof(ConstrCheck));
-                       MemSet(constr->check, 0, constr->num_check * sizeof(ConstrCheck));
+                               MemoryContextAllocZero(CacheMemoryContext,
+                                                                       constr->num_check * sizeof(ConstrCheck));
                        CheckConstraintFetch(relation);
                }
                else
@@ -642,19 +596,19 @@ RelationBuildRuleLock(Relation relation)
        int                     maxlocks;
 
        /*
-        * Make the private context.  Parameters are set on the assumption
-        * that it'll probably not contain much data.
+        * Make the private context.  Parameters are set on the assumption that
+        * it'll probably not contain much data.
         */
        rulescxt = AllocSetContextCreate(CacheMemoryContext,
                                                                         RelationGetRelationName(relation),
-                                                                        0, /* minsize */
-                                                                        1024,          /* initsize */
-                                                                        1024);         /* maxsize */
+                                                                        ALLOCSET_SMALL_MINSIZE,
+                                                                        ALLOCSET_SMALL_INITSIZE,
+                                                                        ALLOCSET_SMALL_MAXSIZE);
        relation->rd_rulescxt = rulescxt;
 
        /*
-        * allocate an array to hold the rewrite rules (the array is extended
-        * if necessary)
+        * allocate an array to hold the rewrite rules (the array is extended if
+        * necessary)
         */
        maxlocks = 4;
        rules = (RewriteRule **)
@@ -664,23 +618,23 @@ RelationBuildRuleLock(Relation relation)
        /*
         * form a scan key
         */
-       ScanKeyEntryInitialize(&key, 0,
-                                                  Anum_pg_rewrite_ev_class,
-                                                  F_OIDEQ,
-                                                  ObjectIdGetDatum(RelationGetRelid(relation)));
+       ScanKeyInit(&key,
+                               Anum_pg_rewrite_ev_class,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(RelationGetRelid(relation)));
 
        /*
         * open pg_rewrite and begin a scan
         *
-        * Note: since we scan the rules using RewriteRelRulenameIndex, we will
+        * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
         * be reading the rules in name order, except possibly during
-        * emergency-recovery operations (ie, IsIgnoringSystemIndexes). This
-        * in turn ensures that rules will be fired in name order.
+        * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
+        * ensures that rules will be fired in name order.
         */
-       rewrite_desc = heap_openr(RewriteRelationName, AccessShareLock);
+       rewrite_desc = heap_open(RewriteRelationId, AccessShareLock);
        rewrite_tupdesc = RelationGetDescr(rewrite_desc);
        rewrite_scan = systable_beginscan(rewrite_desc,
-                                                                         RewriteRelRulenameIndex,
+                                                                         RewriteRelRulenameIndexId,
                                                                          true, SnapshotNow,
                                                                          1, &key);
 
@@ -688,10 +642,8 @@ RelationBuildRuleLock(Relation relation)
        {
                Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
                bool            isnull;
-               Datum           ruleaction;
-               Datum           rule_evqual;
-               char       *ruleaction_str;
-               char       *rule_evqual_str;
+               Datum           rule_datum;
+               char       *rule_str;
                RewriteRule *rule;
 
                rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
@@ -701,33 +653,52 @@ RelationBuildRuleLock(Relation relation)
 
                rule->event = rewrite_form->ev_type - '0';
                rule->attrno = rewrite_form->ev_attr;
+               rule->enabled = rewrite_form->ev_enabled;
                rule->isInstead = rewrite_form->is_instead;
 
-               /* Must use heap_getattr to fetch ev_qual and ev_action */
-
-               ruleaction = heap_getattr(rewrite_tuple,
+               /*
+                * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
+                * rule strings are often large enough to be toasted.  To avoid
+                * leaking memory in the caller's context, do the detoasting here so
+                * we can free the detoasted version.
+                */
+               rule_datum = heap_getattr(rewrite_tuple,
                                                                  Anum_pg_rewrite_ev_action,
                                                                  rewrite_tupdesc,
                                                                  &isnull);
                Assert(!isnull);
-               ruleaction_str = DatumGetCString(DirectFunctionCall1(textout,
-                                                                                                                        ruleaction));
+               rule_str = TextDatumGetCString(rule_datum);
                oldcxt = MemoryContextSwitchTo(rulescxt);
-               rule->actions = (List *) stringToNode(ruleaction_str);
+               rule->actions = (List *) stringToNode(rule_str);
                MemoryContextSwitchTo(oldcxt);
-               pfree(ruleaction_str);
+               pfree(rule_str);
 
-               rule_evqual = heap_getattr(rewrite_tuple,
-                                                                  Anum_pg_rewrite_ev_qual,
-                                                                  rewrite_tupdesc,
-                                                                  &isnull);
+               rule_datum = heap_getattr(rewrite_tuple,
+                                                                 Anum_pg_rewrite_ev_qual,
+                                                                 rewrite_tupdesc,
+                                                                 &isnull);
                Assert(!isnull);
-               rule_evqual_str = DatumGetCString(DirectFunctionCall1(textout,
-                                                                                                                  rule_evqual));
+               rule_str = TextDatumGetCString(rule_datum);
                oldcxt = MemoryContextSwitchTo(rulescxt);
-               rule->qual = (Node *) stringToNode(rule_evqual_str);
+               rule->qual = (Node *) stringToNode(rule_str);
                MemoryContextSwitchTo(oldcxt);
-               pfree(rule_evqual_str);
+               pfree(rule_str);
+
+               /*
+                * We want the rule's table references to be checked as though by the
+                * table owner, not the user referencing the rule.      Therefore, scan
+                * through the rule's actions and set the checkAsUser field on all
+                * rtable entries.      We have to look at the qual as well, in case it
+                * contains sublinks.
+                *
+                * The reason for doing this when the rule is loaded, rather than when
+                * it is stored, is that otherwise ALTER TABLE OWNER would have to
+                * grovel through stored rules to update checkAsUser fields. Scanning
+                * the rule tree during load is relatively cheap (compared to
+                * constructing it in the first place), so we do it here.
+                */
+               setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
+               setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);
 
                if (numlocks >= maxlocks)
                {
@@ -768,8 +739,8 @@ equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
 
        /*
         * As of 7.3 we assume the rule ordering is repeatable, because
-        * RelationBuildRuleLock should read 'em in a consistent order.  So
-        * just compare corresponding slots.
+        * RelationBuildRuleLock should read 'em in a consistent order.  So just
+        * compare corresponding slots.
         */
        if (rlock1 != NULL)
        {
@@ -809,11 +780,14 @@ equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
  *             recycling the given old relation object.  The latter case
  *             supports rebuilding a relcache entry without invalidating
  *             pointers to it.
+ *
+ *             Returns NULL if no pg_class row could be found for the given relid
+ *             (suggesting we are trying to access a just-deleted relation).
+ *             Any other error is reported via elog.
  * --------------------------------
  */
 static Relation
-RelationBuildDesc(RelationBuildDescInfo buildinfo,
-                                 Relation oldrelation)
+RelationBuildDesc(Oid targetRelId, Relation oldrelation)
 {
        Relation        relation;
        Oid                     relid;
@@ -824,7 +798,7 @@ RelationBuildDesc(RelationBuildDescInfo buildinfo,
        /*
         * find the tuple in pg_class corresponding to the given relation id
         */
-       pg_class_tuple = ScanPgRelation(buildinfo);
+       pg_class_tuple = ScanPgRelation(targetRelId, true);
 
        /*
         * if no such tuple exists, return NULL
@@ -839,40 +813,31 @@ RelationBuildDesc(RelationBuildDescInfo buildinfo,
        relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
 
        /*
-        * allocate storage for the relation descriptor, and copy
-        * pg_class_tuple to relation->rd_rel.
+        * allocate storage for the relation descriptor, and copy pg_class_tuple
+        * to relation->rd_rel.
         */
        relation = AllocateRelationDesc(oldrelation, relp);
 
-       /*
-        * now we can free the memory allocated for pg_class_tuple
-        */
-       heap_freetuple(pg_class_tuple);
-
        /*
         * initialize the relation's relation id (relation->rd_id)
         */
        RelationGetRelid(relation) = relid;
 
        /*
-        * initialize relation->rd_refcnt
-        */
-       RelationSetReferenceCount(relation, 1);
-
-       /*
-        * normal relations are not nailed into the cache; nor can a
-        * pre-existing relation be new.  It could be temp though.      (Actually,
-        * it could be new too, but it's okay to forget that fact if forced to
-        * flush the entry.)
+        * normal relations are not nailed into the cache; nor can a pre-existing
+        * relation be new.  It could be temp though.  (Actually, it could be new
+        * too, but it's okay to forget that fact if forced to flush the entry.)
         */
+       relation->rd_refcnt = 0;
        relation->rd_isnailed = false;
-       relation->rd_isnew = false;
-       relation->rd_istemp = isTempNamespace(relation->rd_rel->relnamespace);
+       relation->rd_createSubid = InvalidSubTransactionId;
+       relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
+       relation->rd_istemp = isTempOrToastNamespace(relation->rd_rel->relnamespace);
 
        /*
         * initialize the tuple descriptor (relation->rd_att).
         */
-       RelationBuildTupleDesc(buildinfo, relation);
+       RelationBuildTupleDesc(relation);
 
        /*
         * Fetch rules and triggers that affect this relation
@@ -896,19 +861,26 @@ RelationBuildDesc(RelationBuildDescInfo buildinfo,
        if (OidIsValid(relation->rd_rel->relam))
                RelationInitIndexAccessInfo(relation);
 
+       /* extract reloptions if any */
+       RelationParseRelOptions(relation, pg_class_tuple);
+
        /*
         * initialize the relation lock manager information
         */
        RelationInitLockInfo(relation);         /* see lmgr.c */
 
-       if (relation->rd_rel->relisshared)
-               relation->rd_node.tblNode = InvalidOid;
-       else
-               relation->rd_node.tblNode = MyDatabaseId;
-       relation->rd_node.relNode = relation->rd_rel->relfilenode;
+       /*
+        * initialize physical addressing information for the relation
+        */
+       RelationInitPhysicalAddr(relation);
 
        /* make sure relation is marked as having no open file yet */
-       relation->rd_fd = -1;
+       relation->rd_smgr = NULL;
+
+       /*
+        * now we can free the memory allocated for pg_class_tuple
+        */
+       heap_freetuple(pg_class_tuple);
 
        /*
         * Insert newly created relation into relcache hash tables.
@@ -917,20 +889,29 @@ RelationBuildDesc(RelationBuildDescInfo buildinfo,
        RelationCacheInsert(relation);
        MemoryContextSwitchTo(oldcxt);
 
-       /*
-        * If it's a temp rel, RelationGetNumberOfBlocks will assume that
-        * rd_nblocks is correct.  Must forcibly update the block count when
-        * creating the relcache entry.  But if we are doing a rebuild, don't
-        * do this yet; leave it to RelationClearRelation to do at the end.
-        * (Otherwise, an elog in RelationUpdateNumberOfBlocks would leave us
-        * with inconsistent relcache state.)
-        */
-       if (relation->rd_istemp && oldrelation == NULL)
-               RelationUpdateNumberOfBlocks(relation);
+       /* It's fully valid */
+       relation->rd_isvalid = true;
 
        return relation;
 }
 
+/*
+ * Initialize the physical addressing info (RelFileNode) for a relcache entry
+ */
+static void
+RelationInitPhysicalAddr(Relation relation)
+{
+       if (relation->rd_rel->reltablespace)
+               relation->rd_node.spcNode = relation->rd_rel->reltablespace;
+       else
+               relation->rd_node.spcNode = MyDatabaseTableSpace;
+       if (relation->rd_rel->relisshared)
+               relation->rd_node.dbNode = InvalidOid;
+       else
+               relation->rd_node.dbNode = MyDatabaseId;
+       relation->rd_node.relNode = relation->rd_rel->relfilenode;
+}
+
 /*
  * Initialize index-access-method support data for an index relation
  */
@@ -938,33 +919,34 @@ void
 RelationInitIndexAccessInfo(Relation relation)
 {
        HeapTuple       tuple;
-       Size            iformsize;
-       Form_pg_index iform;
        Form_pg_am      aform;
+       Datum           indclassDatum;
+       Datum           indoptionDatum;
+       bool            isnull;
+       oidvector  *indclass;
+       int2vector *indoption;
        MemoryContext indexcxt;
-       IndexStrategy strategy;
-       Oid                *operator;
-       RegProcedure *support;
-       FmgrInfo   *supportinfo;
+       MemoryContext oldcontext;
        int                     natts;
        uint16          amstrategies;
        uint16          amsupport;
 
        /*
-        * Make a copy of the pg_index entry for the index.  Note that this is
-        * a variable-length tuple.
+        * Make a copy of the pg_index entry for the index.  Since pg_index
+        * contains variable-length and possibly-null fields, we have to do this
+        * honestly rather than just treating it as a Form_pg_index struct.
         */
        tuple = SearchSysCache(INDEXRELID,
                                                   ObjectIdGetDatum(RelationGetRelid(relation)),
                                                   0, 0, 0);
        if (!HeapTupleIsValid(tuple))
-               elog(ERROR, "RelationInitIndexAccessInfo: no pg_index entry for index %u",
+               elog(ERROR, "cache lookup failed for index %u",
                         RelationGetRelid(relation));
-       iformsize = tuple->t_len - tuple->t_data->t_hoff;
-       iform = (Form_pg_index) MemoryContextAlloc(CacheMemoryContext, iformsize);
-       memcpy(iform, GETSTRUCT(tuple), iformsize);
+       oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
+       relation->rd_indextuple = heap_copytuple(tuple);
+       relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
+       MemoryContextSwitchTo(oldcontext);
        ReleaseSysCache(tuple);
-       relation->rd_index = iform;
 
        /*
         * Make a copy of the pg_am entry for the index's access method
@@ -973,7 +955,7 @@ RelationInitIndexAccessInfo(Relation relation)
                                                   ObjectIdGetDatum(relation->rd_rel->relam),
                                                   0, 0, 0);
        if (!HeapTupleIsValid(tuple))
-               elog(ERROR, "RelationInitIndexAccessInfo: cache lookup failed for AM %u",
+               elog(ERROR, "cache lookup failed for access method %u",
                         relation->rd_rel->relam);
        aform = (Form_pg_am) MemoryContextAlloc(CacheMemoryContext, sizeof *aform);
        memcpy(aform, GETSTRUCT(tuple), sizeof *aform);
@@ -981,83 +963,111 @@ RelationInitIndexAccessInfo(Relation relation)
        relation->rd_am = aform;
 
        natts = relation->rd_rel->relnatts;
+       if (natts != relation->rd_index->indnatts)
+               elog(ERROR, "relnatts disagrees with indnatts for index %u",
+                        RelationGetRelid(relation));
        amstrategies = aform->amstrategies;
        amsupport = aform->amsupport;
 
        /*
-        * Make the private context to hold index access info.  The reason we
-        * need a context, and not just a couple of pallocs, is so that we
-        * won't leak any subsidiary info attached to fmgr lookup records.
+        * Make the private context to hold index access info.  The reason we need
+        * a context, and not just a couple of pallocs, is so that we won't leak
+        * any subsidiary info attached to fmgr lookup records.
         *
         * Context parameters are set on the assumption that it'll probably not
         * contain much data.
         */
        indexcxt = AllocSetContextCreate(CacheMemoryContext,
                                                                         RelationGetRelationName(relation),
-                                                                        0, /* minsize */
-                                                                        512,           /* initsize */
-                                                                        1024);         /* maxsize */
+                                                                        ALLOCSET_SMALL_MINSIZE,
+                                                                        ALLOCSET_SMALL_INITSIZE,
+                                                                        ALLOCSET_SMALL_MAXSIZE);
        relation->rd_indexcxt = indexcxt;
 
        /*
         * Allocate arrays to hold data
         */
+       relation->rd_aminfo = (RelationAmInfo *)
+               MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
+
+       relation->rd_opfamily = (Oid *)
+               MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
+       relation->rd_opcintype = (Oid *)
+               MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
+
        if (amstrategies > 0)
-       {
-               int                     noperators = natts * amstrategies;
-               Size            stratSize;
-
-               stratSize = AttributeNumberGetIndexStrategySize(natts, amstrategies);
-               strategy = (IndexStrategy) MemoryContextAlloc(indexcxt, stratSize);
-               MemSet(strategy, 0, stratSize);
-               operator = (Oid *)
-                       MemoryContextAlloc(indexcxt, noperators * sizeof(Oid));
-               MemSet(operator, 0, noperators * sizeof(Oid));
-       }
+               relation->rd_operator = (Oid *)
+                       MemoryContextAllocZero(indexcxt,
+                                                                  natts * amstrategies * sizeof(Oid));
        else
-       {
-               strategy = NULL;
-               operator = NULL;
-       }
+               relation->rd_operator = NULL;
 
        if (amsupport > 0)
        {
                int                     nsupport = natts * amsupport;
 
-               support = (RegProcedure *)
-                       MemoryContextAlloc(indexcxt, nsupport * sizeof(RegProcedure));
-               MemSet(support, 0, nsupport * sizeof(RegProcedure));
-               supportinfo = (FmgrInfo *)
-                       MemoryContextAlloc(indexcxt, nsupport * sizeof(FmgrInfo));
-               MemSet(supportinfo, 0, nsupport * sizeof(FmgrInfo));
+               relation->rd_support = (RegProcedure *)
+                       MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
+               relation->rd_supportinfo = (FmgrInfo *)
+                       MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
        }
        else
        {
-               support = NULL;
-               supportinfo = NULL;
+               relation->rd_support = NULL;
+               relation->rd_supportinfo = NULL;
        }
 
-       relation->rd_istrat = strategy;
-       relation->rd_operator = operator;
-       relation->rd_support = support;
-       relation->rd_supportinfo = supportinfo;
+       relation->rd_indoption = (int16 *)
+               MemoryContextAllocZero(indexcxt, natts * sizeof(int16));
+
+       /*
+        * indclass cannot be referenced directly through the C struct, because it
+        * comes after the variable-width indkey field.  Must extract the datum
+        * the hard way...
+        */
+       indclassDatum = fastgetattr(relation->rd_indextuple,
+                                                               Anum_pg_index_indclass,
+                                                               GetPgIndexDescriptor(),
+                                                               &isnull);
+       Assert(!isnull);
+       indclass = (oidvector *) DatumGetPointer(indclassDatum);
 
        /*
-        * Fill the strategy map and the support RegProcedure arrays.
-        * (supportinfo is left as zeroes, and is filled on-the-fly when used)
+        * Fill the operator and support procedure OID arrays, as well as the info
+        * about opfamilies and opclass input types.  (aminfo and supportinfo are
+        * left as zeroes, and are filled on-the-fly when used)
         */
-       IndexSupportInitialize(iform,
-                                                  strategy, operator, support,
+       IndexSupportInitialize(indclass,
+                                                  relation->rd_operator, relation->rd_support,
+                                                  relation->rd_opfamily, relation->rd_opcintype,
                                                   amstrategies, amsupport, natts);
+
+       /*
+        * Similarly extract indoption and copy it to the cache entry
+        */
+       indoptionDatum = fastgetattr(relation->rd_indextuple,
+                                                                Anum_pg_index_indoption,
+                                                                GetPgIndexDescriptor(),
+                                                                &isnull);
+       Assert(!isnull);
+       indoption = (int2vector *) DatumGetPointer(indoptionDatum);
+       memcpy(relation->rd_indoption, indoption->values, natts * sizeof(int16));
+
+       /*
+        * expressions and predicate cache will be filled later
+        */
+       relation->rd_indexprs = NIL;
+       relation->rd_indpred = NIL;
+       relation->rd_amcache = NULL;
 }
 
 /*
  * IndexSupportInitialize
- *             Initializes an index strategy and associated support procedures,
- *             given the index's pg_index tuple.
+ *             Initializes an index's cached opclass information,
+ *             given the index's pg_index.indclass entry.
  *
- * Data is returned into *indexStrategy, *indexOperator, and *indexSupport,
- * all of which are objects allocated by the caller.
+ * Data is returned into *indexOperator, *indexSupport, *opFamily, and
+ * *opcInType, which are arrays allocated by the caller.
  *
  * The caller also passes maxStrategyNumber, maxSupportNumber, and
  * maxAttributeNumber, since these indicate the size of the arrays
@@ -1066,81 +1076,40 @@ RelationInitIndexAccessInfo(Relation relation)
  * access method.
  */
 static void
-IndexSupportInitialize(Form_pg_index iform,
-                                          IndexStrategy indexStrategy,
+IndexSupportInitialize(oidvector *indclass,
                                           Oid *indexOperator,
                                           RegProcedure *indexSupport,
+                                          Oid *opFamily,
+                                          Oid *opcInType,
                                           StrategyNumber maxStrategyNumber,
                                           StrategyNumber maxSupportNumber,
                                           AttrNumber maxAttributeNumber)
 {
        int                     attIndex;
 
-       maxStrategyNumber = AMStrategies(maxStrategyNumber);
-
-       /*
-        * XXX note that the following assumes the INDEX tuple is well formed
-        * and that the *key and *class are 0 terminated.
-        */
        for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
        {
                OpClassCacheEnt *opcentry;
 
-               if (iform->indkey[attIndex] == InvalidAttrNumber ||
-                       !OidIsValid(iform->indclass[attIndex]))
-                       elog(ERROR, "IndexSupportInitialize: bogus pg_index tuple");
+               if (!OidIsValid(indclass->values[attIndex]))
+                       elog(ERROR, "bogus pg_index tuple");
 
                /* look up the info for this opclass, using a cache */
-               opcentry = LookupOpclassInfo(iform->indclass[attIndex],
+               opcentry = LookupOpclassInfo(indclass->values[attIndex],
                                                                         maxStrategyNumber,
                                                                         maxSupportNumber);
 
-               /* load the strategy information for the index operators */
+               /* copy cached data into relcache entry */
+               opFamily[attIndex] = opcentry->opcfamily;
+               opcInType[attIndex] = opcentry->opcintype;
                if (maxStrategyNumber > 0)
-               {
-                       StrategyMap map;
-                       Oid                *opers;
-                       StrategyNumber strategy;
-
-                       map = IndexStrategyGetStrategyMap(indexStrategy,
-                                                                                         maxStrategyNumber,
-                                                                                         attIndex + 1);
-                       opers = &indexOperator[attIndex * maxStrategyNumber];
-
-                       for (strategy = 0; strategy < maxStrategyNumber; strategy++)
-                       {
-                               ScanKey         mapentry;
-
-                               mapentry = StrategyMapGetScanKeyEntry(map, strategy + 1);
-                               if (RegProcedureIsValid(opcentry->operatorProcs[strategy]))
-                               {
-                                       MemSet(mapentry, 0, sizeof(*mapentry));
-                                       mapentry->sk_flags = 0;
-                                       mapentry->sk_procedure = opcentry->operatorProcs[strategy];
-
-                                       /*
-                                        * Mark mapentry->sk_func invalid, until and unless
-                                        * someone sets it up.
-                                        */
-                                       mapentry->sk_func.fn_oid = InvalidOid;
-                               }
-                               else
-                                       ScanKeyEntrySetIllegal(mapentry);
-                               opers[strategy] = opcentry->operatorOids[strategy];
-                       }
-               }
-
-               /* if support routines exist for this access method, load them */
+                       memcpy(&indexOperator[attIndex * maxStrategyNumber],
+                                  opcentry->operatorOids,
+                                  maxStrategyNumber * sizeof(Oid));
                if (maxSupportNumber > 0)
-               {
-                       RegProcedure *procs;
-                       StrategyNumber support;
-
-                       procs = &indexSupport[attIndex * maxSupportNumber];
-
-                       for (support = 0; support < maxSupportNumber; ++support)
-                               procs[support] = opcentry->supportProcs[support];
-               }
+                       memcpy(&indexSupport[attIndex * maxSupportNumber],
+                                  opcentry->supportProcs,
+                                  maxSupportNumber * sizeof(RegProcedure));
        }
 }
 
@@ -1156,9 +1125,13 @@ IndexSupportInitialize(Form_pg_index iform,
  * numbers is passed in, rather than being looked up, mainly because the
  * caller will have it already.
  *
- * XXX There isn't any provision for flushing the cache.  However, there
- * isn't any provision for flushing relcache entries when opclass info
- * changes, either :-(
+ * Note there is no provision for flushing the cache.  This is OK at the
+ * moment because there is no way to ALTER any interesting properties of an
+ * existing opclass --- all you can do is drop it, which will result in
+ * a useless but harmless dead entry in the cache.  To support altering
+ * opclass membership (not the same as opfamily membership!), we'd need to
+ * be able to flush this cache as well as the contents of relcache entries
+ * for indexes.
  */
 static OpClassCacheEnt *
 LookupOpclassInfo(Oid operatorClassOid,
@@ -1167,11 +1140,9 @@ LookupOpclassInfo(Oid operatorClassOid,
 {
        OpClassCacheEnt *opcentry;
        bool            found;
-       Relation        pg_amop_desc;
-       Relation        pg_amproc_desc;
-       SysScanDesc pg_amop_scan;
-       SysScanDesc pg_amproc_scan;
-       ScanKeyData key;
+       Relation        rel;
+       SysScanDesc scan;
+       ScanKeyData skey[3];
        HeapTuple       htup;
        bool            indexOK;
 
@@ -1186,7 +1157,7 @@ LookupOpclassInfo(Oid operatorClassOid,
                MemSet(&ctl, 0, sizeof(ctl));
                ctl.keysize = sizeof(Oid);
                ctl.entrysize = sizeof(OpClassCacheEnt);
-               ctl.hash = tag_hash;
+               ctl.hash = oid_hash;
                OpClassCache = hash_create("Operator class cache", 64,
                                                                   &ctl, HASH_ELEM | HASH_FUNCTION);
        }
@@ -1194,125 +1165,162 @@ LookupOpclassInfo(Oid operatorClassOid,
        opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
                                                                                           (void *) &operatorClassOid,
                                                                                           HASH_ENTER, &found);
-       if (opcentry == NULL)
-               elog(ERROR, "out of memory for operator class cache");
 
-       if (found && opcentry->valid)
+       if (!found)
        {
-               /* Already made an entry for it */
-               Assert(numStrats == opcentry->numStrats);
-               Assert(numSupport == opcentry->numSupport);
-               return opcentry;
-       }
-
-       /* Need to fill in new entry */
-       opcentry->valid = false;        /* until known OK */
-       opcentry->numStrats = numStrats;
-       opcentry->numSupport = numSupport;
+               /* Need to allocate memory for new entry */
+               opcentry->valid = false;        /* until known OK */
+               opcentry->numStrats = numStrats;
+               opcentry->numSupport = numSupport;
+
+               if (numStrats > 0)
+                       opcentry->operatorOids = (Oid *)
+                               MemoryContextAllocZero(CacheMemoryContext,
+                                                                          numStrats * sizeof(Oid));
+               else
+                       opcentry->operatorOids = NULL;
 
-       if (numStrats > 0)
-       {
-               opcentry->operatorOids = (Oid *)
-                       MemoryContextAlloc(CacheMemoryContext,
-                                                          numStrats * sizeof(Oid));
-               MemSet(opcentry->operatorOids, 0, numStrats * sizeof(Oid));
-               opcentry->operatorProcs = (RegProcedure *)
-                       MemoryContextAlloc(CacheMemoryContext,
-                                                          numStrats * sizeof(RegProcedure));
-               MemSet(opcentry->operatorProcs, 0, numStrats * sizeof(RegProcedure));
+               if (numSupport > 0)
+                       opcentry->supportProcs = (RegProcedure *)
+                               MemoryContextAllocZero(CacheMemoryContext,
+                                                                          numSupport * sizeof(RegProcedure));
+               else
+                       opcentry->supportProcs = NULL;
        }
        else
        {
-               opcentry->operatorOids = NULL;
-               opcentry->operatorProcs = NULL;
+               Assert(numStrats == opcentry->numStrats);
+               Assert(numSupport == opcentry->numSupport);
        }
 
-       if (numSupport > 0)
-       {
-               opcentry->supportProcs = (RegProcedure *)
-                       MemoryContextAlloc(CacheMemoryContext,
-                                                          numSupport * sizeof(RegProcedure));
-               MemSet(opcentry->supportProcs, 0, numSupport * sizeof(RegProcedure));
-       }
-       else
-               opcentry->supportProcs = NULL;
+       /*
+        * When testing for cache-flush hazards, we intentionally disable the
+        * operator class cache and force reloading of the info on each call.
+        * This is helpful because we want to test the case where a cache flush
+        * occurs while we are loading the info, and it's very hard to provoke
+        * that if this happens only once per opclass per backend.
+        */
+#if defined(CLOBBER_CACHE_ALWAYS)
+       opcentry->valid = false;
+#endif
+
+       if (opcentry->valid)
+               return opcentry;
 
        /*
-        * To avoid infinite recursion during startup, force a heap scan if
-        * we're looking up info for the opclasses used by the indexes we
-        * would like to reference here.
+        * Need to fill in new entry.
+        *
+        * To avoid infinite recursion during startup, force heap scans if we're
+        * looking up info for the opclasses used by the indexes we would like to
+        * reference here.
         */
        indexOK = criticalRelcachesBuilt ||
                (operatorClassOid != OID_BTREE_OPS_OID &&
                 operatorClassOid != INT2_BTREE_OPS_OID);
 
        /*
-        * Scan pg_amop to obtain operators for the opclass
+        * We have to fetch the pg_opclass row to determine its opfamily and
+        * opcintype, which are needed to look up the operators and functions.
+        * It'd be convenient to use the syscache here, but that probably doesn't
+        * work while bootstrapping.
+        */
+       ScanKeyInit(&skey[0],
+                               ObjectIdAttributeNumber,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(operatorClassOid));
+       rel = heap_open(OperatorClassRelationId, AccessShareLock);
+       scan = systable_beginscan(rel, OpclassOidIndexId, indexOK,
+                                                         SnapshotNow, 1, skey);
+
+       if (HeapTupleIsValid(htup = systable_getnext(scan)))
+       {
+               Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(htup);
+
+               opcentry->opcfamily = opclassform->opcfamily;
+               opcentry->opcintype = opclassform->opcintype;
+       }
+       else
+               elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);
+
+       systable_endscan(scan);
+       heap_close(rel, AccessShareLock);
+
+
+       /*
+        * Scan pg_amop to obtain operators for the opclass.  We only fetch the
+        * default ones (those with lefttype = righttype = opcintype).
         */
        if (numStrats > 0)
        {
-               ScanKeyEntryInitialize(&key, 0,
-                                                          Anum_pg_amop_amopclaid,
-                                                          F_OIDEQ,
-                                                          ObjectIdGetDatum(operatorClassOid));
-               pg_amop_desc = heap_openr(AccessMethodOperatorRelationName,
-                                                                 AccessShareLock);
-               pg_amop_scan = systable_beginscan(pg_amop_desc,
-                                                                                 AccessMethodStrategyIndex,
-                                                                                 indexOK,
-                                                                                 SnapshotNow,
-                                                                                 1, &key);
-
-               while (HeapTupleIsValid(htup = systable_getnext(pg_amop_scan)))
+               ScanKeyInit(&skey[0],
+                                       Anum_pg_amop_amopfamily,
+                                       BTEqualStrategyNumber, F_OIDEQ,
+                                       ObjectIdGetDatum(opcentry->opcfamily));
+               ScanKeyInit(&skey[1],
+                                       Anum_pg_amop_amoplefttype,
+                                       BTEqualStrategyNumber, F_OIDEQ,
+                                       ObjectIdGetDatum(opcentry->opcintype));
+               ScanKeyInit(&skey[2],
+                                       Anum_pg_amop_amoprighttype,
+                                       BTEqualStrategyNumber, F_OIDEQ,
+                                       ObjectIdGetDatum(opcentry->opcintype));
+               rel = heap_open(AccessMethodOperatorRelationId, AccessShareLock);
+               scan = systable_beginscan(rel, AccessMethodStrategyIndexId, indexOK,
+                                                                 SnapshotNow, 3, skey);
+
+               while (HeapTupleIsValid(htup = systable_getnext(scan)))
                {
                        Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(htup);
 
                        if (amopform->amopstrategy <= 0 ||
                                (StrategyNumber) amopform->amopstrategy > numStrats)
-                               elog(ERROR, "Bogus amopstrategy number %d for opclass %u",
+                               elog(ERROR, "invalid amopstrategy number %d for opclass %u",
                                         amopform->amopstrategy, operatorClassOid);
                        opcentry->operatorOids[amopform->amopstrategy - 1] =
                                amopform->amopopr;
-                       opcentry->operatorProcs[amopform->amopstrategy - 1] =
-                               get_opcode(amopform->amopopr);
                }
 
-               systable_endscan(pg_amop_scan);
-               heap_close(pg_amop_desc, AccessShareLock);
+               systable_endscan(scan);
+               heap_close(rel, AccessShareLock);
        }
 
        /*
-        * Scan pg_amproc to obtain support procs for the opclass
+        * Scan pg_amproc to obtain support procs for the opclass.  We only fetch
+        * the default ones (those with lefttype = righttype = opcintype).
         */
        if (numSupport > 0)
        {
-               ScanKeyEntryInitialize(&key, 0,
-                                                          Anum_pg_amproc_amopclaid,
-                                                          F_OIDEQ,
-                                                          ObjectIdGetDatum(operatorClassOid));
-               pg_amproc_desc = heap_openr(AccessMethodProcedureRelationName,
-                                                                       AccessShareLock);
-               pg_amproc_scan = systable_beginscan(pg_amproc_desc,
-                                                                                       AccessMethodProcedureIndex,
-                                                                                       indexOK,
-                                                                                       SnapshotNow,
-                                                                                       1, &key);
-
-               while (HeapTupleIsValid(htup = systable_getnext(pg_amproc_scan)))
+               ScanKeyInit(&skey[0],
+                                       Anum_pg_amproc_amprocfamily,
+                                       BTEqualStrategyNumber, F_OIDEQ,
+                                       ObjectIdGetDatum(opcentry->opcfamily));
+               ScanKeyInit(&skey[1],
+                                       Anum_pg_amproc_amproclefttype,
+                                       BTEqualStrategyNumber, F_OIDEQ,
+                                       ObjectIdGetDatum(opcentry->opcintype));
+               ScanKeyInit(&skey[2],
+                                       Anum_pg_amproc_amprocrighttype,
+                                       BTEqualStrategyNumber, F_OIDEQ,
+                                       ObjectIdGetDatum(opcentry->opcintype));
+               rel = heap_open(AccessMethodProcedureRelationId, AccessShareLock);
+               scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
+                                                                 SnapshotNow, 3, skey);
+
+               while (HeapTupleIsValid(htup = systable_getnext(scan)))
                {
                        Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);
 
                        if (amprocform->amprocnum <= 0 ||
                                (StrategyNumber) amprocform->amprocnum > numSupport)
-                               elog(ERROR, "Bogus amproc number %d for opclass %u",
+                               elog(ERROR, "invalid amproc number %d for opclass %u",
                                         amprocform->amprocnum, operatorClassOid);
 
                        opcentry->supportProcs[amprocform->amprocnum - 1] =
                                amprocform->amproc;
                }
 
-               systable_endscan(pg_amproc_scan);
-               heap_close(pg_amproc_desc, AccessShareLock);
+               systable_endscan(scan);
+               heap_close(rel, AccessShareLock);
        }
 
        opcentry->valid = true;
@@ -1324,96 +1332,100 @@ LookupOpclassInfo(Oid operatorClassOid,
  *             formrdesc
  *
  *             This is a special cut-down version of RelationBuildDesc()
- *             used by RelationCacheInitialize() in initializing the relcache.
+ *             used by RelationCacheInitializePhase2() in initializing the relcache.
  *             The relation descriptor is built just from the supplied parameters,
  *             without actually looking at any system table entries.  We cheat
  *             quite a lot since we only need to work for a few basic system
  *             catalogs.
  *
  * formrdesc is currently used for: pg_class, pg_attribute, pg_proc,
- * and pg_type (see RelationCacheInitialize).
+ * and pg_type (see RelationCacheInitializePhase2).
  *
- * Note that these catalogs can't have constraints, default values,
- * rules, or triggers, since we don't cope with any of that.
+ * Note that these catalogs can't have constraints (except attnotnull),
+ * default values, rules, or triggers, since we don't cope with any of that.
  *
  * NOTE: we assume we are already switched into CacheMemoryContext.
  */
 static void
-formrdesc(const char *relationName,
-                 int natts,
-                 FormData_pg_attribute *att)
+formrdesc(const char *relationName, Oid relationReltype,
+                 bool hasoids, int natts, FormData_pg_attribute *att)
 {
        Relation        relation;
        int                     i;
+       bool            has_not_null;
 
        /*
-        * allocate new relation desc
-        * clear all fields of reldesc
+        * allocate new relation desc, clear all fields of reldesc
         */
        relation = (Relation) palloc0(sizeof(RelationData));
        relation->rd_targblock = InvalidBlockNumber;
 
        /* make sure relation is marked as having no open file yet */
-       relation->rd_fd = -1;
+       relation->rd_smgr = NULL;
 
        /*
-        * initialize reference count
+        * initialize reference count: 1 because it is nailed in cache
         */
-       RelationSetReferenceCount(relation, 1);
+       relation->rd_refcnt = 1;
 
        /*
-        * all entries built with this routine are nailed-in-cache; none are
-        * for new or temp relations.
+        * all entries built with this routine are nailed-in-cache; none are for
+        * new or temp relations.
         */
        relation->rd_isnailed = true;
-       relation->rd_isnew = false;
+       relation->rd_createSubid = InvalidSubTransactionId;
+       relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
        relation->rd_istemp = false;
 
        /*
         * initialize relation tuple form
         *
         * The data we insert here is pretty incomplete/bogus, but it'll serve to
-        * get us launched.  RelationCacheInitializePhase2() will read the
-        * real data from pg_class and replace what we've done here.
+        * get us launched.  RelationCacheInitializePhase2() will read the real
+        * data from pg_class and replace what we've done here.
         */
        relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
 
        namestrcpy(&relation->rd_rel->relname, relationName);
        relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
+       relation->rd_rel->reltype = relationReltype;
 
        /*
-        * It's important to distinguish between shared and non-shared
-        * relations, even at bootstrap time, to make sure we know where they
-        * are stored.  At present, all relations that formrdesc is used for
-        * are not shared.
+        * It's important to distinguish between shared and non-shared relations,
+        * even at bootstrap time, to make sure we know where they are stored.  At
+        * present, all relations that formrdesc is used for are not shared.
         */
        relation->rd_rel->relisshared = false;
 
        relation->rd_rel->relpages = 1;
        relation->rd_rel->reltuples = 1;
        relation->rd_rel->relkind = RELKIND_RELATION;
-       relation->rd_rel->relhasoids = true;
+       relation->rd_rel->relhasoids = hasoids;
        relation->rd_rel->relnatts = (int16) natts;
 
        /*
         * initialize attribute tuple form
         *
         * Unlike the case with the relation tuple, this data had better be right
-        * because it will never be replaced.  The input values must be
-        * correctly defined by macros in src/include/catalog/ headers.
+        * because it will never be replaced.  The input values must be correctly
+        * defined by macros in src/include/catalog/ headers.
         */
-       relation->rd_att = CreateTemplateTupleDesc(natts,
-                                                                                  relation->rd_rel->relhasoids);
+       relation->rd_att = CreateTemplateTupleDesc(natts, hasoids);
+       relation->rd_att->tdrefcount = 1;       /* mark as refcounted */
+
+       relation->rd_att->tdtypeid = relationReltype;
+       relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
 
        /*
         * initialize tuple desc info
         */
+       has_not_null = false;
        for (i = 0; i < natts; i++)
        {
-               relation->rd_att->attrs[i] = (Form_pg_attribute) palloc(ATTRIBUTE_TUPLE_SIZE);
-               memcpy((char *) relation->rd_att->attrs[i],
-                          (char *) &att[i],
+               memcpy(relation->rd_att->attrs[i],
+                          &att[i],
                           ATTRIBUTE_TUPLE_SIZE);
+               has_not_null |= att[i].attnotnull;
                /* make sure attcacheoff is valid */
                relation->rd_att->attrs[i]->attcacheoff = -1;
        }
@@ -1421,30 +1433,40 @@ formrdesc(const char *relationName,
        /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
        relation->rd_att->attrs[0]->attcacheoff = 0;
 
+       /* mark not-null status */
+       if (has_not_null)
+       {
+               TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
+
+               constr->has_not_null = true;
+               relation->rd_att->constr = constr;
+       }
+
        /*
         * initialize relation id from info in att array (my, this is ugly)
         */
        RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
+       relation->rd_rel->relfilenode = RelationGetRelid(relation);
 
        /*
-        * initialize the relation's lock manager and RelFileNode information
+        * initialize the relation lock manager information
         */
        RelationInitLockInfo(relation);         /* see lmgr.c */
 
-       if (relation->rd_rel->relisshared)
-               relation->rd_node.tblNode = InvalidOid;
-       else
-               relation->rd_node.tblNode = MyDatabaseId;
-       relation->rd_node.relNode =
-               relation->rd_rel->relfilenode = RelationGetRelid(relation);
+       /*
+        * initialize physical addressing information for the relation
+        */
+       RelationInitPhysicalAddr(relation);
 
        /*
         * initialize the rel-has-index flag, using hardwired knowledge
         */
-       relation->rd_rel->relhasindex = false;
-
-       /* In bootstrap mode, we have no indexes */
-       if (!IsBootstrapProcessingMode())
+       if (IsBootstrapProcessingMode())
+       {
+               /* In bootstrap mode, we have no indexes */
+               relation->rd_rel->relhasindex = false;
+       }
+       else
        {
                /* Otherwise, all the rels formrdesc is used for have indexes */
                relation->rd_rel->relhasindex = true;
@@ -1454,6 +1476,9 @@ formrdesc(const char *relationName,
         * add new reldesc to relcache
         */
        RelationCacheInsert(relation);
+
+       /* It's fully valid */
+       relation->rd_isvalid = true;
 }
 
 
@@ -1463,134 +1488,85 @@ formrdesc(const char *relationName,
  */
 
 /*
- *             RelationIdCacheGetRelation
+ *             RelationIdGetRelation
+ *
+ *             Lookup a reldesc by OID; make one if not already in cache.
  *
- *             Lookup an existing reldesc by OID.
+ *             Returns NULL if no pg_class row could be found for the given relid
+ *             (suggesting we are trying to access a just-deleted relation).
+ *             Any other error is reported via elog.
  *
- *             Only try to get the reldesc by looking in the cache,
- *             do not go to the disk.
+ *             NB: caller should already have at least AccessShareLock on the
+ *             relation ID, else there are nasty race conditions.
  *
- *             NB: relation ref count is incremented if successful.
+ *             NB: relation ref count is incremented, or set to 1 if new entry.
  *             Caller should eventually decrement count.  (Usually,
  *             that happens by calling RelationClose().)
  */
 Relation
-RelationIdCacheGetRelation(Oid relationId)
+RelationIdGetRelation(Oid relationId)
 {
        Relation        rd;
 
+       /*
+        * first try to find reldesc in the cache
+        */
        RelationIdCacheLookup(relationId, rd);
 
        if (RelationIsValid(rd))
+       {
                RelationIncrementReferenceCount(rd);
+               /* revalidate nailed index if necessary */
+               if (!rd->rd_isvalid)
+                       RelationReloadIndexInfo(rd);
+               return rd;
+       }
 
+       /*
+        * no reldesc in the cache, so have RelationBuildDesc() build one and add
+        * it.
+        */
+       rd = RelationBuildDesc(relationId, NULL);
+       if (RelationIsValid(rd))
+               RelationIncrementReferenceCount(rd);
        return rd;
 }
 
+/* ----------------------------------------------------------------
+ *                             cache invalidation support routines
+ * ----------------------------------------------------------------
+ */
+
 /*
- *             RelationSysNameCacheGetRelation
+ * RelationIncrementReferenceCount
+ *             Increments relation reference count.
  *
- *             As above, but lookup by name; only works for system catalogs.
+ * Note: bootstrap mode has its own weird ideas about relation refcount
+ * behavior; we ought to fix it someday, but for now, just disable
+ * reference count ownership tracking in bootstrap mode.
  */
-static Relation
-RelationSysNameCacheGetRelation(const char *relationName)
+void
+RelationIncrementReferenceCount(Relation rel)
 {
-       Relation        rd;
-       NameData        name;
-
-       /*
-        * make sure that the name key used for hash lookup is properly
-        * null-padded
-        */
-       namestrcpy(&name, relationName);
-       RelationSysNameCacheLookup(NameStr(name), rd);
-
-       if (RelationIsValid(rd))
-               RelationIncrementReferenceCount(rd);
-
-       return rd;
+       ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
+       rel->rd_refcnt += 1;
+       if (!IsBootstrapProcessingMode())
+               ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
 }
 
-Relation
-RelationNodeCacheGetRelation(RelFileNode rnode)
+/*
+ * RelationDecrementReferenceCount
+ *             Decrements relation reference count.
+ */
+void
+RelationDecrementReferenceCount(Relation rel)
 {
-       Relation        rd;
-
-       RelationNodeCacheLookup(rnode, rd);
-
-       if (RelationIsValid(rd))
-               RelationIncrementReferenceCount(rd);
-
-       return rd;
+       Assert(rel->rd_refcnt > 0);
+       rel->rd_refcnt -= 1;
+       if (!IsBootstrapProcessingMode())
+               ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
 }
 
-/*
- *             RelationIdGetRelation
- *
- *             Lookup a reldesc by OID; make one if not already in cache.
- *
- *             NB: relation ref count is incremented, or set to 1 if new entry.
- *             Caller should eventually decrement count.  (Usually,
- *             that happens by calling RelationClose().)
- */
-Relation
-RelationIdGetRelation(Oid relationId)
-{
-       Relation        rd;
-       RelationBuildDescInfo buildinfo;
-
-       /*
-        * first try and get a reldesc from the cache
-        */
-       rd = RelationIdCacheGetRelation(relationId);
-       if (RelationIsValid(rd))
-               return rd;
-
-       /*
-        * no reldesc in the cache, so have RelationBuildDesc() build one and
-        * add it.
-        */
-       buildinfo.infotype = INFO_RELID;
-       buildinfo.i.info_id = relationId;
-
-       rd = RelationBuildDesc(buildinfo, NULL);
-       return rd;
-}
-
-/*
- *             RelationSysNameGetRelation
- *
- *             As above, but lookup by name; only works for system catalogs.
- */
-Relation
-RelationSysNameGetRelation(const char *relationName)
-{
-       Relation        rd;
-       RelationBuildDescInfo buildinfo;
-
-       /*
-        * first try and get a reldesc from the cache
-        */
-       rd = RelationSysNameCacheGetRelation(relationName);
-       if (RelationIsValid(rd))
-               return rd;
-
-       /*
-        * no reldesc in the cache, so have RelationBuildDesc() build one and
-        * add it.
-        */
-       buildinfo.infotype = INFO_RELNAME;
-       buildinfo.i.info_name = (char *) relationName;
-
-       rd = RelationBuildDesc(buildinfo, NULL);
-       return rd;
-}
-
-/* ----------------------------------------------------------------
- *                             cache invalidation support routines
- * ----------------------------------------------------------------
- */
-
 /*
  * RelationClose - close an open relation
  *
@@ -1610,46 +1586,103 @@ RelationClose(Relation relation)
 
 #ifdef RELCACHE_FORCE_RELEASE
        if (RelationHasReferenceCountZero(relation) &&
-               !relation->rd_isnew)
+               relation->rd_createSubid == InvalidSubTransactionId &&
+               relation->rd_newRelfilenodeSubid == InvalidSubTransactionId)
                RelationClearRelation(relation, false);
 #endif
 }
 
-#ifdef ENABLE_REINDEX_NAILED_RELATIONS
 /*
- * RelationReloadClassinfo
- *
- *     This function is especially for nailed relations.
- *     relhasindex/relfilenode could be changed even for
- *     nailed relations.
+ * RelationReloadIndexInfo - reload minimal information for an open index
+ *
+ *     This function is used only for indexes.  A relcache inval on an index
+ *     can mean that its pg_class or pg_index row changed.  There are only
+ *     very limited changes that are allowed to an existing index's schema,
+ *     so we can update the relcache entry without a complete rebuild, which
+ *     is fortunate because we can't rebuild an index entry that is "nailed"
+ *     and/or in active use.  We support full replacement of the pg_class row,
+ *     as well as updates of a few simple fields of the pg_index row.
+ *
+ *     We can't necessarily reread the catalog rows right away; we might be
+ *     in a failed transaction when we receive the SI notification.  If so,
+ *     RelationClearRelation just marks the entry as invalid by setting
+ *     rd_isvalid to false.  This routine is called to fix the entry when it
+ *     is next needed.
  */
 static void
-RelationReloadClassinfo(Relation relation)
+RelationReloadIndexInfo(Relation relation)
 {
-       RelationBuildDescInfo buildinfo;
+       bool            indexOK;
        HeapTuple       pg_class_tuple;
        Form_pg_class relp;
 
-       if (!relation->rd_rel)
-               return;
-       buildinfo.infotype = INFO_RELID;
-       buildinfo.i.info_id = relation->rd_id;
-       pg_class_tuple = ScanPgRelation(buildinfo);
+       /* Should be called only for invalidated indexes */
+       Assert(relation->rd_rel->relkind == RELKIND_INDEX &&
+                  !relation->rd_isvalid);
+       /* Should be closed at smgr level */
+       Assert(relation->rd_smgr == NULL);
+
+       /*
+        * Read the pg_class row
+        *
+        * Don't try to use an indexscan of pg_class_oid_index to reload the info
+        * for pg_class_oid_index ...
+        */
+       indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
+       pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK);
        if (!HeapTupleIsValid(pg_class_tuple))
-       {
-               elog(ERROR, "RelationReloadClassinfo system relation id=%d doesn't exist", relation->rd_id);
-               return;
-       }
-       RelationCacheDelete(relation);
+               elog(ERROR, "could not find pg_class tuple for index %u",
+                        RelationGetRelid(relation));
        relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
-       memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
-       relation->rd_node.relNode = relp->relfilenode;
-       RelationCacheInsert(relation);
+       memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
+       /* Reload reloptions in case they changed */
+       if (relation->rd_options)
+               pfree(relation->rd_options);
+       RelationParseRelOptions(relation, pg_class_tuple);
+       /* done with pg_class tuple */
        heap_freetuple(pg_class_tuple);
+       /* We must recalculate physical address in case it changed */
+       RelationInitPhysicalAddr(relation);
+       /* Make sure targblock is reset in case rel was truncated */
+       relation->rd_targblock = InvalidBlockNumber;
+       /* Must free any AM cached data, too */
+       if (relation->rd_amcache)
+               pfree(relation->rd_amcache);
+       relation->rd_amcache = NULL;
+
+       /*
+        * For a non-system index, there are fields of the pg_index row that are
+        * allowed to change, so re-read that row and update the relcache entry.
+        * Most of the info derived from pg_index (such as support function lookup
+        * info) cannot change, and indeed the whole point of this routine is to
+        * update the relcache entry without clobbering that data; so wholesale
+        * replacement is not appropriate.
+        */
+       if (!IsSystemRelation(relation))
+       {
+               HeapTuple       tuple;
+               Form_pg_index index;
+
+               tuple = SearchSysCache(INDEXRELID,
+                                                          ObjectIdGetDatum(RelationGetRelid(relation)),
+                                                          0, 0, 0);
+               if (!HeapTupleIsValid(tuple))
+                       elog(ERROR, "cache lookup failed for index %u",
+                                RelationGetRelid(relation));
+               index = (Form_pg_index) GETSTRUCT(tuple);
+
+               relation->rd_index->indisvalid = index->indisvalid;
+               relation->rd_index->indcheckxmin = index->indcheckxmin;
+               relation->rd_index->indisready = index->indisready;
+               HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
+                                                          HeapTupleHeaderGetXmin(tuple->t_data));
+
+               ReleaseSysCache(tuple);
+       }
 
-       return;
+       /* Okay, now it's valid again */
+       relation->rd_isvalid = true;
 }
-#endif   /* ENABLE_REINDEX_NAILED_RELATIONS */
 
 /*
  * RelationClearRelation
@@ -1663,34 +1696,56 @@ RelationReloadClassinfo(Relation relation)
 static void
 RelationClearRelation(Relation relation, bool rebuild)
 {
+       Oid                     old_reltype = relation->rd_rel->reltype;
        MemoryContext oldcxt;
 
        /*
         * Make sure smgr and lower levels close the relation's files, if they
-        * weren't closed already.  If the relation is not getting deleted,
-        * the next smgr access should reopen the files automatically.  This
-        * ensures that the low-level file access state is updated after, say,
-        * a vacuum truncation.
+        * weren't closed already.  If the relation is not getting deleted, the
+        * next smgr access should reopen the files automatically.      This ensures
+        * that the low-level file access state is updated after, say, a vacuum
+        * truncation.
         */
-       if (relation->rd_fd >= 0)
-       {
-               smgrclose(DEFAULT_SMGR, relation);
-               relation->rd_fd = -1;
-       }
+       RelationCloseSmgr(relation);
 
        /*
-        * Never, never ever blow away a nailed-in system relation, because
-        * we'd be unable to recover.  However, we must update rd_nblocks
-        * and reset rd_targblock, in case we got called because of a relation
-        * cache flush that was triggered by VACUUM.
+        * Never, never ever blow away a nailed-in system relation, because we'd
+        * be unable to recover.  However, we must reset rd_targblock, in case we
+        * got called because of a relation cache flush that was triggered by
+        * VACUUM.
+        *
+        * If it's a nailed index, then we need to re-read the pg_class row to see
+        * if its relfilenode changed.  We can't necessarily do that here, because
+        * we might be in a failed transaction.  We assume it's okay to do it if
+        * there are open references to the relcache entry (cf notes for
+        * AtEOXact_RelationCache).  Otherwise just mark the entry as possibly
+        * invalid, and it'll be fixed when next opened.
         */
        if (relation->rd_isnailed)
        {
                relation->rd_targblock = InvalidBlockNumber;
-               RelationUpdateNumberOfBlocks(relation);
-#ifdef ENABLE_REINDEX_NAILED_RELATIONS
-               RelationReloadClassinfo(relation);
-#endif   /* ENABLE_REINDEX_NAILED_RELATIONS */
+               if (relation->rd_rel->relkind == RELKIND_INDEX)
+               {
+                       relation->rd_isvalid = false;           /* needs to be revalidated */
+                       if (relation->rd_refcnt > 1)
+                               RelationReloadIndexInfo(relation);
+               }
+               return;
+       }
+
+       /*
+        * Even non-system indexes should not be blown away if they are open and
+        * have valid index support information.  This avoids problems with active
+        * use of the index support information.  As with nailed indexes, we
+        * re-read the pg_class row to handle possible physical relocation of the
+        * index, and we check for pg_index updates too.
+        */
+       if (relation->rd_rel->relkind == RELKIND_INDEX &&
+               relation->rd_refcnt > 0 &&
+               relation->rd_indexcxt != NULL)
+       {
+               relation->rd_isvalid = false;   /* needs to be revalidated */
+               RelationReloadIndexInfo(relation);
                return;
        }
 
@@ -1716,26 +1771,33 @@ RelationClearRelation(Relation relation, bool rebuild)
         * with, we can only get rid of these fields:
         */
        FreeTriggerDesc(relation->trigdesc);
-       if (relation->rd_index)
-               pfree(relation->rd_index);
+       if (relation->rd_indextuple)
+               pfree(relation->rd_indextuple);
        if (relation->rd_am)
                pfree(relation->rd_am);
        if (relation->rd_rel)
                pfree(relation->rd_rel);
-       freeList(relation->rd_indexlist);
+       if (relation->rd_options)
+               pfree(relation->rd_options);
+       list_free(relation->rd_indexlist);
+       bms_free(relation->rd_indexattr);
        if (relation->rd_indexcxt)
                MemoryContextDelete(relation->rd_indexcxt);
 
        /*
         * If we're really done with the relcache entry, blow it away. But if
-        * someone is still using it, reconstruct the whole deal without
-        * moving the physical RelationData record (so that the someone's
-        * pointer is still valid).
+        * someone is still using it, reconstruct the whole deal without moving
+        * the physical RelationData record (so that the someone's pointer is
+        * still valid).
         */
        if (!rebuild)
        {
                /* ok to zap remaining substructure */
-               FreeTupleDesc(relation->rd_att);
+               flush_rowtype_cache(old_reltype);
+               /* can't use DecrTupleDescRefCount here */
+               Assert(relation->rd_att->tdrefcount > 0);
+               if (--relation->rd_att->tdrefcount == 0)
+                       FreeTupleDesc(relation->rd_att);
                if (relation->rd_rulescxt)
                        MemoryContextDelete(relation->rd_rulescxt);
                pfree(relation);
@@ -1743,39 +1805,59 @@ RelationClearRelation(Relation relation, bool rebuild)
        else
        {
                /*
-                * When rebuilding an open relcache entry, must preserve ref count
-                * and rd_isnew flag.  Also attempt to preserve the tupledesc and
-                * rewrite-rule substructures in place.
+                * When rebuilding an open relcache entry, must preserve ref count and
+                * rd_createSubid/rd_newRelfilenodeSubid state.  Also attempt to
+                * preserve the tupledesc and rewrite-rule substructures in place.
+                * (Note: the refcount mechanism for tupledescs may eventually ensure
+                * that we don't really need to preserve the tupledesc in-place, but
+                * for now there are still a lot of places that assume an open rel's
+                * tupledesc won't move.)
+                *
+                * Note that this process does not touch CurrentResourceOwner, which
+                * is good because whatever ref counts the entry may have do not
+                * necessarily belong to that resource owner.
                 */
+               Oid                     save_relid = RelationGetRelid(relation);
                int                     old_refcnt = relation->rd_refcnt;
-               bool            old_isnew = relation->rd_isnew;
+               SubTransactionId old_createSubid = relation->rd_createSubid;
+               SubTransactionId old_newRelfilenodeSubid = relation->rd_newRelfilenodeSubid;
+               struct PgStat_TableStatus *old_pgstat_info = relation->pgstat_info;
                TupleDesc       old_att = relation->rd_att;
                RuleLock   *old_rules = relation->rd_rules;
                MemoryContext old_rulescxt = relation->rd_rulescxt;
-               RelationBuildDescInfo buildinfo;
-
-               buildinfo.infotype = INFO_RELID;
-               buildinfo.i.info_id = RelationGetRelid(relation);
 
-               if (RelationBuildDesc(buildinfo, relation) != relation)
+               if (RelationBuildDesc(save_relid, relation) != relation)
                {
                        /* Should only get here if relation was deleted */
-                       FreeTupleDesc(old_att);
+                       flush_rowtype_cache(old_reltype);
+                       Assert(old_att->tdrefcount > 0);
+                       if (--old_att->tdrefcount == 0)
+                               FreeTupleDesc(old_att);
                        if (old_rulescxt)
                                MemoryContextDelete(old_rulescxt);
                        pfree(relation);
-                       elog(ERROR, "RelationClearRelation: relation %u deleted while still in use",
-                                buildinfo.i.info_id);
+                       elog(ERROR, "relation %u deleted while still in use", save_relid);
                }
-               RelationSetReferenceCount(relation, old_refcnt);
-               relation->rd_isnew = old_isnew;
+               relation->rd_refcnt = old_refcnt;
+               relation->rd_createSubid = old_createSubid;
+               relation->rd_newRelfilenodeSubid = old_newRelfilenodeSubid;
+               relation->pgstat_info = old_pgstat_info;
+
                if (equalTupleDescs(old_att, relation->rd_att))
                {
-                       FreeTupleDesc(relation->rd_att);
+                       /* needn't flush typcache here */
+                       Assert(relation->rd_att->tdrefcount == 1);
+                       if (--relation->rd_att->tdrefcount == 0)
+                               FreeTupleDesc(relation->rd_att);
                        relation->rd_att = old_att;
                }
                else
-                       FreeTupleDesc(old_att);
+               {
+                       flush_rowtype_cache(old_reltype);
+                       Assert(old_att->tdrefcount > 0);
+                       if (--old_att->tdrefcount == 0)
+                               FreeTupleDesc(old_att);
+               }
                if (equalRuleLocks(old_rules, relation->rd_rules))
                {
                        if (relation->rd_rulescxt)
@@ -1788,15 +1870,6 @@ RelationClearRelation(Relation relation, bool rebuild)
                        if (old_rulescxt)
                                MemoryContextDelete(old_rulescxt);
                }
-
-               /*
-                * Update rd_nblocks.  This is kind of expensive, but I think we
-                * must do it in case relation has been truncated... we definitely
-                * must do it if the rel is new or temp, since
-                * RelationGetNumberOfBlocks will subsequently assume that the
-                * block count is correct.
-                */
-               RelationUpdateNumberOfBlocks(relation);
        }
 }
 
@@ -1810,12 +1883,13 @@ RelationFlushRelation(Relation relation)
 {
        bool            rebuild;
 
-       if (relation->rd_isnew)
+       if (relation->rd_createSubid != InvalidSubTransactionId ||
+               relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
        {
                /*
                 * New relcache entries are always rebuilt, not flushed; else we'd
                 * forget the "new" status of the relation, which is a useful
-                * optimization to have.
+                * optimization to have.  Ditto for the new-relfilenode status.
                 */
                rebuild = true;
        }
@@ -1847,25 +1921,29 @@ RelationForgetRelation(Oid rid)
                return;                                 /* not in cache, nothing to do */
 
        if (!RelationHasReferenceCountZero(relation))
-               elog(ERROR, "RelationForgetRelation: relation %u is still open", rid);
+               elog(ERROR, "relation %u is still open", rid);
 
        /* Unconditionally destroy the relcache entry */
        RelationClearRelation(relation, false);
 }
 
 /*
- *             RelationIdInvalidateRelationCacheByRelationId
+ *             RelationCacheInvalidateEntry
  *
  *             This routine is invoked for SI cache flush messages.
  *
- *             We used to skip local relations, on the grounds that they could
- *             not be targets of cross-backend SI update messages; but it seems
- *             safer to process them, so that our *own* SI update messages will
- *             have the same effects during CommandCounterIncrement for both
- *             local and nonlocal relations.
+ * Any relcache entry matching the relid must be flushed.  (Note: caller has
+ * already determined that the relid belongs to our database or is a shared
+ * relation.)
+ *
+ * We used to skip local relations, on the grounds that they could
+ * not be targets of cross-backend SI update messages; but it seems
+ * safer to process them, so that our *own* SI update messages will
+ * have the same effects during CommandCounterIncrement for both
+ * local and nonlocal relations.
  */
 void
-RelationIdInvalidateRelationCacheByRelationId(Oid relationId)
+RelationCacheInvalidateEntry(Oid relationId)
 {
        Relation        relation;
 
@@ -1881,12 +1959,15 @@ RelationIdInvalidateRelationCacheByRelationId(Oid relationId)
 /*
  * RelationCacheInvalidate
  *      Blow away cached relation descriptors that have zero reference counts,
- *      and rebuild those with positive reference counts.
+ *      and rebuild those with positive reference counts.      Also reset the smgr
+ *      relation cache.
  *
  *      This is currently used only to recover from SI message buffer overflow,
  *      so we do not touch new-in-transaction relations; they cannot be targets
  *      of cross-backend SI updates (and our own updates now go through a
  *      separate linked list that isn't limited by the SI message buffer size).
+ *      Likewise, we need not discard new-relfilenode-in-transaction hints,
+ *      since any invalidation of those would be a local event.
  *
  *      We do this in two phases: the first pass deletes deletable items, and
  *      the second one rebuilds the rebuildable items.  This is essential for
@@ -1898,6 +1979,12 @@ RelationIdInvalidateRelationCacheByRelationId(Oid relationId)
  *      because (a) during the first pass we won't process any more SI messages,
  *      so hash_seq_search will complete safely; (b) during the second pass we
  *      only hold onto pointers to nondeletable entries.
+ *
+ *      The two-phase approach also makes it easy to ensure that we process
+ *      nailed-in-cache indexes before other nondeletable items, and that we
+ *      process pg_class_oid_index first of all.  In scenarios where a nailed
+ *      index has been given a new relfilenode, we have to detect that update
+ *      before the nailed index is used in reloading any other relcache entry.
  */
 void
 RelationCacheInvalidate(void)
@@ -1905,8 +1992,9 @@ RelationCacheInvalidate(void)
        HASH_SEQ_STATUS status;
        RelIdCacheEnt *idhentry;
        Relation        relation;
+       List       *rebuildFirstList = NIL;
        List       *rebuildList = NIL;
-       List       *l;
+       ListCell   *l;
 
        /* Phase 1 */
        hash_seq_init(&status, RelationIdCache);
@@ -1915,8 +2003,11 @@ RelationCacheInvalidate(void)
        {
                relation = idhentry->reldesc;
 
+               /* Must close all smgr references to avoid leaving dangling ptrs */
+               RelationCloseSmgr(relation);
+
                /* Ignore new relations, since they are never SI targets */
-               if (relation->rd_isnew)
+               if (relation->rd_createSubid != InvalidSubTransactionId)
                        continue;
 
                relcacheInvalsReceived++;
@@ -1924,57 +2015,130 @@ RelationCacheInvalidate(void)
                if (RelationHasReferenceCountZero(relation))
                {
                        /* Delete this entry immediately */
+                       Assert(!relation->rd_isnailed);
                        RelationClearRelation(relation, false);
                }
                else
                {
-                       /* Add entry to list of stuff to rebuild in second pass */
-                       rebuildList = lcons(relation, rebuildList);
+                       /*
+                        * Add this entry to list of stuff to rebuild in second pass.
+                        * pg_class_oid_index goes on the front of rebuildFirstList, other
+                        * nailed indexes on the back, and everything else into
+                        * rebuildList (in no particular order).
+                        */
+                       if (relation->rd_isnailed &&
+                               relation->rd_rel->relkind == RELKIND_INDEX)
+                       {
+                               if (RelationGetRelid(relation) == ClassOidIndexId)
+                                       rebuildFirstList = lcons(relation, rebuildFirstList);
+                               else
+                                       rebuildFirstList = lappend(rebuildFirstList, relation);
+                       }
+                       else
+                               rebuildList = lcons(relation, rebuildList);
                }
        }
 
+       /*
+        * Now zap any remaining smgr cache entries.  This must happen before we
+        * start to rebuild entries, since that may involve catalog fetches which
+        * will re-open catalog files.
+        */
+       smgrcloseall();
+
        /* Phase 2: rebuild the items found to need rebuild in phase 1 */
+       foreach(l, rebuildFirstList)
+       {
+               relation = (Relation) lfirst(l);
+               RelationClearRelation(relation, true);
+       }
+       list_free(rebuildFirstList);
        foreach(l, rebuildList)
        {
                relation = (Relation) lfirst(l);
                RelationClearRelation(relation, true);
        }
-       freeList(rebuildList);
+       list_free(rebuildList);
 }
 
 /*
  * AtEOXact_RelationCache
  *
- *     Clean up the relcache at transaction commit or abort.
+ *     Clean up the relcache at main-transaction commit or abort.
+ *
+ * Note: this must be called *before* processing invalidation messages.
+ * In the case of abort, we don't want to try to rebuild any invalidated
+ * cache entries (since we can't safely do database accesses).  Therefore
+ * we must reset refcnts before handling pending invalidations.
+ *
+ * As of PostgreSQL 8.1, relcache refcnts should get released by the
+ * ResourceOwner mechanism.  This routine just does a debugging
+ * cross-check that no pins remain.  However, we also need to do special
+ * cleanup when the current transaction created any relations or made use
+ * of forced index lists.
  */
 void
-AtEOXact_RelationCache(bool commit)
+AtEOXact_RelationCache(bool isCommit)
 {
        HASH_SEQ_STATUS status;
        RelIdCacheEnt *idhentry;
 
+       /*
+        * To speed up transaction exit, we want to avoid scanning the relcache
+        * unless there is actually something for this routine to do.  Other than
+        * the debug-only Assert checks, most transactions don't create any work
+        * for us to do here, so we keep a static flag that gets set if there is
+        * anything to do.      (Currently, this means either a relation is created in
+        * the current xact, or one is given a new relfilenode, or an index list
+        * is forced.)  For simplicity, the flag remains set till end of top-level
+        * transaction, even though we could clear it at subtransaction end in
+        * some cases.
+        */
+       if (!need_eoxact_work
+#ifdef USE_ASSERT_CHECKING
+               && !assert_enabled
+#endif
+               )
+               return;
+
        hash_seq_init(&status, RelationIdCache);
 
        while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
        {
                Relation        relation = idhentry->reldesc;
-               int                     expected_refcnt;
+
+               /*
+                * The relcache entry's ref count should be back to its normal
+                * not-in-a-transaction state: 0 unless it's nailed in cache.
+                *
+                * In bootstrap mode, this is NOT true, so don't check it --- the
+                * bootstrap code expects relations to stay open across start/commit
+                * transaction calls.  (That seems bogus, but it's not worth fixing.)
+                */
+#ifdef USE_ASSERT_CHECKING
+               if (!IsBootstrapProcessingMode())
+               {
+                       int                     expected_refcnt;
+
+                       expected_refcnt = relation->rd_isnailed ? 1 : 0;
+                       Assert(relation->rd_refcnt == expected_refcnt);
+               }
+#endif
 
                /*
                 * Is it a relation created in the current transaction?
                 *
-                * During commit, reset the flag to false, since we are now out of
-                * the creating transaction.  During abort, simply delete the
-                * relcache entry --- it isn't interesting any longer.  (NOTE: if
-                * we have forgotten the isnew state of a new relation due to a
-                * forced cache flush, the entry will get deleted anyway by
-                * shared-cache-inval processing of the aborted pg_class
-                * insertion.)
+                * During commit, reset the flag to zero, since we are now out of the
+                * creating transaction.  During abort, simply delete the relcache
+                * entry --- it isn't interesting any longer.  (NOTE: if we have
+                * forgotten the new-ness of a new relation due to a forced cache
+                * flush, the entry will get deleted anyway by shared-cache-inval
+                * processing of the aborted pg_class insertion.)
                 */
-               if (relation->rd_isnew)
+               if (relation->rd_createSubid != InvalidSubTransactionId)
                {
-                       if (commit)
-                               relation->rd_isnew = false;
+                       if (isCommit)
+                               relation->rd_createSubid = InvalidSubTransactionId;
                        else
                        {
                                RelationClearRelation(relation, false);
@@ -1983,41 +2147,113 @@ AtEOXact_RelationCache(bool commit)
                }
 
                /*
-                * During transaction abort, we must also reset relcache entry ref
-                * counts to their normal not-in-a-transaction state.  A ref count
-                * may be too high because some routine was exited by elog()
-                * between incrementing and decrementing the count.
-                *
-                * During commit, we should not have to do this, but it's still
-                * useful to check that the counts are correct to catch missed
-                * relcache closes.
-                *
-                * In bootstrap mode, do NOT reset the refcnt nor complain that it's
-                * nonzero --- the bootstrap code expects relations to stay open
-                * across start/commit transaction calls.  (That seems bogus, but
-                * it's not worth fixing.)
+                * Likewise, reset the hint about the relfilenode being new.
                 */
-               expected_refcnt = relation->rd_isnailed ? 1 : 0;
+               relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
+
+               /*
+                * Flush any temporary index list.
+                */
+               if (relation->rd_indexvalid == 2)
+               {
+                       list_free(relation->rd_indexlist);
+                       relation->rd_indexlist = NIL;
+                       relation->rd_oidindex = InvalidOid;
+                       relation->rd_indexvalid = 0;
+               }
+       }
+
+       /* Once done with the transaction, we can reset need_eoxact_work */
+       need_eoxact_work = false;
+}
+
+/*
+ * AtEOSubXact_RelationCache
+ *
+ *     Clean up the relcache at sub-transaction commit or abort.
+ *
+ * Note: this must be called *before* processing invalidation messages.
+ */
+void
+AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
+                                                 SubTransactionId parentSubid)
+{
+       HASH_SEQ_STATUS status;
+       RelIdCacheEnt *idhentry;
+
+       /*
+        * Skip the relcache scan if nothing to do --- see notes for
+        * AtEOXact_RelationCache.
+        */
+       if (!need_eoxact_work)
+               return;
+
+       hash_seq_init(&status, RelationIdCache);
+
+       while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
+       {
+               Relation        relation = idhentry->reldesc;
 
-               if (commit)
+               /*
+                * Is it a relation created in the current subtransaction?
+                *
+                * During subcommit, mark it as belonging to the parent, instead.
+                * During subabort, simply delete the relcache entry.
+                */
+               if (relation->rd_createSubid == mySubid)
                {
-                       if (relation->rd_refcnt != expected_refcnt &&
-                               !IsBootstrapProcessingMode())
+                       if (isCommit)
+                               relation->rd_createSubid = parentSubid;
+                       else
                        {
-                               elog(WARNING, "Relcache reference leak: relation \"%s\" has refcnt %d instead of %d",
-                                        RelationGetRelationName(relation),
-                                        relation->rd_refcnt, expected_refcnt);
-                               RelationSetReferenceCount(relation, expected_refcnt);
+                               Assert(RelationHasReferenceCountZero(relation));
+                               RelationClearRelation(relation, false);
+                               continue;
                        }
                }
-               else
+
+               /*
+                * Likewise, update or drop any new-relfilenode-in-subtransaction
+                * hint.
+                */
+               if (relation->rd_newRelfilenodeSubid == mySubid)
+               {
+                       if (isCommit)
+                               relation->rd_newRelfilenodeSubid = parentSubid;
+                       else
+                               relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
+               }
+
+               /*
+                * Flush any temporary index list.
+                */
+               if (relation->rd_indexvalid == 2)
                {
-                       /* abort case, just reset it quietly */
-                       RelationSetReferenceCount(relation, expected_refcnt);
+                       list_free(relation->rd_indexlist);
+                       relation->rd_indexlist = NIL;
+                       relation->rd_oidindex = InvalidOid;
+                       relation->rd_indexvalid = 0;
                }
        }
 }
 
+/*
+ * RelationCacheMarkNewRelfilenode
+ *
+ *     Mark the rel as having been given a new relfilenode in the current
+ *     (sub) transaction.  This is a hint that can optimize later operations
+ *     on the rel in the same transaction; it is cleared at transaction end.
+ */
+void
+RelationCacheMarkNewRelfilenode(Relation rel)
+{
+       /* Tag the entry with the subtransaction that assigned the relfilenode */
+       rel->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
+       /* Set the flag that keeps end-of-xact cleanup from skipping its scan */
+       need_eoxact_work = true;
+}
+
+
 /*
  *             RelationBuildLocalRelation
  *                     Build a relcache entry for an about-to-be-created relation,
@@ -2027,16 +2263,46 @@ Relation
 RelationBuildLocalRelation(const char *relname,
                                                   Oid relnamespace,
                                                   TupleDesc tupDesc,
-                                                  Oid relid, Oid dbid,
-                                                  RelFileNode rnode,
-                                                  bool nailit)
+                                                  Oid relid,
+                                                  Oid reltablespace,
+                                                  bool shared_relation)
 {
        Relation        rel;
        MemoryContext oldcxt;
        int                     natts = tupDesc->natts;
        int                     i;
+       bool            has_not_null;
+       bool            nailit;
 
-       AssertArg(natts > 0);
+       AssertArg(natts >= 0);
+
+       /*
+        * check for creation of a rel that must be nailed in cache.
+        *
+        * XXX this list had better match RelationCacheInitializePhase2's list.
+        */
+       switch (relid)
+       {
+               case RelationRelationId:
+               case AttributeRelationId:
+               case ProcedureRelationId:
+               case TypeRelationId:
+                       nailit = true;
+                       break;
+               default:
+                       nailit = false;
+                       break;
+       }
+
+       /*
+        * check that hardwired list of shared rels matches what's in the
+        * bootstrap .bki file.  If you get a failure here during initdb, you
+        * probably need to fix IsSharedRelation() to match whatever you've done
+        * to the set of shared relations.
+        */
+       if (shared_relation != IsSharedRelation(relid))
+               elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
+                        relname, relid);
 
        /*
         * switch to the cache context to create the relcache entry.
@@ -2054,35 +2320,46 @@ RelationBuildLocalRelation(const char *relname,
        rel->rd_targblock = InvalidBlockNumber;
 
        /* make sure relation is marked as having no open file yet */
-       rel->rd_fd = -1;
+       rel->rd_smgr = NULL;
+
+       /* mark it nailed if appropriate */
+       rel->rd_isnailed = nailit;
 
-       RelationSetReferenceCount(rel, 1);
+       rel->rd_refcnt = nailit ? 1 : 0;
 
        /* it's being created in this transaction */
-       rel->rd_isnew = true;
+       rel->rd_createSubid = GetCurrentSubTransactionId();
+       rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
 
-       /* is it a temporary relation? */
-       rel->rd_istemp = isTempNamespace(relnamespace);
+       /* must flag that we have rels created in this transaction */
+       need_eoxact_work = true;
 
-       /*
-        * nail the reldesc if this is a bootstrap create reln and we may need
-        * it in the cache later on in the bootstrap process so we don't ever
-        * want it kicked out.  e.g. pg_attribute!!!
-        */
-       if (nailit)
-               rel->rd_isnailed = true;
+       /* is it a temporary relation? */
+       rel->rd_istemp = isTempOrToastNamespace(relnamespace);
 
        /*
         * create a new tuple descriptor from the one passed in.  We do this
-        * partly to copy it into the cache context, and partly because the
-        * new relation can't have any defaults or constraints yet; they have
-        * to be added in later steps, because they require additions to
-        * multiple system catalogs.  We can copy attnotnull constraints here,
-        * however.
+        * partly to copy it into the cache context, and partly because the new
+        * relation can't have any defaults or constraints yet; they have to be
+        * added in later steps, because they require additions to multiple system
+        * catalogs.  We can copy attnotnull constraints here, however.
         */
        rel->rd_att = CreateTupleDescCopy(tupDesc);
+       rel->rd_att->tdrefcount = 1;    /* mark as refcounted */
+       has_not_null = false;
        for (i = 0; i < natts; i++)
+       {
                rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull;
+               has_not_null |= tupDesc->attrs[i]->attnotnull;
+       }
+
+       if (has_not_null)
+       {
+               TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
+
+               constr->has_not_null = true;
+               rel->rd_att->constr = constr;
+       }
 
        /*
         * initialize relation tuple form (caller may add/override data later)
@@ -2096,23 +2373,28 @@ RelationBuildLocalRelation(const char *relname,
        rel->rd_rel->relhasoids = rel->rd_att->tdhasoid;
        rel->rd_rel->relnatts = natts;
        rel->rd_rel->reltype = InvalidOid;
+       /* needed when bootstrapping: */
+       rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;
 
        /*
-        * Insert relation physical and logical identifiers (OIDs) into the
-        * right places.
+        * Insert relation physical and logical identifiers (OIDs) into the right
+        * places.      Note that the physical ID (relfilenode) is initially the same
+        * as the logical ID (OID).
         */
-       rel->rd_rel->relisshared = (dbid == InvalidOid);
+       rel->rd_rel->relisshared = shared_relation;
 
        RelationGetRelid(rel) = relid;
 
        for (i = 0; i < natts; i++)
                rel->rd_att->attrs[i]->attrelid = relid;
 
-       rel->rd_node = rnode;
-       rel->rd_rel->relfilenode = rnode.relNode;
+       rel->rd_rel->relfilenode = relid;
+       rel->rd_rel->reltablespace = reltablespace;
 
        RelationInitLockInfo(rel);      /* see lmgr.c */
 
+       RelationInitPhysicalAddr(rel);
+
        /*
         * Okay to insert into the relcache hash tables.
         */
@@ -2123,6 +2405,14 @@ RelationBuildLocalRelation(const char *relname,
         */
        MemoryContextSwitchTo(oldcxt);
 
+       /* It's fully valid */
+       rel->rd_isvalid = true;
+
+       /*
+        * Caller expects us to pin the returned entry.
+        */
+       RelationIncrementReferenceCount(rel);
+
        return rel;
 }
 
@@ -2131,10 +2421,11 @@ RelationBuildLocalRelation(const char *relname,
  *
  *             This initializes the relation descriptor cache.  At the time
  *             that this is invoked, we can't do database access yet (mainly
- *             because the transaction subsystem is not up), so we can't get
- *             "real" info.  However it's okay to read the pg_internal.init
- *             cache file, if one is available.  Otherwise we make phony
- *             entries for the minimum set of nailed-in-cache relations.
+ *             because the transaction subsystem is not up); all we are doing
+ *             is making an empty cache hashtable.  This must be done before
+ *             starting the initialization transaction, because otherwise
+ *             AtEOXact_RelationCache would crash if that transaction aborts
+ *             before we can get the relcache set up.
  */
 
 #define INITRELCACHESIZE               400
@@ -2154,46 +2445,15 @@ RelationCacheInitialize(void)
        oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
 
        /*
-        * create hashtables that index the relcache
+        * create hashtable that indexes the relcache
         */
        MemSet(&ctl, 0, sizeof(ctl));
-       ctl.keysize = sizeof(NameData);
-       ctl.entrysize = sizeof(RelNameCacheEnt);
-       RelationSysNameCache = hash_create("Relcache by name", INITRELCACHESIZE,
-                                                                          &ctl, HASH_ELEM);
-
        ctl.keysize = sizeof(Oid);
        ctl.entrysize = sizeof(RelIdCacheEnt);
-       ctl.hash = tag_hash;
+       ctl.hash = oid_hash;
        RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
                                                                  &ctl, HASH_ELEM | HASH_FUNCTION);
 
-       ctl.keysize = sizeof(RelFileNode);
-       ctl.entrysize = sizeof(RelNodeCacheEnt);
-       ctl.hash = tag_hash;
-       RelationNodeCache = hash_create("Relcache by rnode", INITRELCACHESIZE,
-                                                                       &ctl, HASH_ELEM | HASH_FUNCTION);
-
-       /*
-        * Try to load the relcache cache file.  If successful, we're done for
-        * now.  Otherwise, initialize the cache with pre-made descriptors for
-        * the critical "nailed-in" system catalogs.
-        */
-       if (IsBootstrapProcessingMode() ||
-               !load_relcache_init_file())
-       {
-               formrdesc(RelationRelationName,
-                                 Natts_pg_class, Desc_pg_class);
-               formrdesc(AttributeRelationName,
-                                 Natts_pg_attribute, Desc_pg_attribute);
-               formrdesc(ProcedureRelationName,
-                                 Natts_pg_proc, Desc_pg_proc);
-               formrdesc(TypeRelationName,
-                                 Natts_pg_type, Desc_pg_type);
-
-#define NUM_CRITICAL_RELS      4       /* fix if you change list above */
-       }
-
        MemoryContextSwitchTo(oldcxt);
 }
 
@@ -2202,73 +2462,116 @@ RelationCacheInitialize(void)
  *
  *             This is called as soon as the catcache and transaction system
  *             are functional.  At this point we can actually read data from
- *             the system catalogs.  Update the relcache entries made during
- *             RelationCacheInitialize, and make sure we have entries for the
- *             critical system indexes.
+ *             the system catalogs.  We first try to read pre-computed relcache
+ *             entries from the pg_internal.init file.  If that's missing or
+ *             broken, make phony entries for the minimum set of nailed-in-cache
+ *             relations.      Then (unless bootstrapping) make sure we have entries
+ *             for the critical system indexes.  Once we've done all this, we
+ *             have enough infrastructure to open any system catalog or use any
+ *             catcache.  The last step is to rewrite pg_internal.init if needed.
  */
 void
 RelationCacheInitializePhase2(void)
 {
        HASH_SEQ_STATUS status;
        RelIdCacheEnt *idhentry;
+       MemoryContext oldcxt;
+       bool            needNewCacheFile = false;
 
+       /*
+        * switch to cache memory context
+        */
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+
+       /*
+        * Try to load the relcache cache file.  If unsuccessful, bootstrap the
+        * cache with pre-made descriptors for the critical "nailed-in" system
+        * catalogs.
+        */
+       if (IsBootstrapProcessingMode() ||
+               !load_relcache_init_file())
+       {
+               needNewCacheFile = true;
+
+               formrdesc("pg_class", PG_CLASS_RELTYPE_OID,
+                                 true, Natts_pg_class, Desc_pg_class);
+               formrdesc("pg_attribute", PG_ATTRIBUTE_RELTYPE_OID,
+                                 false, Natts_pg_attribute, Desc_pg_attribute);
+               formrdesc("pg_proc", PG_PROC_RELTYPE_OID,
+                                 true, Natts_pg_proc, Desc_pg_proc);
+               formrdesc("pg_type", PG_TYPE_RELTYPE_OID,
+                                 true, Natts_pg_type, Desc_pg_type);
+
+#define NUM_CRITICAL_RELS      4       /* fix if you change list above */
+       }
+
+       MemoryContextSwitchTo(oldcxt);
+
+       /* In bootstrap mode, the faked-up formrdesc info is all we'll have */
        if (IsBootstrapProcessingMode())
                return;
 
        /*
-        * If we didn't get the critical system indexes loaded into relcache,
-        * do so now.  These are critical because the catcache depends on them
-        * for catcache fetches that are done during relcache load.  Thus, we
-        * have an infinite-recursion problem.  We can break the recursion by
-        * doing heapscans instead of indexscans at certain key spots. To
-        * avoid hobbling performance, we only want to do that until we have
-        * the critical indexes loaded into relcache.  Thus, the flag
-        * criticalRelcachesBuilt is used to decide whether to do heapscan or
-        * indexscan at the key spots, and we set it true after we've loaded
-        * the critical indexes.
+        * If we didn't get the critical system indexes loaded into relcache, do
+        * so now.      These are critical because the catcache and/or opclass cache
+        * depend on them for fetches done during relcache load.  Thus, we have an
+        * infinite-recursion problem.  We can break the recursion by doing
+        * heapscans instead of indexscans at certain key spots. To avoid hobbling
+        * performance, we only want to do that until we have the critical indexes
+        * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
+        * decide whether to do heapscan or indexscan at the key spots, and we set
+        * it true after we've loaded the critical indexes.
+        *
+        * The critical indexes are marked as "nailed in cache", partly to make it
+        * easy for load_relcache_init_file to count them, but mainly because we
+        * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
+        * true.  (NOTE: perhaps it would be possible to reload them by
+        * temporarily setting criticalRelcachesBuilt to false again.  For now,
+        * though, we just nail 'em in.)
         *
-        * The critical indexes are marked as "nailed in cache", partly to make
-        * it easy for load_relcache_init_file to count them, but mainly
-        * because we cannot flush and rebuild them once we've set
-        * criticalRelcachesBuilt to true.      (NOTE: perhaps it would be
-        * possible to reload them by temporarily setting
-        * criticalRelcachesBuilt to false again.  For now, though, we just
-        * nail 'em in.)
+        * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
+        * in the same way as the others, because the critical catalogs don't
+        * (currently) have any rules or triggers, and so these indexes can be
+        * rebuilt without inducing recursion.  However they are used during
+        * relcache load when a rel does have rules or triggers, so we choose to
+        * nail them for performance reasons.
         */
        if (!criticalRelcachesBuilt)
        {
-               RelationBuildDescInfo buildinfo;
                Relation        ird;
 
-#define LOAD_CRIT_INDEX(indname) \
+#define LOAD_CRIT_INDEX(indexoid) \
                do { \
-                       buildinfo.infotype = INFO_RELNAME; \
-                       buildinfo.i.info_name = (indname); \
-                       ird = RelationBuildDesc(buildinfo, NULL); \
+                       ird = RelationBuildDesc(indexoid, NULL); \
+                       if (ird == NULL) \
+                               elog(PANIC, "could not open critical system index %u", \
+                                        indexoid); \
                        ird->rd_isnailed = true; \
-                       RelationSetReferenceCount(ird, 1); \
+                       ird->rd_refcnt = 1; \
                } while (0)
 
-               LOAD_CRIT_INDEX(ClassNameNspIndex);
-               LOAD_CRIT_INDEX(ClassOidIndex);
-               LOAD_CRIT_INDEX(AttributeRelidNumIndex);
-               LOAD_CRIT_INDEX(IndexRelidIndex);
-               LOAD_CRIT_INDEX(AccessMethodStrategyIndex);
-               LOAD_CRIT_INDEX(AccessMethodProcedureIndex);
-               LOAD_CRIT_INDEX(OperatorOidIndex);
+               LOAD_CRIT_INDEX(ClassOidIndexId);
+               LOAD_CRIT_INDEX(AttributeRelidNumIndexId);
+               LOAD_CRIT_INDEX(IndexRelidIndexId);
+               LOAD_CRIT_INDEX(OpclassOidIndexId);
+               LOAD_CRIT_INDEX(AccessMethodStrategyIndexId);
+               LOAD_CRIT_INDEX(AccessMethodProcedureIndexId);
+               LOAD_CRIT_INDEX(OperatorOidIndexId);
+               LOAD_CRIT_INDEX(RewriteRelRulenameIndexId);
+               LOAD_CRIT_INDEX(TriggerRelidNameIndexId);
 
-#define NUM_CRITICAL_INDEXES   7               /* fix if you change list above */
+#define NUM_CRITICAL_INDEXES   9               /* fix if you change list above */
 
                criticalRelcachesBuilt = true;
        }
 
        /*
-        * Now, scan all the relcache entries and update anything that might
-        * be wrong in the results from formrdesc or the relcache cache file.
-        * If we faked up relcache entries using formrdesc, then read the real
-        * pg_class rows and replace the fake entries with them. Also, if any
-        * of the relcache entries have rules or triggers, load that info the
-        * hard way since it isn't recorded in the cache file.
+        * Now, scan all the relcache entries and update anything that might be
+        * wrong in the results from formrdesc or the relcache cache file. If we
+        * faked up relcache entries using formrdesc, then read the real pg_class
+        * rows and replace the fake entries with them. Also, if any of the
+        * relcache entries have rules or triggers, load that info the hard way
+        * since it isn't recorded in the cache file.
         */
        hash_seq_init(&status, RelationIdCache);
 
@@ -2285,11 +2588,11 @@ RelationCacheInitializePhase2(void)
                        Form_pg_class relp;
 
                        htup = SearchSysCache(RELOID,
-                                                       ObjectIdGetDatum(RelationGetRelid(relation)),
+                                                               ObjectIdGetDatum(RelationGetRelid(relation)),
                                                                  0, 0, 0);
                        if (!HeapTupleIsValid(htup))
-                               elog(FATAL, "RelationCacheInitializePhase2: no pg_class entry for %s",
-                                        RelationGetRelationName(relation));
+                               elog(FATAL, "cache lookup failed for relation %u",
+                                        RelationGetRelid(relation));
                        relp = (Form_pg_class) GETSTRUCT(htup);
 
                        /*
@@ -2298,6 +2601,17 @@ RelationCacheInitializePhase2(void)
                         */
                        Assert(relation->rd_rel != NULL);
                        memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
+
+                       /* Update rd_options while we have the tuple */
+                       if (relation->rd_options)
+                               pfree(relation->rd_options);
+                       RelationParseRelOptions(relation, htup);
+
+                       /*
+                        * Also update the derived fields in rd_att.
+                        */
+                       relation->rd_att->tdtypeid = relp->reltype;
+                       relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
                        relation->rd_att->tdhasoid = relp->relhasoids;
 
                        ReleaseSysCache(htup);
@@ -2311,28 +2625,17 @@ RelationCacheInitializePhase2(void)
                if (relation->rd_rel->reltriggers > 0 && relation->trigdesc == NULL)
                        RelationBuildTriggers(relation);
        }
-}
-
-/*
- *             RelationCacheInitializePhase3
- *
- *             Final step of relcache initialization: write out a new relcache
- *             cache file if one is needed.
- */
-void
-RelationCacheInitializePhase3(void)
-{
-       if (IsBootstrapProcessingMode())
-               return;
 
+       /*
+        * Lastly, write out a new relcache cache file if one is needed.
+        */
        if (needNewCacheFile)
        {
                /*
-                * Force all the catcaches to finish initializing and thereby open
-                * the catalogs and indexes they use.  This will preload the
-                * relcache with entries for all the most important system
-                * catalogs and indexes, so that the init file will be most useful
-                * for future backends.
+                * Force all the catcaches to finish initializing and thereby open the
+                * catalogs and indexes they use.  This will preload the relcache with
+                * entries for all the most important system catalogs and indexes, so
+                * that the init file will be most useful for future backends.
                 */
                InitCatalogCachePhase2();
 
@@ -2341,63 +2644,74 @@ RelationCacheInitializePhase3(void)
        }
 }
 
-
-/* used by XLogInitCache */
-void           CreateDummyCaches(void);
-void           DestroyDummyCaches(void);
-
-void
-CreateDummyCaches(void)
+/*
+ * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
+ * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
+ *
+ * We need this kluge because we have to be able to access non-fixed-width
+ * fields of pg_class and pg_index before we have the standard catalog caches
+ * available.  We use predefined data that's set up in just the same way as
+ * the bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
+ * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
+ * does it have a TupleConstr field.  But it's good enough for the purpose of
+ * extracting fields.
+ *
+ * BuildHardcodedDescriptor is the shared worker for both entry points.  It
+ * obtains a descriptor from CreateTemplateTupleDesc(natts, hasoids) while
+ * CacheMemoryContext is the current memory context, copies the first natts
+ * entries of attrs[] into it, and returns the result.  The previous memory
+ * context is restored before returning.
+ */
+static TupleDesc
+BuildHardcodedDescriptor(int natts, Form_pg_attribute attrs, bool hasoids)
 {
+       TupleDesc       result;
        MemoryContext oldcxt;
-       HASHCTL         ctl;
-
-       if (!CacheMemoryContext)
-               CreateCacheMemoryContext();
+       int                     i;
 
        oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
 
-       MemSet(&ctl, 0, sizeof(ctl));
-       ctl.keysize = sizeof(NameData);
-       ctl.entrysize = sizeof(RelNameCacheEnt);
-       RelationSysNameCache = hash_create("Relcache by name", INITRELCACHESIZE,
-                                                                          &ctl, HASH_ELEM);
+       result = CreateTemplateTupleDesc(natts, hasoids);
+       result->tdtypeid = RECORDOID;           /* not right, but we don't care */
+       result->tdtypmod = -1;
 
-       ctl.keysize = sizeof(Oid);
-       ctl.entrysize = sizeof(RelIdCacheEnt);
-       ctl.hash = tag_hash;
-       RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
-                                                                 &ctl, HASH_ELEM | HASH_FUNCTION);
+       for (i = 0; i < natts; i++)
+       {
+               memcpy(result->attrs[i], &attrs[i], ATTRIBUTE_TUPLE_SIZE);
+               /* make sure attcacheoff is valid: overwrite whatever was copied */
+               result->attrs[i]->attcacheoff = -1;
+       }
 
-       ctl.keysize = sizeof(RelFileNode);
-       ctl.entrysize = sizeof(RelNodeCacheEnt);
-       ctl.hash = tag_hash;
-       RelationNodeCache = hash_create("Relcache by rnode", INITRELCACHESIZE,
-                                                                       &ctl, HASH_ELEM | HASH_FUNCTION);
+       /*
+        * initialize first attribute's attcacheoff, cf RelationBuildTupleDesc
+        * (attrs[0] is touched unconditionally, so natts must be at least 1)
+        */
+       result->attrs[0]->attcacheoff = 0;
+
+       /* Note: we don't bother to set up a TupleConstr entry */
 
        MemoryContextSwitchTo(oldcxt);
+
+       return result;
 }
 
-void
-DestroyDummyCaches(void)
+static TupleDesc
+GetPgClassDescriptor(void)
 {
-       MemoryContext oldcxt;
+       static TupleDesc pgclassdesc = NULL;
 
-       if (!CacheMemoryContext)
-               return;
+       /*
+        * Already done?  If not, build the pg_class descriptor (with hasoids
+        * passed as true) and remember it in the function-local static, so
+        * that every later call returns the same pointer without rebuilding.
+        */
+       if (pgclassdesc == NULL)
+               pgclassdesc = BuildHardcodedDescriptor(Natts_pg_class,
+                                                                                          Desc_pg_class,
+                                                                                          true);
 
-       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+       return pgclassdesc;
+}
 
-       if (RelationIdCache)
-               hash_destroy(RelationIdCache);
-       if (RelationSysNameCache)
-               hash_destroy(RelationSysNameCache);
-       if (RelationNodeCache)
-               hash_destroy(RelationNodeCache);
+static TupleDesc
+GetPgIndexDescriptor(void)
+{
+       static TupleDesc pgindexdesc = NULL;
 
-       RelationIdCache = RelationSysNameCache = RelationNodeCache = NULL;
+       /*
+        * Already done?  If not, build the pg_index descriptor (with hasoids
+        * passed as false) and remember it in the function-local static, so
+        * that every later call returns the same pointer without rebuilding.
+        */
+       if (pgindexdesc == NULL)
+               pgindexdesc = BuildHardcodedDescriptor(Natts_pg_index,
+                                                                                          Desc_pg_index,
+                                                                                          false);
 
-       MemoryContextSwitchTo(oldcxt);
+       return pgindexdesc;
 }
 
 static void
@@ -2414,57 +2728,54 @@ AttrDefaultFetch(Relation relation)
        int                     found;
        int                     i;
 
-       ScanKeyEntryInitialize(&skey,
-                                                  (bits16) 0x0,
-                                                  (AttrNumber) Anum_pg_attrdef_adrelid,
-                                                  (RegProcedure) F_OIDEQ,
-                                                  ObjectIdGetDatum(RelationGetRelid(relation)));
+       ScanKeyInit(&skey,
+                               Anum_pg_attrdef_adrelid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(RelationGetRelid(relation)));
 
-       adrel = heap_openr(AttrDefaultRelationName, AccessShareLock);
-       adscan = systable_beginscan(adrel, AttrDefaultIndex, true,
-                                                               SnapshotNow,
-                                                               1, &skey);
+       adrel = heap_open(AttrDefaultRelationId, AccessShareLock);
+       adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
+                                                               SnapshotNow, 1, &skey);
        found = 0;
 
        while (HeapTupleIsValid(htup = systable_getnext(adscan)))
        {
                Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
 
-               found++;
                for (i = 0; i < ndef; i++)
                {
                        if (adform->adnum != attrdef[i].adnum)
                                continue;
                        if (attrdef[i].adbin != NULL)
-                               elog(WARNING, "AttrDefaultFetch: second record found for attr %s in rel %s",
-                                        NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
+                               elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
+                               NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
                                         RelationGetRelationName(relation));
+                       else
+                               found++;
 
                        val = fastgetattr(htup,
                                                          Anum_pg_attrdef_adbin,
                                                          adrel->rd_att, &isnull);
                        if (isnull)
-                               elog(WARNING, "AttrDefaultFetch: adbin IS NULL for attr %s in rel %s",
-                                        NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
+                               elog(WARNING, "null adbin for attr %s of rel %s",
+                               NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
                                         RelationGetRelationName(relation));
                        else
                                attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
-                                                        DatumGetCString(DirectFunctionCall1(textout,
-                                                                                                                                val)));
+                                                                                                       TextDatumGetCString(val));
                        break;
                }
 
                if (i >= ndef)
-                       elog(WARNING, "AttrDefaultFetch: unexpected record found for attr %d in rel %s",
-                                adform->adnum,
-                                RelationGetRelationName(relation));
+                       elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
+                                adform->adnum, RelationGetRelationName(relation));
        }
 
        systable_endscan(adscan);
        heap_close(adrel, AccessShareLock);
 
        if (found != ndef)
-               elog(WARNING, "AttrDefaultFetch: %d record(s) not found for rel %s",
+               elog(WARNING, "%d attrdef record(s) missing for rel %s",
                         ndef - found, RelationGetRelationName(relation));
 }
 
@@ -2481,12 +2792,13 @@ CheckConstraintFetch(Relation relation)
        bool            isnull;
        int                     found = 0;
 
-       ScanKeyEntryInitialize(&skey[0], 0x0,
-                                                  Anum_pg_constraint_conrelid, F_OIDEQ,
-                                                  ObjectIdGetDatum(RelationGetRelid(relation)));
+       ScanKeyInit(&skey[0],
+                               Anum_pg_constraint_conrelid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(RelationGetRelid(relation)));
 
-       conrel = heap_openr(ConstraintRelationName, AccessShareLock);
-       conscan = systable_beginscan(conrel, ConstraintRelidIndex, true,
+       conrel = heap_open(ConstraintRelationId, AccessShareLock);
+       conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
                                                                 SnapshotNow, 1, skey);
 
        while (HeapTupleIsValid(htup = systable_getnext(conscan)))
@@ -2497,24 +2809,23 @@ CheckConstraintFetch(Relation relation)
                if (conform->contype != CONSTRAINT_CHECK)
                        continue;
 
-               if (found == ncheck)
-                       elog(ERROR, "CheckConstraintFetch: unexpected record found for rel %s",
+               if (found >= ncheck)
+                       elog(ERROR, "unexpected constraint record found for rel %s",
                                 RelationGetRelationName(relation));
 
                check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
-                                                                                         NameStr(conform->conname));
+                                                                                                 NameStr(conform->conname));
 
                /* Grab and test conbin is actually set */
                val = fastgetattr(htup,
                                                  Anum_pg_constraint_conbin,
                                                  conrel->rd_att, &isnull);
                if (isnull)
-                       elog(ERROR, "CheckConstraintFetch: conbin IS NULL for rel %s",
+                       elog(ERROR, "null conbin for rel %s",
                                 RelationGetRelationName(relation));
 
                check[found].ccbin = MemoryContextStrdup(CacheMemoryContext,
-                                                        DatumGetCString(DirectFunctionCall1(textout,
-                                                                                                                                val)));
+                                                                                                TextDatumGetCString(val));
                found++;
        }
 
@@ -2522,7 +2833,7 @@ CheckConstraintFetch(Relation relation)
        heap_close(conrel, AccessShareLock);
 
        if (found != ncheck)
-               elog(ERROR, "CheckConstraintFetch: %d record(s) not found for rel %s",
+               elog(ERROR, "%d constraint record(s) missing for rel %s",
                         ncheck - found, RelationGetRelationName(relation));
 }
 
@@ -2532,7 +2843,7 @@ CheckConstraintFetch(Relation relation)
  * The index list is created only if someone requests it.  We scan pg_index
  * to find relevant indexes, and add the list to the relcache entry so that
  * we won't have to compute it again.  Note that shared cache inval of a
- * relcache entry will delete the old list and set rd_indexfound to false,
+ * relcache entry will delete the old list and set rd_indexvalid to 0,
  * so that we must recompute the index list on next request.  This handles
  * creation or deletion of an index.
  *
@@ -2544,9 +2855,14 @@ CheckConstraintFetch(Relation relation)
  *
  * Since shared cache inval causes the relcache's copy of the list to go away,
  * we return a copy of the list palloc'd in the caller's context.  The caller
- * may freeList() the returned list after scanning it. This is necessary
+ * may list_free() the returned list after scanning it. This is necessary
  * since the caller will typically be doing syscache lookups on the relevant
  * indexes, and syscache lookup could cause SI messages to be processed!
+ *
+ * We also update rd_oidindex, which this module treats as effectively part
+ * of the index list.  rd_oidindex is valid when rd_indexvalid isn't zero;
+ * it is the pg_class OID of a unique index on OID when the relation has one,
+ * and InvalidOid if there is no such index.
  */
 List *
 RelationGetIndexList(Relation relation)
@@ -2556,37 +2872,46 @@ RelationGetIndexList(Relation relation)
        ScanKeyData skey;
        HeapTuple       htup;
        List       *result;
+       Oid                     oidIndex;
        MemoryContext oldcxt;
 
        /* Quick exit if we already computed the list. */
-       if (relation->rd_indexfound)
-               return listCopy(relation->rd_indexlist);
+       if (relation->rd_indexvalid != 0)
+               return list_copy(relation->rd_indexlist);
 
        /*
-        * We build the list we intend to return (in the caller's context)
-        * while doing the scan.  After successfully completing the scan, we
-        * copy that list into the relcache entry.      This avoids cache-context
-        * memory leakage if we get some sort of error partway through.
+        * We build the list we intend to return (in the caller's context) while
+        * doing the scan.      After successfully completing the scan, we copy that
+        * list into the relcache entry.  This avoids cache-context memory leakage
+        * if we get some sort of error partway through.
         */
        result = NIL;
+       oidIndex = InvalidOid;
 
        /* Prepare to scan pg_index for entries having indrelid = this rel. */
-       ScanKeyEntryInitialize(&skey,
-                                                  (bits16) 0x0,
-                                                  (AttrNumber) Anum_pg_index_indrelid,
-                                                  (RegProcedure) F_OIDEQ,
-                                                  ObjectIdGetDatum(RelationGetRelid(relation)));
+       ScanKeyInit(&skey,
+                               Anum_pg_index_indrelid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(RelationGetRelid(relation)));
 
-       indrel = heap_openr(IndexRelationName, AccessShareLock);
-       indscan = systable_beginscan(indrel, IndexIndrelidIndex, true,
-                                                                SnapshotNow,
-                                                                1, &skey);
+       indrel = heap_open(IndexRelationId, AccessShareLock);
+       indscan = systable_beginscan(indrel, IndexIndrelidIndexId, true,
+                                                                SnapshotNow, 1, &skey);
 
        while (HeapTupleIsValid(htup = systable_getnext(indscan)))
        {
                Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);
 
+               /* Add index's OID to result list in the proper order */
                result = insert_ordered_oid(result, index->indexrelid);
+
+               /* Check to see if it is a unique, non-partial btree index on OID */
+               if (index->indnatts == 1 &&
+                       index->indisunique &&
+                       index->indkey.values[0] == ObjectIdAttributeNumber &&
+                       index->indclass.values[0] == OID_BTREE_OPS_OID &&
+                       heap_attisnull(htup, Anum_pg_index_indpred))
+                       oidIndex = index->indexrelid;
        }
 
        systable_endscan(indscan);
@@ -2594,8 +2919,9 @@ RelationGetIndexList(Relation relation)
 
        /* Now save a copy of the completed list in the relcache entry. */
        oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-       relation->rd_indexlist = listCopy(result);
-       relation->rd_indexfound = true;
+       relation->rd_indexlist = list_copy(result);
+       relation->rd_oidindex = oidIndex;
+       relation->rd_indexvalid = 1;
        MemoryContextSwitchTo(oldcxt);
 
        return result;
@@ -2613,26 +2939,314 @@ RelationGetIndexList(Relation relation)
 static List *
 insert_ordered_oid(List *list, Oid datum)
 {
-       List       *l;
+       ListCell   *prev;
 
        /* Does the datum belong at the front? */
-       if (list == NIL || datum < (Oid) lfirsti(list))
-               return lconsi(datum, list);
+       if (list == NIL || datum < linitial_oid(list))
+               return lcons_oid(datum, list);
        /* No, so find the entry it belongs after */
-       l = list;
+       prev = list_head(list);
        for (;;)
        {
-               List       *n = lnext(l);
+               ListCell   *curr = lnext(prev);
 
-               if (n == NIL || datum < (Oid) lfirsti(n))
-                       break;                          /* it belongs before n */
-               l = n;
+               if (curr == NULL || datum < lfirst_oid(curr))
+                       break;                          /* it belongs after 'prev', before 'curr' */
+
+               prev = curr;
        }
-       /* Insert datum into list after item l */
-       lnext(l) = lconsi(datum, lnext(l));
+       /* Insert datum into list after 'prev' */
+       lappend_cell_oid(list, prev, datum);
        return list;
 }
 
+/*
+ * RelationSetIndexList -- externally force the index list contents
+ *
+ * This is used to temporarily override what we think the set of valid
+ * indexes is (including the presence or absence of an OID index).
+ * The forcing will be valid only until transaction commit or abort.
+ *
+ * This should only be applied to nailed relations, because in a non-nailed
+ * relation the hacked index list could be lost at any time due to SI
+ * messages.  In practice it is only used on pg_class (see REINDEX).
+ *
+ * It is up to the caller to make sure the given list is correctly ordered.
+ *
+ * relation: relcache entry to modify; asserted to be nailed.
+ * indexIds: index OIDs to install.  A copy made in CacheMemoryContext is
+ *     what gets stored, not the caller's list.
+ * oidIndex: value to store as the relation's OID index (rd_oidindex).
+ *
+ * Side effects: frees the previous rd_indexlist, replaces rd_indexlist and
+ * rd_oidindex, sets rd_indexvalid to 2 ("forced"), and sets
+ * need_eoxact_work.  rd_indexattr is left untouched.
+ */
+void
+RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex)
+{
+       MemoryContext oldcxt;
+
+       Assert(relation->rd_isnailed);
+       /*
+        * Copy the list into the cache context (could fail for lack of mem).
+        * The copy is made before the old list is touched, so a failure here
+        * leaves the existing rd_indexlist in place.
+        */
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+       indexIds = list_copy(indexIds);
+       MemoryContextSwitchTo(oldcxt);
+       /* Okay to replace old list */
+       list_free(relation->rd_indexlist);
+       relation->rd_indexlist = indexIds;
+       relation->rd_oidindex = oidIndex;
+       relation->rd_indexvalid = 2;    /* mark list as forced */
+       /*
+        * must flag that we have a forced index list (presumably so that
+        * end-of-transaction cleanup knows to undo the forcing -- the code that
+        * consumes need_eoxact_work is not visible here; confirm)
+        */
+       need_eoxact_work = true;
+       /* we deliberately do not change rd_indexattr */
+}
+
+/*
+ * RelationGetOidIndex -- get the pg_class OID of the relation's OID index
+ *
+ * Returns InvalidOid if there is no such index.
+ *
+ * The answer is taken from the relcache entry's rd_oidindex field.  If the
+ * entry's index information has not been computed yet (rd_indexvalid == 0),
+ * RelationGetIndexList is called first; it fills in rd_oidindex as a side
+ * effect of building the index list.
+ *
+ * The relation is expected to have OIDs (relhasoids); this is checked only
+ * by an assertion.
+ */
+Oid
+RelationGetOidIndex(Relation relation)
+{
+       List       *ilist;
+
+       /*
+        * If relation doesn't have OIDs at all, caller is probably confused. (We
+        * could just silently return InvalidOid, but it seems better to throw an
+        * assertion.)
+        */
+       Assert(relation->rd_rel->relhasoids);
+
+       if (relation->rd_indexvalid == 0)
+       {
+               /*
+                * RelationGetIndexList does the heavy lifting.  We only want its
+                * side effect on the relcache entry, so the list it hands back is
+                * freed immediately.
+                */
+               ilist = RelationGetIndexList(relation);
+               list_free(ilist);
+               Assert(relation->rd_indexvalid != 0);
+       }
+
+       return relation->rd_oidindex;
+}
+
+/*
+ * RelationGetIndexExpressions -- get the index expressions for an index
+ *
+ * We cache the result of transforming pg_index.indexprs into a node tree.
+ * If the rel is not an index or has no expressional columns, we return NIL.
+ * Otherwise, the returned tree is copied into the caller's memory context.
+ * (We don't want to return a pointer to the relcache copy, since it could
+ * disappear due to relcache invalidation.)
+ *
+ * On the first successful call the tree is built in the caller's context
+ * and a copy is stored in rd_indexprs (in CacheMemoryContext); later calls
+ * return a fresh copy of that cached tree.  A NIL answer is not remembered
+ * separately: since rd_indexprs stays NIL, each call repeats the cheap
+ * "nothing to do" test.
+ */
+List *
+RelationGetIndexExpressions(Relation relation)
+{
+       List       *result;
+       Datum           exprsDatum;
+       bool            isnull;
+       char       *exprsString;
+       MemoryContext oldcxt;
+
+       /*
+        * Quick exit if we already computed the result.  The caller gets its
+        * own copy, never the cached tree itself.
+        */
+       if (relation->rd_indexprs)
+               return (List *) copyObject(relation->rd_indexprs);
+
+       /*
+        * Quick exit if there is nothing to do: either there is no pg_index
+        * tuple attached to this entry, or its indexprs column is null.
+        */
+       if (relation->rd_indextuple == NULL ||
+               heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs))
+               return NIL;
+
+       /*
+        * We build the tree we intend to return in the caller's context. After
+        * successfully completing the work, we copy it into the relcache entry.
+        * This avoids problems if we get some sort of error partway through.
+        *
+        * The attribute cannot be null here, because the heap_attisnull test
+        * above already returned for that case; hence the Assert.
+        */
+       exprsDatum = heap_getattr(relation->rd_indextuple,
+                                                         Anum_pg_index_indexprs,
+                                                         GetPgIndexDescriptor(),
+                                                         &isnull);
+       Assert(!isnull);
+       exprsString = TextDatumGetCString(exprsDatum);
+       result = (List *) stringToNode(exprsString);
+       pfree(exprsString);
+
+       /*
+        * Run the expressions through eval_const_expressions. This is not just an
+        * optimization, but is necessary, because the planner will be comparing
+        * them to similarly-processed qual clauses, and may fail to detect valid
+        * matches without this.  We don't bother with canonicalize_qual, however.
+        */
+       result = (List *) eval_const_expressions(NULL, (Node *) result);
+
+       /*
+        * Also mark any coercion format fields as "don't care", so that the
+        * planner can match to both explicit and implicit coercions.
+        */
+       set_coercionform_dontcare((Node *) result);
+
+       /* May as well fix opfuncids too */
+       fix_opfuncids((Node *) result);
+
+       /*
+        * Now save a copy of the completed tree in the relcache entry.  The
+        * copy is allocated in CacheMemoryContext; "result" itself stays in
+        * the caller's context and is what we return.
+        */
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+       relation->rd_indexprs = (List *) copyObject(result);
+       MemoryContextSwitchTo(oldcxt);
+
+       return result;
+}
+
+/*
+ * RelationGetIndexPredicate -- get the index predicate for an index
+ *
+ * We cache the result of transforming pg_index.indpred into an implicit-AND
+ * node tree (suitable for ExecQual).
+ * If the rel is not an index or has no predicate, we return NIL.
+ * Otherwise, the returned tree is copied into the caller's memory context.
+ * (We don't want to return a pointer to the relcache copy, since it could
+ * disappear due to relcache invalidation.)
+ *
+ * On the first successful call the tree is built in the caller's context
+ * and a copy is stored in rd_indpred (in CacheMemoryContext); later calls
+ * return a fresh copy of that cached tree.  A NIL answer is not remembered
+ * separately: since rd_indpred stays NIL, each call repeats the cheap
+ * "nothing to do" test.
+ *
+ * Processing order applied to the stored predicate: stringToNode,
+ * eval_const_expressions, canonicalize_qual, set_coercionform_dontcare,
+ * make_ands_implicit, fix_opfuncids.
+ */
+List *
+RelationGetIndexPredicate(Relation relation)
+{
+       List       *result;
+       Datum           predDatum;
+       bool            isnull;
+       char       *predString;
+       MemoryContext oldcxt;
+
+       /*
+        * Quick exit if we already computed the result.  The caller gets its
+        * own copy, never the cached tree itself.
+        */
+       if (relation->rd_indpred)
+               return (List *) copyObject(relation->rd_indpred);
+
+       /*
+        * Quick exit if there is nothing to do: either there is no pg_index
+        * tuple attached to this entry, or its indpred column is null.
+        */
+       if (relation->rd_indextuple == NULL ||
+               heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred))
+               return NIL;
+
+       /*
+        * We build the tree we intend to return in the caller's context. After
+        * successfully completing the work, we copy it into the relcache entry.
+        * This avoids problems if we get some sort of error partway through.
+        *
+        * The attribute cannot be null here, because the heap_attisnull test
+        * above already returned for that case; hence the Assert.
+        */
+       predDatum = heap_getattr(relation->rd_indextuple,
+                                                        Anum_pg_index_indpred,
+                                                        GetPgIndexDescriptor(),
+                                                        &isnull);
+       Assert(!isnull);
+       predString = TextDatumGetCString(predDatum);
+       result = (List *) stringToNode(predString);
+       pfree(predString);
+
+       /*
+        * Run the expression through const-simplification and canonicalization.
+        * This is not just an optimization, but is necessary, because the planner
+        * will be comparing it to similarly-processed qual clauses, and may fail
+        * to detect valid matches without this.  This must match the processing
+        * done to qual clauses in preprocess_expression()!  (We can skip the
+        * stuff involving subqueries, however, since we don't allow any in index
+        * predicates.)
+        */
+       result = (List *) eval_const_expressions(NULL, (Node *) result);
+
+       result = (List *) canonicalize_qual((Expr *) result);
+
+       /*
+        * Also mark any coercion format fields as "don't care", so that the
+        * planner can match to both explicit and implicit coercions.
+        */
+       set_coercionform_dontcare((Node *) result);
+
+       /* Also convert to implicit-AND format */
+       result = make_ands_implicit((Expr *) result);
+
+       /* May as well fix opfuncids too */
+       fix_opfuncids((Node *) result);
+
+       /*
+        * Now save a copy of the completed tree in the relcache entry.  The
+        * copy is allocated in CacheMemoryContext; "result" itself stays in
+        * the caller's context and is what we return.
+        */
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+       relation->rd_indpred = (List *) copyObject(result);
+       MemoryContextSwitchTo(oldcxt);
+
+       return result;
+}
+
+/*
+ * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
+ *
+ * The result has a bit set for each attribute used anywhere in the index
+ * definitions of all the indexes on this relation.  (This includes not only
+ * simple index keys, but attributes used in expressions and partial-index
+ * predicates.)
+ *
+ * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
+ * we can include system attributes (e.g., OID) in the bitmap representation.
+ *
+ * The returned result is palloc'd in the caller's memory context and should
+ * be bms_free'd when not needed anymore.
+ *
+ * Returns NULL when the relation has no indexes, i.e. when relhasindex is
+ * false or the index list turns out to be empty.
+ *
+ * The computed bitmap is cached in rd_indexattr (a copy allocated in
+ * CacheMemoryContext); when a cached bitmap exists, callers receive a
+ * bms_copy of it.
+ */
+Bitmapset *
+RelationGetIndexAttrBitmap(Relation relation)
+{
+       Bitmapset  *indexattrs;
+       List       *indexoidlist;
+       ListCell   *l;
+       MemoryContext oldcxt;
+
+       /*
+        * Quick exit if we already computed the result.  The caller gets its
+        * own copy, never the cached bitmap itself.
+        */
+       if (relation->rd_indexattr != NULL)
+               return bms_copy(relation->rd_indexattr);
+
+       /* Fast path if definitely no indexes */
+       if (!RelationGetForm(relation)->relhasindex)
+               return NULL;
+
+       /*
+        * Get cached list of index OIDs
+        */
+       indexoidlist = RelationGetIndexList(relation);
+
+       /* Fall out if no indexes (but relhasindex was set) */
+       if (indexoidlist == NIL)
+               return NULL;
+
+       /*
+        * For each index, add referenced attributes to indexattrs.
+        */
+       indexattrs = NULL;
+       foreach(l, indexoidlist)
+       {
+               Oid                     indexOid = lfirst_oid(l);
+               Relation        indexDesc;
+               IndexInfo  *indexInfo;
+               int                     i;
+
+               indexDesc = index_open(indexOid, AccessShareLock);
+
+               /* Extract index key information from the index's pg_index row */
+               indexInfo = BuildIndexInfo(indexDesc);
+
+               /*
+                * Collect simple attribute references.  A zero entry is skipped;
+                * presumably it marks an expression column, whose attributes are
+                * picked up from ii_Expressions below -- confirm against IndexInfo.
+                */
+               for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+               {
+                       int                     attrnum = indexInfo->ii_KeyAttrNumbers[i];
+
+                       if (attrnum != 0)
+                               indexattrs = bms_add_member(indexattrs,
+                                                          attrnum - FirstLowInvalidHeapAttributeNumber);
+               }
+
+               /* Collect all attributes used in expressions, too */
+               pull_varattnos((Node *) indexInfo->ii_Expressions, &indexattrs);
+
+               /* Collect all attributes in the index predicate, too */
+               pull_varattnos((Node *) indexInfo->ii_Predicate, &indexattrs);
+
+               index_close(indexDesc, AccessShareLock);
+       }
+
+       list_free(indexoidlist);
+
+       /*
+        * Now save a copy of the bitmap in the relcache entry.  The copy is
+        * allocated in CacheMemoryContext; the working bitmap stays in the
+        * caller's context.
+        */
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+       relation->rd_indexattr = bms_copy(indexattrs);
+       MemoryContextSwitchTo(oldcxt);
+
+       /* We return our original working copy for caller to play with */
+       return indexattrs;
+}
+
 
 /*
  *     load_relcache_init_file, write_relcache_init_file
@@ -2679,7 +3293,7 @@ insert_ordered_oid(List *list, Oid datum)
  * load_relcache_init_file -- attempt to load cache from the init file
  *
  * If successful, return TRUE and set criticalRelcachesBuilt to true.
- * If not successful, return FALSE and set needNewCacheFile to true.
+ * If not successful, return FALSE.
  *
  * NOTE: we assume we are already switched into CacheMemoryContext.
  */
@@ -2693,7 +3307,8 @@ load_relcache_init_file(void)
                                num_rels,
                                max_rels,
                                nailed_rels,
-                               nailed_indexes;
+                               nailed_indexes,
+                               magic;
        int                     i;
 
        snprintf(initfilename, sizeof(initfilename), "%s/%s",
@@ -2701,15 +3316,12 @@ load_relcache_init_file(void)
 
        fp = AllocateFile(initfilename, PG_BINARY_R);
        if (fp == NULL)
-       {
-               needNewCacheFile = true;
                return false;
-       }
 
        /*
-        * Read the index relcache entries from the file.  Note we will not
-        * enter any of them into the cache if the read fails partway through;
-        * this helps to guard against broken init files.
+        * Read the index relcache entries from the file.  Note we will not enter
+        * any of them into the cache if the read fails partway through; this
+        * helps to guard against broken init files.
         */
        max_rels = 100;
        rels = (Relation *) palloc(max_rels * sizeof(Relation));
@@ -2717,12 +3329,19 @@ load_relcache_init_file(void)
        nailed_rels = nailed_indexes = 0;
        initFileRelationIds = NIL;
 
+       /* check for correct magic number (compatible version) */
+       if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
+               goto read_failed;
+       if (magic != RELCACHE_INIT_FILEMAGIC)
+               goto read_failed;
+
        for (relno = 0;; relno++)
        {
                Size            len;
                size_t          nread;
                Relation        rel;
                Form_pg_class relform;
+               bool            has_not_null;
 
                /* first read the relation descriptor length */
                if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
@@ -2762,17 +3381,48 @@ load_relcache_init_file(void)
                /* initialize attribute tuple forms */
                rel->rd_att = CreateTemplateTupleDesc(relform->relnatts,
                                                                                          relform->relhasoids);
+               rel->rd_att->tdrefcount = 1;    /* mark as refcounted */
+
+               rel->rd_att->tdtypeid = relform->reltype;
+               rel->rd_att->tdtypmod = -1;             /* unnecessary, but... */
 
                /* next read all the attribute tuple form data entries */
+               has_not_null = false;
                for (i = 0; i < relform->relnatts; i++)
                {
                        if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
                                goto read_failed;
+                       if (len != ATTRIBUTE_TUPLE_SIZE)
+                               goto read_failed;
+                       if ((nread = fread(rel->rd_att->attrs[i], 1, len, fp)) != len)
+                               goto read_failed;
 
-                       rel->rd_att->attrs[i] = (Form_pg_attribute) palloc(len);
+                       has_not_null |= rel->rd_att->attrs[i]->attnotnull;
+               }
 
-                       if ((nread = fread(rel->rd_att->attrs[i], 1, len, fp)) != len)
+               /* next read the access method specific field */
+               if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+                       goto read_failed;
+               if (len > 0)
+               {
+                       rel->rd_options = palloc(len);
+                       if ((nread = fread(rel->rd_options, 1, len, fp)) != len)
                                goto read_failed;
+                       if (len != VARSIZE(rel->rd_options))
+                               goto read_failed;               /* sanity check */
+               }
+               else
+               {
+                       rel->rd_options = NULL;
+               }
+
+               /* mark not-null status */
+               if (has_not_null)
+               {
+                       TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
+
+                       constr->has_not_null = true;
+                       rel->rd_att->constr = constr;
                }
 
                /* If it's an index, there's more to do */
@@ -2780,24 +3430,29 @@ load_relcache_init_file(void)
                {
                        Form_pg_am      am;
                        MemoryContext indexcxt;
-                       IndexStrategy strat;
+                       Oid                *opfamily;
+                       Oid                *opcintype;
                        Oid                *operator;
                        RegProcedure *support;
-                       int                     nstrategies,
-                                               nsupport;
+                       int                     nsupport;
+                       int16      *indoption;
 
                        /* Count nailed indexes to ensure we have 'em all */
                        if (rel->rd_isnailed)
                                nailed_indexes++;
 
-                       /* next, read the pg_index tuple form */
+                       /* next, read the pg_index tuple */
                        if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
                                goto read_failed;
 
-                       rel->rd_index = (Form_pg_index) palloc(len);
-                       if ((nread = fread(rel->rd_index, 1, len, fp)) != len)
+                       rel->rd_indextuple = (HeapTuple) palloc(len);
+                       if ((nread = fread(rel->rd_indextuple, 1, len, fp)) != len)
                                goto read_failed;
 
+                       /* Fix up internal pointers in the tuple -- see heap_copytuple */
+                       rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
+                       rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);
+
                        /* next, read the access method tuple form */
                        if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
                                goto read_failed;
@@ -2813,25 +3468,30 @@ load_relcache_init_file(void)
                         */
                        indexcxt = AllocSetContextCreate(CacheMemoryContext,
                                                                                         RelationGetRelationName(rel),
-                                                                                        0, /* minsize */
-                                                                                        512,           /* initsize */
-                                                                                        1024);         /* maxsize */
+                                                                                        ALLOCSET_SMALL_MINSIZE,
+                                                                                        ALLOCSET_SMALL_INITSIZE,
+                                                                                        ALLOCSET_SMALL_MAXSIZE);
                        rel->rd_indexcxt = indexcxt;
 
-                       /* next, read the index strategy map */
+                       /* next, read the vector of opfamily OIDs */
                        if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
                                goto read_failed;
 
-                       strat = (IndexStrategy) MemoryContextAlloc(indexcxt, len);
-                       if ((nread = fread(strat, 1, len, fp)) != len)
+                       opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
+                       if ((nread = fread(opfamily, 1, len, fp)) != len)
+                               goto read_failed;
+
+                       rel->rd_opfamily = opfamily;
+
+                       /* next, read the vector of opcintype OIDs */
+                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
                                goto read_failed;
 
-                       /* have to invalidate any FmgrInfo data in the strategy maps */
-                       nstrategies = am->amstrategies * relform->relnatts;
-                       for (i = 0; i < nstrategies; i++)
-                               strat->strategyMapData[i].entry[0].sk_func.fn_oid = InvalidOid;
+                       opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
+                       if ((nread = fread(opcintype, 1, len, fp)) != len)
+                               goto read_failed;
 
-                       rel->rd_istrat = strat;
+                       rel->rd_opcintype = opcintype;
 
                        /* next, read the vector of operator OIDs */
                        if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
@@ -2843,7 +3503,7 @@ load_relcache_init_file(void)
 
                        rel->rd_operator = operator;
 
-                       /* finally, read the vector of support procedures */
+                       /* next, read the vector of support procedures */
                        if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
                                goto read_failed;
                        support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
@@ -2852,11 +3512,22 @@ load_relcache_init_file(void)
 
                        rel->rd_support = support;
 
-                       /* add a zeroed support-fmgr-info vector */
+                       /* finally, read the vector of indoption values */
+                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+                               goto read_failed;
+
+                       indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
+                       if ((nread = fread(indoption, 1, len, fp)) != len)
+                               goto read_failed;
+
+                       rel->rd_indoption = indoption;
+
+                       /* set up zeroed fmgr-info vectors */
+                       rel->rd_aminfo = (RelationAmInfo *)
+                               MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
                        nsupport = relform->relnatts * am->amsupport;
                        rel->rd_supportinfo = (FmgrInfo *)
-                               MemoryContextAlloc(indexcxt, nsupport * sizeof(FmgrInfo));
-                       MemSet(rel->rd_supportinfo, 0, nsupport * sizeof(FmgrInfo));
+                               MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
                }
                else
                {
@@ -2865,55 +3536,62 @@ load_relcache_init_file(void)
                                nailed_rels++;
 
                        Assert(rel->rd_index == NULL);
+                       Assert(rel->rd_indextuple == NULL);
                        Assert(rel->rd_am == NULL);
                        Assert(rel->rd_indexcxt == NULL);
-                       Assert(rel->rd_istrat == NULL);
+                       Assert(rel->rd_aminfo == NULL);
+                       Assert(rel->rd_opfamily == NULL);
+                       Assert(rel->rd_opcintype == NULL);
                        Assert(rel->rd_operator == NULL);
                        Assert(rel->rd_support == NULL);
                        Assert(rel->rd_supportinfo == NULL);
+                       Assert(rel->rd_indoption == NULL);
                }
 
                /*
                 * Rules and triggers are not saved (mainly because the internal
-                * format is complex and subject to change).  They must be rebuilt
-                * if needed by RelationCacheInitializePhase2.  This is not
-                * expected to be a big performance hit since few system catalogs
-                * have such.
+                * format is complex and subject to change).  They must be rebuilt if
+                * needed by RelationCacheInitializePhase2.  This is not expected to
+                * be a big performance hit since few system catalogs have such. Ditto
+                * for index expressions and predicates.
                 */
                rel->rd_rules = NULL;
                rel->rd_rulescxt = NULL;
                rel->trigdesc = NULL;
+               rel->rd_indexprs = NIL;
+               rel->rd_indpred = NIL;
 
                /*
                 * Reset transient-state fields in the relcache entry
                 */
-               rel->rd_fd = -1;
+               rel->rd_smgr = NULL;
                rel->rd_targblock = InvalidBlockNumber;
                if (rel->rd_isnailed)
-                       RelationSetReferenceCount(rel, 1);
+                       rel->rd_refcnt = 1;
                else
-                       RelationSetReferenceCount(rel, 0);
-               rel->rd_indexfound = false;
+                       rel->rd_refcnt = 0;
+               rel->rd_indexvalid = 0;
                rel->rd_indexlist = NIL;
+               rel->rd_indexattr = NULL;
+               rel->rd_oidindex = InvalidOid;
+               rel->rd_createSubid = InvalidSubTransactionId;
+               rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
+               rel->rd_amcache = NULL;
                MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
 
                /*
-                * Make sure database ID is correct.  This is needed in case the
-                * pg_internal.init file was copied from some other database by
-                * CREATE DATABASE.
+                * Recompute lock and physical addressing info.  This is needed in
+                * case the pg_internal.init file was copied from some other database
+                * by CREATE DATABASE.
                 */
-               if (rel->rd_rel->relisshared)
-                       rel->rd_node.tblNode = InvalidOid;
-               else
-                       rel->rd_node.tblNode = MyDatabaseId;
-
                RelationInitLockInfo(rel);
+               RelationInitPhysicalAddr(rel);
        }
 
        /*
-        * We reached the end of the init file without apparent problem. Did
-        * we get the right number of nailed items?  (This is a useful
-        * crosscheck in case the set of critical rels or indexes changes.)
+        * We reached the end of the init file without apparent problem. Did we
+        * get the right number of nailed items?  (This is a useful crosscheck in
+        * case the set of critical rels or indexes changes.)
         */
        if (nailed_rels != NUM_CRITICAL_RELS ||
                nailed_indexes != NUM_CRITICAL_INDEXES)
@@ -2928,8 +3606,8 @@ load_relcache_init_file(void)
        {
                RelationCacheInsert(rels[relno]);
                /* also make a list of their OIDs, for RelationIdIsInInitFile */
-               initFileRelationIds = lconsi((int) RelationGetRelid(rels[relno]),
-                                                                        initFileRelationIds);
+               initFileRelationIds = lcons_oid(RelationGetRelid(rels[relno]),
+                                                                               initFileRelationIds);
        }
 
        pfree(rels);
@@ -2939,15 +3617,14 @@ load_relcache_init_file(void)
        return true;
 
        /*
-        * init file is broken, so do it the hard way.  We don't bother trying
-        * to free the clutter we just allocated; it's not in the relcache so
-        * it won't hurt.
+        * init file is broken, so do it the hard way.  We don't bother trying to
+        * free the clutter we just allocated; it's not in the relcache so it
+        * won't hurt.
         */
 read_failed:
        pfree(rels);
        FreeFile(fp);
 
-       needNewCacheFile = true;
        return false;
 }
 
@@ -2961,6 +3638,7 @@ write_relcache_init_file(void)
        FILE       *fp;
        char            tempfilename[MAXPGPATH];
        char            finalfilename[MAXPGPATH];
+       int                     magic;
        HASH_SEQ_STATUS status;
        RelIdCacheEnt *idhentry;
        MemoryContext oldcxt;
@@ -2968,8 +3646,8 @@ write_relcache_init_file(void)
 
        /*
         * We must write a temporary file and rename it into place. Otherwise,
-        * another backend starting at about the same time might crash trying
-        * to read the partially-complete file.
+        * another backend starting at about the same time might crash trying to
+        * read the partially-complete file.
         */
        snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
                         DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
@@ -2985,10 +3663,22 @@ write_relcache_init_file(void)
                 * We used to consider this a fatal error, but we might as well
                 * continue with backend startup ...
                 */
-               elog(WARNING, "Cannot create init file %s: %m\n\tContinuing anyway, but there's something wrong.", tempfilename);
+               ereport(WARNING,
+                               (errcode_for_file_access(),
+                                errmsg("could not create relation-cache initialization file \"%s\": %m",
+                                               tempfilename),
+                         errdetail("Continuing anyway, but there's something wrong.")));
                return;
        }
 
+       /*
+        * Write a magic number to serve as a file version identifier.  We can
+        * change the magic number whenever the relcache layout changes.
+        */
+       magic = RELCACHE_INIT_FILEMAGIC;
+       if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
+               elog(FATAL, "could not write init file");
+
        /*
         * Write all the reldescs (in no particular order).
         */
@@ -3000,114 +3690,81 @@ write_relcache_init_file(void)
        {
                Relation        rel = idhentry->reldesc;
                Form_pg_class relform = rel->rd_rel;
-               Size            len;
 
-               /*
-                * first write the relcache entry proper
-                */
-               len = sizeof(RelationData);
-
-               /* first, write the relation descriptor length */
-               if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
-                       elog(FATAL, "cannot write init file -- descriptor length");
-
-               /* next, write out the Relation structure */
-               if (fwrite(rel, 1, len, fp) != len)
-                       elog(FATAL, "cannot write init file -- reldesc");
+               /* first write the relcache entry proper */
+               write_item(rel, sizeof(RelationData), fp);
 
                /* next write the relation tuple form */
-               len = sizeof(FormData_pg_class);
-               if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
-                       elog(FATAL, "cannot write init file -- relation tuple form length");
-
-               if (fwrite(relform, 1, len, fp) != len)
-                       elog(FATAL, "cannot write init file -- relation tuple form");
+               write_item(relform, CLASS_TUPLE_SIZE, fp);
 
                /* next, do all the attribute tuple form data entries */
                for (i = 0; i < relform->relnatts; i++)
                {
-                       len = ATTRIBUTE_TUPLE_SIZE;
-                       if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
-                               elog(FATAL, "cannot write init file -- length of attdesc %d", i);
-                       if (fwrite(rel->rd_att->attrs[i], 1, len, fp) != len)
-                               elog(FATAL, "cannot write init file -- attdesc %d", i);
+                       write_item(rel->rd_att->attrs[i], ATTRIBUTE_TUPLE_SIZE, fp);
                }
 
+               /* next, do the access method specific field */
+               write_item(rel->rd_options,
+                                  (rel->rd_options ? VARSIZE(rel->rd_options) : 0),
+                                  fp);
+
                /* If it's an index, there's more to do */
                if (rel->rd_rel->relkind == RELKIND_INDEX)
                {
                        Form_pg_am      am = rel->rd_am;
-                       HeapTuple       tuple;
 
-                       /*
-                        * We need to write the index tuple form, but this is a bit
-                        * tricky since it's a variable-length struct.  Rather than
-                        * hoping to intuit the length, fetch the pg_index tuple
-                        * afresh using the syscache, and write that.
-                        */
-                       tuple = SearchSysCache(INDEXRELID,
-                                                                ObjectIdGetDatum(RelationGetRelid(rel)),
-                                                                  0, 0, 0);
-                       if (!HeapTupleIsValid(tuple))
-                               elog(ERROR, "write_relcache_init_file: no pg_index entry for index %u",
-                                        RelationGetRelid(rel));
-                       len = tuple->t_len - tuple->t_data->t_hoff;
-                       if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
-                               elog(FATAL, "cannot write init file -- index tuple form length");
-                       if (fwrite(GETSTRUCT(tuple), 1, len, fp) != len)
-                               elog(FATAL, "cannot write init file -- index tuple form");
-                       ReleaseSysCache(tuple);
+                       /* write the pg_index tuple */
+                       /* we assume this was created by heap_copytuple! */
+                       write_item(rel->rd_indextuple,
+                                          HEAPTUPLESIZE + rel->rd_indextuple->t_len,
+                                          fp);
 
                        /* next, write the access method tuple form */
-                       len = sizeof(FormData_pg_am);
-                       if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
-                               elog(FATAL, "cannot write init file -- am tuple form length");
+                       write_item(am, sizeof(FormData_pg_am), fp);
 
-                       if (fwrite(am, 1, len, fp) != len)
-                               elog(FATAL, "cannot write init file -- am tuple form");
+                       /* next, write the vector of opfamily OIDs */
+                       write_item(rel->rd_opfamily,
+                                          relform->relnatts * sizeof(Oid),
+                                          fp);
 
-                       /* next, write the index strategy map */
-                       len = AttributeNumberGetIndexStrategySize(relform->relnatts,
-                                                                                                         am->amstrategies);
-                       if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
-                               elog(FATAL, "cannot write init file -- strategy map length");
-
-                       if (fwrite(rel->rd_istrat, 1, len, fp) != len)
-                               elog(FATAL, "cannot write init file -- strategy map");
+                       /* next, write the vector of opcintype OIDs */
+                       write_item(rel->rd_opcintype,
+                                          relform->relnatts * sizeof(Oid),
+                                          fp);
 
                        /* next, write the vector of operator OIDs */
-                       len = relform->relnatts * (am->amstrategies * sizeof(Oid));
-                       if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
-                               elog(FATAL, "cannot write init file -- operator vector length");
-
-                       if (fwrite(rel->rd_operator, 1, len, fp) != len)
-                               elog(FATAL, "cannot write init file -- operator vector");
-
-                       /* finally, write the vector of support procedures */
-                       len = relform->relnatts * (am->amsupport * sizeof(RegProcedure));
-                       if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
-                               elog(FATAL, "cannot write init file -- support vector length");
-
-                       if (fwrite(rel->rd_support, 1, len, fp) != len)
-                               elog(FATAL, "cannot write init file -- support vector");
+                       write_item(rel->rd_operator,
+                                          relform->relnatts * (am->amstrategies * sizeof(Oid)),
+                                          fp);
+
+                       /* next, write the vector of support procedures */
+                       write_item(rel->rd_support,
+                                 relform->relnatts * (am->amsupport * sizeof(RegProcedure)),
+                                          fp);
+
+                       /* finally, write the vector of indoption values */
+                       write_item(rel->rd_indoption,
+                                          relform->relnatts * sizeof(int16),
+                                          fp);
                }
 
                /* also make a list of their OIDs, for RelationIdIsInInitFile */
                oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-               initFileRelationIds = lconsi((int) RelationGetRelid(rel),
-                                                                        initFileRelationIds);
+               initFileRelationIds = lcons_oid(RelationGetRelid(rel),
+                                                                               initFileRelationIds);
                MemoryContextSwitchTo(oldcxt);
        }
 
-       FreeFile(fp);
+       if (FreeFile(fp))
+               elog(FATAL, "could not write init file");
 
        /*
         * Now we have to check whether the data we've so painstakingly
-        * accumulated is already obsolete due to someone else's
-        * just-committed catalog changes.      If so, we just delete the temp
-        * file and leave it to the next backend to try again.  (Our own
-        * relcache entries will be updated by SI message processing, but we
-        * can't be sure whether what we wrote out was up-to-date.)
+        * accumulated is already obsolete due to someone else's just-committed
+        * catalog changes.  If so, we just delete the temp file and leave it to
+        * the next backend to try again.  (Our own relcache entries will be
+        * updated by SI message processing, but we can't be sure whether what we
+        * wrote out was up-to-date.)
         *
         * This mustn't run concurrently with RelationCacheInitFileInvalidate, so
         * grab a serialization lock for the duration.
@@ -3118,8 +3775,8 @@ write_relcache_init_file(void)
        AcceptInvalidationMessages();
 
        /*
-        * If we have received any SI relcache invals since backend start,
-        * assume we may have written out-of-date data.
+        * If we have received any SI relcache invals since backend start, assume
+        * we may have written out-of-date data.
         */
        if (relcacheInvalsReceived == 0L)
        {
@@ -3128,19 +3785,12 @@ write_relcache_init_file(void)
                 * previously-existing init file.
                 *
                 * Note: a failure here is possible under Cygwin, if some other
-                * backend is holding open an unlinked-but-not-yet-gone init file.
-                * So treat this as a noncritical failure.
+                * backend is holding open an unlinked-but-not-yet-gone init file. So
+                * treat this as a noncritical failure; just remove the useless temp
+                * file on failure.
                 */
                if (rename(tempfilename, finalfilename) < 0)
-               {
-                       elog(WARNING, "Cannot rename init file %s to %s: %m\n\tContinuing anyway, but there's something wrong.", tempfilename, finalfilename);
-
-                       /*
-                        * If we fail, try to clean up the useless temp file; don't
-                        * bother to complain if this fails too.
-                        */
                        unlink(tempfilename);
-               }
        }
        else
        {
@@ -3151,6 +3801,16 @@ write_relcache_init_file(void)
        LWLockRelease(RelCacheInitLock);
 }
 
+/* Write a chunk of data preceded by its length.  A zero-length item emits only
+ * the length word; its data pointer may be NULL and is never given to fwrite. */
+static void
+write_item(const void *data, Size len, FILE *fp)
+{
+       if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len) ||
+           (len > 0 && fwrite(data, 1, len, fp) != len))
+               elog(FATAL, "could not write init file");
+}
+
 /*
  * Detect whether a given relation (identified by OID) is one of the ones
  * we store in the init file.
@@ -3163,7 +3823,7 @@ write_relcache_init_file(void)
 bool
 RelationIdIsInInitFile(Oid relationId)
 {
-       return intMember((int) relationId, initFileRelationIds);
+       return list_member_oid(initFileRelationIds, relationId);
 }
 
 /*
@@ -3176,7 +3836,7 @@ RelationIdIsInInitFile(Oid relationId)
  * just after sending them.  The unlink before ensures that a backend that's
  * currently starting cannot read the now-obsolete init file and then miss
  * the SI messages that will force it to update its relcache entries.  (This
- * works because the backend startup sequence gets into the PROC array before
+ * works because the backend startup sequence gets into the PGPROC array before
  * trying to load the init file.)  The unlink after is to synchronize with a
  * backend that may currently be trying to write an init file based on data
  * that we've just rendered invalid.  Such a backend will see the SI messages,
@@ -3203,14 +3863,35 @@ RelationCacheInitFileInvalidate(bool beforeSend)
                /*
                 * We need to interlock this against write_relcache_init_file, to
                 * guard against possibility that someone renames a new-but-
-                * already-obsolete init file into place just after we unlink.
-                * With the interlock, it's certain that write_relcache_init_file
-                * will notice our SI inval message before renaming into place, or
-                * else that we will execute second and successfully unlink the
-                * file.
+                * already-obsolete init file into place just after we unlink. With
+                * the interlock, it's certain that write_relcache_init_file will
+                * notice our SI inval message before renaming into place, or else
+                * that we will execute second and successfully unlink the file.
                 */
                LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
                unlink(initfilename);
                LWLockRelease(RelCacheInitLock);
        }
 }
+
+/*
+ * RelationCacheInitFileRemove
+ *             Delete one database's relcache init file at postmaster startup.
+ *
+ * Keeping the init file across restarts is unsafe in PITR scenarios, and
+ * even plain crash recovery leaves windows in which the file can get out
+ * of sync with the database.  So it is removed at startup, and the first
+ * backend launched is expected to rebuild it.  This must be done in every
+ * database of the cluster; flatfiles.c drives it, since that code has to
+ * scan pg_database anyway.
+ */
+void
+RelationCacheInitFileRemove(const char *dbPath)
+{
+       char            path[MAXPGPATH];
+
+       /* build "<dbPath>/<init file name>" and unlink it */
+       snprintf(path, sizeof(path), "%s/%s", dbPath, RELCACHE_INIT_FILENAME);
+       /* the file may legitimately be absent, so any failure is ignored */
+       (void) unlink(path);
+}