]> granicus.if.org Git - postgresql/blob - src/backend/tcop/pquery.c
PortalRun must guard against the possibility that the portal it's
[postgresql] / src / backend / tcop / pquery.c
1 /*-------------------------------------------------------------------------
2  *
3  * pquery.c
4  *        POSTGRES process query command code
5  *
6  * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/tcop/pquery.c,v 1.88 2004/10/04 21:52:15 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "commands/trigger.h"
19 #include "executor/executor.h"
20 #include "miscadmin.h"
21 #include "tcop/tcopprot.h"
22 #include "tcop/pquery.h"
23 #include "tcop/utility.h"
24 #include "utils/guc.h"
25 #include "utils/memutils.h"
26
27
28 /*
29  * ActivePortal is the currently executing Portal (the most closely nested,
30  * if there are several).
31  */
32 Portal          ActivePortal = NULL;
33
34
35 static void ProcessQuery(Query *parsetree,
36                          Plan *plan,
37                          ParamListInfo params,
38                          DestReceiver *dest,
39                          char *completionTag);
40 static uint32 RunFromStore(Portal portal, ScanDirection direction, long count,
41                          DestReceiver *dest);
42 static long PortalRunSelect(Portal portal, bool forward, long count,
43                                 DestReceiver *dest);
44 static void PortalRunUtility(Portal portal, Query *query,
45                                  DestReceiver *dest, char *completionTag);
46 static void PortalRunMulti(Portal portal,
47                            DestReceiver *dest, DestReceiver *altdest,
48                            char *completionTag);
49 static long DoPortalRunFetch(Portal portal,
50                                  FetchDirection fdirection,
51                                  long count,
52                                  DestReceiver *dest);
53 static void DoPortalRewind(Portal portal);
54
55
56 /*
57  * CreateQueryDesc
58  */
59 QueryDesc *
60 CreateQueryDesc(Query *parsetree,
61                                 Plan *plantree,
62                                 Snapshot snapshot,
63                                 Snapshot crosscheck_snapshot,
64                                 DestReceiver *dest,
65                                 ParamListInfo params,
66                                 bool doInstrument)
67 {
68         QueryDesc  *qd = (QueryDesc *) palloc(sizeof(QueryDesc));
69
70         qd->operation = parsetree->commandType;         /* operation */
71         qd->parsetree = parsetree;      /* parse tree */
72         qd->plantree = plantree;        /* plan */
73         qd->snapshot = snapshot;        /* snapshot */
74         qd->crosscheck_snapshot = crosscheck_snapshot; /* RI check snapshot */
75         qd->dest = dest;                        /* output dest */
76         qd->params = params;            /* parameter values passed into query */
77         qd->doInstrument = doInstrument;        /* instrumentation wanted? */
78
79         /* null these fields until set by ExecutorStart */
80         qd->tupDesc = NULL;
81         qd->estate = NULL;
82         qd->planstate = NULL;
83
84         return qd;
85 }
86
87 /*
88  * FreeQueryDesc
89  */
90 void
91 FreeQueryDesc(QueryDesc *qdesc)
92 {
93         /* Can't be a live query */
94         Assert(qdesc->estate == NULL);
95         /* Only the QueryDesc itself need be freed */
96         pfree(qdesc);
97 }
98
99
100 /*
101  * ProcessQuery
102  *              Execute a single plannable query within a PORTAL_MULTI_QUERY portal
103  *
104  *      parsetree: the query tree
105  *      plan: the plan tree for the query
106  *      params: any parameters needed
107  *      dest: where to send results
108  *      completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
109  *              in which to store a command completion status string.
110  *
111  * completionTag may be NULL if caller doesn't want a status string.
112  *
113  * Must be called in a memory context that will be reset or deleted on
114  * error; otherwise the executor's memory usage will be leaked.
115  */
116 static void
117 ProcessQuery(Query *parsetree,
118                          Plan *plan,
119                          ParamListInfo params,
120                          DestReceiver *dest,
121                          char *completionTag)
122 {
123         int                     operation = parsetree->commandType;
124         QueryDesc  *queryDesc;
125
126         ereport(DEBUG3,
127                         (errmsg_internal("ProcessQuery")));
128
129         /*
130          * Check for special-case destinations
131          */
132         if (operation == CMD_SELECT)
133         {
134                 if (parsetree->into != NULL)
135                 {
136                         /*
137                          * SELECT INTO table (a/k/a CREATE AS ... SELECT).
138                          *
139                          * Override the normal communication destination; execMain.c
140                          * special-cases this case.  (Perhaps would be cleaner to have
141                          * an additional destination type?)
142                          */
143                         dest = None_Receiver;
144                 }
145         }
146
147         /*
148          * Must always set snapshot for plannable queries.  Note we assume
149          * that caller will take care of restoring ActiveSnapshot on exit/error.
150          */
151         ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
152
153         /*
154          * Create the QueryDesc object
155          */
156         queryDesc = CreateQueryDesc(parsetree, plan,
157                                                                 ActiveSnapshot, InvalidSnapshot,
158                                                                 dest, params, false);
159
160         /*
161          * Set up to collect AFTER triggers
162          */
163         AfterTriggerBeginQuery();
164
165         /*
166          * Call ExecStart to prepare the plan for execution
167          */
168         ExecutorStart(queryDesc, false);
169
170         /*
171          * Run the plan to completion.
172          */
173         ExecutorRun(queryDesc, ForwardScanDirection, 0L);
174
175         /*
176          * Build command completion status string, if caller wants one.
177          */
178         if (completionTag)
179         {
180                 Oid                     lastOid;
181
182                 switch (operation)
183                 {
184                         case CMD_SELECT:
185                                 strcpy(completionTag, "SELECT");
186                                 break;
187                         case CMD_INSERT:
188                                 if (queryDesc->estate->es_processed == 1)
189                                         lastOid = queryDesc->estate->es_lastoid;
190                                 else
191                                         lastOid = InvalidOid;
192                                 snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
193                                 "INSERT %u %u", lastOid, queryDesc->estate->es_processed);
194                                 break;
195                         case CMD_UPDATE:
196                                 snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
197                                                  "UPDATE %u", queryDesc->estate->es_processed);
198                                 break;
199                         case CMD_DELETE:
200                                 snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
201                                                  "DELETE %u", queryDesc->estate->es_processed);
202                                 break;
203                         default:
204                                 strcpy(completionTag, "???");
205                                 break;
206                 }
207         }
208
209         /*
210          * Now, we close down all the scans and free allocated resources.
211          */
212         ExecutorEnd(queryDesc);
213
214         /* And take care of any queued AFTER triggers */
215         AfterTriggerEndQuery();
216
217         FreeQueryDesc(queryDesc);
218
219         FreeSnapshot(ActiveSnapshot);
220         ActiveSnapshot = NULL;
221 }
222
223 /*
224  * ChoosePortalStrategy
225  *              Select portal execution strategy given the intended query list.
226  *
227  * See the comments in portal.h.
228  */
229 PortalStrategy
230 ChoosePortalStrategy(List *parseTrees)
231 {
232         PortalStrategy strategy;
233
234         strategy = PORTAL_MULTI_QUERY;          /* default assumption */
235
236         if (list_length(parseTrees) == 1)
237         {
238                 Query      *query = (Query *) linitial(parseTrees);
239
240                 if (query->commandType == CMD_SELECT &&
241                         query->canSetTag &&
242                         query->into == NULL)
243                         strategy = PORTAL_ONE_SELECT;
244                 else if (query->commandType == CMD_UTILITY &&
245                                  query->canSetTag &&
246                                  query->utilityStmt != NULL)
247                 {
248                         if (UtilityReturnsTuples(query->utilityStmt))
249                                 strategy = PORTAL_UTIL_SELECT;
250                 }
251         }
252         return strategy;
253 }
254
255 /*
256  * PortalStart
257  *              Prepare a portal for execution.
258  *
259  * Caller must already have created the portal, done PortalDefineQuery(),
260  * and adjusted portal options if needed.  If parameters are needed by
261  * the query, they must be passed in here (caller is responsible for
262  * giving them appropriate lifetime).
263  *
264  * The caller can optionally pass a snapshot to be used; pass InvalidSnapshot
265  * for the normal behavior of setting a new snapshot.  This parameter is
266  * presently ignored for non-PORTAL_ONE_SELECT portals (it's only intended
267  * to be used for cursors).
268  *
269  * On return, portal is ready to accept PortalRun() calls, and the result
270  * tupdesc (if any) is known.
271  */
272 void
273 PortalStart(Portal portal, ParamListInfo params, Snapshot snapshot)
274 {
275         Portal          saveActivePortal;
276         Snapshot        saveActiveSnapshot;
277         ResourceOwner saveResourceOwner;
278         MemoryContext savePortalContext;
279         MemoryContext oldContext;
280         QueryDesc  *queryDesc;
281
282         AssertArg(PortalIsValid(portal));
283         AssertState(portal->queryContext != NULL);      /* query defined? */
284         AssertState(portal->status == PORTAL_NEW);      /* else extra PortalStart */
285
286         /*
287          * Set up global portal context pointers.  (Should we set
288          * QueryContext?)
289          */
290         saveActivePortal = ActivePortal;
291         saveActiveSnapshot = ActiveSnapshot;
292         saveResourceOwner = CurrentResourceOwner;
293         savePortalContext = PortalContext;
294         PG_TRY();
295         {
296                 ActivePortal = portal;
297                 ActiveSnapshot = NULL;                          /* will be set later */
298                 CurrentResourceOwner = portal->resowner;
299                 PortalContext = PortalGetHeapMemory(portal);
300
301                 oldContext = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
302
303                 /* Must remember portal param list, if any */
304                 portal->portalParams = params;
305
306                 /*
307                  * Determine the portal execution strategy
308                  */
309                 portal->strategy = ChoosePortalStrategy(portal->parseTrees);
310
311                 /*
312                  * Fire her up according to the strategy
313                  */
314                 switch (portal->strategy)
315                 {
316                         case PORTAL_ONE_SELECT:
317
318                                 /*
319                                  * Must set snapshot before starting executor.  Be sure to
320                                  * copy it into the portal's context.
321                                  */
322                                 if (snapshot)
323                                         ActiveSnapshot = CopySnapshot(snapshot);
324                                 else
325                                         ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
326
327                                 /*
328                                  * Create QueryDesc in portal's context; for the moment,
329                                  * set the destination to None.
330                                  */
331                                 queryDesc = CreateQueryDesc((Query *) linitial(portal->parseTrees),
332                                                                         (Plan *) linitial(portal->planTrees),
333                                                                                         ActiveSnapshot,
334                                                                                         InvalidSnapshot,
335                                                                                         None_Receiver,
336                                                                                         params,
337                                                                                         false);
338
339                                 /*
340                                  * We do *not* call AfterTriggerBeginQuery() here.  We
341                                  * assume that a SELECT cannot queue any triggers.  It
342                                  * would be messy to support triggers since the execution
343                                  * of the portal may be interleaved with other queries.
344                                  */
345
346                                 /*
347                                  * Call ExecStart to prepare the plan for execution
348                                  */
349                                 ExecutorStart(queryDesc, false);
350
351                                 /*
352                                  * This tells PortalCleanup to shut down the executor
353                                  */
354                                 portal->queryDesc = queryDesc;
355
356                                 /*
357                                  * Remember tuple descriptor (computed by ExecutorStart)
358                                  */
359                                 portal->tupDesc = queryDesc->tupDesc;
360
361                                 /*
362                                  * Reset cursor position data to "start of query"
363                                  */
364                                 portal->atStart = true;
365                                 portal->atEnd = false;  /* allow fetches */
366                                 portal->portalPos = 0;
367                                 portal->posOverflow = false;
368                                 break;
369
370                         case PORTAL_UTIL_SELECT:
371
372                                 /*
373                                  * We don't set snapshot here, because
374                                  * PortalRunUtility will take care of it if needed.
375                                  */
376                                 portal->tupDesc =
377                                         UtilityTupleDescriptor(((Query *) linitial(portal->parseTrees))->utilityStmt);
378
379                                 /*
380                                  * Reset cursor position data to "start of query"
381                                  */
382                                 portal->atStart = true;
383                                 portal->atEnd = false;  /* allow fetches */
384                                 portal->portalPos = 0;
385                                 portal->posOverflow = false;
386                                 break;
387
388                         case PORTAL_MULTI_QUERY:
389                                 /* Need do nothing now */
390                                 portal->tupDesc = NULL;
391                                 break;
392                 }
393         }
394         PG_CATCH();
395         {
396                 /* Uncaught error while executing portal: mark it dead */
397                 portal->status = PORTAL_FAILED;
398
399                 /* Restore global vars and propagate error */
400                 ActivePortal = saveActivePortal;
401                 ActiveSnapshot = saveActiveSnapshot;
402                 CurrentResourceOwner = saveResourceOwner;
403                 PortalContext = savePortalContext;
404
405                 PG_RE_THROW();
406         }
407         PG_END_TRY();
408
409         MemoryContextSwitchTo(oldContext);
410
411         ActivePortal = saveActivePortal;
412         ActiveSnapshot = saveActiveSnapshot;
413         CurrentResourceOwner = saveResourceOwner;
414         PortalContext = savePortalContext;
415
416         portal->status = PORTAL_READY;
417 }
418
419 /*
420  * PortalSetResultFormat
421  *              Select the format codes for a portal's output.
422  *
423  * This must be run after PortalStart for a portal that will be read by
424  * a Remote or RemoteExecute destination.  It is not presently needed for
425  * other destination types.
426  *
427  * formats[] is the client format request, as per Bind message conventions.
428  */
429 void
430 PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
431 {
432         int                     natts;
433         int                     i;
434
435         /* Do nothing if portal won't return tuples */
436         if (portal->tupDesc == NULL)
437                 return;
438         natts = portal->tupDesc->natts;
439         portal->formats = (int16 *)
440                 MemoryContextAlloc(PortalGetHeapMemory(portal),
441                                                    natts * sizeof(int16));
442         if (nFormats > 1)
443         {
444                 /* format specified for each column */
445                 if (nFormats != natts)
446                         ereport(ERROR,
447                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
448                                          errmsg("bind message has %d result formats but query has %d columns",
449                                                         nFormats, natts)));
450                 memcpy(portal->formats, formats, natts * sizeof(int16));
451         }
452         else if (nFormats > 0)
453         {
454                 /* single format specified, use for all columns */
455                 int16           format1 = formats[0];
456
457                 for (i = 0; i < natts; i++)
458                         portal->formats[i] = format1;
459         }
460         else
461         {
462                 /* use default format for all columns */
463                 for (i = 0; i < natts; i++)
464                         portal->formats[i] = 0;
465         }
466 }
467
468 /*
469  * PortalRun
470  *              Run a portal's query or queries.
471  *
472  * count <= 0 is interpreted as a no-op: the destination gets started up
473  * and shut down, but nothing else happens.  Also, count == FETCH_ALL is
474  * interpreted as "all rows".  Note that count is ignored in multi-query
475  * situations, where we always run the portal to completion.
476  *
477  * dest: where to send output of primary (canSetTag) query
478  *
479  * altdest: where to send output of non-primary queries
480  *
481  * completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
482  *              in which to store a command completion status string.
483  *              May be NULL if caller doesn't want a status string.
484  *
485  * Returns TRUE if the portal's execution is complete, FALSE if it was
486  * suspended due to exhaustion of the count parameter.
487  */
488 bool
489 PortalRun(Portal portal, long count,
490                   DestReceiver *dest, DestReceiver *altdest,
491                   char *completionTag)
492 {
493         bool            result;
494         ResourceOwner saveTopTransactionResourceOwner;
495         MemoryContext saveTopTransactionContext;
496         Portal          saveActivePortal;
497         Snapshot        saveActiveSnapshot;
498         ResourceOwner saveResourceOwner;
499         MemoryContext savePortalContext;
500         MemoryContext saveQueryContext;
501         MemoryContext saveMemoryContext;
502
503         AssertArg(PortalIsValid(portal));
504
505         /* Initialize completion tag to empty string */
506         if (completionTag)
507                 completionTag[0] = '\0';
508
509         if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
510         {
511                 ereport(DEBUG3,
512                                 (errmsg_internal("PortalRun")));
513                 /* PORTAL_MULTI_QUERY logs its own stats per query */
514                 ResetUsage();
515         }
516
517         /*
518          * Check for improper portal use, and mark portal active.
519          */
520         if (portal->status != PORTAL_READY)
521                 ereport(ERROR,
522                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
523                                  errmsg("portal \"%s\" cannot be run", portal->name)));
524         portal->status = PORTAL_ACTIVE;
525
526         /*
527          * Set up global portal context pointers.
528          *
529          * We have to play a special game here to support utility commands like
530          * VACUUM and CLUSTER, which internally start and commit transactions.
531          * When we are called to execute such a command, CurrentResourceOwner
532          * will be pointing to the TopTransactionResourceOwner --- which will
533          * be destroyed and replaced in the course of the internal commit and
534          * restart.  So we need to be prepared to restore it as pointing to
535          * the exit-time TopTransactionResourceOwner.  (Ain't that ugly?  This
536          * idea of internally starting whole new transactions is not good.)
537          * CurrentMemoryContext has a similar problem, but the other pointers
538          * we save here will be NULL or pointing to longer-lived objects.
539          */
540         saveTopTransactionResourceOwner = TopTransactionResourceOwner;
541         saveTopTransactionContext = TopTransactionContext;
542         saveActivePortal = ActivePortal;
543         saveActiveSnapshot = ActiveSnapshot;
544         saveResourceOwner = CurrentResourceOwner;
545         savePortalContext = PortalContext;
546         saveQueryContext = QueryContext;
547         saveMemoryContext = CurrentMemoryContext;
548         PG_TRY();
549         {
550                 ActivePortal = portal;
551                 ActiveSnapshot = NULL;                          /* will be set later */
552                 CurrentResourceOwner = portal->resowner;
553                 PortalContext = PortalGetHeapMemory(portal);
554                 QueryContext = portal->queryContext;
555
556                 MemoryContextSwitchTo(PortalContext);
557
558                 switch (portal->strategy)
559                 {
560                         case PORTAL_ONE_SELECT:
561                                 (void) PortalRunSelect(portal, true, count, dest);
562                                 /* we know the query is supposed to set the tag */
563                                 if (completionTag && portal->commandTag)
564                                         strcpy(completionTag, portal->commandTag);
565
566                                 /* Mark portal not active */
567                                 portal->status = PORTAL_READY;
568
569                                 /*
570                                  * Since it's a forward fetch, say DONE iff atEnd is now
571                                  * true.
572                                  */
573                                 result = portal->atEnd;
574                                 break;
575
576                         case PORTAL_UTIL_SELECT:
577
578                                 /*
579                                  * If we have not yet run the utility statement, do so,
580                                  * storing its results in the portal's tuplestore.
581                                  */
582                                 if (!portal->portalUtilReady)
583                                 {
584                                         DestReceiver *treceiver;
585
586                                         PortalCreateHoldStore(portal);
587                                         treceiver = CreateDestReceiver(Tuplestore, portal);
588                                         PortalRunUtility(portal, linitial(portal->parseTrees),
589                                                                          treceiver, NULL);
590                                         (*treceiver->rDestroy) (treceiver);
591                                         portal->portalUtilReady = true;
592                                 }
593
594                                 /*
595                                  * Now fetch desired portion of results.
596                                  */
597                                 (void) PortalRunSelect(portal, true, count, dest);
598
599                                 /*
600                                  * We know the query is supposed to set the tag; we assume
601                                  * only the default tag is needed.
602                                  */
603                                 if (completionTag && portal->commandTag)
604                                         strcpy(completionTag, portal->commandTag);
605
606                                 /* Mark portal not active */
607                                 portal->status = PORTAL_READY;
608
609                                 /*
610                                  * Since it's a forward fetch, say DONE iff atEnd is now
611                                  * true.
612                                  */
613                                 result = portal->atEnd;
614                                 break;
615
616                         case PORTAL_MULTI_QUERY:
617                                 PortalRunMulti(portal, dest, altdest, completionTag);
618
619                                 /* Prevent portal's commands from being re-executed */
620                                 portal->status = PORTAL_DONE;
621
622                                 /* Always complete at end of RunMulti */
623                                 result = true;
624                                 break;
625
626                         default:
627                                 elog(ERROR, "unrecognized portal strategy: %d",
628                                          (int) portal->strategy);
629                                 result = false; /* keep compiler quiet */
630                                 break;
631                 }
632         }
633         PG_CATCH();
634         {
635                 /* Uncaught error while executing portal: mark it dead */
636                 portal->status = PORTAL_FAILED;
637
638                 /* Restore global vars and propagate error */
639                 if (saveMemoryContext == saveTopTransactionContext)
640                         MemoryContextSwitchTo(TopTransactionContext);
641                 else
642                         MemoryContextSwitchTo(saveMemoryContext);
643                 ActivePortal = saveActivePortal;
644                 ActiveSnapshot = saveActiveSnapshot;
645                 if (saveResourceOwner == saveTopTransactionResourceOwner)
646                         CurrentResourceOwner = TopTransactionResourceOwner;
647                 else
648                         CurrentResourceOwner = saveResourceOwner;
649                 PortalContext = savePortalContext;
650                 QueryContext = saveQueryContext;
651
652                 PG_RE_THROW();
653         }
654         PG_END_TRY();
655
656         if (saveMemoryContext == saveTopTransactionContext)
657                 MemoryContextSwitchTo(TopTransactionContext);
658         else
659                 MemoryContextSwitchTo(saveMemoryContext);
660         ActivePortal = saveActivePortal;
661         ActiveSnapshot = saveActiveSnapshot;
662         if (saveResourceOwner == saveTopTransactionResourceOwner)
663                 CurrentResourceOwner = TopTransactionResourceOwner;
664         else
665                 CurrentResourceOwner = saveResourceOwner;
666         PortalContext = savePortalContext;
667         QueryContext = saveQueryContext;
668
669         if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
670                 ShowUsage("EXECUTOR STATISTICS");
671
672         return result;
673 }
674
675 /*
676  * PortalRunSelect
677  *              Execute a portal's query in SELECT cases (also UTIL_SELECT).
678  *
679  * This handles simple N-rows-forward-or-backward cases.  For more complex
680  * nonsequential access to a portal, see PortalRunFetch.
681  *
682  * count <= 0 is interpreted as a no-op: the destination gets started up
683  * and shut down, but nothing else happens.  Also, count == FETCH_ALL is
684  * interpreted as "all rows".
685  *
686  * Caller must already have validated the Portal and done appropriate
687  * setup (cf. PortalRun).
688  *
689  * Returns number of rows processed (suitable for use in result tag)
690  */
691 static long
692 PortalRunSelect(Portal portal,
693                                 bool forward,
694                                 long count,
695                                 DestReceiver *dest)
696 {
697         QueryDesc  *queryDesc;
698         ScanDirection direction;
699         uint32          nprocessed;
700
701         /*
702          * NB: queryDesc will be NULL if we are fetching from a held cursor or
703          * a completed utility query; can't use it in that path.
704          */
705         queryDesc = PortalGetQueryDesc(portal);
706
707         /* Caller messed up if we have neither a ready query nor held data. */
708         Assert(queryDesc || portal->holdStore);
709
710         /*
711          * Force the queryDesc destination to the right thing.  This supports
712          * MOVE, for example, which will pass in dest = None.  This is okay to
713          * change as long as we do it on every fetch.  (The Executor must not
714          * assume that dest never changes.)
715          */
716         if (queryDesc)
717                 queryDesc->dest = dest;
718
719         /*
720          * Determine which direction to go in, and check to see if we're
721          * already at the end of the available tuples in that direction.  If
722          * so, set the direction to NoMovement to avoid trying to fetch any
723          * tuples.      (This check exists because not all plan node types are
724          * robust about being called again if they've already returned NULL
725          * once.)  Then call the executor (we must not skip this, because the
726          * destination needs to see a setup and shutdown even if no tuples are
727          * available).  Finally, update the portal position state depending on
728          * the number of tuples that were retrieved.
729          */
730         if (forward)
731         {
732                 if (portal->atEnd || count <= 0)
733                         direction = NoMovementScanDirection;
734                 else
735                         direction = ForwardScanDirection;
736
737                 /* In the executor, zero count processes all rows */
738                 if (count == FETCH_ALL)
739                         count = 0;
740
741                 if (portal->holdStore)
742                         nprocessed = RunFromStore(portal, direction, count, dest);
743                 else
744                 {
745                         ActiveSnapshot = queryDesc->snapshot;
746                         ExecutorRun(queryDesc, direction, count);
747                         nprocessed = queryDesc->estate->es_processed;
748                 }
749
750                 if (direction != NoMovementScanDirection)
751                 {
752                         long            oldPos;
753
754                         if (nprocessed > 0)
755                                 portal->atStart = false;                /* OK to go backward now */
756                         if (count == 0 ||
757                                 (unsigned long) nprocessed < (unsigned long) count)
758                                 portal->atEnd = true;   /* we retrieved 'em all */
759                         oldPos = portal->portalPos;
760                         portal->portalPos += nprocessed;
761                         /* portalPos doesn't advance when we fall off the end */
762                         if (portal->portalPos < oldPos)
763                                 portal->posOverflow = true;
764                 }
765         }
766         else
767         {
768                 if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)
769                         ereport(ERROR,
770                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
771                                          errmsg("cursor can only scan forward"),
772                                          errhint("Declare it with SCROLL option to enable backward scan.")));
773
774                 if (portal->atStart || count <= 0)
775                         direction = NoMovementScanDirection;
776                 else
777                         direction = BackwardScanDirection;
778
779                 /* In the executor, zero count processes all rows */
780                 if (count == FETCH_ALL)
781                         count = 0;
782
783                 if (portal->holdStore)
784                         nprocessed = RunFromStore(portal, direction, count, dest);
785                 else
786                 {
787                         ActiveSnapshot = queryDesc->snapshot;
788                         ExecutorRun(queryDesc, direction, count);
789                         nprocessed = queryDesc->estate->es_processed;
790                 }
791
792                 if (direction != NoMovementScanDirection)
793                 {
794                         if (nprocessed > 0 && portal->atEnd)
795                         {
796                                 portal->atEnd = false;  /* OK to go forward now */
797                                 portal->portalPos++;    /* adjust for endpoint case */
798                         }
799                         if (count == 0 ||
800                                 (unsigned long) nprocessed < (unsigned long) count)
801                         {
802                                 portal->atStart = true; /* we retrieved 'em all */
803                                 portal->portalPos = 0;
804                                 portal->posOverflow = false;
805                         }
806                         else
807                         {
808                                 long            oldPos;
809
810                                 oldPos = portal->portalPos;
811                                 portal->portalPos -= nprocessed;
812                                 if (portal->portalPos > oldPos ||
813                                         portal->portalPos <= 0)
814                                         portal->posOverflow = true;
815                         }
816                 }
817         }
818
819         return nprocessed;
820 }
821
822 /*
823  * RunFromStore
824  *              Fetch tuples from the portal's tuple store.
825  *
826  * Calling conventions are similar to ExecutorRun, except that we
827  * do not depend on having a queryDesc or estate.  Therefore we return the
828  * number of tuples processed as the result, not in estate->es_processed.
829  *
830  * One difference from ExecutorRun is that the destination receiver functions
831  * are run in the caller's memory context (since we have no estate).  Watch
832  * out for memory leaks.
833  */
834 static uint32
835 RunFromStore(Portal portal, ScanDirection direction, long count,
836                          DestReceiver *dest)
837 {
838         long            current_tuple_count = 0;
839
840         (*dest->rStartup) (dest, CMD_SELECT, portal->tupDesc);
841
842         if (direction == NoMovementScanDirection)
843         {
844                 /* do nothing except start/stop the destination */
845         }
846         else
847         {
848                 bool            forward = (direction == ForwardScanDirection);
849
850                 for (;;)
851                 {
852                         MemoryContext oldcontext;
853                         HeapTuple       tup;
854                         bool            should_free;
855
856                         oldcontext = MemoryContextSwitchTo(portal->holdContext);
857
858                         tup = tuplestore_getheaptuple(portal->holdStore, forward,
859                                                                                   &should_free);
860
861                         MemoryContextSwitchTo(oldcontext);
862
863                         if (tup == NULL)
864                                 break;
865
866                         (*dest->receiveTuple) (tup, portal->tupDesc, dest);
867
868                         if (should_free)
869                                 pfree(tup);
870
871                         /*
872                          * check our tuple count.. if we've processed the proper
873                          * number then quit, else loop again and process more tuples.
874                          * Zero count means no limit.
875                          */
876                         current_tuple_count++;
877                         if (count && count == current_tuple_count)
878                                 break;
879                 }
880         }
881
882         (*dest->rShutdown) (dest);
883
884         return (uint32) current_tuple_count;
885 }
886
887 /*
888  * PortalRunUtility
889  *              Execute a utility statement inside a portal.
890  */
891 static void
892 PortalRunUtility(Portal portal, Query *query,
893                                  DestReceiver *dest, char *completionTag)
894 {
895         Node       *utilityStmt = query->utilityStmt;
896
897         ereport(DEBUG3,
898                         (errmsg_internal("ProcessUtility")));
899
900         /*
901          * Set snapshot if utility stmt needs one.      Most reliable way to do
902          * this seems to be to enumerate those that do not need one; this is a
903          * short list.  Transaction control, LOCK, and SET must *not* set a
904          * snapshot since they need to be executable at the start of a
905          * serializable transaction without freezing a snapshot.  By extension
906          * we allow SHOW not to set a snapshot.  The other stmts listed are
907          * just efficiency hacks.  Beware of listing anything that can modify
908          * the database --- if, say, it has to update an index with
909          * expressions that invoke user-defined functions, then it had better
910          * have a snapshot.
911          *
912          * Note we assume that caller will take care of restoring ActiveSnapshot
913          * on exit/error.
914          */
915         if (!(IsA(utilityStmt, TransactionStmt) ||
916                   IsA(utilityStmt, LockStmt) ||
917                   IsA(utilityStmt, VariableSetStmt) ||
918                   IsA(utilityStmt, VariableShowStmt) ||
919                   IsA(utilityStmt, VariableResetStmt) ||
920                   IsA(utilityStmt, ConstraintsSetStmt) ||
921         /* efficiency hacks from here down */
922                   IsA(utilityStmt, FetchStmt) ||
923                   IsA(utilityStmt, ListenStmt) ||
924                   IsA(utilityStmt, NotifyStmt) ||
925                   IsA(utilityStmt, UnlistenStmt) ||
926                   IsA(utilityStmt, CheckPointStmt)))
927                 ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
928         else
929                 ActiveSnapshot = NULL;
930
931         if (query->canSetTag)
932         {
933                 /* utility statement can override default tag string */
934                 ProcessUtility(utilityStmt, portal->portalParams, dest, completionTag);
935                 if (completionTag && completionTag[0] == '\0' && portal->commandTag)
936                         strcpy(completionTag, portal->commandTag);      /* use the default */
937         }
938         else
939         {
940                 /* utility added by rewrite cannot set tag */
941                 ProcessUtility(utilityStmt, portal->portalParams, dest, NULL);
942         }
943
944         /* Some utility statements may change context on us */
945         MemoryContextSwitchTo(PortalGetHeapMemory(portal));
946
947         if (ActiveSnapshot)
948                 FreeSnapshot(ActiveSnapshot);
949         ActiveSnapshot = NULL;
950 }
951
952 /*
953  * PortalRunMulti
954  *              Execute a portal's queries in the general case (multi queries).
955  */
956 static void
957 PortalRunMulti(Portal portal,
958                            DestReceiver *dest, DestReceiver *altdest,
959                            char *completionTag)
960 {
961         ListCell   *querylist_item;
962         ListCell   *planlist_item;
963
964         /*
965          * If the destination is RemoteExecute, change to None.  The reason is
966          * that the client won't be expecting any tuples, and indeed has no
967          * way to know what they are, since there is no provision for Describe
968          * to send a RowDescription message when this portal execution
969          * strategy is in effect.  This presently will only affect SELECT
970          * commands added to non-SELECT queries by rewrite rules: such
971          * commands will be executed, but the results will be discarded unless
972          * you use "simple Query" protocol.
973          */
974         if (dest->mydest == RemoteExecute)
975                 dest = None_Receiver;
976         if (altdest->mydest == RemoteExecute)
977                 altdest = None_Receiver;
978
979         /*
980          * Loop to handle the individual queries generated from a single
981          * parsetree by analysis and rewrite.
982          */
983         forboth(querylist_item, portal->parseTrees,
984                         planlist_item, portal->planTrees)
985         {
986                 Query      *query = (Query *) lfirst(querylist_item);
987                 Plan       *plan = (Plan *) lfirst(planlist_item);
988
989                 /*
990                  * If we got a cancel signal in prior command, quit
991                  */
992                 CHECK_FOR_INTERRUPTS();
993
994                 if (query->commandType == CMD_UTILITY)
995                 {
996                         /*
997                          * process utility functions (create, destroy, etc..)
998                          */
999                         Assert(plan == NULL);
1000
1001                         PortalRunUtility(portal, query,
1002                                                          query->canSetTag ? dest : altdest,
1003                                                          completionTag);
1004                 }
1005                 else
1006                 {
1007                         /*
1008                          * process a plannable query.
1009                          */
1010                         if (log_executor_stats)
1011                                 ResetUsage();
1012
1013                         if (query->canSetTag)
1014                         {
1015                                 /* statement can set tag string */
1016                                 ProcessQuery(query, plan,
1017                                                          portal->portalParams,
1018                                                          dest, completionTag);
1019                         }
1020                         else
1021                         {
1022                                 /* stmt added by rewrite cannot set tag */
1023                                 ProcessQuery(query, plan,
1024                                                          portal->portalParams,
1025                                                          altdest, NULL);
1026                         }
1027
1028                         if (log_executor_stats)
1029                                 ShowUsage("EXECUTOR STATISTICS");
1030                 }
1031
1032                 /*
1033                  * Increment command counter between queries, but not after the
1034                  * last one.
1035                  */
1036                 if (planlist_item != NULL)
1037                         CommandCounterIncrement();
1038
1039                 /*
1040                  * Clear subsidiary contexts to recover temporary memory.
1041                  */
1042                 Assert(PortalGetHeapMemory(portal) == CurrentMemoryContext);
1043
1044                 MemoryContextDeleteChildren(PortalGetHeapMemory(portal));
1045         }
1046
1047         /*
1048          * If a command completion tag was supplied, use it.  Otherwise use
1049          * the portal's commandTag as the default completion tag.
1050          *
1051          * Exception: clients will expect INSERT/UPDATE/DELETE tags to have
1052          * counts, so fake something up if necessary.  (This could happen if
1053          * the original query was replaced by a DO INSTEAD rule.)
1054          */
1055         if (completionTag && completionTag[0] == '\0')
1056         {
1057                 if (portal->commandTag)
1058                         strcpy(completionTag, portal->commandTag);
1059                 if (strcmp(completionTag, "INSERT") == 0)
1060                         strcpy(completionTag, "INSERT 0 0");
1061                 else if (strcmp(completionTag, "UPDATE") == 0)
1062                         strcpy(completionTag, "UPDATE 0");
1063                 else if (strcmp(completionTag, "DELETE") == 0)
1064                         strcpy(completionTag, "DELETE 0");
1065         }
1066 }
1067
1068 /*
1069  * PortalRunFetch
1070  *              Variant form of PortalRun that supports SQL FETCH directions.
1071  *
1072  * Returns number of rows processed (suitable for use in result tag)
1073  */
1074 long
1075 PortalRunFetch(Portal portal,
1076                            FetchDirection fdirection,
1077                            long count,
1078                            DestReceiver *dest)
1079 {
1080         long            result;
1081         Portal          saveActivePortal;
1082         Snapshot        saveActiveSnapshot;
1083         ResourceOwner saveResourceOwner;
1084         MemoryContext savePortalContext;
1085         MemoryContext saveQueryContext;
1086         MemoryContext oldContext;
1087
1088         AssertArg(PortalIsValid(portal));
1089
1090         /*
1091          * Check for improper portal use, and mark portal active.
1092          */
1093         if (portal->status != PORTAL_READY)
1094                 ereport(ERROR,
1095                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1096                                  errmsg("portal \"%s\" cannot be run", portal->name)));
1097         portal->status = PORTAL_ACTIVE;
1098
1099         /*
1100          * Set up global portal context pointers.
1101          */
1102         saveActivePortal = ActivePortal;
1103         saveActiveSnapshot = ActiveSnapshot;
1104         saveResourceOwner = CurrentResourceOwner;
1105         savePortalContext = PortalContext;
1106         saveQueryContext = QueryContext;
1107         PG_TRY();
1108         {
1109                 ActivePortal = portal;
1110                 ActiveSnapshot = NULL;                          /* will be set later */
1111                 CurrentResourceOwner = portal->resowner;
1112                 PortalContext = PortalGetHeapMemory(portal);
1113                 QueryContext = portal->queryContext;
1114
1115                 oldContext = MemoryContextSwitchTo(PortalContext);
1116
1117                 switch (portal->strategy)
1118                 {
1119                         case PORTAL_ONE_SELECT:
1120                                 result = DoPortalRunFetch(portal, fdirection, count, dest);
1121                                 break;
1122
1123                         default:
1124                                 elog(ERROR, "unsupported portal strategy");
1125                                 result = 0;             /* keep compiler quiet */
1126                                 break;
1127                 }
1128         }
1129         PG_CATCH();
1130         {
1131                 /* Uncaught error while executing portal: mark it dead */
1132                 portal->status = PORTAL_FAILED;
1133
1134                 /* Restore global vars and propagate error */
1135                 ActivePortal = saveActivePortal;
1136                 ActiveSnapshot = saveActiveSnapshot;
1137                 CurrentResourceOwner = saveResourceOwner;
1138                 PortalContext = savePortalContext;
1139                 QueryContext = saveQueryContext;
1140
1141                 PG_RE_THROW();
1142         }
1143         PG_END_TRY();
1144
1145         MemoryContextSwitchTo(oldContext);
1146
1147         /* Mark portal not active */
1148         portal->status = PORTAL_READY;
1149
1150         ActivePortal = saveActivePortal;
1151         ActiveSnapshot = saveActiveSnapshot;
1152         CurrentResourceOwner = saveResourceOwner;
1153         PortalContext = savePortalContext;
1154         QueryContext = saveQueryContext;
1155
1156         return result;
1157 }
1158
1159 /*
1160  * DoPortalRunFetch
1161  *              Guts of PortalRunFetch --- the portal context is already set up
1162  *
1163  * Returns number of rows processed (suitable for use in result tag)
1164  */
1165 static long
1166 DoPortalRunFetch(Portal portal,
1167                                  FetchDirection fdirection,
1168                                  long count,
1169                                  DestReceiver *dest)
1170 {
1171         bool            forward;
1172
1173         Assert(portal->strategy == PORTAL_ONE_SELECT);
1174
1175         switch (fdirection)
1176         {
1177                 case FETCH_FORWARD:
1178                         if (count < 0)
1179                         {
1180                                 fdirection = FETCH_BACKWARD;
1181                                 count = -count;
1182                         }
1183                         /* fall out of switch to share code with FETCH_BACKWARD */
1184                         break;
1185                 case FETCH_BACKWARD:
1186                         if (count < 0)
1187                         {
1188                                 fdirection = FETCH_FORWARD;
1189                                 count = -count;
1190                         }
1191                         /* fall out of switch to share code with FETCH_FORWARD */
1192                         break;
1193                 case FETCH_ABSOLUTE:
1194                         if (count > 0)
1195                         {
1196                                 /*
1197                                  * Definition: Rewind to start, advance count-1 rows,
1198                                  * return next row (if any).  In practice, if the goal is
1199                                  * less than halfway back to the start, it's better to
1200                                  * scan from where we are.      In any case, we arrange to
1201                                  * fetch the target row going forwards.
1202                                  */
1203                                 if (portal->posOverflow || portal->portalPos == LONG_MAX ||
1204                                         count - 1 <= portal->portalPos / 2)
1205                                 {
1206                                         DoPortalRewind(portal);
1207                                         if (count > 1)
1208                                                 PortalRunSelect(portal, true, count - 1,
1209                                                                                 None_Receiver);
1210                                 }
1211                                 else
1212                                 {
1213                                         long            pos = portal->portalPos;
1214
1215                                         if (portal->atEnd)
1216                                                 pos++;  /* need one extra fetch if off end */
1217                                         if (count <= pos)
1218                                                 PortalRunSelect(portal, false, pos - count + 1,
1219                                                                                 None_Receiver);
1220                                         else if (count > pos + 1)
1221                                                 PortalRunSelect(portal, true, count - pos - 1,
1222                                                                                 None_Receiver);
1223                                 }
1224                                 return PortalRunSelect(portal, true, 1L, dest);
1225                         }
1226                         else if (count < 0)
1227                         {
1228                                 /*
1229                                  * Definition: Advance to end, back up abs(count)-1 rows,
1230                                  * return prior row (if any).  We could optimize this if
1231                                  * we knew in advance where the end was, but typically we
1232                                  * won't. (Is it worth considering case where count > half
1233                                  * of size of query?  We could rewind once we know the
1234                                  * size ...)
1235                                  */
1236                                 PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
1237                                 if (count < -1)
1238                                         PortalRunSelect(portal, false, -count - 1, None_Receiver);
1239                                 return PortalRunSelect(portal, false, 1L, dest);
1240                         }
1241                         else
1242                         {
1243                                 /* count == 0 */
1244                                 /* Rewind to start, return zero rows */
1245                                 DoPortalRewind(portal);
1246                                 return PortalRunSelect(portal, true, 0L, dest);
1247                         }
1248                         break;
1249                 case FETCH_RELATIVE:
1250                         if (count > 0)
1251                         {
1252                                 /*
1253                                  * Definition: advance count-1 rows, return next row (if
1254                                  * any).
1255                                  */
1256                                 if (count > 1)
1257                                         PortalRunSelect(portal, true, count - 1, None_Receiver);
1258                                 return PortalRunSelect(portal, true, 1L, dest);
1259                         }
1260                         else if (count < 0)
1261                         {
1262                                 /*
1263                                  * Definition: back up abs(count)-1 rows, return prior row
1264                                  * (if any).
1265                                  */
1266                                 if (count < -1)
1267                                         PortalRunSelect(portal, false, -count - 1, None_Receiver);
1268                                 return PortalRunSelect(portal, false, 1L, dest);
1269                         }
1270                         else
1271                         {
1272                                 /* count == 0 */
1273                                 /* Same as FETCH FORWARD 0, so fall out of switch */
1274                                 fdirection = FETCH_FORWARD;
1275                         }
1276                         break;
1277                 default:
1278                         elog(ERROR, "bogus direction");
1279                         break;
1280         }
1281
1282         /*
1283          * Get here with fdirection == FETCH_FORWARD or FETCH_BACKWARD, and
1284          * count >= 0.
1285          */
1286         forward = (fdirection == FETCH_FORWARD);
1287
1288         /*
1289          * Zero count means to re-fetch the current row, if any (per SQL92)
1290          */
1291         if (count == 0)
1292         {
1293                 bool            on_row;
1294
1295                 /* Are we sitting on a row? */
1296                 on_row = (!portal->atStart && !portal->atEnd);
1297
1298                 if (dest->mydest == None)
1299                 {
1300                         /* MOVE 0 returns 0/1 based on if FETCH 0 would return a row */
1301                         return on_row ? 1L : 0L;
1302                 }
1303                 else
1304                 {
1305                         /*
1306                          * If we are sitting on a row, back up one so we can re-fetch
1307                          * it. If we are not sitting on a row, we still have to start
1308                          * up and shut down the executor so that the destination is
1309                          * initialized and shut down correctly; so keep going.  To
1310                          * PortalRunSelect, count == 0 means we will retrieve no row.
1311                          */
1312                         if (on_row)
1313                         {
1314                                 PortalRunSelect(portal, false, 1L, None_Receiver);
1315                                 /* Set up to fetch one row forward */
1316                                 count = 1;
1317                                 forward = true;
1318                         }
1319                 }
1320         }
1321
1322         /*
1323          * Optimize MOVE BACKWARD ALL into a Rewind.
1324          */
1325         if (!forward && count == FETCH_ALL && dest->mydest == None)
1326         {
1327                 long            result = portal->portalPos;
1328
1329                 if (result > 0 && !portal->atEnd)
1330                         result--;
1331                 DoPortalRewind(portal);
1332                 /* result is bogus if pos had overflowed, but it's best we can do */
1333                 return result;
1334         }
1335
1336         return PortalRunSelect(portal, forward, count, dest);
1337 }
1338
1339 /*
1340  * DoPortalRewind - rewind a Portal to starting point
1341  */
1342 static void
1343 DoPortalRewind(Portal portal)
1344 {
1345         if (portal->holdStore)
1346         {
1347                 MemoryContext oldcontext;
1348
1349                 oldcontext = MemoryContextSwitchTo(portal->holdContext);
1350                 tuplestore_rescan(portal->holdStore);
1351                 MemoryContextSwitchTo(oldcontext);
1352         }
1353         if (PortalGetQueryDesc(portal))
1354                 ExecutorRewind(PortalGetQueryDesc(portal));
1355
1356         portal->atStart = true;
1357         portal->atEnd = false;
1358         portal->portalPos = 0;
1359         portal->posOverflow = false;
1360 }