]> granicus.if.org Git - postgresql/blobdiff - src/backend/utils/cache/catcache.c
For some reason access/tupmacs.h has been #including utils/memutils.h,
[postgresql] / src / backend / utils / cache / catcache.c
index 202f447076e2a0bceeeeb243bb0f7a44a0cb8494..abe0aa060c0a4eb7cd47082f66261773dec4f1f7 100644 (file)
@@ -3,12 +3,12 @@
  * catcache.c
  *       System catalog cache for tuples matching a key.
  *
- * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/utils/cache/catcache.c,v 1.93 2002/03/26 19:16:08 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/utils/cache/catcache.c,v 1.121 2005/05/06 17:24:54 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -21,7 +21,6 @@
 #include "catalog/pg_opclass.h"
 #include "catalog/pg_operator.h"
 #include "catalog/pg_type.h"
-#include "catalog/catname.h"
 #include "catalog/indexing.h"
 #include "miscadmin.h"
 #ifdef CATCACHE_STATS
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/catcache.h"
+#include "utils/memutils.h"
 #include "utils/relcache.h"
+#include "utils/resowner.h"
 #include "utils/syscache.h"
 
 
- /* #define CACHEDEBUG */      /* turns DEBUG elogs on */
+/* #define CACHEDEBUG */       /* turns DEBUG elogs on */
 
 /*
  * Constants related to size of the catcache.
 /* Cache management header --- pointer is NULL until created */
 static CatCacheHeader *CacheHdr = NULL;
 
-/*
- *             EQPROC is used in CatalogCacheInitializeCache to find the equality
- *             functions for system types that are used as cache key fields.
- *             See also GetCCHashFunc, which should support the same set of types.
- *
- *             XXX this should be replaced by catalog lookups,
- *             but that seems to pose considerable risk of circularity...
- */
-static const Oid eqproc[] = {
-       F_BOOLEQ, InvalidOid, F_CHAREQ, F_NAMEEQ, InvalidOid,
-       F_INT2EQ, F_INT2VECTOREQ, F_INT4EQ, F_OIDEQ, F_TEXTEQ,
-       F_OIDEQ, InvalidOid, InvalidOid, InvalidOid, F_OIDVECTOREQ
-};
 
-#define EQPROC(SYSTEMTYPEOID)  eqproc[(SYSTEMTYPEOID)-BOOLOID]
-
-
-static uint32 CatalogCacheComputeHashValue(CatCache *cache,
+static uint32 CatalogCacheComputeHashValue(CatCache *cache, int nkeys,
                                                         ScanKey cur_skey);
 static uint32 CatalogCacheComputeTupleHashValue(CatCache *cache,
                                                                  HeapTuple tuple);
+
 #ifdef CATCACHE_STATS
 static void CatCachePrintStats(void);
 #endif
 static void CatCacheRemoveCTup(CatCache *cache, CatCTup *ct);
+static void CatCacheRemoveCList(CatCache *cache, CatCList *cl);
 static void CatalogCacheInitializeCache(CatCache *cache);
+static CatCTup *CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp,
+                                               uint32 hashValue, Index hashIndex,
+                                               bool negative);
+static HeapTuple build_dummy_tuple(CatCache *cache, int nkeys, ScanKey skeys);
 
 
 /*
  *                                     internal support functions
  */
 
-static PGFunction
-GetCCHashFunc(Oid keytype)
+/*
+ * Look up the hash and equality functions for system types that are used
+ * as cache key fields.
+ *
+ * XXX this should be replaced by catalog lookups,
+ * but that seems to pose considerable risk of circularity...
+ */
+static void
+GetCCHashEqFuncs(Oid keytype, PGFunction *hashfunc, RegProcedure *eqfunc)
 {
        switch (keytype)
        {
                case BOOLOID:
+                       *hashfunc = hashchar;
+                       *eqfunc = F_BOOLEQ;
+                       break;
                case CHAROID:
-                       return hashchar;
+                       *hashfunc = hashchar;
+                       *eqfunc = F_CHAREQ;
+                       break;
                case NAMEOID:
-                       return hashname;
+                       *hashfunc = hashname;
+                       *eqfunc = F_NAMEEQ;
+                       break;
                case INT2OID:
-                       return hashint2;
+                       *hashfunc = hashint2;
+                       *eqfunc = F_INT2EQ;
+                       break;
                case INT2VECTOROID:
-                       return hashint2vector;
+                       *hashfunc = hashint2vector;
+                       *eqfunc = F_INT2VECTOREQ;
+                       break;
                case INT4OID:
-                       return hashint4;
+                       *hashfunc = hashint4;
+                       *eqfunc = F_INT4EQ;
+                       break;
                case TEXTOID:
-                       return hashvarlena;
-               case REGPROCOID:
+                       *hashfunc = hashtext;
+                       *eqfunc = F_TEXTEQ;
+                       break;
                case OIDOID:
-                       return hashoid;
+               case REGPROCOID:
+               case REGPROCEDUREOID:
+               case REGOPEROID:
+               case REGOPERATOROID:
+               case REGCLASSOID:
+               case REGTYPEOID:
+                       *hashfunc = hashoid;
+                       *eqfunc = F_OIDEQ;
+                       break;
                case OIDVECTOROID:
-                       return hashoidvector;
+                       *hashfunc = hashoidvector;
+                       *eqfunc = F_OIDVECTOREQ;
+                       break;
                default:
-                       elog(FATAL, "GetCCHashFunc: type %u unsupported as catcache key",
-                                keytype);
-                       return (PGFunction) NULL;
+                       elog(FATAL, "type %u not supported as catcache key", keytype);
+                       break;
        }
 }
 
@@ -149,16 +170,16 @@ GetCCHashFunc(Oid keytype)
  * Compute the hash value associated with a given set of lookup keys
  */
 static uint32
-CatalogCacheComputeHashValue(CatCache *cache, ScanKey cur_skey)
+CatalogCacheComputeHashValue(CatCache *cache, int nkeys, ScanKey cur_skey)
 {
        uint32          hashValue = 0;
 
-       CACHE4_elog(DEBUG1, "CatalogCacheComputeHashValue %s %d %p",
+       CACHE4_elog(DEBUG2, "CatalogCacheComputeHashValue %s %d %p",
                                cache->cc_relname,
-                               cache->cc_nkeys,
+                               nkeys,
                                cache);
 
-       switch (cache->cc_nkeys)
+       switch (nkeys)
        {
                case 4:
                        hashValue ^=
@@ -181,7 +202,7 @@ CatalogCacheComputeHashValue(CatCache *cache, ScanKey cur_skey)
                                                                                           cur_skey[0].sk_argument));
                        break;
                default:
-                       elog(FATAL, "CCComputeHashValue: %d cc_nkeys", cache->cc_nkeys);
+                       elog(FATAL, "wrong number of hash keys: %d", nkeys);
                        break;
        }
 
@@ -208,7 +229,7 @@ CatalogCacheComputeTupleHashValue(CatCache *cache, HeapTuple tuple)
                case 4:
                        cur_skey[3].sk_argument =
                                (cache->cc_key[3] == ObjectIdAttributeNumber)
-                               ? ObjectIdGetDatum(tuple->t_data->t_oid)
+                               ? ObjectIdGetDatum(HeapTupleGetOid(tuple))
                                : fastgetattr(tuple,
                                                          cache->cc_key[3],
                                                          cache->cc_tupdesc,
@@ -218,7 +239,7 @@ CatalogCacheComputeTupleHashValue(CatCache *cache, HeapTuple tuple)
                case 3:
                        cur_skey[2].sk_argument =
                                (cache->cc_key[2] == ObjectIdAttributeNumber)
-                               ? ObjectIdGetDatum(tuple->t_data->t_oid)
+                               ? ObjectIdGetDatum(HeapTupleGetOid(tuple))
                                : fastgetattr(tuple,
                                                          cache->cc_key[2],
                                                          cache->cc_tupdesc,
@@ -228,7 +249,7 @@ CatalogCacheComputeTupleHashValue(CatCache *cache, HeapTuple tuple)
                case 2:
                        cur_skey[1].sk_argument =
                                (cache->cc_key[1] == ObjectIdAttributeNumber)
-                               ? ObjectIdGetDatum(tuple->t_data->t_oid)
+                               ? ObjectIdGetDatum(HeapTupleGetOid(tuple))
                                : fastgetattr(tuple,
                                                          cache->cc_key[1],
                                                          cache->cc_tupdesc,
@@ -238,7 +259,7 @@ CatalogCacheComputeTupleHashValue(CatCache *cache, HeapTuple tuple)
                case 1:
                        cur_skey[0].sk_argument =
                                (cache->cc_key[0] == ObjectIdAttributeNumber)
-                               ? ObjectIdGetDatum(tuple->t_data->t_oid)
+                               ? ObjectIdGetDatum(HeapTupleGetOid(tuple))
                                : fastgetattr(tuple,
                                                          cache->cc_key[0],
                                                          cache->cc_tupdesc,
@@ -246,12 +267,11 @@ CatalogCacheComputeTupleHashValue(CatCache *cache, HeapTuple tuple)
                        Assert(!isNull);
                        break;
                default:
-                       elog(FATAL, "CCComputeTupleHashValue: %d cc_nkeys",
-                                cache->cc_nkeys);
+                       elog(FATAL, "wrong number of hash keys: %d", cache->cc_nkeys);
                        break;
        }
 
-       return CatalogCacheComputeHashValue(cache, cur_skey);
+       return CatalogCacheComputeHashValue(cache, cache->cc_nkeys, cur_skey);
 }
 
 
@@ -267,17 +287,19 @@ CatCachePrintStats(void)
        long            cc_newloads = 0;
        long            cc_invals = 0;
        long            cc_discards = 0;
+       long            cc_lsearches = 0;
+       long            cc_lhits = 0;
 
-       elog(DEBUG1, "Catcache stats dump: %d/%d tuples in catcaches",
+       elog(DEBUG2, "catcache stats dump: %d/%d tuples in catcaches",
                 CacheHdr->ch_ntup, CacheHdr->ch_maxtup);
 
        for (cache = CacheHdr->ch_caches; cache; cache = cache->cc_next)
        {
                if (cache->cc_ntup == 0 && cache->cc_searches == 0)
                        continue;                       /* don't print unused caches */
-               elog(DEBUG1, "Catcache %s/%s: %d tup, %ld srch, %ld+%ld=%ld hits, %ld+%ld=%ld loads, %ld invals, %ld discards",
+               elog(DEBUG2, "catcache %s/%u: %d tup, %ld srch, %ld+%ld=%ld hits, %ld+%ld=%ld loads, %ld invals, %ld discards, %ld lsrch, %ld lhits",
                         cache->cc_relname,
-                        cache->cc_indname,
+                        cache->cc_indexoid,
                         cache->cc_ntup,
                         cache->cc_searches,
                         cache->cc_hits,
@@ -287,15 +309,19 @@ CatCachePrintStats(void)
                         cache->cc_searches - cache->cc_hits - cache->cc_neg_hits - cache->cc_newloads,
                         cache->cc_searches - cache->cc_hits - cache->cc_neg_hits,
                         cache->cc_invals,
-                        cache->cc_discards);
+                        cache->cc_discards,
+                        cache->cc_lsearches,
+                        cache->cc_lhits);
                cc_searches += cache->cc_searches;
                cc_hits += cache->cc_hits;
                cc_neg_hits += cache->cc_neg_hits;
                cc_newloads += cache->cc_newloads;
                cc_invals += cache->cc_invals;
                cc_discards += cache->cc_discards;
+               cc_lsearches += cache->cc_lsearches;
+               cc_lhits += cache->cc_lhits;
        }
-       elog(DEBUG1, "Catcache totals: %d tup, %ld srch, %ld+%ld=%ld hits, %ld+%ld=%ld loads, %ld invals, %ld discards",
+       elog(DEBUG2, "catcache totals: %d tup, %ld srch, %ld+%ld=%ld hits, %ld+%ld=%ld loads, %ld invals, %ld discards, %ld lsrch, %ld lhits",
                 CacheHdr->ch_ntup,
                 cc_searches,
                 cc_hits,
@@ -305,16 +331,19 @@ CatCachePrintStats(void)
                 cc_searches - cc_hits - cc_neg_hits - cc_newloads,
                 cc_searches - cc_hits - cc_neg_hits,
                 cc_invals,
-                cc_discards);
+                cc_discards,
+                cc_lsearches,
+                cc_lhits);
 }
-
-#endif /* CATCACHE_STATS */
+#endif   /* CATCACHE_STATS */
 
 
 /*
  *             CatCacheRemoveCTup
  *
  * Unlink and delete the given cache entry
+ *
+ * NB: if it is a member of a CatCList, the CatCList is deleted too.
  */
 static void
 CatCacheRemoveCTup(CatCache *cache, CatCTup *ct)
@@ -322,6 +351,9 @@ CatCacheRemoveCTup(CatCache *cache, CatCTup *ct)
        Assert(ct->refcount == 0);
        Assert(ct->my_cache == cache);
 
+       if (ct->c_list)
+               CatCacheRemoveCList(cache, ct->c_list);
+
        /* delink from linked lists */
        DLRemove(&ct->lrulist_elem);
        DLRemove(&ct->cache_elem);
@@ -335,6 +367,38 @@ CatCacheRemoveCTup(CatCache *cache, CatCTup *ct)
        --CacheHdr->ch_ntup;
 }
 
+/*
+ *             CatCacheRemoveCList
+ *
+ * Unlink and delete the given cache list entry
+ */
+static void
+CatCacheRemoveCList(CatCache *cache, CatCList *cl)
+{
+       int                     i;
+
+       Assert(cl->refcount == 0);
+       Assert(cl->my_cache == cache);
+
+       /* delink from member tuples */
+       for (i = cl->n_members; --i >= 0;)
+       {
+               CatCTup    *ct = cl->members[i];
+
+               Assert(ct->c_list == cl);
+               ct->c_list = NULL;
+       }
+
+       /* delink from linked list */
+       DLRemove(&cl->cache_elem);
+
+       /* free associated tuple data */
+       if (cl->tuple.t_data != NULL)
+               pfree(cl->tuple.t_data);
+       pfree(cl);
+}
+
+
 /*
  *     CatalogCacheIdInvalidate
  *
@@ -342,11 +406,11 @@ CatCacheRemoveCTup(CatCache *cache, CatCTup *ct)
  *     item pointer.  Positive entries are deleted if they match the item
  *     pointer.  Negative entries must be deleted if they match the hash
  *     value (since we do not have the exact key of the tuple that's being
- *     inserted).  But this should only rarely result in loss of a cache
+ *     inserted).      But this should only rarely result in loss of a cache
  *     entry that could have been kept.
  *
  *     Note that it's not very relevant whether the tuple identified by
- *     the item pointer is being inserted or deleted.  We don't expect to
+ *     the item pointer is being inserted or deleted.  We don't expect to
  *     find matching positive entries in the one case, and we don't expect
  *     to find matching negative entries in the other; but we will do the
  *     right things in any case.
@@ -364,7 +428,7 @@ CatalogCacheIdInvalidate(int cacheId,
         * sanity checks
         */
        Assert(ItemPointerIsValid(pointer));
-       CACHE1_elog(DEBUG1, "CatalogCacheIdInvalidate: called");
+       CACHE1_elog(DEBUG2, "CatalogCacheIdInvalidate: called");
 
        /*
         * inspect caches to find the proper cache
@@ -380,12 +444,28 @@ CatalogCacheIdInvalidate(int cacheId,
 
                /*
                 * We don't bother to check whether the cache has finished
-                * initialization yet; if not, there will be no entries in it
-                * so no problem.
+                * initialization yet; if not, there will be no entries in it so
+                * no problem.
+                */
+
+               /*
+                * Invalidate *all* CatCLists in this cache; it's too hard to tell
+                * which searches might still be correct, so just zap 'em all.
                 */
+               for (elt = DLGetHead(&ccp->cc_lists); elt; elt = nextelt)
+               {
+                       CatCList   *cl = (CatCList *) DLE_VAL(elt);
+
+                       nextelt = DLGetSucc(elt);
+
+                       if (cl->refcount > 0)
+                               cl->dead = true;
+                       else
+                               CatCacheRemoveCList(ccp, cl);
+               }
 
                /*
-                * inspect the proper hash bucket for matches
+                * inspect the proper hash bucket for tuple matches
                 */
                hashIndex = HASH_INDEX(hashValue, ccp->cc_nbuckets);
 
@@ -405,7 +485,7 @@ CatalogCacheIdInvalidate(int cacheId,
                                        ct->dead = true;
                                else
                                        CatCacheRemoveCTup(ccp, ct);
-                               CACHE1_elog(DEBUG1, "CatalogCacheIdInvalidate: invalidated");
+                               CACHE1_elog(DEBUG2, "CatalogCacheIdInvalidate: invalidated");
 #ifdef CATCACHE_STATS
                                ccp->cc_invals++;
 #endif
@@ -448,7 +528,7 @@ CreateCacheMemoryContext(void)
 /*
  *             AtEOXact_CatCache
  *
- * Clean up catcaches at end of transaction (either commit or abort)
+ * Clean up catcaches at end of main transaction (either commit or abort)
  *
  * We scan the caches to reset refcounts to zero.  This is of course
  * necessary in the abort case, since elog() may have interrupted routines.
@@ -458,9 +538,37 @@ CreateCacheMemoryContext(void)
 void
 AtEOXact_CatCache(bool isCommit)
 {
+       CatCache   *ccp;
        Dlelem     *elt,
                           *nextelt;
 
+       /*
+        * First clean up CatCLists
+        */
+       for (ccp = CacheHdr->ch_caches; ccp; ccp = ccp->cc_next)
+       {
+               for (elt = DLGetHead(&ccp->cc_lists); elt; elt = nextelt)
+               {
+                       CatCList   *cl = (CatCList *) DLE_VAL(elt);
+
+                       nextelt = DLGetSucc(elt);
+
+                       if (cl->refcount != 0)
+                       {
+                               if (isCommit)
+                                       PrintCatCacheListLeakWarning(cl);
+                               cl->refcount = 0;
+                       }
+
+                       /* Clean up any now-deletable dead entries */
+                       if (cl->dead)
+                               CatCacheRemoveCList(ccp, cl);
+               }
+       }
+
+       /*
+        * Now clean up tuples; we can scan them all using the global LRU list
+        */
        for (elt = DLGetHead(&CacheHdr->ch_lrulist); elt; elt = nextelt)
        {
                CatCTup    *ct = (CatCTup *) DLE_VAL(elt);
@@ -470,10 +578,7 @@ AtEOXact_CatCache(bool isCommit)
                if (ct->refcount != 0)
                {
                        if (isCommit)
-                               elog(WARNING, "Cache reference leak: cache %s (%d), tuple %u has count %d",
-                                        ct->my_cache->cc_relname, ct->my_cache->id,
-                                        ct->tuple.t_data->t_oid,
-                                        ct->refcount);
+                               PrintCatCacheLeakWarning(&ct->tuple);
                        ct->refcount = 0;
                }
 
@@ -494,14 +599,26 @@ AtEOXact_CatCache(bool isCommit)
 static void
 ResetCatalogCache(CatCache *cache)
 {
+       Dlelem     *elt,
+                          *nextelt;
        int                     i;
 
+       /* Remove each list in this cache, or at least mark it dead */
+       for (elt = DLGetHead(&cache->cc_lists); elt; elt = nextelt)
+       {
+               CatCList   *cl = (CatCList *) DLE_VAL(elt);
+
+               nextelt = DLGetSucc(elt);
+
+               if (cl->refcount > 0)
+                       cl->dead = true;
+               else
+                       CatCacheRemoveCList(cache, cl);
+       }
+
        /* Remove each tuple in this cache, or at least mark it dead */
        for (i = 0; i < cache->cc_nbuckets; i++)
        {
-               Dlelem     *elt,
-                                  *nextelt;
-
                for (elt = DLGetHead(&cache->cc_bucket[i]); elt; elt = nextelt)
                {
                        CatCTup    *ct = (CatCTup *) DLE_VAL(elt);
@@ -529,12 +646,12 @@ ResetCatalogCaches(void)
 {
        CatCache   *cache;
 
-       CACHE1_elog(DEBUG1, "ResetCatalogCaches called");
+       CACHE1_elog(DEBUG2, "ResetCatalogCaches called");
 
        for (cache = CacheHdr->ch_caches; cache; cache = cache->cc_next)
                ResetCatalogCache(cache);
 
-       CACHE1_elog(DEBUG1, "end of ResetCatalogCaches call");
+       CACHE1_elog(DEBUG2, "end of ResetCatalogCaches call");
 }
 
 /*
@@ -562,7 +679,7 @@ CatalogCacheFlushRelation(Oid relId)
 {
        CatCache   *cache;
 
-       CACHE2_elog(DEBUG1, "CatalogCacheFlushRelation called for %u", relId);
+       CACHE2_elog(DEBUG2, "CatalogCacheFlushRelation called for %u", relId);
 
        for (cache = CacheHdr->ch_caches; cache; cache = cache->cc_next)
        {
@@ -605,7 +722,7 @@ CatalogCacheFlushRelation(Oid relId)
                                        continue;
 
                                if (cache->cc_reloidattr == ObjectIdAttributeNumber)
-                                       tupRelid = ct->tuple.t_data->t_oid;
+                                       tupRelid = HeapTupleGetOid(&ct->tuple);
                                else
                                {
                                        bool            isNull;
@@ -632,7 +749,7 @@ CatalogCacheFlushRelation(Oid relId)
                }
        }
 
-       CACHE1_elog(DEBUG1, "end of CatalogCacheFlushRelation call");
+       CACHE1_elog(DEBUG2, "end of CatalogCacheFlushRelation call");
 }
 
 /*
@@ -644,20 +761,21 @@ CatalogCacheFlushRelation(Oid relId)
  *     structure initialized on the first access.
  */
 #ifdef CACHEDEBUG
-#define InitCatCache_DEBUG1 \
+#define InitCatCache_DEBUG2 \
 do { \
-       elog(DEBUG1, "InitCatCache: rel=%s id=%d nkeys=%d size=%d\n", \
-               cp->cc_relname, cp->id, cp->cc_nkeys, cp->cc_nbuckets); \
+       elog(DEBUG2, "InitCatCache: rel=%u ind=%u id=%d nkeys=%d size=%d", \
+                cp->cc_reloid, cp->cc_indexoid, cp->id, \
+                cp->cc_nkeys, cp->cc_nbuckets); \
 } while(0)
 
 #else
-#define InitCatCache_DEBUG1
+#define InitCatCache_DEBUG2
 #endif
 
 CatCache *
 InitCatCache(int id,
-                        const char *relname,
-                        const char *indname,
+                        Oid reloid,
+                        Oid indexoid,
                         int reloidattr,
                         int nkeys,
                         const int *key)
@@ -694,10 +812,9 @@ InitCatCache(int id,
        /*
         * allocate a new cache structure
         *
-        * Note: we assume zeroing initializes the bucket headers correctly
+        * Note: we assume zeroing initializes the Dllist headers correctly
         */
-       cp = (CatCache *) palloc(sizeof(CatCache) + NCCBUCKETS * sizeof(Dllist));
-       MemSet((char *) cp, 0, sizeof(CatCache) + NCCBUCKETS * sizeof(Dllist));
+       cp = (CatCache *) palloc0(sizeof(CatCache) + NCCBUCKETS * sizeof(Dllist));
 
        /*
         * initialize the cache's relation information for the relation
@@ -705,9 +822,9 @@ InitCatCache(int id,
         * other internal fields.  But don't open the relation yet.
         */
        cp->id = id;
-       cp->cc_relname = relname;
-       cp->cc_indname = indname;
-       cp->cc_reloid = InvalidOid;     /* temporary */
+       cp->cc_relname = "(not known yet)";
+       cp->cc_reloid = reloid;
+       cp->cc_indexoid = indexoid;
        cp->cc_relisshared = false; /* temporary */
        cp->cc_tupdesc = (TupleDesc) NULL;
        cp->cc_reloidattr = reloidattr;
@@ -721,7 +838,7 @@ InitCatCache(int id,
         * new cache is initialized as far as we can go for now. print some
         * debugging information, if appropriate.
         */
-       InitCatCache_DEBUG1;
+       InitCatCache_DEBUG2;
 
        /*
         * add completed cache to top of group header's list
@@ -746,17 +863,17 @@ InitCatCache(int id,
  */
 #ifdef CACHEDEBUG
 #define CatalogCacheInitializeCache_DEBUG1 \
-       elog(DEBUG1, "CatalogCacheInitializeCache: cache @%p %s", cache, \
-                cache->cc_relname)
+       elog(DEBUG2, "CatalogCacheInitializeCache: cache @%p rel=%u", cache, \
+                cache->cc_reloid)
 
 #define CatalogCacheInitializeCache_DEBUG2 \
 do { \
                if (cache->cc_key[i] > 0) { \
-                       elog(DEBUG1, "CatalogCacheInitializeCache: load %d/%d w/%d, %u", \
+                       elog(DEBUG2, "CatalogCacheInitializeCache: load %d/%d w/%d, %u", \
                                i+1, cache->cc_nkeys, cache->cc_key[i], \
                                 tupdesc->attrs[cache->cc_key[i] - 1]->atttypid); \
                } else { \
-                       elog(DEBUG1, "CatalogCacheInitializeCache: load %d/%d w/%d", \
+                       elog(DEBUG2, "CatalogCacheInitializeCache: load %d/%d w/%d", \
                                i+1, cache->cc_nkeys, cache->cc_key[i]); \
                } \
 } while(0)
@@ -780,7 +897,7 @@ CatalogCacheInitializeCache(CatCache *cache)
         * Open the relation without locking --- we only need the tupdesc,
         * which we assume will never change ...
         */
-       relation = heap_openr(cache->cc_relname, NoLock);
+       relation = heap_open(cache->cc_reloid, NoLock);
        Assert(RelationIsValid(relation));
 
        /*
@@ -797,9 +914,10 @@ CatalogCacheInitializeCache(CatCache *cache)
        tupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
 
        /*
-        * get the relation's OID and relisshared flag, too
+        * save the relation's name and relisshared flag, too (cc_relname
+        * is used only for debugging purposes)
         */
-       cache->cc_reloid = RelationGetRelid(relation);
+       cache->cc_relname = pstrdup(RelationGetRelationName(relation));
        cache->cc_relisshared = RelationGetForm(relation)->relisshared;
 
        /*
@@ -809,7 +927,7 @@ CatalogCacheInitializeCache(CatCache *cache)
 
        heap_close(relation, NoLock);
 
-       CACHE3_elog(DEBUG1, "CatalogCacheInitializeCache: %s, %d keys",
+       CACHE3_elog(DEBUG2, "CatalogCacheInitializeCache: %s, %d keys",
                                cache->cc_relname, cache->cc_nkeys);
 
        /*
@@ -818,6 +936,7 @@ CatalogCacheInitializeCache(CatCache *cache)
        for (i = 0; i < cache->cc_nkeys; ++i)
        {
                Oid                     keytype;
+               RegProcedure eqfunc;
 
                CatalogCacheInitializeCache_DEBUG2;
 
@@ -826,28 +945,32 @@ CatalogCacheInitializeCache(CatCache *cache)
                else
                {
                        if (cache->cc_key[i] != ObjectIdAttributeNumber)
-                               elog(FATAL, "CatalogCacheInit: only sys attr supported is OID");
+                               elog(FATAL, "only sys attr supported in caches is OID");
                        keytype = OIDOID;
                }
 
-               cache->cc_hashfunc[i] = GetCCHashFunc(keytype);
+               GetCCHashEqFuncs(keytype,
+                                                &cache->cc_hashfunc[i],
+                                                &eqfunc);
 
                cache->cc_isname[i] = (keytype == NAMEOID);
 
                /*
-                * If GetCCHashFunc liked the type, safe to index into eqproc[]
+                * Do equality-function lookup (we assume this won't need a
+                * catalog lookup for any supported type)
                 */
-               cache->cc_skey[i].sk_procedure = EQPROC(keytype);
-
-               /* Do function lookup */
-               fmgr_info_cxt(cache->cc_skey[i].sk_procedure,
+               fmgr_info_cxt(eqfunc,
                                          &cache->cc_skey[i].sk_func,
                                          CacheMemoryContext);
 
                /* Initialize sk_attno suitably for HeapKeyTest() and heap scans */
                cache->cc_skey[i].sk_attno = cache->cc_key[i];
 
-               CACHE4_elog(DEBUG1, "CatalogCacheInit %s %d %p",
+               /* Fill in sk_strategy as well --- always standard equality */
+               cache->cc_skey[i].sk_strategy = BTEqualStrategyNumber;
+               cache->cc_skey[i].sk_subtype = InvalidOid;
+
+               CACHE4_elog(DEBUG2, "CatalogCacheInit %s %d %p",
                                        cache->cc_relname,
                                        i,
                                        cache);
@@ -878,7 +1001,7 @@ InitCatCachePhase2(CatCache *cache)
        {
                Relation        idesc;
 
-               idesc = index_openr(cache->cc_indname);
+               idesc = index_open(cache->cc_indexoid);
                index_close(idesc);
        }
 }
@@ -903,8 +1026,8 @@ IndexScanOK(CatCache *cache, ScanKey cur_skey)
        {
                /*
                 * Since the OIDs of indexes aren't hardwired, it's painful to
-                * figure out which is which.  Just force all pg_index searches
-                * to be heap scans while building the relcaches.
+                * figure out which is which.  Just force all pg_index searches to
+                * be heap scans while building the relcaches.
                 */
                if (!criticalRelcachesBuilt)
                        return false;
@@ -925,7 +1048,7 @@ IndexScanOK(CatCache *cache, ScanKey cur_skey)
                if (!criticalRelcachesBuilt)
                {
                        /* Looking for an OID comparison function? */
-                       Oid             lookup_oid = DatumGetObjectId(cur_skey[0].sk_argument);
+                       Oid                     lookup_oid = DatumGetObjectId(cur_skey[0].sk_argument);
 
                        if (lookup_oid >= MIN_OIDCMP && lookup_oid <= MAX_OIDCMP)
                                return false;
@@ -943,7 +1066,7 @@ IndexScanOK(CatCache *cache, ScanKey cur_skey)
  *             if necessary (on the first access to a particular cache).
  *
  *             The result is NULL if not found, or a pointer to a HeapTuple in
- *             the cache.  The caller must not modify the tuple, and must call
+ *             the cache.      The caller must not modify the tuple, and must call
  *             ReleaseCatCache() when done with it.
  *
  * The search key values should be expressed as Datums of the key columns'
@@ -965,9 +1088,8 @@ SearchCatCache(CatCache *cache,
        Dlelem     *elt;
        CatCTup    *ct;
        Relation        relation;
+       SysScanDesc scandesc;
        HeapTuple       ntp;
-       int                     i;
-       MemoryContext oldcxt;
 
        /*
         * one-time startup overhead for each cache
@@ -991,7 +1113,7 @@ SearchCatCache(CatCache *cache,
        /*
         * find the hash bucket in which to look for the tuple
         */
-       hashValue = CatalogCacheComputeHashValue(cache, cur_skey);
+       hashValue = CatalogCacheComputeHashValue(cache, cache->cc_nkeys, cur_skey);
        hashIndex = HASH_INDEX(hashValue, cache->cc_nbuckets);
 
        /*
@@ -1023,27 +1145,27 @@ SearchCatCache(CatCache *cache,
                        continue;
 
                /*
-                * we found a match in the cache: move it to the front of the global
-                * LRU list.  We also move it to the front of the list for its
-                * hashbucket, in order to speed subsequent searches.  (The most
-                * frequently accessed elements in any hashbucket will tend to be
-                * near the front of the hashbucket's list.)
+                * we found a match in the cache: move it to the front of the
+                * global LRU list.  We also move it to the front of the list for
+                * its hashbucket, in order to speed subsequent searches.  (The
+                * most frequently accessed elements in any hashbucket will tend
+                * to be near the front of the hashbucket's list.)
                 */
                DLMoveToFront(&ct->lrulist_elem);
                DLMoveToFront(&ct->cache_elem);
 
                /*
-                * If it's a positive entry, bump its refcount and return it.
-                * If it's negative, we can report failure to the caller.
+                * If it's a positive entry, bump its refcount and return it. If
+                * it's negative, we can report failure to the caller.
                 */
                if (!ct->negative)
                {
+                       ResourceOwnerEnlargeCatCacheRefs(CurrentResourceOwner);
                        ct->refcount++;
+                       ResourceOwnerRememberCatCacheRef(CurrentResourceOwner, &ct->tuple);
 
-#ifdef CACHEDEBUG
-                       CACHE3_elog(DEBUG1, "SearchCatCache(%s): found in bucket %d",
+                       CACHE3_elog(DEBUG2, "SearchCatCache(%s): found in bucket %d",
                                                cache->cc_relname, hashIndex);
-#endif   /* CACHEDEBUG */
 
 #ifdef CATCACHE_STATS
                        cache->cc_hits++;
@@ -1053,10 +1175,8 @@ SearchCatCache(CatCache *cache,
                }
                else
                {
-#ifdef CACHEDEBUG
-                       CACHE3_elog(DEBUG1, "SearchCatCache(%s): found neg entry in bucket %d",
+                       CACHE3_elog(DEBUG2, "SearchCatCache(%s): found neg entry in bucket %d",
                                                cache->cc_relname, hashIndex);
-#endif   /* CACHEDEBUG */
 
 #ifdef CATCACHE_STATS
                        cache->cc_neg_hits++;
@@ -1068,8 +1188,8 @@ SearchCatCache(CatCache *cache,
 
        /*
         * Tuple was not found in cache, so we have to try to retrieve it
-        * directly from the relation.  If found, we will add it to the
-        * cache; if not found, we will add a negative cache entry instead.
+        * directly from the relation.  If found, we will add it to the cache;
+        * if not found, we will add a negative cache entry instead.
         *
         * NOTE: it is possible for recursive cache lookups to occur while
         * reading the relation --- for example, due to shared-cache-inval
@@ -1081,187 +1201,456 @@ SearchCatCache(CatCache *cache,
         * cache, so there's no functional problem.  This case is rare enough
         * that it's not worth expending extra cycles to detect.
         */
+       relation = heap_open(cache->cc_reloid, AccessShareLock);
+
+       scandesc = systable_beginscan(relation,
+                                                                 cache->cc_indexoid,
+                                                                 IndexScanOK(cache, cur_skey),
+                                                                 SnapshotNow,
+                                                                 cache->cc_nkeys,
+                                                                 cur_skey);
+
+       ct = NULL;
+
+       while (HeapTupleIsValid(ntp = systable_getnext(scandesc)))
+       {
+               ct = CatalogCacheCreateEntry(cache, ntp,
+                                                                        hashValue, hashIndex,
+                                                                        false);
+               /* immediately set the refcount to 1 */
+               ResourceOwnerEnlargeCatCacheRefs(CurrentResourceOwner);
+               ct->refcount++;
+               ResourceOwnerRememberCatCacheRef(CurrentResourceOwner, &ct->tuple);
+               break;                                  /* assume only one match */
+       }
+
+       systable_endscan(scandesc);
+
+       heap_close(relation, AccessShareLock);
+
+       /*
+        * If tuple was not found, we need to build a negative cache entry
+        * containing a fake tuple.  The fake tuple has the correct key
+        * columns, but nulls everywhere else.
+        *
+        * In bootstrap mode, we don't build negative entries, because the
+        * cache invalidation mechanism isn't alive and can't clear them
+        * if the tuple gets created later.  (Bootstrap doesn't do UPDATEs,
+        * so it doesn't need cache inval for that.)
+        */
+       if (ct == NULL)
+       {
+               if (IsBootstrapProcessingMode())
+                       return NULL;
+
+               ntp = build_dummy_tuple(cache, cache->cc_nkeys, cur_skey);
+               ct = CatalogCacheCreateEntry(cache, ntp,
+                                                                        hashValue, hashIndex,
+                                                                        true);
+               heap_freetuple(ntp);
+
+               CACHE4_elog(DEBUG2, "SearchCatCache(%s): Contains %d/%d tuples",
+                                       cache->cc_relname, cache->cc_ntup, CacheHdr->ch_ntup);
+               CACHE3_elog(DEBUG2, "SearchCatCache(%s): put neg entry in bucket %d",
+                                       cache->cc_relname, hashIndex);
+
+               /*
+                * We are not returning the negative entry to the caller, so leave
+                * its refcount zero.
+                */
+
+               return NULL;
+       }
+
+       CACHE4_elog(DEBUG2, "SearchCatCache(%s): Contains %d/%d tuples",
+                               cache->cc_relname, cache->cc_ntup, CacheHdr->ch_ntup);
+       CACHE3_elog(DEBUG2, "SearchCatCache(%s): put in bucket %d",
+                               cache->cc_relname, hashIndex);
+
+#ifdef CATCACHE_STATS
+       cache->cc_newloads++;
+#endif
+
+       return &ct->tuple;
+}
+
+/*
+ *     ReleaseCatCache
+ *
+ *     Decrement the reference count of a catcache entry (releasing the
+ *     hold grabbed by a successful SearchCatCache).
+ *
+ *     NOTE: if compiled with -DCATCACHE_FORCE_RELEASE then catcache entries
+ *     will be freed as soon as their refcount goes to zero.  In combination
+ *     with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
+ *     to catch references to already-released catcache entries.
+ */
+void
+ReleaseCatCache(HeapTuple tuple)
+{
+       CatCTup    *ct = (CatCTup *) (((char *) tuple) -
+                                                                 offsetof(CatCTup, tuple));
+
+       /* Safety checks to ensure we were handed a cache entry */
+       Assert(ct->ct_magic == CT_MAGIC);
+       Assert(ct->refcount > 0);
+
+       ct->refcount--;
+       ResourceOwnerForgetCatCacheRef(CurrentResourceOwner, &ct->tuple);
+
+       if (ct->refcount == 0
+#ifndef CATCACHE_FORCE_RELEASE
+               && ct->dead
+#endif
+               )
+               CatCacheRemoveCTup(ct->my_cache, ct);
+}
+
+
+/*
+ *     SearchCatCacheList
+ *
+ *             Generate a list of all tuples matching a partial key (that is,
+ *             a key specifying just the first K of the cache's N key columns).
+ *
+ *             The caller must not modify the list object or the pointed-to tuples,
+ *             and must call ReleaseCatCacheList() when done with the list.
+ */
+CatCList *
+SearchCatCacheList(CatCache *cache,
+                                  int nkeys,
+                                  Datum v1,
+                                  Datum v2,
+                                  Datum v3,
+                                  Datum v4)
+{
+       ScanKeyData cur_skey[4];
+       uint32          lHashValue;
+       Dlelem     *elt;
+       CatCList   *cl;
+       CatCTup    *ct;
+       List       *ctlist;
+       ListCell   *ctlist_item;
+       int                     nmembers;
+       Relation        relation;
+       SysScanDesc scandesc;
+       bool            ordered;
+       HeapTuple       ntp;
+       MemoryContext oldcxt;
+       int                     i;
+
+       /*
+        * one-time startup overhead for each cache
+        */
+       if (cache->cc_tupdesc == NULL)
+               CatalogCacheInitializeCache(cache);
+
+       Assert(nkeys > 0 && nkeys < cache->cc_nkeys);
+
+#ifdef CATCACHE_STATS
+       cache->cc_lsearches++;
+#endif
 
        /*
-        * open the relation associated with the cache
+        * initialize the search key information
         */
-       relation = heap_open(cache->cc_reloid, AccessShareLock);
+       memcpy(cur_skey, cache->cc_skey, sizeof(cur_skey));
+       cur_skey[0].sk_argument = v1;
+       cur_skey[1].sk_argument = v2;
+       cur_skey[2].sk_argument = v3;
+       cur_skey[3].sk_argument = v4;
 
        /*
-        * Pre-create cache entry header, and mark no tuple found.
+        * compute a hash value of the given keys for faster search.  We don't
+        * presently divide the CatCList items into buckets, but this still
+        * lets us skip non-matching items quickly most of the time.
         */
-       ct = (CatCTup *) MemoryContextAlloc(CacheMemoryContext, sizeof(CatCTup));
-       ct->negative = true;
+       lHashValue = CatalogCacheComputeHashValue(cache, nkeys, cur_skey);
 
        /*
-        * Scan the relation to find the tuple.  If there's an index, and if
-        * it's safe to do so, use the index.  Else do a heap scan.
+        * scan the items until we find a match or exhaust our list
         */
-       if ((RelationGetForm(relation))->relhasindex &&
-               !IsIgnoringSystemIndexes() &&
-               IndexScanOK(cache, cur_skey))
+       for (elt = DLGetHead(&cache->cc_lists);
+                elt;
+                elt = DLGetSucc(elt))
        {
-               Relation        idesc;
-               IndexScanDesc isd;
-               RetrieveIndexResult indexRes;
-               HeapTupleData tuple;
-               Buffer          buffer;
+               bool            res;
 
-               CACHE2_elog(DEBUG1, "SearchCatCache(%s): performing index scan",
-                                       cache->cc_relname);
+               cl = (CatCList *) DLE_VAL(elt);
+
+               if (cl->dead)
+                       continue;                       /* ignore dead entries */
+
+               if (cl->hash_value != lHashValue)
+                       continue;                       /* quickly skip entry if wrong hash val */
 
                /*
-                * For an index scan, sk_attno has to be set to the index
-                * attribute number(s), not the heap attribute numbers.  We assume
-                * that the index corresponds exactly to the cache keys (or its
-                * first N keys do, anyway).
+                * see if the cached list matches our key.
                 */
-               for (i = 0; i < cache->cc_nkeys; ++i)
-                       cur_skey[i].sk_attno = i + 1;
-
-               idesc = index_openr(cache->cc_indname);
-               isd = index_beginscan(idesc, false, cache->cc_nkeys, cur_skey);
-               tuple.t_datamcxt = CurrentMemoryContext;
-               tuple.t_data = NULL;
-               while ((indexRes = index_getnext(isd, ForwardScanDirection)))
+               if (cl->nkeys != nkeys)
+                       continue;
+               HeapKeyTest(&cl->tuple,
+                                       cache->cc_tupdesc,
+                                       nkeys,
+                                       cur_skey,
+                                       res);
+               if (!res)
+                       continue;
+
+               /*
+                * we found a matching list: move each of its members to the front
+                * of the global LRU list.      Also move the list itself to the front
+                * of the cache's list-of-lists, to speed subsequent searches. (We
+                * do not move the members to the fronts of their hashbucket
+                * lists, however, since there's no point in that unless they are
+                * searched for individually.)  Also bump the members' refcounts.
+                * (member refcounts are NOT registered separately with the
+                * resource owner.)
+                */
+               ResourceOwnerEnlargeCatCacheListRefs(CurrentResourceOwner);
+               for (i = 0; i < cl->n_members; i++)
                {
-                       tuple.t_self = indexRes->heap_iptr;
-                       heap_fetch(relation, SnapshotNow, &tuple, &buffer, isd);
-                       pfree(indexRes);
-                       if (tuple.t_data != NULL)
-                       {
-                               /* Copy tuple into our context */
-                               oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-                               heap_copytuple_with_tuple(&tuple, &ct->tuple);
-                               ct->negative = false;
-                               MemoryContextSwitchTo(oldcxt);
-                               ReleaseBuffer(buffer);
-                               break;
-                       }
+                       cl->members[i]->refcount++;
+                       DLMoveToFront(&cl->members[i]->lrulist_elem);
                }
-               index_endscan(isd);
-               index_close(idesc);
+               DLMoveToFront(&cl->cache_elem);
+
+               /* Bump the list's refcount and return it */
+               cl->refcount++;
+               ResourceOwnerRememberCatCacheListRef(CurrentResourceOwner, cl);
+
+               CACHE2_elog(DEBUG2, "SearchCatCacheList(%s): found list",
+                                       cache->cc_relname);
+
+#ifdef CATCACHE_STATS
+               cache->cc_lhits++;
+#endif
+
+               return cl;
        }
-       else
+
+       /*
+        * List was not found in cache, so we have to build it by reading the
+        * relation.  For each matching tuple found in the relation, use an
+        * existing cache entry if possible, else build a new one.
+        */
+       relation = heap_open(cache->cc_reloid, AccessShareLock);
+
+       scandesc = systable_beginscan(relation,
+                                                                 cache->cc_indexoid,
+                                                                 true,
+                                                                 SnapshotNow,
+                                                                 nkeys,
+                                                                 cur_skey);
+
+       /* The list will be ordered iff we are doing an index scan */
+       ordered = (scandesc->irel != NULL);
+
+       ctlist = NIL;
+       nmembers = 0;
+
+       while (HeapTupleIsValid(ntp = systable_getnext(scandesc)))
        {
-               HeapScanDesc sd;
+               uint32          hashValue;
+               Index           hashIndex;
 
-               CACHE2_elog(DEBUG1, "SearchCatCache(%s): performing heap scan",
-                                       cache->cc_relname);
+               /*
+                * See if there's an entry for this tuple already.
+                */
+               ct = NULL;
+               hashValue = CatalogCacheComputeTupleHashValue(cache, ntp);
+               hashIndex = HASH_INDEX(hashValue, cache->cc_nbuckets);
+
+               for (elt = DLGetHead(&cache->cc_bucket[hashIndex]);
+                        elt;
+                        elt = DLGetSucc(elt))
+               {
+                       ct = (CatCTup *) DLE_VAL(elt);
 
-               sd = heap_beginscan(relation, 0, SnapshotNow,
-                                                       cache->cc_nkeys, cur_skey);
+                       if (ct->dead || ct->negative)
+                               continue;               /* ignore dead and negative entries */
 
-               ntp = heap_getnext(sd, 0);
+                       if (ct->hash_value != hashValue)
+                               continue;               /* quickly skip entry if wrong hash val */
 
-               if (HeapTupleIsValid(ntp))
+                       if (!ItemPointerEquals(&(ct->tuple.t_self), &(ntp->t_self)))
+                               continue;               /* not same tuple */
+
+                       /*
+                        * Found a match, but can't use it if it belongs to another
+                        * list already
+                        */
+                       if (ct->c_list)
+                               continue;
+
+                       /* Found a match, so move it to front */
+                       DLMoveToFront(&ct->lrulist_elem);
+
+                       break;
+               }
+
+               if (elt == NULL)
                {
-                       /* Copy tuple into our context */
-                       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-                       heap_copytuple_with_tuple(ntp, &ct->tuple);
-                       ct->negative = false;
-                       MemoryContextSwitchTo(oldcxt);
-                       /* We should not free the result of heap_getnext... */
+                       /* We didn't find a usable entry, so make a new one */
+                       ct = CatalogCacheCreateEntry(cache, ntp,
+                                                                                hashValue, hashIndex,
+                                                                                false);
                }
 
-               heap_endscan(sd);
+               /*
+                * We have to bump the member refcounts immediately to ensure they
+                * won't get dropped from the cache while loading other members.
+                * If we get an error before we finish constructing the CatCList
+                * then we will leak those reference counts.  This is annoying but
+                * it has no real consequence beyond possibly generating some
+                * warning messages at the next transaction commit, so it's not
+                * worth fixing.
+                */
+               ct->refcount++;
+               ctlist = lappend(ctlist, ct);
+               nmembers++;
        }
 
-       /*
-        * close the relation
-        */
+       systable_endscan(scandesc);
+
        heap_close(relation, AccessShareLock);
 
        /*
-        * scan is complete.  If tuple was not found, we need to build
-        * a fake tuple for the negative cache entry.  The fake tuple has
-        * the correct key columns, but nulls everywhere else.
+        * Now we can build the CatCList entry.  First we need a dummy tuple
+        * containing the key values...
         */
-       if (ct->negative)
+       ntp = build_dummy_tuple(cache, nkeys, cur_skey);
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+       cl = (CatCList *) palloc(sizeof(CatCList) + nmembers * sizeof(CatCTup *));
+       heap_copytuple_with_tuple(ntp, &cl->tuple);
+       MemoryContextSwitchTo(oldcxt);
+       heap_freetuple(ntp);
+
+       cl->cl_magic = CL_MAGIC;
+       cl->my_cache = cache;
+       DLInitElem(&cl->cache_elem, cl);
+       cl->refcount = 0;                       /* for the moment */
+       cl->dead = false;
+       cl->ordered = ordered;
+       cl->nkeys = nkeys;
+       cl->hash_value = lHashValue;
+       cl->n_members = nmembers;
+
+       Assert(nmembers == list_length(ctlist));
+       ctlist_item = list_head(ctlist);
+       for (i = 0; i < nmembers; i++)
        {
-               TupleDesc       tupDesc = cache->cc_tupdesc;
-               Datum      *values;
-               char       *nulls;
-               Oid                     negOid = InvalidOid;
+               cl->members[i] = ct = (CatCTup *) lfirst(ctlist_item);
+               Assert(ct->c_list == NULL);
+               ct->c_list = cl;
+               /* mark list dead if any members already dead */
+               if (ct->dead)
+                       cl->dead = true;
+               ctlist_item = lnext(ctlist_item);
+       }
 
-               values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
-               nulls = (char *) palloc(tupDesc->natts * sizeof(char));
+       DLAddHead(&cache->cc_lists, &cl->cache_elem);
 
-               memset(values, 0, tupDesc->natts * sizeof(Datum));
-               memset(nulls, 'n', tupDesc->natts * sizeof(char));
+       CACHE3_elog(DEBUG2, "SearchCatCacheList(%s): made list of %d members",
+                               cache->cc_relname, nmembers);
 
-               for (i = 0; i < cache->cc_nkeys; i++)
-               {
-                       int             attindex = cache->cc_key[i];
-                       Datum   keyval = cur_skey[i].sk_argument;
+       /* Finally, bump the list's refcount and return it */
+       ResourceOwnerEnlargeCatCacheListRefs(CurrentResourceOwner);
+       cl->refcount++;
+       ResourceOwnerRememberCatCacheListRef(CurrentResourceOwner, cl);
 
-                       if (attindex > 0)
-                       {
-                               /*
-                                * Here we must be careful in case the caller passed a
-                                * C string where a NAME is wanted: convert the given
-                                * argument to a correctly padded NAME.  Otherwise the
-                                * memcpy() done in heap_formtuple could fall off the
-                                * end of memory.
-                                */
-                               if (cache->cc_isname[i])
-                               {
-                                       Name    newval = (Name) palloc(NAMEDATALEN);
+       return cl;
+}
 
-                                       namestrcpy(newval, DatumGetCString(keyval));
-                                       keyval = NameGetDatum(newval);
-                               }
-                               values[attindex-1] = keyval;
-                               nulls[attindex-1] = ' ';
-                       }
-                       else
-                       {
-                               Assert(attindex == ObjectIdAttributeNumber);
-                               negOid = DatumGetObjectId(keyval);
-                       }
-               }
+/*
+ *     ReleaseCatCacheList
+ *
+ *     Decrement the reference counts of a catcache list.
+ */
+void
+ReleaseCatCacheList(CatCList *list)
+{
+       int                     i;
+
+       /* Safety checks to ensure we were handed a cache entry */
+       Assert(list->cl_magic == CL_MAGIC);
+       Assert(list->refcount > 0);
 
-               ntp = heap_formtuple(tupDesc, values, nulls);
+       for (i = list->n_members; --i >= 0;)
+       {
+               CatCTup    *ct = list->members[i];
 
-               oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-               heap_copytuple_with_tuple(ntp, &ct->tuple);
-               ct->tuple.t_data->t_oid = negOid;
-               MemoryContextSwitchTo(oldcxt);
+               Assert(ct->refcount > 0);
 
-               heap_freetuple(ntp);
-               for (i = 0; i < cache->cc_nkeys; i++)
-               {
-                       if (cache->cc_isname[i])
-                               pfree(DatumGetName(values[cache->cc_key[i]-1]));
-               }
-               pfree(values);
-               pfree(nulls);
+               ct->refcount--;
+
+               if (ct->dead)
+                       list->dead = true;
+               /* can't remove tuple before list is removed */
        }
 
+       list->refcount--;
+       ResourceOwnerForgetCatCacheListRef(CurrentResourceOwner, list);
+
+       if (list->refcount == 0
+#ifndef CATCACHE_FORCE_RELEASE
+               && list->dead
+#endif
+               )
+               CatCacheRemoveCList(list->my_cache, list);
+}
+
+
+/*
+ * CatalogCacheCreateEntry
+ *             Create a new CatCTup entry, copying the given HeapTuple and other
+ *             supplied data into it.  The new entry initially has refcount 0.
+ */
+static CatCTup *
+CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp,
+                                               uint32 hashValue, Index hashIndex, bool negative)
+{
+       CatCTup    *ct;
+       MemoryContext oldcxt;
+
        /*
-        * Finish initializing the CatCTup header, and add it to the linked
-        * lists.
+        * Allocate CatCTup header in cache memory, and copy the tuple there
+        * too.
+        */
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+       ct = (CatCTup *) palloc(sizeof(CatCTup));
+       heap_copytuple_with_tuple(ntp, &ct->tuple);
+       MemoryContextSwitchTo(oldcxt);
+
+       /*
+        * Finish initializing the CatCTup header, and add it to the cache's
+        * linked lists and counts.
         */
        ct->ct_magic = CT_MAGIC;
        ct->my_cache = cache;
        DLInitElem(&ct->lrulist_elem, (void *) ct);
        DLInitElem(&ct->cache_elem, (void *) ct);
-       ct->refcount = 1;                       /* count this first reference */
+       ct->c_list = NULL;
+       ct->refcount = 0;                       /* for the moment */
        ct->dead = false;
+       ct->negative = negative;
        ct->hash_value = hashValue;
 
        DLAddHead(&CacheHdr->ch_lrulist, &ct->lrulist_elem);
        DLAddHead(&cache->cc_bucket[hashIndex], &ct->cache_elem);
 
+       cache->cc_ntup++;
+       CacheHdr->ch_ntup++;
+
        /*
         * If we've exceeded the desired size of the caches, try to throw away
-        * the least recently used entry.  NB: the newly-built entry cannot
-        * get thrown away here, because it has positive refcount.
+        * the least recently used entry.  NB: be careful not to throw away
+        * the newly-built entry...
         */
-       ++cache->cc_ntup;
-       if (++CacheHdr->ch_ntup > CacheHdr->ch_maxtup)
+       if (CacheHdr->ch_ntup > CacheHdr->ch_maxtup)
        {
-               Dlelem     *prevelt;
+               Dlelem     *elt,
+                                  *prevelt;
 
                for (elt = DLGetTail(&CacheHdr->ch_lrulist); elt; elt = prevelt)
                {
@@ -1269,9 +1658,9 @@ SearchCatCache(CatCache *cache,
 
                        prevelt = DLGetPred(elt);
 
-                       if (oldct->refcount == 0)
+                       if (oldct->refcount == 0 && oldct != ct)
                        {
-                               CACHE2_elog(DEBUG1, "SearchCatCache(%s): Overflow, LRU removal",
+                               CACHE2_elog(DEBUG2, "CatCacheCreateEntry(%s): Overflow, LRU removal",
                                                        cache->cc_relname);
 #ifdef CATCACHE_STATS
                                oldct->my_cache->cc_discards++;
@@ -1283,65 +1672,75 @@ SearchCatCache(CatCache *cache,
                }
        }
 
-       CACHE4_elog(DEBUG1, "SearchCatCache(%s): Contains %d/%d tuples",
-                               cache->cc_relname, cache->cc_ntup, CacheHdr->ch_ntup);
-
-       if (ct->negative)
-       {
-               CACHE3_elog(DEBUG1, "SearchCatCache(%s): put neg entry in bucket %d",
-                                       cache->cc_relname, hashIndex);
-
-               /*
-                * We are not returning the new entry to the caller, so reset its
-                * refcount.  Note it would be uncool to set the refcount to 0
-                * before doing the extra-entry removal step above.
-                */
-               ct->refcount = 0;               /* negative entries never have refs */
-
-               return NULL;
-       }
-
-       CACHE3_elog(DEBUG1, "SearchCatCache(%s): put in bucket %d",
-                               cache->cc_relname, hashIndex);
-
-#ifdef CATCACHE_STATS
-       cache->cc_newloads++;
-#endif
-
-       return &ct->tuple;
+       return ct;
 }
 
 /*
- *     ReleaseCatCache()
- *
- *     Decrement the reference count of a catcache entry (releasing the
- *     hold grabbed by a successful SearchCatCache).
+ * build_dummy_tuple
+ *             Generate a palloc'd HeapTuple that contains the specified key
+ *             columns, and NULLs for other columns.
  *
- *     NOTE: if compiled with -DCATCACHE_FORCE_RELEASE then catcache entries
- *     will be freed as soon as their refcount goes to zero.  In combination
- *     with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
- *     to catch references to already-released catcache entries.
+ * This is used to store the keys for negative cache entries and CatCList
+ * entries, which don't have real tuples associated with them.
  */
-void
-ReleaseCatCache(HeapTuple tuple)
+static HeapTuple
+build_dummy_tuple(CatCache *cache, int nkeys, ScanKey skeys)
 {
-       CatCTup    *ct = (CatCTup *) (((char *) tuple) -
-                                                                 offsetof(CatCTup, tuple));
+       HeapTuple       ntp;
+       TupleDesc       tupDesc = cache->cc_tupdesc;
+       Datum      *values;
+       char       *nulls;
+       Oid                     tupOid = InvalidOid;
+       NameData        tempNames[4];
+       int                     i;
 
-       /* Safety checks to ensure we were handed a cache entry */
-       Assert(ct->ct_magic == CT_MAGIC);
-       Assert(ct->refcount > 0);
+       values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
+       nulls = (char *) palloc(tupDesc->natts * sizeof(char));
 
-       ct->refcount--;
+       memset(values, 0, tupDesc->natts * sizeof(Datum));
+       memset(nulls, 'n', tupDesc->natts * sizeof(char));
 
-       if (ct->refcount == 0
-#ifndef CATCACHE_FORCE_RELEASE
-               && ct->dead
-#endif
-               )
-               CatCacheRemoveCTup(ct->my_cache, ct);
+       for (i = 0; i < nkeys; i++)
+       {
+               int                     attindex = cache->cc_key[i];
+               Datum           keyval = skeys[i].sk_argument;
+
+               if (attindex > 0)
+               {
+                       /*
+                        * Here we must be careful in case the caller passed a C
+                        * string where a NAME is wanted: convert the given argument
+                        * to a correctly padded NAME.  Otherwise the memcpy() done in
+                        * heap_formtuple could fall off the end of memory.
+                        */
+                       if (cache->cc_isname[i])
+                       {
+                               Name            newval = &tempNames[i];
+
+                               namestrcpy(newval, DatumGetCString(keyval));
+                               keyval = NameGetDatum(newval);
+                       }
+                       values[attindex - 1] = keyval;
+                       nulls[attindex - 1] = ' ';
+               }
+               else
+               {
+                       Assert(attindex == ObjectIdAttributeNumber);
+                       tupOid = DatumGetObjectId(keyval);
+               }
+       }
+
+       ntp = heap_formtuple(tupDesc, values, nulls);
+       if (tupOid != InvalidOid)
+               HeapTupleSetOid(ntp, tupOid);
+
+       pfree(values);
+       pfree(nulls);
+
+       return ntp;
 }
 
+
 /*
  *     PrepareToInvalidateCacheTuple()
  *
@@ -1374,12 +1773,12 @@ ReleaseCatCache(HeapTuple tuple)
 void
 PrepareToInvalidateCacheTuple(Relation relation,
                                                          HeapTuple tuple,
-                                                void (*function) (int, uint32, ItemPointer, Oid))
+                                               void (*function) (int, uint32, ItemPointer, Oid))
 {
        CatCache   *ccp;
        Oid                     reloid;
 
-       CACHE1_elog(DEBUG1, "PrepareToInvalidateCacheTuple: called");
+       CACHE1_elog(DEBUG2, "PrepareToInvalidateCacheTuple: called");
 
        /*
         * sanity checks
@@ -1414,3 +1813,32 @@ PrepareToInvalidateCacheTuple(Relation relation,
                                         ccp->cc_relisshared ? (Oid) 0 : MyDatabaseId);
        }
 }
+
+
+/*
+ * Subroutines for warning about reference leaks.  These are exported so
+ * that resowner.c can call them.
+ */
+void
+PrintCatCacheLeakWarning(HeapTuple tuple)
+{
+       CatCTup    *ct = (CatCTup *) (((char *) tuple) -
+                                                                 offsetof(CatCTup, tuple));
+
+       /* Safety check to ensure we were handed a cache entry */
+       Assert(ct->ct_magic == CT_MAGIC);
+
+       elog(WARNING, "cache reference leak: cache %s (%d), tuple %u/%u has count %d",
+                ct->my_cache->cc_relname, ct->my_cache->id,
+                ItemPointerGetBlockNumber(&(tuple->t_self)),
+                ItemPointerGetOffsetNumber(&(tuple->t_self)),
+                ct->refcount);
+}
+
+void
+PrintCatCacheListLeakWarning(CatCList *list)
+{
+       elog(WARNING, "cache reference leak: cache %s (%d), list %p has count %d",
+                list->my_cache->cc_relname, list->my_cache->id,
+                list, list->refcount);
+}