X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;f=src%2Fbackend%2Fcommands%2Fportalcmds.c;h=1ba72437ad72669e35829d0ff6367d45335f0bc1;hb=6261c75014c9948837d9d025493ef18b8f833f70;hp=754ea46246c733d411738503abebd61382103e8d;hpb=959e61e917a802074e257d4bec13ee04ab4822ff;p=postgresql diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c index 754ea46246..1ba72437ad 100644 --- a/src/backend/commands/portalcmds.c +++ b/src/backend/commands/portalcmds.c @@ -8,54 +8,106 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/portalcmds.c,v 1.2 2002/05/21 22:05:54 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/portalcmds.c,v 1.10 2003/03/11 19:40:22 tgl Exp $ * *------------------------------------------------------------------------- */ #include "postgres.h" +#include + #include "commands/portalcmds.h" #include "executor/executor.h" +#include "optimizer/planner.h" +#include "rewrite/rewriteHandler.h" + + +static long DoRelativeFetch(Portal portal, + bool forward, + long count, + CommandDest dest); +static void DoPortalRewind(Portal portal); +static Portal PreparePortal(char *portalName); /* - * PortalCleanup + * PerformCursorOpen + * Execute SQL DECLARE CURSOR command. */ void -PortalCleanup(Portal portal) +PerformCursorOpen(DeclareCursorStmt *stmt, CommandDest dest) { - MemoryContext oldcontext; + List *rewritten; + Query *query; + Plan *plan; + Portal portal; + MemoryContext oldContext; + char *cursorName; + QueryDesc *queryDesc; + + /* Check for invalid context (must be in transaction block) */ + RequireTransactionChain((void *) stmt, "DECLARE CURSOR"); /* - * sanity checks + * The query has been through parse analysis, but not rewriting or + * planning as yet. Note that the grammar ensured we have a SELECT + * query, so we are not expecting rule rewriting to do anything strange. */ - AssertArg(PortalIsValid(portal)); - AssertArg(portal->cleanup == PortalCleanup); + rewritten = QueryRewrite((Query *) stmt->query); + if (length(rewritten) != 1 || !IsA(lfirst(rewritten), Query)) + elog(ERROR, "PerformCursorOpen: unexpected rewrite result"); + query = (Query *) lfirst(rewritten); + if (query->commandType != CMD_SELECT) + elog(ERROR, "PerformCursorOpen: unexpected rewrite result"); + + if (query->into) + elog(ERROR, "DECLARE CURSOR may not specify INTO"); + if (query->rowMarks != NIL) + elog(ERROR, "DECLARE/UPDATE is not supported" + "\n\tCursors must be READ ONLY"); + + plan = planner(query, true, stmt->options); + + /* If binary cursor, switch to alternate output format */ + if ((stmt->options & CURSOR_OPT_BINARY) && dest == Remote) + dest = RemoteInternal; /* - * set proper portal-executor context before calling ExecMain. + * Create a portal and copy the query and plan into its memory context. */ - oldcontext = MemoryContextSwitchTo(PortalGetHeapMemory(portal)); + portal = PreparePortal(stmt->portalname); + + oldContext = MemoryContextSwitchTo(PortalGetHeapMemory(portal)); + query = copyObject(query); + plan = copyObject(plan); /* - * tell the executor to shutdown the query + * Create the QueryDesc object in the portal context, too. */ - ExecutorEnd(PortalGetQueryDesc(portal), PortalGetState(portal)); + cursorName = pstrdup(stmt->portalname); + queryDesc = CreateQueryDesc(query, plan, dest, cursorName, NULL, false); /* - * switch back to previous context + * call ExecStart to prepare the plan for execution */ - MemoryContextSwitchTo(oldcontext); -} + ExecutorStart(queryDesc); + /* Arrange to shut down the executor if portal is dropped */ + PortalSetQuery(portal, queryDesc, PortalCleanup); + + /* + * We're done; the query won't actually be run until PerformPortalFetch + * is called. + */ + MemoryContextSwitchTo(oldContext); +} /* * PerformPortalFetch + * Execute SQL FETCH or MOVE command. * - * name: name of portal - * forward: forward or backward fetch? - * count: # of tuples to fetch (0 implies all) + * stmt: parsetree node for command * dest: where to send results * completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE * in which to store a command completion status string. @@ -63,50 +115,236 @@ PortalCleanup(Portal portal) * completionTag may be NULL if caller doesn't want a status string. */ void -PerformPortalFetch(char *name, - bool forward, - int count, +PerformPortalFetch(FetchStmt *stmt, CommandDest dest, char *completionTag) { Portal portal; - QueryDesc *queryDesc; - EState *estate; - MemoryContext oldcontext; - ScanDirection direction; - bool temp_desc = false; + long nprocessed; /* initialize completion status in case of early exit */ if (completionTag) - strcpy(completionTag, (dest == None) ? "MOVE 0" : "FETCH 0"); + strcpy(completionTag, stmt->ismove ? "MOVE 0" : "FETCH 0"); - /* - * sanity checks - */ - if (name == NULL) + /* get the portal from the portal name */ + portal = GetPortalByName(stmt->portalname); + if (!PortalIsValid(portal)) { - elog(WARNING, "PerformPortalFetch: missing portal name"); + elog(WARNING, "PerformPortalFetch: portal \"%s\" not found", + stmt->portalname); return; } + /* Do it */ + nprocessed = DoPortalFetch(portal, + stmt->direction, + stmt->howMany, + stmt->ismove ? None : dest); + + /* Return command status if wanted */ + if (completionTag) + snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "%s %ld", + stmt->ismove ? "MOVE" : "FETCH", + nprocessed); +} + +/* + * DoPortalFetch + * Guts of PerformPortalFetch --- shared with SPI cursor operations. + * Caller must already have validated the Portal. + * + * Returns number of rows processed (suitable for use in result tag) + */ +long +DoPortalFetch(Portal portal, + FetchDirection fdirection, + long count, + CommandDest dest) +{ + bool forward; + + switch (fdirection) + { + case FETCH_FORWARD: + if (count < 0) + { + fdirection = FETCH_BACKWARD; + count = -count; + } + /* fall out of switch to share code with FETCH_BACKWARD */ + break; + case FETCH_BACKWARD: + if (count < 0) + { + fdirection = FETCH_FORWARD; + count = -count; + } + /* fall out of switch to share code with FETCH_FORWARD */ + break; + case FETCH_ABSOLUTE: + if (count > 0) + { + /* + * Definition: Rewind to start, advance count-1 rows, return + * next row (if any). In practice, if the goal is less than + * halfway back to the start, it's better to scan from where + * we are. In any case, we arrange to fetch the target row + * going forwards. + */ + if (portal->posOverflow || portal->portalPos == LONG_MAX || + count-1 <= portal->portalPos / 2) + { + DoPortalRewind(portal); + if (count > 1) + DoRelativeFetch(portal, true, count-1, None); + } + else + { + long pos = portal->portalPos; + + if (portal->atEnd) + pos++; /* need one extra fetch if off end */ + if (count <= pos) + DoRelativeFetch(portal, false, pos-count+1, None); + else if (count > pos+1) + DoRelativeFetch(portal, true, count-pos-1, None); + } + return DoRelativeFetch(portal, true, 1L, dest); + } + else if (count < 0) + { + /* + * Definition: Advance to end, back up abs(count)-1 rows, + * return prior row (if any). We could optimize this if we + * knew in advance where the end was, but typically we won't. + * (Is it worth considering case where count > half of size + * of query? We could rewind once we know the size ...) + */ + DoRelativeFetch(portal, true, FETCH_ALL, None); + if (count < -1) + DoRelativeFetch(portal, false, -count-1, None); + return DoRelativeFetch(portal, false, 1L, dest); + } + else /* count == 0 */ + { + /* Rewind to start, return zero rows */ + DoPortalRewind(portal); + return DoRelativeFetch(portal, true, 0L, dest); + } + break; + case FETCH_RELATIVE: + if (count > 0) + { + /* + * Definition: advance count-1 rows, return next row (if any). + */ + if (count > 1) + DoRelativeFetch(portal, true, count-1, None); + return DoRelativeFetch(portal, true, 1L, dest); + } + else if (count < 0) + { + /* + * Definition: back up abs(count)-1 rows, return prior row + * (if any). + */ + if (count < -1) + DoRelativeFetch(portal, false, -count-1, None); + return DoRelativeFetch(portal, false, 1L, dest); + } + else /* count == 0 */ + { + /* Same as FETCH FORWARD 0, so fall out of switch */ + fdirection = FETCH_FORWARD; + } + break; + default: + elog(ERROR, "DoPortalFetch: bogus direction"); + break; + } + /* - * get the portal from the portal name + * Get here with fdirection == FETCH_FORWARD or FETCH_BACKWARD, + * and count >= 0. */ - portal = GetPortalByName(name); - if (!PortalIsValid(portal)) + forward = (fdirection == FETCH_FORWARD); + + /* + * Zero count means to re-fetch the current row, if any (per SQL92) + */ + if (count == 0) { - elog(WARNING, "PerformPortalFetch: portal \"%s\" not found", - name); - return; + bool on_row; + + /* Are we sitting on a row? */ + on_row = (!portal->atStart && !portal->atEnd); + + if (dest == None) + { + /* MOVE 0 returns 0/1 based on if FETCH 0 would return a row */ + return on_row ? 1L : 0L; + } + else + { + /* + * If we are sitting on a row, back up one so we can re-fetch it. + * If we are not sitting on a row, we still have to start up and + * shut down the executor so that the destination is initialized + * and shut down correctly; so keep going. To DoRelativeFetch, + * count == 0 means we will retrieve no row. + */ + if (on_row) + { + DoRelativeFetch(portal, false, 1L, None); + /* Set up to fetch one row forward */ + count = 1; + forward = true; + } + } } /* - * switch into the portal context + * Optimize MOVE BACKWARD ALL into a Rewind. */ - oldcontext = MemoryContextSwitchTo(PortalGetHeapMemory(portal)); + if (!forward && count == FETCH_ALL && dest == None) + { + long result = portal->portalPos; + + if (result > 0 && !portal->atEnd) + result--; + DoPortalRewind(portal); + /* result is bogus if pos had overflowed, but it's best we can do */ + return result; + } + + return DoRelativeFetch(portal, forward, count, dest); +} + +/* + * DoRelativeFetch + * Do fetch for a simple N-rows-forward-or-backward case. + * + * count <= 0 is interpreted as a no-op: the destination gets started up + * and shut down, but nothing else happens. Also, count == FETCH_ALL is + * interpreted as "all rows". + * + * Caller must already have validated the Portal. + * + * Returns number of rows processed (suitable for use in result tag) + */ +static long +DoRelativeFetch(Portal portal, + bool forward, + long count, + CommandDest dest) +{ + QueryDesc *queryDesc; + EState *estate; + ScanDirection direction; + QueryDesc temp_queryDesc; queryDesc = PortalGetQueryDesc(portal); - estate = PortalGetState(portal); + estate = queryDesc->estate; /* * If the requested destination is not the same as the query's @@ -122,86 +360,125 @@ PerformPortalFetch(char *name, if (dest != queryDesc->dest && !(queryDesc->dest == RemoteInternal && dest == Remote)) { - QueryDesc *qdesc = (QueryDesc *) palloc(sizeof(QueryDesc)); - - memcpy(qdesc, queryDesc, sizeof(QueryDesc)); - qdesc->dest = dest; - queryDesc = qdesc; - temp_desc = true; + memcpy(&temp_queryDesc, queryDesc, sizeof(QueryDesc)); + temp_queryDesc.dest = dest; + queryDesc = &temp_queryDesc; } /* * Determine which direction to go in, and check to see if we're * already at the end of the available tuples in that direction. If * so, set the direction to NoMovement to avoid trying to fetch any - * tuples. (This check exists because not all plan node types - * are robust about being called again if they've already returned - * NULL once.) Then call the executor (we must not skip this, because - * the destination needs to see a setup and shutdown even if no tuples - * are available). Finally, update the atStart/atEnd state depending - * on the number of tuples that were retrieved. + * tuples. (This check exists because not all plan node types are + * robust about being called again if they've already returned NULL + * once.) Then call the executor (we must not skip this, because the + * destination needs to see a setup and shutdown even if no tuples are + * available). Finally, update the portal position state depending on + * the number of tuples that were retrieved. */ if (forward) { - if (portal->atEnd) + if (portal->atEnd || count <= 0) direction = NoMovementScanDirection; else direction = ForwardScanDirection; - ExecutorRun(queryDesc, estate, direction, (long) count); + /* In the executor, zero count processes all rows */ + if (count == FETCH_ALL) + count = 0; + + ExecutorRun(queryDesc, direction, count); - if (estate->es_processed > 0) - portal->atStart = false; /* OK to back up now */ - if (count <= 0 || (int) estate->es_processed < count) - portal->atEnd = true; /* we retrieved 'em all */ + if (direction != NoMovementScanDirection) + { + long oldPos; + + if (estate->es_processed > 0) + portal->atStart = false; /* OK to go backward now */ + if (count == 0 || + (unsigned long) estate->es_processed < (unsigned long) count) + portal->atEnd = true; /* we retrieved 'em all */ + oldPos = portal->portalPos; + portal->portalPos += estate->es_processed; + /* portalPos doesn't advance when we fall off the end */ + if (portal->portalPos < oldPos) + portal->posOverflow = true; + } } else { - if (portal->atStart) + if (!portal->backwardOK) + elog(ERROR, "Cursor can only scan forward" + "\n\tDeclare it with SCROLL option to enable backward scan"); + + if (portal->atStart || count <= 0) direction = NoMovementScanDirection; else direction = BackwardScanDirection; - ExecutorRun(queryDesc, estate, direction, (long) count); + /* In the executor, zero count processes all rows */ + if (count == FETCH_ALL) + count = 0; + + ExecutorRun(queryDesc, direction, count); + + if (direction != NoMovementScanDirection) + { + if (estate->es_processed > 0 && portal->atEnd) + { + portal->atEnd = false; /* OK to go forward now */ + portal->portalPos++; /* adjust for endpoint case */ + } + if (count == 0 || + (unsigned long) estate->es_processed < (unsigned long) count) + { + portal->atStart = true; /* we retrieved 'em all */ + portal->portalPos = 0; + portal->posOverflow = false; + } + else + { + long oldPos; - if (estate->es_processed > 0) - portal->atEnd = false; /* OK to go forward now */ - if (count <= 0 || (int) estate->es_processed < count) - portal->atStart = true; /* we retrieved 'em all */ + oldPos = portal->portalPos; + portal->portalPos -= estate->es_processed; + if (portal->portalPos > oldPos || + portal->portalPos <= 0) + portal->posOverflow = true; + } + } } - /* Return command status if wanted */ - if (completionTag) - snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "%s %u", - (dest == None) ? "MOVE" : "FETCH", - estate->es_processed); + return estate->es_processed; +} - /* - * Clean up and switch back to old context. - */ - if (temp_desc) - pfree(queryDesc); +/* + * DoPortalRewind - rewind a Portal to starting point + */ +static void +DoPortalRewind(Portal portal) +{ + QueryDesc *queryDesc; + + queryDesc = PortalGetQueryDesc(portal); + + ExecutorRewind(queryDesc); - MemoryContextSwitchTo(oldcontext); + portal->atStart = true; + portal->atEnd = false; + portal->portalPos = 0; + portal->posOverflow = false; } /* * PerformPortalClose + * Close a cursor. */ void -PerformPortalClose(char *name, CommandDest dest) +PerformPortalClose(char *name) { Portal portal; - /* - * sanity checks - */ - if (name == NULL) - { - elog(WARNING, "PerformPortalClose: missing portal name"); - return; - } - /* * get the portal from the portal name */ @@ -218,3 +495,64 @@ PerformPortalClose(char *name, CommandDest dest) */ PortalDrop(portal); } + + +/* + * PreparePortal + */ +static Portal +PreparePortal(char *portalName) +{ + Portal portal; + + /* + * Check for already-in-use portal name. + */ + portal = GetPortalByName(portalName); + if (PortalIsValid(portal)) + { + /* + * XXX Should we raise an error rather than closing the old + * portal? + */ + elog(WARNING, "Closing pre-existing portal \"%s\"", + portalName); + PortalDrop(portal); + } + + /* + * Create the new portal. + */ + portal = CreatePortal(portalName); + + return portal; +} + + +/* + * PortalCleanup + * + * Clean up a portal when it's dropped. Since this mainly exists to run + * ExecutorEnd(), it should not be set as the cleanup hook until we have + * called ExecutorStart() on the portal's query. + */ +void +PortalCleanup(Portal portal) +{ + /* + * sanity checks + */ + AssertArg(PortalIsValid(portal)); + AssertArg(portal->cleanup == PortalCleanup); + + /* + * tell the executor to shutdown the query + */ + ExecutorEnd(PortalGetQueryDesc(portal)); + + /* + * This should be unnecessary since the querydesc should be in the + * portal's memory context, but do it anyway for symmetry. + */ + FreeQueryDesc(PortalGetQueryDesc(portal)); +}