X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;f=src%2Fbackend%2Fexecutor%2Fspi.c;h=3563ba23d3ad2f7cbb8f2fed207402b3714936b9;hb=147d4bf3e5e3da2ee0f0cc132718ab1c4912a877;hp=60b39768d53541e981226a962f366a185e33ad24;hpb=d84fe82230c593f3dc5d7f427849b99d1efa8a0a;p=postgresql diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index 60b39768d5..3563ba23d3 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -3,12 +3,12 @@ * spi.c * Server Programming Interface * - * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group + * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/executor/spi.c,v 1.71 2002/06/20 20:29:28 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/executor/spi.c,v 1.150 2006/04/04 19:35:34 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -16,10 +16,12 @@ #include "access/printtup.h" #include "catalog/heap.h" -#include "commands/portalcmds.h" +#include "commands/trigger.h" #include "executor/spi_priv.h" #include "tcop/tcopprot.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/typcache.h" uint32 SPI_processed = 0; @@ -29,17 +31,23 @@ int SPI_result; static _SPI_connection *_SPI_stack = NULL; static _SPI_connection *_SPI_current = NULL; +static int _SPI_stack_depth = 0; /* allocated size of _SPI_stack */ static int _SPI_connected = -1; static int _SPI_curid = -1; -static int _SPI_execute(char *src, int tcount, _SPI_plan *plan); -static int _SPI_pquery(QueryDesc *queryDesc, EState *state, int tcount); +static void _SPI_prepare_plan(const char *src, _SPI_plan *plan); static int _SPI_execute_plan(_SPI_plan *plan, - Datum *Values, char *Nulls, int tcount); + Datum *Values, const char *Nulls, + Snapshot snapshot, Snapshot crosscheck_snapshot, + bool read_only, long tcount); -static void _SPI_cursor_operation(Portal portal, bool forward, int count, - CommandDest dest); +static int _SPI_pquery(QueryDesc *queryDesc, long tcount); + +static void _SPI_error_callback(void *arg); + +static void _SPI_cursor_operation(Portal portal, bool forward, long count, + DestReceiver *dest); static _SPI_plan *_SPI_copy_plan(_SPI_plan *plan, int location); @@ -55,54 +63,71 @@ static bool _SPI_checktuples(void); int SPI_connect(void) { - _SPI_connection *new_SPI_stack; + int newdepth; /* - * When procedure called by Executor _SPI_curid expected to be equal - * to _SPI_connected + * When procedure called by Executor _SPI_curid expected to be equal to + * _SPI_connected */ if (_SPI_curid != _SPI_connected) return SPI_ERROR_CONNECT; if (_SPI_stack == NULL) { - if (_SPI_connected != -1) - elog(FATAL, "SPI_connect: no connection(s) expected"); - new_SPI_stack = (_SPI_connection *) malloc(sizeof(_SPI_connection)); + if (_SPI_connected != -1 || _SPI_stack_depth != 0) + elog(ERROR, "SPI stack corrupted"); + newdepth = 16; + _SPI_stack = (_SPI_connection *) + MemoryContextAlloc(TopTransactionContext, + newdepth * sizeof(_SPI_connection)); + _SPI_stack_depth = newdepth; } else { - if (_SPI_connected <= -1) - elog(FATAL, "SPI_connect: some connection(s) expected"); - new_SPI_stack = (_SPI_connection *) realloc(_SPI_stack, - (_SPI_connected + 2) * sizeof(_SPI_connection)); + if (_SPI_stack_depth <= 0 || _SPI_stack_depth <= _SPI_connected) + elog(ERROR, "SPI stack corrupted"); + if (_SPI_stack_depth == _SPI_connected + 1) + { + newdepth = _SPI_stack_depth * 2; + _SPI_stack = (_SPI_connection *) + repalloc(_SPI_stack, + newdepth * sizeof(_SPI_connection)); + _SPI_stack_depth = newdepth; + } } - if (new_SPI_stack == NULL) - elog(ERROR, "Memory exhausted in SPI_connect"); - /* - * We' returning to procedure where _SPI_curid == _SPI_connected - 1 + * We're entering procedure where _SPI_curid == _SPI_connected - 1 */ - _SPI_stack = new_SPI_stack; _SPI_connected++; + Assert(_SPI_connected >= 0 && _SPI_connected < _SPI_stack_depth); _SPI_current = &(_SPI_stack[_SPI_connected]); - _SPI_current->qtlist = NULL; _SPI_current->processed = 0; + _SPI_current->lastoid = InvalidOid; _SPI_current->tuptable = NULL; + _SPI_current->procCxt = NULL; /* in case we fail to create 'em */ + _SPI_current->execCxt = NULL; + _SPI_current->connectSubid = GetCurrentSubTransactionId(); - /* Create memory contexts for this procedure */ + /* + * Create memory contexts for this procedure + * + * XXX it would be better to use PortalContext as the parent context, but + * we may not be inside a portal (consider deferred-trigger execution). + * Perhaps CurTransactionContext would do? For now it doesn't matter + * because we clean up explicitly in AtEOSubXact_SPI(). + */ _SPI_current->procCxt = AllocSetContextCreate(TopTransactionContext, "SPI Proc", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); _SPI_current->execCxt = AllocSetContextCreate(TopTransactionContext, "SPI Exec", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); /* ... and switch to procedure's context */ _SPI_current->savedcxt = MemoryContextSwitchTo(_SPI_current->procCxt); @@ -123,73 +148,143 @@ SPI_finish(void) /* Release memory used in procedure call */ MemoryContextDelete(_SPI_current->execCxt); + _SPI_current->execCxt = NULL; MemoryContextDelete(_SPI_current->procCxt); + _SPI_current->procCxt = NULL; + + /* + * Reset result variables, especially SPI_tuptable which is probably + * pointing at a just-deleted tuptable + */ + SPI_processed = 0; + SPI_lastoid = InvalidOid; + SPI_tuptable = NULL; /* - * After _SPI_begin_call _SPI_connected == _SPI_curid. Now we are - * closing connection to SPI and returning to upper Executor and so - * _SPI_connected must be equal to _SPI_curid. + * After _SPI_begin_call _SPI_connected == _SPI_curid. Now we are closing + * connection to SPI and returning to upper Executor and so _SPI_connected + * must be equal to _SPI_curid. */ _SPI_connected--; _SPI_curid--; if (_SPI_connected == -1) - { - free(_SPI_stack); - _SPI_stack = NULL; _SPI_current = NULL; - } else - { - _SPI_connection *new_SPI_stack; - - new_SPI_stack = (_SPI_connection *) realloc(_SPI_stack, - (_SPI_connected + 1) * sizeof(_SPI_connection)); - /* This could only fail with a pretty stupid malloc package ... */ - if (new_SPI_stack == NULL) - elog(ERROR, "Memory exhausted in SPI_finish"); - _SPI_stack = new_SPI_stack; _SPI_current = &(_SPI_stack[_SPI_connected]); - } return SPI_OK_FINISH; - } /* - * Clean up SPI state at transaction commit or abort (we don't care which). + * Clean up SPI state at transaction commit or abort. */ void -AtEOXact_SPI(void) +AtEOXact_SPI(bool isCommit) { /* - * Note that memory contexts belonging to SPI stack entries will be - * freed automatically, so we can ignore them here. We just need to - * restore our static variables to initial state. + * Note that memory contexts belonging to SPI stack entries will be freed + * automatically, so we can ignore them here. We just need to restore our + * static variables to initial state. */ - if (_SPI_stack != NULL) /* there was abort */ - free(_SPI_stack); + if (isCommit && _SPI_connected != -1) + ereport(WARNING, + (errcode(ERRCODE_WARNING), + errmsg("transaction left non-empty SPI stack"), + errhint("Check for missing \"SPI_finish\" calls."))); + _SPI_current = _SPI_stack = NULL; + _SPI_stack_depth = 0; _SPI_connected = _SPI_curid = -1; SPI_processed = 0; SPI_lastoid = InvalidOid; SPI_tuptable = NULL; } +/* + * Clean up SPI state at subtransaction commit or abort. + * + * During commit, there shouldn't be any unclosed entries remaining from + * the current subtransaction; we emit a warning if any are found. + */ +void +AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid) +{ + bool found = false; + + while (_SPI_connected >= 0) + { + _SPI_connection *connection = &(_SPI_stack[_SPI_connected]); + + if (connection->connectSubid != mySubid) + break; /* couldn't be any underneath it either */ + + found = true; + + /* + * Release procedure memory explicitly (see note in SPI_connect) + */ + if (connection->execCxt) + { + MemoryContextDelete(connection->execCxt); + connection->execCxt = NULL; + } + if (connection->procCxt) + { + MemoryContextDelete(connection->procCxt); + connection->procCxt = NULL; + } + + /* + * Pop the stack entry and reset global variables. Unlike + * SPI_finish(), we don't risk switching to memory contexts that might + * be already gone. + */ + _SPI_connected--; + _SPI_curid = _SPI_connected; + if (_SPI_connected == -1) + _SPI_current = NULL; + else + _SPI_current = &(_SPI_stack[_SPI_connected]); + SPI_processed = 0; + SPI_lastoid = InvalidOid; + SPI_tuptable = NULL; + } + + if (found && isCommit) + ereport(WARNING, + (errcode(ERRCODE_WARNING), + errmsg("subtransaction left non-empty SPI stack"), + errhint("Check for missing \"SPI_finish\" calls."))); +} + + +/* Pushes SPI stack to allow recursive SPI calls */ void SPI_push(void) { _SPI_curid++; } +/* Pops SPI stack to allow recursive SPI calls */ void SPI_pop(void) { _SPI_curid--; } +/* Restore state of SPI stack after aborting a subtransaction */ +void +SPI_restore_connection(void) +{ + Assert(_SPI_connected >= 0); + _SPI_curid = _SPI_connected - 1; +} + +/* Parse, plan, and execute a query string */ int -SPI_exec(char *src, int tcount) +SPI_execute(const char *src, bool read_only, long tcount) { + _SPI_plan plan; int res; if (src == NULL || tcount < 0) @@ -199,14 +294,32 @@ SPI_exec(char *src, int tcount) if (res < 0) return res; - res = _SPI_execute(src, tcount, NULL); + plan.plancxt = NULL; /* doesn't have own context */ + plan.query = src; + plan.nargs = 0; + plan.argtypes = NULL; + + _SPI_prepare_plan(src, &plan); + + res = _SPI_execute_plan(&plan, NULL, NULL, + InvalidSnapshot, InvalidSnapshot, + read_only, tcount); _SPI_end_call(true); return res; } +/* Obsolete version of SPI_execute */ +int +SPI_exec(const char *src, long tcount) +{ + return SPI_execute(src, false, tcount); +} + +/* Execute a previously prepared plan */ int -SPI_execp(void *plan, Datum *Values, char *Nulls, int tcount) +SPI_execute_plan(void *plan, Datum *Values, const char *Nulls, + bool read_only, long tcount) { int res; @@ -220,19 +333,63 @@ SPI_execp(void *plan, Datum *Values, char *Nulls, int tcount) if (res < 0) return res; - /* copy plan to current (executor) context */ - plan = (void *) _SPI_copy_plan(plan, _SPI_CPLAN_CURCXT); + res = _SPI_execute_plan((_SPI_plan *) plan, + Values, Nulls, + InvalidSnapshot, InvalidSnapshot, + read_only, tcount); + + _SPI_end_call(true); + return res; +} + +/* Obsolete version of SPI_execute_plan */ +int +SPI_execp(void *plan, Datum *Values, const char *Nulls, long tcount) +{ + return SPI_execute_plan(plan, Values, Nulls, false, tcount); +} + +/* + * SPI_execute_snapshot -- identical to SPI_execute_plan, except that we allow + * the caller to specify exactly which snapshots to use. This is currently + * not documented in spi.sgml because it is only intended for use by RI + * triggers. + * + * Passing snapshot == InvalidSnapshot will select the normal behavior of + * fetching a new snapshot for each query. + */ +int +SPI_execute_snapshot(void *plan, + Datum *Values, const char *Nulls, + Snapshot snapshot, Snapshot crosscheck_snapshot, + bool read_only, long tcount) +{ + int res; + + if (plan == NULL || tcount < 0) + return SPI_ERROR_ARGUMENT; + + if (((_SPI_plan *) plan)->nargs > 0 && Values == NULL) + return SPI_ERROR_PARAM; + + res = _SPI_begin_call(true); + if (res < 0) + return res; - res = _SPI_execute_plan((_SPI_plan *) plan, Values, Nulls, tcount); + res = _SPI_execute_plan((_SPI_plan *) plan, + Values, Nulls, + snapshot, crosscheck_snapshot, + read_only, tcount); _SPI_end_call(true); return res; } void * -SPI_prepare(char *src, int nargs, Oid *argtypes) +SPI_prepare(const char *src, int nargs, Oid *argtypes) { - _SPI_plan *plan; + _SPI_plan plan; + _SPI_plan *result; if (src == NULL || nargs < 0 || (nargs > 0 && argtypes == NULL)) { @@ -244,21 +401,19 @@ SPI_prepare(char *src, int nargs, Oid *argtypes) if (SPI_result < 0) return NULL; - plan = (_SPI_plan *) palloc(sizeof(_SPI_plan)); /* Executor context */ - plan->argtypes = argtypes; - plan->nargs = nargs; + plan.plancxt = NULL; /* doesn't have own context */ + plan.query = src; + plan.nargs = nargs; + plan.argtypes = argtypes; - SPI_result = _SPI_execute(src, 0, plan); + _SPI_prepare_plan(src, &plan); - if (SPI_result >= 0) /* copy plan to procedure context */ - plan = _SPI_copy_plan(plan, _SPI_CPLAN_PROCXT); - else - plan = NULL; + /* copy plan to procedure context */ + result = _SPI_copy_plan(&plan, _SPI_CPLAN_PROCXT); _SPI_end_call(true); - return (void *) plan; - + return (void *) result; } void * @@ -282,7 +437,6 @@ SPI_saveplan(void *plan) SPI_result = 0; return (void *) newplan; - } int @@ -312,7 +466,7 @@ SPI_copytuple(HeapTuple tuple) if (_SPI_curid + 1 == _SPI_connected) /* connected */ { if (_SPI_current != &(_SPI_stack[_SPI_curid + 1])) - elog(FATAL, "SPI: stack corrupted"); + elog(ERROR, "SPI stack corrupted"); oldcxt = MemoryContextSwitchTo(_SPI_current->savedcxt); } @@ -324,40 +478,11 @@ SPI_copytuple(HeapTuple tuple) return ctuple; } -TupleDesc -SPI_copytupledesc(TupleDesc tupdesc) -{ - MemoryContext oldcxt = NULL; - TupleDesc ctupdesc; - - if (tupdesc == NULL) - { - SPI_result = SPI_ERROR_ARGUMENT; - return NULL; - } - - if (_SPI_curid + 1 == _SPI_connected) /* connected */ - { - if (_SPI_current != &(_SPI_stack[_SPI_curid + 1])) - elog(FATAL, "SPI: stack corrupted"); - oldcxt = MemoryContextSwitchTo(_SPI_current->savedcxt); - } - - ctupdesc = CreateTupleDescCopy(tupdesc); - - if (oldcxt) - MemoryContextSwitchTo(oldcxt); - - return ctupdesc; -} - -TupleTableSlot * -SPI_copytupleintoslot(HeapTuple tuple, TupleDesc tupdesc) +HeapTupleHeader +SPI_returntuple(HeapTuple tuple, TupleDesc tupdesc) { MemoryContext oldcxt = NULL; - TupleTableSlot *cslot; - HeapTuple ctuple; - TupleDesc ctupdesc; + HeapTupleHeader dtup; if (tuple == NULL || tupdesc == NULL) { @@ -365,40 +490,43 @@ SPI_copytupleintoslot(HeapTuple tuple, TupleDesc tupdesc) return NULL; } + /* For RECORD results, make sure a typmod has been assigned */ + if (tupdesc->tdtypeid == RECORDOID && + tupdesc->tdtypmod < 0) + assign_record_type_typmod(tupdesc); + if (_SPI_curid + 1 == _SPI_connected) /* connected */ { if (_SPI_current != &(_SPI_stack[_SPI_curid + 1])) - elog(FATAL, "SPI: stack corrupted"); + elog(ERROR, "SPI stack corrupted"); oldcxt = MemoryContextSwitchTo(_SPI_current->savedcxt); } - ctuple = heap_copytuple(tuple); - ctupdesc = CreateTupleDescCopy(tupdesc); + dtup = (HeapTupleHeader) palloc(tuple->t_len); + memcpy((char *) dtup, (char *) tuple->t_data, tuple->t_len); - cslot = MakeTupleTableSlot(); - ExecSetSlotDescriptor(cslot, ctupdesc, true); - cslot = ExecStoreTuple(ctuple, cslot, InvalidBuffer, true); + HeapTupleHeaderSetDatumLength(dtup, tuple->t_len); + HeapTupleHeaderSetTypeId(dtup, tupdesc->tdtypeid); + HeapTupleHeaderSetTypMod(dtup, tupdesc->tdtypmod); if (oldcxt) MemoryContextSwitchTo(oldcxt); - return cslot; + return dtup; } HeapTuple SPI_modifytuple(Relation rel, HeapTuple tuple, int natts, int *attnum, - Datum *Values, char *Nulls) + Datum *Values, const char *Nulls) { MemoryContext oldcxt = NULL; HeapTuple mtuple; int numberOfAttributes; - uint8 infomask; Datum *v; char *n; - bool isnull; int i; - if (rel == NULL || tuple == NULL || natts <= 0 || attnum == NULL || Values == NULL) + if (rel == NULL || tuple == NULL || natts < 0 || attnum == NULL || Values == NULL) { SPI_result = SPI_ERROR_ARGUMENT; return NULL; @@ -407,7 +535,7 @@ SPI_modifytuple(Relation rel, HeapTuple tuple, int natts, int *attnum, if (_SPI_curid + 1 == _SPI_connected) /* connected */ { if (_SPI_current != &(_SPI_stack[_SPI_curid + 1])) - elog(FATAL, "SPI: stack corrupted"); + elog(ERROR, "SPI stack corrupted"); oldcxt = MemoryContextSwitchTo(_SPI_current->savedcxt); } SPI_result = 0; @@ -416,11 +544,7 @@ SPI_modifytuple(Relation rel, HeapTuple tuple, int natts, int *attnum, n = (char *) palloc(numberOfAttributes * sizeof(char)); /* fetch old values and nulls */ - for (i = 0; i < numberOfAttributes; i++) - { - v[i] = heap_getattr(tuple, i + 1, rel->rd_att, &isnull); - n[i] = (isnull) ? 'n' : ' '; - } + heap_deformtuple(tuple, rel->rd_att, v, n); /* replace values and nulls */ for (i = 0; i < natts; i++) @@ -434,12 +558,16 @@ SPI_modifytuple(Relation rel, HeapTuple tuple, int natts, int *attnum, if (i == natts) /* no errors in *attnum */ { mtuple = heap_formtuple(rel->rd_att, v, n); - infomask = mtuple->t_data->t_infomask; - memmove(&(mtuple->t_data->t_oid), &(tuple->t_data->t_oid), - ((char *) &(tuple->t_data->t_hoff) - - (char *) &(tuple->t_data->t_oid))); - mtuple->t_data->t_infomask = infomask; - mtuple->t_data->t_natts = numberOfAttributes; + + /* + * copy the identification info of the old tuple: t_ctid, t_self, and + * OID (if any) + */ + mtuple->t_data->t_ctid = tuple->t_data->t_ctid; + mtuple->t_self = tuple->t_self; + mtuple->t_tableOid = tuple->t_tableOid; + if (rel->rd_att->tdhasoid) + HeapTupleSetOid(mtuple, HeapTupleGetOid(tuple)); } else { @@ -457,7 +585,7 @@ SPI_modifytuple(Relation rel, HeapTuple tuple, int natts, int *attnum, } int -SPI_fnumber(TupleDesc tupdesc, char *fname) +SPI_fnumber(TupleDesc tupdesc, const char *fname) { int res; Form_pg_attribute sysatt; @@ -501,14 +629,12 @@ SPI_fname(TupleDesc tupdesc, int fnumber) char * SPI_getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber) { + char *result; Datum origval, - val, - result; + val; bool isnull; Oid typoid, - foutoid, - typelem; - int32 typmod; + foutoid; bool typisvarlena; SPI_result = 0; @@ -525,41 +651,28 @@ SPI_getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber) return NULL; if (fnumber > 0) - { typoid = tupdesc->attrs[fnumber - 1]->atttypid; - typmod = tupdesc->attrs[fnumber - 1]->atttypmod; - } else - { typoid = (SystemAttributeDefinition(fnumber, true))->atttypid; - typmod = -1; - } - if (!getTypeOutputInfo(typoid, &foutoid, &typelem, &typisvarlena)) - { - SPI_result = SPI_ERROR_NOOUTFUNC; - return NULL; - } + getTypeOutputInfo(typoid, &foutoid, &typisvarlena); /* - * If we have a toasted datum, forcibly detoast it here to avoid - * memory leakage inside the type's output routine. + * If we have a toasted datum, forcibly detoast it here to avoid memory + * leakage inside the type's output routine. */ if (typisvarlena) val = PointerGetDatum(PG_DETOAST_DATUM(origval)); else val = origval; - result = OidFunctionCall3(foutoid, - val, - ObjectIdGetDatum(typelem), - Int32GetDatum(typmod)); + result = OidOutputFunctionCall(foutoid, val); /* Clean up detoasted copy, if any */ if (val != origval) pfree(DatumGetPointer(val)); - return DatumGetCString(result); + return result; } Datum @@ -638,6 +751,12 @@ SPI_getrelname(Relation rel) return pstrdup(RelationGetRelationName(rel)); } +char * +SPI_getnspname(Relation rel) +{ + return get_namespace_name(RelationGetNamespace(rel)); +} + void * SPI_palloc(Size size) { @@ -647,7 +766,7 @@ SPI_palloc(Size size) if (_SPI_curid + 1 == _SPI_connected) /* connected */ { if (_SPI_current != &(_SPI_stack[_SPI_curid + 1])) - elog(FATAL, "SPI: stack corrupted"); + elog(ERROR, "SPI stack corrupted"); oldcxt = MemoryContextSwitchTo(_SPI_current->savedcxt); } @@ -695,107 +814,90 @@ SPI_freetuptable(SPITupleTable *tuptable) * Open a prepared SPI plan as a portal */ Portal -SPI_cursor_open(char *name, void *plan, Datum *Values, char *Nulls) +SPI_cursor_open(const char *name, void *plan, + Datum *Values, const char *Nulls, + bool read_only) { - static int unnamed_portal_count = 0; - _SPI_plan *spiplan = (_SPI_plan *) plan; List *qtlist = spiplan->qtlist; List *ptlist = spiplan->ptlist; Query *queryTree; Plan *planTree; - QueryDesc *queryDesc; - EState *eState; - TupleDesc attinfo; + ParamListInfo paramLI; + Snapshot snapshot; MemoryContext oldcontext; Portal portal; - char portalname[64]; int k; - /* Ensure that the plan contains only one regular SELECT query */ - if (length(ptlist) != 1) - elog(ERROR, "cannot open multi-query plan as cursor"); - queryTree = (Query *) lfirst(qtlist); - planTree = (Plan *) lfirst(ptlist); + /* Ensure that the plan contains only one query */ + if (list_length(ptlist) != 1 || list_length(qtlist) != 1) + ereport(ERROR, + (errcode(ERRCODE_INVALID_CURSOR_DEFINITION), + errmsg("cannot open multi-query plan as cursor"))); + queryTree = (Query *) linitial((List *) linitial(qtlist)); + planTree = (Plan *) linitial(ptlist); - if (queryTree->commandType != CMD_SELECT) - elog(ERROR, "plan in SPI_cursor_open() is not a SELECT"); - if (queryTree->isPortal) - elog(ERROR, "plan in SPI_cursor_open() must NOT be a DECLARE already"); - else if (queryTree->into != NULL) - elog(ERROR, "plan in SPI_cursor_open() must NOT be a SELECT INTO"); - - /* Increment CommandCounter to see changes made by now */ - CommandCounterIncrement(); + /* Must be a query that returns tuples */ + switch (queryTree->commandType) + { + case CMD_SELECT: + if (queryTree->into != NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_CURSOR_DEFINITION), + errmsg("cannot open SELECT INTO query as cursor"))); + break; + case CMD_UTILITY: + if (!UtilityReturnsTuples(queryTree->utilityStmt)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_CURSOR_DEFINITION), + errmsg("cannot open non-SELECT query as cursor"))); + break; + default: + ereport(ERROR, + (errcode(ERRCODE_INVALID_CURSOR_DEFINITION), + errmsg("cannot open non-SELECT query as cursor"))); + break; + } - /* Reset SPI result */ + /* Reset SPI result (note we deliberately don't touch lastoid) */ SPI_processed = 0; SPI_tuptable = NULL; _SPI_current->processed = 0; _SPI_current->tuptable = NULL; - if (name == NULL) + /* Create the portal */ + if (name == NULL || name[0] == '\0') { - /* Make up a portal name if none given */ - for (;;) - { - unnamed_portal_count++; - if (unnamed_portal_count < 0) - unnamed_portal_count = 0; - sprintf(portalname, "", unnamed_portal_count); - if (GetPortalByName(portalname) == NULL) - break; - } - - name = portalname; + /* Use a random nonconflicting name */ + portal = CreateNewPortal(); } else { - /* Ensure the portal doesn't exist already */ - portal = GetPortalByName(name); - if (portal != NULL) - elog(ERROR, "cursor \"%s\" already in use", name); + /* In this path, error if portal of same name already exists */ + portal = CreatePortal(name, false, false); } - /* Create the portal */ - portal = CreatePortal(name); - if (portal == NULL) - elog(ERROR, "failed to create portal \"%s\"", name); - /* Switch to portals memory and copy the parsetree and plan to there */ oldcontext = MemoryContextSwitchTo(PortalGetHeapMemory(portal)); queryTree = copyObject(queryTree); planTree = copyObject(planTree); - /* Modify the parsetree to be a cursor */ - queryTree->isPortal = true; - queryTree->into = makeNode(RangeVar); - queryTree->into->relname = pstrdup(name); - queryTree->isBinary = false; - - /* Create the QueryDesc object and the executor state */ - queryDesc = CreateQueryDesc(queryTree, planTree, SPI, NULL); - eState = CreateExecutorState(); - - /* If the plan has parameters, put them into the executor state */ + /* If the plan has parameters, set them up */ if (spiplan->nargs > 0) { - ParamListInfo paramLI; + paramLI = (ParamListInfo) palloc0((spiplan->nargs + 1) * + sizeof(ParamListInfoData)); - paramLI = (ParamListInfo) palloc((spiplan->nargs + 1) * - sizeof(ParamListInfoData)); - MemSet(paramLI, 0, (spiplan->nargs + 1) * sizeof(ParamListInfoData)); - - eState->es_param_list_info = paramLI; - for (k = 0; k < spiplan->nargs; paramLI++, k++) + for (k = 0; k < spiplan->nargs; k++) { - paramLI->kind = PARAM_NUM; - paramLI->id = k + 1; - paramLI->isnull = (Nulls && Nulls[k] == 'n'); - if (paramLI->isnull) + paramLI[k].kind = PARAM_NUM; + paramLI[k].id = k + 1; + paramLI[k].ptype = spiplan->argtypes[k]; + paramLI[k].isnull = (Nulls && Nulls[k] == 'n'); + if (paramLI[k].isnull) { /* nulls just copy */ - paramLI->value = Values[k]; + paramLI[k].value = Values[k]; } else { @@ -805,24 +907,56 @@ SPI_cursor_open(char *name, void *plan, Datum *Values, char *Nulls) get_typlenbyval(spiplan->argtypes[k], ¶mTypLen, ¶mTypByVal); - paramLI->value = datumCopy(Values[k], - paramTypByVal, paramTypLen); + paramLI[k].value = datumCopy(Values[k], + paramTypByVal, paramTypLen); } } - paramLI->kind = PARAM_INVALID; + paramLI[k].kind = PARAM_INVALID; } else - eState->es_param_list_info = NULL; - - /* Start the executor */ - attinfo = ExecutorStart(queryDesc, eState); + paramLI = NULL; - /* Put all the objects into the portal */ - PortalSetQuery(portal, queryDesc, attinfo, eState, PortalCleanup); + /* + * Set up the portal. + */ + PortalDefineQuery(portal, + spiplan->query, + "SELECT", /* don't have the raw parse tree... */ + list_make1(queryTree), + list_make1(planTree), + PortalGetHeapMemory(portal)); - /* Switch back to the callers memory context */ MemoryContextSwitchTo(oldcontext); + /* + * Set up options for portal. + */ + portal->cursorOptions &= ~(CURSOR_OPT_SCROLL | CURSOR_OPT_NO_SCROLL); + if (planTree == NULL || ExecSupportsBackwardScan(planTree)) + portal->cursorOptions |= CURSOR_OPT_SCROLL; + else + portal->cursorOptions |= CURSOR_OPT_NO_SCROLL; + + /* + * Set up the snapshot to use. (PortalStart will do CopySnapshot, so we + * skip that here.) + */ + if (read_only) + snapshot = ActiveSnapshot; + else + { + CommandCounterIncrement(); + snapshot = GetTransactionSnapshot(); + } + + /* + * Start portal execution. + */ + PortalStart(portal, paramLI, snapshot); + + Assert(portal->strategy == PORTAL_ONE_SELECT || + portal->strategy == PORTAL_UTIL_SELECT); + /* Return the created portal */ return portal; } @@ -834,7 +968,7 @@ SPI_cursor_open(char *name, void *plan, Datum *Values, char *Nulls) * Find the portal of an existing open cursor */ Portal -SPI_cursor_find(char *name) +SPI_cursor_find(const char *name) { return GetPortalByName(name); } @@ -846,9 +980,11 @@ SPI_cursor_find(char *name) * Fetch rows in a cursor */ void -SPI_cursor_fetch(Portal portal, bool forward, int count) +SPI_cursor_fetch(Portal portal, bool forward, long count) { - _SPI_cursor_operation(portal, forward, count, SPI); + _SPI_cursor_operation(portal, forward, count, + CreateDestReceiver(DestSPI, NULL)); + /* we know that the DestSPI receiver doesn't need a destroy call */ } @@ -858,9 +994,9 @@ SPI_cursor_fetch(Portal portal, bool forward, int count) * Move in a cursor */ void -SPI_cursor_move(Portal portal, bool forward, int count) +SPI_cursor_move(Portal portal, bool forward, long count) { - _SPI_cursor_operation(portal, forward, count, None); + _SPI_cursor_operation(portal, forward, count, None_Receiver); } @@ -875,19 +1011,141 @@ SPI_cursor_close(Portal portal) if (!PortalIsValid(portal)) elog(ERROR, "invalid portal in SPI cursor operation"); - PortalDrop(portal); + PortalDrop(portal, false); +} + +/* + * Returns the Oid representing the type id for argument at argIndex. First + * parameter is at index zero. + */ +Oid +SPI_getargtypeid(void *plan, int argIndex) +{ + if (plan == NULL || argIndex < 0 || argIndex >= ((_SPI_plan *) plan)->nargs) + { + SPI_result = SPI_ERROR_ARGUMENT; + return InvalidOid; + } + return ((_SPI_plan *) plan)->argtypes[argIndex]; +} + +/* + * Returns the number of arguments for the prepared plan. + */ +int +SPI_getargcount(void *plan) +{ + if (plan == NULL) + { + SPI_result = SPI_ERROR_ARGUMENT; + return -1; + } + return ((_SPI_plan *) plan)->nargs; +} + +/* + * Returns true if the plan contains exactly one command + * and that command originates from normal SELECT (i.e. + * *not* a SELECT ... INTO). In essence, the result indicates + * if the command can be used with SPI_cursor_open + * + * Parameters + * plan A plan previously prepared using SPI_prepare + */ +bool +SPI_is_cursor_plan(void *plan) +{ + _SPI_plan *spiplan = (_SPI_plan *) plan; + List *qtlist; + + if (spiplan == NULL) + { + SPI_result = SPI_ERROR_ARGUMENT; + return false; + } + + qtlist = spiplan->qtlist; + if (list_length(spiplan->ptlist) == 1 && list_length(qtlist) == 1) + { + Query *queryTree = (Query *) linitial((List *) linitial(qtlist)); + + if (queryTree->commandType == CMD_SELECT && queryTree->into == NULL) + return true; + } + return false; +} + +/* + * SPI_result_code_string --- convert any SPI return code to a string + * + * This is often useful in error messages. Most callers will probably + * only pass negative (error-case) codes, but for generality we recognize + * the success codes too. + */ +const char * +SPI_result_code_string(int code) +{ + static char buf[64]; + + switch (code) + { + case SPI_ERROR_CONNECT: + return "SPI_ERROR_CONNECT"; + case SPI_ERROR_COPY: + return "SPI_ERROR_COPY"; + case SPI_ERROR_OPUNKNOWN: + return "SPI_ERROR_OPUNKNOWN"; + case SPI_ERROR_UNCONNECTED: + return "SPI_ERROR_UNCONNECTED"; + case SPI_ERROR_CURSOR: + return "SPI_ERROR_CURSOR"; + case SPI_ERROR_ARGUMENT: + return "SPI_ERROR_ARGUMENT"; + case SPI_ERROR_PARAM: + return "SPI_ERROR_PARAM"; + case SPI_ERROR_TRANSACTION: + return "SPI_ERROR_TRANSACTION"; + case SPI_ERROR_NOATTRIBUTE: + return "SPI_ERROR_NOATTRIBUTE"; + case SPI_ERROR_NOOUTFUNC: + return "SPI_ERROR_NOOUTFUNC"; + case SPI_ERROR_TYPUNKNOWN: + return "SPI_ERROR_TYPUNKNOWN"; + case SPI_OK_CONNECT: + return "SPI_OK_CONNECT"; + case SPI_OK_FINISH: + return "SPI_OK_FINISH"; + case SPI_OK_FETCH: + return "SPI_OK_FETCH"; + case SPI_OK_UTILITY: + return "SPI_OK_UTILITY"; + case SPI_OK_SELECT: + return "SPI_OK_SELECT"; + case SPI_OK_SELINTO: + return "SPI_OK_SELINTO"; + case SPI_OK_INSERT: + return "SPI_OK_INSERT"; + case SPI_OK_DELETE: + return "SPI_OK_DELETE"; + case SPI_OK_UPDATE: + return "SPI_OK_UPDATE"; + case SPI_OK_CURSOR: + return "SPI_OK_CURSOR"; + } + /* Unrecognized code ... return something useful ... */ + sprintf(buf, "Unrecognized SPI code %d", code); + return buf; } /* =================== private functions =================== */ /* - * spi_printtup - * store tuple retrieved by Executor into SPITupleTable + * spi_dest_startup + * Initialize to receive tuples from Executor into SPITupleTable * of current SPI procedure - * */ void -spi_printtup(HeapTuple tuple, TupleDesc tupdesc, DestReceiver *self) +spi_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo) { SPITupleTable *tuptable; MemoryContext oldcxt; @@ -898,259 +1156,380 @@ spi_printtup(HeapTuple tuple, TupleDesc tupdesc, DestReceiver *self) * _SPI_connected */ if (_SPI_curid != _SPI_connected || _SPI_connected < 0) - elog(FATAL, "SPI: improper call to spi_printtup"); + elog(ERROR, "improper call to spi_dest_startup"); if (_SPI_current != &(_SPI_stack[_SPI_curid])) - elog(FATAL, "SPI: stack corrupted in spi_printtup"); + elog(ERROR, "SPI stack corrupted"); + + if (_SPI_current->tuptable != NULL) + elog(ERROR, "improper call to spi_dest_startup"); oldcxt = _SPI_procmem(); /* switch to procedure memory context */ + tuptabcxt = AllocSetContextCreate(CurrentMemoryContext, + "SPI TupTable", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + MemoryContextSwitchTo(tuptabcxt); + + _SPI_current->tuptable = tuptable = (SPITupleTable *) + palloc(sizeof(SPITupleTable)); + tuptable->tuptabcxt = tuptabcxt; + tuptable->alloced = tuptable->free = 128; + tuptable->vals = (HeapTuple *) palloc(tuptable->alloced * sizeof(HeapTuple)); + tuptable->tupdesc = CreateTupleDescCopy(typeinfo); + + MemoryContextSwitchTo(oldcxt); +} + +/* + * spi_printtup + * store tuple retrieved by Executor into SPITupleTable + * of current SPI procedure + */ +void +spi_printtup(TupleTableSlot *slot, DestReceiver *self) +{ + SPITupleTable *tuptable; + MemoryContext oldcxt; + + /* + * When called by Executor _SPI_curid expected to be equal to + * _SPI_connected + */ + if (_SPI_curid != _SPI_connected || _SPI_connected < 0) + elog(ERROR, "improper call to spi_printtup"); + if (_SPI_current != &(_SPI_stack[_SPI_curid])) + elog(ERROR, "SPI stack corrupted"); + tuptable = _SPI_current->tuptable; if (tuptable == NULL) - { - tuptabcxt = AllocSetContextCreate(CurrentMemoryContext, - "SPI TupTable", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - MemoryContextSwitchTo(tuptabcxt); - - _SPI_current->tuptable = tuptable = (SPITupleTable *) - palloc(sizeof(SPITupleTable)); - tuptable->tuptabcxt = tuptabcxt; - tuptable->alloced = tuptable->free = 128; - tuptable->vals = (HeapTuple *) palloc(tuptable->alloced * sizeof(HeapTuple)); - tuptable->tupdesc = CreateTupleDescCopy(tupdesc); - } - else - { - MemoryContextSwitchTo(tuptable->tuptabcxt); + elog(ERROR, "improper call to spi_printtup"); - if (tuptable->free == 0) - { - tuptable->free = 256; - tuptable->alloced += tuptable->free; - tuptable->vals = (HeapTuple *) repalloc(tuptable->vals, - tuptable->alloced * sizeof(HeapTuple)); - } + oldcxt = MemoryContextSwitchTo(tuptable->tuptabcxt); + + if (tuptable->free == 0) + { + tuptable->free = 256; + tuptable->alloced += tuptable->free; + tuptable->vals = (HeapTuple *) repalloc(tuptable->vals, + tuptable->alloced * sizeof(HeapTuple)); } - tuptable->vals[tuptable->alloced - tuptable->free] = heap_copytuple(tuple); + tuptable->vals[tuptable->alloced - tuptable->free] = + ExecCopySlotTuple(slot); (tuptable->free)--; MemoryContextSwitchTo(oldcxt); - return; } /* * Static functions */ -static int -_SPI_execute(char *src, int tcount, _SPI_plan *plan) +/* + * Parse and plan a querystring. + * + * At entry, plan->argtypes and plan->nargs must be valid. + * + * Query and plan lists are stored into *plan. + */ +static void +_SPI_prepare_plan(const char *src, _SPI_plan *plan) { - List *queryTree_list; - List *planTree_list; - List *queryTree_list_item; - QueryDesc *qdesc; - Query *queryTree; - Plan *planTree; - EState *state; - int nargs = 0; - Oid *argtypes = NULL; - int res = 0; - bool islastquery; + List *raw_parsetree_list; + List *query_list_list; + List *plan_list; + ListCell *list_item; + ErrorContextCallback spierrcontext; + Oid *argtypes = plan->argtypes; + int nargs = plan->nargs; - /* Increment CommandCounter to see changes made by now */ + /* + * Increment CommandCounter to see changes made by now. We must do this + * to be sure of seeing any schema changes made by a just-preceding SPI + * command. (But we don't bother advancing the snapshot, since the + * planner generally operates under SnapshotNow rules anyway.) + */ CommandCounterIncrement(); - SPI_processed = 0; - SPI_lastoid = InvalidOid; - SPI_tuptable = NULL; - _SPI_current->tuptable = NULL; - _SPI_current->qtlist = NULL; - - if (plan) - { - nargs = plan->nargs; - argtypes = plan->argtypes; - } - - queryTree_list = pg_parse_and_rewrite(src, argtypes, nargs); + /* + * Setup error traceback support for ereport() + */ + spierrcontext.callback = _SPI_error_callback; + spierrcontext.arg = (void *) src; + spierrcontext.previous = error_context_stack; + error_context_stack = &spierrcontext; - _SPI_current->qtlist = queryTree_list; + /* + * Parse the request string into a list of raw parse trees. + */ + raw_parsetree_list = pg_parse_query(src); - planTree_list = NIL; + /* + * Do parse analysis and rule rewrite for each raw parsetree. + * + * We save the querytrees from each raw parsetree as a separate sublist. + * This allows _SPI_execute_plan() to know where the boundaries between + * original queries fall. + */ + query_list_list = NIL; + plan_list = NIL; - foreach(queryTree_list_item, queryTree_list) + foreach(list_item, raw_parsetree_list) { - queryTree = (Query *) lfirst(queryTree_list_item); - islastquery = (lnext(queryTree_list_item) == NIL); + Node *parsetree = (Node *) lfirst(list_item); + List *query_list; - planTree = pg_plan_query(queryTree); - planTree_list = lappend(planTree_list, planTree); + query_list = pg_analyze_and_rewrite(parsetree, src, argtypes, nargs); - if (queryTree->commandType == CMD_UTILITY) - { - if (nodeTag(queryTree->utilityStmt) == T_CopyStmt) - { - CopyStmt *stmt = (CopyStmt *) (queryTree->utilityStmt); + query_list_list = lappend(query_list_list, query_list); - if (stmt->filename == NULL) - return SPI_ERROR_COPY; - } - else if (nodeTag(queryTree->utilityStmt) == T_ClosePortalStmt || - nodeTag(queryTree->utilityStmt) == T_FetchStmt) - return SPI_ERROR_CURSOR; - else if (nodeTag(queryTree->utilityStmt) == T_TransactionStmt) - return SPI_ERROR_TRANSACTION; - res = SPI_OK_UTILITY; - if (plan == NULL) - { - ProcessUtility(queryTree->utilityStmt, None, NULL); - if (!islastquery) - CommandCounterIncrement(); - else - return res; - } - else if (islastquery) - break; - } - else if (plan == NULL) - { - qdesc = CreateQueryDesc(queryTree, planTree, - islastquery ? SPI : None, NULL); - state = CreateExecutorState(); - res = _SPI_pquery(qdesc, state, islastquery ? tcount : 0); - if (res < 0 || islastquery) - return res; - CommandCounterIncrement(); - } - else - { - qdesc = CreateQueryDesc(queryTree, planTree, - islastquery ? SPI : None, NULL); - res = _SPI_pquery(qdesc, NULL, islastquery ? tcount : 0); - if (res < 0) - return res; - if (islastquery) - break; - } + plan_list = list_concat(plan_list, + pg_plan_queries(query_list, NULL, false)); } - if (plan) - { - plan->qtlist = queryTree_list; - plan->ptlist = planTree_list; - } + plan->qtlist = query_list_list; + plan->ptlist = plan_list; - return res; + /* + * Pop the error context stack + */ + error_context_stack = spierrcontext.previous; } +/* + * Execute the given plan with the given parameter values + * + * snapshot: query snapshot to use, or InvalidSnapshot for the normal + * behavior of taking a new snapshot for each query. + * crosscheck_snapshot: for RI use, all others pass InvalidSnapshot + * read_only: TRUE for read-only execution (no CommandCounterIncrement) + * tcount: execution tuple-count limit, or 0 for none + */ static int -_SPI_execute_plan(_SPI_plan *plan, Datum *Values, char *Nulls, int tcount) +_SPI_execute_plan(_SPI_plan *plan, Datum *Values, const char *Nulls, + Snapshot snapshot, Snapshot crosscheck_snapshot, + bool read_only, long tcount) { - List *queryTree_list = plan->qtlist; - List *planTree_list = plan->ptlist; - List *queryTree_list_item; - QueryDesc *qdesc; - Query *queryTree; - Plan *planTree; - EState *state; - int nargs = plan->nargs; - int res = 0; - bool islastquery; - int k; - - /* Increment CommandCounter to see changes made by now */ - CommandCounterIncrement(); - - SPI_processed = 0; - SPI_lastoid = InvalidOid; - SPI_tuptable = NULL; - _SPI_current->tuptable = NULL; - _SPI_current->qtlist = NULL; - - foreach(queryTree_list_item, queryTree_list) + volatile int res = 0; + volatile uint32 my_processed = 0; + volatile Oid my_lastoid = InvalidOid; + SPITupleTable *volatile my_tuptable = NULL; + Snapshot saveActiveSnapshot; + + /* Be sure to restore ActiveSnapshot on error exit */ + saveActiveSnapshot = ActiveSnapshot; + PG_TRY(); { - queryTree = (Query *) lfirst(queryTree_list_item); - planTree = lfirst(planTree_list); - planTree_list = lnext(planTree_list); - islastquery = (planTree_list == NIL); /* assume lists are same - * len */ + List *query_list_list = plan->qtlist; + ListCell *plan_list_item = list_head(plan->ptlist); + ListCell *query_list_list_item; + ErrorContextCallback spierrcontext; + int nargs = plan->nargs; + ParamListInfo paramLI; - if (queryTree->commandType == CMD_UTILITY) + /* Convert parameters to form wanted by executor */ + if (nargs > 0) { - ProcessUtility(queryTree->utilityStmt, None, NULL); - if (!islastquery) - CommandCounterIncrement(); - else - return SPI_OK_UTILITY; + int k; + + paramLI = (ParamListInfo) + palloc0((nargs + 1) * sizeof(ParamListInfoData)); + + for (k = 0; k < nargs; k++) + { + paramLI[k].kind = PARAM_NUM; + paramLI[k].id = k + 1; + paramLI[k].ptype = plan->argtypes[k]; + paramLI[k].isnull = (Nulls && Nulls[k] == 'n'); + paramLI[k].value = Values[k]; + } + paramLI[k].kind = PARAM_INVALID; } else + paramLI = NULL; + + /* + * Setup error traceback support for ereport() + */ + spierrcontext.callback = _SPI_error_callback; + spierrcontext.arg = (void *) plan->query; + spierrcontext.previous = error_context_stack; + error_context_stack = &spierrcontext; + + foreach(query_list_list_item, query_list_list) { - qdesc = CreateQueryDesc(queryTree, planTree, - islastquery ? SPI : None, NULL); - state = CreateExecutorState(); - if (nargs > 0) + List *query_list = lfirst(query_list_list_item); + ListCell *query_list_item; + + foreach(query_list_item, query_list) { - ParamListInfo paramLI; + Query *queryTree = (Query *) lfirst(query_list_item); + Plan *planTree; + QueryDesc *qdesc; + DestReceiver *dest; + + planTree = lfirst(plan_list_item); + plan_list_item = lnext(plan_list_item); + + _SPI_current->processed = 0; + _SPI_current->lastoid = InvalidOid; + _SPI_current->tuptable = NULL; + + if (queryTree->commandType == CMD_UTILITY) + { + if (IsA(queryTree->utilityStmt, CopyStmt)) + { + CopyStmt *stmt = (CopyStmt *) queryTree->utilityStmt; + + if (stmt->filename == NULL) + { + res = SPI_ERROR_COPY; + goto fail; + } + } + else if (IsA(queryTree->utilityStmt, DeclareCursorStmt) || + IsA(queryTree->utilityStmt, ClosePortalStmt) || + IsA(queryTree->utilityStmt, FetchStmt)) + { + res = SPI_ERROR_CURSOR; + goto fail; + } + else if (IsA(queryTree->utilityStmt, TransactionStmt)) + { + res = SPI_ERROR_TRANSACTION; + goto fail; + } + } - paramLI = (ParamListInfo) palloc((nargs + 1) * - sizeof(ParamListInfoData)); - MemSet(paramLI, 0, (nargs + 1) * sizeof(ParamListInfoData)); + if (read_only && !QueryIsReadOnly(queryTree)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + /* translator: %s is a SQL statement name */ + errmsg("%s is not allowed in a non-volatile function", + CreateQueryTag(queryTree)))); + + /* + * If not read-only mode, advance the command counter before + * each command. + */ + if (!read_only) + CommandCounterIncrement(); + + dest = CreateDestReceiver(queryTree->canSetTag ? DestSPI : DestNone, + NULL); + + if (snapshot == InvalidSnapshot) + { + /* + * Default read_only behavior is to use the entry-time + * ActiveSnapshot; if read-write, grab a full new snap. + */ + if (read_only) + ActiveSnapshot = CopySnapshot(saveActiveSnapshot); + else + ActiveSnapshot = CopySnapshot(GetTransactionSnapshot()); + } + else + { + /* + * We interpret read_only with a specified snapshot to be + * exactly that snapshot, but read-write means use the + * snap with advancing of command ID. + */ + ActiveSnapshot = CopySnapshot(snapshot); + if (!read_only) + ActiveSnapshot->curcid = GetCurrentCommandId(); + } - state->es_param_list_info = paramLI; - for (k = 0; k < plan->nargs; paramLI++, k++) + if (queryTree->commandType == CMD_UTILITY) + { + ProcessUtility(queryTree->utilityStmt, paramLI, + dest, NULL); + res = SPI_OK_UTILITY; + } + else + { + qdesc = CreateQueryDesc(queryTree, planTree, + ActiveSnapshot, + crosscheck_snapshot, + dest, + paramLI, false); + res = _SPI_pquery(qdesc, + queryTree->canSetTag ? tcount : 0); + FreeQueryDesc(qdesc); + } + FreeSnapshot(ActiveSnapshot); + ActiveSnapshot = NULL; + + /* + * The last canSetTag query sets the auxiliary values returned + * to the caller. Be careful to free any tuptables not + * returned, to avoid intratransaction memory leak. + */ + if (queryTree->canSetTag) + { + my_processed = _SPI_current->processed; + my_lastoid = _SPI_current->lastoid; + SPI_freetuptable(my_tuptable); + my_tuptable = _SPI_current->tuptable; + } + else { - paramLI->kind = PARAM_NUM; - paramLI->id = k + 1; - paramLI->isnull = (Nulls && Nulls[k] == 'n'); - paramLI->value = Values[k]; + SPI_freetuptable(_SPI_current->tuptable); + _SPI_current->tuptable = NULL; } - paramLI->kind = PARAM_INVALID; + /* we know that the receiver doesn't need a destroy call */ + if (res < 0) + goto fail; } - else - state->es_param_list_info = NULL; - res = _SPI_pquery(qdesc, state, islastquery ? tcount : 0); - if (res < 0 || islastquery) - return res; - CommandCounterIncrement(); } + +fail: + + /* + * Pop the error context stack + */ + error_context_stack = spierrcontext.previous; + } + PG_CATCH(); + { + /* Restore global vars and propagate error */ + ActiveSnapshot = saveActiveSnapshot; + PG_RE_THROW(); } + PG_END_TRY(); + + ActiveSnapshot = saveActiveSnapshot; + + /* Save results for caller */ + SPI_processed = my_processed; + SPI_lastoid = my_lastoid; + SPI_tuptable = my_tuptable; return res; } static int -_SPI_pquery(QueryDesc *queryDesc, EState *state, int tcount) +_SPI_pquery(QueryDesc *queryDesc, long tcount) { - Query *parseTree = queryDesc->parsetree; int operation = queryDesc->operation; - CommandDest dest = queryDesc->dest; - bool isRetrieveIntoPortal = false; - bool isRetrieveIntoRelation = false; - char *intoName = NULL; int res; - Oid save_lastoid; switch (operation) { case CMD_SELECT: res = SPI_OK_SELECT; - if (parseTree->isPortal) + if (queryDesc->parsetree->into) /* select into table? */ { - isRetrieveIntoPortal = true; - intoName = parseTree->into->relname; - parseTree->isBinary = false; /* */ - - return SPI_ERROR_CURSOR; - + res = SPI_OK_SELINTO; + queryDesc->dest = None_Receiver; /* don't output results */ } - else if (parseTree->into != NULL) /* select into table */ + else if (queryDesc->dest->mydest != DestSPI) { - res = SPI_OK_SELINTO; - isRetrieveIntoRelation = true; - queryDesc->dest = None; /* */ + /* Don't return SPI_OK_SELECT if we're discarding result */ + res = SPI_OK_UTILITY; } break; case CMD_INSERT: @@ -1166,52 +1545,63 @@ _SPI_pquery(QueryDesc *queryDesc, EState *state, int tcount) return SPI_ERROR_OPUNKNOWN; } - if (state == NULL) /* plan preparation */ - return res; - #ifdef SPI_EXECUTOR_STATS if (ShowExecutorStats) ResetUsage(); #endif - ExecutorStart(queryDesc, state); + AfterTriggerBeginQuery(); - /* - * Don't work currently --- need to rearrange callers so that we - * prepare the portal before doing CreateExecutorState() etc. See - * pquery.c for the correct order of operations. - */ - if (isRetrieveIntoPortal) - elog(FATAL, "SPI_select: retrieve into portal not implemented"); + ExecutorStart(queryDesc, 0); - ExecutorRun(queryDesc, state, ForwardScanDirection, (long) tcount); + ExecutorRun(queryDesc, ForwardScanDirection, tcount); - _SPI_current->processed = state->es_processed; - save_lastoid = state->es_lastoid; + _SPI_current->processed = queryDesc->estate->es_processed; + _SPI_current->lastoid = queryDesc->estate->es_lastoid; - if (operation == CMD_SELECT && queryDesc->dest == SPI) + if (operation == CMD_SELECT && queryDesc->dest->mydest == DestSPI) { if (_SPI_checktuples()) - elog(FATAL, "SPI_select: # of processed tuples check failed"); + elog(ERROR, "consistency check on SPI tuple count failed"); } - ExecutorEnd(queryDesc, state); + /* Take care of any queued AFTER triggers */ + AfterTriggerEndQuery(queryDesc->estate); + + ExecutorEnd(queryDesc); #ifdef SPI_EXECUTOR_STATS if (ShowExecutorStats) ShowUsage("SPI EXECUTOR STATS"); #endif - if (dest == SPI) - { - SPI_processed = _SPI_current->processed; - SPI_lastoid = save_lastoid; - SPI_tuptable = _SPI_current->tuptable; - } - queryDesc->dest = dest; - return res; +} + +/* + * _SPI_error_callback + * + * Add context information when a query invoked via SPI fails + */ +static void +_SPI_error_callback(void *arg) +{ + const char *query = (const char *) arg; + int syntaxerrposition; + /* + * If there is a syntax error position, convert to internal syntax error; + * otherwise treat the query as an item of context stack + */ + syntaxerrposition = geterrposition(); + if (syntaxerrposition > 0) + { + errposition(0); + internalerrposition(syntaxerrposition); + internalerrquery(query); + } + else + errcontext("SQL statement \"%s\"", query); } /* @@ -1220,78 +1610,43 @@ _SPI_pquery(QueryDesc *queryDesc, EState *state, int tcount) * Do a FETCH or MOVE in a cursor */ static void -_SPI_cursor_operation(Portal portal, bool forward, int count, - CommandDest dest) +_SPI_cursor_operation(Portal portal, bool forward, long count, + DestReceiver *dest) { - QueryDesc *querydesc; - EState *estate; - MemoryContext oldcontext; - ScanDirection direction; - CommandDest olddest; + long nfetched; /* Check that the portal is valid */ if (!PortalIsValid(portal)) elog(ERROR, "invalid portal in SPI cursor operation"); /* Push the SPI stack */ - _SPI_begin_call(true); + if (_SPI_begin_call(true) < 0) + elog(ERROR, "SPI cursor operation called while not connected"); - /* Reset the SPI result */ + /* Reset the SPI result (note we deliberately don't touch lastoid) */ SPI_processed = 0; SPI_tuptable = NULL; _SPI_current->processed = 0; _SPI_current->tuptable = NULL; - /* Switch to the portals memory context */ - oldcontext = MemoryContextSwitchTo(PortalGetHeapMemory(portal)); - - querydesc = PortalGetQueryDesc(portal); - estate = PortalGetState(portal); - - /* Save the queries command destination and set it to SPI (for fetch) */ - /* or None (for move) */ - olddest = querydesc->dest; - querydesc->dest = dest; - - /* Run the executor like PerformPortalFetch and remember states */ - if (forward) - { - if (portal->atEnd) - direction = NoMovementScanDirection; - else - direction = ForwardScanDirection; + /* Run the cursor */ + nfetched = PortalRunFetch(portal, + forward ? FETCH_FORWARD : FETCH_BACKWARD, + count, + dest); - ExecutorRun(querydesc, estate, direction, (long) count); - - if (estate->es_processed > 0) - portal->atStart = false; /* OK to back up now */ - if (count <= 0 || (int) estate->es_processed < count) - portal->atEnd = true; /* we retrieved 'em all */ - } - else - { - if (portal->atStart) - direction = NoMovementScanDirection; - else - direction = BackwardScanDirection; - - ExecutorRun(querydesc, estate, direction, (long) count); - - if (estate->es_processed > 0) - portal->atEnd = false; /* OK to go forward now */ - if (count <= 0 || (int) estate->es_processed < count) - portal->atStart = true; /* we retrieved 'em all */ - } - - _SPI_current->processed = estate->es_processed; - - /* Restore the old command destination and switch back to callers */ - /* memory context */ - querydesc->dest = olddest; - MemoryContextSwitchTo(oldcontext); + /* + * Think not to combine this store with the preceding function call. If + * the portal contains calls to functions that use SPI, then SPI_stack is + * likely to move around while the portal runs. When control returns, + * _SPI_current will point to the correct stack entry... but the pointer + * may be different than it was beforehand. So we must be sure to re-fetch + * the pointer after the function call completes. + */ + _SPI_current->processed = nfetched; - if (dest == SPI && _SPI_checktuples()) - elog(FATAL, "SPI_fetch: # of processed tuples check failed"); + if (dest->mydest == DestSPI && _SPI_checktuples()) + elog(ERROR, "consistency check on SPI tuple count failed"); /* Put the result into place for access by caller */ SPI_processed = _SPI_current->processed; @@ -1303,20 +1658,19 @@ _SPI_cursor_operation(Portal portal, bool forward, int count, static MemoryContext -_SPI_execmem() +_SPI_execmem(void) { return MemoryContextSwitchTo(_SPI_current->execCxt); } static MemoryContext -_SPI_procmem() +_SPI_procmem(void) { return MemoryContextSwitchTo(_SPI_current->procCxt); } /* - * _SPI_begin_call - * + * _SPI_begin_call: begin a SPI operation within a connected procedure */ static int _SPI_begin_call(bool execmem) @@ -1325,7 +1679,7 @@ _SPI_begin_call(bool execmem) return SPI_ERROR_UNCONNECTED; _SPI_curid++; if (_SPI_current != &(_SPI_stack[_SPI_curid])) - elog(FATAL, "SPI: stack corrupted"); + elog(ERROR, "SPI stack corrupted"); if (execmem) /* switch to the Executor memory context */ _SPI_execmem(); @@ -1333,16 +1687,19 @@ _SPI_begin_call(bool execmem) return 0; } +/* + * _SPI_end_call: end a SPI operation within a connected procedure + * + * Note: this currently has no failure return cases, so callers don't check + */ static int _SPI_end_call(bool procmem) { /* - * We' returning to procedure where _SPI_curid == _SPI_connected - 1 + * We're returning to procedure where _SPI_curid == _SPI_connected - 1 */ _SPI_curid--; - _SPI_current->qtlist = NULL; - if (procmem) /* switch to the procedure memory context */ { _SPI_procmem(); @@ -1360,19 +1717,10 @@ _SPI_checktuples(void) SPITupleTable *tuptable = _SPI_current->tuptable; bool failed = false; - if (processed == 0) - { - if (tuptable != NULL) - failed = true; - } - else - { - /* some tuples were processed */ - if (tuptable == NULL) /* spi_printtup was not called */ - failed = true; - else if (processed != (tuptable->alloced - tuptable->free)) - failed = true; - } + if (tuptable == NULL) /* spi_dest_startup was not called */ + failed = true; + else if (processed != (tuptable->alloced - tuptable->free)) + failed = true; return failed; } @@ -1391,22 +1739,24 @@ _SPI_copy_plan(_SPI_plan *plan, int location) else if (location == _SPI_CPLAN_TOPCXT) parentcxt = TopMemoryContext; else + /* (this case not currently used) */ parentcxt = CurrentMemoryContext; /* - * Create a memory context for the plan. We don't expect the plan to - * be very large, so use smaller-than-default alloc parameters. + * Create a memory context for the plan. We don't expect the plan to be + * very large, so use smaller-than-default alloc parameters. */ plancxt = AllocSetContextCreate(parentcxt, "SPI Plan", - 1024, - 1024, - ALLOCSET_DEFAULT_MAXSIZE); + ALLOCSET_SMALL_MINSIZE, + ALLOCSET_SMALL_INITSIZE, + ALLOCSET_SMALL_MAXSIZE); oldcxt = MemoryContextSwitchTo(plancxt); /* Copy the SPI plan into its own context */ newplan = (_SPI_plan *) palloc(sizeof(_SPI_plan)); newplan->plancxt = plancxt; + newplan->query = pstrdup(plan->query); newplan->qtlist = (List *) copyObject(plan->qtlist); newplan->ptlist = (List *) copyObject(plan->ptlist); newplan->nargs = plan->nargs;