OSDN Git Service

Try to instill some sanity in plperl's function result processing.
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 22 Nov 2004 20:31:53 +0000 (20:31 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 22 Nov 2004 20:31:53 +0000 (20:31 +0000)
Get rid of static variables for SETOF result, don't crash when called
from non-FROM context, eliminate dead code, etc.

src/pl/plperl/plperl.c

index b2f4bf7..9aa5102 100644 (file)
@@ -33,7 +33,7 @@
  *       ENHANCEMENTS, OR MODIFICATIONS.
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/pl/plperl/plperl.c,v 1.61 2004/11/21 22:13:37 tgl Exp $
+ *       $PostgreSQL: pgsql/src/pl/plperl/plperl.c,v 1.62 2004/11/22 20:31:53 tgl Exp $
  *
  **********************************************************************/
 
@@ -83,8 +83,8 @@ typedef struct plperl_proc_desc
        bool            lanpltrusted;
        bool            fn_retistuple;  /* true, if function returns tuple */
        bool            fn_retisset;    /* true, if function returns set */
-       Oid                     ret_oid;                /* Oid of returning type */
-       FmgrInfo        result_in_func;
+       Oid                     result_oid;             /* Oid of result type */
+       FmgrInfo        result_in_func; /* I/O function and arg for result type */
        Oid                     result_typioparam;
        int                     nargs;
        FmgrInfo        arg_out_func[FUNC_MAX_ARGS];
@@ -101,9 +101,6 @@ static int  plperl_firstcall = 1;
 static bool plperl_safe_init_done = false;
 static PerlInterpreter *plperl_interp = NULL;
 static HV  *plperl_proc_hash = NULL;
-static AV  *g_column_keys = NULL;
-static SV  *srf_perlret = NULL; /* keep returned value */
-static int     g_attr_num = 0;
 
 /* this is saved and restored by plperl_call_handler */
 static plperl_proc_desc *plperl_current_prodesc = NULL;
@@ -163,27 +160,7 @@ plperl_init(void)
                return;
 
        /************************************************************
-        * Free the proc hash table
-        ************************************************************/
-       if (plperl_proc_hash != NULL)
-       {
-               hv_undef(plperl_proc_hash);
-               SvREFCNT_dec((SV *) plperl_proc_hash);
-               plperl_proc_hash = NULL;
-       }
-
-       /************************************************************
-        * Destroy the existing Perl interpreter
-        ************************************************************/
-       if (plperl_interp != NULL)
-       {
-               perl_destruct(plperl_interp);
-               perl_free(plperl_interp);
-               plperl_interp = NULL;
-       }
-
-       /************************************************************
-        * Now recreate a new Perl interpreter
+        * Create the Perl interpreter
         ************************************************************/
        plperl_init_interp();
 
@@ -217,8 +194,7 @@ plperl_init_all(void)
 static void
 plperl_init_interp(void)
 {
-
-       char       *embedding[3] = {
+       static char        *embedding[3] = {
                "", "-e",
 
                /*
@@ -238,7 +214,7 @@ plperl_init_interp(void)
        perl_run(plperl_interp);
 
        /************************************************************
-        * Initialize the proc and query hash tables
+        * Initialize the procedure hash table
         ************************************************************/
        plperl_proc_hash = newHV();
 }
@@ -269,7 +245,6 @@ plperl_safe_init(void)
                           ;
 
        SV                 *res;
-
        float           safe_version;
 
        res = eval_pv(safe_module, FALSE);      /* TRUE = croak if failure */
@@ -416,54 +391,6 @@ plperl_trigger_build_args(FunctionCallInfo fcinfo)
 
 
 /**********************************************************************
- * check return value from plperl function
- **********************************************************************/
-static int
-plperl_is_set(SV *sv)
-{
-       int                     i = 0;
-       int                     len = 0;
-       int                     set = 0;
-       int                     other = 0;
-       AV                 *input_av;
-       SV                **val;
-
-       if (SvTYPE(sv) != SVt_RV)
-               return 0;
-
-       if (SvTYPE(SvRV(sv)) == SVt_PVHV)
-               return 0;
-
-       if (SvTYPE(SvRV(sv)) == SVt_PVAV)
-       {
-               input_av = (AV *) SvRV(sv);
-               len = av_len(input_av) + 1;
-
-               for (i = 0; i < len; i++)
-               {
-                       val = av_fetch(input_av, i, FALSE);
-                       if (SvTYPE(*val) == SVt_RV)
-                               set = 1;
-                       else
-                               other = 1;
-               }
-       }
-
-       if (len == 0)
-               return 1;
-       if (set && !other)
-               return 1;
-       if (!set && other)
-               return 0;
-       if (set && other)
-               elog(ERROR, "plperl: check your return value structure");
-       if (!set && !other)
-               elog(ERROR, "plperl: check your return value structure");
-
-       return 0;                                       /* for compiler */
-}
-
-/**********************************************************************
  * extract a list of keys from a hash
  **********************************************************************/
 static AV  *
@@ -505,7 +432,6 @@ plperl_get_key(AV *keys, int index)
  * extract a value for a given key from a hash
  *
  * return NULL on error or if we got an undef
- *
  **********************************************************************/
 static char *
 plperl_get_elem(HV *hash, char *key)
@@ -516,6 +442,28 @@ plperl_get_elem(HV *hash, char *key)
        return SvTYPE(*svp) == SVt_NULL ? NULL : SvPV(*svp, PL_na);
 }
 
+/*
+ * Obtain tuple descriptor for a function returning tuple
+ *
+ * NB: copy the result if needed for any great length of time
+ */
+static TupleDesc
+get_function_tupdesc(Oid result_type, ReturnSetInfo *rsinfo)
+{
+       if (result_type == RECORDOID)
+       {
+               /* We must get the information from call context */
+               if (!rsinfo || !IsA(rsinfo, ReturnSetInfo) ||
+                       rsinfo->expectedDesc == NULL)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_DATATYPE_MISMATCH),
+                                        errmsg("could not determine row description for function returning record")));
+               return rsinfo->expectedDesc;
+       }
+       else                            /* ordinary composite type */
+               return lookup_rowtype_tupdesc(result_type, -1);
+}
+
 /**********************************************************************
  * set up the new tuple returned from a trigger
  **********************************************************************/
@@ -630,16 +578,10 @@ plperl_call_handler(PG_FUNCTION_ARGS)
 
        PG_TRY();
        {
-               /************************************************************
-                * Connect to SPI manager
-                ************************************************************/
-               if (SPI_connect() != SPI_OK_CONNECT)
-                       elog(ERROR, "could not connect to SPI manager");
-
-               /************************************************************
+               /*
                 * Determine if called as function or trigger and
                 * call appropriate subhandler
-                ************************************************************/
+                */
                if (CALLED_AS_TRIGGER(fcinfo))
                        retval = PointerGetDatum(plperl_trigger_handler(fcinfo));
                else
@@ -910,6 +852,10 @@ plperl_func_handler(PG_FUNCTION_ARGS)
        SV                 *perlret;
        Datum           retval;
 
+       /* Connect to SPI manager */
+       if (SPI_connect() != SPI_OK_CONNECT)
+               elog(ERROR, "could not connect to SPI manager");
+
        /* Find or compile the function */
        prodesc = compile_plperl_function(fcinfo->flinfo->fn_oid, false);
 
@@ -920,19 +866,14 @@ plperl_func_handler(PG_FUNCTION_ARGS)
         ************************************************************/
        if (!prodesc->fn_retisset)
                perlret = plperl_call_perl_func(prodesc, fcinfo);
+       else if (SRF_IS_FIRSTCALL())
+               perlret = plperl_call_perl_func(prodesc, fcinfo);
        else
        {
-               if (SRF_IS_FIRSTCALL()) /* call function only once */
-                       srf_perlret = plperl_call_perl_func(prodesc, fcinfo);
-               perlret = srf_perlret;
-       }
+               /* Get back the SV stashed on initial call */
+               FuncCallContext *funcctx = (FuncCallContext *) fcinfo->flinfo->fn_extra;
 
-       if (prodesc->fn_retisset && SRF_IS_FIRSTCALL())
-       {
-               if (prodesc->fn_retistuple)
-                       g_column_keys = newAV();
-               if (SvTYPE(perlret) != SVt_RV)
-                       elog(ERROR, "plperl: set-returning function must return reference");
+               perlret = (SV *) funcctx->user_fctx;
        }
 
        /************************************************************
@@ -947,147 +888,78 @@ plperl_func_handler(PG_FUNCTION_ARGS)
        if (!(perlret && SvOK(perlret) && SvTYPE(perlret) != SVt_NULL))
        {
                /* return NULL if Perl code returned undef */
-               fcinfo->isnull = true;
+               ReturnSetInfo *rsi = (ReturnSetInfo *) fcinfo->resultinfo;
+
+               if (perlret)
+                       SvREFCNT_dec(perlret);
+               if (rsi && IsA(rsi, ReturnSetInfo))
+                       rsi->isDone = ExprEndResult;
+               PG_RETURN_NULL();
        }
 
-       if (prodesc->fn_retisset && !(perlret && SvTYPE(SvRV(perlret)) == SVt_PVAV))
+       if (prodesc->fn_retisset &&
+               (SvTYPE(perlret) != SVt_RV || SvTYPE(SvRV(perlret)) != SVt_PVAV))
                elog(ERROR, "plperl: set-returning function must return reference to array");
 
-       if (prodesc->fn_retistuple && perlret && SvTYPE(perlret) != SVt_RV)
+       if (prodesc->fn_retistuple && SvTYPE(perlret) != SVt_RV)
                elog(ERROR, "plperl: composite-returning function must return a reference");
 
-       if (prodesc->fn_retisset && !fcinfo->resultinfo)
-               ereport(ERROR,
-                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                errmsg("set-valued function called in context that cannot accept a set")));
-
-       if (prodesc->fn_retistuple && fcinfo->resultinfo)       /* set of tuples */
+       if (prodesc->fn_retisset && prodesc->fn_retistuple)
        {
-               /*
-                *  This branch will be taken when the function call
-                *  appears in a context that can return a set of tuples,
-                *  even if it only actually returns a single tuple
-                *  (e.g. select a from foo() where foo returns a singleton
-                *  of some composite type with member a). In this case, the
-                *  return value will be a hashref. If a rowset is returned
-                *  it will be an arrayref whose members will be hashrefs.
-                *
-                *  Care is taken in the code only to refer to the appropriate
-                *  one of ret_hv and ret_av, only one of which is therefore
-                *  valid for any given call.
-                *
-                *  XXX This code is in dire need of cleanup.
-                */
-       
-               /* SRF support */
-               HV                 *ret_hv = NULL;
-               AV                 *ret_av = NULL;
+               /* set of tuples */
+               AV                 *ret_av = (AV *) SvRV(perlret);
                FuncCallContext *funcctx;
-               int                     call_cntr;
-               int                     max_calls;
                TupleDesc       tupdesc;
                AttInMetadata *attinmeta;
-               bool            isset;
-               char      **values = NULL;
-               ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
-
-               isset = plperl_is_set(perlret);
-
-               if (SvTYPE(SvRV(perlret)) == SVt_PVHV)
-                       ret_hv = (HV *) SvRV(perlret);
-               else
-                       ret_av = (AV *) SvRV(perlret);
 
                if (SRF_IS_FIRSTCALL())
                {
                        MemoryContext oldcontext;
-                       int                     i;
 
                        funcctx = SRF_FIRSTCALL_INIT();
 
-                       oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
-
-                       if (SvTYPE(SvRV(perlret)) == SVt_PVHV)
-                       {
-                               if (isset)
-                                       funcctx->max_calls = hv_iterinit(ret_hv);
-                               else
-                                       funcctx->max_calls = 1;
-                       }
-                       else
-                       {
-                               if (isset)
-                                       funcctx->max_calls = av_len(ret_av) + 1;
-                               else
-                                       funcctx->max_calls = 1;
-                       }
-
-                       tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
-
-                       g_attr_num = tupdesc->natts;
+                       funcctx->user_fctx = (void *) perlret;
 
-                       for (i = 0; i < tupdesc->natts; i++)
-                               av_store(g_column_keys, i + 1,
-                                                newSVpv(SPI_fname(tupdesc, i+1), 0));
+                       funcctx->max_calls = av_len(ret_av) + 1;
 
-                       attinmeta = TupleDescGetAttInMetadata(tupdesc);
-                       funcctx->attinmeta = attinmeta;
+                       /* Cache a copy of the result's tupdesc and attinmeta */
+                       oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+                       tupdesc = get_function_tupdesc(prodesc->result_oid,
+                                                                               (ReturnSetInfo *) fcinfo->resultinfo);
+                       tupdesc = CreateTupleDescCopy(tupdesc);
+                       funcctx->attinmeta = TupleDescGetAttInMetadata(tupdesc);
                        MemoryContextSwitchTo(oldcontext);
                }
 
                funcctx = SRF_PERCALL_SETUP();
-               call_cntr = funcctx->call_cntr;
-               max_calls = funcctx->max_calls;
                attinmeta = funcctx->attinmeta;
                tupdesc = attinmeta->tupdesc;
 
-               if (call_cntr < max_calls)
+               if (funcctx->call_cntr < funcctx->max_calls)
                {
+                       SV                **svp;
+                       HV                 *row_hv;
+                       char      **values;
                        HeapTuple       tuple;
-                       Datum           result;
                        int                     i;
-                       char       *column_key;
-                       char       *elem;
-
-                       if (isset)
-                       {
-                               HV                 *row_hv;
-                               SV                **svp;
-
-                               svp = av_fetch(ret_av, call_cntr, FALSE);
 
-                               row_hv = (HV *) SvRV(*svp);
+                       svp = av_fetch(ret_av, funcctx->call_cntr, FALSE);
 
-                               values = (char **) palloc(g_attr_num * sizeof(char *));
+                       if (SvTYPE(*svp) != SVt_RV)
+                               elog(ERROR, "plperl: check your return value structure");
+                       row_hv = (HV *) SvRV(*svp);
 
-                               for (i = 0; i < g_attr_num; i++)
-                               {
-                                       column_key = plperl_get_key(g_column_keys, i + 1);
-                                       elem = plperl_get_elem(row_hv, column_key);
-                                       if (elem)
-                                               values[i] = elem;
-                                       else
-                                               values[i] = NULL;
-                               }
-                       }
-                       else
+                       values = (char **) palloc(tupdesc->natts * sizeof(char *));
+                       for (i = 0; i < tupdesc->natts; i++)
                        {
-                               int                     i;
+                               char       *column_key;
 
-                               values = (char **) palloc(g_attr_num * sizeof(char *));
-                               for (i = 0; i < g_attr_num; i++)
-                               {
-                                       column_key = SPI_fname(tupdesc, i + 1);
-                                       elem = plperl_get_elem(ret_hv, column_key);
-                                       if (elem)
-                                               values[i] = elem;
-                                       else
-                                               values[i] = NULL;
-                               }
+                               column_key = SPI_fname(tupdesc, i + 1);
+                               values[i] = plperl_get_elem(row_hv, column_key);
                        }
                        tuple = BuildTupleFromCStrings(attinmeta, values);
-                       result = HeapTupleGetDatum(tuple);
-                       SRF_RETURN_NEXT(funcctx, result);
+                       retval = HeapTupleGetDatum(tuple);
+                       SRF_RETURN_NEXT(funcctx, retval);
                }
                else
                {
@@ -1095,95 +967,91 @@ plperl_func_handler(PG_FUNCTION_ARGS)
                        SRF_RETURN_DONE(funcctx);
                }
        }
-       else if (prodesc->fn_retisset)          /* set of non-tuples */
+       else if (prodesc->fn_retisset)
        {
+               /* set of non-tuples */
+               AV                 *ret_av = (AV *) SvRV(perlret);
                FuncCallContext *funcctx;
 
                if (SRF_IS_FIRSTCALL())
                {
-                       MemoryContext oldcontext;
-
                        funcctx = SRF_FIRSTCALL_INIT();
-                       oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
 
-                       funcctx->max_calls = av_len((AV *) SvRV(perlret)) + 1;
+                       funcctx->user_fctx = (void *) perlret;
+
+                       funcctx->max_calls = av_len(ret_av) + 1;
                }
 
                funcctx = SRF_PERCALL_SETUP();
 
                if (funcctx->call_cntr < funcctx->max_calls)
                {
-                       Datum           result;
-                       AV                 *array;
                        SV                **svp;
 
-                       array = (AV *) SvRV(perlret);
-                       svp = av_fetch(array, funcctx->call_cntr, FALSE);
+                       svp = av_fetch(ret_av, funcctx->call_cntr, FALSE);
 
                        if (SvTYPE(*svp) != SVt_NULL)
                        {
+                               char       *val = SvPV(*svp, PL_na);
+
                                fcinfo->isnull = false;
-                               result = FunctionCall3(&prodesc->result_in_func,
-                                                                          PointerGetDatum(SvPV(*svp, PL_na)),
+                               retval = FunctionCall3(&prodesc->result_in_func,
+                                                                          PointerGetDatum(val),
                                                        ObjectIdGetDatum(prodesc->result_typioparam),
                                                                           Int32GetDatum(-1));
                        }
                        else
                        {
                                fcinfo->isnull = true;
-                               result = (Datum) 0;
+                               retval = (Datum) 0;
                        }
-                       SRF_RETURN_NEXT(funcctx, result);
+                       SRF_RETURN_NEXT(funcctx, retval);
                }
                else
                {
-                       if (perlret)
-                               SvREFCNT_dec(perlret);
+                       SvREFCNT_dec(perlret);
                        SRF_RETURN_DONE(funcctx);
                }
        }
-       else if (!fcinfo->isnull)       /* non-null singleton */
+       else if (prodesc->fn_retistuple)
        {
-               if (prodesc->fn_retistuple)             /* singleton perl hash to Datum */
+               /* singleton perl hash to Datum */
+               HV                 *perlhash = (HV *) SvRV(perlret);
+               TupleDesc       td;
+               int                     i;
+               char      **values;
+               AttInMetadata *attinmeta;
+               HeapTuple       tup;
+
+               /*
+                * XXX should cache the attinmetadata instead of recomputing
+                */
+               td = get_function_tupdesc(prodesc->result_oid,
+                                                                 (ReturnSetInfo *) fcinfo->resultinfo);
+               /* td = CreateTupleDescCopy(td); */
+               attinmeta = TupleDescGetAttInMetadata(td);
+
+               values = (char **) palloc(td->natts * sizeof(char *));
+               for (i = 0; i < td->natts; i++)
                {
-                       TupleDesc       td = lookup_rowtype_tupdesc(prodesc->ret_oid, (int32) -1);
-                       HV                 *perlhash = (HV *) SvRV(perlret);
-                       int                     i;
-                       char      **values;
-                       char       *key,
-                                          *val;
-                       AttInMetadata *attinmeta;
-                       HeapTuple       tup;
-
-                       if (!td)
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("no TupleDesc info available")));
-
-                       values = (char **) palloc(td->natts * sizeof(char *));
-                       for (i = 0; i < td->natts; i++)
-                       {
+                       char       *key;
 
-                               key = SPI_fname(td, i + 1);
-                               val = plperl_get_elem(perlhash, key);
-                               if (val)
-                                       values[i] = val;
-                               else
-                                       values[i] = NULL;
-                       }
-                       attinmeta = TupleDescGetAttInMetadata(td);
-                       tup = BuildTupleFromCStrings(attinmeta, values);
-                       retval = HeapTupleGetDatum(tup);
+                       key = SPI_fname(td, i + 1);
+                       values[i] = plperl_get_elem(perlhash, key);
                }
-               else
-                       /* perl string to Datum */
-                       retval = FunctionCall3(&prodesc->result_in_func,
-                                                                  PointerGetDatum(SvPV(perlret, PL_na)),
-                                                       ObjectIdGetDatum(prodesc->result_typioparam),
-                                                                  Int32GetDatum(-1));
+               tup = BuildTupleFromCStrings(attinmeta, values);
+               retval = HeapTupleGetDatum(tup);
+       }
+       else
+       {
+               /* perl string to Datum */
+               char       *val = SvPV(perlret, PL_na);
+
+               retval = FunctionCall3(&prodesc->result_in_func,
+                                                          CStringGetDatum(val),
+                                                          ObjectIdGetDatum(prodesc->result_typioparam),
+                                                          Int32GetDatum(-1));
        }
-       else            /* null singleton */
-               retval = (Datum) 0;
 
        SvREFCNT_dec(perlret);
        return retval;
@@ -1202,6 +1070,10 @@ plperl_trigger_handler(PG_FUNCTION_ARGS)
        SV                 *svTD;
        HV                 *hvTD;
 
+       /* Connect to SPI manager */
+       if (SPI_connect() != SPI_OK_CONNECT)
+               elog(ERROR, "could not connect to SPI manager");
+
        /* Find or compile the function */
        prodesc = compile_plperl_function(fcinfo->flinfo->fn_oid, true);
 
@@ -1248,7 +1120,6 @@ plperl_trigger_handler(PG_FUNCTION_ARGS)
        {
                if (!fcinfo->isnull)
                {
-
                        HeapTuple       trv;
 
                        if (strcasecmp(tmp, "SKIP") == 0)
@@ -1441,17 +1312,10 @@ compile_plperl_function(Oid fn_oid, bool is_trigger)
                                }
                        }
 
-                       prodesc->fn_retisset = procStruct->proretset;           /* true, if function
-                                                                                                                                * returns set */
-
-                       if (typeStruct->typtype == 'c' || procStruct->prorettype == RECORDOID)
-                       {
-                               prodesc->fn_retistuple = true;
-                               prodesc->ret_oid =
-                                       procStruct->prorettype == RECORDOID ?
-                                       typeStruct->typrelid :
-                                       procStruct->prorettype;
-                       }
+                       prodesc->result_oid = procStruct->prorettype;
+                       prodesc->fn_retisset = procStruct->proretset;
+                       prodesc->fn_retistuple = (typeStruct->typtype == 'c' ||
+                                                                         procStruct->prorettype == RECORDOID);
 
                        perm_fmgr_info(typeStruct->typinput, &(prodesc->result_in_func));
                        prodesc->result_typioparam = getTypeIOParam(typeTup);
@@ -1509,7 +1373,6 @@ compile_plperl_function(Oid fn_oid, bool is_trigger)
                 * create the text of the anonymous subroutine.
                 * we do not use a named subroutine so that we can call directly
                 * through the reference.
-                *
                 ************************************************************/
                prosrcdatum = SysCacheGetAttr(PROCOID, procTup,
                                                                          Anum_pg_proc_prosrc, &isnull);