]> granicus.if.org Git - postgresql/commitdiff
Replace libpq's "row processor" API with a "single row" mode.
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 2 Aug 2012 17:10:36 +0000 (13:10 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 2 Aug 2012 17:10:36 +0000 (13:10 -0400)
After taking awhile to digest the row-processor feature that was added to
libpq in commit 92785dac2ee7026948962cd61c4cd84a2d052772, we've concluded
it is over-complicated and too hard to use.  Leave the core infrastructure
changes in place (that is, there's still a row processor function inside
libpq), but remove the exposed API pieces, and instead provide a "single
row" mode switch that causes PQgetResult to return one row at a time in
separate PGresult objects.

This approach incurs more overhead than proper use of a row processor
callback would, since construction of a PGresult per row adds extra cycles.
However, it is far easier to use and harder to break.  The single-row mode
still affords applications the primary benefit that the row processor API
was meant to provide, namely not having to accumulate large result sets in
memory before processing them.  Preliminary testing suggests that we can
probably buy back most of the extra cycles by micro-optimizing construction
of the extra results, but that task will be left for another day.

Marko Kreen

contrib/dblink/dblink.c
doc/src/sgml/libpq.sgml
src/interfaces/libpq/exports.txt
src/interfaces/libpq/fe-connect.c
src/interfaces/libpq/fe-exec.c
src/interfaces/libpq/fe-lobj.c
src/interfaces/libpq/fe-protocol2.c
src/interfaces/libpq/fe-protocol3.c
src/interfaces/libpq/libpq-fe.h
src/interfaces/libpq/libpq-int.h

index 1e62d8091a9d2bdf60af6745d5a01ee14ee5cf5a..a67a836eba1fc5e3ea14701284b831269fbc3357 100644 (file)
@@ -70,6 +70,9 @@ typedef struct storeInfo
        AttInMetadata *attinmeta;
        MemoryContext tmpcontext;
        char      **cstrs;
+       /* temp storage for results to avoid leaks on exception */
+       PGresult   *last_res;
+       PGresult   *cur_res;
 } storeInfo;
 
 /*
@@ -83,8 +86,8 @@ static void materializeQueryResult(FunctionCallInfo fcinfo,
                                           const char *conname,
                                           const char *sql,
                                           bool fail);
-static int storeHandler(PGresult *res, const PGdataValue *columns,
-                        const char **errmsgp, void *param);
+static PGresult *storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql);
+static void storeRow(storeInfo *sinfo, PGresult *res, bool first);
 static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn *rconn);
@@ -630,7 +633,7 @@ dblink_send_query(PG_FUNCTION_ARGS)
        /* async query send */
        retval = PQsendQuery(conn, sql);
        if (retval != 1)
-               elog(NOTICE, "%s", PQerrorMessage(conn));
+               elog(NOTICE, "could not send query: %s", PQerrorMessage(conn));
 
        PG_RETURN_INT32(retval);
 }
@@ -927,8 +930,10 @@ materializeResult(FunctionCallInfo fcinfo, PGresult *res)
 /*
  * Execute the given SQL command and store its results into a tuplestore
  * to be returned as the result of the current function.
+ *
  * This is equivalent to PQexec followed by materializeResult, but we make
- * use of libpq's "row processor" API to reduce per-row overhead.
+ * use of libpq's single-row mode to avoid accumulating the whole result
+ * inside libpq before it gets transferred to the tuplestore.
  */
 static void
 materializeQueryResult(FunctionCallInfo fcinfo,
@@ -944,19 +949,14 @@ materializeQueryResult(FunctionCallInfo fcinfo,
        /* prepTuplestoreResult must have been called previously */
        Assert(rsinfo->returnMode == SFRM_Materialize);
 
+       /* initialize storeInfo to empty */
+       memset(&sinfo, 0, sizeof(sinfo));
+       sinfo.fcinfo = fcinfo;
+
        PG_TRY();
        {
-               /* initialize storeInfo to empty */
-               memset(&sinfo, 0, sizeof(sinfo));
-               sinfo.fcinfo = fcinfo;
-
-               /* We'll collect tuples using storeHandler */
-               PQsetRowProcessor(conn, storeHandler, &sinfo);
-
-               res = PQexec(conn, sql);
-
-               /* We don't keep the custom row processor installed permanently */
-               PQsetRowProcessor(conn, NULL, NULL);
+               /* execute query, collecting any tuples into the tuplestore */
+               res = storeQueryResult(&sinfo, conn, sql);
 
                if (!res ||
                        (PQresultStatus(res) != PGRES_COMMAND_OK &&
@@ -975,8 +975,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
                else if (PQresultStatus(res) == PGRES_COMMAND_OK)
                {
                        /*
-                        * storeHandler didn't get called, so we need to convert the
-                        * command status string to a tuple manually
+                        * storeRow didn't get called, so we need to convert the command
+                        * status string to a tuple manually
                         */
                        TupleDesc       tupdesc;
                        AttInMetadata *attinmeta;
@@ -1008,25 +1008,30 @@ materializeQueryResult(FunctionCallInfo fcinfo,
                        tuplestore_puttuple(tupstore, tuple);
 
                        PQclear(res);
+                       res = NULL;
                }
                else
                {
                        Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
-                       /* storeHandler should have created a tuplestore */
+                       /* storeRow should have created a tuplestore */
                        Assert(rsinfo->setResult != NULL);
 
                        PQclear(res);
+                       res = NULL;
                }
+               PQclear(sinfo.last_res);
+               sinfo.last_res = NULL;
+               PQclear(sinfo.cur_res);
+               sinfo.cur_res = NULL;
        }
        PG_CATCH();
        {
-               /* be sure to unset the custom row processor */
-               PQsetRowProcessor(conn, NULL, NULL);
                /* be sure to release any libpq result we collected */
-               if (res)
-                       PQclear(res);
+               PQclear(res);
+               PQclear(sinfo.last_res);
+               PQclear(sinfo.cur_res);
                /* and clear out any pending data in libpq */
-               while ((res = PQskipResult(conn)) != NULL)
+               while ((res = PQgetResult(conn)) != NULL)
                        PQclear(res);
                PG_RE_THROW();
        }
@@ -1034,23 +1039,72 @@ materializeQueryResult(FunctionCallInfo fcinfo,
 }
 
 /*
- * Custom row processor for materializeQueryResult.
- * Prototype of this function must match PQrowProcessor.
+ * Execute query, and send any result rows to sinfo->tuplestore.
  */
-static int
-storeHandler(PGresult *res, const PGdataValue *columns,
-                        const char **errmsgp, void *param)
+static PGresult *
+storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql)
+{
+       bool            first = true;
+       PGresult   *res;
+
+       if (!PQsendQuery(conn, sql))
+               elog(ERROR, "could not send query: %s", PQerrorMessage(conn));
+
+       if (!PQsetSingleRowMode(conn))          /* shouldn't fail */
+               elog(ERROR, "failed to set single-row mode for dblink query");
+
+       for (;;)
+       {
+               CHECK_FOR_INTERRUPTS();
+
+               sinfo->cur_res = PQgetResult(conn);
+               if (!sinfo->cur_res)
+                       break;
+
+               if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
+               {
+                       /* got one row from possibly-bigger resultset */
+                       storeRow(sinfo, sinfo->cur_res, first);
+
+                       PQclear(sinfo->cur_res);
+                       sinfo->cur_res = NULL;
+                       first = false;
+               }
+               else
+               {
+                       /* if empty resultset, fill tuplestore header */
+                       if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
+                               storeRow(sinfo, sinfo->cur_res, first);
+
+                       /* store completed result at last_res */
+                       PQclear(sinfo->last_res);
+                       sinfo->last_res = sinfo->cur_res;
+                       sinfo->cur_res = NULL;
+                       first = true;
+               }
+       }
+
+       /* return last_res */
+       res = sinfo->last_res;
+       sinfo->last_res = NULL;
+       return res;
+}
+
+/*
+ * Send single row to sinfo->tuplestore.
+ *
+ * If "first" is true, create the tuplestore using PGresult's metadata
+ * (in this case the PGresult might contain either zero or one row).
+ */
+static void
+storeRow(storeInfo *sinfo, PGresult *res, bool first)
 {
-       storeInfo  *sinfo = (storeInfo *) param;
        int                     nfields = PQnfields(res);
-       char      **cstrs = sinfo->cstrs;
        HeapTuple       tuple;
-       char       *pbuf;
-       int                     pbuflen;
        int                     i;
        MemoryContext oldcontext;
 
-       if (columns == NULL)
+       if (first)
        {
                /* Prepare for new result set */
                ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
@@ -1098,13 +1152,16 @@ storeHandler(PGresult *res, const PGdataValue *columns,
                sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
 
                /* Create a new, empty tuplestore */
-               oldcontext = MemoryContextSwitchTo(
-                                                                       rsinfo->econtext->ecxt_per_query_memory);
+               oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
                sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
                rsinfo->setResult = sinfo->tuplestore;
                rsinfo->setDesc = tupdesc;
                MemoryContextSwitchTo(oldcontext);
 
+               /* Done if empty resultset */
+               if (PQntuples(res) == 0)
+                       return;
+
                /*
                 * Set up sufficiently-wide string pointers array; this won't change
                 * in size so it's easy to preallocate.
@@ -1121,11 +1178,10 @@ storeHandler(PGresult *res, const PGdataValue *columns,
                                                                          ALLOCSET_DEFAULT_MINSIZE,
                                                                          ALLOCSET_DEFAULT_INITSIZE,
                                                                          ALLOCSET_DEFAULT_MAXSIZE);
-
-               return 1;
        }
 
-       CHECK_FOR_INTERRUPTS();
+       /* Should have a single-row result if we get here */
+       Assert(PQntuples(res) == 1);
 
        /*
         * Do the following work in a temp context that we reset after each tuple.
@@ -1135,46 +1191,24 @@ storeHandler(PGresult *res, const PGdataValue *columns,
        oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
 
        /*
-        * The strings passed to us are not null-terminated, but the datatype
-        * input functions we're about to call require null termination.  Copy the
-        * strings and add null termination.  As a micro-optimization, allocate
-        * all the strings with one palloc.
+        * Fill cstrs with null-terminated strings of column values.
         */
-       pbuflen = nfields;                      /* count the null terminators themselves */
        for (i = 0; i < nfields; i++)
        {
-               int                     len = columns[i].len;
-
-               if (len > 0)
-                       pbuflen += len;
-       }
-       pbuf = (char *) palloc(pbuflen);
-
-       for (i = 0; i < nfields; i++)
-       {
-               int                     len = columns[i].len;
-
-               if (len < 0)
-                       cstrs[i] = NULL;
+               if (PQgetisnull(res, 0, i))
+                       sinfo->cstrs[i] = NULL;
                else
-               {
-                       cstrs[i] = pbuf;
-                       memcpy(pbuf, columns[i].value, len);
-                       pbuf += len;
-                       *pbuf++ = '\0';
-               }
+                       sinfo->cstrs[i] = PQgetvalue(res, 0, i);
        }
 
        /* Convert row to a tuple, and add it to the tuplestore */
-       tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+       tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
 
        tuplestore_puttuple(sinfo->tuplestore, tuple);
 
        /* Clean up */
        MemoryContextSwitchTo(oldcontext);
        MemoryContextReset(sinfo->tmpcontext);
-
-       return 1;
 }
 
 /*
index 5c5dd68db30a717c59deff93a9a12d895d734677..255c5c1abb84127abfa8f7df9bc9a76a9c11fb19 100644 (file)
@@ -2418,14 +2418,28 @@ ExecStatusType PQresultStatus(const PGresult *res);
           <term><literal>PGRES_COPY_BOTH</literal></term>
           <listitem>
            <para>
-            Copy In/Out (to and from server) data transfer started.  This is
-            currently used only for streaming replication.
+            Copy In/Out (to and from server) data transfer started.  This
+            feature is currently used only for streaming replication,
+            so this status should not occur in ordinary applications.
+           </para>
+          </listitem>
+         </varlistentry>
+
+         <varlistentry id="libpq-pgres-single-tuple">
+          <term><literal>PGRES_SINGLE_TUPLE</literal></term>
+          <listitem>
+           <para>
+            The <structname>PGresult</> contains a single result tuple
+            from the current command.  This status occurs only when
+            single-row mode has been selected for the query
+            (see <xref linkend="libpq-single-row-mode">).
            </para>
           </listitem>
          </varlistentry>
         </variablelist>
 
-        If the result status is <literal>PGRES_TUPLES_OK</literal>, then
+        If the result status is <literal>PGRES_TUPLES_OK</literal> or
+        <literal>PGRES_SINGLE_TUPLE</literal>, then
         the functions described below can be used to retrieve the rows
         returned by the query.  Note that a <command>SELECT</command>
         command that happens to retrieve zero rows still shows
@@ -2726,7 +2740,8 @@ void PQclear(PGresult *res);
     These functions are used to extract information from a
     <structname>PGresult</structname> object that represents a successful
     query result (that is, one that has status
-    <literal>PGRES_TUPLES_OK</literal>).  They can also be used to extract
+    <literal>PGRES_TUPLES_OK</literal> or <literal>PGRES_SINGLE_TUPLE</>).
+    They can also be used to extract
     information from a successful Describe operation: a Describe's result
     has all the same column information that actual execution of the query
     would provide, but it has zero rows.  For objects with other status values,
@@ -3738,7 +3753,7 @@ unsigned char *PQunescapeBytea(const unsigned char *from, size_t *to_length);
 
   <para>
    The <function>PQexec</function> function is adequate for submitting
-   commands in normal, synchronous applications.  It has a couple of
+   commands in normal, synchronous applications.  It has a few
    deficiencies, however, that can be of importance to some users:
 
    <itemizedlist>
@@ -3769,6 +3784,15 @@ unsigned char *PQunescapeBytea(const unsigned char *from, size_t *to_length);
       <function>PQexec</function>.
      </para>
     </listitem>
+
+    <listitem>
+     <para>
+      <function>PQexec</function> always collects the command's entire result,
+      buffering it in a single <structname>PGresult</structname>.  While
+      this simplifies error-handling logic for the application, it can be
+      impractical for results containing many rows.
+     </para>
+    </listitem>
    </itemizedlist>
   </para>
 
@@ -3984,8 +4008,11 @@ int PQsendDescribePortal(PGconn *conn, const char *portalName);
        Waits for the next result from a prior
        <function>PQsendQuery</function>,
        <function>PQsendQueryParams</function>,
-       <function>PQsendPrepare</function>, or
-       <function>PQsendQueryPrepared</function> call, and returns it.
+       <function>PQsendPrepare</function>,
+       <function>PQsendQueryPrepared</function>,
+       <function>PQsendDescribePrepared</function>, or
+       <function>PQsendDescribePortal</function>
+       call, and returns it.
        A null pointer is returned when the command is complete and there
        will be no more results.
 <synopsis>
@@ -4012,7 +4039,7 @@ PGresult *PQgetResult(PGconn *conn);
        <para>
         Even when <function>PQresultStatus</function> indicates a fatal
         error, <function>PQgetResult</function> should be called until it
-        returns a null pointer to allow <application>libpq</> to
+        returns a null pointer, to allow <application>libpq</> to
         process the error information completely.
        </para>
       </note>
@@ -4029,7 +4056,18 @@ PGresult *PQgetResult(PGconn *conn);
    can be obtained individually.  (This allows a simple form of overlapped
    processing, by the way: the client can be handling the results of one
    command while the server is still working on later queries in the same
-   command string.)  However, calling <function>PQgetResult</function>
+   command string.)
+  </para>
+
+  <para>
+   Another frequently-desired feature that can be obtained with
+   <function>PQsendQuery</function> and <function>PQgetResult</function>
+   is retrieving large query results a row at a time.  This is discussed
+   in <xref linkend="libpq-single-row-mode">.
+  </para>
+
+  <para>
+   By itself, calling <function>PQgetResult</function>
    will still cause the client to block until the server completes the
    next <acronym>SQL</acronym> command.  This can be avoided by proper
    use of two more functions:
@@ -4238,6 +4276,98 @@ int PQflush(PGconn *conn);
 
  </sect1>
 
+ <sect1 id="libpq-single-row-mode">
+  <title>Retrieving Query Results Row-By-Row</title>
+
+  <indexterm zone="libpq-single-row-mode">
+   <primary>libpq</primary>
+   <secondary>single-row mode</secondary>
+  </indexterm>
+
+  <para>
+   Ordinarily, <application>libpq</> collects a SQL command's
+   entire result and returns it to the application as a single
+   <structname>PGresult</structname>.  This can be unworkable for commands
+   that return a large number of rows.  For such cases, applications can use
+   <function>PQsendQuery</function> and <function>PQgetResult</function> in
+   <firstterm>single-row mode</>.  In this mode, the result row(s) are
+   returned to the application one at a time, as they are received from the
+   server.
+  </para>
+
+  <para>
+   To enter single-row mode, call <function>PQsetSingleRowMode</function>
+   immediately after a successful call of <function>PQsendQuery</function>
+   (or a sibling function).  This mode selection is effective only for the
+   currently executing query.  Then call <function>PQgetResult</function>
+   repeatedly, until it returns null, as documented in <xref
+   linkend="libpq-async">.  If the query returns any rows, they are returned
+   as individual <structname>PGresult</structname> objects, which look like
+   normal query results except for having status code
+   <literal>PGRES_SINGLE_TUPLE</literal> instead of
+   <literal>PGRES_TUPLES_OK</literal>.  After the last row, or immediately if
+   the query returns zero rows, a zero-row object with status
+   <literal>PGRES_TUPLES_OK</literal> is returned; this is the signal that no
+   more rows will arrive.  (But note that it is still necessary to continue
+   calling <function>PQgetResult</function> until it returns null.)  All of
+   these <structname>PGresult</structname> objects will contain the same row
+   description data (column names, types, etc) that an ordinary
+   <structname>PGresult</structname> object for the query would have.
+   Each object should be freed with <function>PQclear</function> as usual.
+  </para>
+
+  <para>
+   <variablelist>
+    <varlistentry id="libpq-pqsetsinglerowmode">
+     <term>
+      <function>PQsetSingleRowMode</function>
+      <indexterm>
+       <primary>PQsetSingleRowMode</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+       Select single-row mode for the currently-executing query.
+
+<synopsis>
+int PQsetSingleRowMode(PGconn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       This function can only be called immediately after
+       <function>PQsendQuery</function> or one of its sibling functions,
+       before any other operation on the connection such as
+       <function>PQconsumeInput</function> or
+       <function>PQgetResult</function>.  If called at the correct time,
+       the function activates single-row mode for the current query and
+       returns 1.  Otherwise the mode stays unchanged and the function
+       returns 0.  In any case, the mode reverts to normal after
+       completion of the current query.
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+  </para>
+
+  <caution>
+   <para>
+    While processing a query, the server may return some rows and then
+    encounter an error, causing the query to be aborted.  Ordinarily,
+    <application>libpq</> discards any such rows and reports only the
+    error.  But in single-row mode, those rows will have already been
+    returned to the application.  Hence, the application will see some
+    <literal>PGRES_SINGLE_TUPLE</literal> <structname>PGresult</structname>
+    objects followed by a <literal>PGRES_FATAL_ERROR</literal> object.  For
+    proper transactional behavior, the application must be designed to
+    discard or undo whatever has been done with the previously-processed
+    rows, if the query ultimately fails.
+   </para>
+  </caution>
+
+ </sect1>
+
  <sect1 id="libpq-cancel">
   <title>Canceling Queries in Progress</title>
 
@@ -5700,274 +5830,6 @@ defaultNoticeProcessor(void *arg, const char *message)
 
  </sect1>
 
- <sect1 id="libpq-row-processor">
-  <title>Custom Row Processing</title>
-
-  <indexterm zone="libpq-row-processor">
-   <primary>PQrowProcessor</primary>
-  </indexterm>
-
-  <indexterm zone="libpq-row-processor">
-   <primary>row processor</primary>
-   <secondary>in libpq</secondary>
-  </indexterm>
-
-  <para>
-   Ordinarily, when receiving a query result from the server,
-   <application>libpq</> adds each row value to the current
-   <type>PGresult</type> until the entire result set is received; then
-   the <type>PGresult</type> is returned to the application as a unit.
-   This approach is simple to work with, but becomes inefficient for large
-   result sets.  To improve performance, an application can register a
-   custom <firstterm>row processor</> function that processes each row
-   as the data is received from the network.  The custom row processor could
-   process the data fully, or store it into some application-specific data
-   structure for later processing.
-  </para>
-
-  <caution>
-   <para>
-    The row processor function sees the rows before it is known whether the
-    query will succeed overall, since the server might return some rows before
-    encountering an error.  For proper transactional behavior, it must be
-    possible to discard or undo whatever the row processor has done, if the
-    query ultimately fails.
-   </para>
-  </caution>
-
-  <para>
-   When using a custom row processor, row data is not accumulated into the
-   <type>PGresult</type>, so the <type>PGresult</type> ultimately delivered to
-   the application will contain no rows (<function>PQntuples</> =
-   <literal>0</>).  However, it still has <function>PQresultStatus</> =
-   <literal>PGRES_TUPLES_OK</>, and it contains correct information about the
-   set of columns in the query result.  On the other hand, if the query fails
-   partway through, the returned <type>PGresult</type> has
-   <function>PQresultStatus</> = <literal>PGRES_FATAL_ERROR</>.  The
-   application must be prepared to undo any actions of the row processor
-   whenever it gets a <literal>PGRES_FATAL_ERROR</> result.
-  </para>
-
-  <para>
-   A custom row processor is registered for a particular connection by
-   calling <function>PQsetRowProcessor</function>, described below.
-   This row processor will be used for all subsequent query results on that
-   connection until changed again.  A row processor function must have a
-   signature matching
-
-<synopsis>
-typedef int (*PQrowProcessor) (PGresult *res, const PGdataValue *columns,
-                               const char **errmsgp, void *param);
-</synopsis>
-   where <type>PGdataValue</> is described by
-<synopsis>
-typedef struct pgDataValue
-{
-    int         len;            /* data length in bytes, or <0 if NULL */
-    const char *value;          /* data value, without zero-termination */
-} PGdataValue;
-</synopsis>
-  </para>
-
-  <para>
-   The <parameter>res</> parameter is the <literal>PGRES_TUPLES_OK</>
-   <type>PGresult</type> that will eventually be delivered to the calling
-   application (if no error intervenes).  It contains information about
-   the set of columns in the query result, but no row data.  In particular the
-   row processor must fetch <literal>PQnfields(res)</> to know the number of
-   data columns.
-  </para>
-
-  <para>
-   Immediately after <application>libpq</> has determined the result set's
-   column information, it will make a call to the row processor with
-   <parameter>columns</parameter> set to NULL, but the other parameters as
-   usual.  The row processor can use this call to initialize for a new result
-   set; if it has nothing to do, it can just return <literal>1</>.  In
-   subsequent calls, one per received row, <parameter>columns</parameter>
-   is non-NULL and points to an array of <type>PGdataValue</> structs, one per
-   data column.
-  </para>
-
-  <para>
-   <parameter>errmsgp</parameter> is an output parameter used only for error
-   reporting.  If the row processor needs to report an error, it can set
-   <literal>*</><parameter>errmsgp</parameter> to point to a suitable message
-   string (and then return <literal>-1</>).  As a special case, returning
-   <literal>-1</> without changing <literal>*</><parameter>errmsgp</parameter>
-   from its initial value of NULL is taken to mean <quote>out of memory</>.
-  </para>
-
-  <para>
-   The last parameter, <parameter>param</parameter>, is just a void pointer
-   passed through from <function>PQsetRowProcessor</function>.  This can be
-   used for communication between the row processor function and the
-   surrounding application.
-  </para>
-
-  <para>
-   In the <type>PGdataValue</> array passed to a row processor, data values
-   cannot be assumed to be zero-terminated, whether the data format is text
-   or binary.  A SQL NULL value is indicated by a negative length field.
-  </para>
-
-  <para>
-   The row processor <emphasis>must</> process the row data values
-   immediately, or else copy them into application-controlled storage.
-   The value pointers passed to the row processor point into
-   <application>libpq</>'s internal data input buffer, which will be
-   overwritten by the next packet fetch.
-  </para>
-
-  <para>
-   The row processor function must return either <literal>1</> or
-   <literal>-1</>.
-   <literal>1</> is the normal, successful result value; <application>libpq</>
-   will continue with receiving row values from the server and passing them to
-   the row processor.  <literal>-1</> indicates that the row processor has
-   encountered an error.  In that case,
-   <application>libpq</> will discard all remaining rows in the result set
-   and then return a <literal>PGRES_FATAL_ERROR</> <type>PGresult</type> to
-   the application (containing the specified error message, or <quote>out of
-   memory for query result</> if <literal>*</><parameter>errmsgp</parameter>
-   was left as NULL).
-  </para>
-
-  <para>
-   Another option for exiting a row processor is to throw an exception using
-   C's <function>longjmp()</> or C++'s <literal>throw</>.  If this is done,
-   processing of the incoming data can be resumed later by calling
-   <function>PQgetResult</>; the row processor will be invoked as normal for
-   any remaining rows in the current result.
-   As with any usage of <function>PQgetResult</>, the application
-   should continue calling <function>PQgetResult</> until it gets a NULL
-   result before issuing any new query.
-  </para>
-
-  <para>
-   In some cases, an exception may mean that the remainder of the
-   query result is not interesting.  In such cases the application can discard
-   the remaining rows with <function>PQskipResult</>, described below.
-   Another possible recovery option is to close the connection altogether with
-   <function>PQfinish</>.
-  </para>
-
-  <para>
-   <variablelist>
-    <varlistentry id="libpq-pqsetrowprocessor">
-     <term>
-      <function>PQsetRowProcessor</function>
-      <indexterm>
-       <primary>PQsetRowProcessor</primary>
-      </indexterm>
-     </term>
-
-     <listitem>
-      <para>
-       Sets a callback function to process each row.
-
-<synopsis>
-void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
-</synopsis>
-      </para>
-
-      <para>
-       The specified row processor function <parameter>func</> is installed as
-       the active row processor for the given connection <parameter>conn</>.
-       Also, <parameter>param</> is installed as the passthrough pointer to
-       pass to it.  Alternatively, if <parameter>func</> is NULL, the standard
-       row processor is reinstalled on the given connection (and
-       <parameter>param</> is ignored).
-      </para>
-
-      <para>
-       Although the row processor can be changed at any time in the life of a
-       connection, it's generally unwise to do so while a query is active.
-       In particular, when using asynchronous mode, be aware that both
-       <function>PQisBusy</> and <function>PQgetResult</> can call the current
-       row processor.
-      </para>
-     </listitem>
-    </varlistentry>
-
-    <varlistentry id="libpq-pqgetrowprocessor">
-     <term>
-      <function>PQgetRowProcessor</function>
-      <indexterm>
-       <primary>PQgetRowProcessor</primary>
-      </indexterm>
-     </term>
-
-     <listitem>
-      <para>
-       Fetches the current row processor for the specified connection.
-
-<synopsis>
-PQrowProcessor PQgetRowProcessor(const PGconn *conn, void **param);
-</synopsis>
-      </para>
-
-      <para>
-       In addition to returning the row processor function pointer, the
-       current passthrough pointer will be returned at
-       <literal>*</><parameter>param</>, if <parameter>param</> is not NULL.
-      </para>
-     </listitem>
-    </varlistentry>
-
-    <varlistentry id="libpq-pqskipresult">
-     <term>
-      <function>PQskipResult</function>
-      <indexterm>
-       <primary>PQskipResult</primary>
-      </indexterm>
-     </term>
-
-     <listitem>
-      <para>
-       Discard all the remaining rows in the incoming result set.
-
-<synopsis>
-PGresult *PQskipResult(PGconn *conn);
-</synopsis>
-      </para>
-
-      <para>
-       This is a simple convenience function to discard incoming data after a
-       row processor has failed or it's determined that the rest of the result
-       set is not interesting.  <function>PQskipResult</> is exactly
-       equivalent to <function>PQgetResult</> except that it transiently
-       installs a dummy row processor function that just discards data.
-       The returned <type>PGresult</> can be discarded without further ado
-       if it has status <literal>PGRES_TUPLES_OK</>; but other status values
-       should be handled normally.  (In particular,
-       <literal>PGRES_FATAL_ERROR</> indicates a server-reported error that
-       will still need to be dealt with.)
-       As when using <function>PQgetResult</>, one should usually repeat the
-       call until NULL is returned to ensure the connection has reached an
-       idle state.  Another possible usage is to call
-       <function>PQskipResult</> just once, and then resume using
-       <function>PQgetResult</> to process subsequent result sets normally.
-      </para>
-
-      <para>
-       Because <function>PQskipResult</> will wait for server input, it is not
-       very useful in asynchronous applications.  In particular you should not
-       code a loop of <function>PQisBusy</> and <function>PQskipResult</>,
-       because that will result in the installed row processor being called
-       within <function>PQisBusy</>.  To get the proper behavior in an
-       asynchronous application, you'll need to install a dummy row processor
-       (or set a flag to make your normal row processor do nothing) and leave
-       it that way until you have discarded all incoming data via your normal
-       <function>PQisBusy</> and <function>PQgetResult</> loop.
-      </para>
-     </listitem>
-    </varlistentry>
-   </variablelist>
-  </para>
-
- </sect1>
-
  <sect1 id="libpq-events">
   <title>Event System</title>
 
index 1251455f1f6d92c74e206b5a9b8dcdeb36ac9b98..9d95e262be3fbf26731c0926013db56dbc4e00ab 100644 (file)
@@ -160,6 +160,4 @@ PQconnectStartParams      157
 PQping                    158
 PQpingParams              159
 PQlibVersion              160
-PQsetRowProcessor         161
-PQgetRowProcessor         162
-PQskipResult              163
+PQsetSingleRowMode        161
index a32258a8cbab5e655484d0471c031864fa5084de..adaab7aaade60a980b7eaf5b6e9734444c72b930 100644 (file)
@@ -2709,8 +2709,7 @@ makeEmptyPGconn(void)
        /* Zero all pointers and booleans */
        MemSet(conn, 0, sizeof(PGconn));
 
-       /* install default row processor and notice hooks */
-       PQsetRowProcessor(conn, NULL, NULL);
+       /* install default notice hooks */
        conn->noticeHooks.noticeRec = defaultNoticeReceiver;
        conn->noticeHooks.noticeProc = defaultNoticeProcessor;
 
@@ -4658,7 +4657,7 @@ conninfo_uri_parse_options(PQconninfoOption *options, const char *uri,
                if (p == host)
                {
                        printfPQExpBuffer(errorMessage,
-                       libpq_gettext("IPv6 host address may not be empty in URI: \"%s\"\n"),
+                                                         libpq_gettext("IPv6 host address may not be empty in URI: \"%s\"\n"),
                                                          uri);
                        goto cleanup;
                }
@@ -4878,7 +4877,7 @@ conninfo_uri_parse_params(char *params,
 
                        printfPQExpBuffer(errorMessage,
                                                          libpq_gettext(
-                                                                        "invalid URI query parameter: \"%s\"\n"),
+                                                                       "invalid URI query parameter: \"%s\"\n"),
                                                          keyword);
                        return false;
                }
@@ -4943,7 +4942,7 @@ conninfo_uri_decode(const char *str, PQExpBuffer errorMessage)
                        if (!(get_hexdigit(*q++, &hi) && get_hexdigit(*q++, &lo)))
                        {
                                printfPQExpBuffer(errorMessage,
-                                               libpq_gettext("invalid percent-encoded token: \"%s\"\n"),
+                                       libpq_gettext("invalid percent-encoded token: \"%s\"\n"),
                                                                  str);
                                free(buf);
                                return NULL;
@@ -5594,8 +5593,8 @@ static void
 dot_pg_pass_warning(PGconn *conn)
 {
        /* If it was 'invalid authorization', add .pgpass mention */
-       if (conn->dot_pgpass_used && conn->password_needed && conn->result &&
        /* only works with >= 9.0 servers */
+       if (conn->dot_pgpass_used && conn->password_needed && conn->result &&
                strcmp(PQresultErrorField(conn->result, PG_DIAG_SQLSTATE),
                           ERRCODE_INVALID_PASSWORD) == 0)
        {
index badc0b32a8e8f3e33e42729a8f8899475a0de31b..53516db723492f11fc9f4bdc6d4d351693c4c22f 100644 (file)
@@ -38,7 +38,8 @@ char     *const pgresStatus[] = {
        "PGRES_BAD_RESPONSE",
        "PGRES_NONFATAL_ERROR",
        "PGRES_FATAL_ERROR",
-       "PGRES_COPY_BOTH"
+       "PGRES_COPY_BOTH",
+       "PGRES_SINGLE_TUPLE"
 };
 
 /*
@@ -51,8 +52,6 @@ static bool static_std_strings = false;
 
 static PGEvent *dupEvents(PGEvent *events, int count);
 static bool pqAddTuple(PGresult *res, PGresAttValue *tup);
-static int pqStdRowProcessor(PGresult *res, const PGdataValue *columns,
-                                 const char **errmsgp, void *param);
 static bool PQsendQueryStart(PGconn *conn);
 static int PQsendQueryGuts(PGconn *conn,
                                const char *command,
@@ -64,8 +63,6 @@ static int PQsendQueryGuts(PGconn *conn,
                                const int *paramFormats,
                                int resultFormat);
 static void parseInput(PGconn *conn);
-static int dummyRowProcessor(PGresult *res, const PGdataValue *columns,
-                                 const char **errmsgp, void *param);
 static bool PQexecStart(PGconn *conn);
 static PGresult *PQexecFinish(PGconn *conn);
 static int PQsendDescribe(PGconn *conn, char desc_type,
@@ -181,6 +178,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
                        case PGRES_COPY_OUT:
                        case PGRES_COPY_IN:
                        case PGRES_COPY_BOTH:
+                       case PGRES_SINGLE_TUPLE:
                                /* non-error cases */
                                break;
                        default:
@@ -698,6 +696,8 @@ PQclear(PGresult *res)
 
 /*
  * Handy subroutine to deallocate any partially constructed async result.
+ *
+ * Any "next" result gets cleared too.
  */
 void
 pqClearAsyncResult(PGconn *conn)
@@ -705,6 +705,9 @@ pqClearAsyncResult(PGconn *conn)
        if (conn->result)
                PQclear(conn->result);
        conn->result = NULL;
+       if (conn->next_result)
+               PQclear(conn->next_result);
+       conn->next_result = NULL;
 }
 
 /*
@@ -758,7 +761,6 @@ pqPrepareAsyncResult(PGconn *conn)
         * conn->errorMessage.
         */
        res = conn->result;
-       conn->result = NULL;            /* handing over ownership to caller */
        if (!res)
                res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
        else
@@ -771,6 +773,16 @@ pqPrepareAsyncResult(PGconn *conn)
                appendPQExpBufferStr(&conn->errorMessage,
                                                         PQresultErrorMessage(res));
        }
+
+       /*
+        * Replace conn->result with next_result, if any.  In the normal case
+        * there isn't a next result and we're just dropping ownership of the
+        * current result.      In single-row mode this restores the situation to what
+        * it was before we created the current single-row result.
+        */
+       conn->result = conn->next_result;
+       conn->next_result = NULL;
+
        return res;
 }
 
@@ -981,85 +993,55 @@ pqSaveParameterStatus(PGconn *conn, const char *name, const char *value)
 
 
 /*
- * PQsetRowProcessor
- *       Set function that copies row data out from the network buffer,
- *       along with a passthrough parameter for it.
- */
-void
-PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param)
-{
-       if (!conn)
-               return;
-
-       if (func)
-       {
-               /* set custom row processor */
-               conn->rowProcessor = func;
-               conn->rowProcessorParam = param;
-       }
-       else
-       {
-               /* set default row processor */
-               conn->rowProcessor = pqStdRowProcessor;
-               conn->rowProcessorParam = conn;
-       }
-}
-
-/*
- * PQgetRowProcessor
- *       Get current row processor of PGconn.
- *       If param is not NULL, also store the passthrough parameter at *param.
- */
-PQrowProcessor
-PQgetRowProcessor(const PGconn *conn, void **param)
-{
-       if (!conn)
-       {
-               if (param)
-                       *param = NULL;
-               return NULL;
-       }
-
-       if (param)
-               *param = conn->rowProcessorParam;
-       return conn->rowProcessor;
-}
-
-/*
- * pqStdRowProcessor
- *       Add the received row to the PGresult structure
- *       Returns 1 if OK, -1 if error occurred.
+ * pqRowProcessor
+ *       Add the received row to the current async result (conn->result).
+ *       Returns 1 if OK, 0 if error occurred.
+ *
+ * On error, *errmsgp can be set to an error string to be returned.
+ * If it is left NULL, the error is presumed to be "out of memory".
  *
- * Note: "param" should point to the PGconn, but we don't actually need that
- * as of the current coding.
+ * In single-row mode, we create a new result holding just the current row,
+ * stashing the previous result in conn->next_result so that it becomes
+ * active again after pqPrepareAsyncResult().  This allows the result metadata
+ * (column descriptions) to be carried forward to each result row.
  */
-static int
-pqStdRowProcessor(PGresult *res, const PGdataValue *columns,
-                                 const char **errmsgp, void *param)
+int
+pqRowProcessor(PGconn *conn, const char **errmsgp)
 {
+       PGresult   *res = conn->result;
        int                     nfields = res->numAttributes;
+       const PGdataValue *columns = conn->rowBuf;
        PGresAttValue *tup;
        int                     i;
 
-       if (columns == NULL)
+       /*
+        * In single-row mode, make a new PGresult that will hold just this one
+        * row; the original conn->result is left unchanged so that it can be used
+        * again as the template for future rows.
+        */
+       if (conn->singleRowMode)
        {
-               /* New result set ... we have nothing to do in this function. */
-               return 1;
+               /* Copy everything that should be in the result at this point */
+               res = PQcopyResult(res,
+                                                  PG_COPYRES_ATTRS | PG_COPYRES_EVENTS |
+                                                  PG_COPYRES_NOTICEHOOKS);
+               if (!res)
+                       return 0;
        }
 
        /*
         * Basically we just allocate space in the PGresult for each field and
         * copy the data over.
         *
-        * Note: on malloc failure, we return -1 leaving *errmsgp still NULL,
-        * which caller will take to mean "out of memory".      This is preferable to
-        * trying to set up such a message here, because evidently there's not
-        * enough memory for gettext() to do anything.
+        * Note: on malloc failure, we return 0 leaving *errmsgp still NULL, which
+        * caller will take to mean "out of memory".  This is preferable to trying
+        * to set up such a message here, because evidently there's not enough
+        * memory for gettext() to do anything.
         */
        tup = (PGresAttValue *)
                pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
        if (tup == NULL)
-               return -1;
+               goto fail;
 
        for (i = 0; i < nfields; i++)
        {
@@ -1078,7 +1060,7 @@ pqStdRowProcessor(PGresult *res, const PGdataValue *columns,
 
                        val = (char *) pqResultAlloc(res, clen + 1, isbinary);
                        if (val == NULL)
-                               return -1;
+                               goto fail;
 
                        /* copy and zero-terminate the data (even if it's binary) */
                        memcpy(val, columns[i].value, clen);
@@ -1091,10 +1073,30 @@ pqStdRowProcessor(PGresult *res, const PGdataValue *columns,
 
        /* And add the tuple to the PGresult's tuple array */
        if (!pqAddTuple(res, tup))
-               return -1;
+               goto fail;
+
+       /*
+        * Success.  In single-row mode, make the result available to the client
+        * immediately.
+        */
+       if (conn->singleRowMode)
+       {
+               /* Change result status to special single-row value */
+               res->resultStatus = PGRES_SINGLE_TUPLE;
+               /* Stash old result for re-use later */
+               conn->next_result = conn->result;
+               conn->result = res;
+               /* And mark the result ready to return */
+               conn->asyncStatus = PGASYNC_READY;
+       }
 
-       /* Success */
        return 1;
+
+fail:
+       /* release locally allocated PGresult, if we made one */
+       if (res != conn->result)
+               PQclear(res);
+       return 0;
 }
 
 
@@ -1343,6 +1345,10 @@ PQsendQueryStart(PGconn *conn)
 
        /* initialize async result-accumulation state */
        conn->result = NULL;
+       conn->next_result = NULL;
+
+       /* reset single-row processing mode */
+       conn->singleRowMode = false;
 
        /* ready to send command message */
        return true;
@@ -1547,6 +1553,31 @@ pqHandleSendFailure(PGconn *conn)
        parseInput(conn);
 }
 
+/*
+ * Select row-by-row processing mode
+ */
+int
+PQsetSingleRowMode(PGconn *conn)
+{
+       /*
+        * Only allow setting the flag when we have launched a query and not yet
+        * received any results.
+        */
+       if (!conn)
+               return 0;
+       if (conn->asyncStatus != PGASYNC_BUSY)
+               return 0;
+       if (conn->queryclass != PGQUERY_SIMPLE &&
+               conn->queryclass != PGQUERY_EXTENDED)
+               return 0;
+       if (conn->result)
+               return 0;
+
+       /* OK, set flag */
+       conn->singleRowMode = true;
+       return 1;
+}
+
 /*
  * Consume any available input from the backend
  * 0 return: some kind of trouble
@@ -1587,9 +1618,6 @@ PQconsumeInput(PGconn *conn)
  * parseInput: if appropriate, parse input data from backend
  * until input is exhausted or a stopping state is reached.
  * Note that this function will NOT attempt to read more data from the backend.
- *
- * Note: callers of parseInput must be prepared for a longjmp exit when we are
- * in PGASYNC_BUSY state, since an external row processor might do that.
  */
 static void
 parseInput(PGconn *conn)
@@ -1737,49 +1765,6 @@ PQgetResult(PGconn *conn)
        return res;
 }
 
-/*
- * PQskipResult
- *       Get the next PGresult produced by a query, but discard any data rows.
- *
- * This is mainly useful for cleaning up after a longjmp out of a row
- * processor, when resuming processing of the current query result isn't
- * wanted.     Note that this is of little value in an async-style application,
- * since any preceding calls to PQisBusy would have already called the regular
- * row processor.
- */
-PGresult *
-PQskipResult(PGconn *conn)
-{
-       PGresult   *res;
-       PQrowProcessor savedRowProcessor;
-
-       if (!conn)
-               return NULL;
-
-       /* temporarily install dummy row processor */
-       savedRowProcessor = conn->rowProcessor;
-       conn->rowProcessor = dummyRowProcessor;
-       /* no need to save/change rowProcessorParam */
-
-       /* fetch the next result */
-       res = PQgetResult(conn);
-
-       /* restore previous row processor */
-       conn->rowProcessor = savedRowProcessor;
-
-       return res;
-}
-
-/*
- * Do-nothing row processor for PQskipResult
- */
-static int
-dummyRowProcessor(PGresult *res, const PGdataValue *columns,
-                                 const char **errmsgp, void *param)
-{
-       return 1;
-}
-
 
 /*
  * PQexec
@@ -1886,7 +1871,7 @@ PQexecStart(PGconn *conn)
         * Silently discard any prior query result that application didn't eat.
         * This is probably poor design, but it's here for backward compatibility.
         */
-       while ((result = PQskipResult(conn)) != NULL)
+       while ((result = PQgetResult(conn)) != NULL)
        {
                ExecStatusType resultStatus = result->resultStatus;
 
index 13fd98c2f913d3818758e75bac96822306981b52..f3a6d0341c13ce644a7f1b2f15919385b85d6a6e 100644 (file)
@@ -682,8 +682,6 @@ lo_initialize(PGconn *conn)
        int                     n;
        const char *query;
        const char *fname;
-       PQrowProcessor savedRowProcessor;
-       void       *savedRowProcessorParam;
        Oid                     foid;
 
        if (!conn)
@@ -732,16 +730,7 @@ lo_initialize(PGconn *conn)
                        "or proname = 'loread' "
                        "or proname = 'lowrite'";
 
-       /* Ensure the standard row processor is used to collect the result */
-       savedRowProcessor = conn->rowProcessor;
-       savedRowProcessorParam = conn->rowProcessorParam;
-       PQsetRowProcessor(conn, NULL, NULL);
-
        res = PQexec(conn, query);
-
-       conn->rowProcessor = savedRowProcessor;
-       conn->rowProcessorParam = savedRowProcessorParam;
-
        if (res == NULL)
        {
                free(lobjfuncs);
index 8dbd6b6982395850fc167eb20cb8d5590d33122f..1ba5885cd3b419a97f4fc5ea897eac209f364897 100644 (file)
@@ -49,19 +49,11 @@ static int  getNotify(PGconn *conn);
 PostgresPollingStatusType
 pqSetenvPoll(PGconn *conn)
 {
-       PostgresPollingStatusType result;
        PGresult   *res;
-       PQrowProcessor savedRowProcessor;
-       void       *savedRowProcessorParam;
 
        if (conn == NULL || conn->status == CONNECTION_BAD)
                return PGRES_POLLING_FAILED;
 
-       /* Ensure the standard row processor is used to collect any results */
-       savedRowProcessor = conn->rowProcessor;
-       savedRowProcessorParam = conn->rowProcessorParam;
-       PQsetRowProcessor(conn, NULL, NULL);
-
        /* Check whether there are any data for us */
        switch (conn->setenv_state)
        {
@@ -77,10 +69,7 @@ pqSetenvPoll(PGconn *conn)
                                if (n < 0)
                                        goto error_return;
                                if (n == 0)
-                               {
-                                       result = PGRES_POLLING_READING;
-                                       goto normal_return;
-                               }
+                                       return PGRES_POLLING_READING;
 
                                break;
                        }
@@ -94,8 +83,7 @@ pqSetenvPoll(PGconn *conn)
 
                        /* Should we raise an error if called when not active? */
                case SETENV_STATE_IDLE:
-                       result = PGRES_POLLING_OK;
-                       goto normal_return;
+                       return PGRES_POLLING_OK;
 
                default:
                        printfPQExpBuffer(&conn->errorMessage,
@@ -192,10 +180,7 @@ pqSetenvPoll(PGconn *conn)
                        case SETENV_STATE_CLIENT_ENCODING_WAIT:
                                {
                                        if (PQisBusy(conn))
-                                       {
-                                               result = PGRES_POLLING_READING;
-                                               goto normal_return;
-                                       }
+                                               return PGRES_POLLING_READING;
 
                                        res = PQgetResult(conn);
 
@@ -220,10 +205,7 @@ pqSetenvPoll(PGconn *conn)
                        case SETENV_STATE_OPTION_WAIT:
                                {
                                        if (PQisBusy(conn))
-                                       {
-                                               result = PGRES_POLLING_READING;
-                                               goto normal_return;
-                                       }
+                                               return PGRES_POLLING_READING;
 
                                        res = PQgetResult(conn);
 
@@ -262,17 +244,13 @@ pqSetenvPoll(PGconn *conn)
                                                goto error_return;
 
                                        conn->setenv_state = SETENV_STATE_QUERY1_WAIT;
-                                       result = PGRES_POLLING_READING;
-                                       goto normal_return;
+                                       return PGRES_POLLING_READING;
                                }
 
                        case SETENV_STATE_QUERY1_WAIT:
                                {
                                        if (PQisBusy(conn))
-                                       {
-                                               result = PGRES_POLLING_READING;
-                                               goto normal_return;
-                                       }
+                                               return PGRES_POLLING_READING;
 
                                        res = PQgetResult(conn);
 
@@ -349,17 +327,13 @@ pqSetenvPoll(PGconn *conn)
                                                goto error_return;
 
                                        conn->setenv_state = SETENV_STATE_QUERY2_WAIT;
-                                       result = PGRES_POLLING_READING;
-                                       goto normal_return;
+                                       return PGRES_POLLING_READING;
                                }
 
                        case SETENV_STATE_QUERY2_WAIT:
                                {
                                        if (PQisBusy(conn))
-                                       {
-                                               result = PGRES_POLLING_READING;
-                                               goto normal_return;
-                                       }
+                                               return PGRES_POLLING_READING;
 
                                        res = PQgetResult(conn);
 
@@ -406,8 +380,7 @@ pqSetenvPoll(PGconn *conn)
                                        {
                                                /* Query finished, so we're done */
                                                conn->setenv_state = SETENV_STATE_IDLE;
-                                               result = PGRES_POLLING_OK;
-                                               goto normal_return;
+                                               return PGRES_POLLING_OK;
                                        }
                                        break;
                                }
@@ -425,12 +398,7 @@ pqSetenvPoll(PGconn *conn)
 
 error_return:
        conn->setenv_state = SETENV_STATE_IDLE;
-       result = PGRES_POLLING_FAILED;
-
-normal_return:
-       conn->rowProcessor = savedRowProcessor;
-       conn->rowProcessorParam = savedRowProcessorParam;
-       return result;
+       return PGRES_POLLING_FAILED;
 }
 
 
@@ -438,9 +406,6 @@ normal_return:
  * parseInput: if appropriate, parse input data from backend
  * until input is exhausted or a stopping state is reached.
  * Note that this function will NOT attempt to read more data from the backend.
- *
- * Note: callers of parseInput must be prepared for a longjmp exit when we are
- * in PGASYNC_BUSY state, since an external row processor might do that.
  */
 void
 pqParseInput2(PGconn *conn)
@@ -746,31 +711,16 @@ getRowDescriptions(PGconn *conn)
        /* Success! */
        conn->result = result;
 
-       /*
-        * Advance inStart to show that the "T" message has been processed.  We
-        * must do this before calling the row processor, in case it longjmps.
-        */
+       /* Advance inStart to show that the "T" message has been processed. */
        conn->inStart = conn->inCursor;
 
-       /* Give the row processor a chance to initialize for new result set */
-       errmsg = NULL;
-       switch ((*conn->rowProcessor) (result, NULL, &errmsg,
-                                                                  conn->rowProcessorParam))
-       {
-               case 1:
-                       /* everything is good */
-                       return 0;
-
-               case -1:
-                       /* error, report the errmsg below */
-                       break;
+       /*
+        * We could perform additional setup for the new result set here, but for
+        * now there's nothing else to do.
+        */
 
-               default:
-                       /* unrecognized return code */
-                       errmsg = libpq_gettext("unrecognized return value from row processor");
-                       break;
-       }
-       goto set_error_result;
+       /* And we're done. */
+       return 0;
 
 advance_and_error:
 
@@ -781,8 +731,6 @@ advance_and_error:
         */
        conn->inStart = conn->inEnd;
 
-set_error_result:
-
        /*
         * Replace partially constructed result with an error result. First
         * discard the old result to try to win back some memory.
@@ -790,7 +738,7 @@ set_error_result:
        pqClearAsyncResult(conn);
 
        /*
-        * If row processor didn't provide an error message, assume "out of
+        * If preceding code didn't provide an error message, assume "out of
         * memory" was meant.  The advantage of having this special case is that
         * freeing the old result first greatly improves the odds that gettext()
         * will succeed in providing a translation.
@@ -937,31 +885,15 @@ getAnotherTuple(PGconn *conn, bool binary)
                free(bitmap);
        bitmap = NULL;
 
-       /*
-        * Advance inStart to show that the "D" message has been processed.  We
-        * must do this before calling the row processor, in case it longjmps.
-        */
+       /* Advance inStart to show that the "D" message has been processed. */
        conn->inStart = conn->inCursor;
 
-       /* Pass the completed row values to rowProcessor */
+       /* Process the collected row */
        errmsg = NULL;
-       switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
-                                                                  conn->rowProcessorParam))
-       {
-               case 1:
-                       /* everything is good */
-                       return 0;
+       if (pqRowProcessor(conn, &errmsg))
+               return 0;                               /* normal, successful exit */
 
-               case -1:
-                       /* error, report the errmsg below */
-                       break;
-
-               default:
-                       /* unrecognized return code */
-                       errmsg = libpq_gettext("unrecognized return value from row processor");
-                       break;
-       }
-       goto set_error_result;
+       goto set_error_result;          /* pqRowProcessor failed, report it */
 
 advance_and_error:
 
@@ -981,7 +913,7 @@ set_error_result:
        pqClearAsyncResult(conn);
 
        /*
-        * If row processor didn't provide an error message, assume "out of
+        * If preceding code didn't provide an error message, assume "out of
         * memory" was meant.  The advantage of having this special case is that
         * freeing the old result first greatly improves the odds that gettext()
         * will succeed in providing a translation.
index 173af2e0a79ef3b443de515cda851e277deaed2d..d289f82285fea00d5de20542e43ea103493f9e58 100644 (file)
@@ -61,9 +61,6 @@ static int build_startup_packet(const PGconn *conn, char *packet,
  * parseInput: if appropriate, parse input data from backend
  * until input is exhausted or a stopping state is reached.
  * Note that this function will NOT attempt to read more data from the backend.
- *
- * Note: callers of parseInput must be prepared for a longjmp exit when we are
- * in PGASYNC_BUSY state, since an external row processor might do that.
  */
 void
 pqParseInput3(PGconn *conn)
@@ -446,10 +443,6 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
  * Returns: 0 if processed message successfully, EOF to suspend parsing
  * (the latter case is not actually used currently).
  * In either case, conn->inStart has been advanced past the message.
- *
- * Note: the row processor could also choose to longjmp out of libpq,
- * in which case the library's state must allow for resumption at the
- * next message.
  */
 static int
 getRowDescriptions(PGconn *conn, int msgLength)
@@ -564,10 +557,7 @@ getRowDescriptions(PGconn *conn, int msgLength)
        /* Success! */
        conn->result = result;
 
-       /*
-        * Advance inStart to show that the "T" message has been processed.  We
-        * must do this before calling the row processor, in case it longjmps.
-        */
+       /* Advance inStart to show that the "T" message has been processed. */
        conn->inStart = conn->inCursor;
 
        /*
@@ -580,25 +570,13 @@ getRowDescriptions(PGconn *conn, int msgLength)
                return 0;
        }
 
-       /* Give the row processor a chance to initialize for new result set */
-       errmsg = NULL;
-       switch ((*conn->rowProcessor) (result, NULL, &errmsg,
-                                                                  conn->rowProcessorParam))
-       {
-               case 1:
-                       /* everything is good */
-                       return 0;
-
-               case -1:
-                       /* error, report the errmsg below */
-                       break;
+       /*
+        * We could perform additional setup for the new result set here, but for
+        * now there's nothing else to do.
+        */
 
-               default:
-                       /* unrecognized return code */
-                       errmsg = libpq_gettext("unrecognized return value from row processor");
-                       break;
-       }
-       goto set_error_result;
+       /* And we're done. */
+       return 0;
 
 advance_and_error:
        /* Discard unsaved result, if any */
@@ -608,8 +586,6 @@ advance_and_error:
        /* Discard the failed message by pretending we read it */
        conn->inStart += 5 + msgLength;
 
-set_error_result:
-
        /*
         * Replace partially constructed result with an error result. First
         * discard the old result to try to win back some memory.
@@ -617,8 +593,10 @@ set_error_result:
        pqClearAsyncResult(conn);
 
        /*
-        * If row processor didn't provide an error message, assume "out of
-        * memory" was meant.
+        * If preceding code didn't provide an error message, assume "out of
+        * memory" was meant.  The advantage of having this special case is that
+        * freeing the old result first greatly improves the odds that gettext()
+        * will succeed in providing a translation.
         */
        if (!errmsg)
                errmsg = libpq_gettext("out of memory for query result");
@@ -695,10 +673,6 @@ failure:
  * Returns: 0 if processed message successfully, EOF to suspend parsing
  * (the latter case is not actually used currently).
  * In either case, conn->inStart has been advanced past the message.
- *
- * Note: the row processor could also choose to longjmp out of libpq,
- * in which case the library's state must allow for resumption at the
- * next message.
  */
 static int
 getAnotherTuple(PGconn *conn, int msgLength)
@@ -778,31 +752,15 @@ getAnotherTuple(PGconn *conn, int msgLength)
                goto advance_and_error;
        }
 
-       /*
-        * Advance inStart to show that the "D" message has been processed.  We
-        * must do this before calling the row processor, in case it longjmps.
-        */
+       /* Advance inStart to show that the "D" message has been processed. */
        conn->inStart = conn->inCursor;
 
-       /* Pass the completed row values to rowProcessor */
+       /* Process the collected row */
        errmsg = NULL;
-       switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
-                                                                  conn->rowProcessorParam))
-       {
-               case 1:
-                       /* everything is good */
-                       return 0;
-
-               case -1:
-                       /* error, report the errmsg below */
-                       break;
+       if (pqRowProcessor(conn, &errmsg))
+               return 0;                               /* normal, successful exit */
 
-               default:
-                       /* unrecognized return code */
-                       errmsg = libpq_gettext("unrecognized return value from row processor");
-                       break;
-       }
-       goto set_error_result;
+       goto set_error_result;          /* pqRowProcessor failed, report it */
 
 advance_and_error:
        /* Discard the failed message by pretending we read it */
@@ -817,7 +775,7 @@ set_error_result:
        pqClearAsyncResult(conn);
 
        /*
-        * If row processor didn't provide an error message, assume "out of
+        * If preceding code didn't provide an error message, assume "out of
         * memory" was meant.  The advantage of having this special case is that
         * freeing the old result first greatly improves the odds that gettext()
         * will succeed in providing a translation.
index 67db6119bbaa35ae31fb58bb902a455b093ea23f..9d05dd20605a84ab4c4ec9d61ef8697fb2f3b77e 100644 (file)
@@ -90,7 +90,8 @@ typedef enum
                                                                 * backend */
        PGRES_NONFATAL_ERROR,           /* notice or warning message */
        PGRES_FATAL_ERROR,                      /* query failed */
-       PGRES_COPY_BOTH                         /* Copy In/Out data transfer in progress */
+       PGRES_COPY_BOTH,                        /* Copy In/Out data transfer in progress */
+       PGRES_SINGLE_TUPLE                      /* single tuple from larger resultset */
 } ExecStatusType;
 
 typedef enum
@@ -129,17 +130,6 @@ typedef struct pg_conn PGconn;
  */
 typedef struct pg_result PGresult;
 
-/* PGdataValue represents a data field value being passed to a row processor.
- * It could be either text or binary data; text data is not zero-terminated.
- * A SQL NULL is represented by len < 0; then value is still valid but there
- * are no data bytes there.
- */
-typedef struct pgDataValue
-{
-       int                     len;                    /* data length in bytes, or <0 if NULL */
-       const char *value;                      /* data value, without zero-termination */
-} PGdataValue;
-
 /* PGcancel encapsulates the information needed to cancel a running
  * query on an existing connection.
  * The contents of this struct are not supposed to be known to applications.
@@ -161,10 +151,6 @@ typedef struct pgNotify
        struct pgNotify *next;          /* list link */
 } PGnotify;
 
-/* Function type for row-processor callback */
-typedef int (*PQrowProcessor) (PGresult *res, const PGdataValue *columns,
-                                                                                  const char **errmsgp, void *param);
-
 /* Function types for notice-handling callbacks */
 typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
 typedef void (*PQnoticeProcessor) (void *arg, const char *message);
@@ -403,17 +389,13 @@ extern int PQsendQueryPrepared(PGconn *conn,
                                        const int *paramLengths,
                                        const int *paramFormats,
                                        int resultFormat);
+extern int     PQsetSingleRowMode(PGconn *conn);
 extern PGresult *PQgetResult(PGconn *conn);
-extern PGresult *PQskipResult(PGconn *conn);
 
 /* Routines for managing an asynchronous query */
 extern int     PQisBusy(PGconn *conn);
 extern int     PQconsumeInput(PGconn *conn);
 
-/* Override default per-row processing */
-extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
-extern PQrowProcessor PQgetRowProcessor(const PGconn *conn, void **param);
-
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
 
index 4bc89269fababe5e4d8ecbf6e80ca1a8625d4bd5..2bac59c3d879ecabce42ceab3b5133df03a0886a 100644 (file)
@@ -277,6 +277,17 @@ typedef struct pgLobjfuncs
        Oid                     fn_lo_write;    /* OID of backend function LOwrite              */
 } PGlobjfuncs;
 
+/* PGdataValue represents a data field value being passed to a row processor.
+ * It could be either text or binary data; text data is not zero-terminated.
+ * A SQL NULL is represented by len < 0; then value is still valid but there
+ * are no data bytes there.
+ */
+typedef struct pgDataValue
+{
+       int                     len;                    /* data length in bytes, or <0 if NULL */
+       const char *value;                      /* data value, without zero-termination */
+} PGdataValue;
+
 /*
  * PGconn stores all the state data associated with a single connection
  * to a backend.
@@ -324,10 +335,6 @@ struct pg_conn
        /* Optional file to write trace info to */
        FILE       *Pfdebug;
 
-       /* Callback procedure for per-row processing */
-       PQrowProcessor rowProcessor;    /* function pointer */
-       void       *rowProcessorParam;          /* passthrough argument */
-
        /* Callback procedures for notice message processing */
        PGNoticeHooks noticeHooks;
 
@@ -346,6 +353,7 @@ struct pg_conn
        bool            options_valid;  /* true if OK to attempt connection */
        bool            nonblocking;    /* whether this connection is using nonblock
                                                                 * sending semantics */
+       bool            singleRowMode;  /* return current query result row-by-row? */
        char            copy_is_binary; /* 1 = copy binary, 0 = copy text */
        int                     copy_already_done;              /* # bytes already returned in COPY
                                                                                 * OUT */
@@ -406,6 +414,7 @@ struct pg_conn
 
        /* Status for asynchronous result construction */
        PGresult   *result;                     /* result being constructed */
+       PGresult   *next_result;        /* next result (used in single-row mode) */
 
        /* Assorted state for SSL, GSS, etc */
 
@@ -517,6 +526,7 @@ extern void pqSaveMessageField(PGresult *res, char code,
                                   const char *value);
 extern void pqSaveParameterStatus(PGconn *conn, const char *name,
                                          const char *value);
+extern int     pqRowProcessor(PGconn *conn, const char **errmsgp);
 extern void pqHandleSendFailure(PGconn *conn);
 
 /* === in fe-protocol2.c === */