]> granicus.if.org Git - postgresql/blobdiff - src/backend/utils/cache/relcache.c
Fix an oversight I made in a cleanup patch over a year ago:
[postgresql] / src / backend / utils / cache / relcache.c
index d1143381d2d7ccddb9b4299ed47d6c973206d84f..9a32fd5845e77556076ee63aa39545622fb351bb 100644 (file)
@@ -3,22 +3,20 @@
  * relcache.c
  *       POSTGRES relation descriptor cache code
  *
- * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.216 2005/03/07 04:42:16 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.270 2008/04/01 00:48:33 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 /*
  * INTERFACE ROUTINES
- *             RelationCacheInitialize                 - initialize relcache
+ *             RelationCacheInitialize                 - initialize relcache (to empty)
  *             RelationCacheInitializePhase2   - finish initializing relcache
  *             RelationIdGetRelation                   - get a reldesc by relation id
- *             RelationSysNameGetRelation              - get a reldesc by system rel name
- *             RelationIdCacheGetRelation              - get a cached reldesc by relid
  *             RelationClose                                   - close an open relation
  *
  * NOTES
  */
 #include "postgres.h"
 
-#include <errno.h>
 #include <sys/file.h>
 #include <fcntl.h>
 #include <unistd.h>
 
 #include "access/genam.h"
 #include "access/heapam.h"
+#include "access/reloptions.h"
+#include "access/xact.h"
 #include "catalog/catalog.h"
-#include "catalog/catname.h"
+#include "catalog/index.h"
 #include "catalog/indexing.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_amop.h"
 #include "catalog/pg_amproc.h"
 #include "catalog/pg_attrdef.h"
-#include "catalog/pg_attribute.h"
+#include "catalog/pg_authid.h"
 #include "catalog/pg_constraint.h"
-#include "catalog/pg_index.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_opclass.h"
 #include "catalog/pg_proc.h"
 #include "optimizer/clauses.h"
 #include "optimizer/planmain.h"
 #include "optimizer/prep.h"
+#include "optimizer/var.h"
+#include "rewrite/rewriteDefine.h"
 #include "storage/fd.h"
 #include "storage/smgr.h"
 #include "utils/builtins.h"
-#include "utils/catcache.h"
 #include "utils/fmgroids.h"
 #include "utils/inval.h"
-#include "utils/lsyscache.h"
+#include "utils/memutils.h"
 #include "utils/relcache.h"
 #include "utils/resowner.h"
 #include "utils/syscache.h"
+#include "utils/tqual.h"
 #include "utils/typcache.h"
 
 
@@ -72,7 +72,7 @@
  */
 #define RELCACHE_INIT_FILENAME "pg_internal.init"
 
-#define RELCACHE_INIT_FILEMAGIC                0x573262        /* version ID value */
+#define RELCACHE_INIT_FILEMAGIC                0x573264        /* version ID value */
 
 /*
  *             hardcoded tuple descriptors.  see include/catalog/pg_attribute.h
@@ -81,18 +81,21 @@ static FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
 static FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
 static FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
 static FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
+static FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
 
 /*
  *             Hash tables that index the relation cache
  *
- *             Relations are looked up two ways, by OID and by name,
- *             thus there are two hash tables for referencing them.
- *
- *             The OID index covers all relcache entries.      The name index
- *             covers *only* system relations (only those in PG_CATALOG_NAMESPACE).
+ *             We used to index the cache by both name and OID, but now there
+ *             is only an index by OID.
  */
+typedef struct relidcacheent
+{
+       Oid                     reloid;
+       Relation        reldesc;
+} RelIdCacheEnt;
+
 static HTAB *RelationIdCache;
-static HTAB *RelationSysNameCache;
 
 /*
  * This flag is false until we have prepared the critical relcache entries
@@ -100,12 +103,6 @@ static HTAB *RelationSysNameCache;
  */
 bool           criticalRelcachesBuilt = false;
 
-/*
- * This flag is set if we discover that we need to write a new relcache
- * cache file at the end of startup.
- */
-static bool needNewCacheFile = false;
-
 /*
  * This counter counts relcache inval events received since backend startup
  * (but only for rels that are actually in cache).     Presently, we use it only
@@ -121,37 +118,10 @@ static long relcacheInvalsReceived = 0L;
 static List *initFileRelationIds = NIL;
 
 /*
- * This flag lets us optimize away work in AtEOSubXact_RelationCache().
+ * This flag lets us optimize away work in AtEO(Sub)Xact_RelationCache().
  */
-static bool need_eosubxact_work = false;
+static bool need_eoxact_work = false;
 
-/*
- *             RelationBuildDescInfo exists so code can be shared
- *             between RelationIdGetRelation() and RelationSysNameGetRelation()
- */
-typedef struct RelationBuildDescInfo
-{
-       int                     infotype;               /* lookup by id or by name */
-#define INFO_RELID 1
-#define INFO_RELNAME 2
-       union
-       {
-               Oid                     info_id;        /* relation object id */
-               char       *info_name;  /* system relation name */
-       }                       i;
-} RelationBuildDescInfo;
-
-typedef struct relidcacheent
-{
-       Oid                     reloid;
-       Relation        reldesc;
-} RelIdCacheEnt;
-
-typedef struct relnamecacheent
-{
-       NameData        relname;
-       Relation        reldesc;
-} RelNameCacheEnt;
 
 /*
  *             macros to manipulate the lookup hashtables
@@ -163,45 +133,15 @@ do { \
                                                                                   (void *) &(RELATION->rd_id), \
                                                                                   HASH_ENTER, \
                                                                                   &found); \
-       if (idhentry == NULL) \
-               ereport(ERROR, \
-                               (errcode(ERRCODE_OUT_OF_MEMORY), \
-                                errmsg("out of memory"))); \
        /* used to give notice if found -- now just keep quiet */ \
        idhentry->reldesc = RELATION; \
-       if (IsSystemNamespace(RelationGetNamespace(RELATION))) \
-       { \
-               char *relname = RelationGetRelationName(RELATION); \
-               RelNameCacheEnt *namehentry; \
-               namehentry = (RelNameCacheEnt*)hash_search(RelationSysNameCache, \
-                                                                                                  relname, \
-                                                                                                  HASH_ENTER, \
-                                                                                                  &found); \
-               if (namehentry == NULL) \
-                       ereport(ERROR, \
-                                       (errcode(ERRCODE_OUT_OF_MEMORY), \
-                                        errmsg("out of memory"))); \
-               /* used to give notice if found -- now just keep quiet */ \
-               namehentry->reldesc = RELATION; \
-       } \
 } while(0)
 
 #define RelationIdCacheLookup(ID, RELATION) \
 do { \
        RelIdCacheEnt *hentry; \
        hentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
-                                                                                (void *)&(ID), HASH_FIND,NULL); \
-       if (hentry) \
-               RELATION = hentry->reldesc; \
-       else \
-               RELATION = NULL; \
-} while(0)
-
-#define RelationSysNameCacheLookup(NAME, RELATION) \
-do { \
-       RelNameCacheEnt *hentry; \
-       hentry = (RelNameCacheEnt*)hash_search(RelationSysNameCache, \
-                                                                                  (void *) (NAME), HASH_FIND,NULL); \
+                                                                                (void *) &(ID), HASH_FIND,NULL); \
        if (hentry) \
                RELATION = hentry->reldesc; \
        else \
@@ -212,27 +152,18 @@ do { \
 do { \
        RelIdCacheEnt *idhentry; \
        idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
-                                                                                  (void *)&(RELATION->rd_id), \
+                                                                                  (void *) &(RELATION->rd_id), \
                                                                                   HASH_REMOVE, NULL); \
        if (idhentry == NULL) \
                elog(WARNING, "trying to delete a rd_id reldesc that does not exist"); \
-       if (IsSystemNamespace(RelationGetNamespace(RELATION))) \
-       { \
-               char *relname = RelationGetRelationName(RELATION); \
-               RelNameCacheEnt *namehentry; \
-               namehentry = (RelNameCacheEnt*)hash_search(RelationSysNameCache, \
-                                                                                                  relname, \
-                                                                                                  HASH_REMOVE, NULL); \
-               if (namehentry == NULL) \
-                       elog(WARNING, "trying to delete a relname reldesc that does not exist"); \
-       } \
 } while(0)
 
 
 /*
  * Special cache for opclass-related information
  *
- * Note: only default-subtype operators and support procs get cached
+ * Note: only default operators and support procs get cached, ie, those with
+ * lefttype = righttype = opcintype.
  */
 typedef struct opclasscacheent
 {
@@ -240,6 +171,8 @@ typedef struct opclasscacheent
        bool            valid;                  /* set TRUE after successful fill-in */
        StrategyNumber numStrats;       /* max # of strategies (from pg_am) */
        StrategyNumber numSupport;      /* max # of support procs (from pg_am) */
+       Oid                     opcfamily;              /* OID of opclass's family */
+       Oid                     opcintype;              /* OID of opclass's declared input type */
        Oid                *operatorOids;       /* strategy operators' OIDs */
        RegProcedure *supportProcs; /* support procs */
 } OpClassCacheEnt;
@@ -251,28 +184,31 @@ static HTAB *OpClassCache = NULL;
 
 static void RelationClearRelation(Relation relation, bool rebuild);
 
-static void RelationReloadClassinfo(Relation relation);
+static void RelationReloadIndexInfo(Relation relation);
 static void RelationFlushRelation(Relation relation);
-static Relation RelationSysNameCacheGetRelation(const char *relationName);
 static bool load_relcache_init_file(void);
 static void write_relcache_init_file(void);
+static void write_item(const void *data, Size len, FILE *fp);
 
 static void formrdesc(const char *relationName, Oid relationReltype,
-                                         bool hasoids, int natts, FormData_pg_attribute *att);
+                 bool hasoids, int natts, FormData_pg_attribute *att);
 
-static HeapTuple ScanPgRelation(RelationBuildDescInfo buildinfo, bool indexOK);
+static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK);
 static Relation AllocateRelationDesc(Relation relation, Form_pg_class relp);
-static void RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
-                                          Relation relation);
-static Relation RelationBuildDesc(RelationBuildDescInfo buildinfo,
-                                 Relation oldrelation);
+static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
+static void RelationBuildTupleDesc(Relation relation);
+static Relation RelationBuildDesc(Oid targetRelId, Relation oldrelation);
 static void RelationInitPhysicalAddr(Relation relation);
+static TupleDesc GetPgClassDescriptor(void);
+static TupleDesc GetPgIndexDescriptor(void);
 static void AttrDefaultFetch(Relation relation);
 static void CheckConstraintFetch(Relation relation);
 static List *insert_ordered_oid(List *list, Oid datum);
-static void IndexSupportInitialize(Form_pg_index iform,
+static void IndexSupportInitialize(oidvector *indclass,
                                           Oid *indexOperator,
                                           RegProcedure *indexSupport,
+                                          Oid *opFamily,
+                                          Oid *opcInType,
                                           StrategyNumber maxStrategyNumber,
                                           StrategyNumber maxSupportNumber,
                                           AttrNumber maxAttributeNumber);
@@ -285,66 +221,38 @@ static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
  *             ScanPgRelation
  *
  *             this is used by RelationBuildDesc to find a pg_class
- *             tuple matching either a relation name or a relation id
- *             as specified in buildinfo.
+ *             tuple matching targetRelId.
  *
  *             NB: the returned tuple has been copied into palloc'd storage
  *             and must eventually be freed with heap_freetuple.
  */
 static HeapTuple
-ScanPgRelation(RelationBuildDescInfo buildinfo, bool indexOK)
+ScanPgRelation(Oid targetRelId, bool indexOK)
 {
        HeapTuple       pg_class_tuple;
        Relation        pg_class_desc;
-       const char *indexRelname;
        SysScanDesc pg_class_scan;
-       ScanKeyData key[2];
-       int                     nkeys;
+       ScanKeyData key[1];
 
        /*
         * form a scan key
         */
-       switch (buildinfo.infotype)
-       {
-               case INFO_RELID:
-                       ScanKeyInit(&key[0],
-                                               ObjectIdAttributeNumber,
-                                               BTEqualStrategyNumber, F_OIDEQ,
-                                               ObjectIdGetDatum(buildinfo.i.info_id));
-                       nkeys = 1;
-                       indexRelname = ClassOidIndex;
-                       break;
-
-               case INFO_RELNAME:
-                       ScanKeyInit(&key[0],
-                                               Anum_pg_class_relname,
-                                               BTEqualStrategyNumber, F_NAMEEQ,
-                                               NameGetDatum(buildinfo.i.info_name));
-                       ScanKeyInit(&key[1],
-                                               Anum_pg_class_relnamespace,
-                                               BTEqualStrategyNumber, F_OIDEQ,
-                                               ObjectIdGetDatum(PG_CATALOG_NAMESPACE));
-                       nkeys = 2;
-                       indexRelname = ClassNameNspIndex;
-                       break;
-
-               default:
-                       elog(ERROR, "unrecognized buildinfo type: %d",
-                                buildinfo.infotype);
-                       return NULL;            /* keep compiler quiet */
-       }
+       ScanKeyInit(&key[0],
+                               ObjectIdAttributeNumber,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(targetRelId));
 
        /*
         * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
-        * built the critical relcache entries (this includes initdb and
-        * startup without a pg_internal.init file).  The caller can also
-        * force a heap scan by setting indexOK == false.
+        * built the critical relcache entries (this includes initdb and startup
+        * without a pg_internal.init file).  The caller can also force a heap
+        * scan by setting indexOK == false.
         */
-       pg_class_desc = heap_openr(RelationRelationName, AccessShareLock);
-       pg_class_scan = systable_beginscan(pg_class_desc, indexRelname,
+       pg_class_desc = heap_open(RelationRelationId, AccessShareLock);
+       pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
                                                                           indexOK && criticalRelcachesBuilt,
                                                                           SnapshotNow,
-                                                                          nkeys, key);
+                                                                          1, key);
 
        pg_class_tuple = systable_getnext(pg_class_scan);
 
@@ -389,7 +297,7 @@ AllocateRelationDesc(Relation relation, Form_pg_class relp)
        /*
         * clear all fields of reldesc
         */
-       MemSet((char *) relation, 0, sizeof(RelationData));
+       MemSet(relation, 0, sizeof(RelationData));
        relation->rd_targblock = InvalidBlockNumber;
 
        /* make sure relation is marked as having no open file yet */
@@ -398,16 +306,18 @@ AllocateRelationDesc(Relation relation, Form_pg_class relp)
        /*
         * Copy the relation tuple form
         *
-        * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE.
-        * relacl is NOT stored in the relcache --- there'd be little point in
-        * it, since we don't copy the tuple's nullvalues bitmap and hence
-        * wouldn't know if the value is valid ... bottom line is that relacl
-        * *cannot* be retrieved from the relcache.  Get it from the syscache
-        * if you need it.
+        * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE. The
+        * variable-length fields (relacl, reloptions) are NOT stored in the
+        * relcache --- there'd be little point in it, since we don't copy the
+        * tuple's nulls bitmap and hence wouldn't know if the values are valid.
+        * Bottom line is that relacl *cannot* be retrieved from the relcache. Get
+        * it from the syscache if you need it.  The same goes for the original
+        * form of reloptions (however, we do store the parsed form of reloptions
+        * in rd_options).
         */
        relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
 
-       memcpy((char *) relationForm, (char *) relp, CLASS_TUPLE_SIZE);
+       memcpy(relationForm, relp, CLASS_TUPLE_SIZE);
 
        /* initialize relation tuple form */
        relation->rd_rel = relationForm;
@@ -415,12 +325,83 @@ AllocateRelationDesc(Relation relation, Form_pg_class relp)
        /* and allocate attribute tuple form storage */
        relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts,
                                                                                           relationForm->relhasoids);
+       /* which we mark as a reference-counted tupdesc */
+       relation->rd_att->tdrefcount = 1;
 
        MemoryContextSwitchTo(oldcxt);
 
        return relation;
 }
 
+/*
+ * RelationParseRelOptions
+ *             Convert pg_class.reloptions into pre-parsed rd_options
+ *
+ * tuple is the real pg_class tuple (not rd_rel!) for relation
+ *
+ * Note: rd_rel and (if an index) rd_am must be valid already
+ */
+static void
+RelationParseRelOptions(Relation relation, HeapTuple tuple)
+{
+       Datum           datum;
+       bool            isnull;
+       bytea      *options;
+
+       relation->rd_options = NULL;
+
+       /* Fall out if relkind should not have options */
+       switch (relation->rd_rel->relkind)
+       {
+               case RELKIND_RELATION:
+               case RELKIND_TOASTVALUE:
+               case RELKIND_UNCATALOGED:
+               case RELKIND_INDEX:
+                       break;
+               default:
+                       return;
+       }
+
+       /*
+        * Fetch reloptions from tuple; have to use a hardwired descriptor because
+        * we might not have any other for pg_class yet (consider executing this
+        * code for pg_class itself)
+        */
+       datum = fastgetattr(tuple,
+                                               Anum_pg_class_reloptions,
+                                               GetPgClassDescriptor(),
+                                               &isnull);
+       if (isnull)
+               return;
+
+       /* Parse into appropriate format; don't error out here */
+       switch (relation->rd_rel->relkind)
+       {
+               case RELKIND_RELATION:
+               case RELKIND_TOASTVALUE:
+               case RELKIND_UNCATALOGED:
+                       options = heap_reloptions(relation->rd_rel->relkind, datum,
+                                                                         false);
+                       break;
+               case RELKIND_INDEX:
+                       options = index_reloptions(relation->rd_am->amoptions, datum,
+                                                                          false);
+                       break;
+               default:
+                       Assert(false);          /* can't get here */
+                       options = NULL;         /* keep compiler quiet */
+                       break;
+       }
+
+       /* Copy parsed data into CacheMemoryContext */
+       if (options)
+       {
+               relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
+                                                                                                 VARSIZE(options));
+               memcpy(relation->rd_options, options, VARSIZE(options));
+       }
+}
+
 /*
  *             RelationBuildTupleDesc
  *
@@ -428,8 +409,7 @@ AllocateRelationDesc(Relation relation, Form_pg_class relp)
  *             the pg_attribute, pg_attrdef & pg_constraint system catalogs.
  */
 static void
-RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
-                                          Relation relation)
+RelationBuildTupleDesc(Relation relation)
 {
        HeapTuple       pg_attribute_tuple;
        Relation        pg_attribute_desc;
@@ -451,8 +431,8 @@ RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
 
        /*
         * Form a scan key that selects only user attributes (attnum > 0).
-        * (Eliminating system attribute rows at the index level is lots
-        * faster than fetching them.)
+        * (Eliminating system attribute rows at the index level is lots faster
+        * than fetching them.)
         */
        ScanKeyInit(&skey[0],
                                Anum_pg_attribute_attrelid,
@@ -464,13 +444,13 @@ RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
                                Int16GetDatum(0));
 
        /*
-        * Open pg_attribute and begin a scan.  Force heap scan if we haven't
-        * yet built the critical relcache entries (this includes initdb and
-        * startup without a pg_internal.init file).
+        * Open pg_attribute and begin a scan.  Force heap scan if we haven't yet
+        * built the critical relcache entries (this includes initdb and startup
+        * without a pg_internal.init file).
         */
-       pg_attribute_desc = heap_openr(AttributeRelationName, AccessShareLock);
+       pg_attribute_desc = heap_open(AttributeRelationId, AccessShareLock);
        pg_attribute_scan = systable_beginscan(pg_attribute_desc,
-                                                                                  AttributeRelidNumIndex,
+                                                                                  AttributeRelidNumIndexId,
                                                                                   criticalRelcachesBuilt,
                                                                                   SnapshotNow,
                                                                                   2, skey);
@@ -541,9 +521,8 @@ RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
 
        /*
         * However, we can easily set the attcacheoff value for the first
-        * attribute: it must be zero.  This eliminates the need for special
-        * cases for attnum=1 that used to exist in fastgetattr() and
-        * index_getattr().
+        * attribute: it must be zero.  This eliminates the need for special cases
+        * for attnum=1 that used to exist in fastgetattr() and index_getattr().
         */
        if (relation->rd_rel->relnatts > 0)
                relation->rd_att->attrs[0]->attcacheoff = 0;
@@ -573,7 +552,7 @@ RelationBuildTupleDesc(RelationBuildDescInfo buildinfo,
                        constr->num_check = relation->rd_rel->relchecks;
                        constr->check = (ConstrCheck *)
                                MemoryContextAllocZero(CacheMemoryContext,
-                                                               constr->num_check * sizeof(ConstrCheck));
+                                                                       constr->num_check * sizeof(ConstrCheck));
                        CheckConstraintFetch(relation);
                }
                else
@@ -617,8 +596,8 @@ RelationBuildRuleLock(Relation relation)
        int                     maxlocks;
 
        /*
-        * Make the private context.  Parameters are set on the assumption
-        * that it'll probably not contain much data.
+        * Make the private context.  Parameters are set on the assumption that
+        * it'll probably not contain much data.
         */
        rulescxt = AllocSetContextCreate(CacheMemoryContext,
                                                                         RelationGetRelationName(relation),
@@ -628,8 +607,8 @@ RelationBuildRuleLock(Relation relation)
        relation->rd_rulescxt = rulescxt;
 
        /*
-        * allocate an array to hold the rewrite rules (the array is extended
-        * if necessary)
+        * allocate an array to hold the rewrite rules (the array is extended if
+        * necessary)
         */
        maxlocks = 4;
        rules = (RewriteRule **)
@@ -647,15 +626,15 @@ RelationBuildRuleLock(Relation relation)
        /*
         * open pg_rewrite and begin a scan
         *
-        * Note: since we scan the rules using RewriteRelRulenameIndex, we will
+        * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
         * be reading the rules in name order, except possibly during
-        * emergency-recovery operations (ie, IsIgnoringSystemIndexes). This
-        * in turn ensures that rules will be fired in name order.
+        * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
+        * ensures that rules will be fired in name order.
         */
-       rewrite_desc = heap_openr(RewriteRelationName, AccessShareLock);
+       rewrite_desc = heap_open(RewriteRelationId, AccessShareLock);
        rewrite_tupdesc = RelationGetDescr(rewrite_desc);
        rewrite_scan = systable_beginscan(rewrite_desc,
-                                                                         RewriteRelRulenameIndex,
+                                                                         RewriteRelRulenameIndexId,
                                                                          true, SnapshotNow,
                                                                          1, &key);
 
@@ -663,10 +642,8 @@ RelationBuildRuleLock(Relation relation)
        {
                Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
                bool            isnull;
-               Datum           ruleaction;
-               Datum           rule_evqual;
-               char       *ruleaction_str;
-               char       *rule_evqual_str;
+               Datum           rule_datum;
+               char       *rule_str;
                RewriteRule *rule;
 
                rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
@@ -676,33 +653,52 @@ RelationBuildRuleLock(Relation relation)
 
                rule->event = rewrite_form->ev_type - '0';
                rule->attrno = rewrite_form->ev_attr;
+               rule->enabled = rewrite_form->ev_enabled;
                rule->isInstead = rewrite_form->is_instead;
 
-               /* Must use heap_getattr to fetch ev_qual and ev_action */
-
-               ruleaction = heap_getattr(rewrite_tuple,
+               /*
+                * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
+                * rule strings are often large enough to be toasted.  To avoid
+                * leaking memory in the caller's context, do the detoasting here so
+                * we can free the detoasted version.
+                */
+               rule_datum = heap_getattr(rewrite_tuple,
                                                                  Anum_pg_rewrite_ev_action,
                                                                  rewrite_tupdesc,
                                                                  &isnull);
                Assert(!isnull);
-               ruleaction_str = DatumGetCString(DirectFunctionCall1(textout,
-                                                                                                                        ruleaction));
+               rule_str = TextDatumGetCString(rule_datum);
                oldcxt = MemoryContextSwitchTo(rulescxt);
-               rule->actions = (List *) stringToNode(ruleaction_str);
+               rule->actions = (List *) stringToNode(rule_str);
                MemoryContextSwitchTo(oldcxt);
-               pfree(ruleaction_str);
+               pfree(rule_str);
 
-               rule_evqual = heap_getattr(rewrite_tuple,
-                                                                  Anum_pg_rewrite_ev_qual,
-                                                                  rewrite_tupdesc,
-                                                                  &isnull);
+               rule_datum = heap_getattr(rewrite_tuple,
+                                                                 Anum_pg_rewrite_ev_qual,
+                                                                 rewrite_tupdesc,
+                                                                 &isnull);
                Assert(!isnull);
-               rule_evqual_str = DatumGetCString(DirectFunctionCall1(textout,
-                                                                                                                  rule_evqual));
+               rule_str = TextDatumGetCString(rule_datum);
                oldcxt = MemoryContextSwitchTo(rulescxt);
-               rule->qual = (Node *) stringToNode(rule_evqual_str);
+               rule->qual = (Node *) stringToNode(rule_str);
                MemoryContextSwitchTo(oldcxt);
-               pfree(rule_evqual_str);
+               pfree(rule_str);
+
+               /*
+                * We want the rule's table references to be checked as though by the
+                * table owner, not the user referencing the rule.      Therefore, scan
+                * through the rule's actions and set the checkAsUser field on all
+                * rtable entries.      We have to look at the qual as well, in case it
+                * contains sublinks.
+                *
+                * The reason for doing this when the rule is loaded, rather than when
+                * it is stored, is that otherwise ALTER TABLE OWNER would have to
+                * grovel through stored rules to update checkAsUser fields. Scanning
+                * the rule tree during load is relatively cheap (compared to
+                * constructing it in the first place), so we do it here.
+                */
+               setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
+               setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);
 
                if (numlocks >= maxlocks)
                {
@@ -743,8 +739,8 @@ equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
 
        /*
         * As of 7.3 we assume the rule ordering is repeatable, because
-        * RelationBuildRuleLock should read 'em in a consistent order.  So
-        * just compare corresponding slots.
+        * RelationBuildRuleLock should read 'em in a consistent order.  So just
+        * compare corresponding slots.
         */
        if (rlock1 != NULL)
        {
@@ -784,11 +780,14 @@ equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
  *             recycling the given old relation object.  The latter case
  *             supports rebuilding a relcache entry without invalidating
  *             pointers to it.
+ *
+ *             Returns NULL if no pg_class row could be found for the given relid
+ *             (suggesting we are trying to access a just-deleted relation).
+ *             Any other error is reported via elog.
  * --------------------------------
  */
 static Relation
-RelationBuildDesc(RelationBuildDescInfo buildinfo,
-                                 Relation oldrelation)
+RelationBuildDesc(Oid targetRelId, Relation oldrelation)
 {
        Relation        relation;
        Oid                     relid;
@@ -799,7 +798,7 @@ RelationBuildDesc(RelationBuildDescInfo buildinfo,
        /*
         * find the tuple in pg_class corresponding to the given relation id
         */
-       pg_class_tuple = ScanPgRelation(buildinfo, true);
+       pg_class_tuple = ScanPgRelation(targetRelId, true);
 
        /*
         * if no such tuple exists, return NULL
@@ -814,36 +813,31 @@ RelationBuildDesc(RelationBuildDescInfo buildinfo,
        relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
 
        /*
-        * allocate storage for the relation descriptor, and copy
-        * pg_class_tuple to relation->rd_rel.
+        * allocate storage for the relation descriptor, and copy pg_class_tuple
+        * to relation->rd_rel.
         */
        relation = AllocateRelationDesc(oldrelation, relp);
 
-       /*
-        * now we can free the memory allocated for pg_class_tuple
-        */
-       heap_freetuple(pg_class_tuple);
-
        /*
         * initialize the relation's relation id (relation->rd_id)
         */
        RelationGetRelid(relation) = relid;
 
        /*
-        * normal relations are not nailed into the cache; nor can a
-        * pre-existing relation be new.  It could be temp though.      (Actually,
-        * it could be new too, but it's okay to forget that fact if forced to
-        * flush the entry.)
+        * normal relations are not nailed into the cache; nor can a pre-existing
+        * relation be new.  It could be temp though.  (Actually, it could be new
+        * too, but it's okay to forget that fact if forced to flush the entry.)
         */
        relation->rd_refcnt = 0;
        relation->rd_isnailed = false;
        relation->rd_createSubid = InvalidSubTransactionId;
-       relation->rd_istemp = isTempNamespace(relation->rd_rel->relnamespace);
+       relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
+       relation->rd_istemp = isTempOrToastNamespace(relation->rd_rel->relnamespace);
 
        /*
         * initialize the tuple descriptor (relation->rd_att).
         */
-       RelationBuildTupleDesc(buildinfo, relation);
+       RelationBuildTupleDesc(relation);
 
        /*
         * Fetch rules and triggers that affect this relation
@@ -867,6 +861,9 @@ RelationBuildDesc(RelationBuildDescInfo buildinfo,
        if (OidIsValid(relation->rd_rel->relam))
                RelationInitIndexAccessInfo(relation);
 
+       /* extract reloptions if any */
+       RelationParseRelOptions(relation, pg_class_tuple);
+
        /*
         * initialize the relation lock manager information
         */
@@ -880,6 +877,11 @@ RelationBuildDesc(RelationBuildDescInfo buildinfo,
        /* make sure relation is marked as having no open file yet */
        relation->rd_smgr = NULL;
 
+       /*
+        * now we can free the memory allocated for pg_class_tuple
+        */
+       heap_freetuple(pg_class_tuple);
+
        /*
         * Insert newly created relation into relcache hash tables.
         */
@@ -918,20 +920,21 @@ RelationInitIndexAccessInfo(Relation relation)
 {
        HeapTuple       tuple;
        Form_pg_am      aform;
+       Datum           indclassDatum;
+       Datum           indoptionDatum;
+       bool            isnull;
+       oidvector  *indclass;
+       int2vector *indoption;
        MemoryContext indexcxt;
        MemoryContext oldcontext;
-       Oid                *operator;
-       RegProcedure *support;
-       FmgrInfo   *supportinfo;
        int                     natts;
        uint16          amstrategies;
        uint16          amsupport;
 
        /*
         * Make a copy of the pg_index entry for the index.  Since pg_index
-        * contains variable-length and possibly-null fields, we have to do
-        * this honestly rather than just treating it as a Form_pg_index
-        * struct.
+        * contains variable-length and possibly-null fields, we have to do this
+        * honestly rather than just treating it as a Form_pg_index struct.
         */
        tuple = SearchSysCache(INDEXRELID,
                                                   ObjectIdGetDatum(RelationGetRelid(relation)),
@@ -967,9 +970,9 @@ RelationInitIndexAccessInfo(Relation relation)
        amsupport = aform->amsupport;
 
        /*
-        * Make the private context to hold index access info.  The reason we
-        * need a context, and not just a couple of pallocs, is so that we
-        * won't leak any subsidiary info attached to fmgr lookup records.
+        * Make the private context to hold index access info.  The reason we need
+        * a context, and not just a couple of pallocs, is so that we won't leak
+        * any subsidiary info attached to fmgr lookup records.
         *
         * Context parameters are set on the assumption that it'll probably not
         * contain much data.
@@ -984,54 +987,87 @@ RelationInitIndexAccessInfo(Relation relation)
        /*
         * Allocate arrays to hold data
         */
+       relation->rd_aminfo = (RelationAmInfo *)
+               MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
+
+       relation->rd_opfamily = (Oid *)
+               MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
+       relation->rd_opcintype = (Oid *)
+               MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
+
        if (amstrategies > 0)
-               operator = (Oid *)
+               relation->rd_operator = (Oid *)
                        MemoryContextAllocZero(indexcxt,
                                                                   natts * amstrategies * sizeof(Oid));
        else
-               operator = NULL;
+               relation->rd_operator = NULL;
 
        if (amsupport > 0)
        {
                int                     nsupport = natts * amsupport;
 
-               support = (RegProcedure *)
+               relation->rd_support = (RegProcedure *)
                        MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
-               supportinfo = (FmgrInfo *)
+               relation->rd_supportinfo = (FmgrInfo *)
                        MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
        }
        else
        {
-               support = NULL;
-               supportinfo = NULL;
+               relation->rd_support = NULL;
+               relation->rd_supportinfo = NULL;
        }
 
-       relation->rd_operator = operator;
-       relation->rd_support = support;
-       relation->rd_supportinfo = supportinfo;
+       relation->rd_indoption = (int16 *)
+               MemoryContextAllocZero(indexcxt, natts * sizeof(int16));
 
        /*
-        * Fill the operator and support procedure OID arrays. (supportinfo is
-        * left as zeroes, and is filled on-the-fly when used)
+        * indclass cannot be referenced directly through the C struct, because it
+        * comes after the variable-width indkey field.  Must extract the datum
+        * the hard way...
         */
-       IndexSupportInitialize(relation->rd_index,
-                                                  operator, support,
+       indclassDatum = fastgetattr(relation->rd_indextuple,
+                                                               Anum_pg_index_indclass,
+                                                               GetPgIndexDescriptor(),
+                                                               &isnull);
+       Assert(!isnull);
+       indclass = (oidvector *) DatumGetPointer(indclassDatum);
+
+       /*
+        * Fill the operator and support procedure OID arrays, as well as the info
+        * about opfamilies and opclass input types.  (aminfo and supportinfo are
+        * left as zeroes, and are filled on-the-fly when used)
+        */
+       IndexSupportInitialize(indclass,
+                                                  relation->rd_operator, relation->rd_support,
+                                                  relation->rd_opfamily, relation->rd_opcintype,
                                                   amstrategies, amsupport, natts);
 
+       /*
+        * Similarly extract indoption and copy it to the cache entry
+        */
+       indoptionDatum = fastgetattr(relation->rd_indextuple,
+                                                                Anum_pg_index_indoption,
+                                                                GetPgIndexDescriptor(),
+                                                                &isnull);
+       Assert(!isnull);
+       indoption = (int2vector *) DatumGetPointer(indoptionDatum);
+       memcpy(relation->rd_indoption, indoption->values, natts * sizeof(int16));
+
        /*
         * expressions and predicate cache will be filled later
         */
        relation->rd_indexprs = NIL;
        relation->rd_indpred = NIL;
+       relation->rd_amcache = NULL;
 }
 
 /*
  * IndexSupportInitialize
  *             Initializes an index's cached opclass information,
- *             given the index's pg_index tuple.
+ *             given the index's pg_index.indclass entry.
  *
- * Data is returned into *indexOperator and *indexSupport, which are arrays
- * allocated by the caller.
+ * Data is returned into *indexOperator, *indexSupport, *opFamily, and
+ * *opcInType, which are arrays allocated by the caller.
  *
  * The caller also passes maxStrategyNumber, maxSupportNumber, and
  * maxAttributeNumber, since these indicate the size of the arrays
@@ -1040,32 +1076,32 @@ RelationInitIndexAccessInfo(Relation relation)
  * access method.
  */
 static void
-IndexSupportInitialize(Form_pg_index iform,
+IndexSupportInitialize(oidvector *indclass,
                                           Oid *indexOperator,
                                           RegProcedure *indexSupport,
+                                          Oid *opFamily,
+                                          Oid *opcInType,
                                           StrategyNumber maxStrategyNumber,
                                           StrategyNumber maxSupportNumber,
                                           AttrNumber maxAttributeNumber)
 {
        int                     attIndex;
 
-       /*
-        * XXX note that the following assumes the INDEX tuple is well formed
-        * and that the *key and *class are 0 terminated.
-        */
        for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
        {
                OpClassCacheEnt *opcentry;
 
-               if (!OidIsValid(iform->indclass[attIndex]))
+               if (!OidIsValid(indclass->values[attIndex]))
                        elog(ERROR, "bogus pg_index tuple");
 
                /* look up the info for this opclass, using a cache */
-               opcentry = LookupOpclassInfo(iform->indclass[attIndex],
+               opcentry = LookupOpclassInfo(indclass->values[attIndex],
                                                                         maxStrategyNumber,
                                                                         maxSupportNumber);
 
                /* copy cached data into relcache entry */
+               opFamily[attIndex] = opcentry->opcfamily;
+               opcInType[attIndex] = opcentry->opcintype;
                if (maxStrategyNumber > 0)
                        memcpy(&indexOperator[attIndex * maxStrategyNumber],
                                   opcentry->operatorOids,
@@ -1089,9 +1125,13 @@ IndexSupportInitialize(Form_pg_index iform,
  * numbers is passed in, rather than being looked up, mainly because the
  * caller will have it already.
  *
- * XXX There isn't any provision for flushing the cache.  However, there
- * isn't any provision for flushing relcache entries when opclass info
- * changes, either :-(
+ * Note there is no provision for flushing the cache.  This is OK at the
+ * moment because there is no way to ALTER any interesting properties of an
+ * existing opclass --- all you can do is drop it, which will result in
+ * a useless but harmless dead entry in the cache.  To support altering
+ * opclass membership (not the same as opfamily membership!), we'd need to
+ * be able to flush this cache as well as the contents of relcache entries
+ * for indexes.
  */
 static OpClassCacheEnt *
 LookupOpclassInfo(Oid operatorClassOid,
@@ -1102,7 +1142,7 @@ LookupOpclassInfo(Oid operatorClassOid,
        bool            found;
        Relation        rel;
        SysScanDesc scan;
-       ScanKeyData skey[2];
+       ScanKeyData skey[3];
        HeapTuple       htup;
        bool            indexOK;
 
@@ -1117,7 +1157,7 @@ LookupOpclassInfo(Oid operatorClassOid,
                MemSet(&ctl, 0, sizeof(ctl));
                ctl.keysize = sizeof(Oid);
                ctl.entrysize = sizeof(OpClassCacheEnt);
-               ctl.hash = tag_hash;
+               ctl.hash = oid_hash;
                OpClassCache = hash_create("Operator class cache", 64,
                                                                   &ctl, HASH_ELEM | HASH_FUNCTION);
        }
@@ -1125,65 +1165,108 @@ LookupOpclassInfo(Oid operatorClassOid,
        opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
                                                                                           (void *) &operatorClassOid,
                                                                                           HASH_ENTER, &found);
-       if (opcentry == NULL)
-               ereport(ERROR,
-                               (errcode(ERRCODE_OUT_OF_MEMORY),
-                                errmsg("out of memory")));
 
-       if (found && opcentry->valid)
+       if (!found)
+       {
+               /* Need to allocate memory for new entry */
+               opcentry->valid = false;        /* until known OK */
+               opcentry->numStrats = numStrats;
+               opcentry->numSupport = numSupport;
+
+               if (numStrats > 0)
+                       opcentry->operatorOids = (Oid *)
+                               MemoryContextAllocZero(CacheMemoryContext,
+                                                                          numStrats * sizeof(Oid));
+               else
+                       opcentry->operatorOids = NULL;
+
+               if (numSupport > 0)
+                       opcentry->supportProcs = (RegProcedure *)
+                               MemoryContextAllocZero(CacheMemoryContext,
+                                                                          numSupport * sizeof(RegProcedure));
+               else
+                       opcentry->supportProcs = NULL;
+       }
+       else
        {
-               /* Already made an entry for it */
                Assert(numStrats == opcentry->numStrats);
                Assert(numSupport == opcentry->numSupport);
-               return opcentry;
        }
 
-       /* Need to fill in new entry */
-       opcentry->valid = false;        /* until known OK */
-       opcentry->numStrats = numStrats;
-       opcentry->numSupport = numSupport;
-
-       if (numStrats > 0)
-               opcentry->operatorOids = (Oid *)
-                       MemoryContextAllocZero(CacheMemoryContext,
-                                                                  numStrats * sizeof(Oid));
-       else
-               opcentry->operatorOids = NULL;
+       /*
+        * When testing for cache-flush hazards, we intentionally disable the
+        * operator class cache and force reloading of the info on each call.
+        * This is helpful because we want to test the case where a cache flush
+        * occurs while we are loading the info, and it's very hard to provoke
+        * that if this happens only once per opclass per backend.
+        */
+#if defined(CLOBBER_CACHE_ALWAYS)
+       opcentry->valid = false;
+#endif
 
-       if (numSupport > 0)
-               opcentry->supportProcs = (RegProcedure *)
-                       MemoryContextAllocZero(CacheMemoryContext,
-                                                                  numSupport * sizeof(RegProcedure));
-       else
-               opcentry->supportProcs = NULL;
+       if (opcentry->valid)
+               return opcentry;
 
        /*
-        * To avoid infinite recursion during startup, force heap scans if
-        * we're looking up info for the opclasses used by the indexes we
-        * would like to reference here.
+        * Need to fill in new entry.
+        *
+        * To avoid infinite recursion during startup, force heap scans if we're
+        * looking up info for the opclasses used by the indexes we would like to
+        * reference here.
         */
        indexOK = criticalRelcachesBuilt ||
                (operatorClassOid != OID_BTREE_OPS_OID &&
                 operatorClassOid != INT2_BTREE_OPS_OID);
 
        /*
-        * Scan pg_amop to obtain operators for the opclass.  We only fetch
-        * the default ones (those with subtype zero).
+        * We have to fetch the pg_opclass row to determine its opfamily and
+        * opcintype, which are needed to look up the operators and functions.
+        * It'd be convenient to use the syscache here, but that probably doesn't
+        * work while bootstrapping.
+        */
+       ScanKeyInit(&skey[0],
+                               ObjectIdAttributeNumber,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(operatorClassOid));
+       rel = heap_open(OperatorClassRelationId, AccessShareLock);
+       scan = systable_beginscan(rel, OpclassOidIndexId, indexOK,
+                                                         SnapshotNow, 1, skey);
+
+       if (HeapTupleIsValid(htup = systable_getnext(scan)))
+       {
+               Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(htup);
+
+               opcentry->opcfamily = opclassform->opcfamily;
+               opcentry->opcintype = opclassform->opcintype;
+       }
+       else
+               elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);
+
+       systable_endscan(scan);
+       heap_close(rel, AccessShareLock);
+
+
+       /*
+        * Scan pg_amop to obtain operators for the opclass.  We only fetch the
+        * default ones (those with lefttype = righttype = opcintype).
         */
        if (numStrats > 0)
        {
                ScanKeyInit(&skey[0],
-                                       Anum_pg_amop_amopclaid,
+                                       Anum_pg_amop_amopfamily,
                                        BTEqualStrategyNumber, F_OIDEQ,
-                                       ObjectIdGetDatum(operatorClassOid));
+                                       ObjectIdGetDatum(opcentry->opcfamily));
                ScanKeyInit(&skey[1],
-                                       Anum_pg_amop_amopsubtype,
+                                       Anum_pg_amop_amoplefttype,
+                                       BTEqualStrategyNumber, F_OIDEQ,
+                                       ObjectIdGetDatum(opcentry->opcintype));
+               ScanKeyInit(&skey[2],
+                                       Anum_pg_amop_amoprighttype,
                                        BTEqualStrategyNumber, F_OIDEQ,
-                                       ObjectIdGetDatum(InvalidOid));
-               rel = heap_openr(AccessMethodOperatorRelationName,
-                                                AccessShareLock);
-               scan = systable_beginscan(rel, AccessMethodStrategyIndex, indexOK,
-                                                                 SnapshotNow, 2, skey);
+                                       ObjectIdGetDatum(opcentry->opcintype));
+               rel = heap_open(AccessMethodOperatorRelationId, AccessShareLock);
+               scan = systable_beginscan(rel, AccessMethodStrategyIndexId, indexOK,
+                                                                 SnapshotNow, 3, skey);
 
                while (HeapTupleIsValid(htup = systable_getnext(scan)))
                {
@@ -1202,23 +1285,26 @@ LookupOpclassInfo(Oid operatorClassOid,
        }
 
        /*
-        * Scan pg_amproc to obtain support procs for the opclass.      We only
-        * fetch the default ones (those with subtype zero).
+        * Scan pg_amproc to obtain support procs for the opclass.      We only fetch
+        * the default ones (those with lefttype = righttype = opcintype).
         */
        if (numSupport > 0)
        {
                ScanKeyInit(&skey[0],
-                                       Anum_pg_amproc_amopclaid,
+                                       Anum_pg_amproc_amprocfamily,
                                        BTEqualStrategyNumber, F_OIDEQ,
-                                       ObjectIdGetDatum(operatorClassOid));
+                                       ObjectIdGetDatum(opcentry->opcfamily));
                ScanKeyInit(&skey[1],
-                                       Anum_pg_amproc_amprocsubtype,
+                                       Anum_pg_amproc_amproclefttype,
                                        BTEqualStrategyNumber, F_OIDEQ,
-                                       ObjectIdGetDatum(InvalidOid));
-               rel = heap_openr(AccessMethodProcedureRelationName,
-                                                AccessShareLock);
-               scan = systable_beginscan(rel, AccessMethodProcedureIndex, indexOK,
-                                                                 SnapshotNow, 2, skey);
+                                       ObjectIdGetDatum(opcentry->opcintype));
+               ScanKeyInit(&skey[2],
+                                       Anum_pg_amproc_amprocrighttype,
+                                       BTEqualStrategyNumber, F_OIDEQ,
+                                       ObjectIdGetDatum(opcentry->opcintype));
+               rel = heap_open(AccessMethodProcedureRelationId, AccessShareLock);
+               scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
+                                                                 SnapshotNow, 3, skey);
 
                while (HeapTupleIsValid(htup = systable_getnext(scan)))
                {
@@ -1246,14 +1332,14 @@ LookupOpclassInfo(Oid operatorClassOid,
  *             formrdesc
  *
  *             This is a special cut-down version of RelationBuildDesc()
- *             used by RelationCacheInitialize() in initializing the relcache.
+ *             used by RelationCacheInitializePhase2() in initializing the relcache.
  *             The relation descriptor is built just from the supplied parameters,
  *             without actually looking at any system table entries.  We cheat
  *             quite a lot since we only need to work for a few basic system
  *             catalogs.
  *
  * formrdesc is currently used for: pg_class, pg_attribute, pg_proc,
- * and pg_type (see RelationCacheInitialize).
+ * and pg_type (see RelationCacheInitializePhase2).
  *
  * Note that these catalogs can't have constraints (except attnotnull),
  * default values, rules, or triggers, since we don't cope with any of that.
@@ -1283,19 +1369,20 @@ formrdesc(const char *relationName, Oid relationReltype,
        relation->rd_refcnt = 1;
 
        /*
-        * all entries built with this routine are nailed-in-cache; none are
-        * for new or temp relations.
+        * all entries built with this routine are nailed-in-cache; none are for
+        * new or temp relations.
         */
        relation->rd_isnailed = true;
        relation->rd_createSubid = InvalidSubTransactionId;
+       relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
        relation->rd_istemp = false;
 
        /*
         * initialize relation tuple form
         *
         * The data we insert here is pretty incomplete/bogus, but it'll serve to
-        * get us launched.  RelationCacheInitializePhase2() will read the
-        * real data from pg_class and replace what we've done here.
+        * get us launched.  RelationCacheInitializePhase2() will read the real
+        * data from pg_class and replace what we've done here.
         */
        relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
 
@@ -1304,10 +1391,9 @@ formrdesc(const char *relationName, Oid relationReltype,
        relation->rd_rel->reltype = relationReltype;
 
        /*
-        * It's important to distinguish between shared and non-shared
-        * relations, even at bootstrap time, to make sure we know where they
-        * are stored.  At present, all relations that formrdesc is used for
-        * are not shared.
+        * It's important to distinguish between shared and non-shared relations,
+        * even at bootstrap time, to make sure we know where they are stored.  At
+        * present, all relations that formrdesc is used for are not shared.
         */
        relation->rd_rel->relisshared = false;
 
@@ -1321,10 +1407,12 @@ formrdesc(const char *relationName, Oid relationReltype,
         * initialize attribute tuple form
         *
         * Unlike the case with the relation tuple, this data had better be right
-        * because it will never be replaced.  The input values must be
-        * correctly defined by macros in src/include/catalog/ headers.
+        * because it will never be replaced.  The input values must be correctly
+        * defined by macros in src/include/catalog/ headers.
         */
        relation->rd_att = CreateTemplateTupleDesc(natts, hasoids);
+       relation->rd_att->tdrefcount = 1;       /* mark as refcounted */
+
        relation->rd_att->tdtypeid = relationReltype;
        relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
 
@@ -1399,70 +1487,18 @@ formrdesc(const char *relationName, Oid relationReltype,
  * ----------------------------------------------------------------
  */
 
-/*
- *             RelationIdCacheGetRelation
- *
- *             Lookup an existing reldesc by OID.
- *
- *             Only try to get the reldesc by looking in the cache,
- *             do not go to the disk if it's not present.
- *
- *             NB: relation ref count is incremented if successful.
- *             Caller should eventually decrement count.  (Usually,
- *             that happens by calling RelationClose().)
- */
-Relation
-RelationIdCacheGetRelation(Oid relationId)
-{
-       Relation        rd;
-
-       RelationIdCacheLookup(relationId, rd);
-
-       if (RelationIsValid(rd))
-       {
-               RelationIncrementReferenceCount(rd);
-               /* revalidate nailed index if necessary */
-               if (!rd->rd_isvalid)
-                       RelationReloadClassinfo(rd);
-       }
-
-       return rd;
-}
-
-/*
- *             RelationSysNameCacheGetRelation
- *
- *             As above, but lookup by name; only works for system catalogs.
- */
-static Relation
-RelationSysNameCacheGetRelation(const char *relationName)
-{
-       Relation        rd;
-       NameData        name;
-
-       /*
-        * make sure that the name key used for hash lookup is properly
-        * null-padded
-        */
-       namestrcpy(&name, relationName);
-       RelationSysNameCacheLookup(NameStr(name), rd);
-
-       if (RelationIsValid(rd))
-       {
-               RelationIncrementReferenceCount(rd);
-               /* revalidate nailed index if necessary */
-               if (!rd->rd_isvalid)
-                       RelationReloadClassinfo(rd);
-       }
-
-       return rd;
-}
-
 /*
  *             RelationIdGetRelation
  *
  *             Lookup a reldesc by OID; make one if not already in cache.
  *
+ *             Returns NULL if no pg_class row could be found for the given relid
+ *             (suggesting we are trying to access a just-deleted relation).
+ *             Any other error is reported via elog.
+ *
+ *             NB: caller should already have at least AccessShareLock on the
+ *             relation ID, else there are nasty race conditions.
+ *
  *             NB: relation ref count is incremented, or set to 1 if new entry.
  *             Caller should eventually decrement count.  (Usually,
  *             that happens by calling RelationClose().)
@@ -1471,54 +1507,26 @@ Relation
 RelationIdGetRelation(Oid relationId)
 {
        Relation        rd;
-       RelationBuildDescInfo buildinfo;
 
        /*
-        * first try and get a reldesc from the cache
+        * first try to find reldesc in the cache
         */
-       rd = RelationIdCacheGetRelation(relationId);
-       if (RelationIsValid(rd))
-               return rd;
-
-       /*
-        * no reldesc in the cache, so have RelationBuildDesc() build one and
-        * add it.
-        */
-       buildinfo.infotype = INFO_RELID;
-       buildinfo.i.info_id = relationId;
+       RelationIdCacheLookup(relationId, rd);
 
-       rd = RelationBuildDesc(buildinfo, NULL);
        if (RelationIsValid(rd))
+       {
                RelationIncrementReferenceCount(rd);
-       return rd;
-}
-
-/*
- *             RelationSysNameGetRelation
- *
- *             As above, but lookup by name; only works for system catalogs.
- */
-Relation
-RelationSysNameGetRelation(const char *relationName)
-{
-       Relation        rd;
-       RelationBuildDescInfo buildinfo;
-
-       /*
-        * first try and get a reldesc from the cache
-        */
-       rd = RelationSysNameCacheGetRelation(relationName);
-       if (RelationIsValid(rd))
+               /* revalidate nailed index if necessary */
+               if (!rd->rd_isvalid)
+                       RelationReloadIndexInfo(rd);
                return rd;
+       }
 
        /*
-        * no reldesc in the cache, so have RelationBuildDesc() build one and
-        * add it.
+        * no reldesc in the cache, so have RelationBuildDesc() build one and add
+        * it.
         */
-       buildinfo.infotype = INFO_RELNAME;
-       buildinfo.i.info_name = (char *) relationName;
-
-       rd = RelationBuildDesc(buildinfo, NULL);
+       rd = RelationBuildDesc(relationId, NULL);
        if (RelationIsValid(rd))
                RelationIncrementReferenceCount(rd);
        return rd;
@@ -1578,56 +1586,100 @@ RelationClose(Relation relation)
 
 #ifdef RELCACHE_FORCE_RELEASE
        if (RelationHasReferenceCountZero(relation) &&
-               relation->rd_createSubid == InvalidSubTransactionId)
+               relation->rd_createSubid == InvalidSubTransactionId &&
+               relation->rd_newRelfilenodeSubid == InvalidSubTransactionId)
                RelationClearRelation(relation, false);
 #endif
 }
 
 /*
- * RelationReloadClassinfo - reload the pg_class row (only)
+ * RelationReloadIndexInfo - reload minimal information for an open index
  *
- *     This function is used only for nailed indexes.  Since a REINDEX can
- *     change the relfilenode value for a nailed index, we have to reread
- *     the pg_class row anytime we get an SI invalidation on a nailed index
- *     (without throwing away the whole relcache entry, since we'd be unable
- *     to rebuild it).
+ *     This function is used only for indexes.  A relcache inval on an index
+ *     can mean that its pg_class or pg_index row changed.  There are only
+ *     very limited changes that are allowed to an existing index's schema,
+ *     so we can update the relcache entry without a complete rebuild; which
+ *     is fortunate because we can't rebuild an index entry that is "nailed"
+ *     and/or in active use.  We support full replacement of the pg_class row,
+ *     as well as updates of a few simple fields of the pg_index row.
  *
- *     We can't necessarily reread the pg_class row right away; we might be
+ *     We can't necessarily reread the catalog rows right away; we might be
  *     in a failed transaction when we receive the SI notification.  If so,
  *     RelationClearRelation just marks the entry as invalid by setting
  *     rd_isvalid to false.  This routine is called to fix the entry when it
  *     is next needed.
  */
 static void
-RelationReloadClassinfo(Relation relation)
+RelationReloadIndexInfo(Relation relation)
 {
-       RelationBuildDescInfo buildinfo;
        bool            indexOK;
        HeapTuple       pg_class_tuple;
        Form_pg_class relp;
 
-       /* Should be called only for invalidated nailed indexes */
-       Assert(relation->rd_isnailed && !relation->rd_isvalid &&
-                  relation->rd_rel->relkind == RELKIND_INDEX);
-       /* Read the pg_class row */
-       buildinfo.infotype = INFO_RELID;
-       buildinfo.i.info_id = relation->rd_id;
+       /* Should be called only for invalidated indexes */
+       Assert(relation->rd_rel->relkind == RELKIND_INDEX &&
+                  !relation->rd_isvalid);
+       /* Should be closed at smgr level */
+       Assert(relation->rd_smgr == NULL);
 
        /*
-        * Don't try to use an indexscan of pg_class_oid_index to reload the
-        * info for pg_class_oid_index ...
+        * Read the pg_class row
+        *
+        * Don't try to use an indexscan of pg_class_oid_index to reload the info
+        * for pg_class_oid_index ...
         */
-       indexOK = strcmp(RelationGetRelationName(relation), ClassOidIndex) != 0;
-       pg_class_tuple = ScanPgRelation(buildinfo, indexOK);
+       indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
+       pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK);
        if (!HeapTupleIsValid(pg_class_tuple))
-               elog(ERROR, "could not find tuple for system relation %u",
-                        relation->rd_id);
+               elog(ERROR, "could not find pg_class tuple for index %u",
+                        RelationGetRelid(relation));
        relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
-       memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
-       /* Now we can recalculate physical address */
-       RelationInitPhysicalAddr(relation);
+       memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
+       /* Reload reloptions in case they changed */
+       if (relation->rd_options)
+               pfree(relation->rd_options);
+       RelationParseRelOptions(relation, pg_class_tuple);
+       /* done with pg_class tuple */
        heap_freetuple(pg_class_tuple);
+       /* We must recalculate physical address in case it changed */
+       RelationInitPhysicalAddr(relation);
+       /* Make sure targblock is reset in case rel was truncated */
        relation->rd_targblock = InvalidBlockNumber;
+       /* Must free any AM cached data, too */
+       if (relation->rd_amcache)
+               pfree(relation->rd_amcache);
+       relation->rd_amcache = NULL;
+
+       /*
+        * For a non-system index, there are fields of the pg_index row that are
+        * allowed to change, so re-read that row and update the relcache entry.
+        * Most of the info derived from pg_index (such as support function lookup
+        * info) cannot change, and indeed the whole point of this routine is to
+        * update the relcache entry without clobbering that data; so wholesale
+        * replacement is not appropriate.
+        */
+       if (!IsSystemRelation(relation))
+       {
+               HeapTuple       tuple;
+               Form_pg_index index;
+
+               tuple = SearchSysCache(INDEXRELID,
+                                                          ObjectIdGetDatum(RelationGetRelid(relation)),
+                                                          0, 0, 0);
+               if (!HeapTupleIsValid(tuple))
+                       elog(ERROR, "cache lookup failed for index %u",
+                                RelationGetRelid(relation));
+               index = (Form_pg_index) GETSTRUCT(tuple);
+
+               relation->rd_index->indisvalid = index->indisvalid;
+               relation->rd_index->indcheckxmin = index->indcheckxmin;
+               relation->rd_index->indisready = index->indisready;
+               HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
+                                                          HeapTupleHeaderGetXmin(tuple->t_data));
+
+               ReleaseSysCache(tuple);
+       }
+
        /* Okay, now it's valid again */
        relation->rd_isvalid = true;
 }
@@ -1649,25 +1701,25 @@ RelationClearRelation(Relation relation, bool rebuild)
 
        /*
         * Make sure smgr and lower levels close the relation's files, if they
-        * weren't closed already.  If the relation is not getting deleted,
-        * the next smgr access should reopen the files automatically.  This
-        * ensures that the low-level file access state is updated after, say,
-        * a vacuum truncation.
+        * weren't closed already.  If the relation is not getting deleted, the
+        * next smgr access should reopen the files automatically.      This ensures
+        * that the low-level file access state is updated after, say, a vacuum
+        * truncation.
         */
        RelationCloseSmgr(relation);
 
        /*
-        * Never, never ever blow away a nailed-in system relation, because
-        * we'd be unable to recover.  However, we must reset rd_targblock, in
-        * case we got called because of a relation cache flush that was
-        * triggered by VACUUM.
+        * Never, never ever blow away a nailed-in system relation, because we'd
+        * be unable to recover.  However, we must reset rd_targblock, in case we
+        * got called because of a relation cache flush that was triggered by
+        * VACUUM.
         *
-        * If it's a nailed index, then we need to re-read the pg_class row to
-        * see if its relfilenode changed.      We can't necessarily do that here,
-        * because we might be in a failed transaction.  We assume it's okay
-        * to do it if there are open references to the relcache entry (cf
-        * notes for AtEOXact_RelationCache).  Otherwise just mark the entry
-        * as possibly invalid, and it'll be fixed when next opened.
+        * If it's a nailed index, then we need to re-read the pg_class row to see
+        * if its relfilenode changed.  We can't necessarily do that here, because
+        * we might be in a failed transaction.  We assume it's okay to do it if
+        * there are open references to the relcache entry (cf notes for
+        * AtEOXact_RelationCache).  Otherwise just mark the entry as possibly
+        * invalid, and it'll be fixed when next opened.
         */
        if (relation->rd_isnailed)
        {
@@ -1676,11 +1728,27 @@ RelationClearRelation(Relation relation, bool rebuild)
                {
                        relation->rd_isvalid = false;           /* needs to be revalidated */
                        if (relation->rd_refcnt > 1)
-                               RelationReloadClassinfo(relation);
+                               RelationReloadIndexInfo(relation);
                }
                return;
        }
 
+       /*
+        * Even non-system indexes should not be blown away if they are open and
+        * have valid index support information.  This avoids problems with active
+        * use of the index support information.  As with nailed indexes, we
+        * re-read the pg_class row to handle possible physical relocation of the
+        * index, and we check for pg_index updates too.
+        */
+       if (relation->rd_rel->relkind == RELKIND_INDEX &&
+               relation->rd_refcnt > 0 &&
+               relation->rd_indexcxt != NULL)
+       {
+               relation->rd_isvalid = false;   /* needs to be revalidated */
+               RelationReloadIndexInfo(relation);
+               return;
+       }
+
        /*
         * Remove relation from hash tables
         *
@@ -1699,8 +1767,8 @@ RelationClearRelation(Relation relation, bool rebuild)
         * Free all the subsidiary data structures of the relcache entry. We
         * cannot free rd_att if we are trying to rebuild the entry, however,
         * because pointers to it may be cached in various places. The rule
-        * manager might also have pointers into the rewrite rules. So to
-        * begin with, we can only get rid of these fields:
+        * manager might also have pointers into the rewrite rules. So to begin
+        * with, we can only get rid of these fields:
         */
        FreeTriggerDesc(relation->trigdesc);
        if (relation->rd_indextuple)
@@ -1709,21 +1777,27 @@ RelationClearRelation(Relation relation, bool rebuild)
                pfree(relation->rd_am);
        if (relation->rd_rel)
                pfree(relation->rd_rel);
+       if (relation->rd_options)
+               pfree(relation->rd_options);
        list_free(relation->rd_indexlist);
+       bms_free(relation->rd_indexattr);
        if (relation->rd_indexcxt)
                MemoryContextDelete(relation->rd_indexcxt);
 
        /*
         * If we're really done with the relcache entry, blow it away. But if
-        * someone is still using it, reconstruct the whole deal without
-        * moving the physical RelationData record (so that the someone's
-        * pointer is still valid).
+        * someone is still using it, reconstruct the whole deal without moving
+        * the physical RelationData record (so that the someone's pointer is
+        * still valid).
         */
        if (!rebuild)
        {
                /* ok to zap remaining substructure */
                flush_rowtype_cache(old_reltype);
-               FreeTupleDesc(relation->rd_att);
+               /* can't use DecrTupleDescRefCount here */
+               Assert(relation->rd_att->tdrefcount > 0);
+               if (--relation->rd_att->tdrefcount == 0)
+                       FreeTupleDesc(relation->rd_att);
                if (relation->rd_rulescxt)
                        MemoryContextDelete(relation->rd_rulescxt);
                pfree(relation);
@@ -1731,47 +1805,58 @@ RelationClearRelation(Relation relation, bool rebuild)
        else
        {
                /*
-                * When rebuilding an open relcache entry, must preserve ref count
-                * and rd_createSubid state.  Also attempt to preserve the
-                * tupledesc and rewrite-rule substructures in place.
+                * When rebuilding an open relcache entry, must preserve ref count and
+                * rd_createSubid/rd_newRelfilenodeSubid state.  Also attempt to
+                * preserve the tupledesc and rewrite-rule substructures in place.
+                * (Note: the refcount mechanism for tupledescs may eventually ensure
+                * that we don't really need to preserve the tupledesc in-place, but
+                * for now there are still a lot of places that assume an open rel's
+                * tupledesc won't move.)
                 *
                 * Note that this process does not touch CurrentResourceOwner; which
                 * is good because whatever ref counts the entry may have do not
                 * necessarily belong to that resource owner.
                 */
+               Oid                     save_relid = RelationGetRelid(relation);
                int                     old_refcnt = relation->rd_refcnt;
                SubTransactionId old_createSubid = relation->rd_createSubid;
+               SubTransactionId old_newRelfilenodeSubid = relation->rd_newRelfilenodeSubid;
+               struct PgStat_TableStatus *old_pgstat_info = relation->pgstat_info;
                TupleDesc       old_att = relation->rd_att;
                RuleLock   *old_rules = relation->rd_rules;
                MemoryContext old_rulescxt = relation->rd_rulescxt;
-               RelationBuildDescInfo buildinfo;
-
-               buildinfo.infotype = INFO_RELID;
-               buildinfo.i.info_id = RelationGetRelid(relation);
 
-               if (RelationBuildDesc(buildinfo, relation) != relation)
+               if (RelationBuildDesc(save_relid, relation) != relation)
                {
                        /* Should only get here if relation was deleted */
                        flush_rowtype_cache(old_reltype);
-                       FreeTupleDesc(old_att);
+                       Assert(old_att->tdrefcount > 0);
+                       if (--old_att->tdrefcount == 0)
+                               FreeTupleDesc(old_att);
                        if (old_rulescxt)
                                MemoryContextDelete(old_rulescxt);
                        pfree(relation);
-                       elog(ERROR, "relation %u deleted while still in use",
-                                buildinfo.i.info_id);
+                       elog(ERROR, "relation %u deleted while still in use", save_relid);
                }
                relation->rd_refcnt = old_refcnt;
                relation->rd_createSubid = old_createSubid;
+               relation->rd_newRelfilenodeSubid = old_newRelfilenodeSubid;
+               relation->pgstat_info = old_pgstat_info;
+
                if (equalTupleDescs(old_att, relation->rd_att))
                {
                        /* needn't flush typcache here */
-                       FreeTupleDesc(relation->rd_att);
+                       Assert(relation->rd_att->tdrefcount == 1);
+                       if (--relation->rd_att->tdrefcount == 0)
+                               FreeTupleDesc(relation->rd_att);
                        relation->rd_att = old_att;
                }
                else
                {
                        flush_rowtype_cache(old_reltype);
-                       FreeTupleDesc(old_att);
+                       Assert(old_att->tdrefcount > 0);
+                       if (--old_att->tdrefcount == 0)
+                               FreeTupleDesc(old_att);
                }
                if (equalRuleLocks(old_rules, relation->rd_rules))
                {
@@ -1798,12 +1883,13 @@ RelationFlushRelation(Relation relation)
 {
        bool            rebuild;
 
-       if (relation->rd_createSubid != InvalidSubTransactionId)
+       if (relation->rd_createSubid != InvalidSubTransactionId ||
+               relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
        {
                /*
                 * New relcache entries are always rebuilt, not flushed; else we'd
                 * forget the "new" status of the relation, which is a useful
-                * optimization to have.
+                * optimization to have.  Ditto for the new-relfilenode status.
                 */
                rebuild = true;
        }
@@ -1880,6 +1966,8 @@ RelationCacheInvalidateEntry(Oid relationId)
  *      so we do not touch new-in-transaction relations; they cannot be targets
  *      of cross-backend SI updates (and our own updates now go through a
  *      separate linked list that isn't limited by the SI message buffer size).
+ *      Likewise, we need not discard new-relfilenode-in-transaction hints,
+ *      since any invalidation of those would be a local event.
  *
  *      We do this in two phases: the first pass deletes deletable items, and
  *      the second one rebuilds the rebuildable items.  This is essential for
@@ -1934,15 +2022,14 @@ RelationCacheInvalidate(void)
                {
                        /*
                         * Add this entry to list of stuff to rebuild in second pass.
-                        * pg_class_oid_index goes on the front of rebuildFirstList,
-                        * other nailed indexes on the back, and everything else into
+                        * pg_class_oid_index goes on the front of rebuildFirstList, other
+                        * nailed indexes on the back, and everything else into
                         * rebuildList (in no particular order).
                         */
                        if (relation->rd_isnailed &&
                                relation->rd_rel->relkind == RELKIND_INDEX)
                        {
-                               if (strcmp(RelationGetRelationName(relation),
-                                                  ClassOidIndex) == 0)
+                               if (RelationGetRelid(relation) == ClassOidIndexId)
                                        rebuildFirstList = lcons(relation, rebuildFirstList);
                                else
                                        rebuildFirstList = lappend(rebuildFirstList, relation);
@@ -1952,16 +2039,20 @@ RelationCacheInvalidate(void)
                }
        }
 
-       rebuildList = list_concat(rebuildFirstList, rebuildList);
-
        /*
-        * Now zap any remaining smgr cache entries.  This must happen before
-        * we start to rebuild entries, since that may involve catalog fetches
-        * which will re-open catalog files.
+        * Now zap any remaining smgr cache entries.  This must happen before we
+        * start to rebuild entries, since that may involve catalog fetches which
+        * will re-open catalog files.
         */
        smgrcloseall();
 
        /* Phase 2: rebuild the items found to need rebuild in phase 1 */
+       foreach(l, rebuildFirstList)
+       {
+               relation = (Relation) lfirst(l);
+               RelationClearRelation(relation, true);
+       }
+       list_free(rebuildFirstList);
        foreach(l, rebuildList)
        {
                relation = (Relation) lfirst(l);
@@ -1979,6 +2070,12 @@ RelationCacheInvalidate(void)
  * In the case of abort, we don't want to try to rebuild any invalidated
  * cache entries (since we can't safely do database accesses).  Therefore
  * we must reset refcnts before handling pending invalidations.
+ *
+ * As of PostgreSQL 8.1, relcache refcnts should get released by the
+ * ResourceOwner mechanism.  This routine just does a debugging
+ * cross-check that no pins remain.  However, we also need to do special
+ * cleanup when the current transaction created any relations or made use
+ * of forced index lists.
  */
 void
 AtEOXact_RelationCache(bool isCommit)
@@ -1986,12 +2083,47 @@ AtEOXact_RelationCache(bool isCommit)
        HASH_SEQ_STATUS status;
        RelIdCacheEnt *idhentry;
 
+       /*
+        * To speed up transaction exit, we want to avoid scanning the relcache
+        * unless there is actually something for this routine to do.  Other than
+        * the debug-only Assert checks, most transactions don't create any work
+        * for us to do here, so we keep a static flag that gets set if there is
+        * anything to do.      (Currently, this means either a relation is created in
+        * the current xact, or one is given a new relfilenode, or an index list
+        * is forced.)  For simplicity, the flag remains set till end of top-level
+        * transaction, even though we could clear it at subtransaction end in
+        * some cases.
+        */
+       if (!need_eoxact_work
+#ifdef USE_ASSERT_CHECKING
+               && !assert_enabled
+#endif
+               )
+               return;
+
        hash_seq_init(&status, RelationIdCache);
 
        while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
        {
                Relation        relation = idhentry->reldesc;
-               int                     expected_refcnt;
+
+               /*
+                * The relcache entry's ref count should be back to its normal
+                * not-in-a-transaction state: 0 unless it's nailed in cache.
+                *
+                * In bootstrap mode, this is NOT true, so don't check it --- the
+                * bootstrap code expects relations to stay open across start/commit
+                * transaction calls.  (That seems bogus, but it's not worth fixing.)
+                */
+#ifdef USE_ASSERT_CHECKING
+               if (!IsBootstrapProcessingMode())
+               {
+                       int                     expected_refcnt;
+
+                       expected_refcnt = relation->rd_isnailed ? 1 : 0;
+                       Assert(relation->rd_refcnt == expected_refcnt);
+               }
+#endif
 
                /*
                 * Is it a relation created in the current transaction?
@@ -2015,38 +2147,9 @@ AtEOXact_RelationCache(bool isCommit)
                }
 
                /*
-                * During transaction abort, we must also reset relcache entry ref
-                * counts to their normal not-in-a-transaction state.  A ref count
-                * may be too high because some routine was exited by ereport()
-                * between incrementing and decrementing the count.
-                *
-                * During commit, we should not have to do this, but it's still
-                * useful to check that the counts are correct to catch missed
-                * relcache closes.
-                *
-                * In bootstrap mode, do NOT reset the refcnt nor complain that it's
-                * nonzero --- the bootstrap code expects relations to stay open
-                * across start/commit transaction calls.  (That seems bogus, but
-                * it's not worth fixing.)
+                * Likewise, reset the hint about the relfilenode being new.
                 */
-               expected_refcnt = relation->rd_isnailed ? 1 : 0;
-
-               if (isCommit)
-               {
-                       if (relation->rd_refcnt != expected_refcnt &&
-                               !IsBootstrapProcessingMode())
-                       {
-                               elog(WARNING, "relcache reference leak: relation \"%s\" has refcnt %d instead of %d",
-                                        RelationGetRelationName(relation),
-                                        relation->rd_refcnt, expected_refcnt);
-                               relation->rd_refcnt = expected_refcnt;
-                       }
-               }
-               else
-               {
-                       /* abort case, just reset it quietly */
-                       relation->rd_refcnt = expected_refcnt;
-               }
+               relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
 
                /*
                 * Flush any temporary index list.
@@ -2055,12 +2158,13 @@ AtEOXact_RelationCache(bool isCommit)
                {
                        list_free(relation->rd_indexlist);
                        relation->rd_indexlist = NIL;
+                       relation->rd_oidindex = InvalidOid;
                        relation->rd_indexvalid = 0;
                }
        }
 
-       /* Once done with the transaction, we can reset need_eosubxact_work */
-       need_eosubxact_work = false;
+       /* Once done with the transaction, we can reset need_eoxact_work */
+       need_eoxact_work = false;
 }
 
 /*
@@ -2078,18 +2182,10 @@ AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
        RelIdCacheEnt *idhentry;
 
        /*
-        * In the majority of subtransactions there is not anything for this
-        * routine to do, and since there are usually many entries in the
-        * relcache, uselessly scanning the cache represents a surprisingly
-        * large fraction of the subtransaction entry/exit overhead.  To avoid
-        * this, we keep a static flag that must be set whenever a condition
-        * is created that requires subtransaction-end work.  (Currently, this
-        * means either a relation is created in the current xact, or an index
-        * list is forced.)  For simplicity, the flag remains set till end of
-        * top-level transaction, even though we could clear it earlier in some
-        * cases.
+        * Skip the relcache scan if nothing to do --- see notes for
+        * AtEOXact_RelationCache.
         */
-       if (!need_eosubxact_work)
+       if (!need_eoxact_work)
                return;
 
        hash_seq_init(&status, RelationIdCache);
@@ -2116,6 +2212,18 @@ AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
                        }
                }
 
+               /*
+                * Likewise, update or drop any new-relfilenode-in-subtransaction
+                * hint.
+                */
+               if (relation->rd_newRelfilenodeSubid == mySubid)
+               {
+                       if (isCommit)
+                               relation->rd_newRelfilenodeSubid = parentSubid;
+                       else
+                               relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
+               }
+
                /*
                 * Flush any temporary index list.
                 */
@@ -2123,11 +2231,29 @@ AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
                {
                        list_free(relation->rd_indexlist);
                        relation->rd_indexlist = NIL;
+                       relation->rd_oidindex = InvalidOid;
                        relation->rd_indexvalid = 0;
                }
        }
 }
 
+/*
+ * RelationCacheMarkNewRelfilenode
+ *
+ *     Mark the rel as having been given a new relfilenode in the current
+ *     (sub) transaction.      This is a hint that can be used to optimize
+ *     later operations on the rel in the same transaction.
+ */
+void
+RelationCacheMarkNewRelfilenode(Relation rel)
+{
+       /* Mark it... */
+       rel->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
+       /* ... and now we have eoxact cleanup work to do */
+       need_eoxact_work = true;
+}
+
+
 /*
  *             RelationBuildLocalRelation
  *                     Build a relcache entry for an about-to-be-created relation,
@@ -2139,17 +2265,45 @@ RelationBuildLocalRelation(const char *relname,
                                                   TupleDesc tupDesc,
                                                   Oid relid,
                                                   Oid reltablespace,
-                                                  bool shared_relation,
-                                                  bool nailit)
+                                                  bool shared_relation)
 {
        Relation        rel;
        MemoryContext oldcxt;
        int                     natts = tupDesc->natts;
        int                     i;
        bool            has_not_null;
+       bool            nailit;
 
        AssertArg(natts >= 0);
 
+       /*
+        * check for creation of a rel that must be nailed in cache.
+        *
+        * XXX this list had better match RelationCacheInitializePhase2's list.
+        */
+       switch (relid)
+       {
+               case RelationRelationId:
+               case AttributeRelationId:
+               case ProcedureRelationId:
+               case TypeRelationId:
+                       nailit = true;
+                       break;
+               default:
+                       nailit = false;
+                       break;
+       }
+
+       /*
+        * check that hardwired list of shared rels matches what's in the
+        * bootstrap .bki file.  If you get a failure here during initdb, you
+        * probably need to fix IsSharedRelation() to match whatever you've done
+        * to the set of shared relations.
+        */
+       if (shared_relation != IsSharedRelation(relid))
+               elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
+                        relname, relid);
+
        /*
         * switch to the cache context to create the relcache entry.
         */
@@ -2168,34 +2322,30 @@ RelationBuildLocalRelation(const char *relname,
        /* make sure relation is marked as having no open file yet */
        rel->rd_smgr = NULL;
 
+       /* mark it nailed if appropriate */
+       rel->rd_isnailed = nailit;
+
        rel->rd_refcnt = nailit ? 1 : 0;
 
        /* it's being created in this transaction */
        rel->rd_createSubid = GetCurrentSubTransactionId();
+       rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
 
        /* must flag that we have rels created in this transaction */
-       need_eosubxact_work = true;
+       need_eoxact_work = true;
 
        /* is it a temporary relation? */
-       rel->rd_istemp = isTempNamespace(relnamespace);
-
-       /*
-        * nail the reldesc if this is a bootstrap create reln and we may need
-        * it in the cache later on in the bootstrap process so we don't ever
-        * want it kicked out.  e.g. pg_attribute!!!
-        */
-       if (nailit)
-               rel->rd_isnailed = true;
+       rel->rd_istemp = isTempOrToastNamespace(relnamespace);
 
        /*
         * create a new tuple descriptor from the one passed in.  We do this
-        * partly to copy it into the cache context, and partly because the
-        * new relation can't have any defaults or constraints yet; they have
-        * to be added in later steps, because they require additions to
-        * multiple system catalogs.  We can copy attnotnull constraints here,
-        * however.
+        * partly to copy it into the cache context, and partly because the new
+        * relation can't have any defaults or constraints yet; they have to be
+        * added in later steps, because they require additions to multiple system
+        * catalogs.  We can copy attnotnull constraints here, however.
         */
        rel->rd_att = CreateTupleDescCopy(tupDesc);
+       rel->rd_att->tdrefcount = 1;    /* mark as refcounted */
        has_not_null = false;
        for (i = 0; i < natts; i++)
        {
@@ -2223,11 +2373,13 @@ RelationBuildLocalRelation(const char *relname,
        rel->rd_rel->relhasoids = rel->rd_att->tdhasoid;
        rel->rd_rel->relnatts = natts;
        rel->rd_rel->reltype = InvalidOid;
+       /* needed when bootstrapping: */
+       rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;
 
        /*
-        * Insert relation physical and logical identifiers (OIDs) into the
-        * right places.  Note that the physical ID (relfilenode) is initially
-        * the same as the logical ID (OID).
+        * Insert relation physical and logical identifiers (OIDs) into the right
+        * places.      Note that the physical ID (relfilenode) is initially the same
+        * as the logical ID (OID).
         */
        rel->rd_rel->relisshared = shared_relation;
 
@@ -2269,10 +2421,11 @@ RelationBuildLocalRelation(const char *relname,
  *
  *             This initializes the relation descriptor cache.  At the time
  *             that this is invoked, we can't do database access yet (mainly
- *             because the transaction subsystem is not up), so we can't get
- *             "real" info.  However it's okay to read the pg_internal.init
- *             cache file, if one is available.  Otherwise we make phony
- *             entries for the minimum set of nailed-in-cache relations.
+ *             because the transaction subsystem is not up); all we are doing
+ *             is making an empty cache hashtable.  This must be done before
+ *             starting the initialization transaction, because otherwise
+ *             AtEOXact_RelationCache would crash if that transaction aborts
+ *             before we can get the relcache set up.
  */
 
 #define INITRELCACHESIZE               400
@@ -2292,40 +2445,15 @@ RelationCacheInitialize(void)
        oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
 
        /*
-        * create hashtables that index the relcache
+        * create hashtable that indexes the relcache
         */
        MemSet(&ctl, 0, sizeof(ctl));
-       ctl.keysize = sizeof(NameData);
-       ctl.entrysize = sizeof(RelNameCacheEnt);
-       RelationSysNameCache = hash_create("Relcache by name", INITRELCACHESIZE,
-                                                                          &ctl, HASH_ELEM);
-
        ctl.keysize = sizeof(Oid);
        ctl.entrysize = sizeof(RelIdCacheEnt);
-       ctl.hash = tag_hash;
+       ctl.hash = oid_hash;
        RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
                                                                  &ctl, HASH_ELEM | HASH_FUNCTION);
 
-       /*
-        * Try to load the relcache cache file.  If successful, we're done for
-        * now.  Otherwise, initialize the cache with pre-made descriptors for
-        * the critical "nailed-in" system catalogs.
-        */
-       if (IsBootstrapProcessingMode() ||
-               !load_relcache_init_file())
-       {
-               formrdesc(RelationRelationName, PG_CLASS_RELTYPE_OID,
-                                 true, Natts_pg_class, Desc_pg_class);
-               formrdesc(AttributeRelationName, PG_ATTRIBUTE_RELTYPE_OID,
-                                 false, Natts_pg_attribute, Desc_pg_attribute);
-               formrdesc(ProcedureRelationName, PG_PROC_RELTYPE_OID,
-                                 true, Natts_pg_proc, Desc_pg_proc);
-               formrdesc(TypeRelationName, PG_TYPE_RELTYPE_OID,
-                                 true, Natts_pg_type, Desc_pg_type);
-
-#define NUM_CRITICAL_RELS      4       /* fix if you change list above */
-       }
-
        MemoryContextSwitchTo(oldcxt);
 }
 
@@ -2334,73 +2462,116 @@ RelationCacheInitialize(void)
  *
  *             This is called as soon as the catcache and transaction system
  *             are functional.  At this point we can actually read data from
- *             the system catalogs.  Update the relcache entries made during
- *             RelationCacheInitialize, and make sure we have entries for the
- *             critical system indexes.
+ *             the system catalogs.  We first try to read pre-computed relcache
+ *             entries from the pg_internal.init file.  If that's missing or
+ *             broken, make phony entries for the minimum set of nailed-in-cache
+ *             relations.      Then (unless bootstrapping) make sure we have entries
+ *             for the critical system indexes.  Once we've done all this, we
+ *             have enough infrastructure to open any system catalog or use any
+ *             catcache.  The last step is to rewrite pg_internal.init if needed.
  */
 void
 RelationCacheInitializePhase2(void)
 {
        HASH_SEQ_STATUS status;
        RelIdCacheEnt *idhentry;
+       MemoryContext oldcxt;
+       bool            needNewCacheFile = false;
 
+       /*
+        * switch to cache memory context
+        */
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+
+       /*
+        * Try to load the relcache cache file.  If unsuccessful, bootstrap the
+        * cache with pre-made descriptors for the critical "nailed-in" system
+        * catalogs.
+        */
+       if (IsBootstrapProcessingMode() ||
+               !load_relcache_init_file())
+       {
+               needNewCacheFile = true;
+
+               formrdesc("pg_class", PG_CLASS_RELTYPE_OID,
+                                 true, Natts_pg_class, Desc_pg_class);
+               formrdesc("pg_attribute", PG_ATTRIBUTE_RELTYPE_OID,
+                                 false, Natts_pg_attribute, Desc_pg_attribute);
+               formrdesc("pg_proc", PG_PROC_RELTYPE_OID,
+                                 true, Natts_pg_proc, Desc_pg_proc);
+               formrdesc("pg_type", PG_TYPE_RELTYPE_OID,
+                                 true, Natts_pg_type, Desc_pg_type);
+
+#define NUM_CRITICAL_RELS      4       /* fix if you change list above */
+       }
+
+       MemoryContextSwitchTo(oldcxt);
+
+       /* In bootstrap mode, the faked-up formrdesc info is all we'll have */
        if (IsBootstrapProcessingMode())
                return;
 
        /*
-        * If we didn't get the critical system indexes loaded into relcache,
-        * do so now.  These are critical because the catcache depends on them
-        * for catcache fetches that are done during relcache load.  Thus, we
-        * have an infinite-recursion problem.  We can break the recursion by
-        * doing heapscans instead of indexscans at certain key spots. To
-        * avoid hobbling performance, we only want to do that until we have
-        * the critical indexes loaded into relcache.  Thus, the flag
-        * criticalRelcachesBuilt is used to decide whether to do heapscan or
-        * indexscan at the key spots, and we set it true after we've loaded
-        * the critical indexes.
+        * If we didn't get the critical system indexes loaded into relcache, do
+        * so now.      These are critical because the catcache and/or opclass cache
+        * depend on them for fetches done during relcache load.  Thus, we have an
+        * infinite-recursion problem.  We can break the recursion by doing
+        * heapscans instead of indexscans at certain key spots. To avoid hobbling
+        * performance, we only want to do that until we have the critical indexes
+        * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
+        * decide whether to do heapscan or indexscan at the key spots, and we set
+        * it true after we've loaded the critical indexes.
+        *
+        * The critical indexes are marked as "nailed in cache", partly to make it
+        * easy for load_relcache_init_file to count them, but mainly because we
+        * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
+        * true.  (NOTE: perhaps it would be possible to reload them by
+        * temporarily setting criticalRelcachesBuilt to false again.  For now,
+        * though, we just nail 'em in.)
         *
-        * The critical indexes are marked as "nailed in cache", partly to make
-        * it easy for load_relcache_init_file to count them, but mainly
-        * because we cannot flush and rebuild them once we've set
-        * criticalRelcachesBuilt to true.      (NOTE: perhaps it would be
-        * possible to reload them by temporarily setting
-        * criticalRelcachesBuilt to false again.  For now, though, we just
-        * nail 'em in.)
+        * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
+        * in the same way as the others, because the critical catalogs don't
+        * (currently) have any rules or triggers, and so these indexes can be
+        * rebuilt without inducing recursion.  However they are used during
+        * relcache load when a rel does have rules or triggers, so we choose to
+        * nail them for performance reasons.
         */
        if (!criticalRelcachesBuilt)
        {
-               RelationBuildDescInfo buildinfo;
                Relation        ird;
 
-#define LOAD_CRIT_INDEX(indname) \
+#define LOAD_CRIT_INDEX(indexoid) \
                do { \
-                       buildinfo.infotype = INFO_RELNAME; \
-                       buildinfo.i.info_name = (indname); \
-                       ird = RelationBuildDesc(buildinfo, NULL); \
+                       ird = RelationBuildDesc(indexoid, NULL); \
+                       if (ird == NULL) \
+                               elog(PANIC, "could not open critical system index %u", \
+                                        indexoid); \
                        ird->rd_isnailed = true; \
                        ird->rd_refcnt = 1; \
                } while (0)
 
-               LOAD_CRIT_INDEX(ClassNameNspIndex);
-               LOAD_CRIT_INDEX(ClassOidIndex);
-               LOAD_CRIT_INDEX(AttributeRelidNumIndex);
-               LOAD_CRIT_INDEX(IndexRelidIndex);
-               LOAD_CRIT_INDEX(AccessMethodStrategyIndex);
-               LOAD_CRIT_INDEX(AccessMethodProcedureIndex);
-               LOAD_CRIT_INDEX(OperatorOidIndex);
+               LOAD_CRIT_INDEX(ClassOidIndexId);
+               LOAD_CRIT_INDEX(AttributeRelidNumIndexId);
+               LOAD_CRIT_INDEX(IndexRelidIndexId);
+               LOAD_CRIT_INDEX(OpclassOidIndexId);
+               LOAD_CRIT_INDEX(AccessMethodStrategyIndexId);
+               LOAD_CRIT_INDEX(AccessMethodProcedureIndexId);
+               LOAD_CRIT_INDEX(OperatorOidIndexId);
+               LOAD_CRIT_INDEX(RewriteRelRulenameIndexId);
+               LOAD_CRIT_INDEX(TriggerRelidNameIndexId);
 
-#define NUM_CRITICAL_INDEXES   7               /* fix if you change list above */
+#define NUM_CRITICAL_INDEXES   9               /* fix if you change list above */
 
                criticalRelcachesBuilt = true;
        }
 
        /*
-        * Now, scan all the relcache entries and update anything that might
-        * be wrong in the results from formrdesc or the relcache cache file.
-        * If we faked up relcache entries using formrdesc, then read the real
-        * pg_class rows and replace the fake entries with them. Also, if any
-        * of the relcache entries have rules or triggers, load that info the
-        * hard way since it isn't recorded in the cache file.
+        * Now, scan all the relcache entries and update anything that might be
+        * wrong in the results from formrdesc or the relcache cache file. If we
+        * faked up relcache entries using formrdesc, then read the real pg_class
+        * rows and replace the fake entries with them. Also, if any of the
+        * relcache entries have rules or triggers, load that info the hard way
+        * since it isn't recorded in the cache file.
         */
        hash_seq_init(&status, RelationIdCache);
 
@@ -2417,7 +2588,7 @@ RelationCacheInitializePhase2(void)
                        Form_pg_class relp;
 
                        htup = SearchSysCache(RELOID,
-                                                       ObjectIdGetDatum(RelationGetRelid(relation)),
+                                                               ObjectIdGetDatum(RelationGetRelid(relation)),
                                                                  0, 0, 0);
                        if (!HeapTupleIsValid(htup))
                                elog(FATAL, "cache lookup failed for relation %u",
@@ -2431,6 +2602,11 @@ RelationCacheInitializePhase2(void)
                        Assert(relation->rd_rel != NULL);
                        memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
 
+                       /* Update rd_options while we have the tuple */
+                       if (relation->rd_options)
+                               pfree(relation->rd_options);
+                       RelationParseRelOptions(relation, htup);
+
                        /*
                         * Also update the derived fields in rd_att.
                         */
@@ -2449,28 +2625,17 @@ RelationCacheInitializePhase2(void)
                if (relation->rd_rel->reltriggers > 0 && relation->trigdesc == NULL)
                        RelationBuildTriggers(relation);
        }
-}
-
-/*
- *             RelationCacheInitializePhase3
- *
- *             Final step of relcache initialization: write out a new relcache
- *             cache file if one is needed.
- */
-void
-RelationCacheInitializePhase3(void)
-{
-       if (IsBootstrapProcessingMode())
-               return;
 
+       /*
+        * Lastly, write out a new relcache cache file if one is needed.
+        */
        if (needNewCacheFile)
        {
                /*
-                * Force all the catcaches to finish initializing and thereby open
-                * the catalogs and indexes they use.  This will preload the
-                * relcache with entries for all the most important system
-                * catalogs and indexes, so that the init file will be most useful
-                * for future backends.
+                * Force all the catcaches to finish initializing and thereby open the
+                * catalogs and indexes they use.  This will preload the relcache with
+                * entries for all the most important system catalogs and indexes, so
+                * that the init file will be most useful for future backends.
                 */
                InitCatalogCachePhase2();
 
@@ -2479,6 +2644,76 @@ RelationCacheInitializePhase3(void)
        }
 }
 
+/*
+ * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
+ * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
+ *
+ * We need this kluge because we have to be able to access non-fixed-width
+ * fields of pg_class and pg_index before we have the standard catalog caches
+ * available.  We use predefined data that's set up in just the same way as
+ * the bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
+ * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
+ * does it have a TupleConstr field.  But it's good enough for the purpose of
+ * extracting fields.
+ */
+static TupleDesc
+BuildHardcodedDescriptor(int natts, Form_pg_attribute attrs, bool hasoids)
+{
+       TupleDesc       result;
+       MemoryContext oldcxt;
+       int                     i;
+
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+
+       result = CreateTemplateTupleDesc(natts, hasoids);
+       result->tdtypeid = RECORDOID;           /* not right, but we don't care */
+       result->tdtypmod = -1;
+
+       for (i = 0; i < natts; i++)
+       {
+               memcpy(result->attrs[i], &attrs[i], ATTRIBUTE_TUPLE_SIZE);
+               /* make sure attcacheoff is valid */
+               result->attrs[i]->attcacheoff = -1;
+       }
+
+       /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
+       result->attrs[0]->attcacheoff = 0;
+
+       /* Note: we don't bother to set up a TupleConstr entry */
+
+       MemoryContextSwitchTo(oldcxt);
+
+       return result;
+}
+
+static TupleDesc
+GetPgClassDescriptor(void)
+{
+       static TupleDesc pgclassdesc = NULL;
+
+       /* Already done? */
+       if (pgclassdesc == NULL)
+               pgclassdesc = BuildHardcodedDescriptor(Natts_pg_class,
+                                                                                          Desc_pg_class,
+                                                                                          true);
+
+       return pgclassdesc;
+}
+
+static TupleDesc
+GetPgIndexDescriptor(void)
+{
+       static TupleDesc pgindexdesc = NULL;
+
+       /* Already done? */
+       if (pgindexdesc == NULL)
+               pgindexdesc = BuildHardcodedDescriptor(Natts_pg_index,
+                                                                                          Desc_pg_index,
+                                                                                          false);
+
+       return pgindexdesc;
+}
+
 static void
 AttrDefaultFetch(Relation relation)
 {
@@ -2498,8 +2733,8 @@ AttrDefaultFetch(Relation relation)
                                BTEqualStrategyNumber, F_OIDEQ,
                                ObjectIdGetDatum(RelationGetRelid(relation)));
 
-       adrel = heap_openr(AttrDefaultRelationName, AccessShareLock);
-       adscan = systable_beginscan(adrel, AttrDefaultIndex, true,
+       adrel = heap_open(AttrDefaultRelationId, AccessShareLock);
+       adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
                                                                SnapshotNow, 1, &skey);
        found = 0;
 
@@ -2513,7 +2748,7 @@ AttrDefaultFetch(Relation relation)
                                continue;
                        if (attrdef[i].adbin != NULL)
                                elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
-                                        NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
+                               NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
                                         RelationGetRelationName(relation));
                        else
                                found++;
@@ -2523,12 +2758,11 @@ AttrDefaultFetch(Relation relation)
                                                          adrel->rd_att, &isnull);
                        if (isnull)
                                elog(WARNING, "null adbin for attr %s of rel %s",
-                                        NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
+                               NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
                                         RelationGetRelationName(relation));
                        else
                                attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
-                                                        DatumGetCString(DirectFunctionCall1(textout,
-                                                                                                                                val)));
+                                                                                                       TextDatumGetCString(val));
                        break;
                }
 
@@ -2563,8 +2797,8 @@ CheckConstraintFetch(Relation relation)
                                BTEqualStrategyNumber, F_OIDEQ,
                                ObjectIdGetDatum(RelationGetRelid(relation)));
 
-       conrel = heap_openr(ConstraintRelationName, AccessShareLock);
-       conscan = systable_beginscan(conrel, ConstraintRelidIndex, true,
+       conrel = heap_open(ConstraintRelationId, AccessShareLock);
+       conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
                                                                 SnapshotNow, 1, skey);
 
        while (HeapTupleIsValid(htup = systable_getnext(conscan)))
@@ -2580,7 +2814,7 @@ CheckConstraintFetch(Relation relation)
                                 RelationGetRelationName(relation));
 
                check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
-                                                                                         NameStr(conform->conname));
+                                                                                                 NameStr(conform->conname));
 
                /* Grab and test conbin is actually set */
                val = fastgetattr(htup,
@@ -2591,8 +2825,7 @@ CheckConstraintFetch(Relation relation)
                                 RelationGetRelationName(relation));
 
                check[found].ccbin = MemoryContextStrdup(CacheMemoryContext,
-                                                        DatumGetCString(DirectFunctionCall1(textout,
-                                                                                                                                val)));
+                                                                                                TextDatumGetCString(val));
                found++;
        }
 
@@ -2622,9 +2855,14 @@ CheckConstraintFetch(Relation relation)
  *
  * Since shared cache inval causes the relcache's copy of the list to go away,
  * we return a copy of the list palloc'd in the caller's context.  The caller
- * may freeList() the returned list after scanning it. This is necessary
+ * may list_free() the returned list after scanning it. This is necessary
  * since the caller will typically be doing syscache lookups on the relevant
  * indexes, and syscache lookup could cause SI messages to be processed!
+ *
+ * We also update rd_oidindex, which this module treats as effectively part
+ * of the index list.  rd_oidindex is valid when rd_indexvalid isn't zero;
+ * it is the pg_class OID of a unique index on OID when the relation has one,
+ * and InvalidOid if there is no such index.
  */
 List *
 RelationGetIndexList(Relation relation)
@@ -2634,6 +2872,7 @@ RelationGetIndexList(Relation relation)
        ScanKeyData skey;
        HeapTuple       htup;
        List       *result;
+       Oid                     oidIndex;
        MemoryContext oldcxt;
 
        /* Quick exit if we already computed the list. */
@@ -2641,12 +2880,13 @@ RelationGetIndexList(Relation relation)
                return list_copy(relation->rd_indexlist);
 
        /*
-        * We build the list we intend to return (in the caller's context)
-        * while doing the scan.  After successfully completing the scan, we
-        * copy that list into the relcache entry.      This avoids cache-context
-        * memory leakage if we get some sort of error partway through.
+        * We build the list we intend to return (in the caller's context) while
+        * doing the scan.      After successfully completing the scan, we copy that
+        * list into the relcache entry.  This avoids cache-context memory leakage
+        * if we get some sort of error partway through.
         */
        result = NIL;
+       oidIndex = InvalidOid;
 
        /* Prepare to scan pg_index for entries having indrelid = this rel. */
        ScanKeyInit(&skey,
@@ -2654,15 +2894,24 @@ RelationGetIndexList(Relation relation)
                                BTEqualStrategyNumber, F_OIDEQ,
                                ObjectIdGetDatum(RelationGetRelid(relation)));
 
-       indrel = heap_openr(IndexRelationName, AccessShareLock);
-       indscan = systable_beginscan(indrel, IndexIndrelidIndex, true,
+       indrel = heap_open(IndexRelationId, AccessShareLock);
+       indscan = systable_beginscan(indrel, IndexIndrelidIndexId, true,
                                                                 SnapshotNow, 1, &skey);
 
        while (HeapTupleIsValid(htup = systable_getnext(indscan)))
        {
                Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);
 
+               /* Add index's OID to result list in the proper order */
                result = insert_ordered_oid(result, index->indexrelid);
+
+               /* Check to see if it is a unique, non-partial btree index on OID */
+               if (index->indnatts == 1 &&
+                       index->indisunique &&
+                       index->indkey.values[0] == ObjectIdAttributeNumber &&
+                       index->indclass.values[0] == OID_BTREE_OPS_OID &&
+                       heap_attisnull(htup, Anum_pg_index_indpred))
+                       oidIndex = index->indexrelid;
        }
 
        systable_endscan(indscan);
@@ -2671,6 +2920,7 @@ RelationGetIndexList(Relation relation)
        /* Now save a copy of the completed list in the relcache entry. */
        oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
        relation->rd_indexlist = list_copy(result);
+       relation->rd_oidindex = oidIndex;
        relation->rd_indexvalid = 1;
        MemoryContextSwitchTo(oldcxt);
 
@@ -2714,8 +2964,8 @@ insert_ordered_oid(List *list, Oid datum)
  * RelationSetIndexList -- externally force the index list contents
  *
  * This is used to temporarily override what we think the set of valid
- * indexes is. The forcing will be valid only until transaction commit
- * or abort.
+ * indexes is (including the presence or absence of an OID index).
+ * The forcing will be valid only until transaction commit or abort.
  *
  * This should only be applied to nailed relations, because in a non-nailed
  * relation the hacked index list could be lost at any time due to SI
@@ -2724,7 +2974,7 @@ insert_ordered_oid(List *list, Oid datum)
  * It is up to the caller to make sure the given list is correctly ordered.
  */
 void
-RelationSetIndexList(Relation relation, List *indexIds)
+RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex)
 {
        MemoryContext oldcxt;
 
@@ -2736,9 +2986,39 @@ RelationSetIndexList(Relation relation, List *indexIds)
        /* Okay to replace old list */
        list_free(relation->rd_indexlist);
        relation->rd_indexlist = indexIds;
+       relation->rd_oidindex = oidIndex;
        relation->rd_indexvalid = 2;    /* mark list as forced */
        /* must flag that we have a forced index list */
-       need_eosubxact_work = true;
+       need_eoxact_work = true;
+       /* we deliberately do not change rd_indexattr */
+}
+
+/*
+ * RelationGetOidIndex -- get the pg_class OID of the relation's OID index
+ *
+ * Returns InvalidOid if there is no such index.
+ */
+Oid
+RelationGetOidIndex(Relation relation)
+{
+       List       *ilist;
+
+       /*
+        * If relation doesn't have OIDs at all, caller is probably confused. (We
+        * could just silently return InvalidOid, but it seems better to throw an
+        * assertion.)
+        */
+       Assert(relation->rd_rel->relhasoids);
+
+       if (relation->rd_indexvalid == 0)
+       {
+               /* RelationGetIndexList does the heavy lifting. */
+               ilist = RelationGetIndexList(relation);
+               list_free(ilist);
+               Assert(relation->rd_indexvalid != 0);
+       }
+
+       return relation->rd_oidindex;
 }
 
 /*
@@ -2769,34 +3049,26 @@ RelationGetIndexExpressions(Relation relation)
                return NIL;
 
        /*
-        * We build the tree we intend to return in the caller's context.
-        * After successfully completing the work, we copy it into the
-        * relcache entry.      This avoids problems if we get some sort of error
-        * partway through.
-        *
-        * We make use of the syscache's copy of pg_index's tupledesc to access
-        * the non-fixed fields of the tuple.  We assume that the syscache
-        * will be initialized before any access of a partial index could
-        * occur.  (This would probably fail if we were to allow partial
-        * indexes on system catalogs.)
-        */
-       exprsDatum = SysCacheGetAttr(INDEXRELID, relation->rd_indextuple,
-                                                                Anum_pg_index_indexprs, &isnull);
+        * We build the tree we intend to return in the caller's context. After
+        * successfully completing the work, we copy it into the relcache entry.
+        * This avoids problems if we get some sort of error partway through.
+        */
+       exprsDatum = heap_getattr(relation->rd_indextuple,
+                                                         Anum_pg_index_indexprs,
+                                                         GetPgIndexDescriptor(),
+                                                         &isnull);
        Assert(!isnull);
-       exprsString = DatumGetCString(DirectFunctionCall1(textout, exprsDatum));
+       exprsString = TextDatumGetCString(exprsDatum);
        result = (List *) stringToNode(exprsString);
        pfree(exprsString);
 
        /*
-        * Run the expressions through flatten_andors and
-        * eval_const_expressions. This is not just an optimization, but is
-        * necessary, because the planner will be comparing them to
-        * similarly-processed qual clauses, and may fail to detect valid
-        * matches without this.
+        * Run the expressions through eval_const_expressions. This is not just an
+        * optimization, but is necessary, because the planner will be comparing
+        * them to similarly-processed qual clauses, and may fail to detect valid
+        * matches without this.  We don't bother with canonicalize_qual, however.
         */
-       result = (List *) flatten_andors((Node *) result);
-
-       result = (List *) eval_const_expressions((Node *) result);
+       result = (List *) eval_const_expressions(NULL, (Node *) result);
 
        /*
         * Also mark any coercion format fields as "don't care", so that the
@@ -2844,34 +3116,31 @@ RelationGetIndexPredicate(Relation relation)
                return NIL;
 
        /*
-        * We build the tree we intend to return in the caller's context.
-        * After successfully completing the work, we copy it into the
-        * relcache entry.      This avoids problems if we get some sort of error
-        * partway through.
-        *
-        * We make use of the syscache's copy of pg_index's tupledesc to access
-        * the non-fixed fields of the tuple.  We assume that the syscache
-        * will be initialized before any access of a partial index could
-        * occur.  (This would probably fail if we were to allow partial
-        * indexes on system catalogs.)
-        */
-       predDatum = SysCacheGetAttr(INDEXRELID, relation->rd_indextuple,
-                                                               Anum_pg_index_indpred, &isnull);
+        * We build the tree we intend to return in the caller's context. After
+        * successfully completing the work, we copy it into the relcache entry.
+        * This avoids problems if we get some sort of error partway through.
+        */
+       predDatum = heap_getattr(relation->rd_indextuple,
+                                                        Anum_pg_index_indpred,
+                                                        GetPgIndexDescriptor(),
+                                                        &isnull);
        Assert(!isnull);
-       predString = DatumGetCString(DirectFunctionCall1(textout, predDatum));
+       predString = TextDatumGetCString(predDatum);
        result = (List *) stringToNode(predString);
        pfree(predString);
 
        /*
-        * Run the expression through canonicalize_qual and
-        * eval_const_expressions. This is not just an optimization, but is
-        * necessary, because the planner will be comparing it to
-        * similarly-processed qual clauses, and may fail to detect valid
-        * matches without this.
+        * Run the expression through const-simplification and canonicalization.
+        * This is not just an optimization, but is necessary, because the planner
+        * will be comparing it to similarly-processed qual clauses, and may fail
+        * to detect valid matches without this.  This must match the processing
+        * done to qual clauses in preprocess_expression()!  (We can skip the
+        * stuff involving subqueries, however, since we don't allow any in index
+        * predicates.)
         */
-       result = (List *) canonicalize_qual((Expr *) result);
+       result = (List *) eval_const_expressions(NULL, (Node *) result);
 
-       result = (List *) eval_const_expressions((Node *) result);
+       result = (List *) canonicalize_qual((Expr *) result);
 
        /*
         * Also mark any coercion format fields as "don't care", so that the
@@ -2893,6 +3162,91 @@ RelationGetIndexPredicate(Relation relation)
        return result;
 }
 
+/*
+ * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
+ *
+ * The result has a bit set for each attribute used anywhere in the index
+ * definitions of all the indexes on this relation.  (This includes not only
+ * simple index keys, but attributes used in expressions and partial-index
+ * predicates.)
+ *
+ * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
+ * we can include system attributes (e.g., OID) in the bitmap representation.
+ *
+ * The returned result is palloc'd in the caller's memory context and should
+ * be bms_free'd when not needed anymore.
+ */
+Bitmapset *
+RelationGetIndexAttrBitmap(Relation relation)
+{
+       Bitmapset  *indexattrs;
+       List       *indexoidlist;
+       ListCell   *l;
+       MemoryContext oldcxt;
+
+       /* Quick exit if we already computed the result. */
+       if (relation->rd_indexattr != NULL)
+               return bms_copy(relation->rd_indexattr);
+
+       /* Fast path if definitely no indexes */
+       if (!RelationGetForm(relation)->relhasindex)
+               return NULL;
+
+       /*
+        * Get cached list of index OIDs
+        */
+       indexoidlist = RelationGetIndexList(relation);
+
+       /* Fall out if no indexes (but relhasindex was set) */
+       if (indexoidlist == NIL)
+               return NULL;
+
+       /*
+        * For each index, add referenced attributes to indexattrs.
+        */
+       indexattrs = NULL;
+       foreach(l, indexoidlist)
+       {
+               Oid                     indexOid = lfirst_oid(l);
+               Relation        indexDesc;
+               IndexInfo  *indexInfo;
+               int                     i;
+
+               indexDesc = index_open(indexOid, AccessShareLock);
+
+               /* Extract index key information from the index's pg_index row */
+               indexInfo = BuildIndexInfo(indexDesc);
+
+               /* Collect simple attribute references */
+               for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+               {
+                       int                     attrnum = indexInfo->ii_KeyAttrNumbers[i];
+
+                       if (attrnum != 0)
+                               indexattrs = bms_add_member(indexattrs,
+                                                          attrnum - FirstLowInvalidHeapAttributeNumber);
+               }
+
+               /* Collect all attributes used in expressions, too */
+               pull_varattnos((Node *) indexInfo->ii_Expressions, &indexattrs);
+
+               /* Collect all attributes in the index predicate, too */
+               pull_varattnos((Node *) indexInfo->ii_Predicate, &indexattrs);
+
+               index_close(indexDesc, AccessShareLock);
+       }
+
+       list_free(indexoidlist);
+
+       /* Now save a copy of the bitmap in the relcache entry. */
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+       relation->rd_indexattr = bms_copy(indexattrs);
+       MemoryContextSwitchTo(oldcxt);
+
+       /* We return our original working copy for caller to play with */
+       return indexattrs;
+}
+
 
 /*
  *     load_relcache_init_file, write_relcache_init_file
@@ -2939,7 +3293,7 @@ RelationGetIndexPredicate(Relation relation)
  * load_relcache_init_file -- attempt to load cache from the init file
  *
  * If successful, return TRUE and set criticalRelcachesBuilt to true.
- * If not successful, return FALSE and set needNewCacheFile to true.
+ * If not successful, return FALSE.
  *
  * NOTE: we assume we are already switched into CacheMemoryContext.
  */
@@ -2962,15 +3316,12 @@ load_relcache_init_file(void)
 
        fp = AllocateFile(initfilename, PG_BINARY_R);
        if (fp == NULL)
-       {
-               needNewCacheFile = true;
                return false;
-       }
 
        /*
-        * Read the index relcache entries from the file.  Note we will not
-        * enter any of them into the cache if the read fails partway through;
-        * this helps to guard against broken init files.
+        * Read the index relcache entries from the file.  Note we will not enter
+        * any of them into the cache if the read fails partway through; this
+        * helps to guard against broken init files.
         */
        max_rels = 100;
        rels = (Relation *) palloc(max_rels * sizeof(Relation));
@@ -3030,6 +3381,8 @@ load_relcache_init_file(void)
                /* initialize attribute tuple forms */
                rel->rd_att = CreateTemplateTupleDesc(relform->relnatts,
                                                                                          relform->relhasoids);
+               rel->rd_att->tdrefcount = 1;    /* mark as refcounted */
+
                rel->rd_att->tdtypeid = relform->reltype;
                rel->rd_att->tdtypmod = -1;             /* unnecessary, but... */
 
@@ -3047,6 +3400,22 @@ load_relcache_init_file(void)
                        has_not_null |= rel->rd_att->attrs[i]->attnotnull;
                }
 
+               /* next read the access method specific field */
+               if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+                       goto read_failed;
+               if (len > 0)
+               {
+                       rel->rd_options = palloc(len);
+                       if ((nread = fread(rel->rd_options, 1, len, fp)) != len)
+                               goto read_failed;
+                       if (len != VARSIZE(rel->rd_options))
+                               goto read_failed;               /* sanity check */
+               }
+               else
+               {
+                       rel->rd_options = NULL;
+               }
+
                /* mark not-null status */
                if (has_not_null)
                {
@@ -3061,9 +3430,12 @@ load_relcache_init_file(void)
                {
                        Form_pg_am      am;
                        MemoryContext indexcxt;
+                       Oid                *opfamily;
+                       Oid                *opcintype;
                        Oid                *operator;
                        RegProcedure *support;
                        int                     nsupport;
+                       int16      *indoption;
 
                        /* Count nailed indexes to ensure we have 'em all */
                        if (rel->rd_isnailed)
@@ -3078,7 +3450,6 @@ load_relcache_init_file(void)
                                goto read_failed;
 
                        /* Fix up internal pointers in the tuple -- see heap_copytuple */
-                       rel->rd_indextuple->t_datamcxt = CurrentMemoryContext;
                        rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
                        rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);
 
@@ -3102,6 +3473,26 @@ load_relcache_init_file(void)
                                                                                         ALLOCSET_SMALL_MAXSIZE);
                        rel->rd_indexcxt = indexcxt;
 
+                       /* next, read the vector of opfamily OIDs */
+                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+                               goto read_failed;
+
+                       opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
+                       if ((nread = fread(opfamily, 1, len, fp)) != len)
+                               goto read_failed;
+
+                       rel->rd_opfamily = opfamily;
+
+                       /* next, read the vector of opcintype OIDs */
+                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+                               goto read_failed;
+
+                       opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
+                       if ((nread = fread(opcintype, 1, len, fp)) != len)
+                               goto read_failed;
+
+                       rel->rd_opcintype = opcintype;
+
                        /* next, read the vector of operator OIDs */
                        if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
                                goto read_failed;
@@ -3112,7 +3503,7 @@ load_relcache_init_file(void)
 
                        rel->rd_operator = operator;
 
-                       /* finally, read the vector of support procedures */
+                       /* next, read the vector of support procedures */
                        if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
                                goto read_failed;
                        support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
@@ -3121,7 +3512,19 @@ load_relcache_init_file(void)
 
                        rel->rd_support = support;
 
-                       /* add a zeroed support-fmgr-info vector */
+                       /* finally, read the vector of indoption values */
+                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+                               goto read_failed;
+
+                       indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
+                       if ((nread = fread(indoption, 1, len, fp)) != len)
+                               goto read_failed;
+
+                       rel->rd_indoption = indoption;
+
+                       /* set up zeroed fmgr-info vectors */
+                       rel->rd_aminfo = (RelationAmInfo *)
+                               MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
                        nsupport = relform->relnatts * am->amsupport;
                        rel->rd_supportinfo = (FmgrInfo *)
                                MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
@@ -3136,17 +3539,21 @@ load_relcache_init_file(void)
                        Assert(rel->rd_indextuple == NULL);
                        Assert(rel->rd_am == NULL);
                        Assert(rel->rd_indexcxt == NULL);
+                       Assert(rel->rd_aminfo == NULL);
+                       Assert(rel->rd_opfamily == NULL);
+                       Assert(rel->rd_opcintype == NULL);
                        Assert(rel->rd_operator == NULL);
                        Assert(rel->rd_support == NULL);
                        Assert(rel->rd_supportinfo == NULL);
+                       Assert(rel->rd_indoption == NULL);
                }
 
                /*
                 * Rules and triggers are not saved (mainly because the internal
-                * format is complex and subject to change).  They must be rebuilt
-                * if needed by RelationCacheInitializePhase2.  This is not
-                * expected to be a big performance hit since few system catalogs
-                * have such.  Ditto for index expressions and predicates.
+                * format is complex and subject to change).  They must be rebuilt if
+                * needed by RelationCacheInitializePhase2.  This is not expected to
+                * be a big performance hit since few system catalogs have such. Ditto
+                * for index expressions and predicates.
                 */
                rel->rd_rules = NULL;
                rel->rd_rulescxt = NULL;
@@ -3165,22 +3572,26 @@ load_relcache_init_file(void)
                        rel->rd_refcnt = 0;
                rel->rd_indexvalid = 0;
                rel->rd_indexlist = NIL;
+               rel->rd_indexattr = NULL;
+               rel->rd_oidindex = InvalidOid;
                rel->rd_createSubid = InvalidSubTransactionId;
+               rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
+               rel->rd_amcache = NULL;
                MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
 
                /*
                 * Recompute lock and physical addressing info.  This is needed in
-                * case the pg_internal.init file was copied from some other
-                * database by CREATE DATABASE.
+                * case the pg_internal.init file was copied from some other database
+                * by CREATE DATABASE.
                 */
                RelationInitLockInfo(rel);
                RelationInitPhysicalAddr(rel);
        }
 
        /*
-        * We reached the end of the init file without apparent problem. Did
-        * we get the right number of nailed items?  (This is a useful
-        * crosscheck in case the set of critical rels or indexes changes.)
+        * We reached the end of the init file without apparent problem. Did we
+        * get the right number of nailed items?  (This is a useful crosscheck in
+        * case the set of critical rels or indexes changes.)
         */
        if (nailed_rels != NUM_CRITICAL_RELS ||
                nailed_indexes != NUM_CRITICAL_INDEXES)
@@ -3206,15 +3617,14 @@ load_relcache_init_file(void)
        return true;
 
        /*
-        * init file is broken, so do it the hard way.  We don't bother trying
-        * to free the clutter we just allocated; it's not in the relcache so
-        * it won't hurt.
+        * init file is broken, so do it the hard way.  We don't bother trying to
+        * free the clutter we just allocated; it's not in the relcache so it
+        * won't hurt.
         */
 read_failed:
        pfree(rels);
        FreeFile(fp);
 
-       needNewCacheFile = true;
        return false;
 }
 
@@ -3236,8 +3646,8 @@ write_relcache_init_file(void)
 
        /*
         * We must write a temporary file and rename it into place. Otherwise,
-        * another backend starting at about the same time might crash trying
-        * to read the partially-complete file.
+        * another backend starting at about the same time might crash trying to
+        * read the partially-complete file.
         */
        snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
                         DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
@@ -3257,7 +3667,7 @@ write_relcache_init_file(void)
                                (errcode_for_file_access(),
                                 errmsg("could not create relation-cache initialization file \"%s\": %m",
                                                tempfilename),
-                 errdetail("Continuing anyway, but there's something wrong.")));
+                         errdetail("Continuing anyway, but there's something wrong.")));
                return;
        }
 
@@ -3280,39 +3690,24 @@ write_relcache_init_file(void)
        {
                Relation        rel = idhentry->reldesc;
                Form_pg_class relform = rel->rd_rel;
-               Size            len;
-
-               /*
-                * first write the relcache entry proper
-                */
-               len = sizeof(RelationData);
-
-               /* first, write the relation descriptor length */
-               if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
-                       elog(FATAL, "could not write init file");
 
-               /* next, write out the Relation structure */
-               if (fwrite(rel, 1, len, fp) != len)
-                       elog(FATAL, "could not write init file");
+               /* first write the relcache entry proper */
+               write_item(rel, sizeof(RelationData), fp);
 
                /* next write the relation tuple form */
-               len = sizeof(FormData_pg_class);
-               if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
-                       elog(FATAL, "could not write init file");
-
-               if (fwrite(relform, 1, len, fp) != len)
-                       elog(FATAL, "could not write init file");
+               write_item(relform, CLASS_TUPLE_SIZE, fp);
 
                /* next, do all the attribute tuple form data entries */
                for (i = 0; i < relform->relnatts; i++)
                {
-                       len = ATTRIBUTE_TUPLE_SIZE;
-                       if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
-                               elog(FATAL, "could not write init file");
-                       if (fwrite(rel->rd_att->attrs[i], 1, len, fp) != len)
-                               elog(FATAL, "could not write init file");
+                       write_item(rel->rd_att->attrs[i], ATTRIBUTE_TUPLE_SIZE, fp);
                }
 
+               /* next, do the access method specific field */
+               write_item(rel->rd_options,
+                                  (rel->rd_options ? VARSIZE(rel->rd_options) : 0),
+                                  fp);
+
                /* If it's an index, there's more to do */
                if (rel->rd_rel->relkind == RELKIND_INDEX)
                {
@@ -3320,36 +3715,37 @@ write_relcache_init_file(void)
 
                        /* write the pg_index tuple */
                        /* we assume this was created by heap_copytuple! */
-                       len = HEAPTUPLESIZE + rel->rd_indextuple->t_len;
-                       if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
-                               elog(FATAL, "could not write init file");
-
-                       if (fwrite(rel->rd_indextuple, 1, len, fp) != len)
-                               elog(FATAL, "could not write init file");
+                       write_item(rel->rd_indextuple,
+                                          HEAPTUPLESIZE + rel->rd_indextuple->t_len,
+                                          fp);
 
                        /* next, write the access method tuple form */
-                       len = sizeof(FormData_pg_am);
-                       if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
-                               elog(FATAL, "could not write init file");
+                       write_item(am, sizeof(FormData_pg_am), fp);
 
-                       if (fwrite(am, 1, len, fp) != len)
-                               elog(FATAL, "could not write init file");
+                       /* next, write the vector of opfamily OIDs */
+                       write_item(rel->rd_opfamily,
+                                          relform->relnatts * sizeof(Oid),
+                                          fp);
 
-                       /* next, write the vector of operator OIDs */
-                       len = relform->relnatts * (am->amstrategies * sizeof(Oid));
-                       if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
-                               elog(FATAL, "could not write init file");
+                       /* next, write the vector of opcintype OIDs */
+                       write_item(rel->rd_opcintype,
+                                          relform->relnatts * sizeof(Oid),
+                                          fp);
 
-                       if (fwrite(rel->rd_operator, 1, len, fp) != len)
-                               elog(FATAL, "could not write init file");
-
-                       /* finally, write the vector of support procedures */
-                       len = relform->relnatts * (am->amsupport * sizeof(RegProcedure));
-                       if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
-                               elog(FATAL, "could not write init file");
-
-                       if (fwrite(rel->rd_support, 1, len, fp) != len)
-                               elog(FATAL, "could not write init file");
+                       /* next, write the vector of operator OIDs */
+                       write_item(rel->rd_operator,
+                                          relform->relnatts * (am->amstrategies * sizeof(Oid)),
+                                          fp);
+
+                       /* next, write the vector of support procedures */
+                       write_item(rel->rd_support,
+                                 relform->relnatts * (am->amsupport * sizeof(RegProcedure)),
+                                          fp);
+
+                       /* finally, write the vector of indoption values */
+                       write_item(rel->rd_indoption,
+                                          relform->relnatts * sizeof(int16),
+                                          fp);
                }
 
                /* also make a list of their OIDs, for RelationIdIsInInitFile */
@@ -3364,11 +3760,11 @@ write_relcache_init_file(void)
 
        /*
         * Now we have to check whether the data we've so painstakingly
-        * accumulated is already obsolete due to someone else's
-        * just-committed catalog changes.      If so, we just delete the temp
-        * file and leave it to the next backend to try again.  (Our own
-        * relcache entries will be updated by SI message processing, but we
-        * can't be sure whether what we wrote out was up-to-date.)
+        * accumulated is already obsolete due to someone else's just-committed
+        * catalog changes.  If so, we just delete the temp file and leave it to
+        * the next backend to try again.  (Our own relcache entries will be
+        * updated by SI message processing, but we can't be sure whether what we
+        * wrote out was up-to-date.)
         *
         * This mustn't run concurrently with RelationCacheInitFileInvalidate, so
         * grab a serialization lock for the duration.
@@ -3379,8 +3775,8 @@ write_relcache_init_file(void)
        AcceptInvalidationMessages();
 
        /*
-        * If we have received any SI relcache invals since backend start,
-        * assume we may have written out-of-date data.
+        * If we have received any SI relcache invals since backend start, assume
+        * we may have written out-of-date data.
         */
        if (relcacheInvalsReceived == 0L)
        {
@@ -3389,9 +3785,9 @@ write_relcache_init_file(void)
                 * previously-existing init file.
                 *
                 * Note: a failure here is possible under Cygwin, if some other
-                * backend is holding open an unlinked-but-not-yet-gone init file.
-                * So treat this as a noncritical failure; just remove the useless
-                * temp file on failure.
+                * backend is holding open an unlinked-but-not-yet-gone init file. So
+                * treat this as a noncritical failure; just remove the useless temp
+                * file on failure.
                 */
                if (rename(tempfilename, finalfilename) < 0)
                        unlink(tempfilename);
@@ -3405,6 +3801,16 @@ write_relcache_init_file(void)
        LWLockRelease(RelCacheInitLock);
 }
 
+/* write a chunk of data preceded by its length */
+static void
+write_item(const void *data, Size len, FILE *fp)
+{
+       if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
+               elog(FATAL, "could not write init file");
+       if (fwrite(data, 1, len, fp) != len)
+               elog(FATAL, "could not write init file");
+}
+
 /*
  * Detect whether a given relation (identified by OID) is one of the ones
  * we store in the init file.
@@ -3430,7 +3836,7 @@ RelationIdIsInInitFile(Oid relationId)
  * just after sending them.  The unlink before ensures that a backend that's
  * currently starting cannot read the now-obsolete init file and then miss
  * the SI messages that will force it to update its relcache entries.  (This
- * works because the backend startup sequence gets into the PROC array before
+ * works because the backend startup sequence gets into the PGPROC array before
  * trying to load the init file.)  The unlink after is to synchronize with a
  * backend that may currently be trying to write an init file based on data
  * that we've just rendered invalid.  Such a backend will see the SI messages,
@@ -3457,14 +3863,35 @@ RelationCacheInitFileInvalidate(bool beforeSend)
                /*
                 * We need to interlock this against write_relcache_init_file, to
                 * guard against possibility that someone renames a new-but-
-                * already-obsolete init file into place just after we unlink.
-                * With the interlock, it's certain that write_relcache_init_file
-                * will notice our SI inval message before renaming into place, or
-                * else that we will execute second and successfully unlink the
-                * file.
+                * already-obsolete init file into place just after we unlink. With
+                * the interlock, it's certain that write_relcache_init_file will
+                * notice our SI inval message before renaming into place, or else
+                * that we will execute second and successfully unlink the file.
                 */
                LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
                unlink(initfilename);
                LWLockRelease(RelCacheInitLock);
        }
 }
+
+/*
+ * Remove the init file for a given database during postmaster startup.
+ *
+ * We used to keep the init file across restarts, but that is unsafe in PITR
+ * scenarios, and even in simple crash-recovery cases there are windows for
+ * the init file to become out-of-sync with the database.  So now we just
+ * remove it during startup and expect the first backend launch to rebuild it.
+ * Of course, this has to happen in each database of the cluster.  For
+ * simplicity this is driven by flatfiles.c, which has to scan pg_database
+ * anyway.
+ */
+void
+RelationCacheInitFileRemove(const char *dbPath)
+{
+       char            initfilename[MAXPGPATH];
+
+       snprintf(initfilename, sizeof(initfilename), "%s/%s",
+                        dbPath, RELCACHE_INIT_FILENAME);
+       unlink(initfilename);
+       /* ignore any error, since it might not be there at all */
+}