4 * src/pl/plpython/plpy_cursorobject.c
11 #include "access/xact.h"
12 #include "mb/pg_wchar.h"
13 #include "utils/memutils.h"
17 #include "plpy_cursorobject.h"
19 #include "plpy_elog.h"
20 #include "plpy_main.h"
21 #include "plpy_planobject.h"
22 #include "plpy_procedure.h"
23 #include "plpy_resultobject.h"
/*
 * Prototypes for the file-local (static) cursor routines: the
 * query-string and plan constructors called from PLy_cursor(), the
 * deallocator and iterator-next hook installed in PLy_CursorType, and
 * the fetch and close entries of PLy_cursor_methods.
 */
27 static PyObject *PLy_cursor_query(const char *query);
28 static PyObject *PLy_cursor_plan(PyObject *ob, PyObject *args);
29 static void PLy_cursor_dealloc(PyObject *arg);
30 static PyObject *PLy_cursor_iternext(PyObject *self);
31 static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
32 static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
/* Docstring text; PLy_CursorType references it in its tp_doc slot. */
34 static char PLy_cursor_doc[] = {
35 "Wrapper around a PostgreSQL cursor"
/*
 * Method table for PLyCursor objects.  fetch is registered with
 * METH_VARARGS (positional arguments) and close with METH_NOARGS.
 * Both entries pass NULL as their docstring.
 */
38 static PyMethodDef PLy_cursor_methods[] = {
39 {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
40 {"close", PLy_cursor_close, METH_NOARGS, NULL},
/*
 * Python type object for PLyCursor.  Only the slots visible in this
 * listing are annotated; the original line numbers show gaps, so other
 * slots are presumably initialized on lines not shown here.
 */
44 static PyTypeObject PLy_CursorType = {
45 PyVarObject_HEAD_INIT(NULL, 0)
46 "PLyCursor", /* tp_name */
47 sizeof(PLyCursorObject), /* tp_basicsize */
53 PLy_cursor_dealloc, /* tp_dealloc */
60 0, /* tp_as_sequence */
61 0, /* tp_as_mapping */
68 Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER, /* tp_flags */
69 PLy_cursor_doc, /* tp_doc */
72 0, /* tp_richcompare */
73 0, /* tp_weaklistoffset */
/* PyObject_SelfIter as tp_iter: iterating a cursor yields the cursor itself */
74 PyObject_SelfIter, /* tp_iter */
75 PLy_cursor_iternext, /* tp_iternext */
76 PLy_cursor_methods, /* tp_methods */
/*
 * Finish initialization of PLy_CursorType by calling PyType_Ready().
 * A negative result is reported with elog(ERROR).
 */
80 PLy_cursor_init_type(void)
82 if (PyType_Ready(&PLy_CursorType) < 0)
83 elog(ERROR, "could not initialize PLy_CursorType");
/*
 * Entry point behind plpy.cursor (name taken from the error message
 * below).  Dispatches on the shape of args:
 *   - a single string: open a cursor for that query text;
 *   - an object plus an optional second object: open a cursor for a
 *     prepared plan with its argument sequence.
 * If neither parse succeeds an exception of type PLy_exc_error is set.
 *
 * NOTE(review): the declarations of query and plan, the handling
 * between the two parse attempts, and the failure return value are on
 * lines not visible in this listing.
 */
87 PLy_cursor(PyObject *self, PyObject *args)
/* stays NULL when the optional second argument is omitted */
91 PyObject *planargs = NULL;
/* first attempt: exactly one string argument */
93 if (PyArg_ParseTuple(args, "s", &query))
94 return PLy_cursor_query(query);
/* second attempt: plan object, optionally followed by its arguments */
98 if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
99 return PLy_cursor_plan(plan, planargs);
101 PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
/*
 * Build a PLyCursorObject for a query given as text.
 *
 * The visible steps are: allocate the Python object, give it a private
 * memory context, prepare the query through SPI, open a portal on the
 * prepared plan, and remember the portal name inside the cursor.
 * Returns the new cursor cast to PyObject *.
 *
 * NOTE(review): this listing omits lines (braces, some declarations and
 * conditions, and presumably the PG_TRY/PG_CATCH framing), so the exact
 * control flow around the commit and abort calls cannot be confirmed
 * from here.
 */
107 PLy_cursor_query(const char *query)
109 PLyCursorObject *cursor;
110 volatile MemoryContext oldcontext;
111 volatile ResourceOwner oldowner;
/* allocate the Python-level object; a NULL result is the failure case */
113 if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
/* start out open, with no portal name recorded yet */
115 cursor->portalname = NULL;
116 cursor->closed = false;
/* per-cursor context, created as a child of TopMemoryContext */
117 cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
118 "PL/Python cursor context",
119 ALLOCSET_DEFAULT_SIZES);
120 PLy_typeinfo_init(&cursor->result, cursor->mcxt);
/* remember the caller context and owner for the subtransaction helpers */
122 oldcontext = CurrentMemoryContext;
123 oldowner = CurrentResourceOwner;
125 PLy_spi_subtransaction_begin(oldcontext, oldowner);
129 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
/* check the query text encoding before handing it to SPI */
133 pg_verifymbstr(query, strlen(query), false);
/* prepare with zero parameters and no parameter type array */
135 plan = SPI_prepare(query, 0, NULL);
137 elog(ERROR, "SPI_prepare failed: %s",
138 SPI_result_code_string(SPI_result));
/* NULL name, no parameter values; read-only flag comes from the current procedure */
140 portal = SPI_cursor_open(NULL, plan, NULL, NULL,
141 exec_ctx->curr_proc->fn_readonly);
145 elog(ERROR, "SPI_cursor_open() failed: %s",
146 SPI_result_code_string(SPI_result));
/* copy the portal name into the cursor context so it is owned by the cursor */
148 cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
150 PLy_spi_subtransaction_commit(oldcontext, oldowner);
/* presumably the error path (catch block not shown) -- TODO confirm */
154 PLy_spi_subtransaction_abort(oldcontext, oldowner);
/* on the success path the portal name must have been recorded */
159 Assert(cursor->portalname != NULL);
160 return (PyObject *) cursor;
/*
 * Build a PLyCursorObject for a prepared plan (ob) and a sequence of
 * argument values (args).
 *
 * The visible steps are: validate that args is a non-string sequence
 * whose length matches plan->nargs, allocate the cursor and its private
 * memory context, convert each argument, open a portal with the
 * converted values, record the portal name, and release any
 * pass-by-reference datums left in plan->values.
 * Returns the new cursor cast to PyObject *.
 *
 * NOTE(review): this listing omits lines (braces, several declarations
 * and conditions, and presumably the PG_TRY/PG_CATCH framing).  Which
 * statements are conditional, and on what, cannot always be confirmed
 * from here; such places are marked below.
 */
164 PLy_cursor_plan(PyObject *ob, PyObject *args)
166 PLyCursorObject *cursor;
170 volatile MemoryContext oldcontext;
171 volatile ResourceOwner oldowner;
/* strings and unicode objects are rejected explicitly in addition to non-sequences */
175 if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
177 PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
180 nargs = PySequence_Length(args);
/* NOTE(review): no type check of ob is visible in this listing before the cast */
185 plan = (PLyPlanObject *) ob;
/* argument count must match what the plan was prepared with */
187 if (nargs != plan->nargs)
/* string form of args, used only to build the error text */
190 PyObject *so = PyObject_Str(args);
/* presumably reached only when PyObject_Str failed (condition not shown) */
193 PLy_elog(ERROR, "could not execute plan");
194 sv = PyString_AsString(so);
195 PLy_exception_set_plural(PyExc_TypeError,
196 "Expected sequence of %d argument, got %d: %s",
197 "Expected sequence of %d arguments, got %d: %s",
199 plan->nargs, nargs, sv);
/* allocate the Python-level object; a NULL result is the failure case */
205 if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
/* start out open, with no portal name recorded yet */
207 cursor->portalname = NULL;
208 cursor->closed = false;
/* per-cursor context, created as a child of TopMemoryContext */
209 cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
210 "PL/Python cursor context",
211 ALLOCSET_DEFAULT_SIZES);
212 PLy_typeinfo_init(&cursor->result, cursor->mcxt);
/* remember the caller context and owner for the subtransaction helpers */
214 oldcontext = CurrentMemoryContext;
215 oldowner = CurrentResourceOwner;
217 PLy_spi_subtransaction_begin(oldcontext, oldowner);
221 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
223 char *volatile nulls;
/* one char per argument for the nulls array passed to SPI_cursor_open */
227 nulls = palloc(nargs * sizeof(char));
/* convert each sequence element in turn */
231 for (j = 0; j < nargs; j++)
235 elem = PySequence_GetItem(args, j);
/* per-argument conversion function stored with the plan */
241 plan->args[j].out.d.func(&(plan->args[j].out.d),
/* presumably the alternative branch for a missing value -- TODO confirm */
259 InputFunctionCall(&(plan->args[j].out.d.typfunc),
261 plan->args[j].out.d.typioparam,
/* NULL name; values and nulls supply the plan parameters */
267 portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
268 exec_ctx->curr_proc->fn_readonly);
270 elog(ERROR, "SPI_cursor_open() failed: %s",
271 SPI_result_code_string(SPI_result));
/* copy the portal name into the cursor context so it is owned by the cursor */
273 cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
275 PLy_spi_subtransaction_commit(oldcontext, oldowner);
281 /* cleanup plan->values array */
282 for (k = 0; k < nargs; k++)
/* only non-by-value datums with a non-NULL pointer are freed */
284 if (!plan->args[k].out.d.typbyval &&
285 (plan->values[k] != PointerGetDatum(NULL)))
287 pfree(DatumGetPointer(plan->values[k]));
/* reset the slot so the freed pointer is not seen again */
288 plan->values[k] = PointerGetDatum(NULL);
/* presumably the error path (catch block not shown) -- TODO confirm */
294 PLy_spi_subtransaction_abort(oldcontext, oldowner);
/* same release of plan->values as above, in a second loop */
299 for (i = 0; i < nargs; i++)
301 if (!plan->args[i].out.d.typbyval &&
302 (plan->values[i] != PointerGetDatum(NULL)))
304 pfree(DatumGetPointer(plan->values[i]));
305 plan->values[i] = PointerGetDatum(NULL);
/* on the success path the portal name must have been recorded */
309 Assert(cursor->portalname != NULL);
310 return (PyObject *) cursor;
/*
 * tp_dealloc hook for PLyCursor objects.
 *
 * Looks up the portal by the stored name and closes it if it is still
 * valid, marks the cursor closed, deletes the cursor memory context,
 * and finally releases the Python object through the tp_free slot of
 * its type.
 *
 * NOTE(review): any guard on cursor->closed or on cursor->mcxt is on
 * lines not visible in this listing.
 */
314 PLy_cursor_dealloc(PyObject *arg)
316 PLyCursorObject *cursor;
319 cursor = (PLyCursorObject *) arg;
323 portal = GetPortalByName(cursor->portalname);
/* the portal may already be gone; only close it when it is still valid */
325 if (PortalIsValid(portal))
326 SPI_cursor_close(portal);
327 cursor->closed = true;
/* frees everything allocated in the cursor context, including portalname */
331 MemoryContextDelete(cursor->mcxt);
334 arg->ob_type->tp_free(arg);
/*
 * tp_iternext hook: fetch the next row from the cursor portal.
 *
 * Sets ValueError when the cursor is closed or its portal is no longer
 * valid.  Otherwise fetches one row forward; when no row was processed
 * StopIteration is set, else the row is converted with
 * PLyDict_FromTuple and stored in ret.
 *
 * NOTE(review): this listing omits lines (the closed check itself, the
 * return statements, and presumably the PG_TRY/PG_CATCH framing), so
 * the return value on each path cannot be confirmed from here.
 */
338 PLy_cursor_iternext(PyObject *self)
340 PLyCursorObject *cursor;
342 volatile MemoryContext oldcontext;
343 volatile ResourceOwner oldowner;
346 cursor = (PLyCursorObject *) self;
350 PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
354 portal = GetPortalByName(cursor->portalname);
/* the named portal no longer exists */
355 if (!PortalIsValid(portal))
357 PLy_exception_set(PyExc_ValueError,
358 "iterating a cursor in an aborted subtransaction");
/* remember the caller context and owner for the subtransaction helpers */
362 oldcontext = CurrentMemoryContext;
363 oldowner = CurrentResourceOwner;
365 PLy_spi_subtransaction_begin(oldcontext, oldowner);
/* fetch a single row in the forward direction */
369 SPI_cursor_fetch(portal, true, 1);
/* nothing fetched: signal the end of iteration */
370 if (SPI_processed == 0)
372 PyErr_SetNone(PyExc_StopIteration);
/* set up the tuple conversion info unless is_rowtype is already 1 */
377 if (cursor->result.is_rowtype != 1)
378 PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
/* convert the one fetched row */
380 ret = PLyDict_FromTuple(&cursor->result, SPI_tuptable->vals[0],
381 SPI_tuptable->tupdesc);
384 SPI_freetuptable(SPI_tuptable);
386 PLy_spi_subtransaction_commit(oldcontext, oldowner);
/* presumably the error path (catch block not shown) -- TODO confirm */
390 PLy_spi_subtransaction_abort(oldcontext, oldowner);
/*
 * Implementation of the cursor fetch method: fetch count rows forward
 * from the portal and return them in a PLyResultObject.
 *
 * Sets ValueError when the cursor is closed or its portal is no longer
 * valid.  On success the result status is SPI_OK_FETCH, nrows holds the
 * number of rows processed, and rows is a list with one converted entry
 * per fetched tuple.
 *
 * NOTE(review): this listing omits lines (the closed check itself,
 * early returns, and presumably the PG_TRY/PG_CATCH framing), so the
 * return value on the failure paths cannot be confirmed from here.
 */
399 PLy_cursor_fetch(PyObject *self, PyObject *args)
401 PLyCursorObject *cursor;
403 PLyResultObject *ret;
404 volatile MemoryContext oldcontext;
405 volatile ResourceOwner oldowner;
/* exactly one integer argument: the number of rows to fetch */
408 if (!PyArg_ParseTuple(args, "i", &count))
411 cursor = (PLyCursorObject *) self;
415 PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
419 portal = GetPortalByName(cursor->portalname);
/* the named portal no longer exists */
420 if (!PortalIsValid(portal))
/* NOTE(review): wording says iterating although this is the fetch path -- confirm intended */
422 PLy_exception_set(PyExc_ValueError,
423 "iterating a cursor in an aborted subtransaction");
427 ret = (PLyResultObject *) PLy_result_new();
/* remember the caller context and owner for the subtransaction helpers */
431 oldcontext = CurrentMemoryContext;
432 oldowner = CurrentResourceOwner;
434 PLy_spi_subtransaction_begin(oldcontext, oldowner);
/* fetch count rows in the forward direction */
438 SPI_cursor_fetch(portal, true, count);
/* set up the tuple conversion info unless is_rowtype is already 1 */
440 if (cursor->result.is_rowtype != 1)
441 PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
/* drop the reference to the old status object before replacing it */
443 Py_DECREF(ret->status);
444 ret->status = PyInt_FromLong(SPI_OK_FETCH);
/* row count becomes a Python float when it does not fit in a long */
446 Py_DECREF(ret->nrows);
447 ret->nrows = (SPI_processed > (uint64) LONG_MAX) ?
448 PyFloat_FromDouble((double) SPI_processed) :
449 PyInt_FromLong((long) SPI_processed);
451 if (SPI_processed != 0)
456 * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
457 * and list indices; so we cannot support a result larger than
460 if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
462 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
463 errmsg("query result has too many rows to fit in a Python list")));
/* replace the old rows object with a list sized for the fetched tuples */
465 Py_DECREF(ret->rows);
466 ret->rows = PyList_New(SPI_processed);
/* convert every fetched tuple and store it at the matching list index */
468 for (i = 0; i < SPI_processed; i++)
470 PyObject *row = PLyDict_FromTuple(&cursor->result,
471 SPI_tuptable->vals[i],
472 SPI_tuptable->tupdesc);
/* PyList_SetItem takes over the reference to row */
474 PyList_SetItem(ret->rows, i, row);
478 SPI_freetuptable(SPI_tuptable);
480 PLy_spi_subtransaction_commit(oldcontext, oldowner);
/* presumably the error path (catch block not shown) -- TODO confirm */
484 PLy_spi_subtransaction_abort(oldcontext, oldowner);
489 return (PyObject *) ret;
/*
 * Implementation of the cursor close method (registered METH_NOARGS, so
 * the second parameter is unused).
 *
 * Looks up the portal by the stored name.  If the portal is no longer
 * valid a ValueError is set; otherwise the portal is closed through SPI
 * and the cursor is marked closed.
 *
 * NOTE(review): any guard on cursor->closed and the return statements
 * are on lines not visible in this listing.
 */
493 PLy_cursor_close(PyObject *self, PyObject *unused)
495 PLyCursorObject *cursor = (PLyCursorObject *) self;
499 Portal portal = GetPortalByName(cursor->portalname);
/* the named portal no longer exists */
501 if (!PortalIsValid(portal))
503 PLy_exception_set(PyExc_ValueError,
504 "closing a cursor in an aborted subtransaction");
508 SPI_cursor_close(portal);
/* later fetch and iteration calls report a closed cursor */
509 cursor->closed = true;