]> granicus.if.org Git - postgresql/blob - src/backend/executor/spi.c
Expose more cursor-related functionality in SPI: specifically, allow
[postgresql] / src / backend / executor / spi.c
1 /*-------------------------------------------------------------------------
2  *
3  * spi.c
4  *                              Server Programming Interface
5  *
6  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/executor/spi.c,v 1.176 2007/04/16 01:14:56 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/printtup.h"
18 #include "catalog/heap.h"
19 #include "commands/trigger.h"
20 #include "executor/spi_priv.h"
21 #include "utils/lsyscache.h"
22 #include "utils/memutils.h"
23 #include "utils/typcache.h"
24
25
26 uint32          SPI_processed = 0;
27 Oid                     SPI_lastoid = InvalidOid;
28 SPITupleTable *SPI_tuptable = NULL;
29 int                     SPI_result;
30
31 static _SPI_connection *_SPI_stack = NULL;
32 static _SPI_connection *_SPI_current = NULL;
33 static int      _SPI_stack_depth = 0;           /* allocated size of _SPI_stack */
34 static int      _SPI_connected = -1;
35 static int      _SPI_curid = -1;
36
37 static void _SPI_prepare_plan(const char *src, SPIPlanPtr plan,
38                                                           int cursorOptions);
39
40 static int _SPI_execute_plan(SPIPlanPtr plan,
41                                   Datum *Values, const char *Nulls,
42                                   Snapshot snapshot, Snapshot crosscheck_snapshot,
43                                   bool read_only, long tcount);
44
45 static int      _SPI_pquery(QueryDesc *queryDesc, long tcount);
46
47 static void _SPI_error_callback(void *arg);
48
49 static void _SPI_cursor_operation(Portal portal,
50                                                                   FetchDirection direction, long count,
51                                                                   DestReceiver *dest);
52
53 static SPIPlanPtr _SPI_copy_plan(SPIPlanPtr plan, MemoryContext parentcxt);
54 static SPIPlanPtr _SPI_save_plan(SPIPlanPtr plan);
55
56 static int      _SPI_begin_call(bool execmem);
57 static int      _SPI_end_call(bool procmem);
58 static MemoryContext _SPI_execmem(void);
59 static MemoryContext _SPI_procmem(void);
60 static bool _SPI_checktuples(void);
61
62
63 /* =================== interface functions =================== */
64
65 int
66 SPI_connect(void)
67 {
68         int                     newdepth;
69
70         /*
71          * When procedure called by Executor _SPI_curid expected to be equal to
72          * _SPI_connected
73          */
74         if (_SPI_curid != _SPI_connected)
75                 return SPI_ERROR_CONNECT;
76
77         if (_SPI_stack == NULL)
78         {
79                 if (_SPI_connected != -1 || _SPI_stack_depth != 0)
80                         elog(ERROR, "SPI stack corrupted");
81                 newdepth = 16;
82                 _SPI_stack = (_SPI_connection *)
83                         MemoryContextAlloc(TopTransactionContext,
84                                                            newdepth * sizeof(_SPI_connection));
85                 _SPI_stack_depth = newdepth;
86         }
87         else
88         {
89                 if (_SPI_stack_depth <= 0 || _SPI_stack_depth <= _SPI_connected)
90                         elog(ERROR, "SPI stack corrupted");
91                 if (_SPI_stack_depth == _SPI_connected + 1)
92                 {
93                         newdepth = _SPI_stack_depth * 2;
94                         _SPI_stack = (_SPI_connection *)
95                                 repalloc(_SPI_stack,
96                                                  newdepth * sizeof(_SPI_connection));
97                         _SPI_stack_depth = newdepth;
98                 }
99         }
100
101         /*
102          * We're entering procedure where _SPI_curid == _SPI_connected - 1
103          */
104         _SPI_connected++;
105         Assert(_SPI_connected >= 0 && _SPI_connected < _SPI_stack_depth);
106
107         _SPI_current = &(_SPI_stack[_SPI_connected]);
108         _SPI_current->processed = 0;
109         _SPI_current->lastoid = InvalidOid;
110         _SPI_current->tuptable = NULL;
111         _SPI_current->procCxt = NULL;           /* in case we fail to create 'em */
112         _SPI_current->execCxt = NULL;
113         _SPI_current->connectSubid = GetCurrentSubTransactionId();
114
115         /*
116          * Create memory contexts for this procedure
117          *
118          * XXX it would be better to use PortalContext as the parent context, but
119          * we may not be inside a portal (consider deferred-trigger execution).
120          * Perhaps CurTransactionContext would do?      For now it doesn't matter
121          * because we clean up explicitly in AtEOSubXact_SPI().
122          */
123         _SPI_current->procCxt = AllocSetContextCreate(TopTransactionContext,
124                                                                                                   "SPI Proc",
125                                                                                                   ALLOCSET_DEFAULT_MINSIZE,
126                                                                                                   ALLOCSET_DEFAULT_INITSIZE,
127                                                                                                   ALLOCSET_DEFAULT_MAXSIZE);
128         _SPI_current->execCxt = AllocSetContextCreate(TopTransactionContext,
129                                                                                                   "SPI Exec",
130                                                                                                   ALLOCSET_DEFAULT_MINSIZE,
131                                                                                                   ALLOCSET_DEFAULT_INITSIZE,
132                                                                                                   ALLOCSET_DEFAULT_MAXSIZE);
133         /* ... and switch to procedure's context */
134         _SPI_current->savedcxt = MemoryContextSwitchTo(_SPI_current->procCxt);
135
136         return SPI_OK_CONNECT;
137 }
138
139 int
140 SPI_finish(void)
141 {
142         int                     res;
143
144         res = _SPI_begin_call(false);           /* live in procedure memory */
145         if (res < 0)
146                 return res;
147
148         /* Restore memory context as it was before procedure call */
149         MemoryContextSwitchTo(_SPI_current->savedcxt);
150
151         /* Release memory used in procedure call */
152         MemoryContextDelete(_SPI_current->execCxt);
153         _SPI_current->execCxt = NULL;
154         MemoryContextDelete(_SPI_current->procCxt);
155         _SPI_current->procCxt = NULL;
156
157         /*
158          * Reset result variables, especially SPI_tuptable which is probably
159          * pointing at a just-deleted tuptable
160          */
161         SPI_processed = 0;
162         SPI_lastoid = InvalidOid;
163         SPI_tuptable = NULL;
164
165         /*
166          * After _SPI_begin_call _SPI_connected == _SPI_curid. Now we are closing
167          * connection to SPI and returning to upper Executor and so _SPI_connected
168          * must be equal to _SPI_curid.
169          */
170         _SPI_connected--;
171         _SPI_curid--;
172         if (_SPI_connected == -1)
173                 _SPI_current = NULL;
174         else
175                 _SPI_current = &(_SPI_stack[_SPI_connected]);
176
177         return SPI_OK_FINISH;
178 }
179
180 /*
181  * Clean up SPI state at transaction commit or abort.
182  */
183 void
184 AtEOXact_SPI(bool isCommit)
185 {
186         /*
187          * Note that memory contexts belonging to SPI stack entries will be freed
188          * automatically, so we can ignore them here.  We just need to restore our
189          * static variables to initial state.
190          */
191         if (isCommit && _SPI_connected != -1)
192                 ereport(WARNING,
193                                 (errcode(ERRCODE_WARNING),
194                                  errmsg("transaction left non-empty SPI stack"),
195                                  errhint("Check for missing \"SPI_finish\" calls.")));
196
197         _SPI_current = _SPI_stack = NULL;
198         _SPI_stack_depth = 0;
199         _SPI_connected = _SPI_curid = -1;
200         SPI_processed = 0;
201         SPI_lastoid = InvalidOid;
202         SPI_tuptable = NULL;
203 }
204
205 /*
206  * Clean up SPI state at subtransaction commit or abort.
207  *
208  * During commit, there shouldn't be any unclosed entries remaining from
209  * the current subtransaction; we emit a warning if any are found.
210  */
211 void
212 AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid)
213 {
214         bool            found = false;
215
216         while (_SPI_connected >= 0)
217         {
218                 _SPI_connection *connection = &(_SPI_stack[_SPI_connected]);
219
220                 if (connection->connectSubid != mySubid)
221                         break;                          /* couldn't be any underneath it either */
222
223                 found = true;
224
225                 /*
226                  * Release procedure memory explicitly (see note in SPI_connect)
227                  */
228                 if (connection->execCxt)
229                 {
230                         MemoryContextDelete(connection->execCxt);
231                         connection->execCxt = NULL;
232                 }
233                 if (connection->procCxt)
234                 {
235                         MemoryContextDelete(connection->procCxt);
236                         connection->procCxt = NULL;
237                 }
238
239                 /*
240                  * Pop the stack entry and reset global variables.      Unlike
241                  * SPI_finish(), we don't risk switching to memory contexts that might
242                  * be already gone.
243                  */
244                 _SPI_connected--;
245                 _SPI_curid = _SPI_connected;
246                 if (_SPI_connected == -1)
247                         _SPI_current = NULL;
248                 else
249                         _SPI_current = &(_SPI_stack[_SPI_connected]);
250                 SPI_processed = 0;
251                 SPI_lastoid = InvalidOid;
252                 SPI_tuptable = NULL;
253         }
254
255         if (found && isCommit)
256                 ereport(WARNING,
257                                 (errcode(ERRCODE_WARNING),
258                                  errmsg("subtransaction left non-empty SPI stack"),
259                                  errhint("Check for missing \"SPI_finish\" calls.")));
260
261         /*
262          * If we are aborting a subtransaction and there is an open SPI context
263          * surrounding the subxact, clean up to prevent memory leakage.
264          */
265         if (_SPI_current && !isCommit)
266         {
267                 /* free Executor memory the same as _SPI_end_call would do */
268                 MemoryContextResetAndDeleteChildren(_SPI_current->execCxt);
269                 /* throw away any partially created tuple-table */
270                 SPI_freetuptable(_SPI_current->tuptable);
271                 _SPI_current->tuptable = NULL;
272         }
273 }
274
275
276 /* Pushes SPI stack to allow recursive SPI calls */
277 void
278 SPI_push(void)
279 {
280         _SPI_curid++;
281 }
282
283 /* Pops SPI stack to allow recursive SPI calls */
284 void
285 SPI_pop(void)
286 {
287         _SPI_curid--;
288 }
289
290 /* Restore state of SPI stack after aborting a subtransaction */
291 void
292 SPI_restore_connection(void)
293 {
294         Assert(_SPI_connected >= 0);
295         _SPI_curid = _SPI_connected - 1;
296 }
297
298 /* Parse, plan, and execute a query string */
299 int
300 SPI_execute(const char *src, bool read_only, long tcount)
301 {
302         _SPI_plan       plan;
303         int                     res;
304
305         if (src == NULL || tcount < 0)
306                 return SPI_ERROR_ARGUMENT;
307
308         res = _SPI_begin_call(true);
309         if (res < 0)
310                 return res;
311
312         memset(&plan, 0, sizeof(_SPI_plan));
313         plan.magic = _SPI_PLAN_MAGIC;
314
315         _SPI_prepare_plan(src, &plan, 0);
316
317         res = _SPI_execute_plan(&plan, NULL, NULL,
318                                                         InvalidSnapshot, InvalidSnapshot,
319                                                         read_only, tcount);
320
321         _SPI_end_call(true);
322         return res;
323 }
324
325 /* Obsolete version of SPI_execute */
326 int
327 SPI_exec(const char *src, long tcount)
328 {
329         return SPI_execute(src, false, tcount);
330 }
331
332 /* Execute a previously prepared plan */
333 int
334 SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
335                                  bool read_only, long tcount)
336 {
337         int                     res;
338
339         if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
340                 return SPI_ERROR_ARGUMENT;
341
342         if (plan->nargs > 0 && Values == NULL)
343                 return SPI_ERROR_PARAM;
344
345         res = _SPI_begin_call(true);
346         if (res < 0)
347                 return res;
348
349         res = _SPI_execute_plan(plan,
350                                                         Values, Nulls,
351                                                         InvalidSnapshot, InvalidSnapshot,
352                                                         read_only, tcount);
353
354         _SPI_end_call(true);
355         return res;
356 }
357
358 /* Obsolete version of SPI_execute_plan */
359 int
360 SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls, long tcount)
361 {
362         return SPI_execute_plan(plan, Values, Nulls, false, tcount);
363 }
364
365 /*
366  * SPI_execute_snapshot -- identical to SPI_execute_plan, except that we allow
367  * the caller to specify exactly which snapshots to use.  This is currently
368  * not documented in spi.sgml because it is only intended for use by RI
369  * triggers.
370  *
371  * Passing snapshot == InvalidSnapshot will select the normal behavior of
372  * fetching a new snapshot for each query.
373  */
374 int
375 SPI_execute_snapshot(SPIPlanPtr plan,
376                                          Datum *Values, const char *Nulls,
377                                          Snapshot snapshot, Snapshot crosscheck_snapshot,
378                                          bool read_only, long tcount)
379 {
380         int                     res;
381
382         if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
383                 return SPI_ERROR_ARGUMENT;
384
385         if (plan->nargs > 0 && Values == NULL)
386                 return SPI_ERROR_PARAM;
387
388         res = _SPI_begin_call(true);
389         if (res < 0)
390                 return res;
391
392         res = _SPI_execute_plan(plan,
393                                                         Values, Nulls,
394                                                         snapshot, crosscheck_snapshot,
395                                                         read_only, tcount);
396
397         _SPI_end_call(true);
398         return res;
399 }
400
401 SPIPlanPtr
402 SPI_prepare(const char *src, int nargs, Oid *argtypes)
403 {
404         return SPI_prepare_cursor(src, nargs, argtypes, 0);
405 }
406
407 SPIPlanPtr
408 SPI_prepare_cursor(const char *src, int nargs, Oid *argtypes,
409                                    int cursorOptions)
410 {
411         _SPI_plan       plan;
412         SPIPlanPtr      result;
413
414         if (src == NULL || nargs < 0 || (nargs > 0 && argtypes == NULL))
415         {
416                 SPI_result = SPI_ERROR_ARGUMENT;
417                 return NULL;
418         }
419
420         SPI_result = _SPI_begin_call(true);
421         if (SPI_result < 0)
422                 return NULL;
423
424         memset(&plan, 0, sizeof(_SPI_plan));
425         plan.magic = _SPI_PLAN_MAGIC;
426         plan.nargs = nargs;
427         plan.argtypes = argtypes;
428
429         _SPI_prepare_plan(src, &plan, cursorOptions);
430
431         /* copy plan to procedure context */
432         result = _SPI_copy_plan(&plan, _SPI_current->procCxt);
433
434         _SPI_end_call(true);
435
436         return result;
437 }
438
439 SPIPlanPtr
440 SPI_saveplan(SPIPlanPtr plan)
441 {
442         SPIPlanPtr      newplan;
443
444         /* We don't currently support copying an already-saved plan */
445         if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || plan->saved)
446         {
447                 SPI_result = SPI_ERROR_ARGUMENT;
448                 return NULL;
449         }
450
451         SPI_result = _SPI_begin_call(false);            /* don't change context */
452         if (SPI_result < 0)
453                 return NULL;
454
455         newplan = _SPI_save_plan(plan);
456
457         _SPI_curid--;
458         SPI_result = 0;
459
460         return newplan;
461 }
462
463 int
464 SPI_freeplan(SPIPlanPtr plan)
465 {
466         if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC)
467                 return SPI_ERROR_ARGUMENT;
468
469         /* If plancache.c owns the plancache entries, we must release them */
470         if (plan->saved)
471         {
472                 ListCell   *lc;
473
474                 foreach(lc, plan->plancache_list)
475                 {
476                         CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc);
477
478                         DropCachedPlan(plansource);
479                 }
480         }
481
482         /* Now get rid of the _SPI_plan and subsidiary data in its plancxt */
483         MemoryContextDelete(plan->plancxt);
484
485         return 0;
486 }
487
488 HeapTuple
489 SPI_copytuple(HeapTuple tuple)
490 {
491         MemoryContext oldcxt = NULL;
492         HeapTuple       ctuple;
493
494         if (tuple == NULL)
495         {
496                 SPI_result = SPI_ERROR_ARGUMENT;
497                 return NULL;
498         }
499
500         if (_SPI_curid + 1 == _SPI_connected)           /* connected */
501         {
502                 if (_SPI_current != &(_SPI_stack[_SPI_curid + 1]))
503                         elog(ERROR, "SPI stack corrupted");
504                 oldcxt = MemoryContextSwitchTo(_SPI_current->savedcxt);
505         }
506
507         ctuple = heap_copytuple(tuple);
508
509         if (oldcxt)
510                 MemoryContextSwitchTo(oldcxt);
511
512         return ctuple;
513 }
514
515 HeapTupleHeader
516 SPI_returntuple(HeapTuple tuple, TupleDesc tupdesc)
517 {
518         MemoryContext oldcxt = NULL;
519         HeapTupleHeader dtup;
520
521         if (tuple == NULL || tupdesc == NULL)
522         {
523                 SPI_result = SPI_ERROR_ARGUMENT;
524                 return NULL;
525         }
526
527         /* For RECORD results, make sure a typmod has been assigned */
528         if (tupdesc->tdtypeid == RECORDOID &&
529                 tupdesc->tdtypmod < 0)
530                 assign_record_type_typmod(tupdesc);
531
532         if (_SPI_curid + 1 == _SPI_connected)           /* connected */
533         {
534                 if (_SPI_current != &(_SPI_stack[_SPI_curid + 1]))
535                         elog(ERROR, "SPI stack corrupted");
536                 oldcxt = MemoryContextSwitchTo(_SPI_current->savedcxt);
537         }
538
539         dtup = (HeapTupleHeader) palloc(tuple->t_len);
540         memcpy((char *) dtup, (char *) tuple->t_data, tuple->t_len);
541
542         HeapTupleHeaderSetDatumLength(dtup, tuple->t_len);
543         HeapTupleHeaderSetTypeId(dtup, tupdesc->tdtypeid);
544         HeapTupleHeaderSetTypMod(dtup, tupdesc->tdtypmod);
545
546         if (oldcxt)
547                 MemoryContextSwitchTo(oldcxt);
548
549         return dtup;
550 }
551
552 HeapTuple
553 SPI_modifytuple(Relation rel, HeapTuple tuple, int natts, int *attnum,
554                                 Datum *Values, const char *Nulls)
555 {
556         MemoryContext oldcxt = NULL;
557         HeapTuple       mtuple;
558         int                     numberOfAttributes;
559         Datum      *v;
560         char       *n;
561         int                     i;
562
563         if (rel == NULL || tuple == NULL || natts < 0 || attnum == NULL || Values == NULL)
564         {
565                 SPI_result = SPI_ERROR_ARGUMENT;
566                 return NULL;
567         }
568
569         if (_SPI_curid + 1 == _SPI_connected)           /* connected */
570         {
571                 if (_SPI_current != &(_SPI_stack[_SPI_curid + 1]))
572                         elog(ERROR, "SPI stack corrupted");
573                 oldcxt = MemoryContextSwitchTo(_SPI_current->savedcxt);
574         }
575         SPI_result = 0;
576         numberOfAttributes = rel->rd_att->natts;
577         v = (Datum *) palloc(numberOfAttributes * sizeof(Datum));
578         n = (char *) palloc(numberOfAttributes * sizeof(char));
579
580         /* fetch old values and nulls */
581         heap_deformtuple(tuple, rel->rd_att, v, n);
582
583         /* replace values and nulls */
584         for (i = 0; i < natts; i++)
585         {
586                 if (attnum[i] <= 0 || attnum[i] > numberOfAttributes)
587                         break;
588                 v[attnum[i] - 1] = Values[i];
589                 n[attnum[i] - 1] = (Nulls && Nulls[i] == 'n') ? 'n' : ' ';
590         }
591
592         if (i == natts)                         /* no errors in *attnum */
593         {
594                 mtuple = heap_formtuple(rel->rd_att, v, n);
595
596                 /*
597                  * copy the identification info of the old tuple: t_ctid, t_self, and
598                  * OID (if any)
599                  */
600                 mtuple->t_data->t_ctid = tuple->t_data->t_ctid;
601                 mtuple->t_self = tuple->t_self;
602                 mtuple->t_tableOid = tuple->t_tableOid;
603                 if (rel->rd_att->tdhasoid)
604                         HeapTupleSetOid(mtuple, HeapTupleGetOid(tuple));
605         }
606         else
607         {
608                 mtuple = NULL;
609                 SPI_result = SPI_ERROR_NOATTRIBUTE;
610         }
611
612         pfree(v);
613         pfree(n);
614
615         if (oldcxt)
616                 MemoryContextSwitchTo(oldcxt);
617
618         return mtuple;
619 }
620
621 int
622 SPI_fnumber(TupleDesc tupdesc, const char *fname)
623 {
624         int                     res;
625         Form_pg_attribute sysatt;
626
627         for (res = 0; res < tupdesc->natts; res++)
628         {
629                 if (namestrcmp(&tupdesc->attrs[res]->attname, fname) == 0)
630                         return res + 1;
631         }
632
633         sysatt = SystemAttributeByName(fname, true /* "oid" will be accepted */ );
634         if (sysatt != NULL)
635                 return sysatt->attnum;
636
637         /* SPI_ERROR_NOATTRIBUTE is different from all sys column numbers */
638         return SPI_ERROR_NOATTRIBUTE;
639 }
640
641 char *
642 SPI_fname(TupleDesc tupdesc, int fnumber)
643 {
644         Form_pg_attribute att;
645
646         SPI_result = 0;
647
648         if (fnumber > tupdesc->natts || fnumber == 0 ||
649                 fnumber <= FirstLowInvalidHeapAttributeNumber)
650         {
651                 SPI_result = SPI_ERROR_NOATTRIBUTE;
652                 return NULL;
653         }
654
655         if (fnumber > 0)
656                 att = tupdesc->attrs[fnumber - 1];
657         else
658                 att = SystemAttributeDefinition(fnumber, true);
659
660         return pstrdup(NameStr(att->attname));
661 }
662
663 char *
664 SPI_getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber)
665 {
666         char       *result;
667         Datum           origval,
668                                 val;
669         bool            isnull;
670         Oid                     typoid,
671                                 foutoid;
672         bool            typisvarlena;
673
674         SPI_result = 0;
675
676         if (fnumber > HeapTupleHeaderGetNatts(tuple->t_data) || fnumber == 0 ||
677                 fnumber <= FirstLowInvalidHeapAttributeNumber)
678         {
679                 SPI_result = SPI_ERROR_NOATTRIBUTE;
680                 return NULL;
681         }
682
683         origval = heap_getattr(tuple, fnumber, tupdesc, &isnull);
684         if (isnull)
685                 return NULL;
686
687         if (fnumber > 0)
688                 typoid = tupdesc->attrs[fnumber - 1]->atttypid;
689         else
690                 typoid = (SystemAttributeDefinition(fnumber, true))->atttypid;
691
692         getTypeOutputInfo(typoid, &foutoid, &typisvarlena);
693
694         /*
695          * If we have a toasted datum, forcibly detoast it here to avoid memory
696          * leakage inside the type's output routine.
697          */
698         if (typisvarlena)
699                 val = PointerGetDatum(PG_DETOAST_DATUM(origval));
700         else
701                 val = origval;
702
703         result = OidOutputFunctionCall(foutoid, val);
704
705         /* Clean up detoasted copy, if any */
706         if (val != origval)
707                 pfree(DatumGetPointer(val));
708
709         return result;
710 }
711
712 Datum
713 SPI_getbinval(HeapTuple tuple, TupleDesc tupdesc, int fnumber, bool *isnull)
714 {
715         SPI_result = 0;
716
717         if (fnumber > HeapTupleHeaderGetNatts(tuple->t_data) || fnumber == 0 ||
718                 fnumber <= FirstLowInvalidHeapAttributeNumber)
719         {
720                 SPI_result = SPI_ERROR_NOATTRIBUTE;
721                 *isnull = true;
722                 return (Datum) NULL;
723         }
724
725         return heap_getattr(tuple, fnumber, tupdesc, isnull);
726 }
727
728 char *
729 SPI_gettype(TupleDesc tupdesc, int fnumber)
730 {
731         Oid                     typoid;
732         HeapTuple       typeTuple;
733         char       *result;
734
735         SPI_result = 0;
736
737         if (fnumber > tupdesc->natts || fnumber == 0 ||
738                 fnumber <= FirstLowInvalidHeapAttributeNumber)
739         {
740                 SPI_result = SPI_ERROR_NOATTRIBUTE;
741                 return NULL;
742         }
743
744         if (fnumber > 0)
745                 typoid = tupdesc->attrs[fnumber - 1]->atttypid;
746         else
747                 typoid = (SystemAttributeDefinition(fnumber, true))->atttypid;
748
749         typeTuple = SearchSysCache(TYPEOID,
750                                                            ObjectIdGetDatum(typoid),
751                                                            0, 0, 0);
752
753         if (!HeapTupleIsValid(typeTuple))
754         {
755                 SPI_result = SPI_ERROR_TYPUNKNOWN;
756                 return NULL;
757         }
758
759         result = pstrdup(NameStr(((Form_pg_type) GETSTRUCT(typeTuple))->typname));
760         ReleaseSysCache(typeTuple);
761         return result;
762 }
763
764 Oid
765 SPI_gettypeid(TupleDesc tupdesc, int fnumber)
766 {
767         SPI_result = 0;
768
769         if (fnumber > tupdesc->natts || fnumber == 0 ||
770                 fnumber <= FirstLowInvalidHeapAttributeNumber)
771         {
772                 SPI_result = SPI_ERROR_NOATTRIBUTE;
773                 return InvalidOid;
774         }
775
776         if (fnumber > 0)
777                 return tupdesc->attrs[fnumber - 1]->atttypid;
778         else
779                 return (SystemAttributeDefinition(fnumber, true))->atttypid;
780 }
781
782 char *
783 SPI_getrelname(Relation rel)
784 {
785         return pstrdup(RelationGetRelationName(rel));
786 }
787
788 char *
789 SPI_getnspname(Relation rel)
790 {
791         return get_namespace_name(RelationGetNamespace(rel));
792 }
793
794 void *
795 SPI_palloc(Size size)
796 {
797         MemoryContext oldcxt = NULL;
798         void       *pointer;
799
800         if (_SPI_curid + 1 == _SPI_connected)           /* connected */
801         {
802                 if (_SPI_current != &(_SPI_stack[_SPI_curid + 1]))
803                         elog(ERROR, "SPI stack corrupted");
804                 oldcxt = MemoryContextSwitchTo(_SPI_current->savedcxt);
805         }
806
807         pointer = palloc(size);
808
809         if (oldcxt)
810                 MemoryContextSwitchTo(oldcxt);
811
812         return pointer;
813 }
814
815 void *
816 SPI_repalloc(void *pointer, Size size)
817 {
818         /* No longer need to worry which context chunk was in... */
819         return repalloc(pointer, size);
820 }
821
/*
 * SPI_pfree
 *
 * Free a chunk; a plain pass-through to pfree().  There is no longer any
 * need to know which context the chunk was allocated in.
 */
void
SPI_pfree(void *pointer)
{
	pfree(pointer);
	return;
}
828
829 void
830 SPI_freetuple(HeapTuple tuple)
831 {
832         /* No longer need to worry which context tuple was in... */
833         heap_freetuple(tuple);
834 }
835
836 void
837 SPI_freetuptable(SPITupleTable *tuptable)
838 {
839         if (tuptable != NULL)
840                 MemoryContextDelete(tuptable->tuptabcxt);
841 }
842
843
844 /*
845  * SPI_cursor_open()
846  *
847  *      Open a prepared SPI plan as a portal
848  */
849 Portal
850 SPI_cursor_open(const char *name, SPIPlanPtr plan,
851                                 Datum *Values, const char *Nulls,
852                                 bool read_only)
853 {
854         CachedPlanSource *plansource;
855         CachedPlan *cplan;
856         List       *stmt_list;
857         ParamListInfo paramLI;
858         Snapshot        snapshot;
859         MemoryContext oldcontext;
860         Portal          portal;
861         int                     k;
862
863         /*
864          * Check that the plan is something the Portal code will special-case as
865          * returning one tupleset.
866          */
867         if (!SPI_is_cursor_plan(plan))
868         {
869                 /* try to give a good error message */
870                 if (list_length(plan->plancache_list) != 1)
871                         ereport(ERROR,
872                                         (errcode(ERRCODE_INVALID_CURSOR_DEFINITION),
873                                          errmsg("cannot open multi-query plan as cursor")));
874                 plansource = (CachedPlanSource *) linitial(plan->plancache_list);
875                 ereport(ERROR,
876                                 (errcode(ERRCODE_INVALID_CURSOR_DEFINITION),
877                 /* translator: %s is name of a SQL command, eg INSERT */
878                                  errmsg("cannot open %s query as cursor",
879                                                 plansource->commandTag)));
880         }
881
882         Assert(list_length(plan->plancache_list) == 1);
883         plansource = (CachedPlanSource *) linitial(plan->plancache_list);
884
885         /* Reset SPI result (note we deliberately don't touch lastoid) */
886         SPI_processed = 0;
887         SPI_tuptable = NULL;
888         _SPI_current->processed = 0;
889         _SPI_current->tuptable = NULL;
890
891         /* Create the portal */
892         if (name == NULL || name[0] == '\0')
893         {
894                 /* Use a random nonconflicting name */
895                 portal = CreateNewPortal();
896         }
897         else
898         {
899                 /* In this path, error if portal of same name already exists */
900                 portal = CreatePortal(name, false, false);
901         }
902
903         /* If the plan has parameters, copy them into the portal */
904         if (plan->nargs > 0)
905         {
906                 oldcontext = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
907                 /* sizeof(ParamListInfoData) includes the first array element */
908                 paramLI = (ParamListInfo) palloc(sizeof(ParamListInfoData) +
909                                                           (plan->nargs - 1) *sizeof(ParamExternData));
910                 paramLI->numParams = plan->nargs;
911
912                 for (k = 0; k < plan->nargs; k++)
913                 {
914                         ParamExternData *prm = &paramLI->params[k];
915
916                         prm->ptype = plan->argtypes[k];
917                         prm->pflags = 0;
918                         prm->isnull = (Nulls && Nulls[k] == 'n');
919                         if (prm->isnull)
920                         {
921                                 /* nulls just copy */
922                                 prm->value = Values[k];
923                         }
924                         else
925                         {
926                                 /* pass-by-ref values must be copied into portal context */
927                                 int16           paramTypLen;
928                                 bool            paramTypByVal;
929
930                                 get_typlenbyval(prm->ptype, &paramTypLen, &paramTypByVal);
931                                 prm->value = datumCopy(Values[k],
932                                                                            paramTypByVal, paramTypLen);
933                         }
934                 }
935                 MemoryContextSwitchTo(oldcontext);
936         }
937         else
938                 paramLI = NULL;
939
940         if (plan->saved)
941         {
942                 /* Replan if needed, and increment plan refcount for portal */
943                 cplan = RevalidateCachedPlan(plansource, false);
944                 stmt_list = cplan->stmt_list;
945         }
946         else
947         {
948                 /* No replan, but copy the plan into the portal's context */
949                 oldcontext = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
950                 stmt_list = copyObject(plansource->plan->stmt_list);
951                 MemoryContextSwitchTo(oldcontext);
952                 cplan = NULL;                   /* portal shouldn't depend on cplan */
953         }
954
955         /*
956          * Set up the portal.
957          */
958         PortalDefineQuery(portal,
959                                           NULL,         /* no statement name */
960                                           plansource->query_string,
961                                           plansource->commandTag,
962                                           stmt_list,
963                                           cplan);
964
965         /*
966          * Set up options for portal.
967          */
968         portal->cursorOptions &= ~(CURSOR_OPT_SCROLL | CURSOR_OPT_NO_SCROLL);
969         if (list_length(stmt_list) == 1 &&
970                 IsA((Node *) linitial(stmt_list), PlannedStmt) &&
971                 ExecSupportsBackwardScan(((PlannedStmt *) linitial(stmt_list))->planTree))
972                 portal->cursorOptions |= CURSOR_OPT_SCROLL;
973         else
974                 portal->cursorOptions |= CURSOR_OPT_NO_SCROLL;
975
976         /*
977          * If told to be read-only, we'd better check for read-only queries.
978          * This can't be done earlier because we need to look at the finished,
979          * planned queries.  (In particular, we don't want to do it between
980          * RevalidateCachedPlan and PortalDefineQuery, because throwing an error
981          * between those steps would result in leaking our plancache refcount.)
982          */
983         if (read_only)
984         {
985                 ListCell   *lc;
986
987                 foreach(lc, stmt_list)
988                 {
989                         Node   *pstmt = (Node *) lfirst(lc);
990
991                         if (!CommandIsReadOnly(pstmt))
992                                 ereport(ERROR,
993                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
994                                                  /* translator: %s is a SQL statement name */
995                                                  errmsg("%s is not allowed in a non-volatile function",
996                                                                 CreateCommandTag(pstmt))));
997                 }
998         }
999
1000         /*
1001          * Set up the snapshot to use.  (PortalStart will do CopySnapshot, so we
1002          * skip that here.)
1003          */
1004         if (read_only)
1005                 snapshot = ActiveSnapshot;
1006         else
1007         {
1008                 CommandCounterIncrement();
1009                 snapshot = GetTransactionSnapshot();
1010         }
1011
1012         /*
1013          * Start portal execution.
1014          */
1015         PortalStart(portal, paramLI, snapshot);
1016
1017         Assert(portal->strategy != PORTAL_MULTI_QUERY);
1018
1019         /* Return the created portal */
1020         return portal;
1021 }
1022
1023
1024 /*
1025  * SPI_cursor_find()
1026  *
1027  *      Find the portal of an existing open cursor
1028  */
1029 Portal
1030 SPI_cursor_find(const char *name)
1031 {
1032         return GetPortalByName(name);
1033 }
1034
1035
1036 /*
1037  * SPI_cursor_fetch()
1038  *
1039  *      Fetch rows in a cursor
1040  */
1041 void
1042 SPI_cursor_fetch(Portal portal, bool forward, long count)
1043 {
1044         _SPI_cursor_operation(portal,
1045                                                   forward ? FETCH_FORWARD : FETCH_BACKWARD, count,
1046                                                   CreateDestReceiver(DestSPI, NULL));
1047         /* we know that the DestSPI receiver doesn't need a destroy call */
1048 }
1049
1050
1051 /*
1052  * SPI_cursor_move()
1053  *
1054  *      Move in a cursor
1055  */
1056 void
1057 SPI_cursor_move(Portal portal, bool forward, long count)
1058 {
1059         _SPI_cursor_operation(portal,
1060                                                   forward ? FETCH_FORWARD : FETCH_BACKWARD, count,
1061                                                   None_Receiver);
1062 }
1063
1064
1065 /*
1066  * SPI_scroll_cursor_fetch()
1067  *
1068  *      Fetch rows in a scrollable cursor
1069  */
1070 void
1071 SPI_scroll_cursor_fetch(Portal portal, FetchDirection direction, long count)
1072 {
1073         _SPI_cursor_operation(portal,
1074                                                   direction, count,
1075                                                   CreateDestReceiver(DestSPI, NULL));
1076         /* we know that the DestSPI receiver doesn't need a destroy call */
1077 }
1078
1079
1080 /*
1081  * SPI_scroll_cursor_move()
1082  *
1083  *      Move in a scrollable cursor
1084  */
1085 void
1086 SPI_scroll_cursor_move(Portal portal, FetchDirection direction, long count)
1087 {
1088         _SPI_cursor_operation(portal, direction, count, None_Receiver);
1089 }
1090
1091
1092 /*
1093  * SPI_cursor_close()
1094  *
1095  *      Close a cursor
1096  */
1097 void
1098 SPI_cursor_close(Portal portal)
1099 {
1100         if (!PortalIsValid(portal))
1101                 elog(ERROR, "invalid portal in SPI cursor operation");
1102
1103         PortalDrop(portal, false);
1104 }
1105
1106 /*
1107  * Returns the Oid representing the type id for argument at argIndex. First
1108  * parameter is at index zero.
1109  */
1110 Oid
1111 SPI_getargtypeid(SPIPlanPtr plan, int argIndex)
1112 {
1113         if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC ||
1114                 argIndex < 0 || argIndex >= plan->nargs)
1115         {
1116                 SPI_result = SPI_ERROR_ARGUMENT;
1117                 return InvalidOid;
1118         }
1119         return plan->argtypes[argIndex];
1120 }
1121
1122 /*
1123  * Returns the number of arguments for the prepared plan.
1124  */
1125 int
1126 SPI_getargcount(SPIPlanPtr plan)
1127 {
1128         if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC)
1129         {
1130                 SPI_result = SPI_ERROR_ARGUMENT;
1131                 return -1;
1132         }
1133         return plan->nargs;
1134 }
1135
1136 /*
1137  * Returns true if the plan contains exactly one command
1138  * and that command returns tuples to the caller (eg, SELECT or
1139  * INSERT ... RETURNING, but not SELECT ... INTO). In essence,
1140  * the result indicates if the command can be used with SPI_cursor_open
1141  *
1142  * Parameters
1143  *        plan: A plan previously prepared using SPI_prepare
1144  */
1145 bool
1146 SPI_is_cursor_plan(SPIPlanPtr plan)
1147 {
1148         CachedPlanSource *plansource;
1149         CachedPlan *cplan;
1150
1151         if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC)
1152         {
1153                 SPI_result = SPI_ERROR_ARGUMENT;
1154                 return false;
1155         }
1156
1157         if (list_length(plan->plancache_list) != 1)
1158                 return false;                   /* not exactly 1 pre-rewrite command */
1159         plansource = (CachedPlanSource *) linitial(plan->plancache_list);
1160
1161         if (plan->saved)
1162         {
1163                 /* Make sure the plan is up to date */
1164                 cplan = RevalidateCachedPlan(plansource, true);
1165                 ReleaseCachedPlan(cplan, true);
1166         }
1167
1168         /* Does it return tuples? */
1169         if (plansource->resultDesc)
1170                 return true;
1171
1172         return false;
1173 }
1174
1175 /*
1176  * SPI_result_code_string --- convert any SPI return code to a string
1177  *
1178  * This is often useful in error messages.      Most callers will probably
1179  * only pass negative (error-case) codes, but for generality we recognize
1180  * the success codes too.
1181  */
1182 const char *
1183 SPI_result_code_string(int code)
1184 {
1185         static char buf[64];
1186
1187         switch (code)
1188         {
1189                 case SPI_ERROR_CONNECT:
1190                         return "SPI_ERROR_CONNECT";
1191                 case SPI_ERROR_COPY:
1192                         return "SPI_ERROR_COPY";
1193                 case SPI_ERROR_OPUNKNOWN:
1194                         return "SPI_ERROR_OPUNKNOWN";
1195                 case SPI_ERROR_UNCONNECTED:
1196                         return "SPI_ERROR_UNCONNECTED";
1197                 case SPI_ERROR_ARGUMENT:
1198                         return "SPI_ERROR_ARGUMENT";
1199                 case SPI_ERROR_PARAM:
1200                         return "SPI_ERROR_PARAM";
1201                 case SPI_ERROR_TRANSACTION:
1202                         return "SPI_ERROR_TRANSACTION";
1203                 case SPI_ERROR_NOATTRIBUTE:
1204                         return "SPI_ERROR_NOATTRIBUTE";
1205                 case SPI_ERROR_NOOUTFUNC:
1206                         return "SPI_ERROR_NOOUTFUNC";
1207                 case SPI_ERROR_TYPUNKNOWN:
1208                         return "SPI_ERROR_TYPUNKNOWN";
1209                 case SPI_OK_CONNECT:
1210                         return "SPI_OK_CONNECT";
1211                 case SPI_OK_FINISH:
1212                         return "SPI_OK_FINISH";
1213                 case SPI_OK_FETCH:
1214                         return "SPI_OK_FETCH";
1215                 case SPI_OK_UTILITY:
1216                         return "SPI_OK_UTILITY";
1217                 case SPI_OK_SELECT:
1218                         return "SPI_OK_SELECT";
1219                 case SPI_OK_SELINTO:
1220                         return "SPI_OK_SELINTO";
1221                 case SPI_OK_INSERT:
1222                         return "SPI_OK_INSERT";
1223                 case SPI_OK_DELETE:
1224                         return "SPI_OK_DELETE";
1225                 case SPI_OK_UPDATE:
1226                         return "SPI_OK_UPDATE";
1227                 case SPI_OK_CURSOR:
1228                         return "SPI_OK_CURSOR";
1229                 case SPI_OK_INSERT_RETURNING:
1230                         return "SPI_OK_INSERT_RETURNING";
1231                 case SPI_OK_DELETE_RETURNING:
1232                         return "SPI_OK_DELETE_RETURNING";
1233                 case SPI_OK_UPDATE_RETURNING:
1234                         return "SPI_OK_UPDATE_RETURNING";
1235         }
1236         /* Unrecognized code ... return something useful ... */
1237         sprintf(buf, "Unrecognized SPI code %d", code);
1238         return buf;
1239 }
1240
1241 /* =================== private functions =================== */
1242
1243 /*
1244  * spi_dest_startup
1245  *              Initialize to receive tuples from Executor into SPITupleTable
1246  *              of current SPI procedure
1247  */
1248 void
1249 spi_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
1250 {
1251         SPITupleTable *tuptable;
1252         MemoryContext oldcxt;
1253         MemoryContext tuptabcxt;
1254
1255         /*
1256          * When called by Executor _SPI_curid expected to be equal to
1257          * _SPI_connected
1258          */
1259         if (_SPI_curid != _SPI_connected || _SPI_connected < 0)
1260                 elog(ERROR, "improper call to spi_dest_startup");
1261         if (_SPI_current != &(_SPI_stack[_SPI_curid]))
1262                 elog(ERROR, "SPI stack corrupted");
1263
1264         if (_SPI_current->tuptable != NULL)
1265                 elog(ERROR, "improper call to spi_dest_startup");
1266
1267         oldcxt = _SPI_procmem();        /* switch to procedure memory context */
1268
1269         tuptabcxt = AllocSetContextCreate(CurrentMemoryContext,
1270                                                                           "SPI TupTable",
1271                                                                           ALLOCSET_DEFAULT_MINSIZE,
1272                                                                           ALLOCSET_DEFAULT_INITSIZE,
1273                                                                           ALLOCSET_DEFAULT_MAXSIZE);
1274         MemoryContextSwitchTo(tuptabcxt);
1275
1276         _SPI_current->tuptable = tuptable = (SPITupleTable *)
1277                 palloc(sizeof(SPITupleTable));
1278         tuptable->tuptabcxt = tuptabcxt;
1279         tuptable->alloced = tuptable->free = 128;
1280         tuptable->vals = (HeapTuple *) palloc(tuptable->alloced * sizeof(HeapTuple));
1281         tuptable->tupdesc = CreateTupleDescCopy(typeinfo);
1282
1283         MemoryContextSwitchTo(oldcxt);
1284 }
1285
1286 /*
1287  * spi_printtup
1288  *              store tuple retrieved by Executor into SPITupleTable
1289  *              of current SPI procedure
1290  */
1291 void
1292 spi_printtup(TupleTableSlot *slot, DestReceiver *self)
1293 {
1294         SPITupleTable *tuptable;
1295         MemoryContext oldcxt;
1296
1297         /*
1298          * When called by Executor _SPI_curid expected to be equal to
1299          * _SPI_connected
1300          */
1301         if (_SPI_curid != _SPI_connected || _SPI_connected < 0)
1302                 elog(ERROR, "improper call to spi_printtup");
1303         if (_SPI_current != &(_SPI_stack[_SPI_curid]))
1304                 elog(ERROR, "SPI stack corrupted");
1305
1306         tuptable = _SPI_current->tuptable;
1307         if (tuptable == NULL)
1308                 elog(ERROR, "improper call to spi_printtup");
1309
1310         oldcxt = MemoryContextSwitchTo(tuptable->tuptabcxt);
1311
1312         if (tuptable->free == 0)
1313         {
1314                 tuptable->free = 256;
1315                 tuptable->alloced += tuptable->free;
1316                 tuptable->vals = (HeapTuple *) repalloc(tuptable->vals,
1317                                                                           tuptable->alloced * sizeof(HeapTuple));
1318         }
1319
1320         tuptable->vals[tuptable->alloced - tuptable->free] =
1321                 ExecCopySlotTuple(slot);
1322         (tuptable->free)--;
1323
1324         MemoryContextSwitchTo(oldcxt);
1325 }
1326
1327 /*
1328  * Static functions
1329  */
1330
1331 /*
1332  * Parse and plan a querystring.
1333  *
1334  * At entry, plan->argtypes and plan->nargs must be valid.
1335  *
1336  * Results are stored into *plan (specifically, plan->plancache_list).
1337  * Note however that the result trees are all in CurrentMemoryContext
1338  * and need to be copied somewhere to survive.
1339  */
1340 static void
1341 _SPI_prepare_plan(const char *src, SPIPlanPtr plan, int cursorOptions)
1342 {
1343         List       *raw_parsetree_list;
1344         List       *plancache_list;
1345         ListCell   *list_item;
1346         ErrorContextCallback spierrcontext;
1347         Oid                *argtypes = plan->argtypes;
1348         int                     nargs = plan->nargs;
1349
1350         /*
1351          * Increment CommandCounter to see changes made by now.  We must do this
1352          * to be sure of seeing any schema changes made by a just-preceding SPI
1353          * command.  (But we don't bother advancing the snapshot, since the
1354          * planner generally operates under SnapshotNow rules anyway.)
1355          */
1356         CommandCounterIncrement();
1357
1358         /*
1359          * Setup error traceback support for ereport()
1360          */
1361         spierrcontext.callback = _SPI_error_callback;
1362         spierrcontext.arg = (void *) src;
1363         spierrcontext.previous = error_context_stack;
1364         error_context_stack = &spierrcontext;
1365
1366         /*
1367          * Parse the request string into a list of raw parse trees.
1368          */
1369         raw_parsetree_list = pg_parse_query(src);
1370
1371         /*
1372          * Do parse analysis and rule rewrite for each raw parsetree, then
1373          * cons up a phony plancache entry for each one.
1374          */
1375         plancache_list = NIL;
1376
1377         foreach(list_item, raw_parsetree_list)
1378         {
1379                 Node       *parsetree = (Node *) lfirst(list_item);
1380                 List       *stmt_list;
1381                 CachedPlanSource *plansource;
1382                 CachedPlan *cplan;
1383
1384                 /* Need a copyObject here to keep parser from modifying raw tree */
1385                 stmt_list = pg_analyze_and_rewrite(copyObject(parsetree),
1386                                                                                    src, argtypes, nargs);
1387                 stmt_list = pg_plan_queries(stmt_list, cursorOptions, NULL, false);
1388
1389                 plansource = (CachedPlanSource *) palloc0(sizeof(CachedPlanSource));
1390                 cplan = (CachedPlan *) palloc0(sizeof(CachedPlan));
1391
1392             plansource->raw_parse_tree = parsetree;
1393                 /* cast-away-const here is a bit ugly, but there's no reason to copy */
1394             plansource->query_string = (char *) src;
1395                 plansource->commandTag = CreateCommandTag(parsetree);
1396                 plansource->param_types = argtypes;
1397                 plansource->num_params = nargs;
1398                 plansource->fully_planned = true;
1399                 plansource->fixed_result = false;
1400                 plansource->resultDesc = PlanCacheComputeResultDesc(stmt_list);
1401                 plansource->plan = cplan;
1402
1403                 cplan->stmt_list = stmt_list;
1404                 cplan->fully_planned = true;
1405
1406                 plancache_list = lappend(plancache_list, plansource);
1407         }
1408
1409         plan->plancache_list = plancache_list;
1410
1411         /*
1412          * Pop the error context stack
1413          */
1414         error_context_stack = spierrcontext.previous;
1415 }
1416
1417 /*
1418  * Execute the given plan with the given parameter values
1419  *
1420  * snapshot: query snapshot to use, or InvalidSnapshot for the normal
1421  *              behavior of taking a new snapshot for each query.
1422  * crosscheck_snapshot: for RI use, all others pass InvalidSnapshot
1423  * read_only: TRUE for read-only execution (no CommandCounterIncrement)
1424  * tcount: execution tuple-count limit, or 0 for none
1425  */
1426 static int
1427 _SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
1428                                   Snapshot snapshot, Snapshot crosscheck_snapshot,
1429                                   bool read_only, long tcount)
1430 {
1431         volatile int my_res = 0;
1432         volatile uint32 my_processed = 0;
1433         volatile Oid my_lastoid = InvalidOid;
1434         SPITupleTable *volatile my_tuptable = NULL;
1435         volatile int res = 0;
1436         Snapshot        saveActiveSnapshot;
1437
1438         /* Be sure to restore ActiveSnapshot on error exit */
1439         saveActiveSnapshot = ActiveSnapshot;
1440         PG_TRY();
1441         {
1442                 ListCell   *lc1;
1443                 ErrorContextCallback spierrcontext;
1444                 int                     nargs = plan->nargs;
1445                 ParamListInfo paramLI;
1446                 CachedPlan *cplan = NULL;
1447
1448                 /* Convert parameters to form wanted by executor */
1449                 if (nargs > 0)
1450                 {
1451                         int                     k;
1452
1453                         /* sizeof(ParamListInfoData) includes the first array element */
1454                         paramLI = (ParamListInfo) palloc(sizeof(ParamListInfoData) +
1455                                                                            (nargs - 1) *sizeof(ParamExternData));
1456                         paramLI->numParams = nargs;
1457
1458                         for (k = 0; k < nargs; k++)
1459                         {
1460                                 ParamExternData *prm = &paramLI->params[k];
1461
1462                                 prm->value = Values[k];
1463                                 prm->isnull = (Nulls && Nulls[k] == 'n');
1464                                 prm->pflags = 0;
1465                                 prm->ptype = plan->argtypes[k];
1466                         }
1467                 }
1468                 else
1469                         paramLI = NULL;
1470
1471                 /*
1472                  * Setup error traceback support for ereport()
1473                  */
1474                 spierrcontext.callback = _SPI_error_callback;
1475                 spierrcontext.arg = NULL;
1476                 spierrcontext.previous = error_context_stack;
1477                 error_context_stack = &spierrcontext;
1478
1479                 foreach(lc1, plan->plancache_list)
1480                 {
1481                         CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc1);
1482                         List       *stmt_list;
1483                         ListCell   *lc2;
1484
1485                         spierrcontext.arg = (void *) plansource->query_string;
1486
1487                         if (plan->saved)
1488                         {
1489                                 /* Replan if needed, and increment plan refcount locally */
1490                                 cplan = RevalidateCachedPlan(plansource, true);
1491                                 stmt_list = cplan->stmt_list;
1492                         }
1493                         else
1494                         {
1495                                 /* No replan here */
1496                                 cplan = NULL;
1497                                 stmt_list = plansource->plan->stmt_list;
1498                         }
1499
1500                         foreach(lc2, stmt_list)
1501                         {
1502                                 Node       *stmt = (Node *) lfirst(lc2);
1503                                 bool            canSetTag;
1504                                 QueryDesc  *qdesc;
1505                                 DestReceiver *dest;
1506
1507                                 _SPI_current->processed = 0;
1508                                 _SPI_current->lastoid = InvalidOid;
1509                                 _SPI_current->tuptable = NULL;
1510
1511                                 if (IsA(stmt, PlannedStmt))
1512                                 {
1513                                         canSetTag = ((PlannedStmt *) stmt)->canSetTag;
1514                                 }
1515                                 else
1516                                 {
1517                                         /* utilities are canSetTag if only thing in list */
1518                                         canSetTag = (list_length(stmt_list) == 1);
1519
1520                                         if (IsA(stmt, CopyStmt))
1521                                         {
1522                                                 CopyStmt   *cstmt = (CopyStmt *) stmt;
1523
1524                                                 if (cstmt->filename == NULL)
1525                                                 {
1526                                                         my_res = SPI_ERROR_COPY;
1527                                                         goto fail;
1528                                                 }
1529                                         }
1530                                         else if (IsA(stmt, TransactionStmt))
1531                                         {
1532                                                 my_res = SPI_ERROR_TRANSACTION;
1533                                                 goto fail;
1534                                         }
1535                                 }
1536
1537                                 if (read_only && !CommandIsReadOnly(stmt))
1538                                         ereport(ERROR,
1539                                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1540                                         /* translator: %s is a SQL statement name */
1541                                            errmsg("%s is not allowed in a non-volatile function",
1542                                                           CreateCommandTag(stmt))));
1543
1544                                 /*
1545                                  * If not read-only mode, advance the command counter before
1546                                  * each command.
1547                                  */
1548                                 if (!read_only)
1549                                         CommandCounterIncrement();
1550
1551                                 dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone,
1552                                                                                   NULL);
1553
1554                                 if (snapshot == InvalidSnapshot)
1555                                 {
1556                                         /*
1557                                          * Default read_only behavior is to use the entry-time
1558                                          * ActiveSnapshot; if read-write, grab a full new snap.
1559                                          */
1560                                         if (read_only)
1561                                                 ActiveSnapshot = CopySnapshot(saveActiveSnapshot);
1562                                         else
1563                                                 ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
1564                                 }
1565                                 else
1566                                 {
1567                                         /*
1568                                          * We interpret read_only with a specified snapshot to be
1569                                          * exactly that snapshot, but read-write means use the
1570                                          * snap with advancing of command ID.
1571                                          */
1572                                         ActiveSnapshot = CopySnapshot(snapshot);
1573                                         if (!read_only)
1574                                                 ActiveSnapshot->curcid = GetCurrentCommandId();
1575                                 }
1576
1577                                 if (IsA(stmt, PlannedStmt))
1578                                 {
1579                                         qdesc = CreateQueryDesc((PlannedStmt *) stmt,
1580                                                                                         ActiveSnapshot,
1581                                                                                         crosscheck_snapshot,
1582                                                                                         dest,
1583                                                                                         paramLI, false);
1584                                         res = _SPI_pquery(qdesc, canSetTag ? tcount : 0);
1585                                         FreeQueryDesc(qdesc);
1586                                 }
1587                                 else
1588                                 {
1589                                         ProcessUtility(stmt,
1590                                                                    plansource->query_string,
1591                                                                    paramLI,
1592                                                                    false,                               /* not top level */
1593                                                                    dest,
1594                                                                    NULL);
1595                                         /* Update "processed" if stmt returned tuples */
1596                                         if (_SPI_current->tuptable)
1597                                                 _SPI_current->processed = _SPI_current->tuptable->alloced - _SPI_current->tuptable->free;
1598                                         res = SPI_OK_UTILITY;
1599                                 }
1600                                 FreeSnapshot(ActiveSnapshot);
1601                                 ActiveSnapshot = NULL;
1602
1603                                 /*
1604                                  * The last canSetTag query sets the status values returned to
1605                                  * the caller.  Be careful to free any tuptables not returned,
1606                                  * to avoid intratransaction memory leak.
1607                                  */
1608                                 if (canSetTag)
1609                                 {
1610                                         my_processed = _SPI_current->processed;
1611                                         my_lastoid = _SPI_current->lastoid;
1612                                         SPI_freetuptable(my_tuptable);
1613                                         my_tuptable = _SPI_current->tuptable;
1614                                         my_res = res;
1615                                 }
1616                                 else
1617                                 {
1618                                         SPI_freetuptable(_SPI_current->tuptable);
1619                                         _SPI_current->tuptable = NULL;
1620                                 }
1621                                 /* we know that the receiver doesn't need a destroy call */
1622                                 if (res < 0)
1623                                 {
1624                                         my_res = res;
1625                                         goto fail;
1626                                 }
1627                         }
1628
1629                         /* Done with this plan, so release refcount */
1630                         if (cplan)
1631                                 ReleaseCachedPlan(cplan, true);
1632                         cplan = NULL;
1633                 }
1634
1635 fail:
1636
1637                 /* We no longer need the cached plan refcount, if any */
1638                 if (cplan)
1639                         ReleaseCachedPlan(cplan, true);
1640
1641                 /*
1642                  * Pop the error context stack
1643                  */
1644                 error_context_stack = spierrcontext.previous;
1645         }
1646         PG_CATCH();
1647         {
1648                 /* Restore global vars and propagate error */
1649                 ActiveSnapshot = saveActiveSnapshot;
1650                 PG_RE_THROW();
1651         }
1652         PG_END_TRY();
1653
1654         ActiveSnapshot = saveActiveSnapshot;
1655
1656         /* Save results for caller */
1657         SPI_processed = my_processed;
1658         SPI_lastoid = my_lastoid;
1659         SPI_tuptable = my_tuptable;
1660
1661         /* tuptable now is caller's responsibility, not SPI's */
1662         _SPI_current->tuptable = NULL;
1663
1664         /*
1665          * If none of the queries had canSetTag, we return the last query's result
1666          * code, but not its auxiliary results (for backwards compatibility).
1667          */
1668         if (my_res == 0)
1669                 my_res = res;
1670
1671         return my_res;
1672 }
1673
1674 static int
1675 _SPI_pquery(QueryDesc *queryDesc, long tcount)
1676 {
1677         int                     operation = queryDesc->operation;
1678         int                     res;
1679
1680         switch (operation)
1681         {
1682                 case CMD_SELECT:
1683                         if (queryDesc->plannedstmt->into)               /* select into table? */
1684                                 res = SPI_OK_SELINTO;
1685                         else if (queryDesc->dest->mydest != DestSPI)
1686                         {
1687                                 /* Don't return SPI_OK_SELECT if we're discarding result */
1688                                 res = SPI_OK_UTILITY;
1689                         }
1690                         else
1691                                 res = SPI_OK_SELECT;
1692                         break;
1693                 case CMD_INSERT:
1694                         if (queryDesc->plannedstmt->returningLists)
1695                                 res = SPI_OK_INSERT_RETURNING;
1696                         else
1697                                 res = SPI_OK_INSERT;
1698                         break;
1699                 case CMD_DELETE:
1700                         if (queryDesc->plannedstmt->returningLists)
1701                                 res = SPI_OK_DELETE_RETURNING;
1702                         else
1703                                 res = SPI_OK_DELETE;
1704                         break;
1705                 case CMD_UPDATE:
1706                         if (queryDesc->plannedstmt->returningLists)
1707                                 res = SPI_OK_UPDATE_RETURNING;
1708                         else
1709                                 res = SPI_OK_UPDATE;
1710                         break;
1711                 default:
1712                         return SPI_ERROR_OPUNKNOWN;
1713         }
1714
1715 #ifdef SPI_EXECUTOR_STATS
1716         if (ShowExecutorStats)
1717                 ResetUsage();
1718 #endif
1719
1720         AfterTriggerBeginQuery();
1721
1722         ExecutorStart(queryDesc, 0);
1723
1724         ExecutorRun(queryDesc, ForwardScanDirection, tcount);
1725
1726         _SPI_current->processed = queryDesc->estate->es_processed;
1727         _SPI_current->lastoid = queryDesc->estate->es_lastoid;
1728
1729         if ((res == SPI_OK_SELECT || queryDesc->plannedstmt->returningLists) &&
1730                 queryDesc->dest->mydest == DestSPI)
1731         {
1732                 if (_SPI_checktuples())
1733                         elog(ERROR, "consistency check on SPI tuple count failed");
1734         }
1735
1736         /* Take care of any queued AFTER triggers */
1737         AfterTriggerEndQuery(queryDesc->estate);
1738
1739         ExecutorEnd(queryDesc);
1740
1741 #ifdef SPI_EXECUTOR_STATS
1742         if (ShowExecutorStats)
1743                 ShowUsage("SPI EXECUTOR STATS");
1744 #endif
1745
1746         return res;
1747 }
1748
/*
 * _SPI_error_callback
 *
 * Error context callback that annotates a failure of a query run via SPI.
 * "arg" is the text of the query being executed.
 */
static void
_SPI_error_callback(void *arg)
{
	const char *sqltext = (const char *) arg;
	int			errpos = geterrposition();

	if (errpos <= 0)
	{
		/* No syntax error position: report the query as a context item */
		errcontext("SQL statement \"%s\"", sqltext);
		return;
	}

	/*
	 * A syntax error position was reported: clear it and re-report it as an
	 * internal position, together with the internal query text.
	 */
	errposition(0);
	internalerrposition(errpos);
	internalerrquery(sqltext);
}
1774
1775 /*
1776  * _SPI_cursor_operation()
1777  *
1778  *      Do a FETCH or MOVE in a cursor
1779  */
1780 static void
1781 _SPI_cursor_operation(Portal portal, FetchDirection direction, long count,
1782                                           DestReceiver *dest)
1783 {
1784         long            nfetched;
1785
1786         /* Check that the portal is valid */
1787         if (!PortalIsValid(portal))
1788                 elog(ERROR, "invalid portal in SPI cursor operation");
1789
1790         /* Push the SPI stack */
1791         if (_SPI_begin_call(true) < 0)
1792                 elog(ERROR, "SPI cursor operation called while not connected");
1793
1794         /* Reset the SPI result (note we deliberately don't touch lastoid) */
1795         SPI_processed = 0;
1796         SPI_tuptable = NULL;
1797         _SPI_current->processed = 0;
1798         _SPI_current->tuptable = NULL;
1799
1800         /* Run the cursor */
1801         nfetched = PortalRunFetch(portal,
1802                                                           direction,
1803                                                           count,
1804                                                           dest);
1805
1806         /*
1807          * Think not to combine this store with the preceding function call. If
1808          * the portal contains calls to functions that use SPI, then SPI_stack is
1809          * likely to move around while the portal runs.  When control returns,
1810          * _SPI_current will point to the correct stack entry... but the pointer
1811          * may be different than it was beforehand. So we must be sure to re-fetch
1812          * the pointer after the function call completes.
1813          */
1814         _SPI_current->processed = nfetched;
1815
1816         if (dest->mydest == DestSPI && _SPI_checktuples())
1817                 elog(ERROR, "consistency check on SPI tuple count failed");
1818
1819         /* Put the result into place for access by caller */
1820         SPI_processed = _SPI_current->processed;
1821         SPI_tuptable = _SPI_current->tuptable;
1822
1823         /* tuptable now is caller's responsibility, not SPI's */
1824         _SPI_current->tuptable = NULL;
1825
1826         /* Pop the SPI stack */
1827         _SPI_end_call(true);
1828 }
1829
1830
1831 static MemoryContext
1832 _SPI_execmem(void)
1833 {
1834         return MemoryContextSwitchTo(_SPI_current->execCxt);
1835 }
1836
1837 static MemoryContext
1838 _SPI_procmem(void)
1839 {
1840         return MemoryContextSwitchTo(_SPI_current->procCxt);
1841 }
1842
1843 /*
1844  * _SPI_begin_call: begin a SPI operation within a connected procedure
1845  */
1846 static int
1847 _SPI_begin_call(bool execmem)
1848 {
1849         if (_SPI_curid + 1 != _SPI_connected)
1850                 return SPI_ERROR_UNCONNECTED;
1851         _SPI_curid++;
1852         if (_SPI_current != &(_SPI_stack[_SPI_curid]))
1853                 elog(ERROR, "SPI stack corrupted");
1854
1855         if (execmem)                            /* switch to the Executor memory context */
1856                 _SPI_execmem();
1857
1858         return 0;
1859 }
1860
1861 /*
1862  * _SPI_end_call: end a SPI operation within a connected procedure
1863  *
1864  * Note: this currently has no failure return cases, so callers don't check
1865  */
1866 static int
1867 _SPI_end_call(bool procmem)
1868 {
1869         /*
1870          * We're returning to procedure where _SPI_curid == _SPI_connected - 1
1871          */
1872         _SPI_curid--;
1873
1874         if (procmem)                            /* switch to the procedure memory context */
1875         {
1876                 _SPI_procmem();
1877                 /* and free Executor memory */
1878                 MemoryContextResetAndDeleteChildren(_SPI_current->execCxt);
1879         }
1880
1881         return 0;
1882 }
1883
1884 static bool
1885 _SPI_checktuples(void)
1886 {
1887         uint32          processed = _SPI_current->processed;
1888         SPITupleTable *tuptable = _SPI_current->tuptable;
1889         bool            failed = false;
1890
1891         if (tuptable == NULL)           /* spi_dest_startup was not called */
1892                 failed = true;
1893         else if (processed != (tuptable->alloced - tuptable->free))
1894                 failed = true;
1895
1896         return failed;
1897 }
1898
1899 /*
1900  * Make an "unsaved" copy of the given plan, in a child context of parentcxt.
1901  */
1902 static SPIPlanPtr
1903 _SPI_copy_plan(SPIPlanPtr plan, MemoryContext parentcxt)
1904 {
1905         SPIPlanPtr      newplan;
1906         MemoryContext plancxt;
1907         MemoryContext oldcxt;
1908         ListCell   *lc;
1909
1910         Assert(!plan->saved);           /* not currently supported */
1911
1912         /*
1913          * Create a memory context for the plan.  We don't expect the plan to be
1914          * very large, so use smaller-than-default alloc parameters.
1915          */
1916         plancxt = AllocSetContextCreate(parentcxt,
1917                                                                         "SPI Plan",
1918                                                                         ALLOCSET_SMALL_MINSIZE,
1919                                                                         ALLOCSET_SMALL_INITSIZE,
1920                                                                         ALLOCSET_SMALL_MAXSIZE);
1921         oldcxt = MemoryContextSwitchTo(plancxt);
1922
1923         /* Copy the SPI plan into its own context */
1924         newplan = (SPIPlanPtr) palloc(sizeof(_SPI_plan));
1925         newplan->magic = _SPI_PLAN_MAGIC;
1926         newplan->saved = false;
1927         newplan->plancache_list = NIL;
1928         newplan->plancxt = plancxt;
1929         newplan->nargs = plan->nargs;
1930         if (plan->nargs > 0)
1931         {
1932                 newplan->argtypes = (Oid *) palloc(plan->nargs * sizeof(Oid));
1933                 memcpy(newplan->argtypes, plan->argtypes, plan->nargs * sizeof(Oid));
1934         }
1935         else
1936                 newplan->argtypes = NULL;
1937
1938         foreach(lc, plan->plancache_list)
1939         {
1940                 CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc);
1941                 CachedPlanSource *newsource;
1942                 CachedPlan *cplan;
1943                 CachedPlan *newcplan;
1944
1945                 /* Note: we assume we don't need to revalidate the plan */
1946                 cplan = plansource->plan;
1947
1948                 newsource = (CachedPlanSource *) palloc0(sizeof(CachedPlanSource));
1949                 newcplan = (CachedPlan *) palloc0(sizeof(CachedPlan));
1950
1951             newsource->raw_parse_tree = copyObject(plansource->raw_parse_tree);
1952             newsource->query_string = pstrdup(plansource->query_string);
1953                 newsource->commandTag = plansource->commandTag;
1954                 newsource->param_types = newplan->argtypes;
1955                 newsource->num_params = newplan->nargs;
1956                 newsource->fully_planned = plansource->fully_planned;
1957                 newsource->fixed_result = plansource->fixed_result;
1958                 if (plansource->resultDesc)
1959                         newsource->resultDesc = CreateTupleDescCopy(plansource->resultDesc);
1960                 newsource->plan = newcplan;
1961
1962                 newcplan->stmt_list = copyObject(cplan->stmt_list);
1963                 newcplan->fully_planned = cplan->fully_planned;
1964
1965                 newplan->plancache_list = lappend(newplan->plancache_list, newsource);
1966         }
1967
1968         MemoryContextSwitchTo(oldcxt);
1969
1970         return newplan;
1971 }
1972
1973 /*
1974  * Make a "saved" copy of the given plan, entrusting everything to plancache.c
1975  */
1976 static SPIPlanPtr
1977 _SPI_save_plan(SPIPlanPtr plan)
1978 {
1979         SPIPlanPtr      newplan;
1980         MemoryContext plancxt;
1981         MemoryContext oldcxt;
1982         ListCell   *lc;
1983
1984         Assert(!plan->saved);           /* not currently supported */
1985
1986         /*
1987          * Create a memory context for the plan.  We don't expect the plan to be
1988          * very large, so use smaller-than-default alloc parameters.
1989          */
1990         plancxt = AllocSetContextCreate(CacheMemoryContext,
1991                                                                         "SPI Plan",
1992                                                                         ALLOCSET_SMALL_MINSIZE,
1993                                                                         ALLOCSET_SMALL_INITSIZE,
1994                                                                         ALLOCSET_SMALL_MAXSIZE);
1995         oldcxt = MemoryContextSwitchTo(plancxt);
1996
1997         /* Copy the SPI plan into its own context */
1998         newplan = (SPIPlanPtr) palloc(sizeof(_SPI_plan));
1999         newplan->magic = _SPI_PLAN_MAGIC;
2000         newplan->saved = true;
2001         newplan->plancache_list = NIL;
2002         newplan->plancxt = plancxt;
2003         newplan->nargs = plan->nargs;
2004         if (plan->nargs > 0)
2005         {
2006                 newplan->argtypes = (Oid *) palloc(plan->nargs * sizeof(Oid));
2007                 memcpy(newplan->argtypes, plan->argtypes, plan->nargs * sizeof(Oid));
2008         }
2009         else
2010                 newplan->argtypes = NULL;
2011
2012         foreach(lc, plan->plancache_list)
2013         {
2014                 CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc);
2015                 CachedPlanSource *newsource;
2016                 CachedPlan *cplan;
2017
2018                 /* Note: we assume we don't need to revalidate the plan */
2019                 cplan = plansource->plan;
2020
2021                 newsource = CreateCachedPlan(plansource->raw_parse_tree,
2022                                                                          plansource->query_string,
2023                                                                          plansource->commandTag,
2024                                                                          newplan->argtypes,
2025                                                                          newplan->nargs,
2026                                                                          cplan->stmt_list,
2027                                                                          true,
2028                                                                          false);
2029
2030                 newplan->plancache_list = lappend(newplan->plancache_list, newsource);
2031         }
2032
2033         MemoryContextSwitchTo(oldcxt);
2034
2035         return newplan;
2036 }