]> granicus.if.org Git - postgresql/blob - src/backend/tcop/pquery.c
Remove the Query structure from the executor's API. This allows us to stop
[postgresql] / src / backend / tcop / pquery.c
1 /*-------------------------------------------------------------------------
2  *
3  * pquery.c
4  *        POSTGRES process query command code
5  *
6  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/tcop/pquery.c,v 1.114 2007/02/20 17:32:16 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/xact.h"
19 #include "commands/prepare.h"
20 #include "commands/trigger.h"
21 #include "miscadmin.h"
22 #include "tcop/pquery.h"
23 #include "tcop/tcopprot.h"
24 #include "tcop/utility.h"
25 #include "utils/memutils.h"
26
27
28 /*
29  * ActivePortal is the currently executing Portal (the most closely nested,
30  * if there are several).
31  */
32 Portal          ActivePortal = NULL;
33
34
35 static void ProcessQuery(PlannedStmt *plan,
36                          ParamListInfo params,
37                          DestReceiver *dest,
38                          char *completionTag);
39 static void FillPortalStore(Portal portal);
40 static uint32 RunFromStore(Portal portal, ScanDirection direction, long count,
41                          DestReceiver *dest);
42 static long PortalRunSelect(Portal portal, bool forward, long count,
43                                 DestReceiver *dest);
44 static void PortalRunUtility(Portal portal, Node *utilityStmt,
45                                  DestReceiver *dest, char *completionTag);
46 static void PortalRunMulti(Portal portal,
47                            DestReceiver *dest, DestReceiver *altdest,
48                            char *completionTag);
49 static long DoPortalRunFetch(Portal portal,
50                                  FetchDirection fdirection,
51                                  long count,
52                                  DestReceiver *dest);
53 static void DoPortalRewind(Portal portal);
54
55
56 /*
57  * CreateQueryDesc
58  */
59 QueryDesc *
60 CreateQueryDesc(PlannedStmt *plannedstmt,
61                                 Snapshot snapshot,
62                                 Snapshot crosscheck_snapshot,
63                                 DestReceiver *dest,
64                                 ParamListInfo params,
65                                 bool doInstrument)
66 {
67         QueryDesc  *qd = (QueryDesc *) palloc(sizeof(QueryDesc));
68
69         qd->operation = plannedstmt->commandType;       /* operation */
70         qd->plannedstmt = plannedstmt;                          /* plan */
71         qd->utilitystmt = NULL;
72         qd->snapshot = snapshot;                                        /* snapshot */
73         qd->crosscheck_snapshot = crosscheck_snapshot;          /* RI check snapshot */
74         qd->dest = dest;                        /* output dest */
75         qd->params = params;            /* parameter values passed into query */
76         qd->doInstrument = doInstrument;        /* instrumentation wanted? */
77
78         /* null these fields until set by ExecutorStart */
79         qd->tupDesc = NULL;
80         qd->estate = NULL;
81         qd->planstate = NULL;
82
83         return qd;
84 }
85
86 /*
87  * CreateUtilityQueryDesc
88  */
89 QueryDesc *
90 CreateUtilityQueryDesc(Node *utilitystmt,
91                                            Snapshot snapshot,
92                                            DestReceiver *dest,
93                                            ParamListInfo params)
94 {
95         QueryDesc  *qd = (QueryDesc *) palloc(sizeof(QueryDesc));
96
97         qd->operation = CMD_UTILITY;                            /* operation */
98         qd->plannedstmt = NULL;
99         qd->utilitystmt = utilitystmt;                          /* utility command */
100         qd->snapshot = snapshot;                                        /* snapshot */
101         qd->crosscheck_snapshot = InvalidSnapshot;      /* RI check snapshot */
102         qd->dest = dest;                        /* output dest */
103         qd->params = params;            /* parameter values passed into query */
104         qd->doInstrument = false;       /* uninteresting for utilities */
105
106         /* null these fields until set by ExecutorStart */
107         qd->tupDesc = NULL;
108         qd->estate = NULL;
109         qd->planstate = NULL;
110
111         return qd;
112 }
113
114 /*
115  * FreeQueryDesc
116  */
117 void
118 FreeQueryDesc(QueryDesc *qdesc)
119 {
120         /* Can't be a live query */
121         Assert(qdesc->estate == NULL);
122         /* Only the QueryDesc itself need be freed */
123         pfree(qdesc);
124 }
125
126
127 /*
128  * ProcessQuery
129  *              Execute a single plannable query within a PORTAL_MULTI_QUERY
130  *              or PORTAL_ONE_RETURNING portal
131  *
132  *      plan: the plan tree for the query
133  *      params: any parameters needed
134  *      dest: where to send results
135  *      completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
136  *              in which to store a command completion status string.
137  *
138  * completionTag may be NULL if caller doesn't want a status string.
139  *
140  * Must be called in a memory context that will be reset or deleted on
141  * error; otherwise the executor's memory usage will be leaked.
142  */
143 static void
144 ProcessQuery(PlannedStmt *plan,
145                          ParamListInfo params,
146                          DestReceiver *dest,
147                          char *completionTag)
148 {
149         QueryDesc  *queryDesc;
150
151         ereport(DEBUG3,
152                         (errmsg_internal("ProcessQuery")));
153
154         /*
155          * Must always set snapshot for plannable queries.      Note we assume that
156          * caller will take care of restoring ActiveSnapshot on exit/error.
157          */
158         ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
159
160         /*
161          * Create the QueryDesc object
162          */
163         queryDesc = CreateQueryDesc(plan,
164                                                                 ActiveSnapshot, InvalidSnapshot,
165                                                                 dest, params, false);
166
167         /*
168          * Set up to collect AFTER triggers
169          */
170         AfterTriggerBeginQuery();
171
172         /*
173          * Call ExecutorStart to prepare the plan for execution
174          */
175         ExecutorStart(queryDesc, 0);
176
177         /*
178          * Run the plan to completion.
179          */
180         ExecutorRun(queryDesc, ForwardScanDirection, 0L);
181
182         /*
183          * Build command completion status string, if caller wants one.
184          */
185         if (completionTag)
186         {
187                 Oid                     lastOid;
188
189                 switch (queryDesc->operation)
190                 {
191                         case CMD_SELECT:
192                                 strcpy(completionTag, "SELECT");
193                                 break;
194                         case CMD_INSERT:
195                                 if (queryDesc->estate->es_processed == 1)
196                                         lastOid = queryDesc->estate->es_lastoid;
197                                 else
198                                         lastOid = InvalidOid;
199                                 snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
200                                    "INSERT %u %u", lastOid, queryDesc->estate->es_processed);
201                                 break;
202                         case CMD_UPDATE:
203                                 snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
204                                                  "UPDATE %u", queryDesc->estate->es_processed);
205                                 break;
206                         case CMD_DELETE:
207                                 snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
208                                                  "DELETE %u", queryDesc->estate->es_processed);
209                                 break;
210                         default:
211                                 strcpy(completionTag, "???");
212                                 break;
213                 }
214         }
215
216         /* Now take care of any queued AFTER triggers */
217         AfterTriggerEndQuery(queryDesc->estate);
218
219         /*
220          * Now, we close down all the scans and free allocated resources.
221          */
222         ExecutorEnd(queryDesc);
223
224         FreeQueryDesc(queryDesc);
225
226         FreeSnapshot(ActiveSnapshot);
227         ActiveSnapshot = NULL;
228 }
229
230 /*
231  * ChoosePortalStrategy
232  *              Select portal execution strategy given the intended statement list.
233  *
234  * The list elements can be Querys, PlannedStmts, or utility statements.
235  * That's more general than portals need, but we use this for prepared
236  * statements as well.
237  *
238  * See the comments in portal.h.
239  */
240 PortalStrategy
241 ChoosePortalStrategy(List *stmts)
242 {
243         int                     nSetTag;
244         ListCell   *lc;
245
246         /*
247          * PORTAL_ONE_SELECT and PORTAL_UTIL_SELECT need only consider the
248          * single-statement case, since there are no rewrite rules that can add
249          * auxiliary queries to a SELECT or a utility command.
250          */
251         if (list_length(stmts) == 1)
252         {
253                 Node       *stmt = (Node *) linitial(stmts);
254
255                 if (IsA(stmt, Query))
256                 {
257                         Query      *query = (Query *) stmt;
258
259                         if (query->canSetTag)
260                         {
261                                 if (query->commandType == CMD_SELECT &&
262                                         query->into == NULL)
263                                         return PORTAL_ONE_SELECT;
264                                 if (query->commandType == CMD_UTILITY &&
265                                         query->utilityStmt != NULL)
266                                 {
267                                         if (UtilityReturnsTuples(query->utilityStmt))
268                                                 return PORTAL_UTIL_SELECT;
269                                         /* it can't be ONE_RETURNING, so give up */
270                                         return PORTAL_MULTI_QUERY;
271                                 }
272                         }
273                 }
274                 else if (IsA(stmt, PlannedStmt))
275                 {
276                         PlannedStmt *pstmt = (PlannedStmt *) stmt;
277
278                         if (pstmt->canSetTag)
279                         {
280                                 if (pstmt->commandType == CMD_SELECT &&
281                                         pstmt->into == NULL)
282                                         return PORTAL_ONE_SELECT;
283                         }
284                 }
285                 else
286                 {
287                         /* must be a utility command; assume it's canSetTag */
288                         if (UtilityReturnsTuples(stmt))
289                                 return PORTAL_UTIL_SELECT;
290                         /* it can't be ONE_RETURNING, so give up */
291                         return PORTAL_MULTI_QUERY;
292                 }
293         }
294
295         /*
296          * PORTAL_ONE_RETURNING has to allow auxiliary queries added by rewrite.
297          * Choose PORTAL_ONE_RETURNING if there is exactly one canSetTag query and
298          * it has a RETURNING list.
299          */
300         nSetTag = 0;
301         foreach(lc, stmts)
302         {
303                 Node       *stmt = (Node *) lfirst(lc);
304
305                 if (IsA(stmt, Query))
306                 {
307                         Query      *query = (Query *) stmt;
308
309                         if (query->canSetTag)
310                         {
311                                 if (++nSetTag > 1)
312                                         return PORTAL_MULTI_QUERY;      /* no need to look further */
313                                 if (query->returningList == NIL)
314                                         return PORTAL_MULTI_QUERY;      /* no need to look further */
315                         }
316                 }
317                 else if (IsA(stmt, PlannedStmt))
318                 {
319                         PlannedStmt *pstmt = (PlannedStmt *) stmt;
320
321                         if (pstmt->canSetTag)
322                         {
323                                 if (++nSetTag > 1)
324                                         return PORTAL_MULTI_QUERY;      /* no need to look further */
325                                 if (pstmt->returningLists == NIL)
326                                         return PORTAL_MULTI_QUERY;      /* no need to look further */
327                         }
328                 }
329                 /* otherwise, utility command, assumed not canSetTag */
330         }
331         if (nSetTag == 1)
332                 return PORTAL_ONE_RETURNING;
333
334         /* Else, it's the general case... */
335         return PORTAL_MULTI_QUERY;
336 }
337
338 /*
339  * FetchPortalTargetList
340  *              Given a portal that returns tuples, extract the query targetlist.
341  *              Returns NIL if the portal doesn't have a determinable targetlist.
342  *
343  * Note: do not modify the result.
344  */
345 List *
346 FetchPortalTargetList(Portal portal)
347 {
348         /* no point in looking if we determined it doesn't return tuples */
349         if (portal->strategy == PORTAL_MULTI_QUERY)
350                 return NIL;
351         /* get the primary statement and find out what it returns */
352         return FetchStatementTargetList(PortalGetPrimaryStmt(portal));
353 }
354
355 /*
356  * FetchStatementTargetList
357  *              Given a statement that returns tuples, extract the query targetlist.
358  *              Returns NIL if the statement doesn't have a determinable targetlist.
359  *
360  * This can be applied to a Query, a PlannedStmt, or a utility statement.
361  * That's more general than portals need, but we use this for prepared
362  * statements as well.
363  *
364  * Note: do not modify the result.
365  *
366  * XXX be careful to keep this in sync with UtilityReturnsTuples.
367  */
368 List *
369 FetchStatementTargetList(Node *stmt)
370 {
371         if (stmt == NULL)
372                 return NIL;
373         if (IsA(stmt, Query))
374         {
375                 Query      *query = (Query *) stmt;
376
377                 if (query->commandType == CMD_UTILITY &&
378                         query->utilityStmt != NULL)
379                 {
380                         /* transfer attention to utility statement */
381                         stmt = query->utilityStmt;
382                 }
383                 else
384                 {
385                         if (query->commandType == CMD_SELECT &&
386                                 query->into == NULL)
387                                 return query->targetList;
388                         if (query->returningList)
389                                 return query->returningList;
390                         return NIL;
391                 }
392         }
393         if (IsA(stmt, PlannedStmt))
394         {
395                 PlannedStmt *pstmt = (PlannedStmt *) stmt;
396
397                 if (pstmt->commandType == CMD_SELECT &&
398                         pstmt->into == NULL)
399                         return pstmt->planTree->targetlist;
400                 if (pstmt->returningLists)
401                         return (List *) linitial(pstmt->returningLists);
402                 return NIL;
403         }
404         if (IsA(stmt, FetchStmt))
405         {
406                 FetchStmt  *fstmt = (FetchStmt *) stmt;
407                 Portal          subportal;
408
409                 Assert(!fstmt->ismove);
410                 subportal = GetPortalByName(fstmt->portalname);
411                 Assert(PortalIsValid(subportal));
412                 return FetchPortalTargetList(subportal);
413         }
414         if (IsA(stmt, ExecuteStmt))
415         {
416                 ExecuteStmt *estmt = (ExecuteStmt *) stmt;
417                 PreparedStatement *entry;
418
419                 Assert(!estmt->into);
420                 entry = FetchPreparedStatement(estmt->name, true);
421                 return FetchPreparedStatementTargetList(entry);
422         }
423         return NIL;
424 }
425
426 /*
427  * PortalStart
428  *              Prepare a portal for execution.
429  *
430  * Caller must already have created the portal, done PortalDefineQuery(),
431  * and adjusted portal options if needed.  If parameters are needed by
432  * the query, they must be passed in here (caller is responsible for
433  * giving them appropriate lifetime).
434  *
435  * The caller can optionally pass a snapshot to be used; pass InvalidSnapshot
436  * for the normal behavior of setting a new snapshot.  This parameter is
437  * presently ignored for non-PORTAL_ONE_SELECT portals (it's only intended
438  * to be used for cursors).
439  *
440  * On return, portal is ready to accept PortalRun() calls, and the result
441  * tupdesc (if any) is known.
442  */
443 void
444 PortalStart(Portal portal, ParamListInfo params, Snapshot snapshot)
445 {
446         Portal          saveActivePortal;
447         Snapshot        saveActiveSnapshot;
448         ResourceOwner saveResourceOwner;
449         MemoryContext savePortalContext;
450         MemoryContext oldContext;
451         QueryDesc  *queryDesc;
452         int                     eflags;
453
454         AssertArg(PortalIsValid(portal));
455         AssertState(portal->queryContext != NULL);      /* query defined? */
456         AssertState(portal->status == PORTAL_NEW);      /* else extra PortalStart */
457
458         /*
459          * Set up global portal context pointers.  (Should we set QueryContext?)
460          */
461         saveActivePortal = ActivePortal;
462         saveActiveSnapshot = ActiveSnapshot;
463         saveResourceOwner = CurrentResourceOwner;
464         savePortalContext = PortalContext;
465         PG_TRY();
466         {
467                 ActivePortal = portal;
468                 ActiveSnapshot = NULL;  /* will be set later */
469                 CurrentResourceOwner = portal->resowner;
470                 PortalContext = PortalGetHeapMemory(portal);
471
472                 oldContext = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
473
474                 /* Must remember portal param list, if any */
475                 portal->portalParams = params;
476
477                 /*
478                  * Determine the portal execution strategy
479                  */
480                 portal->strategy = ChoosePortalStrategy(portal->stmts);
481
482                 /*
483                  * Fire her up according to the strategy
484                  */
485                 switch (portal->strategy)
486                 {
487                         case PORTAL_ONE_SELECT:
488
489                                 /*
490                                  * Must set snapshot before starting executor.  Be sure to
491                                  * copy it into the portal's context.
492                                  */
493                                 if (snapshot)
494                                         ActiveSnapshot = CopySnapshot(snapshot);
495                                 else
496                                         ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
497
498                                 /*
499                                  * Create QueryDesc in portal's context; for the moment, set
500                                  * the destination to DestNone.
501                                  */
502                                 queryDesc = CreateQueryDesc((PlannedStmt *) linitial(portal->stmts),
503                                                                                         ActiveSnapshot,
504                                                                                         InvalidSnapshot,
505                                                                                         None_Receiver,
506                                                                                         params,
507                                                                                         false);
508
509                                 /*
510                                  * We do *not* call AfterTriggerBeginQuery() here.      We assume
511                                  * that a SELECT cannot queue any triggers.  It would be messy
512                                  * to support triggers since the execution of the portal may
513                                  * be interleaved with other queries.
514                                  */
515
516                                 /*
517                                  * If it's a scrollable cursor, executor needs to support
518                                  * REWIND and backwards scan.
519                                  */
520                                 if (portal->cursorOptions & CURSOR_OPT_SCROLL)
521                                         eflags = EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD;
522                                 else
523                                         eflags = 0; /* default run-to-completion flags */
524
525                                 /*
526                                  * Call ExecutorStart to prepare the plan for execution
527                                  */
528                                 ExecutorStart(queryDesc, eflags);
529
530                                 /*
531                                  * This tells PortalCleanup to shut down the executor
532                                  */
533                                 portal->queryDesc = queryDesc;
534
535                                 /*
536                                  * Remember tuple descriptor (computed by ExecutorStart)
537                                  */
538                                 portal->tupDesc = queryDesc->tupDesc;
539
540                                 /*
541                                  * Reset cursor position data to "start of query"
542                                  */
543                                 portal->atStart = true;
544                                 portal->atEnd = false;  /* allow fetches */
545                                 portal->portalPos = 0;
546                                 portal->posOverflow = false;
547                                 break;
548
549                         case PORTAL_ONE_RETURNING:
550
551                                 /*
552                                  * We don't start the executor until we are told to run the
553                                  * portal.      We do need to set up the result tupdesc.
554                                  */
555                                 {
556                                         PlannedStmt *pstmt;
557
558                                         pstmt = (PlannedStmt *) PortalGetPrimaryStmt(portal);
559                                         Assert(IsA(pstmt, PlannedStmt));
560                                         Assert(pstmt->returningLists);
561                                         portal->tupDesc =
562                                                 ExecCleanTypeFromTL((List *) linitial(pstmt->returningLists),
563                                                                                         false);
564                                 }
565
566                                 /*
567                                  * Reset cursor position data to "start of query"
568                                  */
569                                 portal->atStart = true;
570                                 portal->atEnd = false;  /* allow fetches */
571                                 portal->portalPos = 0;
572                                 portal->posOverflow = false;
573                                 break;
574
575                         case PORTAL_UTIL_SELECT:
576
577                                 /*
578                                  * We don't set snapshot here, because PortalRunUtility will
579                                  * take care of it if needed.
580                                  */
581                                 {
582                                         Node *ustmt = PortalGetPrimaryStmt(portal);
583
584                                         Assert(!IsA(ustmt, PlannedStmt));
585                                         portal->tupDesc = UtilityTupleDescriptor(ustmt);
586                                 }
587
588                                 /*
589                                  * Reset cursor position data to "start of query"
590                                  */
591                                 portal->atStart = true;
592                                 portal->atEnd = false;  /* allow fetches */
593                                 portal->portalPos = 0;
594                                 portal->posOverflow = false;
595                                 break;
596
597                         case PORTAL_MULTI_QUERY:
598                                 /* Need do nothing now */
599                                 portal->tupDesc = NULL;
600                                 break;
601                 }
602         }
603         PG_CATCH();
604         {
605                 /* Uncaught error while executing portal: mark it dead */
606                 portal->status = PORTAL_FAILED;
607
608                 /* Restore global vars and propagate error */
609                 ActivePortal = saveActivePortal;
610                 ActiveSnapshot = saveActiveSnapshot;
611                 CurrentResourceOwner = saveResourceOwner;
612                 PortalContext = savePortalContext;
613
614                 PG_RE_THROW();
615         }
616         PG_END_TRY();
617
618         MemoryContextSwitchTo(oldContext);
619
620         ActivePortal = saveActivePortal;
621         ActiveSnapshot = saveActiveSnapshot;
622         CurrentResourceOwner = saveResourceOwner;
623         PortalContext = savePortalContext;
624
625         portal->status = PORTAL_READY;
626 }
627
628 /*
629  * PortalSetResultFormat
630  *              Select the format codes for a portal's output.
631  *
632  * This must be run after PortalStart for a portal that will be read by
633  * a DestRemote or DestRemoteExecute destination.  It is not presently needed
634  * for other destination types.
635  *
636  * formats[] is the client format request, as per Bind message conventions.
637  */
638 void
639 PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
640 {
641         int                     natts;
642         int                     i;
643
644         /* Do nothing if portal won't return tuples */
645         if (portal->tupDesc == NULL)
646                 return;
647         natts = portal->tupDesc->natts;
648         portal->formats = (int16 *)
649                 MemoryContextAlloc(PortalGetHeapMemory(portal),
650                                                    natts * sizeof(int16));
651         if (nFormats > 1)
652         {
653                 /* format specified for each column */
654                 if (nFormats != natts)
655                         ereport(ERROR,
656                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
657                                          errmsg("bind message has %d result formats but query has %d columns",
658                                                         nFormats, natts)));
659                 memcpy(portal->formats, formats, natts * sizeof(int16));
660         }
661         else if (nFormats > 0)
662         {
663                 /* single format specified, use for all columns */
664                 int16           format1 = formats[0];
665
666                 for (i = 0; i < natts; i++)
667                         portal->formats[i] = format1;
668         }
669         else
670         {
671                 /* use default format for all columns */
672                 for (i = 0; i < natts; i++)
673                         portal->formats[i] = 0;
674         }
675 }
676
677 /*
678  * PortalRun
679  *              Run a portal's query or queries.
680  *
681  * count <= 0 is interpreted as a no-op: the destination gets started up
682  * and shut down, but nothing else happens.  Also, count == FETCH_ALL is
683  * interpreted as "all rows".  Note that count is ignored in multi-query
684  * situations, where we always run the portal to completion.
685  *
686  * dest: where to send output of primary (canSetTag) query
687  *
688  * altdest: where to send output of non-primary queries
689  *
690  * completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
691  *              in which to store a command completion status string.
692  *              May be NULL if caller doesn't want a status string.
693  *
694  * Returns TRUE if the portal's execution is complete, FALSE if it was
695  * suspended due to exhaustion of the count parameter.
696  */
697 bool
698 PortalRun(Portal portal, long count,
699                   DestReceiver *dest, DestReceiver *altdest,
700                   char *completionTag)
701 {
702         bool            result;
703         ResourceOwner saveTopTransactionResourceOwner;
704         MemoryContext saveTopTransactionContext;
705         Portal          saveActivePortal;
706         Snapshot        saveActiveSnapshot;
707         ResourceOwner saveResourceOwner;
708         MemoryContext savePortalContext;
709         MemoryContext saveQueryContext;
710         MemoryContext saveMemoryContext;
711
712         AssertArg(PortalIsValid(portal));
713
714         /* Initialize completion tag to empty string */
715         if (completionTag)
716                 completionTag[0] = '\0';
717
718         if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
719         {
720                 ereport(DEBUG3,
721                                 (errmsg_internal("PortalRun")));
722                 /* PORTAL_MULTI_QUERY logs its own stats per query */
723                 ResetUsage();
724         }
725
726         /*
727          * Check for improper portal use, and mark portal active.
728          */
729         if (portal->status != PORTAL_READY)
730                 ereport(ERROR,
731                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
732                                  errmsg("portal \"%s\" cannot be run", portal->name)));
733         portal->status = PORTAL_ACTIVE;
734
735         /*
736          * Set up global portal context pointers.
737          *
738          * We have to play a special game here to support utility commands like
739          * VACUUM and CLUSTER, which internally start and commit transactions.
740          * When we are called to execute such a command, CurrentResourceOwner will
741          * be pointing to the TopTransactionResourceOwner --- which will be
742          * destroyed and replaced in the course of the internal commit and
743          * restart.  So we need to be prepared to restore it as pointing to the
744          * exit-time TopTransactionResourceOwner.  (Ain't that ugly?  This idea of
745          * internally starting whole new transactions is not good.)
746          * CurrentMemoryContext has a similar problem, but the other pointers we
747          * save here will be NULL or pointing to longer-lived objects.
748          */
749         saveTopTransactionResourceOwner = TopTransactionResourceOwner;
750         saveTopTransactionContext = TopTransactionContext;
751         saveActivePortal = ActivePortal;
752         saveActiveSnapshot = ActiveSnapshot;
753         saveResourceOwner = CurrentResourceOwner;
754         savePortalContext = PortalContext;
755         saveQueryContext = QueryContext;
756         saveMemoryContext = CurrentMemoryContext;
757         PG_TRY();
758         {
759                 ActivePortal = portal;
760                 ActiveSnapshot = NULL;  /* will be set later */
761                 CurrentResourceOwner = portal->resowner;
762                 PortalContext = PortalGetHeapMemory(portal);
763                 QueryContext = portal->queryContext;
764
765                 MemoryContextSwitchTo(PortalContext);
766
767                 switch (portal->strategy)
768                 {
769                         case PORTAL_ONE_SELECT:
770                                 (void) PortalRunSelect(portal, true, count, dest);
771
772                                 /* we know the query is supposed to set the tag */
773                                 if (completionTag && portal->commandTag)
774                                         strcpy(completionTag, portal->commandTag);
775
776                                 /* Mark portal not active */
777                                 portal->status = PORTAL_READY;
778
779                                 /*
780                                  * Since it's a forward fetch, say DONE iff atEnd is now true.
781                                  */
782                                 result = portal->atEnd;
783                                 break;
784
785                         case PORTAL_ONE_RETURNING:
786                         case PORTAL_UTIL_SELECT:
787
788                                 /*
789                                  * If we have not yet run the command, do so, storing its
790                                  * results in the portal's tuplestore.
791                                  */
792                                 if (!portal->holdStore)
793                                         FillPortalStore(portal);
794
795                                 /*
796                                  * Now fetch desired portion of results.
797                                  */
798                                 (void) PortalRunSelect(portal, true, count, dest);
799
800                                 /* we know the query is supposed to set the tag */
801                                 if (completionTag && portal->commandTag)
802                                         strcpy(completionTag, portal->commandTag);
803
804                                 /* Mark portal not active */
805                                 portal->status = PORTAL_READY;
806
807                                 /*
808                                  * Since it's a forward fetch, say DONE iff atEnd is now true.
809                                  */
810                                 result = portal->atEnd;
811                                 break;
812
813                         case PORTAL_MULTI_QUERY:
814                                 PortalRunMulti(portal, dest, altdest, completionTag);
815
816                                 /* Prevent portal's commands from being re-executed */
817                                 portal->status = PORTAL_DONE;
818
819                                 /* Always complete at end of RunMulti */
820                                 result = true;
821                                 break;
822
823                         default:
824                                 elog(ERROR, "unrecognized portal strategy: %d",
825                                          (int) portal->strategy);
826                                 result = false; /* keep compiler quiet */
827                                 break;
828                 }
829         }
830         PG_CATCH();
831         {
832                 /* Uncaught error while executing portal: mark it dead */
833                 portal->status = PORTAL_FAILED;
834
835                 /* Restore global vars and propagate error */
836                 if (saveMemoryContext == saveTopTransactionContext)
837                         MemoryContextSwitchTo(TopTransactionContext);
838                 else
839                         MemoryContextSwitchTo(saveMemoryContext);
840                 ActivePortal = saveActivePortal;
841                 ActiveSnapshot = saveActiveSnapshot;
842                 if (saveResourceOwner == saveTopTransactionResourceOwner)
843                         CurrentResourceOwner = TopTransactionResourceOwner;
844                 else
845                         CurrentResourceOwner = saveResourceOwner;
846                 PortalContext = savePortalContext;
847                 QueryContext = saveQueryContext;
848
849                 PG_RE_THROW();
850         }
851         PG_END_TRY();
852
853         if (saveMemoryContext == saveTopTransactionContext)
854                 MemoryContextSwitchTo(TopTransactionContext);
855         else
856                 MemoryContextSwitchTo(saveMemoryContext);
857         ActivePortal = saveActivePortal;
858         ActiveSnapshot = saveActiveSnapshot;
859         if (saveResourceOwner == saveTopTransactionResourceOwner)
860                 CurrentResourceOwner = TopTransactionResourceOwner;
861         else
862                 CurrentResourceOwner = saveResourceOwner;
863         PortalContext = savePortalContext;
864         QueryContext = saveQueryContext;
865
866         if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
867                 ShowUsage("EXECUTOR STATISTICS");
868
869         return result;
870 }
871
872 /*
873  * PortalRunSelect
874  *              Execute a portal's query in PORTAL_ONE_SELECT mode, and also
875  *              when fetching from a completed holdStore in PORTAL_ONE_RETURNING
876  *              and PORTAL_UTIL_SELECT cases.
877  *
878  * This handles simple N-rows-forward-or-backward cases.  For more complex
879  * nonsequential access to a portal, see PortalRunFetch.
880  *
881  * count <= 0 is interpreted as a no-op: the destination gets started up
882  * and shut down, but nothing else happens.  Also, count == FETCH_ALL is
883  * interpreted as "all rows".
884  *
885  * Caller must already have validated the Portal and done appropriate
886  * setup (cf. PortalRun).
887  *
888  * Returns number of rows processed (suitable for use in result tag)
889  */
890 static long
891 PortalRunSelect(Portal portal,
892                                 bool forward,
893                                 long count,
894                                 DestReceiver *dest)
895 {
896         QueryDesc  *queryDesc;
897         ScanDirection direction;
898         uint32          nprocessed;
899
900         /*
901          * NB: queryDesc will be NULL if we are fetching from a held cursor or a
902          * completed utility query; can't use it in that path.
903          */
904         queryDesc = PortalGetQueryDesc(portal);
905
906         /* Caller messed up if we have neither a ready query nor held data. */
907         Assert(queryDesc || portal->holdStore);
908
909         /*
910          * Force the queryDesc destination to the right thing.  This supports
911          * MOVE, for example, which will pass in dest = DestNone.  This is okay to
912          * change as long as we do it on every fetch.  (The Executor must not
913          * assume that dest never changes.)
914          */
915         if (queryDesc)
916                 queryDesc->dest = dest;
917
918         /*
919          * Determine which direction to go in, and check to see if we're already
920          * at the end of the available tuples in that direction.  If so, set the
921          * direction to NoMovement to avoid trying to fetch any tuples.  (This
922          * check exists because not all plan node types are robust about being
923          * called again if they've already returned NULL once.)  Then call the
924          * executor (we must not skip this, because the destination needs to see a
925          * setup and shutdown even if no tuples are available).  Finally, update
926          * the portal position state depending on the number of tuples that were
927          * retrieved.
928          */
929         if (forward)
930         {
931                 if (portal->atEnd || count <= 0)
932                         direction = NoMovementScanDirection;
933                 else
934                         direction = ForwardScanDirection;
935
936                 /* In the executor, zero count processes all rows */
937                 if (count == FETCH_ALL)
938                         count = 0;
939
940                 if (portal->holdStore)
941                         nprocessed = RunFromStore(portal, direction, count, dest);
942                 else
943                 {
944                         ActiveSnapshot = queryDesc->snapshot;
945                         ExecutorRun(queryDesc, direction, count);
946                         nprocessed = queryDesc->estate->es_processed;
947                 }
948
949                 if (!ScanDirectionIsNoMovement(direction))
950                 {
951                         long            oldPos;
952
953                         if (nprocessed > 0)
954                                 portal->atStart = false;                /* OK to go backward now */
955                         if (count == 0 ||
956                                 (unsigned long) nprocessed < (unsigned long) count)
957                                 portal->atEnd = true;   /* we retrieved 'em all */
958                         oldPos = portal->portalPos;
959                         portal->portalPos += nprocessed;
960                         /* portalPos doesn't advance when we fall off the end */
961                         if (portal->portalPos < oldPos)
962                                 portal->posOverflow = true;
963                 }
964         }
965         else
966         {
967                 if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)
968                         ereport(ERROR,
969                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
970                                          errmsg("cursor can only scan forward"),
971                                          errhint("Declare it with SCROLL option to enable backward scan.")));
972
973                 if (portal->atStart || count <= 0)
974                         direction = NoMovementScanDirection;
975                 else
976                         direction = BackwardScanDirection;
977
978                 /* In the executor, zero count processes all rows */
979                 if (count == FETCH_ALL)
980                         count = 0;
981
982                 if (portal->holdStore)
983                         nprocessed = RunFromStore(portal, direction, count, dest);
984                 else
985                 {
986                         ActiveSnapshot = queryDesc->snapshot;
987                         ExecutorRun(queryDesc, direction, count);
988                         nprocessed = queryDesc->estate->es_processed;
989                 }
990
991                 if (!ScanDirectionIsNoMovement(direction))
992                 {
993                         if (nprocessed > 0 && portal->atEnd)
994                         {
995                                 portal->atEnd = false;  /* OK to go forward now */
996                                 portal->portalPos++;    /* adjust for endpoint case */
997                         }
998                         if (count == 0 ||
999                                 (unsigned long) nprocessed < (unsigned long) count)
1000                         {
1001                                 portal->atStart = true; /* we retrieved 'em all */
1002                                 portal->portalPos = 0;
1003                                 portal->posOverflow = false;
1004                         }
1005                         else
1006                         {
1007                                 long            oldPos;
1008
1009                                 oldPos = portal->portalPos;
1010                                 portal->portalPos -= nprocessed;
1011                                 if (portal->portalPos > oldPos ||
1012                                         portal->portalPos <= 0)
1013                                         portal->posOverflow = true;
1014                         }
1015                 }
1016         }
1017
1018         return nprocessed;
1019 }
1020
1021 /*
1022  * FillPortalStore
1023  *              Run the query and load result tuples into the portal's tuple store.
1024  *
1025  * This is used for PORTAL_ONE_RETURNING and PORTAL_UTIL_SELECT cases only.
1026  */
1027 static void
1028 FillPortalStore(Portal portal)
1029 {
1030         DestReceiver *treceiver;
1031         char            completionTag[COMPLETION_TAG_BUFSIZE];
1032
1033         PortalCreateHoldStore(portal);
1034         treceiver = CreateDestReceiver(DestTuplestore, portal);
1035
1036         completionTag[0] = '\0';
1037
1038         switch (portal->strategy)
1039         {
1040                 case PORTAL_ONE_RETURNING:
1041
1042                         /*
1043                          * Run the portal to completion just as for the default
1044                          * MULTI_QUERY case, but send the primary query's output to the
1045                          * tuplestore. Auxiliary query outputs are discarded.
1046                          */
1047                         PortalRunMulti(portal, treceiver, None_Receiver, completionTag);
1048                         break;
1049
1050                 case PORTAL_UTIL_SELECT:
1051                         PortalRunUtility(portal, (Node *) linitial(portal->stmts),
1052                                                          treceiver, completionTag);
1053                         break;
1054
1055                 default:
1056                         elog(ERROR, "unsupported portal strategy: %d",
1057                                  (int) portal->strategy);
1058                         break;
1059         }
1060
1061         /* Override default completion tag with actual command result */
1062         if (completionTag[0] != '\0')
1063                 portal->commandTag = pstrdup(completionTag);
1064
1065         (*treceiver->rDestroy) (treceiver);
1066 }
1067
1068 /*
1069  * RunFromStore
1070  *              Fetch tuples from the portal's tuple store.
1071  *
1072  * Calling conventions are similar to ExecutorRun, except that we
1073  * do not depend on having a queryDesc or estate.  Therefore we return the
1074  * number of tuples processed as the result, not in estate->es_processed.
1075  *
1076  * One difference from ExecutorRun is that the destination receiver functions
1077  * are run in the caller's memory context (since we have no estate).  Watch
1078  * out for memory leaks.
1079  */
1080 static uint32
1081 RunFromStore(Portal portal, ScanDirection direction, long count,
1082                          DestReceiver *dest)
1083 {
1084         long            current_tuple_count = 0;
1085         TupleTableSlot *slot;
1086
1087         slot = MakeSingleTupleTableSlot(portal->tupDesc);
1088
1089         (*dest->rStartup) (dest, CMD_SELECT, portal->tupDesc);
1090
1091         if (ScanDirectionIsNoMovement(direction))
1092         {
1093                 /* do nothing except start/stop the destination */
1094         }
1095         else
1096         {
1097                 bool            forward = ScanDirectionIsForward(direction);
1098
1099                 for (;;)
1100                 {
1101                         MemoryContext oldcontext;
1102                         bool            ok;
1103
1104                         oldcontext = MemoryContextSwitchTo(portal->holdContext);
1105
1106                         ok = tuplestore_gettupleslot(portal->holdStore, forward, slot);
1107
1108                         MemoryContextSwitchTo(oldcontext);
1109
1110                         if (!ok)
1111                                 break;
1112
1113                         (*dest->receiveSlot) (slot, dest);
1114
1115                         ExecClearTuple(slot);
1116
1117                         /*
1118                          * check our tuple count.. if we've processed the proper number
1119                          * then quit, else loop again and process more tuples. Zero count
1120                          * means no limit.
1121                          */
1122                         current_tuple_count++;
1123                         if (count && count == current_tuple_count)
1124                                 break;
1125                 }
1126         }
1127
1128         (*dest->rShutdown) (dest);
1129
1130         ExecDropSingleTupleTableSlot(slot);
1131
1132         return (uint32) current_tuple_count;
1133 }
1134
1135 /*
1136  * PortalRunUtility
1137  *              Execute a utility statement inside a portal.
1138  */
1139 static void
1140 PortalRunUtility(Portal portal, Node *utilityStmt,
1141                                  DestReceiver *dest, char *completionTag)
1142 {
1143         ereport(DEBUG3,
1144                         (errmsg_internal("ProcessUtility")));
1145
1146         /*
1147          * Set snapshot if utility stmt needs one.      Most reliable way to do this
1148          * seems to be to enumerate those that do not need one; this is a short
1149          * list.  Transaction control, LOCK, and SET must *not* set a snapshot
1150          * since they need to be executable at the start of a serializable
1151          * transaction without freezing a snapshot.  By extension we allow SHOW
1152          * not to set a snapshot.  The other stmts listed are just efficiency
1153          * hacks.  Beware of listing anything that can modify the database --- if,
1154          * say, it has to update an index with expressions that invoke
1155          * user-defined functions, then it had better have a snapshot.
1156          *
1157          * Note we assume that caller will take care of restoring ActiveSnapshot
1158          * on exit/error.
1159          */
1160         if (!(IsA(utilityStmt, TransactionStmt) ||
1161                   IsA(utilityStmt, LockStmt) ||
1162                   IsA(utilityStmt, VariableSetStmt) ||
1163                   IsA(utilityStmt, VariableShowStmt) ||
1164                   IsA(utilityStmt, VariableResetStmt) ||
1165                   IsA(utilityStmt, ConstraintsSetStmt) ||
1166         /* efficiency hacks from here down */
1167                   IsA(utilityStmt, FetchStmt) ||
1168                   IsA(utilityStmt, ListenStmt) ||
1169                   IsA(utilityStmt, NotifyStmt) ||
1170                   IsA(utilityStmt, UnlistenStmt) ||
1171                   IsA(utilityStmt, CheckPointStmt)))
1172                 ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
1173         else
1174                 ActiveSnapshot = NULL;
1175
1176         ProcessUtility(utilityStmt, portal->portalParams, dest, completionTag);
1177
1178         /* Some utility statements may change context on us */
1179         MemoryContextSwitchTo(PortalGetHeapMemory(portal));
1180
1181         if (ActiveSnapshot)
1182                 FreeSnapshot(ActiveSnapshot);
1183         ActiveSnapshot = NULL;
1184 }
1185
1186 /*
1187  * PortalRunMulti
1188  *              Execute a portal's queries in the general case (multi queries
1189  *              or non-SELECT-like queries)
1190  */
1191 static void
1192 PortalRunMulti(Portal portal,
1193                            DestReceiver *dest, DestReceiver *altdest,
1194                            char *completionTag)
1195 {
1196         ListCell   *stmtlist_item;
1197
1198         /*
1199          * If the destination is DestRemoteExecute, change to DestNone.  The
1200          * reason is that the client won't be expecting any tuples, and indeed has
1201          * no way to know what they are, since there is no provision for Describe
1202          * to send a RowDescription message when this portal execution strategy is
1203          * in effect.  This presently will only affect SELECT commands added to
1204          * non-SELECT queries by rewrite rules: such commands will be executed,
1205          * but the results will be discarded unless you use "simple Query"
1206          * protocol.
1207          */
1208         if (dest->mydest == DestRemoteExecute)
1209                 dest = None_Receiver;
1210         if (altdest->mydest == DestRemoteExecute)
1211                 altdest = None_Receiver;
1212
1213         /*
1214          * Loop to handle the individual queries generated from a single parsetree
1215          * by analysis and rewrite.
1216          */
1217         foreach(stmtlist_item, portal->stmts)
1218         {
1219                 Node   *stmt = (Node *) lfirst(stmtlist_item);
1220
1221                 /*
1222                  * If we got a cancel signal in prior command, quit
1223                  */
1224                 CHECK_FOR_INTERRUPTS();
1225
1226                 if (IsA(stmt, PlannedStmt))
1227                 {
1228                         /*
1229                          * process a plannable query.
1230                          */
1231                         PlannedStmt *pstmt = (PlannedStmt *) stmt;
1232
1233                         if (log_executor_stats)
1234                                 ResetUsage();
1235
1236                         if (pstmt->canSetTag)
1237                         {
1238                                 /* statement can set tag string */
1239                                 ProcessQuery(pstmt,
1240                                                          portal->portalParams,
1241                                                          dest, completionTag);
1242                         }
1243                         else
1244                         {
1245                                 /* stmt added by rewrite cannot set tag */
1246                                 ProcessQuery(pstmt,
1247                                                          portal->portalParams,
1248                                                          altdest, NULL);
1249                         }
1250
1251                         if (log_executor_stats)
1252                                 ShowUsage("EXECUTOR STATISTICS");
1253                 }
1254                 else
1255                 {
1256                         /*
1257                          * process utility functions (create, destroy, etc..)
1258                          *
1259                          * These are assumed canSetTag if they're the only stmt in the
1260                          * portal.
1261                          */
1262                         if (list_length(portal->stmts) == 1)
1263                                 PortalRunUtility(portal, stmt, dest, completionTag);
1264                         else
1265                                 PortalRunUtility(portal, stmt, altdest, NULL);
1266                 }
1267
1268                 /*
1269                  * Increment command counter between queries, but not after the last
1270                  * one.
1271                  */
1272                 if (lnext(stmtlist_item) != NULL)
1273                         CommandCounterIncrement();
1274
1275                 /*
1276                  * Clear subsidiary contexts to recover temporary memory.
1277                  */
1278                 Assert(PortalGetHeapMemory(portal) == CurrentMemoryContext);
1279
1280                 MemoryContextDeleteChildren(PortalGetHeapMemory(portal));
1281         }
1282
1283         /*
1284          * If a command completion tag was supplied, use it.  Otherwise use the
1285          * portal's commandTag as the default completion tag.
1286          *
1287          * Exception: clients will expect INSERT/UPDATE/DELETE tags to have
1288          * counts, so fake something up if necessary.  (This could happen if the
1289          * original query was replaced by a DO INSTEAD rule.)
1290          */
1291         if (completionTag && completionTag[0] == '\0')
1292         {
1293                 if (portal->commandTag)
1294                         strcpy(completionTag, portal->commandTag);
1295                 if (strcmp(completionTag, "INSERT") == 0)
1296                         strcpy(completionTag, "INSERT 0 0");
1297                 else if (strcmp(completionTag, "UPDATE") == 0)
1298                         strcpy(completionTag, "UPDATE 0");
1299                 else if (strcmp(completionTag, "DELETE") == 0)
1300                         strcpy(completionTag, "DELETE 0");
1301         }
1302 }
1303
1304 /*
1305  * PortalRunFetch
1306  *              Variant form of PortalRun that supports SQL FETCH directions.
1307  *
1308  * Returns number of rows processed (suitable for use in result tag)
1309  */
1310 long
1311 PortalRunFetch(Portal portal,
1312                            FetchDirection fdirection,
1313                            long count,
1314                            DestReceiver *dest)
1315 {
1316         long            result;
1317         Portal          saveActivePortal;
1318         Snapshot        saveActiveSnapshot;
1319         ResourceOwner saveResourceOwner;
1320         MemoryContext savePortalContext;
1321         MemoryContext saveQueryContext;
1322         MemoryContext oldContext;
1323
1324         AssertArg(PortalIsValid(portal));
1325
1326         /*
1327          * Check for improper portal use, and mark portal active.
1328          */
1329         if (portal->status != PORTAL_READY)
1330                 ereport(ERROR,
1331                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1332                                  errmsg("portal \"%s\" cannot be run", portal->name)));
1333         portal->status = PORTAL_ACTIVE;
1334
1335         /*
1336          * Set up global portal context pointers.
1337          */
1338         saveActivePortal = ActivePortal;
1339         saveActiveSnapshot = ActiveSnapshot;
1340         saveResourceOwner = CurrentResourceOwner;
1341         savePortalContext = PortalContext;
1342         saveQueryContext = QueryContext;
1343         PG_TRY();
1344         {
1345                 ActivePortal = portal;
1346                 ActiveSnapshot = NULL;  /* will be set later */
1347                 CurrentResourceOwner = portal->resowner;
1348                 PortalContext = PortalGetHeapMemory(portal);
1349                 QueryContext = portal->queryContext;
1350
1351                 oldContext = MemoryContextSwitchTo(PortalContext);
1352
1353                 switch (portal->strategy)
1354                 {
1355                         case PORTAL_ONE_SELECT:
1356                                 result = DoPortalRunFetch(portal, fdirection, count, dest);
1357                                 break;
1358
1359                         case PORTAL_ONE_RETURNING:
1360                         case PORTAL_UTIL_SELECT:
1361
1362                                 /*
1363                                  * If we have not yet run the command, do so, storing its
1364                                  * results in the portal's tuplestore.
1365                                  */
1366                                 if (!portal->holdStore)
1367                                         FillPortalStore(portal);
1368
1369                                 /*
1370                                  * Now fetch desired portion of results.
1371                                  */
1372                                 result = DoPortalRunFetch(portal, fdirection, count, dest);
1373                                 break;
1374
1375                         default:
1376                                 elog(ERROR, "unsupported portal strategy");
1377                                 result = 0;             /* keep compiler quiet */
1378                                 break;
1379                 }
1380         }
1381         PG_CATCH();
1382         {
1383                 /* Uncaught error while executing portal: mark it dead */
1384                 portal->status = PORTAL_FAILED;
1385
1386                 /* Restore global vars and propagate error */
1387                 ActivePortal = saveActivePortal;
1388                 ActiveSnapshot = saveActiveSnapshot;
1389                 CurrentResourceOwner = saveResourceOwner;
1390                 PortalContext = savePortalContext;
1391                 QueryContext = saveQueryContext;
1392
1393                 PG_RE_THROW();
1394         }
1395         PG_END_TRY();
1396
1397         MemoryContextSwitchTo(oldContext);
1398
1399         /* Mark portal not active */
1400         portal->status = PORTAL_READY;
1401
1402         ActivePortal = saveActivePortal;
1403         ActiveSnapshot = saveActiveSnapshot;
1404         CurrentResourceOwner = saveResourceOwner;
1405         PortalContext = savePortalContext;
1406         QueryContext = saveQueryContext;
1407
1408         return result;
1409 }
1410
1411 /*
1412  * DoPortalRunFetch
1413  *              Guts of PortalRunFetch --- the portal context is already set up
1414  *
1415  * Returns number of rows processed (suitable for use in result tag)
1416  */
1417 static long
1418 DoPortalRunFetch(Portal portal,
1419                                  FetchDirection fdirection,
1420                                  long count,
1421                                  DestReceiver *dest)
1422 {
1423         bool            forward;
1424
1425         Assert(portal->strategy == PORTAL_ONE_SELECT ||
1426                    portal->strategy == PORTAL_ONE_RETURNING ||
1427                    portal->strategy == PORTAL_UTIL_SELECT);
1428
1429         switch (fdirection)
1430         {
1431                 case FETCH_FORWARD:
1432                         if (count < 0)
1433                         {
1434                                 fdirection = FETCH_BACKWARD;
1435                                 count = -count;
1436                         }
1437                         /* fall out of switch to share code with FETCH_BACKWARD */
1438                         break;
1439                 case FETCH_BACKWARD:
1440                         if (count < 0)
1441                         {
1442                                 fdirection = FETCH_FORWARD;
1443                                 count = -count;
1444                         }
1445                         /* fall out of switch to share code with FETCH_FORWARD */
1446                         break;
1447                 case FETCH_ABSOLUTE:
1448                         if (count > 0)
1449                         {
1450                                 /*
1451                                  * Definition: Rewind to start, advance count-1 rows, return
1452                                  * next row (if any).  In practice, if the goal is less than
1453                                  * halfway back to the start, it's better to scan from where
1454                                  * we are.      In any case, we arrange to fetch the target row
1455                                  * going forwards.
1456                                  */
1457                                 if (portal->posOverflow || portal->portalPos == LONG_MAX ||
1458                                         count - 1 <= portal->portalPos / 2)
1459                                 {
1460                                         DoPortalRewind(portal);
1461                                         if (count > 1)
1462                                                 PortalRunSelect(portal, true, count - 1,
1463                                                                                 None_Receiver);
1464                                 }
1465                                 else
1466                                 {
1467                                         long            pos = portal->portalPos;
1468
1469                                         if (portal->atEnd)
1470                                                 pos++;  /* need one extra fetch if off end */
1471                                         if (count <= pos)
1472                                                 PortalRunSelect(portal, false, pos - count + 1,
1473                                                                                 None_Receiver);
1474                                         else if (count > pos + 1)
1475                                                 PortalRunSelect(portal, true, count - pos - 1,
1476                                                                                 None_Receiver);
1477                                 }
1478                                 return PortalRunSelect(portal, true, 1L, dest);
1479                         }
1480                         else if (count < 0)
1481                         {
1482                                 /*
1483                                  * Definition: Advance to end, back up abs(count)-1 rows,
1484                                  * return prior row (if any).  We could optimize this if we
1485                                  * knew in advance where the end was, but typically we won't.
1486                                  * (Is it worth considering case where count > half of size of
1487                                  * query?  We could rewind once we know the size ...)
1488                                  */
1489                                 PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
1490                                 if (count < -1)
1491                                         PortalRunSelect(portal, false, -count - 1, None_Receiver);
1492                                 return PortalRunSelect(portal, false, 1L, dest);
1493                         }
1494                         else
1495                         {
1496                                 /* count == 0 */
1497                                 /* Rewind to start, return zero rows */
1498                                 DoPortalRewind(portal);
1499                                 return PortalRunSelect(portal, true, 0L, dest);
1500                         }
1501                         break;
1502                 case FETCH_RELATIVE:
1503                         if (count > 0)
1504                         {
1505                                 /*
1506                                  * Definition: advance count-1 rows, return next row (if any).
1507                                  */
1508                                 if (count > 1)
1509                                         PortalRunSelect(portal, true, count - 1, None_Receiver);
1510                                 return PortalRunSelect(portal, true, 1L, dest);
1511                         }
1512                         else if (count < 0)
1513                         {
1514                                 /*
1515                                  * Definition: back up abs(count)-1 rows, return prior row (if
1516                                  * any).
1517                                  */
1518                                 if (count < -1)
1519                                         PortalRunSelect(portal, false, -count - 1, None_Receiver);
1520                                 return PortalRunSelect(portal, false, 1L, dest);
1521                         }
1522                         else
1523                         {
1524                                 /* count == 0 */
1525                                 /* Same as FETCH FORWARD 0, so fall out of switch */
1526                                 fdirection = FETCH_FORWARD;
1527                         }
1528                         break;
1529                 default:
1530                         elog(ERROR, "bogus direction");
1531                         break;
1532         }
1533
1534         /*
1535          * Get here with fdirection == FETCH_FORWARD or FETCH_BACKWARD, and count
1536          * >= 0.
1537          */
1538         forward = (fdirection == FETCH_FORWARD);
1539
1540         /*
1541          * Zero count means to re-fetch the current row, if any (per SQL92)
1542          */
1543         if (count == 0)
1544         {
1545                 bool            on_row;
1546
1547                 /* Are we sitting on a row? */
1548                 on_row = (!portal->atStart && !portal->atEnd);
1549
1550                 if (dest->mydest == DestNone)
1551                 {
1552                         /* MOVE 0 returns 0/1 based on if FETCH 0 would return a row */
1553                         return on_row ? 1L : 0L;
1554                 }
1555                 else
1556                 {
1557                         /*
1558                          * If we are sitting on a row, back up one so we can re-fetch it.
1559                          * If we are not sitting on a row, we still have to start up and
1560                          * shut down the executor so that the destination is initialized
1561                          * and shut down correctly; so keep going.      To PortalRunSelect,
1562                          * count == 0 means we will retrieve no row.
1563                          */
1564                         if (on_row)
1565                         {
1566                                 PortalRunSelect(portal, false, 1L, None_Receiver);
1567                                 /* Set up to fetch one row forward */
1568                                 count = 1;
1569                                 forward = true;
1570                         }
1571                 }
1572         }
1573
1574         /*
1575          * Optimize MOVE BACKWARD ALL into a Rewind.
1576          */
1577         if (!forward && count == FETCH_ALL && dest->mydest == DestNone)
1578         {
1579                 long            result = portal->portalPos;
1580
1581                 if (result > 0 && !portal->atEnd)
1582                         result--;
1583                 DoPortalRewind(portal);
1584                 /* result is bogus if pos had overflowed, but it's best we can do */
1585                 return result;
1586         }
1587
1588         return PortalRunSelect(portal, forward, count, dest);
1589 }
1590
1591 /*
1592  * DoPortalRewind - rewind a Portal to starting point
1593  */
1594 static void
1595 DoPortalRewind(Portal portal)
1596 {
1597         if (portal->holdStore)
1598         {
1599                 MemoryContext oldcontext;
1600
1601                 oldcontext = MemoryContextSwitchTo(portal->holdContext);
1602                 tuplestore_rescan(portal->holdStore);
1603                 MemoryContextSwitchTo(oldcontext);
1604         }
1605         if (PortalGetQueryDesc(portal))
1606                 ExecutorRewind(PortalGetQueryDesc(portal));
1607
1608         portal->atStart = true;
1609         portal->atEnd = false;
1610         portal->portalPos = 0;
1611         portal->posOverflow = false;
1612 }