* We have an execution_state record for each query in a function. Each
* record contains a plantree for its query. If the query is currently in
* F_EXEC_RUN state then there's a QueryDesc too.
+ *
+ * The "next" fields chain together all the execution_state records generated
+ * from a single original parsetree. (There will only be more than one in
+ * case of rule expansion of the original parsetree.)
*/
typedef enum
{
JunkFilter *junkFilter; /* will be NULL if function returns VOID */
- /* head of linked list of execution_state records */
- execution_state *func_state;
+ /*
+ * func_state is a List of execution_state records, each of which is the
+ * first for its original parsetree, with any additional records chained
+ * to it via the "next" fields. This sublist structure is needed to keep
+ * track of where the original query boundaries are.
+ */
+ List *func_state;
} SQLFunctionCache;
typedef SQLFunctionCache *SQLFunctionCachePtr;
/* non-export function prototypes */
-static execution_state *init_execution_state(List *queryTree_list,
+static List *init_execution_state(List *queryTree_list,
SQLFunctionCachePtr fcache,
bool lazyEvalOK);
static void init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK);
static void sqlfunction_destroy(DestReceiver *self);
-/* Set up the list of per-query execution_state records for a SQL function */
-static execution_state *
+/*
+ * Set up the per-query execution_state records for a SQL function.
+ *
+ * The input is a List of Lists of parsed and rewritten, but not planned,
+ * querytrees. The sublist structure denotes the original query boundaries.
+ */
+static List *
init_execution_state(List *queryTree_list,
SQLFunctionCachePtr fcache,
bool lazyEvalOK)
{
- execution_state *firstes = NULL;
- execution_state *preves = NULL;
+ List *eslist = NIL;
execution_state *lasttages = NULL;
- ListCell *qtl_item;
+ ListCell *lc1;
- foreach(qtl_item, queryTree_list)
+ foreach(lc1, queryTree_list)
{
- Query *queryTree = (Query *) lfirst(qtl_item);
- Node *stmt;
- execution_state *newes;
+ List *qtlist = (List *) lfirst(lc1);
+ execution_state *firstes = NULL;
+ execution_state *preves = NULL;
+ ListCell *lc2;
- Assert(IsA(queryTree, Query));
+ foreach(lc2, qtlist)
+ {
+ Query *queryTree = (Query *) lfirst(lc2);
+ Node *stmt;
+ execution_state *newes;
- if (queryTree->commandType == CMD_UTILITY)
- stmt = queryTree->utilityStmt;
- else
- stmt = (Node *) pg_plan_query(queryTree, 0, NULL);
+ Assert(IsA(queryTree, Query));
- /* Precheck all commands for validity in a function */
- if (IsA(stmt, TransactionStmt))
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- /* translator: %s is a SQL statement name */
- errmsg("%s is not allowed in a SQL function",
- CreateCommandTag(stmt))));
+ /* Plan the query if needed */
+ if (queryTree->commandType == CMD_UTILITY)
+ stmt = queryTree->utilityStmt;
+ else
+ stmt = (Node *) pg_plan_query(queryTree, 0, NULL);
- if (fcache->readonly_func && !CommandIsReadOnly(stmt))
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- /* translator: %s is a SQL statement name */
- errmsg("%s is not allowed in a non-volatile function",
- CreateCommandTag(stmt))));
+ /* Precheck all commands for validity in a function */
+ if (IsA(stmt, TransactionStmt))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ /* translator: %s is a SQL statement name */
+ errmsg("%s is not allowed in a SQL function",
+ CreateCommandTag(stmt))));
- newes = (execution_state *) palloc(sizeof(execution_state));
- if (preves)
- preves->next = newes;
- else
- firstes = newes;
+ if (fcache->readonly_func && !CommandIsReadOnly(stmt))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ /* translator: %s is a SQL statement name */
+ errmsg("%s is not allowed in a non-volatile function",
+ CreateCommandTag(stmt))));
+
+ /* OK, build the execution_state for this query */
+ newes = (execution_state *) palloc(sizeof(execution_state));
+ if (preves)
+ preves->next = newes;
+ else
+ firstes = newes;
- newes->next = NULL;
- newes->status = F_EXEC_START;
- newes->setsResult = false; /* might change below */
- newes->lazyEval = false; /* might change below */
- newes->stmt = stmt;
- newes->qd = NULL;
+ newes->next = NULL;
+ newes->status = F_EXEC_START;
+ newes->setsResult = false; /* might change below */
+ newes->lazyEval = false; /* might change below */
+ newes->stmt = stmt;
+ newes->qd = NULL;
- if (queryTree->canSetTag)
- lasttages = newes;
+ if (queryTree->canSetTag)
+ lasttages = newes;
- preves = newes;
+ preves = newes;
+ }
+
+ eslist = lappend(eslist, firstes);
}
/*
}
}
- return firstes;
+ return eslist;
}
/* Initialize the SQLFunctionCache for a SQL function */
SQLFunctionCachePtr fcache;
Oid *argOidVect;
int nargs;
+ List *raw_parsetree_list;
List *queryTree_list;
+ List *flat_query_list;
+ ListCell *lc;
Datum tmp;
bool isNull;
fcache->src = TextDatumGetCString(tmp);
/*
- * Parse and rewrite the queries in the function text.
+ * Parse and rewrite the queries in the function text. Use sublists to
+ * keep track of the original query boundaries. But we also build a
+ * "flat" list of the rewritten queries to pass to check_sql_fn_retval.
+ * This is because the last canSetTag query determines the result type
+ * independently of query boundaries --- and it might not be in the last
+ * sublist, for example if the last query rewrites to DO INSTEAD NOTHING.
+ * (It might not be unreasonable to throw an error in such a case, but
+ * this is the historical behavior and it doesn't seem worth changing.)
*/
- queryTree_list = pg_parse_and_rewrite(fcache->src, argOidVect, nargs);
+ raw_parsetree_list = pg_parse_query(fcache->src);
+
+ queryTree_list = NIL;
+ flat_query_list = NIL;
+ foreach(lc, raw_parsetree_list)
+ {
+ Node *parsetree = (Node *) lfirst(lc);
+ List *queryTree_sublist;
+
+ queryTree_sublist = pg_analyze_and_rewrite(parsetree,
+ fcache->src,
+ argOidVect,
+ nargs);
+ queryTree_list = lappend(queryTree_list, queryTree_sublist);
+ flat_query_list = list_concat(flat_query_list,
+ list_copy(queryTree_sublist));
+ }
/*
* Check that the function returns the type it claims to. Although in
*/
fcache->returnsTuple = check_sql_fn_retval(foid,
rettype,
- queryTree_list,
+ flat_query_list,
NULL,
&fcache->junkFilter);
static void
postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
{
- Snapshot snapshot;
DestReceiver *dest;
Assert(es->qd == NULL);
- /*
- * In a read-only function, use the surrounding query's snapshot;
- * otherwise take a new snapshot for each query. The snapshot should
- * include a fresh command ID so that all work to date in this transaction
- * is visible.
- */
- if (fcache->readonly_func)
- snapshot = GetActiveSnapshot();
- else
- {
- CommandCounterIncrement();
- snapshot = GetTransactionSnapshot();
- }
+ /* Caller should have ensured a suitable snapshot is active */
+ Assert(ActiveSnapshotSet());
/*
* If this query produces the function result, send its output to the
if (IsA(es->stmt, PlannedStmt))
es->qd = CreateQueryDesc((PlannedStmt *) es->stmt,
fcache->src,
- snapshot, InvalidSnapshot,
+ GetActiveSnapshot(),
+ InvalidSnapshot,
dest,
fcache->paramLI, 0);
else
es->qd = CreateUtilityQueryDesc(es->stmt,
fcache->src,
- snapshot,
+ GetActiveSnapshot(),
dest,
fcache->paramLI);
- /* We assume we don't need to set up ActiveSnapshot for ExecutorStart */
-
/* Utility commands don't need Executor. */
if (es->qd->utilitystmt == NULL)
{
{
bool result;
- /* Make our snapshot the active one for any called functions */
- PushActiveSnapshot(es->qd->snapshot);
-
if (es->qd->utilitystmt)
{
/* ProcessUtility needs the PlannedStmt for DECLARE CURSOR */
result = (count == 0L || es->qd->estate->es_processed == 0);
}
- PopActiveSnapshot();
-
return result;
}
/* Utility commands don't need Executor. */
if (es->qd->utilitystmt == NULL)
{
- /* Make our snapshot the active one for any called functions */
- PushActiveSnapshot(es->qd->snapshot);
-
ExecutorFinish(es->qd);
ExecutorEnd(es->qd);
-
- PopActiveSnapshot();
}
(*es->qd->dest->rDestroy) (es->qd->dest);
ErrorContextCallback sqlerrcontext;
bool randomAccess;
bool lazyEvalOK;
+ bool is_first;
+ bool pushed_snapshot;
execution_state *es;
TupleTableSlot *slot;
Datum result;
+ List *eslist;
+ ListCell *eslc;
/*
* Switch to context in which the fcache lives. This ensures that
init_sql_fcache(fcinfo->flinfo, lazyEvalOK);
fcache = (SQLFunctionCachePtr) fcinfo->flinfo->fn_extra;
}
- es = fcache->func_state;
+ eslist = fcache->func_state;
+
+ /*
+ * Find first unfinished query in function, and note whether it's the
+ * first query.
+ */
+ es = NULL;
+ is_first = true;
+ foreach(eslc, eslist)
+ {
+ es = (execution_state *) lfirst(eslc);
+
+ while (es && es->status == F_EXEC_DONE)
+ {
+ is_first = false;
+ es = es->next;
+ }
+
+ if (es)
+ break;
+ }
/*
* Convert params to appropriate format if starting a fresh execution. (If
* continuing execution, we can re-use prior params.)
*/
- if (es && es->status == F_EXEC_START)
+ if (is_first && es && es->status == F_EXEC_START)
postquel_sub_params(fcache, fcinfo);
/*
if (!fcache->tstore)
fcache->tstore = tuplestore_begin_heap(randomAccess, false, work_mem);
- /*
- * Find first unfinished query in function.
- */
- while (es && es->status == F_EXEC_DONE)
- es = es->next;
-
/*
* Execute each command in the function one after another until we either
* run out of commands or get a result row from a lazily-evaluated SELECT.
+ *
+ * Notes about snapshot management:
+ *
+ * In a read-only function, we just use the surrounding query's snapshot.
+ *
+ * In a non-read-only function, we rely on the fact that we'll never
+ * suspend execution between queries of the function: the only reason to
+ * suspend execution before completion is if we are returning a row from
+ * a lazily-evaluated SELECT. So, when first entering this loop, we'll
+ * either start a new query (and push a fresh snapshot) or re-establish
+ * the active snapshot from the existing query descriptor. If we need to
+ * start a new query in a subsequent execution of the loop, either we need
+ * a fresh snapshot (and pushed_snapshot is false) or the existing
+ * snapshot is on the active stack and we can just bump its command ID.
*/
+ pushed_snapshot = false;
while (es)
{
bool completed;
if (es->status == F_EXEC_START)
+ {
+ /*
+ * If not read-only, be sure to advance the command counter for
+ * each command, so that all work to date in this transaction is
+ * visible. Take a new snapshot if we don't have one yet,
+ * otherwise just bump the command ID in the existing snapshot.
+ */
+ if (!fcache->readonly_func)
+ {
+ CommandCounterIncrement();
+ if (!pushed_snapshot)
+ {
+ PushActiveSnapshot(GetTransactionSnapshot());
+ pushed_snapshot = true;
+ }
+ else
+ UpdateActiveSnapshotCommandId();
+ }
+
postquel_start(es, fcache);
+ }
+ else if (!fcache->readonly_func && !pushed_snapshot)
+ {
+ /* Re-establish active snapshot when re-entering function */
+ PushActiveSnapshot(es->qd->snapshot);
+ pushed_snapshot = true;
+ }
completed = postquel_getnext(es, fcache);
*/
if (es->status != F_EXEC_DONE)
break;
+
+ /*
+ * Advance to next execution_state, which might be in the next list.
+ */
es = es->next;
+ while (!es)
+ {
+ eslc = lnext(eslc);
+ if (!eslc)
+ break; /* end of function */
+
+ es = (execution_state *) lfirst(eslc);
+
+ /*
+ * Flush the current snapshot so that we will take a new one
+ * for the new query list. This ensures that new snaps are
+ * taken at original-query boundaries, matching the behavior
+ * of interactive execution.
+ */
+ if (pushed_snapshot)
+ {
+ PopActiveSnapshot();
+ pushed_snapshot = false;
+ }
+ }
}
/*
tuplestore_clear(fcache->tstore);
}
+ /* Pop snapshot if we have pushed one */
+ if (pushed_snapshot)
+ PopActiveSnapshot();
+
/*
* If we've gone through every command in the function, we are done. Reset
* the execution states to start over again on next call.
*/
if (es == NULL)
{
- es = fcache->func_state;
- while (es)
+ foreach(eslc, fcache->func_state)
{
- es->status = F_EXEC_START;
- es = es->next;
+ es = (execution_state *) lfirst(eslc);
+ while (es)
+ {
+ es->status = F_EXEC_START;
+ es = es->next;
+ }
}
}
{
execution_state *es;
int query_num;
+ ListCell *lc;
- es = fcache->func_state;
+ es = NULL;
query_num = 1;
- while (es)
+ foreach(lc, fcache->func_state)
{
- if (es->qd)
+ es = (execution_state *) lfirst(lc);
+ while (es)
{
- errcontext("SQL function \"%s\" statement %d",
- fcache->fname, query_num);
- break;
+ if (es->qd)
+ {
+ errcontext("SQL function \"%s\" statement %d",
+ fcache->fname, query_num);
+ break;
+ }
+ es = es->next;
}
- es = es->next;
+ if (es)
+ break;
query_num++;
}
if (es == NULL)
ShutdownSQLFunction(Datum arg)
{
SQLFunctionCachePtr fcache = (SQLFunctionCachePtr) DatumGetPointer(arg);
- execution_state *es = fcache->func_state;
+ execution_state *es;
+ ListCell *lc;
- while (es != NULL)
+ foreach(lc, fcache->func_state)
{
- /* Shut down anything still running */
- if (es->status == F_EXEC_RUN)
- postquel_end(es);
- /* Reset states to START in case we're called again */
- es->status = F_EXEC_START;
- es = es->next;
+ es = (execution_state *) lfirst(lc);
+ while (es)
+ {
+ /* Shut down anything still running */
+ if (es->status == F_EXEC_RUN)
+ {
+ /* Re-establish active snapshot for any called functions */
+ if (!fcache->readonly_func)
+ PushActiveSnapshot(es->qd->snapshot);
+
+ postquel_end(es);
+
+ if (!fcache->readonly_func)
+ PopActiveSnapshot();
+ }
+
+ /* Reset states to START in case we're called again */
+ es->status = F_EXEC_START;
+ es = es->next;
+ }
}
/* Release tuplestore if we have one */
Oid my_lastoid = InvalidOid;
SPITupleTable *my_tuptable = NULL;
int res = 0;
- bool have_active_snap = ActiveSnapshotSet();
+ bool pushed_active_snap = false;
ErrorContextCallback spierrcontext;
CachedPlan *cplan = NULL;
ListCell *lc1;
spierrcontext.previous = error_context_stack;
error_context_stack = &spierrcontext;
+ /*
+ * We support four distinct snapshot management behaviors:
+ *
+ * snapshot != InvalidSnapshot, read_only = true: use exactly the given
+ * snapshot.
+ *
+ * snapshot != InvalidSnapshot, read_only = false: use the given
+ * snapshot, modified by advancing its command ID before each querytree.
+ *
+ * snapshot == InvalidSnapshot, read_only = true: use the entry-time
+ * ActiveSnapshot, if any (if there isn't one, we run with no snapshot).
+ *
+ * snapshot == InvalidSnapshot, read_only = false: take a full new
+ * snapshot for each user command, and advance its command ID before each
+ * querytree within the command.
+ *
+ * In the first two cases, we can just push the snap onto the stack
+ * once for the whole plan list.
+ */
+ if (snapshot != InvalidSnapshot)
+ {
+ if (read_only)
+ {
+ PushActiveSnapshot(snapshot);
+ pushed_active_snap = true;
+ }
+ else
+ {
+ /* Make sure we have a private copy of the snapshot to modify */
+ PushCopiedSnapshot(snapshot);
+ pushed_active_snap = true;
+ }
+ }
+
foreach(lc1, plan->plancache_list)
{
CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc1);
stmt_list = plansource->plan->stmt_list;
}
+ /*
+ * In the default non-read-only case, get a new snapshot, replacing
+ * any that we pushed in a previous cycle.
+ */
+ if (snapshot == InvalidSnapshot && !read_only)
+ {
+ if (pushed_active_snap)
+ PopActiveSnapshot();
+ PushActiveSnapshot(GetTransactionSnapshot());
+ pushed_active_snap = true;
+ }
+
foreach(lc2, stmt_list)
{
Node *stmt = (Node *) lfirst(lc2);
bool canSetTag;
DestReceiver *dest;
- bool pushed_active_snap = false;
_SPI_current->processed = 0;
_SPI_current->lastoid = InvalidOid;
/*
* If not read-only mode, advance the command counter before each
- * command.
+ * command and update the snapshot.
*/
if (!read_only)
+ {
CommandCounterIncrement();
+ UpdateActiveSnapshotCommandId();
+ }
dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
- if (snapshot == InvalidSnapshot)
- {
- /*
- * Default read_only behavior is to use the entry-time
- * ActiveSnapshot, if any; if read-write, grab a full new
- * snap.
- */
- if (read_only)
- {
- if (have_active_snap)
- {
- PushActiveSnapshot(GetActiveSnapshot());
- pushed_active_snap = true;
- }
- }
- else
- {
- PushActiveSnapshot(GetTransactionSnapshot());
- pushed_active_snap = true;
- }
- }
- else
- {
- /*
- * We interpret read_only with a specified snapshot to be
- * exactly that snapshot, but read-write means use the snap
- * with advancing of command ID.
- */
- if (read_only)
- PushActiveSnapshot(snapshot);
- else
- PushUpdatedSnapshot(snapshot);
- pushed_active_snap = true;
- }
-
if (IsA(stmt, PlannedStmt) &&
((PlannedStmt *) stmt)->utilityStmt == NULL)
{
res = SPI_OK_UTILITY;
}
- if (pushed_active_snap)
- PopActiveSnapshot();
-
/*
* The last canSetTag query sets the status values returned to the
* caller. Be careful to free any tuptables not returned, to
fail:
+ /* Pop the snapshot off the stack if we pushed one */
+ if (pushed_active_snap)
+ PopActiveSnapshot();
+
/* We no longer need the cached plan refcount, if any */
if (cplan)
ReleaseCachedPlan(cplan, true);
elog(DEBUG3, "ProcessQuery");
- /*
- * Must always set a snapshot for plannable queries.
- */
- PushActiveSnapshot(GetTransactionSnapshot());
-
/*
* Create the QueryDesc object
*/
ExecutorEnd(queryDesc);
FreeQueryDesc(queryDesc);
-
- PopActiveSnapshot();
}
/*
* seems to be to enumerate those that do not need one; this is a short
* list. Transaction control, LOCK, and SET must *not* set a snapshot
* since they need to be executable at the start of a transaction-snapshot
- * mode transaction without freezing a snapshot. By extension we allow SHOW
- * not to set a snapshot. The other stmts listed are just efficiency
+ * mode transaction without freezing a snapshot. By extension we allow
+ * SHOW not to set a snapshot. The other stmts listed are just efficiency
* hacks. Beware of listing anything that can modify the database --- if,
* say, it has to update an index with expressions that invoke
* user-defined functions, then it had better have a snapshot.
DestReceiver *dest, DestReceiver *altdest,
char *completionTag)
{
+ bool active_snapshot_set = false;
ListCell *stmtlist_item;
/*
if (log_executor_stats)
ResetUsage();
+ /*
+ * Must always have a snapshot for plannable queries. First time
+ * through, take a new snapshot; for subsequent queries in the
+ * same portal, just update the snapshot's copy of the command
+ * counter.
+ */
+ if (!active_snapshot_set)
+ {
+ PushActiveSnapshot(GetTransactionSnapshot());
+ active_snapshot_set = true;
+ }
+ else
+ UpdateActiveSnapshotCommandId();
+
if (pstmt->canSetTag)
{
/* statement can set tag string */
*
* These are assumed canSetTag if they're the only stmt in the
* portal.
+ *
+ * We must not set a snapshot here for utility commands (if one is
+ * needed, PortalRunUtility will do it). If a utility command is
+ * alone in a portal then everything's fine. The only case where
+ * a utility command can be part of a longer list is that rules
+ * are allowed to include NotifyStmt. NotifyStmt doesn't care
+ * whether it has a snapshot or not, so we just leave the current
+ * snapshot alone if we have one.
*/
if (list_length(portal->stmts) == 1)
- PortalRunUtility(portal, stmt, isTopLevel, dest, completionTag);
+ {
+ Assert(!active_snapshot_set);
+ /* statement can set tag string */
+ PortalRunUtility(portal, stmt, isTopLevel,
+ dest, completionTag);
+ }
else
- PortalRunUtility(portal, stmt, isTopLevel, altdest, NULL);
+ {
+ Assert(IsA(stmt, NotifyStmt));
+ /* stmt added by rewrite cannot set tag */
+ PortalRunUtility(portal, stmt, isTopLevel,
+ altdest, NULL);
+ }
}
/*
MemoryContextDeleteChildren(PortalGetHeapMemory(portal));
}
+ /* Pop the snapshot if we pushed one. */
+ if (active_snapshot_set)
+ PopActiveSnapshot();
+
/*
* If a command completion tag was supplied, use it. Otherwise use the
* portal's commandTag as the default completion tag.