From f5206262c885b627455b52712507454e6df7141b Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Mon, 22 Nov 2004 20:31:53 +0000
Subject: [PATCH] Try to instill some sanity in plperl's function result
 processing. Get rid of static variables for SETOF result, don't crash when
 called from non-FROM context, eliminate dead code, etc.

---
 src/pl/plperl/plperl.c | 399 ++++++++++++++---------------------------
 1 file changed, 131 insertions(+), 268 deletions(-)

diff --git a/src/pl/plperl/plperl.c b/src/pl/plperl/plperl.c
index b2f4bf74a4..9aa5102e19 100644
--- a/src/pl/plperl/plperl.c
+++ b/src/pl/plperl/plperl.c
@@ -33,7 +33,7 @@
  *	  ENHANCEMENTS, OR MODIFICATIONS.
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/pl/plperl/plperl.c,v 1.61 2004/11/21 22:13:37 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/pl/plperl/plperl.c,v 1.62 2004/11/22 20:31:53 tgl Exp $
  *
  **********************************************************************/
 
@@ -83,8 +83,8 @@ typedef struct plperl_proc_desc
 	bool		lanpltrusted;
 	bool		fn_retistuple;	/* true, if function returns tuple */
 	bool		fn_retisset;	/* true, if function returns set */
-	Oid			ret_oid;		/* Oid of returning type */
-	FmgrInfo	result_in_func;
+	Oid			result_oid;		/* Oid of result type */
+	FmgrInfo	result_in_func;	/* I/O function and arg for result type */
 	Oid			result_typioparam;
 	int			nargs;
 	FmgrInfo	arg_out_func[FUNC_MAX_ARGS];
@@ -101,9 +101,6 @@ static int	plperl_firstcall = 1;
 static bool plperl_safe_init_done = false;
 static PerlInterpreter *plperl_interp = NULL;
 static HV  *plperl_proc_hash = NULL;
-static AV  *g_column_keys = NULL;
-static SV  *srf_perlret = NULL; /* keep returned value */
-static int	g_attr_num = 0;
 
 /* this is saved and restored by plperl_call_handler */
 static plperl_proc_desc *plperl_current_prodesc = NULL;
@@ -163,27 +160,7 @@ plperl_init(void)
 		return;
 
 	/************************************************************
-	 * Free the proc hash table
-	 ************************************************************/
-	if (plperl_proc_hash != NULL)
-	{
-		hv_undef(plperl_proc_hash);
-		SvREFCNT_dec((SV *) plperl_proc_hash);
-		plperl_proc_hash = NULL;
-	}
-
-	/************************************************************
-	 * Destroy the existing Perl interpreter
-	 ************************************************************/
-	if (plperl_interp != NULL)
-	{
-		perl_destruct(plperl_interp);
-		perl_free(plperl_interp);
-		plperl_interp = NULL;
-	}
-
-	/************************************************************
-	 * Now recreate a new Perl interpreter
+	 * Create the Perl interpreter
 	 ************************************************************/
 	plperl_init_interp();
 
@@ -217,8 +194,7 @@ plperl_init_all(void)
 static void
 plperl_init_interp(void)
 {
-
-	char	   *embedding[3] = {
+	static char	   *embedding[3] = {
 		"", "-e",
 
 		/*
@@ -238,7 +214,7 @@ plperl_init_interp(void)
 	perl_run(plperl_interp);
 
 	/************************************************************
-	 * Initialize the proc and query hash tables
+	 * Initialize the procedure hash table
 	 ************************************************************/
 	plperl_proc_hash = newHV();
 }
@@ -269,7 +245,6 @@ plperl_safe_init(void)
 			   ;
 
 	SV		   *res;
-
 	float		safe_version;
 
 	res = eval_pv(safe_module, FALSE);	/* TRUE = croak if failure */
@@ -415,54 +390,6 @@ plperl_trigger_build_args(FunctionCallInfo fcinfo)
 }
 
 
-/**********************************************************************
- * check return value from plperl function
- **********************************************************************/
-static int
-plperl_is_set(SV *sv)
-{
-	int			i = 0;
-	int			len = 0;
-	int			set = 0;
-	int			other = 0;
-	AV		   *input_av;
-	SV		  **val;
-
-	if (SvTYPE(sv) != SVt_RV)
-		return 0;
-
-	if (SvTYPE(SvRV(sv)) == SVt_PVHV)
-		return 0;
-
-	if (SvTYPE(SvRV(sv)) == SVt_PVAV)
-	{
-		input_av = (AV *) SvRV(sv);
-		len = av_len(input_av) + 1;
-
-		for (i = 0; i < len; i++)
-		{
-			val = av_fetch(input_av, i, FALSE);
-			if (SvTYPE(*val) == SVt_RV)
-				set = 1;
-			else
-				other = 1;
-		}
-	}
-
-	if (len == 0)
-		return 1;
-	if (set && !other)
-		return 1;
-	if (!set && other)
-		return 0;
-	if (set && other)
-		elog(ERROR, "plperl: check your return value structure");
-	if (!set && !other)
-		elog(ERROR, "plperl: check your return value structure");
-
-	return 0;					/* for compiler */
-}
-
 /**********************************************************************
  * extract a list of keys from a hash
  **********************************************************************/
@@ -505,7 +432,6 @@ plperl_get_key(AV *keys, int index)
  * extract a value for a given key from a hash
  *
  * return NULL on error or if we got an undef
- *
  **********************************************************************/
 static char *
 plperl_get_elem(HV *hash, char *key)
@@ -516,6 +442,28 @@ plperl_get_elem(HV *hash, char *key)
 	return SvTYPE(*svp) == SVt_NULL ? NULL : SvPV(*svp, PL_na);
 }
 
+/*
+ * Obtain tuple descriptor for a function returning tuple
+ *
+ * NB: copy the result if needed for any great length of time
+ */
+static TupleDesc
+get_function_tupdesc(Oid result_type, ReturnSetInfo *rsinfo)
+{
+	if (result_type == RECORDOID)
+	{
+		/* We must get the information from call context */
+		if (!rsinfo || !IsA(rsinfo, ReturnSetInfo) ||
+			rsinfo->expectedDesc == NULL)
+			ereport(ERROR,
+					(errcode(ERRCODE_DATATYPE_MISMATCH),
+					 errmsg("could not determine row description for function returning record")));
+		return rsinfo->expectedDesc;
+	}
+	else				/* ordinary composite type */
+		return lookup_rowtype_tupdesc(result_type, -1);
+}
+
 /**********************************************************************
  * set up the new tuple returned from a trigger
  **********************************************************************/
@@ -630,16 +578,10 @@ plperl_call_handler(PG_FUNCTION_ARGS)
 
 	PG_TRY();
 	{
-		/************************************************************
-		 * Connect to SPI manager
-		 ************************************************************/
-		if (SPI_connect() != SPI_OK_CONNECT)
-			elog(ERROR, "could not connect to SPI manager");
-
-		/************************************************************
+		/*
 		 * Determine if called as function or trigger and
 		 * call appropriate subhandler
-		 ************************************************************/
+		 */
 		if (CALLED_AS_TRIGGER(fcinfo))
 			retval = PointerGetDatum(plperl_trigger_handler(fcinfo));
 		else
@@ -910,6 +852,10 @@ plperl_func_handler(PG_FUNCTION_ARGS)
 	SV		   *perlret;
 	Datum		retval;
 
+	/* Connect to SPI manager */
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "could not connect to SPI manager");
+
 	/* Find or compile the function */
 	prodesc = compile_plperl_function(fcinfo->flinfo->fn_oid, false);
 
@@ -920,19 +866,14 @@ plperl_func_handler(PG_FUNCTION_ARGS)
 	 ************************************************************/
 	if (!prodesc->fn_retisset)
 		perlret = plperl_call_perl_func(prodesc, fcinfo);
+	else if (SRF_IS_FIRSTCALL())
+		perlret = plperl_call_perl_func(prodesc, fcinfo);
 	else
 	{
-		if (SRF_IS_FIRSTCALL()) /* call function only once */
-			srf_perlret = plperl_call_perl_func(prodesc, fcinfo);
-		perlret = srf_perlret;
-	}
+		/* Get back the SV stashed on initial call */
+		FuncCallContext *funcctx = (FuncCallContext *) fcinfo->flinfo->fn_extra;
 
-	if (prodesc->fn_retisset && SRF_IS_FIRSTCALL())
-	{
-		if (prodesc->fn_retistuple)
-			g_column_keys = newAV();
-		if (SvTYPE(perlret) != SVt_RV)
-			elog(ERROR, "plperl: set-returning function must return reference");
+		perlret = (SV *) funcctx->user_fctx;
 	}
 
 	/************************************************************
@@ -947,147 +888,78 @@ plperl_func_handler(PG_FUNCTION_ARGS)
 	if (!(perlret && SvOK(perlret) && SvTYPE(perlret) != SVt_NULL))
 	{
 		/* return NULL if Perl code returned undef */
-		fcinfo->isnull = true;
+		ReturnSetInfo *rsi = (ReturnSetInfo *) fcinfo->resultinfo;
+
+		if (perlret)
+			SvREFCNT_dec(perlret);
+		if (rsi && IsA(rsi, ReturnSetInfo))
+			rsi->isDone = ExprEndResult;
+		PG_RETURN_NULL();
 	}
 
-	if (prodesc->fn_retisset && !(perlret && SvTYPE(SvRV(perlret)) == SVt_PVAV))
+	if (prodesc->fn_retisset &&
+		(SvTYPE(perlret) != SVt_RV || SvTYPE(SvRV(perlret)) != SVt_PVAV))
 		elog(ERROR, "plperl: set-returning function must return reference to array");
 
-	if (prodesc->fn_retistuple && perlret && SvTYPE(perlret) != SVt_RV)
+	if (prodesc->fn_retistuple && SvTYPE(perlret) != SVt_RV)
 		elog(ERROR, "plperl: composite-returning function must return a reference");
 
-	if (prodesc->fn_retisset && !fcinfo->resultinfo)
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("set-valued function called in context that cannot accept a set")));
-
-	if (prodesc->fn_retistuple && fcinfo->resultinfo)	/* set of tuples */
+	if (prodesc->fn_retisset && prodesc->fn_retistuple)
 	{
-		/*
-		 *  This branch will be taken when the function call
-		 *  appears in a context that can return a set of tuples,
-		 *  even if it only actually returns a single tuple
-		 *  (e.g. select a from foo() where foo returns a singleton
-		 *  of some composite type with member a). In this case, the
-		 *  return value will be a hashref. If a rowset is returned
-		 *  it will be an arrayref whose members will be hashrefs.
-		 *
-		 *  Care is taken in the code only to refer to the appropriate
-		 *  one of ret_hv and ret_av, only one of which is therefore
-		 *  valid for any given call.
-		 *
-		 *  XXX This code is in dire need of cleanup.
-		 */
-	
-		/* SRF support */
-		HV		   *ret_hv = NULL;
-		AV		   *ret_av = NULL;
+		/* set of tuples */
+		AV		   *ret_av = (AV *) SvRV(perlret);
 		FuncCallContext *funcctx;
-		int			call_cntr;
-		int			max_calls;
 		TupleDesc	tupdesc;
 		AttInMetadata *attinmeta;
-		bool		isset;
-		char	  **values = NULL;
-		ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
-
-		isset = plperl_is_set(perlret);
-
-		if (SvTYPE(SvRV(perlret)) == SVt_PVHV)
-			ret_hv = (HV *) SvRV(perlret);
-		else
-			ret_av = (AV *) SvRV(perlret);
 
 		if (SRF_IS_FIRSTCALL())
 		{
 			MemoryContext oldcontext;
-			int			i;
 
 			funcctx = SRF_FIRSTCALL_INIT();
 
-			oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
-
-			if (SvTYPE(SvRV(perlret)) == SVt_PVHV)
-			{
-				if (isset)
-					funcctx->max_calls = hv_iterinit(ret_hv);
-				else
-					funcctx->max_calls = 1;
-			}
-			else
-			{
-				if (isset)
-					funcctx->max_calls = av_len(ret_av) + 1;
-				else
-					funcctx->max_calls = 1;
-			}
-
-			tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
-
-			g_attr_num = tupdesc->natts;
+			funcctx->user_fctx = (void *) perlret;
 
-			for (i = 0; i < tupdesc->natts; i++)
-				av_store(g_column_keys, i + 1,
-						 newSVpv(SPI_fname(tupdesc, i+1), 0));
+			funcctx->max_calls = av_len(ret_av) + 1;
 
-			attinmeta = TupleDescGetAttInMetadata(tupdesc);
-			funcctx->attinmeta = attinmeta;
+			/* Cache a copy of the result's tupdesc and attinmeta */
+			oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+			tupdesc = get_function_tupdesc(prodesc->result_oid,
+										(ReturnSetInfo *) fcinfo->resultinfo);
+			tupdesc = CreateTupleDescCopy(tupdesc);
+			funcctx->attinmeta = TupleDescGetAttInMetadata(tupdesc);
 			MemoryContextSwitchTo(oldcontext);
 		}
 
 		funcctx = SRF_PERCALL_SETUP();
-		call_cntr = funcctx->call_cntr;
-		max_calls = funcctx->max_calls;
 		attinmeta = funcctx->attinmeta;
 		tupdesc = attinmeta->tupdesc;
 
-		if (call_cntr < max_calls)
+		if (funcctx->call_cntr < funcctx->max_calls)
 		{
+			SV		  **svp;
+			HV		   *row_hv;
+			char	  **values;
 			HeapTuple	tuple;
-			Datum		result;
 			int			i;
-			char	   *column_key;
-			char	   *elem;
-
-			if (isset)
-			{
-				HV		   *row_hv;
-				SV		  **svp;
-
-				svp = av_fetch(ret_av, call_cntr, FALSE);
 
-				row_hv = (HV *) SvRV(*svp);
+			svp = av_fetch(ret_av, funcctx->call_cntr, FALSE);
 
-				values = (char **) palloc(g_attr_num * sizeof(char *));
+			if (SvTYPE(*svp) != SVt_RV)
+				elog(ERROR, "plperl: check your return value structure");
+			row_hv = (HV *) SvRV(*svp);
 
-				for (i = 0; i < g_attr_num; i++)
-				{
-					column_key = plperl_get_key(g_column_keys, i + 1);
-					elem = plperl_get_elem(row_hv, column_key);
-					if (elem)
-						values[i] = elem;
-					else
-						values[i] = NULL;
-				}
-			}
-			else
+			values = (char **) palloc(tupdesc->natts * sizeof(char *));
+			for (i = 0; i < tupdesc->natts; i++)
 			{
-				int			i;
+				char	   *column_key;
 
-				values = (char **) palloc(g_attr_num * sizeof(char *));
-				for (i = 0; i < g_attr_num; i++)
-				{
-					column_key = SPI_fname(tupdesc, i + 1);
-					elem = plperl_get_elem(ret_hv, column_key);
-					if (elem)
-						values[i] = elem;
-					else
-						values[i] = NULL;
-				}
+				column_key = SPI_fname(tupdesc, i + 1);
+				values[i] = plperl_get_elem(row_hv, column_key);
 			}
 			tuple = BuildTupleFromCStrings(attinmeta, values);
-			result = HeapTupleGetDatum(tuple);
-			SRF_RETURN_NEXT(funcctx, result);
+			retval = HeapTupleGetDatum(tuple);
+			SRF_RETURN_NEXT(funcctx, retval);
 		}
 		else
 		{
@@ -1095,95 +967,91 @@ plperl_func_handler(PG_FUNCTION_ARGS)
 			SRF_RETURN_DONE(funcctx);
 		}
 	}
-	else if (prodesc->fn_retisset)		/* set of non-tuples */
+	else if (prodesc->fn_retisset)
 	{
+		/* set of non-tuples */
+		AV		   *ret_av = (AV *) SvRV(perlret);
 		FuncCallContext *funcctx;
 
 		if (SRF_IS_FIRSTCALL())
 		{
-			MemoryContext oldcontext;
-
 			funcctx = SRF_FIRSTCALL_INIT();
-			oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
 
-			funcctx->max_calls = av_len((AV *) SvRV(perlret)) + 1;
+			funcctx->user_fctx = (void *) perlret;
+
+			funcctx->max_calls = av_len(ret_av) + 1;
 		}
 
 		funcctx = SRF_PERCALL_SETUP();
 
 		if (funcctx->call_cntr < funcctx->max_calls)
 		{
-			Datum		result;
-			AV		   *array;
 			SV		  **svp;
 
-			array = (AV *) SvRV(perlret);
-			svp = av_fetch(array, funcctx->call_cntr, FALSE);
+			svp = av_fetch(ret_av, funcctx->call_cntr, FALSE);
 
 			if (SvTYPE(*svp) != SVt_NULL)
 			{
+				char	   *val = SvPV(*svp, PL_na);
+
 				fcinfo->isnull = false;
-				result = FunctionCall3(&prodesc->result_in_func,
-									   PointerGetDatum(SvPV(*svp, PL_na)),
+				retval = FunctionCall3(&prodesc->result_in_func,
+									   PointerGetDatum(val),
 							ObjectIdGetDatum(prodesc->result_typioparam),
 									   Int32GetDatum(-1));
 			}
 			else
 			{
 				fcinfo->isnull = true;
-				result = (Datum) 0;
+				retval = (Datum) 0;
 			}
-			SRF_RETURN_NEXT(funcctx, result);
+			SRF_RETURN_NEXT(funcctx, retval);
 		}
 		else
 		{
-			if (perlret)
-				SvREFCNT_dec(perlret);
+			SvREFCNT_dec(perlret);
 			SRF_RETURN_DONE(funcctx);
 		}
 	}
-	else if (!fcinfo->isnull)	/* non-null singleton */
+	else if (prodesc->fn_retistuple)
 	{
-		if (prodesc->fn_retistuple)		/* singleton perl hash to Datum */
+		/* singleton perl hash to Datum */
+		HV		   *perlhash = (HV *) SvRV(perlret);
+		TupleDesc	td;
+		int			i;
+		char	  **values;
+		AttInMetadata *attinmeta;
+		HeapTuple	tup;
+
+		/*
+		 * XXX should cache the attinmetadata instead of recomputing
+		 */
+		td = get_function_tupdesc(prodesc->result_oid,
+								  (ReturnSetInfo *) fcinfo->resultinfo);
+		/* td = CreateTupleDescCopy(td); */
+		attinmeta = TupleDescGetAttInMetadata(td);
+
+		values = (char **) palloc(td->natts * sizeof(char *));
+		for (i = 0; i < td->natts; i++)
 		{
-			TupleDesc	td = lookup_rowtype_tupdesc(prodesc->ret_oid, (int32) -1);
-			HV		   *perlhash = (HV *) SvRV(perlret);
-			int			i;
-			char	  **values;
-			char	   *key,
-					   *val;
-			AttInMetadata *attinmeta;
-			HeapTuple	tup;
-
-			if (!td)
-				ereport(ERROR,
-						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("no TupleDesc info available")));
-
-			values = (char **) palloc(td->natts * sizeof(char *));
-			for (i = 0; i < td->natts; i++)
-			{
+			char	   *key;
 
-				key = SPI_fname(td, i + 1);
-				val = plperl_get_elem(perlhash, key);
-				if (val)
-					values[i] = val;
-				else
-					values[i] = NULL;
-			}
-			attinmeta = TupleDescGetAttInMetadata(td);
-			tup = BuildTupleFromCStrings(attinmeta, values);
-			retval = HeapTupleGetDatum(tup);
+			key = SPI_fname(td, i + 1);
+			values[i] = plperl_get_elem(perlhash, key);
 		}
-		else
-			/* perl string to Datum */
-			retval = FunctionCall3(&prodesc->result_in_func,
-								   PointerGetDatum(SvPV(perlret, PL_na)),
-							ObjectIdGetDatum(prodesc->result_typioparam),
-								   Int32GetDatum(-1));
+		tup = BuildTupleFromCStrings(attinmeta, values);
+		retval = HeapTupleGetDatum(tup);
+	}
+	else
+	{
+		/* perl string to Datum */
+		char	   *val = SvPV(perlret, PL_na);
+
+		retval = FunctionCall3(&prodesc->result_in_func,
+							   CStringGetDatum(val),
+							   ObjectIdGetDatum(prodesc->result_typioparam),
+							   Int32GetDatum(-1));
 	}
-	else		/* null singleton */
-		retval = (Datum) 0;
 
 	SvREFCNT_dec(perlret);
 	return retval;
@@ -1202,6 +1070,10 @@ plperl_trigger_handler(PG_FUNCTION_ARGS)
 	SV		   *svTD;
 	HV		   *hvTD;
 
+	/* Connect to SPI manager */
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "could not connect to SPI manager");
+
 	/* Find or compile the function */
 	prodesc = compile_plperl_function(fcinfo->flinfo->fn_oid, true);
 
@@ -1248,7 +1120,6 @@ plperl_trigger_handler(PG_FUNCTION_ARGS)
 	{
 		if (!fcinfo->isnull)
 		{
-
 			HeapTuple	trv;
 
 			if (strcasecmp(tmp, "SKIP") == 0)
@@ -1441,17 +1312,10 @@ compile_plperl_function(Oid fn_oid, bool is_trigger)
 				}
 			}
 
-			prodesc->fn_retisset = procStruct->proretset;		/* true, if function
-																 * returns set */
-
-			if (typeStruct->typtype == 'c' || procStruct->prorettype == RECORDOID)
-			{
-				prodesc->fn_retistuple = true;
-				prodesc->ret_oid =
-					procStruct->prorettype == RECORDOID ?
-					typeStruct->typrelid :
-					procStruct->prorettype;
-			}
+			prodesc->result_oid = procStruct->prorettype;
+			prodesc->fn_retisset = procStruct->proretset;
+			prodesc->fn_retistuple = (typeStruct->typtype == 'c' ||
+									  procStruct->prorettype == RECORDOID);
 
 			perm_fmgr_info(typeStruct->typinput, &(prodesc->result_in_func));
 			prodesc->result_typioparam = getTypeIOParam(typeTup);
@@ -1509,7 +1373,6 @@ compile_plperl_function(Oid fn_oid, bool is_trigger)
 		 * create the text of the anonymous subroutine.
 		 * we do not use a named subroutine so that we can call directly
 		 * through the reference.
-		 *
 		 ************************************************************/
 		prosrcdatum = SysCacheGetAttr(PROCOID, procTup,
 									  Anum_pg_proc_prosrc, &isnull);
-- 
2.50.1