]> granicus.if.org Git - postgresql/blobdiff - src/backend/utils/cache/catcache.c
For some reason access/tupmacs.h has been #including utils/memutils.h,
[postgresql] / src / backend / utils / cache / catcache.c
index 97accc93db04345dd9a9f396f672aa28fb87d5e1..abe0aa060c0a4eb7cd47082f66261773dec4f1f7 100644 (file)
@@ -1,53 +1,66 @@
 /*-------------------------------------------------------------------------
  *
- * catcache.c--
+ * catcache.c
  *       System catalog cache for tuples matching a key.
  *
- * Copyright (c) 1994, Regents of the University of California
+ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/utils/cache/catcache.c,v 1.35 1998/10/12 00:53:33 momjian Exp $
- *
- * Notes:
- *             XXX This needs to use exception.h to handle recovery when
- *                             an abort occurs during DisableCache.
+ *       $PostgreSQL: pgsql/src/backend/utils/cache/catcache.c,v 1.121 2005/05/06 17:24:54 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
-#include <string.h>
 #include "postgres.h"
-#include "access/heapam.h"
+
 #include "access/genam.h"
-#include "utils/tqual.h"
-#include "utils/builtins.h"
-#include "utils/portal.h"
-#include "utils/catcache.h"
-#include "utils/elog.h"
-#include "utils/palloc.h"
-#include "utils/mcxt.h"
-#include "utils/rel.h"
-#include "storage/bufpage.h"
+#include "access/hash.h"
+#include "access/heapam.h"
 #include "access/valid.h"
+#include "catalog/pg_opclass.h"
+#include "catalog/pg_operator.h"
+#include "catalog/pg_type.h"
+#include "catalog/indexing.h"
 #include "miscadmin.h"
-#include "fmgr.h"                              /* for F_BOOLEQ, etc.  DANGER */
-#include "catalog/pg_type.h"   /* for OID of int28 type */
-#include "lib/dllist.h"
-
-static void CatCacheRemoveCTup(CatCache *cache, Dlelem *e);
-static Index CatalogCacheComputeHashIndex(struct catcache * cacheInP);
-static Index CatalogCacheComputeTupleHashIndex(struct catcache * cacheInOutP,
-                                                                 Relation relation, HeapTuple tuple);
-static void CatalogCacheInitializeCache(struct catcache * cache,
-                                                       Relation relation);
-static long comphash(long l, char *v);
-
-/* ----------------
- *             variables, macros and other stuff
+#ifdef CATCACHE_STATS
+#include "storage/ipc.h"               /* for on_proc_exit */
+#endif
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+#include "utils/catcache.h"
+#include "utils/memutils.h"
+#include "utils/relcache.h"
+#include "utils/resowner.h"
+#include "utils/syscache.h"
+
+
+/* #define CACHEDEBUG */       /* turns DEBUG elogs on */
+
+/*
+ * Constants related to size of the catcache.
+ *
+ * NCCBUCKETS must be a power of two and must be less than 64K (because
+ * SharedInvalCatcacheMsg crams hash indexes into a uint16 field).     In
+ * practice it should be a lot less, anyway, to avoid chewing up too much
+ * space on hash bucket headers.
  *
- *     note CCSIZE allocates 51 buckets .. one was already allocated in
- *     the catcache structure.
- * ----------------
+ * MAXCCTUPLES could be as small as a few hundred, if per-backend memory
+ * consumption is at a premium.
+ */
+#define NCCBUCKETS 256                 /* Hash buckets per CatCache */
+#define MAXCCTUPLES 5000               /* Maximum # of tuples in all caches */
+
+/*
+ * Given a hash value and the size of the hash table, find the bucket
+ * in which the hash value belongs. Since the hash table must contain
+ * a power-of-2 number of elements, this is a simple bitmask.
+ */
+#define HASH_INDEX(h, sz) ((Index) ((h) & ((sz) - 1)))
+
+
+/*
+ *             variables, macros and other stuff
  */
 
 #ifdef CACHEDEBUG
@@ -66,1091 +79,1766 @@ static long comphash(long l, char *v);
 #define CACHE6_elog(a,b,c,d,e,f,g)
 #endif
 
-CatCache   *Caches = NULL;
-GlobalMemory CacheCxt;
-
-static int     DisableCache;
-
-/* ----------------
- *             EQPROC is used in CatalogCacheInitializeCache
- *             XXX this should be replaced by catalog lookups soon
- * ----------------
- */
-static long eqproc[] = {
-       F_BOOLEQ, 0l, F_CHAREQ, F_NAMEEQ, 0l,
-       F_INT2EQ, F_KEYFIRSTEQ, F_INT4EQ, 0l, F_TEXTEQ,
-       F_OIDEQ, 0l, 0l, 0l, F_OID8EQ
-};
-
-#define EQPROC(SYSTEMTYPEOID)  eqproc[(SYSTEMTYPEOID)-16]
+/* Cache management header --- pointer is NULL until created */
+static CatCacheHeader *CacheHdr = NULL;
 
-/* ----------------------------------------------------------------
- *                                     internal support functions
- * ----------------------------------------------------------------
- */
-/* --------------------------------
- *             CatalogCacheInitializeCache
- * --------------------------------
- */
-#ifdef CACHEDEBUG
-#define CatalogCacheInitializeCache_DEBUG1 \
-do { \
-       elog(DEBUG, "CatalogCacheInitializeCache: cache @%08lx", cache); \
-       if (relation) \
-               elog(DEBUG, "CatalogCacheInitializeCache: called w/relation(inval)"); \
-       else \
-               elog(DEBUG, "CatalogCacheInitializeCache: called w/relname %s", \
-                       cache->cc_relname) \
-} while(0)
 
-#define CatalogCacheInitializeCache_DEBUG2 \
-do { \
-               if (cache->cc_key[i] > 0) { \
-                       elog(DEBUG, "CatalogCacheInitializeCache: load %d/%d w/%d, %d", \
-                               i+1, cache->cc_nkeys, cache->cc_key[i], \
-                               relation->rd_att->attrs[cache->cc_key[i] - 1]->attlen); \
-               } else { \
-                       elog(DEBUG, "CatalogCacheInitializeCache: load %d/%d w/%d", \
-                               i+1, cache->cc_nkeys, cache->cc_key[i]); \
-               } \
-} while(0)
+static uint32 CatalogCacheComputeHashValue(CatCache *cache, int nkeys,
+                                                        ScanKey cur_skey);
+static uint32 CatalogCacheComputeTupleHashValue(CatCache *cache,
+                                                                 HeapTuple tuple);
 
-#else
-#define CatalogCacheInitializeCache_DEBUG1
-#define CatalogCacheInitializeCache_DEBUG2
+#ifdef CATCACHE_STATS
+static void CatCachePrintStats(void);
 #endif
+static void CatCacheRemoveCTup(CatCache *cache, CatCTup *ct);
+static void CatCacheRemoveCList(CatCache *cache, CatCList *cl);
+static void CatalogCacheInitializeCache(CatCache *cache);
+static CatCTup *CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp,
+                                               uint32 hashValue, Index hashIndex,
+                                               bool negative);
+static HeapTuple build_dummy_tuple(CatCache *cache, int nkeys, ScanKey skeys);
 
-static void
-CatalogCacheInitializeCache(struct catcache * cache,
-                                                       Relation relation)
-{
-       MemoryContext oldcxt;
-       short           didopen = 0;
-       short           i;
-       TupleDesc       tupdesc;
-
-       CatalogCacheInitializeCache_DEBUG1;
-
-       /* ----------------
-        *      first switch to the cache context so our allocations
-        *      do not vanish at the end of a transaction
-        * ----------------
-        */
-       if (!CacheCxt)
-               CacheCxt = CreateGlobalMemory("Cache");
-       oldcxt = MemoryContextSwitchTo((MemoryContext) CacheCxt);
-
-       /* ----------------
-        *      If no relation was passed we must open it to get access to
-        *      its fields.  If one of the other caches has already opened
-        *      it we use heap_open() instead of heap_openr()
-        * ----------------
-        */
-       if (!RelationIsValid(relation))
-       {
-               struct catcache *cp;
-
-               /* ----------------
-                *      scan the caches to see if any other cache has opened the relation
-                * ----------------
-                */
-               for (cp = Caches; cp; cp = cp->cc_next)
-               {
-                       if (strncmp(cp->cc_relname, cache->cc_relname, NAMEDATALEN) == 0)
-                       {
-                               if (cp->relationId != InvalidOid)
-                                       break;
-                       }
-               }
-
-               /* ----------------
-                *      open the relation by name or by id
-                * ----------------
-                */
-               if (cp)
-                       relation = heap_open(cp->relationId);
-               else
-                       relation = heap_openr(cache->cc_relname);
-
-               didopen = 1;
-       }
-
-       /* ----------------
-        *      initialize the cache's relation id
-        * ----------------
-        */
-       Assert(RelationIsValid(relation));
-       cache->relationId = RelationGetRelid(relation);
-       tupdesc = cache->cc_tupdesc = RelationGetDescr(relation);
-
-       CACHE3_elog(DEBUG, "CatalogCacheInitializeCache: relid %d, %d keys",
-                               cache->relationId, cache->cc_nkeys);
-
-       /* ----------------
-        *      initialize cache's key information
-        * ----------------
-        */
-       for (i = 0; i < cache->cc_nkeys; ++i)
-       {
-               CatalogCacheInitializeCache_DEBUG2;
-
-               if (cache->cc_key[i] > 0)
-               {
-
-                       /*
-                        * Yoiks.  The implementation of the hashing code and the
-                        * implementation of int28's are at loggerheads.  The right
-                        * thing to do is to throw out the implementation of int28's
-                        * altogether; until that happens, we do the right thing here
-                        * to guarantee that the hash key generator doesn't try to
-                        * dereference an int2 by mistake.
-                        */
-
-                       if (tupdesc->attrs[cache->cc_key[i] - 1]->atttypid == INT28OID)
-                               cache->cc_klen[i] = sizeof(short);
-                       else
-                               cache->cc_klen[i] = tupdesc->attrs[cache->cc_key[i] - 1]->attlen;
-
-                       cache->cc_skey[i].sk_procedure =
-                               EQPROC(tupdesc->attrs[cache->cc_key[i] - 1]->atttypid);
-
-                       fmgr_info(cache->cc_skey[i].sk_procedure,
-                                         &cache->cc_skey[i].sk_func);
-                       cache->cc_skey[i].sk_nargs = cache->cc_skey[i].sk_func.fn_nargs;
-
-                       CACHE5_elog(DEBUG, "CatalogCacheInit %s %d %d %x",
-                                               &relation->rd_rel->relname,
-                                               i,
-                                               tupdesc->attrs[cache->cc_key[i] - 1]->attlen,
-                                               cache);
-               }
-       }
-
-       /* ----------------
-        *      close the relation if we opened it
-        * ----------------
-        */
-       if (didopen)
-               heap_close(relation);
-
-       /* ----------------
-        *      initialize index information for the cache.  this
-        *      should only be done once per cache.
-        * ----------------
-        */
-       if (cache->cc_indname != NULL && cache->indexId == InvalidOid)
-       {
-               if (RelationGetForm(relation)->relhasindex)
-               {
-
-                       /*
-                        * If the index doesn't exist we are in trouble.
-                        */
-                       relation = index_openr(cache->cc_indname);
-                       Assert(relation);
-                       cache->indexId = RelationGetRelid(relation);
-                       index_close(relation);
-               }
-               else
-                       cache->cc_indname = NULL;
-       }
-
-       /* ----------------
-        *      return to the proper memory context
-        * ----------------
-        */
-       MemoryContextSwitchTo(oldcxt);
-}
 
-/* --------------------------------
- *             CatalogCacheSetId
- *
- *             XXX temporary function
- * --------------------------------
+/*
+ *                                     internal support functions
  */
-#ifdef NOT_USED
-void
-CatalogCacheSetId(CatCache *cacheInOutP, int id)
-{
-       Assert(id == InvalidCatalogCacheId || id >= 0);
-       cacheInOutP->id = id;
-}
-
-#endif
 
-/* ----------------
- * comphash --
- *             Compute a hash value, somehow.
- *
- * XXX explain algorithm here.
+/*
+ * Look up the hash and equality functions for system types that are used
+ * as cache key fields.
  *
- * l is length of the attribute value, v
- * v is the attribute value ("Datum")
- * ----------------
+ * XXX this should be replaced by catalog lookups,
+ * but that seems to pose considerable risk of circularity...
  */
-static long
-comphash(long l, char *v)
+static void
+GetCCHashEqFuncs(Oid keytype, PGFunction *hashfunc, RegProcedure *eqfunc)
 {
-       long            i;
-       NameData        n;
-
-       CACHE3_elog(DEBUG, "comphash (%d,%x)", l, v);
-
-       switch (l)
-       {
-               case 1:
-               case 2:
-               case 4:
-                       return (long) v;
-       }
-
-       if (l == NAMEDATALEN)
+       switch (keytype)
        {
-
-               /*
-                * if it's a name, make sure that the values are null-padded.
-                *
-                * Note that this other fixed-length types can also have the same
-                * typelen so this may break them         - XXX
-                */
-               namestrcpy(&n, v);
-               v = n.data;
+               case BOOLOID:
+                       *hashfunc = hashchar;
+                       *eqfunc = F_BOOLEQ;
+                       break;
+               case CHAROID:
+                       *hashfunc = hashchar;
+                       *eqfunc = F_CHAREQ;
+                       break;
+               case NAMEOID:
+                       *hashfunc = hashname;
+                       *eqfunc = F_NAMEEQ;
+                       break;
+               case INT2OID:
+                       *hashfunc = hashint2;
+                       *eqfunc = F_INT2EQ;
+                       break;
+               case INT2VECTOROID:
+                       *hashfunc = hashint2vector;
+                       *eqfunc = F_INT2VECTOREQ;
+                       break;
+               case INT4OID:
+                       *hashfunc = hashint4;
+                       *eqfunc = F_INT4EQ;
+                       break;
+               case TEXTOID:
+                       *hashfunc = hashtext;
+                       *eqfunc = F_TEXTEQ;
+                       break;
+               case OIDOID:
+               case REGPROCOID:
+               case REGPROCEDUREOID:
+               case REGOPEROID:
+               case REGOPERATOROID:
+               case REGCLASSOID:
+               case REGTYPEOID:
+                       *hashfunc = hashoid;
+                       *eqfunc = F_OIDEQ;
+                       break;
+               case OIDVECTOROID:
+                       *hashfunc = hashoidvector;
+                       *eqfunc = F_OIDVECTOREQ;
+                       break;
+               default:
+                       elog(FATAL, "type %u not supported as catcache key", keytype);
+                       break;
        }
-       else if (l < 0)
-               l = VARSIZE(v);
-
-       i = 0;
-       while (l--)
-               i += *v++;
-       return i;
 }
 
-/* --------------------------------
- *             CatalogCacheComputeHashIndex
- * --------------------------------
+/*
+ *             CatalogCacheComputeHashValue
+ *
+ * Compute the hash value associated with a given set of lookup keys
  */
-static Index
-CatalogCacheComputeHashIndex(struct catcache * cacheInP)
+static uint32
+CatalogCacheComputeHashValue(CatCache *cache, int nkeys, ScanKey cur_skey)
 {
-       Index           hashIndex;
+       uint32          hashValue = 0;
 
-       hashIndex = 0x0;
-       CACHE6_elog(DEBUG, "CatalogCacheComputeHashIndex %s %d %d %d %x",
-                               cacheInP->cc_relname,
-                               cacheInP->cc_nkeys,
-                               cacheInP->cc_klen[0],
-                               cacheInP->cc_klen[1],
-                               cacheInP);
+       CACHE4_elog(DEBUG2, "CatalogCacheComputeHashValue %s %d %p",
+                               cache->cc_relname,
+                               nkeys,
+                               cache);
 
-       switch (cacheInP->cc_nkeys)
+       switch (nkeys)
        {
                case 4:
-                       hashIndex ^= comphash(cacheInP->cc_klen[3],
-                                                (char *) cacheInP->cc_skey[3].sk_argument) << 9;
+                       hashValue ^=
+                               DatumGetUInt32(DirectFunctionCall1(cache->cc_hashfunc[3],
+                                                                                 cur_skey[3].sk_argument)) << 9;
                        /* FALLTHROUGH */
                case 3:
-                       hashIndex ^= comphash(cacheInP->cc_klen[2],
-                                                (char *) cacheInP->cc_skey[2].sk_argument) << 6;
+                       hashValue ^=
+                               DatumGetUInt32(DirectFunctionCall1(cache->cc_hashfunc[2],
+                                                                                 cur_skey[2].sk_argument)) << 6;
                        /* FALLTHROUGH */
                case 2:
-                       hashIndex ^= comphash(cacheInP->cc_klen[1],
-                                                (char *) cacheInP->cc_skey[1].sk_argument) << 3;
+                       hashValue ^=
+                               DatumGetUInt32(DirectFunctionCall1(cache->cc_hashfunc[1],
+                                                                                 cur_skey[1].sk_argument)) << 3;
                        /* FALLTHROUGH */
                case 1:
-                       hashIndex ^= comphash(cacheInP->cc_klen[0],
-                                                         (char *) cacheInP->cc_skey[0].sk_argument);
+                       hashValue ^=
+                               DatumGetUInt32(DirectFunctionCall1(cache->cc_hashfunc[0],
+                                                                                          cur_skey[0].sk_argument));
                        break;
                default:
-                       elog(FATAL, "CCComputeHashIndex: %d cc_nkeys", cacheInP->cc_nkeys);
+                       elog(FATAL, "wrong number of hash keys: %d", nkeys);
                        break;
        }
-       hashIndex %= cacheInP->cc_size;
-       return hashIndex;
+
+       return hashValue;
 }
 
-/* --------------------------------
- *             CatalogCacheComputeTupleHashIndex
- * --------------------------------
+/*
+ *             CatalogCacheComputeTupleHashValue
+ *
+ * Compute the hash value associated with a given tuple to be cached
  */
-static Index
-CatalogCacheComputeTupleHashIndex(struct catcache * cacheInOutP,
-                                                                 Relation relation,
-                                                                 HeapTuple tuple)
+static uint32
+CatalogCacheComputeTupleHashValue(CatCache *cache, HeapTuple tuple)
 {
-       bool            isNull = '\0';
+       ScanKeyData cur_skey[4];
+       bool            isNull = false;
 
-       if (cacheInOutP->relationId == InvalidOid)
-               CatalogCacheInitializeCache(cacheInOutP, relation);
-       switch (cacheInOutP->cc_nkeys)
+       /* Copy pre-initialized overhead data for scankey */
+       memcpy(cur_skey, cache->cc_skey, sizeof(cur_skey));
+
+       /* Now extract key fields from tuple, insert into scankey */
+       switch (cache->cc_nkeys)
        {
                case 4:
-                       cacheInOutP->cc_skey[3].sk_argument =
-                               (cacheInOutP->cc_key[3] == ObjectIdAttributeNumber)
-                               ? (Datum) tuple->t_oid
+                       cur_skey[3].sk_argument =
+                               (cache->cc_key[3] == ObjectIdAttributeNumber)
+                               ? ObjectIdGetDatum(HeapTupleGetOid(tuple))
                                : fastgetattr(tuple,
-                                                         cacheInOutP->cc_key[3],
-                                                         RelationGetDescr(relation),
+                                                         cache->cc_key[3],
+                                                         cache->cc_tupdesc,
                                                          &isNull);
                        Assert(!isNull);
                        /* FALLTHROUGH */
                case 3:
-                       cacheInOutP->cc_skey[2].sk_argument =
-                               (cacheInOutP->cc_key[2] == ObjectIdAttributeNumber)
-                               ? (Datum) tuple->t_oid
+                       cur_skey[2].sk_argument =
+                               (cache->cc_key[2] == ObjectIdAttributeNumber)
+                               ? ObjectIdGetDatum(HeapTupleGetOid(tuple))
                                : fastgetattr(tuple,
-                                                         cacheInOutP->cc_key[2],
-                                                         RelationGetDescr(relation),
+                                                         cache->cc_key[2],
+                                                         cache->cc_tupdesc,
                                                          &isNull);
                        Assert(!isNull);
                        /* FALLTHROUGH */
                case 2:
-                       cacheInOutP->cc_skey[1].sk_argument =
-                               (cacheInOutP->cc_key[1] == ObjectIdAttributeNumber)
-                               ? (Datum) tuple->t_oid
+                       cur_skey[1].sk_argument =
+                               (cache->cc_key[1] == ObjectIdAttributeNumber)
+                               ? ObjectIdGetDatum(HeapTupleGetOid(tuple))
                                : fastgetattr(tuple,
-                                                         cacheInOutP->cc_key[1],
-                                                         RelationGetDescr(relation),
+                                                         cache->cc_key[1],
+                                                         cache->cc_tupdesc,
                                                          &isNull);
                        Assert(!isNull);
                        /* FALLTHROUGH */
                case 1:
-                       cacheInOutP->cc_skey[0].sk_argument =
-                               (cacheInOutP->cc_key[0] == ObjectIdAttributeNumber)
-                               ? (Datum) tuple->t_oid
+                       cur_skey[0].sk_argument =
+                               (cache->cc_key[0] == ObjectIdAttributeNumber)
+                               ? ObjectIdGetDatum(HeapTupleGetOid(tuple))
                                : fastgetattr(tuple,
-                                                         cacheInOutP->cc_key[0],
-                                                         RelationGetDescr(relation),
+                                                         cache->cc_key[0],
+                                                         cache->cc_tupdesc,
                                                          &isNull);
                        Assert(!isNull);
                        break;
                default:
-                       elog(FATAL, "CCComputeTupleHashIndex: %d cc_nkeys",
-                                cacheInOutP->cc_nkeys
-                               );
+                       elog(FATAL, "wrong number of hash keys: %d", cache->cc_nkeys);
                        break;
        }
 
-       return
-               CatalogCacheComputeHashIndex(cacheInOutP);
+       return CatalogCacheComputeHashValue(cache, cache->cc_nkeys, cur_skey);
+}
+
+
+#ifdef CATCACHE_STATS
+
+static void
+CatCachePrintStats(void)
+{
+       CatCache   *cache;
+       long            cc_searches = 0;
+       long            cc_hits = 0;
+       long            cc_neg_hits = 0;
+       long            cc_newloads = 0;
+       long            cc_invals = 0;
+       long            cc_discards = 0;
+       long            cc_lsearches = 0;
+       long            cc_lhits = 0;
+
+       elog(DEBUG2, "catcache stats dump: %d/%d tuples in catcaches",
+                CacheHdr->ch_ntup, CacheHdr->ch_maxtup);
+
+       for (cache = CacheHdr->ch_caches; cache; cache = cache->cc_next)
+       {
+               if (cache->cc_ntup == 0 && cache->cc_searches == 0)
+                       continue;                       /* don't print unused caches */
+               elog(DEBUG2, "catcache %s/%u: %d tup, %ld srch, %ld+%ld=%ld hits, %ld+%ld=%ld loads, %ld invals, %ld discards, %ld lsrch, %ld lhits",
+                        cache->cc_relname,
+                        cache->cc_indexoid,
+                        cache->cc_ntup,
+                        cache->cc_searches,
+                        cache->cc_hits,
+                        cache->cc_neg_hits,
+                        cache->cc_hits + cache->cc_neg_hits,
+                        cache->cc_newloads,
+                        cache->cc_searches - cache->cc_hits - cache->cc_neg_hits - cache->cc_newloads,
+                        cache->cc_searches - cache->cc_hits - cache->cc_neg_hits,
+                        cache->cc_invals,
+                        cache->cc_discards,
+                        cache->cc_lsearches,
+                        cache->cc_lhits);
+               cc_searches += cache->cc_searches;
+               cc_hits += cache->cc_hits;
+               cc_neg_hits += cache->cc_neg_hits;
+               cc_newloads += cache->cc_newloads;
+               cc_invals += cache->cc_invals;
+               cc_discards += cache->cc_discards;
+               cc_lsearches += cache->cc_lsearches;
+               cc_lhits += cache->cc_lhits;
+       }
+       elog(DEBUG2, "catcache totals: %d tup, %ld srch, %ld+%ld=%ld hits, %ld+%ld=%ld loads, %ld invals, %ld discards, %ld lsrch, %ld lhits",
+                CacheHdr->ch_ntup,
+                cc_searches,
+                cc_hits,
+                cc_neg_hits,
+                cc_hits + cc_neg_hits,
+                cc_newloads,
+                cc_searches - cc_hits - cc_neg_hits - cc_newloads,
+                cc_searches - cc_hits - cc_neg_hits,
+                cc_invals,
+                cc_discards,
+                cc_lsearches,
+                cc_lhits);
 }
+#endif   /* CATCACHE_STATS */
+
 
-/* --------------------------------
+/*
  *             CatCacheRemoveCTup
- * --------------------------------
+ *
+ * Unlink and delete the given cache entry
+ *
+ * NB: if it is a member of a CatCList, the CatCList is deleted too.
  */
 static void
-CatCacheRemoveCTup(CatCache *cache, Dlelem *elt)
+CatCacheRemoveCTup(CatCache *cache, CatCTup *ct)
 {
-       CatCTup    *ct;
-       CatCTup    *other_ct;
-       Dlelem     *other_elt;
+       Assert(ct->refcount == 0);
+       Assert(ct->my_cache == cache);
+
+       if (ct->c_list)
+               CatCacheRemoveCList(cache, ct->c_list);
+
+       /* delink from linked lists */
+       DLRemove(&ct->lrulist_elem);
+       DLRemove(&ct->cache_elem);
+
+       /* free associated tuple data */
+       if (ct->tuple.t_data != NULL)
+               pfree(ct->tuple.t_data);
+       pfree(ct);
 
-       if (elt)
-               ct = (CatCTup *) DLE_VAL(elt);
-       else
-               return;
-
-       other_elt = ct->ct_node;
-       other_ct = (CatCTup *) DLE_VAL(other_elt);
-       DLRemove(other_elt);
-       DLFreeElem(other_elt);
-       free(other_ct);
-       DLRemove(elt);
-       DLFreeElem(elt);
-       free(ct);
        --cache->cc_ntup;
+       --CacheHdr->ch_ntup;
+}
+
+/*
+ *             CatCacheRemoveCList
+ *
+ * Unlink and delete the given cache list entry
+ */
+static void
+CatCacheRemoveCList(CatCache *cache, CatCList *cl)
+{
+       int                     i;
+
+       Assert(cl->refcount == 0);
+       Assert(cl->my_cache == cache);
+
+       /* delink from member tuples */
+       for (i = cl->n_members; --i >= 0;)
+       {
+               CatCTup    *ct = cl->members[i];
+
+               Assert(ct->c_list == cl);
+               ct->c_list = NULL;
+       }
+
+       /* delink from linked list */
+       DLRemove(&cl->cache_elem);
+
+       /* free associated tuple data */
+       if (cl->tuple.t_data != NULL)
+               pfree(cl->tuple.t_data);
+       pfree(cl);
 }
 
-/* --------------------------------
- *     CatalogCacheIdInvalidate()
+
+/*
+ *     CatalogCacheIdInvalidate
  *
- *     Invalidate a tuple given a cache id.  In this case the id should always
- *     be found (whether the cache has opened its relation or not).  Of course,
- *     if the cache has yet to open its relation, there will be no tuples so
- *     no problem.
- * --------------------------------
+ *     Invalidate entries in the specified cache, given a hash value and
+ *     item pointer.  Positive entries are deleted if they match the item
+ *     pointer.  Negative entries must be deleted if they match the hash
+ *     value (since we do not have the exact key of the tuple that's being
+ *     inserted).      But this should only rarely result in loss of a cache
+ *     entry that could have been kept.
+ *
+ *     Note that it's not very relevant whether the tuple identified by
+ *     the item pointer is being inserted or deleted.  We don't expect to
+ *     find matching positive entries in the one case, and we don't expect
+ *     to find matching negative entries in the other; but we will do the
+ *     right things in any case.
+ *
+ *     This routine is only quasi-public: it should only be used by inval.c.
  */
 void
-CatalogCacheIdInvalidate(int cacheId,  /* XXX */
-                                                Index hashIndex,
+CatalogCacheIdInvalidate(int cacheId,
+                                                uint32 hashValue,
                                                 ItemPointer pointer)
 {
        CatCache   *ccp;
-       CatCTup    *ct;
-       Dlelem     *elt;
-       MemoryContext oldcxt;
 
-       /* ----------------
-        *      sanity checks
-        * ----------------
+       /*
+        * sanity checks
         */
-       Assert(hashIndex < NCCBUCK);
        Assert(ItemPointerIsValid(pointer));
-       CACHE1_elog(DEBUG, "CatalogCacheIdInvalidate: called");
-
-       /* ----------------
-        *      switch to the cache context for our memory allocations
-        * ----------------
-        */
-       if (!CacheCxt)
-               CacheCxt = CreateGlobalMemory("Cache");
-       oldcxt = MemoryContextSwitchTo((MemoryContext) CacheCxt);
+       CACHE1_elog(DEBUG2, "CatalogCacheIdInvalidate: called");
 
-       /* ----------------
-        *      inspect every cache that could contain the tuple
-        * ----------------
+       /*
+        * inspect caches to find the proper cache
         */
-       for (ccp = Caches; ccp; ccp = ccp->cc_next)
+       for (ccp = CacheHdr->ch_caches; ccp; ccp = ccp->cc_next)
        {
+               Index           hashIndex;
+               Dlelem     *elt,
+                                  *nextelt;
+
                if (cacheId != ccp->id)
                        continue;
-               /* ----------------
-                *      inspect the hash bucket until we find a match or exhaust
-                * ----------------
+
+               /*
+                * We don't bother to check whether the cache has finished
+                * initialization yet; if not, there will be no entries in it so
+                * no problem.
                 */
-               for (elt = DLGetHead(ccp->cc_cache[hashIndex]);
-                        elt;
-                        elt = DLGetSucc(elt))
+
+               /*
+                * Invalidate *all* CatCLists in this cache; it's too hard to tell
+                * which searches might still be correct, so just zap 'em all.
+                */
+               for (elt = DLGetHead(&ccp->cc_lists); elt; elt = nextelt)
                {
-                       ct = (CatCTup *) DLE_VAL(elt);
-                       if (ItemPointerEquals(pointer, &ct->ct_tup->t_ctid))
-                               break;
+                       CatCList   *cl = (CatCList *) DLE_VAL(elt);
+
+                       nextelt = DLGetSucc(elt);
+
+                       if (cl->refcount > 0)
+                               cl->dead = true;
+                       else
+                               CatCacheRemoveCList(ccp, cl);
                }
 
-               /* ----------------
-                *      if we found a matching tuple, invalidate it.
-                * ----------------
+               /*
+                * inspect the proper hash bucket for tuple matches
                 */
+               hashIndex = HASH_INDEX(hashValue, ccp->cc_nbuckets);
 
-               if (elt)
+               for (elt = DLGetHead(&ccp->cc_bucket[hashIndex]); elt; elt = nextelt)
                {
-                       CatCacheRemoveCTup(ccp, elt);
+                       CatCTup    *ct = (CatCTup *) DLE_VAL(elt);
 
-                       CACHE1_elog(DEBUG, "CatalogCacheIdInvalidate: invalidated");
-               }
+                       nextelt = DLGetSucc(elt);
 
-               if (cacheId != InvalidCatalogCacheId)
-                       break;
-       }
+                       if (hashValue != ct->hash_value)
+                               continue;               /* ignore non-matching hash values */
 
-       /* ----------------
-        *      return to the proper memory context
-        * ----------------
-        */
-       MemoryContextSwitchTo(oldcxt);
-       /* sendpm('I', "Invalidated tuple"); */
+                       if (ct->negative ||
+                               ItemPointerEquals(pointer, &ct->tuple.t_self))
+                       {
+                               if (ct->refcount > 0)
+                                       ct->dead = true;
+                               else
+                                       CatCacheRemoveCTup(ccp, ct);
+                               CACHE1_elog(DEBUG2, "CatalogCacheIdInvalidate: invalidated");
+#ifdef CATCACHE_STATS
+                               ccp->cc_invals++;
+#endif
+                               /* could be multiple matches, so keep looking! */
+                       }
+               }
+               break;                                  /* need only search this one cache */
+       }
 }
 
 /* ----------------------------------------------------------------
  *                                        public functions
- *
- *             ResetSystemCache
- *             InitIndexedSysCache
- *             InitSysCache
- *             SearchSysCache
- *             RelationInvalidateCatalogCacheTuple
  * ----------------------------------------------------------------
  */
-/* --------------------------------
- *             ResetSystemCache
- * --------------------------------
+
+
+/*
+ * Standard routine for creating cache context if it doesn't exist yet
+ *
+ * There are a lot of places (probably far more than necessary) that check
+ * whether CacheMemoryContext exists yet and want to create it if not.
+ * We centralize knowledge of exactly how to create it here.
  */
 void
-ResetSystemCache()
+CreateCacheMemoryContext(void)
 {
-       MemoryContext oldcxt;
-       struct catcache *cache;
-
-       /* ----------------
-        *      sanity checks
-        * ----------------
+       /*
+        * Purely for paranoia, check that context doesn't exist; caller
+        * probably did so already.
         */
-       CACHE1_elog(DEBUG, "ResetSystemCache called");
-       if (DisableCache)
-       {
-               elog(ERROR, "ResetSystemCache: Called while cache disabled");
-               return;
-       }
+       if (!CacheMemoryContext)
+               CacheMemoryContext = AllocSetContextCreate(TopMemoryContext,
+                                                                                                  "CacheMemoryContext",
+                                                                                               ALLOCSET_DEFAULT_MINSIZE,
+                                                                                          ALLOCSET_DEFAULT_INITSIZE,
+                                                                                          ALLOCSET_DEFAULT_MAXSIZE);
+}
 
-       /* ----------------
-        *      first switch to the cache context so our allocations
-        *      do not vanish at the end of a transaction
-        * ----------------
-        */
-       if (!CacheCxt)
-               CacheCxt = CreateGlobalMemory("Cache");
 
-       oldcxt = MemoryContextSwitchTo((MemoryContext) CacheCxt);
+/*
+ *             AtEOXact_CatCache
+ *
+ * Clean up catcaches at end of main transaction (either commit or abort)
+ *
+ * We scan the caches to reset refcounts to zero.  This is of course
+ * necessary in the abort case, since elog() may have interrupted routines.
+ * In the commit case, any nonzero counts indicate failure to call
+ * ReleaseSysCache, so we put out a notice for debugging purposes.
+ */
+void
+AtEOXact_CatCache(bool isCommit)
+{
+       CatCache   *ccp;
+       Dlelem     *elt,
+                          *nextelt;
 
-       /* ----------------
-        *      here we purge the contents of all the caches
-        *
-        *      for each system cache
-        *         for each hash bucket
-        *                 for each tuple in hash bucket
-        *                         remove the tuple
-        * ----------------
+       /*
+        * First clean up CatCLists
         */
-       for (cache = Caches; PointerIsValid(cache); cache = cache->cc_next)
+       for (ccp = CacheHdr->ch_caches; ccp; ccp = ccp->cc_next)
        {
-               int                     hash;
-
-               for (hash = 0; hash < NCCBUCK; hash += 1)
+               for (elt = DLGetHead(&ccp->cc_lists); elt; elt = nextelt)
                {
-                       Dlelem     *elt,
-                                          *nextelt;
+                       CatCList   *cl = (CatCList *) DLE_VAL(elt);
 
-                       for (elt = DLGetHead(cache->cc_cache[hash]); elt; elt = nextelt)
+                       nextelt = DLGetSucc(elt);
+
+                       if (cl->refcount != 0)
                        {
-                               nextelt = DLGetSucc(elt);
-                               CatCacheRemoveCTup(cache, elt);
-                               if (cache->cc_ntup == -1)
-                                       elog(ERROR, "ResetSystemCache: cc_ntup<0 (software error)");
+                               if (isCommit)
+                                       PrintCatCacheListLeakWarning(cl);
+                               cl->refcount = 0;
                        }
+
+                       /* Clean up any now-deletable dead entries */
+                       if (cl->dead)
+                               CatCacheRemoveCList(ccp, cl);
                }
-               cache->cc_ntup = 0;             /* in case of WARN error above */
        }
 
-       CACHE1_elog(DEBUG, "end of ResetSystemCache call");
-
-       /* ----------------
-        *      back to the old context before we return...
-        * ----------------
+       /*
+        * Now clean up tuples; we can scan them all using the global LRU list
         */
-       MemoryContextSwitchTo(oldcxt);
+       for (elt = DLGetHead(&CacheHdr->ch_lrulist); elt; elt = nextelt)
+       {
+               CatCTup    *ct = (CatCTup *) DLE_VAL(elt);
+
+               nextelt = DLGetSucc(elt);
+
+               if (ct->refcount != 0)
+               {
+                       if (isCommit)
+                               PrintCatCacheLeakWarning(&ct->tuple);
+                       ct->refcount = 0;
+               }
+
+               /* Clean up any now-deletable dead entries */
+               if (ct->dead)
+                       CatCacheRemoveCTup(ct->my_cache, ct);
+       }
+}
+
+/*
+ *             ResetCatalogCache
+ *
+ * Reset one catalog cache to empty.
+ *
+ * This is not very efficient if the target cache is nearly empty.
+ * However, it shouldn't need to be efficient; we don't invoke it often.
+ */
+static void
+ResetCatalogCache(CatCache *cache)
+{
+       Dlelem     *elt,
+                          *nextelt;
+       int                     i;
+
+       /* Remove each list in this cache, or at least mark it dead */
+       for (elt = DLGetHead(&cache->cc_lists); elt; elt = nextelt)
+       {
+               CatCList   *cl = (CatCList *) DLE_VAL(elt);
+
+               nextelt = DLGetSucc(elt);
+
+               if (cl->refcount > 0)
+                       cl->dead = true;
+               else
+                       CatCacheRemoveCList(cache, cl);
+       }
+
+       /* Remove each tuple in this cache, or at least mark it dead */
+       for (i = 0; i < cache->cc_nbuckets; i++)
+       {
+               for (elt = DLGetHead(&cache->cc_bucket[i]); elt; elt = nextelt)
+               {
+                       CatCTup    *ct = (CatCTup *) DLE_VAL(elt);
+
+                       nextelt = DLGetSucc(elt);
+
+                       if (ct->refcount > 0)
+                               ct->dead = true;
+                       else
+                               CatCacheRemoveCTup(cache, ct);
+#ifdef CATCACHE_STATS
+                       cache->cc_invals++;
+#endif
+               }
+       }
+}
+
+/*
+ *             ResetCatalogCaches
+ *
+ * Reset all caches when a shared cache inval event forces it
+ */
+void
+ResetCatalogCaches(void)
+{
+       CatCache   *cache;
+
+       CACHE1_elog(DEBUG2, "ResetCatalogCaches called");
+
+       for (cache = CacheHdr->ch_caches; cache; cache = cache->cc_next)
+               ResetCatalogCache(cache);
+
+       CACHE1_elog(DEBUG2, "end of ResetCatalogCaches call");
 }
 
-/* --------------------------------
- *             SystemCacheRelationFlushed
+/*
+ *             CatalogCacheFlushRelation
  *
- *     RelationFlushRelation() frees some information referenced in the
- *     cache structures. So we get informed when this is done and arrange
- *     for the next SearchSysCache() call that this information is setup
- *     again.
- * --------------------------------
+ *     This is called by RelationFlushRelation() to clear out cached information
+ *     about a relation being dropped.  (This could be a DROP TABLE command,
+ *     or a temp table being dropped at end of transaction, or a table created
+ *     during the current transaction that is being dropped because of abort.)
+ *     Remove all cache entries relevant to the specified relation OID.
+ *
+ *     A special case occurs when relId is itself one of the cacheable system
+ *     tables --- although those'll never be dropped, they can get flushed from
+ *     the relcache (VACUUM causes this, for example).  In that case we need
+ *     to flush all cache entries that came from that table.  (At one point we
+ *     also tried to force re-execution of CatalogCacheInitializeCache for
+ *     the cache(s) on that table.  This is a bad idea since it leads to all
+ *     kinds of trouble if a cache flush occurs while loading cache entries.
+ *     We now avoid the need to do it by copying cc_tupdesc out of the relcache,
+ *     rather than relying on the relcache to keep a tupdesc for us.  Of course
+ *     this assumes the tupdesc of a cachable system table will not change...)
  */
 void
-SystemCacheRelationFlushed(Oid relId)
+CatalogCacheFlushRelation(Oid relId)
 {
-       struct catcache *cache;
+       CatCache   *cache;
+
+       CACHE2_elog(DEBUG2, "CatalogCacheFlushRelation called for %u", relId);
 
-       for (cache = Caches; PointerIsValid(cache); cache = cache->cc_next)
+       for (cache = CacheHdr->ch_caches; cache; cache = cache->cc_next)
        {
-               if (cache->relationId == relId)
-                       cache->relationId = InvalidOid;
+               int                     i;
+
+               /* We can ignore uninitialized caches, since they must be empty */
+               if (cache->cc_tupdesc == NULL)
+                       continue;
+
+               /* Does this cache store tuples of the target relation itself? */
+               if (cache->cc_tupdesc->attrs[0]->attrelid == relId)
+               {
+                       /* Yes, so flush all its contents */
+                       ResetCatalogCache(cache);
+                       continue;
+               }
+
+               /* Does this cache store tuples associated with relations at all? */
+               if (cache->cc_reloidattr == 0)
+                       continue;                       /* nope, leave it alone */
+
+               /* Yes, scan the tuples and remove those related to relId */
+               for (i = 0; i < cache->cc_nbuckets; i++)
+               {
+                       Dlelem     *elt,
+                                          *nextelt;
+
+                       for (elt = DLGetHead(&cache->cc_bucket[i]); elt; elt = nextelt)
+                       {
+                               CatCTup    *ct = (CatCTup *) DLE_VAL(elt);
+                               Oid                     tupRelid;
+
+                               nextelt = DLGetSucc(elt);
+
+                               /*
+                                * Negative entries are never considered related to a rel,
+                                * even if the rel is part of their lookup key.
+                                */
+                               if (ct->negative)
+                                       continue;
+
+                               if (cache->cc_reloidattr == ObjectIdAttributeNumber)
+                                       tupRelid = HeapTupleGetOid(&ct->tuple);
+                               else
+                               {
+                                       bool            isNull;
+
+                                       tupRelid =
+                                               DatumGetObjectId(fastgetattr(&ct->tuple,
+                                                                                                        cache->cc_reloidattr,
+                                                                                                        cache->cc_tupdesc,
+                                                                                                        &isNull));
+                                       Assert(!isNull);
+                               }
+
+                               if (tupRelid == relId)
+                               {
+                                       if (ct->refcount > 0)
+                                               ct->dead = true;
+                                       else
+                                               CatCacheRemoveCTup(cache, ct);
+#ifdef CATCACHE_STATS
+                                       cache->cc_invals++;
+#endif
+                               }
+                       }
+               }
        }
+
+       CACHE1_elog(DEBUG2, "end of CatalogCacheFlushRelation call");
 }
 
-/* --------------------------------
- *             InitIndexedSysCache
+/*
+ *             InitCatCache
  *
  *     This allocates and initializes a cache for a system catalog relation.
  *     Actually, the cache is only partially initialized to avoid opening the
  *     relation.  The relation will be opened and the rest of the cache
  *     structure initialized on the first access.
- * --------------------------------
  */
 #ifdef CACHEDEBUG
-#define InitSysCache_DEBUG1 \
+#define InitCatCache_DEBUG2 \
 do { \
-       elog(DEBUG, "InitSysCache: rid=%d id=%d nkeys=%d size=%d\n", \
-               cp->relationId, cp->id, cp->cc_nkeys, cp->cc_size); \
-       for (i = 0; i < nkeys; i += 1) \
-       { \
-               elog(DEBUG, "InitSysCache: key=%d len=%d skey=[%d %d %d %d]\n", \
-                        cp->cc_key[i], cp->cc_klen[i], \
-                        cp->cc_skey[i].sk_flags, \
-                        cp->cc_skey[i].sk_attno, \
-                        cp->cc_skey[i].sk_procedure, \
-                        cp->cc_skey[i].sk_argument); \
-       } \
+       elog(DEBUG2, "InitCatCache: rel=%u ind=%u id=%d nkeys=%d size=%d", \
+                cp->cc_reloid, cp->cc_indexoid, cp->id, \
+                cp->cc_nkeys, cp->cc_nbuckets); \
 } while(0)
 
 #else
-#define InitSysCache_DEBUG1
+#define InitCatCache_DEBUG2
 #endif
 
-CatCache   *
-InitSysCache(char *relname,
-                        char *iname,
-                        int id,
+CatCache *
+InitCatCache(int id,
+                        Oid reloid,
+                        Oid indexoid,
+                        int reloidattr,
                         int nkeys,
-                        int *key,
-                        HeapTuple (*iScanfuncP) ())
+                        const int *key)
 {
        CatCache   *cp;
-       int                     i;
        MemoryContext oldcxt;
+       int                     i;
 
-       char       *indname;
+       /*
+        * first switch to the cache context so our allocations do not vanish
+        * at the end of a transaction
+        */
+       if (!CacheMemoryContext)
+               CreateCacheMemoryContext();
 
-       indname = (iname) ? iname : NULL;
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
 
-       /* ----------------
-        *      first switch to the cache context so our allocations
-        *      do not vanish at the end of a transaction
-        * ----------------
+       /*
+        * if first time through, initialize the cache group header, including
+        * global LRU list header
         */
-       if (!CacheCxt)
-               CacheCxt = CreateGlobalMemory("Cache");
+       if (CacheHdr == NULL)
+       {
+               CacheHdr = (CatCacheHeader *) palloc(sizeof(CatCacheHeader));
+               CacheHdr->ch_caches = NULL;
+               CacheHdr->ch_ntup = 0;
+               CacheHdr->ch_maxtup = MAXCCTUPLES;
+               DLInitList(&CacheHdr->ch_lrulist);
+#ifdef CATCACHE_STATS
+               on_proc_exit(CatCachePrintStats, 0);
+#endif
+       }
 
-       oldcxt = MemoryContextSwitchTo((MemoryContext) CacheCxt);
+       /*
+        * allocate a new cache structure
+        *
+        * Note: we assume zeroing initializes the Dllist headers correctly
+        */
+       cp = (CatCache *) palloc0(sizeof(CatCache) + NCCBUCKETS * sizeof(Dllist));
 
-       /* ----------------
-        *      allocate a new cache structure
-        * ----------------
+       /*
+        * initialize the cache's relation information for the relation
+        * corresponding to this cache, and initialize some of the new cache's
+        * other internal fields.  But don't open the relation yet.
         */
-       cp = (CatCache *) palloc(sizeof(CatCache));
-       MemSet((char *) cp, 0, sizeof(CatCache));
+       cp->id = id;
+       cp->cc_relname = "(not known yet)";
+       cp->cc_reloid = reloid;
+       cp->cc_indexoid = indexoid;
+       cp->cc_relisshared = false; /* temporary */
+       cp->cc_tupdesc = (TupleDesc) NULL;
+       cp->cc_reloidattr = reloidattr;
+       cp->cc_ntup = 0;
+       cp->cc_nbuckets = NCCBUCKETS;
+       cp->cc_nkeys = nkeys;
+       for (i = 0; i < nkeys; ++i)
+               cp->cc_key[i] = key[i];
 
-       /* ----------------
-        *      initialize the cache buckets (each bucket is a list header)
-        *      and the LRU tuple list
-        * ----------------
+       /*
+        * new cache is initialized as far as we can go for now. print some
+        * debugging information, if appropriate.
         */
-       {
+       InitCatCache_DEBUG2;
 
-               /*
-                * We can only do this optimization because the number of hash
-                * buckets never changes.  Without it, we call malloc() too much.
-                * We could move this to dllist.c, but the way we do this is not
-                * dynamic/portabl, so why allow other routines to use it.
-                */
-               Dllist     *cache_begin = malloc((NCCBUCK + 1) * sizeof(Dllist));
+       /*
+        * add completed cache to top of group header's list
+        */
+       cp->cc_next = CacheHdr->ch_caches;
+       CacheHdr->ch_caches = cp;
 
-               for (i = 0; i <= NCCBUCK; ++i)
-               {
-                       cp->cc_cache[i] = &cache_begin[i];
-                       cp->cc_cache[i]->dll_head = 0;
-                       cp->cc_cache[i]->dll_tail = 0;
-               }
-       }
+       /*
+        * back to the old context before we return...
+        */
+       MemoryContextSwitchTo(oldcxt);
 
-       cp->cc_lrulist = DLNewList();
+       return cp;
+}
 
-       /* ----------------
-        *      Caches is the pointer to the head of the list of all the
-        *      system caches.  here we add the new cache to the top of the list.
-        * ----------------
+/*
+ *             CatalogCacheInitializeCache
+ *
+ * This function does final initialization of a catcache: obtain the tuple
+ * descriptor and set up the hash and equality function links. We assume
+ * that the relcache entry can be opened at this point!
+ */
+#ifdef CACHEDEBUG
+#define CatalogCacheInitializeCache_DEBUG1 \
+       elog(DEBUG2, "CatalogCacheInitializeCache: cache @%p rel=%u", cache, \
+                cache->cc_reloid)
+
+#define CatalogCacheInitializeCache_DEBUG2 \
+do { \
+               if (cache->cc_key[i] > 0) { \
+                       elog(DEBUG2, "CatalogCacheInitializeCache: load %d/%d w/%d, %u", \
+                               i+1, cache->cc_nkeys, cache->cc_key[i], \
+                                tupdesc->attrs[cache->cc_key[i] - 1]->atttypid); \
+               } else { \
+                       elog(DEBUG2, "CatalogCacheInitializeCache: load %d/%d w/%d", \
+                               i+1, cache->cc_nkeys, cache->cc_key[i]); \
+               } \
+} while(0)
+
+#else
+#define CatalogCacheInitializeCache_DEBUG1
+#define CatalogCacheInitializeCache_DEBUG2
+#endif
+
+static void
+CatalogCacheInitializeCache(CatCache *cache)
+{
+       Relation        relation;
+       MemoryContext oldcxt;
+       TupleDesc       tupdesc;
+       int                     i;
+
+       CatalogCacheInitializeCache_DEBUG1;
+
+       /*
+        * Open the relation without locking --- we only need the tupdesc,
+        * which we assume will never change ...
         */
-       cp->cc_next = Caches;           /* list of caches (single link) */
-       Caches = cp;
+       relation = heap_open(cache->cc_reloid, NoLock);
+       Assert(RelationIsValid(relation));
 
-       /* ----------------
-        *      initialize the cache's relation information for the relation
-        *      corresponding to this cache and initialize some of the the new
-        *      cache's other internal fields.
-        * ----------------
+       /*
+        * switch to the cache context so our allocations do not vanish at the
+        * end of a transaction
         */
-       cp->relationId = InvalidOid;
-       cp->indexId = InvalidOid;
-       cp->cc_relname = relname;
-       cp->cc_indname = indname;
-       cp->cc_tupdesc = (TupleDesc) NULL;
-       cp->id = id;
-       cp->cc_maxtup = MAXTUP;
-       cp->cc_size = NCCBUCK;
-       cp->cc_nkeys = nkeys;
-       cp->cc_iscanfunc = iScanfuncP;
+       Assert(CacheMemoryContext != NULL);
 
-       /* ----------------
-        *      initialize the cache's key information
-        * ----------------
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+
+       /*
+        * copy the relcache's tuple descriptor to permanent cache storage
         */
-       for (i = 0; i < nkeys; ++i)
+       tupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
+
+       /*
+        * save the relation's name and relisshared flag, too (cc_relname
+        * is used only for debugging purposes)
+        */
+       cache->cc_relname = pstrdup(RelationGetRelationName(relation));
+       cache->cc_relisshared = RelationGetForm(relation)->relisshared;
+
+       /*
+        * return to the caller's memory context and close the rel
+        */
+       MemoryContextSwitchTo(oldcxt);
+
+       heap_close(relation, NoLock);
+
+       CACHE3_elog(DEBUG2, "CatalogCacheInitializeCache: %s, %d keys",
+                               cache->cc_relname, cache->cc_nkeys);
+
+       /*
+        * initialize cache's key information
+        */
+       for (i = 0; i < cache->cc_nkeys; ++i)
        {
-               cp->cc_key[i] = key[i];
-               if (!key[i])
-                       elog(FATAL, "InitSysCache: called with 0 key[%d]", i);
-               if (key[i] < 0)
-               {
-                       if (key[i] != ObjectIdAttributeNumber)
-                               elog(FATAL, "InitSysCache: called with %d key[%d]", key[i], i);
-                       else
-                       {
-                               cp->cc_klen[i] = sizeof(Oid);
+               Oid                     keytype;
+               RegProcedure eqfunc;
 
-                               /*
-                                * ScanKeyEntryData and struct skey are equivalent. It
-                                * looks like a move was made to obsolete struct skey, but
-                                * it didn't reach this file.  Someday we should clean up
-                                * this code and consolidate to ScanKeyEntry - mer 10 Nov
-                                * 1991
-                                */
-                               ScanKeyEntryInitialize(&cp->cc_skey[i],
-                                                                          (bits16) 0,
-                                                                          (AttrNumber) key[i],
-                                                                          (RegProcedure) F_OIDEQ,
-                                                                          (Datum) 0);
-                               continue;
-                       }
+               CatalogCacheInitializeCache_DEBUG2;
+
+               if (cache->cc_key[i] > 0)
+                       keytype = tupdesc->attrs[cache->cc_key[i] - 1]->atttypid;
+               else
+               {
+                       if (cache->cc_key[i] != ObjectIdAttributeNumber)
+                               elog(FATAL, "only sys attr supported in caches is OID");
+                       keytype = OIDOID;
                }
 
-               cp->cc_skey[i].sk_attno = key[i];
+               GetCCHashEqFuncs(keytype,
+                                                &cache->cc_hashfunc[i],
+                                                &eqfunc);
+
+               cache->cc_isname[i] = (keytype == NAMEOID);
+
+               /*
+                * Do equality-function lookup (we assume this won't need a
+                * catalog lookup for any supported type)
+                */
+               fmgr_info_cxt(eqfunc,
+                                         &cache->cc_skey[i].sk_func,
+                                         CacheMemoryContext);
+
+               /* Initialize sk_attno suitably for HeapKeyTest() and heap scans */
+               cache->cc_skey[i].sk_attno = cache->cc_key[i];
+
+               /* Fill in sk_strategy as well --- always standard equality */
+               cache->cc_skey[i].sk_strategy = BTEqualStrategyNumber;
+               cache->cc_skey[i].sk_subtype = InvalidOid;
+
+               CACHE4_elog(DEBUG2, "CatalogCacheInit %s %d %p",
+                                       cache->cc_relname,
+                                       i,
+                                       cache);
        }
 
-       /* ----------------
-        *      all done.  new cache is initialized.  print some debugging
-        *      information, if appropriate.
-        * ----------------
+       /*
+        * mark this cache fully initialized
         */
-       InitSysCache_DEBUG1;
+       cache->cc_tupdesc = tupdesc;
+}
 
-       /* ----------------
-        *      back to the old context before we return...
-        * ----------------
-        */
-       MemoryContextSwitchTo(oldcxt);
-       return cp;
+/*
+ * InitCatCachePhase2 -- external interface for CatalogCacheInitializeCache
+ *
+ * The only reason to call this routine is to ensure that the relcache
+ * has created entries for all the catalogs and indexes referenced by
+ * catcaches.  Therefore, open the index too.  An exception is the indexes
+ * on pg_am, which we don't use (cf. IndexScanOK).
+ */
+void
+InitCatCachePhase2(CatCache *cache)
+{
+       if (cache->cc_tupdesc == NULL)
+               CatalogCacheInitializeCache(cache);
+
+       if (cache->id != AMOID &&
+               cache->id != AMNAME)
+       {
+               Relation        idesc;
+
+               idesc = index_open(cache->cc_indexoid);
+               index_close(idesc);
+       }
 }
 
 
-/* --------------------------------
- *             SearchSysCache
+/*
+ *             IndexScanOK
+ *
+ *             This function checks for tuples that will be fetched by
+ *             IndexSupportInitialize() during relcache initialization for
+ *             certain system indexes that support critical syscaches.
+ *             We can't use an indexscan to fetch these, else we'll get into
+ *             infinite recursion.  A plain heap scan will work, however.
+ *
+ *             Once we have completed relcache initialization (signaled by
+ *             criticalRelcachesBuilt), we don't have to worry anymore.
+ */
+static bool
+IndexScanOK(CatCache *cache, ScanKey cur_skey)
+{
+       if (cache->id == INDEXRELID)
+       {
+               /*
+                * Since the OIDs of indexes aren't hardwired, it's painful to
+                * figure out which is which.  Just force all pg_index searches to
+                * be heap scans while building the relcaches.
+                */
+               if (!criticalRelcachesBuilt)
+                       return false;
+       }
+       else if (cache->id == AMOID ||
+                        cache->id == AMNAME)
+       {
+               /*
+                * Always do heap scans in pg_am, because it's so small there's
+                * not much point in an indexscan anyway.  We *must* do this when
+                * initially building critical relcache entries, but we might as
+                * well just always do it.
+                */
+               return false;
+       }
+       else if (cache->id == OPEROID)
+       {
+               if (!criticalRelcachesBuilt)
+               {
+                       /* Looking for an OID comparison function? */
+                       Oid                     lookup_oid = DatumGetObjectId(cur_skey[0].sk_argument);
+
+                       if (lookup_oid >= MIN_OIDCMP && lookup_oid <= MAX_OIDCMP)
+                               return false;
+               }
+       }
+
+       /* Normal case, allow index scan */
+       return true;
+}
+
+/*
+ *     SearchCatCache
  *
  *             This call searches a system cache for a tuple, opening the relation
- *             if necessary (the first access to a particular cache).
- * --------------------------------
+ *             if necessary (on the first access to a particular cache).
+ *
+ *             The result is NULL if not found, or a pointer to a HeapTuple in
+ *             the cache.      The caller must not modify the tuple, and must call
+ *             ReleaseCatCache() when done with it.
+ *
+ * The search key values should be expressed as Datums of the key columns'
+ * datatype(s).  (Pass zeroes for any unused parameters.)  As a special
+ * exception, the passed-in key for a NAME column can be just a C string;
+ * the caller need not go to the trouble of converting it to a fully
+ * null-padded NAME.
  */
 HeapTuple
-SearchSysCache(struct catcache * cache,
+SearchCatCache(CatCache *cache,
                           Datum v1,
                           Datum v2,
                           Datum v3,
                           Datum v4)
 {
-       unsigned        hash;
-       CatCTup    *ct = NULL;
-       CatCTup    *nct;
-       CatCTup    *nct2;
+       ScanKeyData cur_skey[4];
+       uint32          hashValue;
+       Index           hashIndex;
        Dlelem     *elt;
-       HeapTuple       ntp = 0;
-
+       CatCTup    *ct;
        Relation        relation;
-       MemoryContext oldcxt;
+       SysScanDesc scandesc;
+       HeapTuple       ntp;
 
-       /* ----------------
-        *      sanity checks
-        * ----------------
+       /*
+        * one-time startup overhead for each cache
         */
-       if (cache->relationId == InvalidOid)
-               CatalogCacheInitializeCache(cache, NULL);
+       if (cache->cc_tupdesc == NULL)
+               CatalogCacheInitializeCache(cache);
 
-       /* ----------------
-        *      initialize the search key information
-        * ----------------
-        */
-       cache->cc_skey[0].sk_argument = v1;
-       cache->cc_skey[1].sk_argument = v2;
-       cache->cc_skey[2].sk_argument = v3;
-       cache->cc_skey[3].sk_argument = v4;
+#ifdef CATCACHE_STATS
+       cache->cc_searches++;
+#endif
 
-       /* ----------------
-        *      find the hash bucket in which to look for the tuple
-        * ----------------
+       /*
+        * initialize the search key information
+        */
+       memcpy(cur_skey, cache->cc_skey, sizeof(cur_skey));
+       cur_skey[0].sk_argument = v1;
+       cur_skey[1].sk_argument = v2;
+       cur_skey[2].sk_argument = v3;
+       cur_skey[3].sk_argument = v4;
+
+       /*
+        * find the hash bucket in which to look for the tuple
         */
-       hash = CatalogCacheComputeHashIndex(cache);
+       hashValue = CatalogCacheComputeHashValue(cache, cache->cc_nkeys, cur_skey);
+       hashIndex = HASH_INDEX(hashValue, cache->cc_nbuckets);
 
-       /* ----------------
-        *      scan the hash bucket until we find a match or exhaust our tuples
-        * ----------------
+       /*
+        * scan the hash bucket until we find a match or exhaust our tuples
         */
-       for (elt = DLGetHead(cache->cc_cache[hash]);
+       for (elt = DLGetHead(&cache->cc_bucket[hashIndex]);
                 elt;
                 elt = DLGetSucc(elt))
        {
                bool            res;
 
                ct = (CatCTup *) DLE_VAL(elt);
-               /* ----------------
-                *      see if the cached tuple matches our key.
-                *      (should we be worried about time ranges? -cim 10/2/90)
-                * ----------------
+
+               if (ct->dead)
+                       continue;                       /* ignore dead entries */
+
+               if (ct->hash_value != hashValue)
+                       continue;                       /* quickly skip entry if wrong hash val */
+
+               /*
+                * see if the cached tuple matches our key.
                 */
-               HeapKeyTest(ct->ct_tup,
+               HeapKeyTest(&ct->tuple,
                                        cache->cc_tupdesc,
                                        cache->cc_nkeys,
-                                       cache->cc_skey,
+                                       cur_skey,
                                        res);
-               if (res)
-                       break;
-       }
+               if (!res)
+                       continue;
 
-       /* ----------------
-        *      if we found a tuple in the cache, move it to the top of the
-        *      lru list, and return it.
-        * ----------------
-        */
-       if (elt)
-       {
-               Dlelem     *old_lru_elt;
+               /*
+                * we found a match in the cache: move it to the front of the
+                * global LRU list.  We also move it to the front of the list for
+                * its hashbucket, in order to speed subsequent searches.  (The
+                * most frequently accessed elements in any hashbucket will tend
+                * to be near the front of the hashbucket's list.)
+                */
+               DLMoveToFront(&ct->lrulist_elem);
+               DLMoveToFront(&ct->cache_elem);
+
+               /*
+                * If it's a positive entry, bump its refcount and return it. If
+                * it's negative, we can report failure to the caller.
+                */
+               if (!ct->negative)
+               {
+                       ResourceOwnerEnlargeCatCacheRefs(CurrentResourceOwner);
+                       ct->refcount++;
+                       ResourceOwnerRememberCatCacheRef(CurrentResourceOwner, &ct->tuple);
 
-               old_lru_elt = ((CatCTup *) DLE_VAL(elt))->ct_node;
-               DLRemove(old_lru_elt);
-               DLAddHead(cache->cc_lrulist, old_lru_elt);
+                       CACHE3_elog(DEBUG2, "SearchCatCache(%s): found in bucket %d",
+                                               cache->cc_relname, hashIndex);
 
-#ifdef CACHEDEBUG
-               relation = heap_open(cache->relationId);
-               CACHE3_elog(DEBUG, "SearchSysCache(%s): found in bucket %d",
-                                       RelationGetRelationName(relation), hash);
-               heap_close(relation);
-#endif  /* CACHEDEBUG */
+#ifdef CATCACHE_STATS
+                       cache->cc_hits++;
+#endif
+
+                       return &ct->tuple;
+               }
+               else
+               {
+                       CACHE3_elog(DEBUG2, "SearchCatCache(%s): found neg entry in bucket %d",
+                                               cache->cc_relname, hashIndex);
 
-               return ct->ct_tup;
+#ifdef CATCACHE_STATS
+                       cache->cc_neg_hits++;
+#endif
+
+                       return NULL;
+               }
        }
 
-       /* ----------------
-        *      Tuple was not found in cache, so we have to try and
-        *      retrieve it directly from the relation.  If it's found,
-        *      we add it to the cache.  We must avoid recursion here,
-        *      so we disable cache operations.  If operations are
-        *      currently disabled and we couldn't find the requested item
-        *      in the cache, then this may be a recursive request, and we
-        *      abort with an error.
-        * ----------------
+       /*
+        * Tuple was not found in cache, so we have to try to retrieve it
+        * directly from the relation.  If found, we will add it to the cache;
+        * if not found, we will add a negative cache entry instead.
+        *
+        * NOTE: it is possible for recursive cache lookups to occur while
+        * reading the relation --- for example, due to shared-cache-inval
+        * messages being processed during heap_open().  This is OK.  It's
+        * even possible for one of those lookups to find and enter the very
+        * same tuple we are trying to fetch here.      If that happens, we will
+        * enter a second copy of the tuple into the cache.  The first copy
+        * will never be referenced again, and will eventually age out of the
+        * cache, so there's no functional problem.  This case is rare enough
+        * that it's not worth expending extra cycles to detect.
         */
+       relation = heap_open(cache->cc_reloid, AccessShareLock);
+
+       scandesc = systable_beginscan(relation,
+                                                                 cache->cc_indexoid,
+                                                                 IndexScanOK(cache, cur_skey),
+                                                                 SnapshotNow,
+                                                                 cache->cc_nkeys,
+                                                                 cur_skey);
 
-       if (DisableCache)
+       ct = NULL;
+
+       while (HeapTupleIsValid(ntp = systable_getnext(scandesc)))
        {
-               elog(ERROR, "SearchSysCache: Called while cache disabled");
-               return (HeapTuple) NULL;
+               ct = CatalogCacheCreateEntry(cache, ntp,
+                                                                        hashValue, hashIndex,
+                                                                        false);
+               /* immediately set the refcount to 1 */
+               ResourceOwnerEnlargeCatCacheRefs(CurrentResourceOwner);
+               ct->refcount++;
+               ResourceOwnerRememberCatCacheRef(CurrentResourceOwner, &ct->tuple);
+               break;                                  /* assume only one match */
        }
 
-       /* ----------------
-        *      open the relation associated with the cache
-        * ----------------
+       systable_endscan(scandesc);
+
+       heap_close(relation, AccessShareLock);
+
+       /*
+        * If tuple was not found, we need to build a negative cache entry
+        * containing a fake tuple.  The fake tuple has the correct key
+        * columns, but nulls everywhere else.
+        *
+        * In bootstrap mode, we don't build negative entries, because the
+        * cache invalidation mechanism isn't alive and can't clear them
+        * if the tuple gets created later.  (Bootstrap doesn't do UPDATEs,
+        * so it doesn't need cache inval for that.)
         */
-       relation = heap_open(cache->relationId);
-       CACHE2_elog(DEBUG, "SearchSysCache(%s)",
-                               RelationGetRelationName(relation));
+       if (ct == NULL)
+       {
+               if (IsBootstrapProcessingMode())
+                       return NULL;
 
-       /* ----------------
-        *      DisableCache and then switch to the cache memory context.
-        * ----------------
+               ntp = build_dummy_tuple(cache, cache->cc_nkeys, cur_skey);
+               ct = CatalogCacheCreateEntry(cache, ntp,
+                                                                        hashValue, hashIndex,
+                                                                        true);
+               heap_freetuple(ntp);
+
+               CACHE4_elog(DEBUG2, "SearchCatCache(%s): Contains %d/%d tuples",
+                                       cache->cc_relname, cache->cc_ntup, CacheHdr->ch_ntup);
+               CACHE3_elog(DEBUG2, "SearchCatCache(%s): put neg entry in bucket %d",
+                                       cache->cc_relname, hashIndex);
+
+               /*
+                * We are not returning the negative entry to the caller, so leave
+                * its refcount zero.
+                */
+
+               return NULL;
+       }
+
+       CACHE4_elog(DEBUG2, "SearchCatCache(%s): Contains %d/%d tuples",
+                               cache->cc_relname, cache->cc_ntup, CacheHdr->ch_ntup);
+       CACHE3_elog(DEBUG2, "SearchCatCache(%s): put in bucket %d",
+                               cache->cc_relname, hashIndex);
+
+#ifdef CATCACHE_STATS
+       cache->cc_newloads++;
+#endif
+
+       return &ct->tuple;
+}
+
+/*
+ *     ReleaseCatCache
+ *
+ *     Decrement the reference count of a catcache entry (releasing the
+ *     hold grabbed by a successful SearchCatCache).
+ *
+ *     NOTE: if compiled with -DCATCACHE_FORCE_RELEASE then catcache entries
+ *     will be freed as soon as their refcount goes to zero.  In combination
+ *     with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
+ *     to catch references to already-released catcache entries.
+ */
+void
+ReleaseCatCache(HeapTuple tuple)
+{
+       CatCTup    *ct = (CatCTup *) (((char *) tuple) -
+                                                                 offsetof(CatCTup, tuple));
+
+       /* Safety checks to ensure we were handed a cache entry */
+       Assert(ct->ct_magic == CT_MAGIC);
+       Assert(ct->refcount > 0);
+
+       ct->refcount--;
+       ResourceOwnerForgetCatCacheRef(CurrentResourceOwner, &ct->tuple);
+
+       if (ct->refcount == 0
+#ifndef CATCACHE_FORCE_RELEASE
+               && ct->dead
+#endif
+               )
+               CatCacheRemoveCTup(ct->my_cache, ct);
+}
+
+
+/*
+ *     SearchCatCacheList
+ *
+ *             Generate a list of all tuples matching a partial key (that is,
+ *             a key specifying just the first K of the cache's N key columns).
+ *
+ *             The caller must not modify the list object or the pointed-to tuples,
+ *             and must call ReleaseCatCacheList() when done with the list.
+ */
+CatCList *
+SearchCatCacheList(CatCache *cache,
+                                  int nkeys,
+                                  Datum v1,
+                                  Datum v2,
+                                  Datum v3,
+                                  Datum v4)
+{
+       ScanKeyData cur_skey[4];
+       uint32          lHashValue;
+       Dlelem     *elt;
+       CatCList   *cl;
+       CatCTup    *ct;
+       List       *ctlist;
+       ListCell   *ctlist_item;
+       int                     nmembers;
+       Relation        relation;
+       SysScanDesc scandesc;
+       bool            ordered;
+       HeapTuple       ntp;
+       MemoryContext oldcxt;
+       int                     i;
+
+       /*
+        * one-time startup overhead for each cache
         */
-       DisableCache = 1;
+       if (cache->cc_tupdesc == NULL)
+               CatalogCacheInitializeCache(cache);
 
-       if (!CacheCxt)
-               CacheCxt = CreateGlobalMemory("Cache");
+       Assert(nkeys > 0 && nkeys < cache->cc_nkeys);
 
-       oldcxt = MemoryContextSwitchTo((MemoryContext) CacheCxt);
+#ifdef CATCACHE_STATS
+       cache->cc_lsearches++;
+#endif
 
-       /* ----------------
-        *      Scan the relation to find the tuple.  If there's an index, and
-        *      if this isn't bootstrap (initdb) time, use the index.
-        * ----------------
+       /*
+        * initialize the search key information
         */
-       CACHE2_elog(DEBUG, "SearchSysCache: performing scan (override==%d)",
-                               heapisoverride());
+       memcpy(cur_skey, cache->cc_skey, sizeof(cur_skey));
+       cur_skey[0].sk_argument = v1;
+       cur_skey[1].sk_argument = v2;
+       cur_skey[2].sk_argument = v3;
+       cur_skey[3].sk_argument = v4;
+
+       /*
+        * compute a hash value of the given keys for faster search.  We don't
+        * presently divide the CatCList items into buckets, but this still
+        * lets us skip non-matching items quickly most of the time.
+        */
+       lHashValue = CatalogCacheComputeHashValue(cache, nkeys, cur_skey);
 
-       if ((RelationGetForm(relation))->relhasindex
-               && !IsBootstrapProcessingMode())
+       /*
+        * scan the items until we find a match or exhaust our list
+        */
+       for (elt = DLGetHead(&cache->cc_lists);
+                elt;
+                elt = DLGetSucc(elt))
        {
-               /* ----------
-                *      Switch back to old memory context so memory not freed
-                *      in the scan function will go away at transaction end.
-                *      wieck - 10/18/1996
-                * ----------
+               bool            res;
+
+               cl = (CatCList *) DLE_VAL(elt);
+
+               if (cl->dead)
+                       continue;                       /* ignore dead entries */
+
+               if (cl->hash_value != lHashValue)
+                       continue;                       /* quickly skip entry if wrong hash val */
+
+               /*
+                * see if the cached list matches our key.
                 */
-               MemoryContextSwitchTo(oldcxt);
-               Assert(cache->cc_iscanfunc);
-               switch (cache->cc_nkeys)
+               if (cl->nkeys != nkeys)
+                       continue;
+               HeapKeyTest(&cl->tuple,
+                                       cache->cc_tupdesc,
+                                       nkeys,
+                                       cur_skey,
+                                       res);
+               if (!res)
+                       continue;
+
+               /*
+                * we found a matching list: move each of its members to the front
+                * of the global LRU list.      Also move the list itself to the front
+                * of the cache's list-of-lists, to speed subsequent searches. (We
+                * do not move the members to the fronts of their hashbucket
+                * lists, however, since there's no point in that unless they are
+                * searched for individually.)  Also bump the members' refcounts.
+                * (member refcounts are NOT registered separately with the
+                * resource owner.)
+                */
+               ResourceOwnerEnlargeCatCacheListRefs(CurrentResourceOwner);
+               for (i = 0; i < cl->n_members; i++)
                {
-                       case 4:
-                               ntp = cache->cc_iscanfunc(relation, v1, v2, v3, v4);
-                               break;
-                       case 3:
-                               ntp = cache->cc_iscanfunc(relation, v1, v2, v3);
-                               break;
-                       case 2:
-                               ntp = cache->cc_iscanfunc(relation, v1, v2);
-                               break;
-                       case 1:
-                               ntp = cache->cc_iscanfunc(relation, v1);
-                               break;
+                       cl->members[i]->refcount++;
+                       DLMoveToFront(&cl->members[i]->lrulist_elem);
                }
-               /* ----------
-                *      Back to Cache context. If we got a tuple copy it
-                *      into our context.
-                *      wieck - 10/18/1996
-                * ----------
-                */
-               MemoryContextSwitchTo((MemoryContext) CacheCxt);
-               if (HeapTupleIsValid(ntp))
-                       ntp = heap_copytuple(ntp);
+               DLMoveToFront(&cl->cache_elem);
+
+               /* Bump the list's refcount and return it */
+               cl->refcount++;
+               ResourceOwnerRememberCatCacheListRef(CurrentResourceOwner, cl);
+
+               CACHE2_elog(DEBUG2, "SearchCatCacheList(%s): found list",
+                                       cache->cc_relname);
+
+#ifdef CATCACHE_STATS
+               cache->cc_lhits++;
+#endif
+
+               return cl;
        }
-       else
+
+       /*
+        * List was not found in cache, so we have to build it by reading the
+        * relation.  For each matching tuple found in the relation, use an
+        * existing cache entry if possible, else build a new one.
+        */
+       relation = heap_open(cache->cc_reloid, AccessShareLock);
+
+       scandesc = systable_beginscan(relation,
+                                                                 cache->cc_indexoid,
+                                                                 true,
+                                                                 SnapshotNow,
+                                                                 nkeys,
+                                                                 cur_skey);
+
+       /* The list will be ordered iff we are doing an index scan */
+       ordered = (scandesc->irel != NULL);
+
+       ctlist = NIL;
+       nmembers = 0;
+
+       while (HeapTupleIsValid(ntp = systable_getnext(scandesc)))
        {
-               HeapScanDesc sd;
+               uint32          hashValue;
+               Index           hashIndex;
 
-               /* ----------
-                *      As above do the lookup in the callers memory
-                *      context.
-                *      wieck - 10/18/1996
-                * ----------
+               /*
+                * See if there's an entry for this tuple already.
                 */
-               MemoryContextSwitchTo(oldcxt);
+               ct = NULL;
+               hashValue = CatalogCacheComputeTupleHashValue(cache, ntp);
+               hashIndex = HASH_INDEX(hashValue, cache->cc_nbuckets);
+
+               for (elt = DLGetHead(&cache->cc_bucket[hashIndex]);
+                        elt;
+                        elt = DLGetSucc(elt))
+               {
+                       ct = (CatCTup *) DLE_VAL(elt);
+
+                       if (ct->dead || ct->negative)
+                               continue;               /* ignore dead and negative entries */
+
+                       if (ct->hash_value != hashValue)
+                               continue;               /* quickly skip entry if wrong hash val */
 
-               sd = heap_beginscan(relation, 0, SnapshotNow,
-                                                       cache->cc_nkeys, cache->cc_skey);
+                       if (!ItemPointerEquals(&(ct->tuple.t_self), &(ntp->t_self)))
+                               continue;               /* not same tuple */
 
-               ntp = heap_getnext(sd, 0);
+                       /*
+                        * Found a match, but can't use it if it belongs to another
+                        * list already
+                        */
+                       if (ct->c_list)
+                               continue;
 
-               MemoryContextSwitchTo((MemoryContext) CacheCxt);
+                       /* Found a match, so move it to front */
+                       DLMoveToFront(&ct->lrulist_elem);
 
-               if (HeapTupleIsValid(ntp))
+                       break;
+               }
+
+               if (elt == NULL)
                {
-                       CACHE1_elog(DEBUG, "SearchSysCache: found tuple");
-                       ntp = heap_copytuple(ntp);
+                       /* We didn't find a usable entry, so make a new one */
+                       ct = CatalogCacheCreateEntry(cache, ntp,
+                                                                                hashValue, hashIndex,
+                                                                                false);
                }
 
-               MemoryContextSwitchTo(oldcxt);
+               /*
+                * We have to bump the member refcounts immediately to ensure they
+                * won't get dropped from the cache while loading other members.
+                * If we get an error before we finish constructing the CatCList
+                * then we will leak those reference counts.  This is annoying but
+                * it has no real consequence beyond possibly generating some
+                * warning messages at the next transaction commit, so it's not
+                * worth fixing.
+                */
+               ct->refcount++;
+               ctlist = lappend(ctlist, ct);
+               nmembers++;
+       }
+
+       systable_endscan(scandesc);
 
-               heap_endscan(sd);
+       heap_close(relation, AccessShareLock);
 
-               MemoryContextSwitchTo((MemoryContext) CacheCxt);
+       /*
+        * Now we can build the CatCList entry.  First we need a dummy tuple
+        * containing the key values...
+        */
+       ntp = build_dummy_tuple(cache, nkeys, cur_skey);
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+       cl = (CatCList *) palloc(sizeof(CatCList) + nmembers * sizeof(CatCTup *));
+       heap_copytuple_with_tuple(ntp, &cl->tuple);
+       MemoryContextSwitchTo(oldcxt);
+       heap_freetuple(ntp);
+
+       cl->cl_magic = CL_MAGIC;
+       cl->my_cache = cache;
+       DLInitElem(&cl->cache_elem, cl);
+       cl->refcount = 0;                       /* for the moment */
+       cl->dead = false;
+       cl->ordered = ordered;
+       cl->nkeys = nkeys;
+       cl->hash_value = lHashValue;
+       cl->n_members = nmembers;
+
+       Assert(nmembers == list_length(ctlist));
+       ctlist_item = list_head(ctlist);
+       for (i = 0; i < nmembers; i++)
+       {
+               cl->members[i] = ct = (CatCTup *) lfirst(ctlist_item);
+               Assert(ct->c_list == NULL);
+               ct->c_list = cl;
+               /* mark list dead if any members already dead */
+               if (ct->dead)
+                       cl->dead = true;
+               ctlist_item = lnext(ctlist_item);
        }
 
-       DisableCache = 0;
+       DLAddHead(&cache->cc_lists, &cl->cache_elem);
 
-       /* ----------------
-        *      scan is complete.  if tup is valid, we copy it and add the copy to
-        *      the cache.
-        * ----------------
+       CACHE3_elog(DEBUG2, "SearchCatCacheList(%s): made list of %d members",
+                               cache->cc_relname, nmembers);
+
+       /* Finally, bump the list's refcount and return it */
+       ResourceOwnerEnlargeCatCacheListRefs(CurrentResourceOwner);
+       cl->refcount++;
+       ResourceOwnerRememberCatCacheListRef(CurrentResourceOwner, cl);
+
+       return cl;
+}
+
+/*
+ *     ReleaseCatCacheList
+ *
+ *     Decrement the reference counts of a catcache list.
+ */
+void
+ReleaseCatCacheList(CatCList *list)
+{
+       int                     i;
+
+       /* Safety checks to ensure we were handed a cache entry */
+       Assert(list->cl_magic == CL_MAGIC);
+       Assert(list->refcount > 0);
+
+       for (i = list->n_members; --i >= 0;)
+       {
+               CatCTup    *ct = list->members[i];
+
+               Assert(ct->refcount > 0);
+
+               ct->refcount--;
+
+               if (ct->dead)
+                       list->dead = true;
+               /* can't remove tuple before list is removed */
+       }
+
+       list->refcount--;
+       ResourceOwnerForgetCatCacheListRef(CurrentResourceOwner, list);
+
+       if (list->refcount == 0
+#ifndef CATCACHE_FORCE_RELEASE
+               && list->dead
+#endif
+               )
+               CatCacheRemoveCList(list->my_cache, list);
+}
+
+
+/*
+ * CatalogCacheCreateEntry
+ *             Create a new CatCTup entry, copying the given HeapTuple and other
+ *             supplied data into it.  The new entry initially has refcount 0.
+ */
+static CatCTup *
+CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp,
+                                               uint32 hashValue, Index hashIndex, bool negative)
+{
+       CatCTup    *ct;
+       MemoryContext oldcxt;
+
+       /*
+        * Allocate CatCTup header in cache memory, and copy the tuple there
+        * too.
         */
-       if (HeapTupleIsValid(ntp))
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+       ct = (CatCTup *) palloc(sizeof(CatCTup));
+       heap_copytuple_with_tuple(ntp, &ct->tuple);
+       MemoryContextSwitchTo(oldcxt);
+
+       /*
+        * Finish initializing the CatCTup header, and add it to the cache's
+        * linked lists and counts.
+        */
+       ct->ct_magic = CT_MAGIC;
+       ct->my_cache = cache;
+       DLInitElem(&ct->lrulist_elem, (void *) ct);
+       DLInitElem(&ct->cache_elem, (void *) ct);
+       ct->c_list = NULL;
+       ct->refcount = 0;                       /* for the moment */
+       ct->dead = false;
+       ct->negative = negative;
+       ct->hash_value = hashValue;
+
+       DLAddHead(&CacheHdr->ch_lrulist, &ct->lrulist_elem);
+       DLAddHead(&cache->cc_bucket[hashIndex], &ct->cache_elem);
+
+       cache->cc_ntup++;
+       CacheHdr->ch_ntup++;
+
+       /*
+        * If we've exceeded the desired size of the caches, try to throw away
+        * the least recently used entry.  NB: be careful not to throw away
+        * the newly-built entry...
+        */
+       if (CacheHdr->ch_ntup > CacheHdr->ch_maxtup)
        {
-               /* ----------------
-                *      allocate a new cache tuple holder, store the pointer
-                *      to the heap tuple there and initialize the list pointers.
-                * ----------------
-                */
-               Dlelem     *lru_elt;
+               Dlelem     *elt,
+                                  *prevelt;
 
-               /*
-                * this is a little cumbersome here because we want the Dlelem's
-                * in both doubly linked lists to point to one another. That makes
-                * it easier to remove something from both the cache bucket and
-                * the lru list at the same time
-                */
-               nct = (CatCTup *) malloc(sizeof(CatCTup));
-               nct->ct_tup = ntp;
-               elt = DLNewElem(nct);
-               nct2 = (CatCTup *) malloc(sizeof(CatCTup));
-               nct2->ct_tup = ntp;
-               lru_elt = DLNewElem(nct2);
-               nct2->ct_node = elt;
-               nct->ct_node = lru_elt;
-
-               DLAddHead(cache->cc_lrulist, lru_elt);
-               DLAddHead(cache->cc_cache[hash], elt);
-
-               /* ----------------
-                *      deal with hash bucket overflow
-                * ----------------
-                */
-               if (++cache->cc_ntup > cache->cc_maxtup)
+               for (elt = DLGetTail(&CacheHdr->ch_lrulist); elt; elt = prevelt)
                {
-                       CatCTup    *ct;
+                       CatCTup    *oldct = (CatCTup *) DLE_VAL(elt);
 
-                       elt = DLGetTail(cache->cc_lrulist);
-                       ct = (CatCTup *) DLE_VAL(elt);
+                       prevelt = DLGetPred(elt);
 
-                       if (ct != nct)
+                       if (oldct->refcount == 0 && oldct != ct)
                        {
-                               CACHE2_elog(DEBUG, "SearchSysCache(%s): Overflow, LRU removal",
-                                                       RelationGetRelationName(relation));
+                               CACHE2_elog(DEBUG2, "CatCacheCreateEntry(%s): Overflow, LRU removal",
+                                                       cache->cc_relname);
+#ifdef CATCACHE_STATS
+                               oldct->my_cache->cc_discards++;
+#endif
+                               CatCacheRemoveCTup(oldct->my_cache, oldct);
+                               if (CacheHdr->ch_ntup <= CacheHdr->ch_maxtup)
+                                       break;
+                       }
+               }
+       }
 
-                               CatCacheRemoveCTup(cache, elt);
+       return ct;
+}
+
+/*
+ * build_dummy_tuple
+ *             Generate a palloc'd HeapTuple that contains the specified key
+ *             columns, and NULLs for other columns.
+ *
+ * This is used to store the keys for negative cache entries and CatCList
+ * entries, which don't have real tuples associated with them.
+ */
+static HeapTuple
+build_dummy_tuple(CatCache *cache, int nkeys, ScanKey skeys)
+{
+       HeapTuple       ntp;
+       TupleDesc       tupDesc = cache->cc_tupdesc;
+       Datum      *values;
+       char       *nulls;
+       Oid                     tupOid = InvalidOid;
+       NameData        tempNames[4];
+       int                     i;
+
+       values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
+       nulls = (char *) palloc(tupDesc->natts * sizeof(char));
+
+       memset(values, 0, tupDesc->natts * sizeof(Datum));
+       memset(nulls, 'n', tupDesc->natts * sizeof(char));
+
+       for (i = 0; i < nkeys; i++)
+       {
+               int                     attindex = cache->cc_key[i];
+               Datum           keyval = skeys[i].sk_argument;
+
+               if (attindex > 0)
+               {
+                       /*
+                        * Here we must be careful in case the caller passed a C
+                        * string where a NAME is wanted: convert the given argument
+                        * to a correctly padded NAME.  Otherwise the memcpy() done in
+                        * heap_formtuple could fall off the end of memory.
+                        */
+                       if (cache->cc_isname[i])
+                       {
+                               Name            newval = &tempNames[i];
 
+                               namestrcpy(newval, DatumGetCString(keyval));
+                               keyval = NameGetDatum(newval);
                        }
+                       values[attindex - 1] = keyval;
+                       nulls[attindex - 1] = ' ';
+               }
+               else
+               {
+                       Assert(attindex == ObjectIdAttributeNumber);
+                       tupOid = DatumGetObjectId(keyval);
                }
-
-               CACHE4_elog(DEBUG, "SearchSysCache(%s): Contains %d/%d tuples",
-                                       RelationGetRelationName(relation),
-                                       cache->cc_ntup, cache->cc_maxtup);
-               CACHE3_elog(DEBUG, "SearchSysCache(%s): put in bucket %d",
-                                       RelationGetRelationName(relation), hash);
        }
 
-       /* ----------------
-        *      close the relation, switch back to the original memory context
-        *      and return the tuple we found (or NULL)
-        * ----------------
-        */
-       heap_close(relation);
+       ntp = heap_formtuple(tupDesc, values, nulls);
+       if (tupOid != InvalidOid)
+               HeapTupleSetOid(ntp, tupOid);
+
+       pfree(values);
+       pfree(nulls);
 
-       MemoryContextSwitchTo(oldcxt);
        return ntp;
 }
 
-/* --------------------------------
- *     RelationInvalidateCatalogCacheTuple()
+
+/*
+ *     PrepareToInvalidateCacheTuple()
+ *
+ *     This is part of a rather subtle chain of events, so pay attention:
+ *
+ *     When a tuple is inserted or deleted, it cannot be flushed from the
+ *     catcaches immediately, for reasons explained at the top of cache/inval.c.
+ *     Instead we have to add entry(s) for the tuple to a list of pending tuple
+ *     invalidations that will be done at the end of the command or transaction.
+ *
+ *     The lists of tuples that need to be flushed are kept by inval.c.  This
+ *     routine is a helper routine for inval.c.  Given a tuple belonging to
+ *     the specified relation, find all catcaches it could be in, compute the
+ *     correct hash value for each such catcache, and call the specified function
+ *     to record the cache id, hash value, and tuple ItemPointer in inval.c's
+ *     lists.  CatalogCacheIdInvalidate will be called later, if appropriate,
+ *     using the recorded information.
  *
- *     Invalidate a tuple from a specific relation.  This call determines the
- *     cache in question and calls CatalogCacheIdInvalidate().  It is -ok-
- *     if the relation cannot be found, it simply means this backend has yet
- *     to open it.
- * --------------------------------
+ *     Note that it is irrelevant whether the given tuple is actually loaded
+ *     into the catcache at the moment.  Even if it's not there now, it might
+ *     be by the end of the command, or there might be a matching negative entry
+ *     to flush --- or other backends' caches might have such entries --- so
+ *     we have to make list entries to flush it later.
+ *
+ *     Also note that it's not an error if there are no catcaches for the
+ *     specified relation.  inval.c doesn't know exactly which rels have
+ *     catcaches --- it will call this routine for any tuple that's in a
+ *     system relation.
  */
 void
-RelationInvalidateCatalogCacheTuple(Relation relation,
-                                                                       HeapTuple tuple,
-                                                         void (*function) (int, Index, ItemPointer))
+PrepareToInvalidateCacheTuple(Relation relation,
+                                                         HeapTuple tuple,
+                                               void (*function) (int, uint32, ItemPointer, Oid))
 {
-       struct catcache *ccp;
-       MemoryContext oldcxt;
-       Oid                     relationId;
+       CatCache   *ccp;
+       Oid                     reloid;
 
-       /* ----------------
-        *      sanity checks
-        * ----------------
+       CACHE1_elog(DEBUG2, "PrepareToInvalidateCacheTuple: called");
+
+       /*
+        * sanity checks
         */
        Assert(RelationIsValid(relation));
        Assert(HeapTupleIsValid(tuple));
        Assert(PointerIsValid(function));
-       CACHE1_elog(DEBUG, "RelationInvalidateCatalogCacheTuple: called");
+       Assert(CacheHdr != NULL);
 
-       /* ----------------
-        *      switch to the cache memory context
-        * ----------------
-        */
-       if (!CacheCxt)
-               CacheCxt = CreateGlobalMemory("Cache");
-       oldcxt = MemoryContextSwitchTo((MemoryContext) CacheCxt);
+       reloid = RelationGetRelid(relation);
 
        /* ----------------
         *      for each cache
         *         if the cache contains tuples from the specified relation
-        *                 call the invalidation function on the tuples
-        *                 in the proper hash bucket
+        *                 compute the tuple's hash value in this cache,
+        *                 and call the passed function to register the information.
         * ----------------
         */
-       relationId = RelationGetRelid(relation);
 
-       for (ccp = Caches; ccp; ccp = ccp->cc_next)
+       for (ccp = CacheHdr->ch_caches; ccp; ccp = ccp->cc_next)
        {
-               if (relationId != ccp->relationId)
-                       continue;
+               /* Just in case cache hasn't finished initialization yet... */
+               if (ccp->cc_tupdesc == NULL)
+                       CatalogCacheInitializeCache(ccp);
 
-#ifdef NOT_USED
-               /* OPT inline simplification of CatalogCacheIdInvalidate */
-               if (!PointerIsValid(function))
-                       function = CatalogCacheIdInvalidate;
-#endif
+               if (ccp->cc_reloid != reloid)
+                       continue;
 
                (*function) (ccp->id,
-                                CatalogCacheComputeTupleHashIndex(ccp, relation, tuple),
-                                        &tuple->t_ctid);
-
-               heap_close(relation);
+                                        CatalogCacheComputeTupleHashValue(ccp, tuple),
+                                        &tuple->t_self,
+                                        ccp->cc_relisshared ? (Oid) 0 : MyDatabaseId);
        }
+}
 
-       /* ----------------
-        *      return to the proper memory context
-        * ----------------
-        */
-       MemoryContextSwitchTo(oldcxt);
 
-       /* sendpm('I', "Invalidated tuple"); */
+/*
+ * Subroutines for warning about reference leaks.  These are exported so
+ * that resowner.c can call them.
+ */
+void
+PrintCatCacheLeakWarning(HeapTuple tuple)
+{
+       CatCTup    *ct = (CatCTup *) (((char *) tuple) -
+                                                                 offsetof(CatCTup, tuple));
+
+       /* Safety check to ensure we were handed a cache entry */
+       Assert(ct->ct_magic == CT_MAGIC);
+
+       elog(WARNING, "cache reference leak: cache %s (%d), tuple %u/%u has count %d",
+                ct->my_cache->cc_relname, ct->my_cache->id,
+                ItemPointerGetBlockNumber(&(tuple->t_self)),
+                ItemPointerGetOffsetNumber(&(tuple->t_self)),
+                ct->refcount);
+}
+
+void
+PrintCatCacheListLeakWarning(CatCList *list)
+{
+       elog(WARNING, "cache reference leak: cache %s (%d), list %p has count %d",
+                list->my_cache->cc_relname, list->my_cache->id,
+                list, list->refcount);
 }