OSDN Git Service

Implement partial-key searching of syscaches, per recent suggestion
authorTom Lane <tgl@sss.pgh.pa.us>
Sat, 6 Apr 2002 06:59:25 +0000 (06:59 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Sat, 6 Apr 2002 06:59:25 +0000 (06:59 +0000)
to pghackers.  Use this to do searching for ambiguous functions ---
it will get more uses soon.

src/backend/catalog/namespace.c
src/backend/parser/parse_func.c
src/backend/utils/cache/catcache.c
src/backend/utils/cache/syscache.c
src/include/catalog/namespace.h
src/include/utils/catcache.h
src/include/utils/syscache.h

index fab1912..a7d73bb 100644 (file)
@@ -13,7 +13,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/catalog/namespace.c,v 1.5 2002/04/01 03:34:25 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/catalog/namespace.c,v 1.6 2002/04/06 06:59:21 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -26,6 +26,7 @@
 #include "catalog/namespace.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
+#include "catalog/pg_proc.h"
 #include "catalog/pg_shadow.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
@@ -33,6 +34,7 @@
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
+#include "utils/catcache.h"
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
 
@@ -302,6 +304,174 @@ TypenameGetTypid(const char *typname)
 }
 
 /*
+ * FuncnameGetCandidates
+ *             Given a possibly-qualified function name and argument count,
+ *             retrieve a list of the possible matches.
+ *
+ * We search a single namespace if the function name is qualified, else
+ * all namespaces in the search path.  The return list will never contain
+ * multiple entries with identical argument types --- in the multiple-
+ * namespace case, we arrange for entries in earlier namespaces to mask
+ * identical entries in later namespaces.
+ */
+FuncCandidateList
+FuncnameGetCandidates(List *names, int nargs)
+{
+       FuncCandidateList resultList = NULL;
+       char       *catalogname;
+       char       *schemaname = NULL;
+       char       *funcname = NULL;
+       Oid                     namespaceId;
+       CatCList   *catlist;
+       int                     i;
+
+       /* deconstruct the name list */
+       switch (length(names))
+       {
+               case 1:
+                       funcname = strVal(lfirst(names));
+                       break;
+               case 2:
+                       schemaname = strVal(lfirst(names));
+                       funcname = strVal(lsecond(names));
+                       break;
+               case 3:
+                       catalogname = strVal(lfirst(names));
+                       schemaname = strVal(lsecond(names));
+                       funcname = strVal(lfirst(lnext(lnext(names))));
+                       /*
+                        * We check the catalog name and then ignore it.
+                        */
+                       if (strcmp(catalogname, DatabaseName) != 0)
+                               elog(ERROR, "Cross-database references are not implemented");
+                       break;
+               default:
+                       elog(ERROR, "Improper qualified name (too many dotted names)");
+                       break;
+       }
+
+       if (schemaname)
+       {
+               /* use exact schema given */
+               namespaceId = GetSysCacheOid(NAMESPACENAME,
+                                                                        CStringGetDatum(schemaname),
+                                                                        0, 0, 0);
+               if (!OidIsValid(namespaceId))
+                       elog(ERROR, "Namespace \"%s\" does not exist",
+                                schemaname);
+       }
+       else
+       {
+               /* flag to indicate we need namespace search */
+               namespaceId = InvalidOid;
+       }
+
+       /* Search syscache by name and nargs only */
+       catlist = SearchSysCacheList(PROCNAME, 2,
+                                                                CStringGetDatum(funcname),
+                                                                Int16GetDatum(nargs),
+                                                                0, 0);
+
+       for (i = 0; i < catlist->n_members; i++)
+       {
+               HeapTuple       proctup = &catlist->members[i]->tuple;
+               Form_pg_proc procform = (Form_pg_proc) GETSTRUCT(proctup);
+               int                     pathpos = 0;
+               FuncCandidateList newResult;
+
+               if (OidIsValid(namespaceId))
+               {
+                       /* Consider only procs in specified namespace */
+                       if (procform->pronamespace != namespaceId)
+                               continue;
+                       /* No need to check args, they must all be different */
+               }
+               else
+               {
+                       /* Consider only procs that are in the search path */
+                       if (pathContainsSystemNamespace ||
+                               procform->pronamespace != PG_CATALOG_NAMESPACE)
+                       {
+                               List       *nsp;
+
+                               foreach(nsp, namespaceSearchPath)
+                               {
+                                       pathpos++;
+                                       if (procform->pronamespace == (Oid) lfirsti(nsp))
+                                               break;
+                               }
+                               if (nsp == NIL)
+                                       continue;       /* proc is not in search path */
+                       }
+
+                       /*
+                        * Okay, it's in the search path, but does it have the same
+                        * arguments as something we already accepted?  If so, keep
+                        * only the one that appears earlier in the search path.
+                        *
+                        * If we have an ordered list from SearchSysCacheList (the
+                        * normal case), then any conflicting proc must immediately
+                        * adjoin this one in the list, so we only need to look at
+                        * the newest result item.  If we have an unordered list,
+                        * we have to scan the whole result list.
+                        */
+                       if (resultList)
+                       {
+                               FuncCandidateList       prevResult;
+
+                               if (catlist->ordered)
+                               {
+                                       if (memcmp(procform->proargtypes, resultList->args,
+                                                          nargs * sizeof(Oid)) == 0)
+                                               prevResult = resultList;
+                                       else
+                                               prevResult = NULL;
+                               }
+                               else
+                               {
+                                       for (prevResult = resultList;
+                                                prevResult;
+                                                prevResult = prevResult->next)
+                                       {
+                                               if (memcmp(procform->proargtypes, prevResult->args,
+                                                                  nargs * sizeof(Oid)) == 0)
+                                                       break;
+                                       }
+                               }
+                               if (prevResult)
+                               {
+                                       /* We have a match with a previous result */
+                                       Assert(pathpos != prevResult->pathpos);
+                                       if (pathpos > prevResult->pathpos)
+                                               continue; /* keep previous result */
+                                       /* replace previous result */
+                                       prevResult->pathpos = pathpos;
+                                       prevResult->oid = proctup->t_data->t_oid;
+                                       continue;       /* args are same, of course */
+                               }
+                       }
+               }
+
+               /*
+                * Okay to add it to result list
+                */
+               newResult = (FuncCandidateList)
+                       palloc(sizeof(struct _FuncCandidateList) - sizeof(Oid)
+                                  + nargs * sizeof(Oid));
+               newResult->pathpos = pathpos;
+               newResult->oid = proctup->t_data->t_oid;
+               memcpy(newResult->args, procform->proargtypes, nargs * sizeof(Oid));
+
+               newResult->next = resultList;
+               resultList = newResult;
+       }
+
+       ReleaseSysCacheList(catlist);
+
+       return resultList;
+}
+
+/*
  * QualifiedNameGetCreationNamespace
  *             Given a possibly-qualified name for an object (in List-of-Values
  *             format), determine what namespace the object should be created in.
index f3c8712..578402f 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/parser/parse_func.c,v 1.123 2002/04/05 00:31:27 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/parser/parse_func.c,v 1.124 2002/04/06 06:59:22 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -18,6 +18,7 @@
 #include "access/heapam.h"
 #include "catalog/catname.h"
 #include "catalog/indexing.h"
+#include "catalog/namespace.h"
 #include "catalog/pg_aggregate.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
@@ -40,7 +41,6 @@ static Node *ParseComplexProjection(ParseState *pstate,
 static Oid **argtype_inherit(int nargs, Oid *argtypes);
 
 static int     find_inheritors(Oid relid, Oid **supervec);
-static CandidateList func_get_candidates(char *funcname, int nargs);
 static Oid **gen_cross_product(InhPaths *arginh, int nargs);
 static void make_arguments(ParseState *pstate,
                           int nargs,
@@ -48,14 +48,15 @@ static void make_arguments(ParseState *pstate,
                           Oid *input_typeids,
                           Oid *function_typeids);
 static int match_argtypes(int nargs,
-                          Oid *input_typeids,
-                          CandidateList function_typeids,
-                          CandidateList *candidates);
+                                                 Oid *input_typeids,
+                                                 FuncCandidateList function_typeids,
+                                                 FuncCandidateList *candidates);
 static FieldSelect *setup_field_select(Node *input, char *attname, Oid relid);
-static Oid *func_select_candidate(int nargs, Oid *input_typeids,
-                                         CandidateList candidates);
-static int     agg_get_candidates(char *aggname, Oid typeId, CandidateList *candidates);
-static Oid     agg_select_candidate(Oid typeid, CandidateList candidates);
+static FuncCandidateList func_select_candidate(int nargs, Oid *input_typeids,
+                                                                 FuncCandidateList candidates);
+static int     agg_get_candidates(char *aggname, Oid typeId,
+                                                          FuncCandidateList *candidates);
+static Oid     agg_select_candidate(Oid typeid, FuncCandidateList candidates);
 
 
 /*
@@ -170,7 +171,7 @@ ParseFuncOrColumn(ParseState *pstate, char *funcname, List *fargs,
        {
                Oid                     basetype = exprType(lfirst(fargs));
                int                     ncandidates;
-               CandidateList candidates;
+               FuncCandidateList candidates;
 
                /* try for exact match first... */
                if (SearchSysCacheExists(AGGNAME,
@@ -374,7 +375,7 @@ ParseFuncOrColumn(ParseState *pstate, char *funcname, List *fargs,
 static int
 agg_get_candidates(char *aggname,
                                   Oid typeId,
-                                  CandidateList *candidates)
+                                  FuncCandidateList *candidates)
 {
        Relation        pg_aggregate_desc;
        SysScanDesc     pg_aggregate_scan;
@@ -398,11 +399,10 @@ agg_get_candidates(char *aggname,
        while (HeapTupleIsValid(tup = systable_getnext(pg_aggregate_scan)))
        {
                Form_pg_aggregate agg = (Form_pg_aggregate) GETSTRUCT(tup);
-               CandidateList current_candidate;
-
-               current_candidate = (CandidateList) palloc(sizeof(struct _CandidateList));
-               current_candidate->args = (Oid *) palloc(sizeof(Oid));
+               FuncCandidateList current_candidate;
 
+               current_candidate = (FuncCandidateList)
+                       palloc(sizeof(struct _FuncCandidateList));
                current_candidate->args[0] = agg->aggbasetype;
                current_candidate->next = *candidates;
                *candidates = current_candidate;
@@ -422,10 +422,10 @@ agg_get_candidates(char *aggname,
  * if successful, else InvalidOid.
  */
 static Oid
-agg_select_candidate(Oid typeid, CandidateList candidates)
+agg_select_candidate(Oid typeid, FuncCandidateList candidates)
 {
-       CandidateList current_candidate;
-       CandidateList last_candidate;
+       FuncCandidateList current_candidate;
+       FuncCandidateList last_candidate;
        Oid                     current_typeid;
        int                     ncandidates;
        CATEGORY        category,
@@ -498,91 +498,37 @@ agg_select_candidate(Oid typeid, CandidateList candidates)
 }      /* agg_select_candidate() */
 
 
-/* func_get_candidates()
- * get a list of all argument type vectors for which a function named
- * funcname taking nargs arguments exists
- */
-static CandidateList
-func_get_candidates(char *funcname, int nargs)
-{
-       Relation        heapRelation;
-       ScanKeyData skey[2];
-       HeapTuple       tuple;
-       SysScanDesc     funcscan;
-       CandidateList candidates = NULL;
-       int                     i;
-
-       heapRelation = heap_openr(ProcedureRelationName, AccessShareLock);
-
-       ScanKeyEntryInitialize(&skey[0],
-                                                  (bits16) 0x0,
-                                                  (AttrNumber) Anum_pg_proc_proname,
-                                                  (RegProcedure) F_NAMEEQ,
-                                                  PointerGetDatum(funcname));
-       ScanKeyEntryInitialize(&skey[1],
-                                                  (bits16) 0x0,
-                                                  (AttrNumber) Anum_pg_proc_pronargs,
-                                                  (RegProcedure) F_INT2EQ,
-                                                  Int16GetDatum(nargs));
-
-       funcscan = systable_beginscan(heapRelation, ProcedureNameNspIndex, true,
-                                                                 SnapshotNow, 2, skey);
-
-       while (HeapTupleIsValid(tuple = systable_getnext(funcscan)))
-       {
-               Form_pg_proc pgProcP = (Form_pg_proc) GETSTRUCT(tuple);
-               CandidateList current_candidate;
-
-               current_candidate = (CandidateList)
-                       palloc(sizeof(struct _CandidateList));
-               current_candidate->args = (Oid *)
-                       palloc(FUNC_MAX_ARGS * sizeof(Oid));
-               MemSet(current_candidate->args, 0, FUNC_MAX_ARGS * sizeof(Oid));
-               for (i = 0; i < nargs; i++)
-                       current_candidate->args[i] = pgProcP->proargtypes[i];
-
-               current_candidate->next = candidates;
-               candidates = current_candidate;
-       }
-
-       systable_endscan(funcscan);
-       heap_close(heapRelation, AccessShareLock);
-
-       return candidates;
-}
-
-
 /* match_argtypes()
+ *
  * Given a list of possible typeid arrays to a function and an array of
  * input typeids, produce a shortlist of those function typeid arrays
  * that match the input typeids (either exactly or by coercion), and
- * return the number of such arrays
+ * return the number of such arrays.
+ *
+ * NB: okay to modify input list structure, as long as we find at least
+ * one match.
  */
 static int
 match_argtypes(int nargs,
                           Oid *input_typeids,
-                          CandidateList function_typeids,
-                          CandidateList *candidates)           /* return value */
+                          FuncCandidateList function_typeids,
+                          FuncCandidateList *candidates) /* return value */
 {
-       CandidateList current_candidate;
-       CandidateList matching_candidate;
-       Oid                *current_typeids;
+       FuncCandidateList current_candidate;
+       FuncCandidateList next_candidate;
        int                     ncandidates = 0;
 
        *candidates = NULL;
 
        for (current_candidate = function_typeids;
                 current_candidate != NULL;
-                current_candidate = current_candidate->next)
+                current_candidate = next_candidate)
        {
-               current_typeids = current_candidate->args;
-               if (can_coerce_type(nargs, input_typeids, current_typeids))
+               next_candidate = current_candidate->next;
+               if (can_coerce_type(nargs, input_typeids, current_candidate->args))
                {
-                       matching_candidate = (CandidateList)
-                               palloc(sizeof(struct _CandidateList));
-                       matching_candidate->args = current_typeids;
-                       matching_candidate->next = *candidates;
-                       *candidates = matching_candidate;
+                       current_candidate->next = *candidates;
+                       *candidates = current_candidate;
                        ncandidates++;
                }
        }
@@ -593,8 +539,8 @@ match_argtypes(int nargs,
 
 /* func_select_candidate()
  * Given the input argtype array and more than one candidate
- * for the function argtype array, attempt to resolve the conflict.
- * Returns the selected argtype array if the conflict can be resolved,
+ * for the function, attempt to resolve the conflict.
+ * Returns the selected candidate if the conflict can be resolved,
  * otherwise returns NULL.
  *
  * By design, this is pretty similar to oper_select_candidate in parse_oper.c.
@@ -602,13 +548,13 @@ match_argtypes(int nargs,
  * already pruned away "candidates" that aren't actually coercion-compatible
  * with the input types, whereas oper_select_candidate must do that itself.
  */
-static Oid *
+static FuncCandidateList
 func_select_candidate(int nargs,
                                          Oid *input_typeids,
-                                         CandidateList candidates)
+                                         FuncCandidateList candidates)
 {
-       CandidateList current_candidate;
-       CandidateList last_candidate;
+       FuncCandidateList current_candidate;
+       FuncCandidateList last_candidate;
        Oid                *current_typeids;
        Oid                     current_type;
        int                     i;
@@ -662,7 +608,7 @@ func_select_candidate(int nargs,
                last_candidate->next = NULL;
 
        if (ncandidates == 1)
-               return candidates->args;
+               return candidates;
 
        /*
         * Still too many candidates? Run through all candidates and keep
@@ -709,7 +655,7 @@ func_select_candidate(int nargs,
                last_candidate->next = NULL;
 
        if (ncandidates == 1)
-               return candidates->args;
+               return candidates;
 
        /*
         * Still too many candidates? Now look for candidates which are
@@ -755,7 +701,7 @@ func_select_candidate(int nargs,
                last_candidate->next = NULL;
 
        if (ncandidates == 1)
-               return candidates->args;
+               return candidates;
 
        /*
         * Still too many candidates? Try assigning types for the unknown
@@ -888,7 +834,7 @@ func_select_candidate(int nargs,
        }
 
        if (ncandidates == 1)
-               return candidates->args;
+               return candidates;
 
        return NULL;                            /* failed to determine a unique candidate */
 }      /* func_select_candidate() */
@@ -925,22 +871,24 @@ func_get_detail(char *funcname,
                                bool *retset,   /* return value */
                                Oid **true_typeids)             /* return value */
 {
-       HeapTuple       ftup;
-       CandidateList function_typeids;
+       FuncCandidateList function_typeids;
+       FuncCandidateList best_candidate;
 
-       /* attempt to find with arguments exactly as specified... */
-       ftup = SearchSysCache(PROCNAME,
-                                                 PointerGetDatum(funcname),
-                                                 Int32GetDatum(nargs),
-                                                 PointerGetDatum(argtypes),
-                                                 0);
+       /* Get list of possible candidates from namespace search */
+       function_typeids = FuncnameGetCandidates(makeList1(makeString(funcname)), nargs);
 
-       if (HeapTupleIsValid(ftup))
+       /*
+        * See if there is an exact match
+        */
+       for (best_candidate = function_typeids;
+                best_candidate != NULL;
+                best_candidate = best_candidate->next)
        {
-               /* given argument types are the right ones */
-               *true_typeids = argtypes;
+               if (memcmp(argtypes, best_candidate->args, nargs * sizeof(Oid)) == 0)
+                       break;
        }
-       else
+
+       if (best_candidate == NULL)
        {
                /*
                 * If we didn't find an exact match, next consider the possibility
@@ -1001,10 +949,6 @@ func_get_detail(char *funcname,
                 * didn't find an exact match, so now try to match up
                 * candidates...
                 */
-
-               function_typeids = func_get_candidates(funcname, nargs);
-
-               /* found something, so let's look through them... */
                if (function_typeids != NULL)
                {
                        Oid               **input_typeid_vector = NULL;
@@ -1019,7 +963,7 @@ func_get_detail(char *funcname,
 
                        do
                        {
-                               CandidateList current_function_typeids;
+                               FuncCandidateList current_function_typeids;
                                int                     ncandidates;
 
                                ncandidates = match_argtypes(nargs, current_input_typeids,
@@ -1029,13 +973,7 @@ func_get_detail(char *funcname,
                                /* one match only? then run with it... */
                                if (ncandidates == 1)
                                {
-                                       *true_typeids = current_function_typeids->args;
-                                       ftup = SearchSysCache(PROCNAME,
-                                                                                 PointerGetDatum(funcname),
-                                                                                 Int32GetDatum(nargs),
-                                                                                 PointerGetDatum(*true_typeids),
-                                                                                 0);
-                                       Assert(HeapTupleIsValid(ftup));
+                                       best_candidate = current_function_typeids;
                                        break;
                                }
 
@@ -1045,25 +983,15 @@ func_get_detail(char *funcname,
                                 */
                                if (ncandidates > 1)
                                {
-                                       *true_typeids = func_select_candidate(nargs,
+                                       best_candidate = func_select_candidate(nargs,
                                                                                                   current_input_typeids,
                                                                                           current_function_typeids);
 
-                                       if (*true_typeids != NULL)
-                                       {
-                                               /* was able to choose a best candidate */
-                                               ftup = SearchSysCache(PROCNAME,
-                                                                                         PointerGetDatum(funcname),
-                                                                                         Int32GetDatum(nargs),
-                                                                                 PointerGetDatum(*true_typeids),
-                                                                                         0);
-                                               Assert(HeapTupleIsValid(ftup));
-                                               break;
-                                       }
-
                                        /*
-                                        * otherwise, ambiguous function call, so fail by
-                                        * exiting loop with ftup still NULL.
+                                        * If we were able to choose a best candidate, we're
+                                        * done.  Otherwise, ambiguous function call, so fail
+                                        * by exiting loop with best_candidate still NULL.
+                                        * Either way, we're outta here.
                                         */
                                        break;
                                }
@@ -1082,11 +1010,20 @@ func_get_detail(char *funcname,
                }
        }
 
-       if (HeapTupleIsValid(ftup))
+       if (best_candidate)
        {
-               Form_pg_proc pform = (Form_pg_proc) GETSTRUCT(ftup);
-
-               *funcid = ftup->t_data->t_oid;
+               HeapTuple       ftup;
+               Form_pg_proc pform;
+
+               *funcid = best_candidate->oid;
+               *true_typeids = best_candidate->args;
+
+               ftup = SearchSysCache(PROCOID,
+                                                         ObjectIdGetDatum(best_candidate->oid),
+                                                         0, 0, 0);
+               if (!HeapTupleIsValid(ftup)) /* should not happen */
+                       elog(ERROR, "function %u not found", best_candidate->oid);
+               pform = (Form_pg_proc) GETSTRUCT(ftup);
                *rettype = pform->prorettype;
                *retset = pform->proretset;
                ReleaseSysCache(ftup);
index 202f447..efcb65d 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/utils/cache/catcache.c,v 1.93 2002/03/26 19:16:08 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/utils/cache/catcache.c,v 1.94 2002/04/06 06:59:22 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -34,7 +34,7 @@
 #include "utils/syscache.h"
 
 
- /* #define CACHEDEBUG */      /* turns DEBUG elogs on */
+/* #define CACHEDEBUG */       /* turns DEBUG elogs on */
 
 /*
  * Constants related to size of the catcache.
@@ -98,7 +98,7 @@ static const Oid eqproc[] = {
 #define EQPROC(SYSTEMTYPEOID)  eqproc[(SYSTEMTYPEOID)-BOOLOID]
 
 
-static uint32 CatalogCacheComputeHashValue(CatCache *cache,
+static uint32 CatalogCacheComputeHashValue(CatCache *cache, int nkeys,
                                                         ScanKey cur_skey);
 static uint32 CatalogCacheComputeTupleHashValue(CatCache *cache,
                                                                  HeapTuple tuple);
@@ -106,7 +106,12 @@ static uint32 CatalogCacheComputeTupleHashValue(CatCache *cache,
 static void CatCachePrintStats(void);
 #endif
 static void CatCacheRemoveCTup(CatCache *cache, CatCTup *ct);
+static void CatCacheRemoveCList(CatCache *cache, CatCList *cl);
 static void CatalogCacheInitializeCache(CatCache *cache);
+static CatCTup *CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp,
+                                                                               uint32 hashValue, Index hashIndex,
+                                                                               bool negative);
+static HeapTuple build_dummy_tuple(CatCache *cache, int nkeys, ScanKey skeys);
 
 
 /*
@@ -149,16 +154,16 @@ GetCCHashFunc(Oid keytype)
  * Compute the hash value associated with a given set of lookup keys
  */
 static uint32
-CatalogCacheComputeHashValue(CatCache *cache, ScanKey cur_skey)
+CatalogCacheComputeHashValue(CatCache *cache, int nkeys, ScanKey cur_skey)
 {
        uint32          hashValue = 0;
 
        CACHE4_elog(DEBUG1, "CatalogCacheComputeHashValue %s %d %p",
                                cache->cc_relname,
-                               cache->cc_nkeys,
+                               nkeys,
                                cache);
 
-       switch (cache->cc_nkeys)
+       switch (nkeys)
        {
                case 4:
                        hashValue ^=
@@ -181,7 +186,7 @@ CatalogCacheComputeHashValue(CatCache *cache, ScanKey cur_skey)
                                                                                           cur_skey[0].sk_argument));
                        break;
                default:
-                       elog(FATAL, "CCComputeHashValue: %d cc_nkeys", cache->cc_nkeys);
+                       elog(FATAL, "CCComputeHashValue: %d nkeys", nkeys);
                        break;
        }
 
@@ -251,7 +256,7 @@ CatalogCacheComputeTupleHashValue(CatCache *cache, HeapTuple tuple)
                        break;
        }
 
-       return CatalogCacheComputeHashValue(cache, cur_skey);
+       return CatalogCacheComputeHashValue(cache, cache->cc_nkeys, cur_skey);
 }
 
 
@@ -267,6 +272,8 @@ CatCachePrintStats(void)
        long            cc_newloads = 0;
        long            cc_invals = 0;
        long            cc_discards = 0;
+       long            cc_lsearches = 0;
+       long            cc_lhits = 0;
 
        elog(DEBUG1, "Catcache stats dump: %d/%d tuples in catcaches",
                 CacheHdr->ch_ntup, CacheHdr->ch_maxtup);
@@ -275,7 +282,7 @@ CatCachePrintStats(void)
        {
                if (cache->cc_ntup == 0 && cache->cc_searches == 0)
                        continue;                       /* don't print unused caches */
-               elog(DEBUG1, "Catcache %s/%s: %d tup, %ld srch, %ld+%ld=%ld hits, %ld+%ld=%ld loads, %ld invals, %ld discards",
+               elog(DEBUG1, "Catcache %s/%s: %d tup, %ld srch, %ld+%ld=%ld hits, %ld+%ld=%ld loads, %ld invals, %ld discards, %ld lsrch, %ld lhits",
                         cache->cc_relname,
                         cache->cc_indname,
                         cache->cc_ntup,
@@ -287,15 +294,19 @@ CatCachePrintStats(void)
                         cache->cc_searches - cache->cc_hits - cache->cc_neg_hits - cache->cc_newloads,
                         cache->cc_searches - cache->cc_hits - cache->cc_neg_hits,
                         cache->cc_invals,
-                        cache->cc_discards);
+                        cache->cc_discards,
+                        cache->cc_lsearches,
+                        cache->cc_lhits);
                cc_searches += cache->cc_searches;
                cc_hits += cache->cc_hits;
                cc_neg_hits += cache->cc_neg_hits;
                cc_newloads += cache->cc_newloads;
                cc_invals += cache->cc_invals;
                cc_discards += cache->cc_discards;
+               cc_lsearches += cache->cc_lsearches;
+               cc_lhits += cache->cc_lhits;
        }
-       elog(DEBUG1, "Catcache totals: %d tup, %ld srch, %ld+%ld=%ld hits, %ld+%ld=%ld loads, %ld invals, %ld discards",
+       elog(DEBUG1, "Catcache totals: %d tup, %ld srch, %ld+%ld=%ld hits, %ld+%ld=%ld loads, %ld invals, %ld discards, %ld lsrch, %ld lhits",
                 CacheHdr->ch_ntup,
                 cc_searches,
                 cc_hits,
@@ -305,7 +316,9 @@ CatCachePrintStats(void)
                 cc_searches - cc_hits - cc_neg_hits - cc_newloads,
                 cc_searches - cc_hits - cc_neg_hits,
                 cc_invals,
-                cc_discards);
+                cc_discards,
+                cc_lsearches,
+                cc_lhits);
 }
 
 #endif /* CATCACHE_STATS */
@@ -315,6 +328,8 @@ CatCachePrintStats(void)
  *             CatCacheRemoveCTup
  *
  * Unlink and delete the given cache entry
+ *
+ * NB: if it is a member of a CatCList, the CatCList is deleted too.
  */
 static void
 CatCacheRemoveCTup(CatCache *cache, CatCTup *ct)
@@ -322,6 +337,9 @@ CatCacheRemoveCTup(CatCache *cache, CatCTup *ct)
        Assert(ct->refcount == 0);
        Assert(ct->my_cache == cache);
 
+       if (ct->c_list)
+               CatCacheRemoveCList(cache, ct->c_list);
+
        /* delink from linked lists */
        DLRemove(&ct->lrulist_elem);
        DLRemove(&ct->cache_elem);
@@ -336,6 +354,38 @@ CatCacheRemoveCTup(CatCache *cache, CatCTup *ct)
 }
 
 /*
+ *             CatCacheRemoveCList
+ *
+ * Unlink and delete the given cache list entry
+ */
+static void
+CatCacheRemoveCList(CatCache *cache, CatCList *cl)
+{
+       int                     i;
+
+       Assert(cl->refcount == 0);
+       Assert(cl->my_cache == cache);
+
+       /* delink from member tuples */
+       for (i = cl->n_members; --i >= 0; )
+       {
+               CatCTup    *ct = cl->members[i];
+
+               Assert(ct->c_list == cl);
+               ct->c_list = NULL;
+       }
+
+       /* delink from linked list */
+       DLRemove(&cl->cache_elem);
+
+       /* free associated tuple data */
+       if (cl->tuple.t_data != NULL)
+               pfree(cl->tuple.t_data);
+       pfree(cl);
+}
+
+
+/*
  *     CatalogCacheIdInvalidate
  *
  *     Invalidate entries in the specified cache, given a hash value and
@@ -385,7 +435,23 @@ CatalogCacheIdInvalidate(int cacheId,
                 */
 
                /*
-                * inspect the proper hash bucket for matches
+                * Invalidate *all* CatCLists in this cache; it's too hard to tell
+                * which searches might still be correct, so just zap 'em all.
+                */
+               for (elt = DLGetHead(&ccp->cc_lists); elt; elt = nextelt)
+               {
+                       CatCList   *cl = (CatCList *) DLE_VAL(elt);
+
+                       nextelt = DLGetSucc(elt);
+
+                       if (cl->refcount > 0)
+                               cl->dead = true;
+                       else
+                               CatCacheRemoveCList(ccp, cl);
+               }
+
+               /*
+                * inspect the proper hash bucket for tuple matches
                 */
                hashIndex = HASH_INDEX(hashValue, ccp->cc_nbuckets);
 
@@ -458,9 +524,38 @@ CreateCacheMemoryContext(void)
 void
 AtEOXact_CatCache(bool isCommit)
 {
+       CatCache   *ccp;
        Dlelem     *elt,
                           *nextelt;
 
+       /*
+        * First clean up CatCLists
+        */
+       for (ccp = CacheHdr->ch_caches; ccp; ccp = ccp->cc_next)
+       {
+               for (elt = DLGetHead(&ccp->cc_lists); elt; elt = nextelt)
+               {
+                       CatCList   *cl = (CatCList *) DLE_VAL(elt);
+
+                       nextelt = DLGetSucc(elt);
+
+                       if (cl->refcount != 0)
+                       {
+                               if (isCommit)
+                                       elog(WARNING, "Cache reference leak: cache %s (%d), list %p has count %d",
+                                                ccp->cc_relname, ccp->id, cl, cl->refcount);
+                               cl->refcount = 0;
+                       }
+
+                       /* Clean up any now-deletable dead entries */
+                       if (cl->dead)
+                               CatCacheRemoveCList(ccp, cl);
+               }
+       }
+
+       /*
+        * Now clean up tuples; we can scan them all using the global LRU list
+        */
        for (elt = DLGetHead(&CacheHdr->ch_lrulist); elt; elt = nextelt)
        {
                CatCTup    *ct = (CatCTup *) DLE_VAL(elt);
@@ -494,14 +589,26 @@ AtEOXact_CatCache(bool isCommit)
 static void
 ResetCatalogCache(CatCache *cache)
 {
+       Dlelem     *elt,
+                          *nextelt;
        int                     i;
 
+       /* Remove each list in this cache, or at least mark it dead */
+       for (elt = DLGetHead(&cache->cc_lists); elt; elt = nextelt)
+       {
+               CatCList   *cl = (CatCList *) DLE_VAL(elt);
+
+               nextelt = DLGetSucc(elt);
+
+               if (cl->refcount > 0)
+                       cl->dead = true;
+               else
+                       CatCacheRemoveCList(cache, cl);
+       }
+
        /* Remove each tuple in this cache, or at least mark it dead */
        for (i = 0; i < cache->cc_nbuckets; i++)
        {
-               Dlelem     *elt,
-                                  *nextelt;
-
                for (elt = DLGetHead(&cache->cc_bucket[i]); elt; elt = nextelt)
                {
                        CatCTup    *ct = (CatCTup *) DLE_VAL(elt);
@@ -694,7 +801,7 @@ InitCatCache(int id,
        /*
         * allocate a new cache structure
         *
-        * Note: we assume zeroing initializes the bucket headers correctly
+        * Note: we assume zeroing initializes the Dllist headers correctly
         */
        cp = (CatCache *) palloc(sizeof(CatCache) + NCCBUCKETS * sizeof(Dllist));
        MemSet((char *) cp, 0, sizeof(CatCache) + NCCBUCKETS * sizeof(Dllist));
@@ -965,9 +1072,8 @@ SearchCatCache(CatCache *cache,
        Dlelem     *elt;
        CatCTup    *ct;
        Relation        relation;
+       SysScanDesc     scandesc;
        HeapTuple       ntp;
-       int                     i;
-       MemoryContext oldcxt;
 
        /*
         * one-time startup overhead for each cache
@@ -991,7 +1097,7 @@ SearchCatCache(CatCache *cache,
        /*
         * find the hash bucket in which to look for the tuple
         */
-       hashValue = CatalogCacheComputeHashValue(cache, cur_skey);
+       hashValue = CatalogCacheComputeHashValue(cache, cache->cc_nkeys, cur_skey);
        hashIndex = HASH_INDEX(hashValue, cache->cc_nbuckets);
 
        /*
@@ -1040,10 +1146,8 @@ SearchCatCache(CatCache *cache,
                {
                        ct->refcount++;
 
-#ifdef CACHEDEBUG
                        CACHE3_elog(DEBUG1, "SearchCatCache(%s): found in bucket %d",
                                                cache->cc_relname, hashIndex);
-#endif   /* CACHEDEBUG */
 
 #ifdef CATCACHE_STATS
                        cache->cc_hits++;
@@ -1053,10 +1157,8 @@ SearchCatCache(CatCache *cache,
                }
                else
                {
-#ifdef CACHEDEBUG
                        CACHE3_elog(DEBUG1, "SearchCatCache(%s): found neg entry in bucket %d",
                                                cache->cc_relname, hashIndex);
-#endif   /* CACHEDEBUG */
 
 #ifdef CATCACHE_STATS
                        cache->cc_neg_hits++;
@@ -1081,187 +1183,422 @@ SearchCatCache(CatCache *cache,
         * cache, so there's no functional problem.  This case is rare enough
         * that it's not worth expending extra cycles to detect.
         */
+       relation = heap_open(cache->cc_reloid, AccessShareLock);
+
+       scandesc = systable_beginscan(relation,
+                                                                 cache->cc_indname,
+                                                                 IndexScanOK(cache, cur_skey),
+                                                                 SnapshotNow,
+                                                                 cache->cc_nkeys,
+                                                                 cur_skey);
+
+       ct = NULL;
+
+       while (HeapTupleIsValid(ntp = systable_getnext(scandesc)))
+       {
+               ct = CatalogCacheCreateEntry(cache, ntp,
+                                                                        hashValue, hashIndex,
+                                                                        false);
+               break;                                  /* assume only one match */
+       }
+
+       systable_endscan(scandesc);
+
+       heap_close(relation, AccessShareLock);
 
        /*
-        * open the relation associated with the cache
+        * If tuple was not found, we need to build a negative cache entry
+        * containing a fake tuple.  The fake tuple has the correct key columns,
+        * but nulls everywhere else.
         */
-       relation = heap_open(cache->cc_reloid, AccessShareLock);
+       if (ct == NULL)
+       {
+               ntp = build_dummy_tuple(cache, cache->cc_nkeys, cur_skey);
+               ct = CatalogCacheCreateEntry(cache, ntp,
+                                                                        hashValue, hashIndex,
+                                                                        true);
+               heap_freetuple(ntp);
+
+               CACHE4_elog(DEBUG1, "SearchCatCache(%s): Contains %d/%d tuples",
+                                       cache->cc_relname, cache->cc_ntup, CacheHdr->ch_ntup);
+               CACHE3_elog(DEBUG1, "SearchCatCache(%s): put neg entry in bucket %d",
+                                       cache->cc_relname, hashIndex);
+
+               /*
+                * We are not returning the new entry to the caller, so reset its
+                * refcount.
+                */
+               ct->refcount = 0;               /* negative entries never have refs */
+
+               return NULL;
+       }
+
+       CACHE4_elog(DEBUG1, "SearchCatCache(%s): Contains %d/%d tuples",
+                               cache->cc_relname, cache->cc_ntup, CacheHdr->ch_ntup);
+       CACHE3_elog(DEBUG1, "SearchCatCache(%s): put in bucket %d",
+                               cache->cc_relname, hashIndex);
+
+#ifdef CATCACHE_STATS
+       cache->cc_newloads++;
+#endif
+
+       return &ct->tuple;
+}
+
+/*
+ *     ReleaseCatCache
+ *
+ *     Decrement the reference count of a catcache entry (releasing the
+ *     hold grabbed by a successful SearchCatCache).
+ *
+ *     NOTE: if compiled with -DCATCACHE_FORCE_RELEASE then catcache entries
+ *     will be freed as soon as their refcount goes to zero.  In combination
+ *     with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
+ *     to catch references to already-released catcache entries.
+ */
+void
+ReleaseCatCache(HeapTuple tuple)
+{
+       CatCTup    *ct = (CatCTup *) (((char *) tuple) -
+                                                                 offsetof(CatCTup, tuple));
+
+       /* Safety checks to ensure we were handed a cache entry */
+       Assert(ct->ct_magic == CT_MAGIC);
+       Assert(ct->refcount > 0);
+
+       ct->refcount--;
+
+       if (ct->refcount == 0
+#ifndef CATCACHE_FORCE_RELEASE
+               && ct->dead
+#endif
+               )
+               CatCacheRemoveCTup(ct->my_cache, ct);
+}
+
+
+/*
+ *     SearchCatCacheList
+ *
+ *             Generate a list of all tuples matching a partial key (that is,
+ *             a key specifying just the first K of the cache's N key columns).
+ *
+ *             The caller must not modify the list object or the pointed-to tuples,
+ *             and must call ReleaseCatCacheList() when done with the list.
+ */
+CatCList *
+SearchCatCacheList(CatCache *cache,
+                                  int nkeys,
+                                  Datum v1,
+                                  Datum v2,
+                                  Datum v3,
+                                  Datum v4)
+{
+       ScanKeyData cur_skey[4];
+       uint32          lHashValue;
+       Dlelem     *elt;
+       CatCList   *cl;
+       CatCTup    *ct;
+       List       *ctlist;
+       int                     nmembers;
+       Relation        relation;
+       SysScanDesc     scandesc;
+       bool            ordered;
+       HeapTuple       ntp;
+       MemoryContext oldcxt;
+       int                     i;
+
+       /*
+        * one-time startup overhead for each cache
+        */
+       if (cache->cc_tupdesc == NULL)
+               CatalogCacheInitializeCache(cache);
+
+       Assert(nkeys > 0 && nkeys < cache->cc_nkeys);
+
+#ifdef CATCACHE_STATS
+       cache->cc_lsearches++;
+#endif
+
+       /*
+        * initialize the search key information
+        */
+       memcpy(cur_skey, cache->cc_skey, sizeof(cur_skey));
+       cur_skey[0].sk_argument = v1;
+       cur_skey[1].sk_argument = v2;
+       cur_skey[2].sk_argument = v3;
+       cur_skey[3].sk_argument = v4;
 
        /*
-        * Pre-create cache entry header, and mark no tuple found.
+        * compute a hash value of the given keys for faster search.  We don't
+        * presently divide the CatCList items into buckets, but this still lets
+        * us skip non-matching items quickly most of the time.
         */
-       ct = (CatCTup *) MemoryContextAlloc(CacheMemoryContext, sizeof(CatCTup));
-       ct->negative = true;
+       lHashValue = CatalogCacheComputeHashValue(cache, nkeys, cur_skey);
 
        /*
-        * Scan the relation to find the tuple.  If there's an index, and if
-        * it's safe to do so, use the index.  Else do a heap scan.
+        * scan the items until we find a match or exhaust our list
         */
-       if ((RelationGetForm(relation))->relhasindex &&
-               !IsIgnoringSystemIndexes() &&
-               IndexScanOK(cache, cur_skey))
+       for (elt = DLGetHead(&cache->cc_lists);
+                elt;
+                elt = DLGetSucc(elt))
        {
-               Relation        idesc;
-               IndexScanDesc isd;
-               RetrieveIndexResult indexRes;
-               HeapTupleData tuple;
-               Buffer          buffer;
+               bool            res;
 
-               CACHE2_elog(DEBUG1, "SearchCatCache(%s): performing index scan",
-                                       cache->cc_relname);
+               cl = (CatCList *) DLE_VAL(elt);
+
+               if (cl->dead)
+                       continue;                       /* ignore dead entries */
+
+               if (cl->hash_value != lHashValue)
+                       continue;                       /* quickly skip entry if wrong hash val */
 
                /*
-                * For an index scan, sk_attno has to be set to the index
-                * attribute number(s), not the heap attribute numbers.  We assume
-                * that the index corresponds exactly to the cache keys (or its
-                * first N keys do, anyway).
+                * see if the cached list matches our key.
                 */
-               for (i = 0; i < cache->cc_nkeys; ++i)
-                       cur_skey[i].sk_attno = i + 1;
+               if (cl->nkeys != nkeys)
+                       continue;
+               HeapKeyTest(&cl->tuple,
+                                       cache->cc_tupdesc,
+                                       nkeys,
+                                       cur_skey,
+                                       res);
+               if (!res)
+                       continue;
 
-               idesc = index_openr(cache->cc_indname);
-               isd = index_beginscan(idesc, false, cache->cc_nkeys, cur_skey);
-               tuple.t_datamcxt = CurrentMemoryContext;
-               tuple.t_data = NULL;
-               while ((indexRes = index_getnext(isd, ForwardScanDirection)))
+               /*
+                * we found a matching list: move each of its members to the front
+                * of the global LRU list.  Also move the list itself to the front
+                * of the cache's list-of-lists, to speed subsequent searches.
+                * (We do not move the members to the fronts of their hashbucket
+                * lists, however, since there's no point in that unless they are
+                * searched for individually.)  Also bump the members' refcounts.
+                */
+               for (i = 0; i < cl->n_members; i++)
                {
-                       tuple.t_self = indexRes->heap_iptr;
-                       heap_fetch(relation, SnapshotNow, &tuple, &buffer, isd);
-                       pfree(indexRes);
-                       if (tuple.t_data != NULL)
-                       {
-                               /* Copy tuple into our context */
-                               oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-                               heap_copytuple_with_tuple(&tuple, &ct->tuple);
-                               ct->negative = false;
-                               MemoryContextSwitchTo(oldcxt);
-                               ReleaseBuffer(buffer);
-                               break;
-                       }
+                       cl->members[i]->refcount++;
+                       DLMoveToFront(&cl->members[i]->lrulist_elem);
                }
-               index_endscan(isd);
-               index_close(idesc);
+               DLMoveToFront(&cl->cache_elem);
+
+               /* Bump the list's refcount and return it */
+               cl->refcount++;
+
+               CACHE2_elog(DEBUG1, "SearchCatCacheList(%s): found list",
+                                       cache->cc_relname);
+
+#ifdef CATCACHE_STATS
+               cache->cc_lhits++;
+#endif
+
+               return cl;
        }
-       else
+
+       /*
+        * List was not found in cache, so we have to build it by reading
+        * the relation.  For each matching tuple found in the relation,
+        * use an existing cache entry if possible, else build a new one.
+        */
+       relation = heap_open(cache->cc_reloid, AccessShareLock);
+
+       scandesc = systable_beginscan(relation,
+                                                                 cache->cc_indname,
+                                                                 true,
+                                                                 SnapshotNow,
+                                                                 nkeys,
+                                                                 cur_skey);
+
+       /* The list will be ordered iff we are doing an index scan */
+       ordered = (scandesc->irel != NULL);
+
+       ctlist = NIL;
+       nmembers = 0;
+
+       while (HeapTupleIsValid(ntp = systable_getnext(scandesc)))
        {
-               HeapScanDesc sd;
+               uint32          hashValue;
+               Index           hashIndex;
 
-               CACHE2_elog(DEBUG1, "SearchCatCache(%s): performing heap scan",
-                                       cache->cc_relname);
+               /*
+                * See if there's an entry for this tuple already.
+                */
+               ct = NULL;
+               hashValue = CatalogCacheComputeTupleHashValue(cache, ntp);
+               hashIndex = HASH_INDEX(hashValue, cache->cc_nbuckets);
+
+               for (elt = DLGetHead(&cache->cc_bucket[hashIndex]);
+                        elt;
+                        elt = DLGetSucc(elt))
+               {
+                       ct = (CatCTup *) DLE_VAL(elt);
+
+                       if (ct->dead || ct->negative)
+                               continue;                       /* ignore dead and negative entries */
+
+                       if (ct->hash_value != hashValue)
+                               continue;                       /* quickly skip entry if wrong hash val */
 
-               sd = heap_beginscan(relation, 0, SnapshotNow,
-                                                       cache->cc_nkeys, cur_skey);
+                       if (!ItemPointerEquals(&(ct->tuple.t_self), &(ntp->t_self)))
+                               continue;                       /* not same tuple */
 
-               ntp = heap_getnext(sd, 0);
+                       /*
+                        * Found a match, but can't use it if it belongs to another list
+                        * already
+                        */
+                       if (ct->c_list)
+                               continue;
 
-               if (HeapTupleIsValid(ntp))
+                       /* Found a match, so bump its refcount and move to front */
+                       ct->refcount++;
+
+                       DLMoveToFront(&ct->lrulist_elem);
+
+                       break;
+               }
+
+               if (elt == NULL)
                {
-                       /* Copy tuple into our context */
-                       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-                       heap_copytuple_with_tuple(ntp, &ct->tuple);
-                       ct->negative = false;
-                       MemoryContextSwitchTo(oldcxt);
-                       /* We should not free the result of heap_getnext... */
+                       /* We didn't find a usable entry, so make a new one */
+                       ct = CatalogCacheCreateEntry(cache, ntp,
+                                                                                hashValue, hashIndex,
+                                                                                false);
                }
 
-               heap_endscan(sd);
+               ctlist = lcons(ct, ctlist);
+               nmembers++;
        }
 
-       /*
-        * close the relation
-        */
+       systable_endscan(scandesc);
+
        heap_close(relation, AccessShareLock);
 
        /*
-        * scan is complete.  If tuple was not found, we need to build
-        * a fake tuple for the negative cache entry.  The fake tuple has
-        * the correct key columns, but nulls everywhere else.
+        * Now we can build the CatCList entry.  First we need a dummy tuple
+        * containing the key values...
         */
-       if (ct->negative)
+       ntp = build_dummy_tuple(cache, nkeys, cur_skey);
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+       cl = (CatCList *) palloc(sizeof(CatCList) + nmembers * sizeof(CatCTup *));
+       heap_copytuple_with_tuple(ntp, &cl->tuple);
+       MemoryContextSwitchTo(oldcxt);
+       heap_freetuple(ntp);
+
+       cl->cl_magic = CL_MAGIC;
+       cl->my_cache = cache;
+       DLInitElem(&cl->cache_elem, (void *) cl);
+       cl->refcount = 1;                       /* count this first reference */
+       cl->dead = false;
+       cl->ordered = ordered;
+       cl->nkeys = nkeys;
+       cl->hash_value = lHashValue;
+       cl->n_members = nmembers;
+       /* The list is backwards because we built it with lcons */
+       for (i = nmembers; --i >= 0; )
        {
-               TupleDesc       tupDesc = cache->cc_tupdesc;
-               Datum      *values;
-               char       *nulls;
-               Oid                     negOid = InvalidOid;
+               cl->members[i] = ct = (CatCTup *) lfirst(ctlist);
+               Assert(ct->c_list == NULL);
+               ct->c_list = cl;
+               /* mark list dead if any members already dead */
+               if (ct->dead)
+                       cl->dead = true;
+               ctlist = lnext(ctlist);
+       }
 
-               values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
-               nulls = (char *) palloc(tupDesc->natts * sizeof(char));
+       DLAddHead(&cache->cc_lists, &cl->cache_elem);
 
-               memset(values, 0, tupDesc->natts * sizeof(Datum));
-               memset(nulls, 'n', tupDesc->natts * sizeof(char));
+       CACHE3_elog(DEBUG1, "SearchCatCacheList(%s): made list of %d members",
+                               cache->cc_relname, nmembers);
 
-               for (i = 0; i < cache->cc_nkeys; i++)
-               {
-                       int             attindex = cache->cc_key[i];
-                       Datum   keyval = cur_skey[i].sk_argument;
+       return cl;
+}
 
-                       if (attindex > 0)
-                       {
-                               /*
-                                * Here we must be careful in case the caller passed a
-                                * C string where a NAME is wanted: convert the given
-                                * argument to a correctly padded NAME.  Otherwise the
-                                * memcpy() done in heap_formtuple could fall off the
-                                * end of memory.
-                                */
-                               if (cache->cc_isname[i])
-                               {
-                                       Name    newval = (Name) palloc(NAMEDATALEN);
+/*
+ *     ReleaseCatCacheList
+ *
+ *     Decrement the reference counts of a catcache list.
+ */
+void
+ReleaseCatCacheList(CatCList *list)
+{
+       int                     i;
 
-                                       namestrcpy(newval, DatumGetCString(keyval));
-                                       keyval = NameGetDatum(newval);
-                               }
-                               values[attindex-1] = keyval;
-                               nulls[attindex-1] = ' ';
-                       }
-                       else
-                       {
-                               Assert(attindex == ObjectIdAttributeNumber);
-                               negOid = DatumGetObjectId(keyval);
-                       }
-               }
+       /* Safety checks to ensure we were handed a cache entry */
+       Assert(list->cl_magic == CL_MAGIC);
+       Assert(list->refcount > 0);
 
-               ntp = heap_formtuple(tupDesc, values, nulls);
+       for (i = list->n_members; --i >= 0; )
+       {
+               CatCTup    *ct = list->members[i];
 
-               oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-               heap_copytuple_with_tuple(ntp, &ct->tuple);
-               ct->tuple.t_data->t_oid = negOid;
-               MemoryContextSwitchTo(oldcxt);
+               Assert(ct->refcount > 0);
 
-               heap_freetuple(ntp);
-               for (i = 0; i < cache->cc_nkeys; i++)
-               {
-                       if (cache->cc_isname[i])
-                               pfree(DatumGetName(values[cache->cc_key[i]-1]));
-               }
-               pfree(values);
-               pfree(nulls);
+               ct->refcount--;
+
+               if (ct->dead)
+                       list->dead = true;
+               /* can't remove tuple before list is removed */
        }
 
+       list->refcount--;
+
+       if (list->refcount == 0
+#ifndef CATCACHE_FORCE_RELEASE
+               && list->dead
+#endif
+               )
+               CatCacheRemoveCList(list->my_cache, list);
+}
+
+
+/*
+ * CatalogCacheCreateEntry
+ *             Create a new CatCTup entry, copying the given HeapTuple and other
+ *             supplied data into it.  The new entry is given refcount 1.
+ */
+static CatCTup *
+CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp,
+                                               uint32 hashValue, Index hashIndex, bool negative)
+{
+       CatCTup    *ct;
+       MemoryContext oldcxt;
+
+       /*
+        * Allocate CatCTup header in cache memory, and copy the tuple there too.
+        */
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+       ct = (CatCTup *) palloc(sizeof(CatCTup));
+       heap_copytuple_with_tuple(ntp, &ct->tuple);
+       MemoryContextSwitchTo(oldcxt);
+
        /*
-        * Finish initializing the CatCTup header, and add it to the linked
-        * lists.
+        * Finish initializing the CatCTup header, and add it to the cache's
+        * linked lists and counts.
         */
        ct->ct_magic = CT_MAGIC;
        ct->my_cache = cache;
        DLInitElem(&ct->lrulist_elem, (void *) ct);
        DLInitElem(&ct->cache_elem, (void *) ct);
+       ct->c_list = NULL;
        ct->refcount = 1;                       /* count this first reference */
        ct->dead = false;
+       ct->negative = negative;
        ct->hash_value = hashValue;
 
        DLAddHead(&CacheHdr->ch_lrulist, &ct->lrulist_elem);
        DLAddHead(&cache->cc_bucket[hashIndex], &ct->cache_elem);
 
+       cache->cc_ntup++;
+       CacheHdr->ch_ntup++;
+
        /*
         * If we've exceeded the desired size of the caches, try to throw away
         * the least recently used entry.  NB: the newly-built entry cannot
         * get thrown away here, because it has positive refcount.
         */
-       ++cache->cc_ntup;
-       if (++CacheHdr->ch_ntup > CacheHdr->ch_maxtup)
+       if (CacheHdr->ch_ntup > CacheHdr->ch_maxtup)
        {
-               Dlelem     *prevelt;
+               Dlelem     *elt,
+                                  *prevelt;
 
                for (elt = DLGetTail(&CacheHdr->ch_lrulist); elt; elt = prevelt)
                {
@@ -1271,7 +1608,7 @@ SearchCatCache(CatCache *cache,
 
                        if (oldct->refcount == 0)
                        {
-                               CACHE2_elog(DEBUG1, "SearchCatCache(%s): Overflow, LRU removal",
+                               CACHE2_elog(DEBUG1, "CatCacheCreateEntry(%s): Overflow, LRU removal",
                                                        cache->cc_relname);
 #ifdef CATCACHE_STATS
                                oldct->my_cache->cc_discards++;
@@ -1283,65 +1620,75 @@ SearchCatCache(CatCache *cache,
                }
        }
 
-       CACHE4_elog(DEBUG1, "SearchCatCache(%s): Contains %d/%d tuples",
-                               cache->cc_relname, cache->cc_ntup, CacheHdr->ch_ntup);
-
-       if (ct->negative)
-       {
-               CACHE3_elog(DEBUG1, "SearchCatCache(%s): put neg entry in bucket %d",
-                                       cache->cc_relname, hashIndex);
-
-               /*
-                * We are not returning the new entry to the caller, so reset its
-                * refcount.  Note it would be uncool to set the refcount to 0
-                * before doing the extra-entry removal step above.
-                */
-               ct->refcount = 0;               /* negative entries never have refs */
-
-               return NULL;
-       }
-
-       CACHE3_elog(DEBUG1, "SearchCatCache(%s): put in bucket %d",
-                               cache->cc_relname, hashIndex);
-
-#ifdef CATCACHE_STATS
-       cache->cc_newloads++;
-#endif
-
-       return &ct->tuple;
+       return ct;
 }
 
 /*
- *     ReleaseCatCache()
+ * build_dummy_tuple
+ *             Generate a palloc'd HeapTuple that contains the specified key
+ *             columns, and NULLs for other columns.
  *
- *     Decrement the reference count of a catcache entry (releasing the
- *     hold grabbed by a successful SearchCatCache).
- *
- *     NOTE: if compiled with -DCATCACHE_FORCE_RELEASE then catcache entries
- *     will be freed as soon as their refcount goes to zero.  In combination
- *     with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
- *     to catch references to already-released catcache entries.
+ * This is used to store the keys for negative cache entries and CatCList
+ * entries, which don't have real tuples associated with them.
  */
-void
-ReleaseCatCache(HeapTuple tuple)
+static HeapTuple
+build_dummy_tuple(CatCache *cache, int nkeys, ScanKey skeys)
 {
-       CatCTup    *ct = (CatCTup *) (((char *) tuple) -
-                                                                 offsetof(CatCTup, tuple));
+       HeapTuple       ntp;
+       TupleDesc       tupDesc = cache->cc_tupdesc;
+       Datum      *values;
+       char       *nulls;
+       Oid                     tupOid = InvalidOid;
+       NameData        tempNames[4];
+       int                     i;
 
-       /* Safety checks to ensure we were handed a cache entry */
-       Assert(ct->ct_magic == CT_MAGIC);
-       Assert(ct->refcount > 0);
+       values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
+       nulls = (char *) palloc(tupDesc->natts * sizeof(char));
 
-       ct->refcount--;
+       memset(values, 0, tupDesc->natts * sizeof(Datum));
+       memset(nulls, 'n', tupDesc->natts * sizeof(char));
 
-       if (ct->refcount == 0
-#ifndef CATCACHE_FORCE_RELEASE
-               && ct->dead
-#endif
-               )
-               CatCacheRemoveCTup(ct->my_cache, ct);
+       for (i = 0; i < nkeys; i++)
+       {
+               int             attindex = cache->cc_key[i];
+               Datum   keyval = skeys[i].sk_argument;
+
+               if (attindex > 0)
+               {
+                       /*
+                        * Here we must be careful in case the caller passed a
+                        * C string where a NAME is wanted: convert the given
+                        * argument to a correctly padded NAME.  Otherwise the
+                        * memcpy() done in heap_formtuple could fall off the
+                        * end of memory.
+                        */
+                       if (cache->cc_isname[i])
+                       {
+                               Name    newval = &tempNames[i];
+
+                               namestrcpy(newval, DatumGetCString(keyval));
+                               keyval = NameGetDatum(newval);
+                       }
+                       values[attindex-1] = keyval;
+                       nulls[attindex-1] = ' ';
+               }
+               else
+               {
+                       Assert(attindex == ObjectIdAttributeNumber);
+                       tupOid = DatumGetObjectId(keyval);
+               }
+       }
+
+       ntp = heap_formtuple(tupDesc, values, nulls);
+       ntp->t_data->t_oid = tupOid;
+
+       pfree(values);
+       pfree(nulls);
+
+       return ntp;
 }
 
+
 /*
  *     PrepareToInvalidateCacheTuple()
  *
index 84cea0b..7e6a95a 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/utils/cache/syscache.c,v 1.73 2002/04/05 00:31:31 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/utils/cache/syscache.c,v 1.74 2002/04/06 06:59:23 tgl Exp $
  *
  * NOTES
  *       These routines allow the parser/planner/executor to perform
@@ -626,3 +626,18 @@ SysCacheGetAttr(int cacheId, HeapTuple tup,
                                                SysCache[cacheId]->cc_tupdesc,
                                                isNull);
 }
+
+/*
+ * List-search interface
+ */
+struct catclist *
+SearchSysCacheList(int cacheId, int nkeys,
+                                  Datum key1, Datum key2, Datum key3, Datum key4)
+{
+       if (cacheId < 0 || cacheId >= SysCacheSize ||
+               ! PointerIsValid(SysCache[cacheId]))
+               elog(ERROR, "SearchSysCacheList: Bad cache id %d", cacheId);
+
+       return SearchCatCacheList(SysCache[cacheId], nkeys,
+                                                         key1, key2, key3, key4);
+}
index a8a64bd..4c81e78 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: namespace.h,v 1.5 2002/04/01 03:34:27 tgl Exp $
+ * $Id: namespace.h,v 1.6 2002/04/06 06:59:24 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "nodes/primnodes.h"
 
 
+/*
+ *     This structure holds a list of possible functions or operators
+ *     found by namespace lookup.  Each function/operator is identified
+ *     by OID and by argument types; the list must be pruned by type
+ *     resolution rules that are embodied in the parser, not here.
+ *     The number of arguments is assumed to be known a priori.
+ */
+typedef struct _FuncCandidateList
+{
+       struct _FuncCandidateList *next;
+       int                     pathpos;                /* for internal use of namespace lookup */
+       Oid                     oid;                    /* the function or operator's OID */
+       Oid                     args[1];                /* arg types --- VARIABLE LENGTH ARRAY */
+} *FuncCandidateList;                  /* VARIABLE LENGTH STRUCT */
+
+
 extern Oid     RangeVarGetRelid(const RangeVar *relation, bool failOK);
 
 extern Oid     RangeVarGetCreationNamespace(const RangeVar *newRelation);
@@ -25,6 +41,8 @@ extern Oid    RelnameGetRelid(const char *relname);
 
 extern Oid     TypenameGetTypid(const char *typname);
 
+extern FuncCandidateList FuncnameGetCandidates(List *names, int nargs);
+
 extern Oid     QualifiedNameGetCreationNamespace(List *names, char **objname_p);
 
 extern RangeVar *makeRangeVarFromNameList(List *names);
index 8e98f41..2a1a83f 100644 (file)
@@ -13,7 +13,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: catcache.h,v 1.41 2002/03/26 19:16:56 tgl Exp $
+ * $Id: catcache.h,v 1.42 2002/04/06 06:59:24 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -25,6 +25,7 @@
 
 /*
  *             struct catctup:                 individual tuple in the cache.
+ *             struct catclist:                list of tuples matching a partial key.
  *             struct catcache:                information for managing a cache.
  *             struct catcacheheader:  information for managing all the caches.
  */
@@ -36,7 +37,7 @@ typedef struct catcache
        const char *cc_relname;         /* name of relation the tuples come from */
        const char *cc_indname;         /* name of index matching cache keys */
        Oid                     cc_reloid;              /* OID of relation the tuples come from */
-       bool            cc_relisshared; /* is relation shared? */
+       bool            cc_relisshared; /* is relation shared across databases? */
        TupleDesc       cc_tupdesc;             /* tuple descriptor (copied from reldesc) */
        int                     cc_reloidattr;  /* AttrNumber of relation OID attr, or 0 */
        int                     cc_ntup;                /* # of tuples currently in this cache */
@@ -46,6 +47,7 @@ typedef struct catcache
        PGFunction      cc_hashfunc[4]; /* hash function to use for each key */
        ScanKeyData cc_skey[4];         /* precomputed key info for heap scans */
        bool            cc_isname[4];   /* flag key columns that are NAMEs */
+       Dllist          cc_lists;               /* list of CatCList structs */
 #ifdef CATCACHE_STATS
        long            cc_searches;    /* total # searches against this cache */
        long            cc_hits;                /* # of matches against existing entry */
@@ -57,6 +59,8 @@ typedef struct catcache
         */
        long            cc_invals;              /* # of entries invalidated from cache */
        long            cc_discards;    /* # of entries discarded due to overflow */
+       long            cc_lsearches;   /* total # list-searches */
+       long            cc_lhits;               /* # of matches against existing lists */
 #endif
        Dllist          cc_bucket[1];   /* hash buckets --- VARIABLE LENGTH ARRAY */
 } CatCache;                                            /* VARIABLE LENGTH STRUCT */
@@ -64,15 +68,25 @@ typedef struct catcache
 
 typedef struct catctup
 {
-       int                     ct_magic;               /* for Assert checks */
+       int                     ct_magic;               /* for identifying CatCTup entries */
 #define CT_MAGIC   0x57261502
        CatCache   *my_cache;           /* link to owning catcache */
 
        /*
-        * Each tuple in a cache is a member of two lists: one lists all the
+        * Each tuple in a cache is a member of two Dllists: one lists all the
         * elements in all the caches in LRU order, and the other lists just
         * the elements in one hashbucket of one cache, also in LRU order.
         *
+        * The tuple may also be a member of at most one CatCList.  (If a single
+        * catcache is list-searched with varying numbers of keys, we may have
+        * to make multiple entries for the same tuple because of this
+        * restriction.  Currently, that's not expected to be common, so we
+        * accept the potential inefficiency.)
+        */
+       Dlelem          lrulist_elem;   /* list member of global LRU list */
+       Dlelem          cache_elem;             /* list member of per-bucket list */
+       struct catclist *c_list;        /* containing catclist, or NULL if none */
+       /*
         * A tuple marked "dead" must not be returned by subsequent searches.
         * However, it won't be physically deleted from the cache until its
         * refcount goes to zero.
@@ -82,8 +96,6 @@ typedef struct catctup
         * so far as avoiding catalog searches is concerned.  Management of
         * positive and negative entries is identical.
         */
-       Dlelem          lrulist_elem;   /* list member of global LRU list */
-       Dlelem          cache_elem;             /* list member of per-bucket list */
        int                     refcount;               /* number of active references */
        bool            dead;                   /* dead but not yet removed? */
        bool            negative;               /* negative cache entry? */
@@ -92,6 +104,47 @@ typedef struct catctup
 } CatCTup;
 
 
+typedef struct catclist
+{
+       int                     cl_magic;               /* for identifying CatCList entries */
+#define CL_MAGIC   0x52765103
+       CatCache   *my_cache;           /* link to owning catcache */
+
+       /*
+        * A CatCList describes the result of a partial search, ie, a search
+        * using only the first K key columns of an N-key cache.  We form the
+        * keys used into a tuple (with other attributes NULL) to represent
+        * the stored key set.  The CatCList object contains links to cache
+        * entries for all the table rows satisfying the partial key.  (Note:
+        * none of these will be negative cache entries.)
+        *
+        * A CatCList is only a member of a per-cache list; we do not do
+        * separate LRU management for CatCLists.  Instead, a CatCList is
+        * dropped from the cache as soon as any one of its member tuples
+        * ages out due to tuple-level LRU management.
+        *
+        * A list marked "dead" must not be returned by subsequent searches.
+        * However, it won't be physically deleted from the cache until its
+        * refcount goes to zero.  (Its member tuples must have refcounts at
+        * least as large, so they won't go away either.)
+        *
+        * If "ordered" is true then the member tuples appear in the order of
+        * the cache's underlying index.  This will be true in normal operation,
+        * but might not be true during bootstrap or recovery operations.
+        * (namespace.c is able to save some cycles when it is true.)
+        */
+       Dlelem          cache_elem;             /* list member of per-catcache list */
+       int                     refcount;               /* number of active references */
+       bool            dead;                   /* dead but not yet removed? */
+       bool            ordered;                /* members listed in index order? */
+       short           nkeys;                  /* number of lookup keys specified */
+       uint32          hash_value;             /* hash value for lookup keys */
+       HeapTupleData tuple;            /* header for tuple holding keys */
+       int                     n_members;              /* number of member tuples */
+       CatCTup    *members[1];         /* members --- VARIABLE LENGTH ARRAY */
+} CatCList;                                            /* VARIABLE LENGTH STRUCT */
+
+
 typedef struct catcacheheader
 {
        CatCache   *ch_caches;          /* head of list of CatCache structs */
@@ -117,6 +170,11 @@ extern HeapTuple SearchCatCache(CatCache *cache,
                           Datum v3, Datum v4);
 extern void ReleaseCatCache(HeapTuple tuple);
 
+extern CatCList *SearchCatCacheList(CatCache *cache, int nkeys,
+                          Datum v1, Datum v2,
+                          Datum v3, Datum v4);
+extern void ReleaseCatCacheList(CatCList *list);
+
 extern void ResetCatalogCaches(void);
 extern void CatalogCacheFlushRelation(Oid relId);
 extern void CatalogCacheIdInvalidate(int cacheId, uint32 hashValue,
index d08dfad..1d9ddd9 100644 (file)
@@ -9,7 +9,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: syscache.h,v 1.41 2002/03/29 19:06:26 tgl Exp $
+ * $Id: syscache.h,v 1.42 2002/04/06 06:59:25 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -79,4 +79,9 @@ extern Oid GetSysCacheOid(int cacheId,
 extern Datum SysCacheGetAttr(int cacheId, HeapTuple tup,
                                AttrNumber attributeNumber, bool *isNull);
 
+/* list-search interface.  Users of this must import catcache.h too */
+extern struct catclist *SearchSysCacheList(int cacheId, int nkeys,
+                          Datum key1, Datum key2, Datum key3, Datum key4);
+#define ReleaseSysCacheList(x)  ReleaseCatCacheList(x)
+
 #endif   /* SYSCACHE_H */