]> granicus.if.org Git - postgresql/commitdiff
Add a "row processor" API to libpq for better handling of large results.
authorTom Lane <tgl@sss.pgh.pa.us>
Wed, 4 Apr 2012 22:27:56 +0000 (18:27 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Wed, 4 Apr 2012 22:27:56 +0000 (18:27 -0400)
Traditionally libpq has collected an entire query result before passing
it back to the application.  That provides a simple and transactional API,
but it's pretty inefficient for large result sets.  This patch allows the
application to process each row on-the-fly instead of accumulating the
rows into the PGresult.  Error recovery becomes a bit more complex, but
often that tradeoff is well worth making.

Kyotaro Horiguchi, reviewed by Marko Kreen and Tom Lane

doc/src/sgml/libpq.sgml
src/interfaces/libpq/exports.txt
src/interfaces/libpq/fe-connect.c
src/interfaces/libpq/fe-exec.c
src/interfaces/libpq/fe-lobj.c
src/interfaces/libpq/fe-misc.c
src/interfaces/libpq/fe-protocol2.c
src/interfaces/libpq/fe-protocol3.c
src/interfaces/libpq/libpq-fe.h
src/interfaces/libpq/libpq-int.h

index 96064bbb0de8cdb0ad3bd52f35c7d845038acf1f..0ec501e5bda8cea44856747bbd91e4590f50cc30 100644 (file)
@@ -5581,6 +5581,274 @@ defaultNoticeProcessor(void *arg, const char *message)
 
  </sect1>
 
+ <sect1 id="libpq-row-processor">
+  <title>Custom Row Processing</title>
+
+  <indexterm zone="libpq-row-processor">
+   <primary>PQrowProcessor</primary>
+  </indexterm>
+
+  <indexterm zone="libpq-row-processor">
+   <primary>row processor</primary>
+   <secondary>in libpq</secondary>
+  </indexterm>
+
+  <para>
+   Ordinarily, when receiving a query result from the server,
+   <application>libpq</> adds each row value to the current
+   <type>PGresult</type> until the entire result set is received; then
+   the <type>PGresult</type> is returned to the application as a unit.
+   This approach is simple to work with, but becomes inefficient for large
+   result sets.  To improve performance, an application can register a
+   custom <firstterm>row processor</> function that processes each row
+   as the data is received from the network.  The custom row processor could
+   process the data fully, or store it into some application-specific data
+   structure for later processing.
+  </para>
+
+  <caution>
+   <para>
+    The row processor function sees the rows before it is known whether the
+    query will succeed overall, since the server might return some rows before
+    encountering an error.  For proper transactional behavior, it must be
+    possible to discard or undo whatever the row processor has done, if the
+    query ultimately fails.
+   </para>
+  </caution>
+
+  <para>
+   When using a custom row processor, row data is not accumulated into the
+   <type>PGresult</type>, so the <type>PGresult</type> ultimately delivered to
+   the application will contain no rows (<function>PQntuples</> =
+   <literal>0</>).  However, it still has <function>PQresultStatus</> =
+   <literal>PGRES_TUPLES_OK</>, and it contains correct information about the
+   set of columns in the query result.  On the other hand, if the query fails
+   partway through, the returned <type>PGresult</type> has
+   <function>PQresultStatus</> = <literal>PGRES_FATAL_ERROR</>.  The
+   application must be prepared to undo any actions of the row processor
+   whenever it gets a <literal>PGRES_FATAL_ERROR</> result.
+  </para>
+
+  <para>
+   A custom row processor is registered for a particular connection by
+   calling <function>PQsetRowProcessor</function>, described below.
+   This row processor will be used for all subsequent query results on that
+   connection until changed again.  A row processor function must have a
+   signature matching
+
+<synopsis>
+typedef int (*PQrowProcessor) (PGresult *res, const PGdataValue *columns,
+                               const char **errmsgp, void *param);
+</synopsis>
+   where <type>PGdataValue</> is described by
+<synopsis>
+typedef struct pgDataValue
+{
+    int         len;            /* data length in bytes, or <0 if NULL */
+    const char *value;          /* data value, without zero-termination */
+} PGdataValue;
+</synopsis>
+  </para>
+
+  <para>
+   The <parameter>res</> parameter is the <literal>PGRES_TUPLES_OK</>
+   <type>PGresult</type> that will eventually be delivered to the calling
+   application (if no error intervenes).  It contains information about
+   the set of columns in the query result, but no row data.  In particular the
+   row processor must fetch <literal>PQnfields(res)</> to know the number of
+   data columns.
+  </para>
+
+  <para>
+   Immediately after <application>libpq</> has determined the result set's
+   column information, it will make a call to the row processor with
+   <parameter>columns</parameter> set to NULL, but the other parameters as
+   usual.  The row processor can use this call to initialize for a new result
+   set; if it has nothing to do, it can just return <literal>1</>.  In
+   subsequent calls, one per received row, <parameter>columns</parameter>
+   is non-NULL and points to an array of <type>PGdataValue</> structs, one per
+   data column.
+  </para>
+
+  <para>
+   <parameter>errmsgp</parameter> is an output parameter used only for error
+   reporting.  If the row processor needs to report an error, it can set
+   <literal>*</><parameter>errmsgp</parameter> to point to a suitable message
+   string (and then return <literal>-1</>).  As a special case, returning
+   <literal>-1</> without changing <literal>*</><parameter>errmsgp</parameter>
+   from its initial value of NULL is taken to mean <quote>out of memory</>.
+  </para>
+
+  <para>
+   The last parameter, <parameter>param</parameter>, is just a void pointer
+   passed through from <function>PQsetRowProcessor</function>.  This can be
+   used for communication between the row processor function and the
+   surrounding application.
+  </para>
+
+  <para>
+   In the <type>PGdataValue</> array passed to a row processor, data values
+   cannot be assumed to be zero-terminated, whether the data format is text
+   or binary.  A SQL NULL value is indicated by a negative length field.
+  </para>
+
+  <para>
+   The row processor <emphasis>must</> process the row data values
+   immediately, or else copy them into application-controlled storage.
+   The value pointers passed to the row processor point into
+   <application>libpq</>'s internal data input buffer, which will be
+   overwritten by the next packet fetch.
+  </para>
+
+  <para>
+   The row processor function must return either <literal>1</> or
+   <literal>-1</>.
+   <literal>1</> is the normal, successful result value; <application>libpq</>
+   will continue with receiving row values from the server and passing them to
+   the row processor.  <literal>-1</> indicates that the row processor has
+   encountered an error.  In that case,
+   <application>libpq</> will discard all remaining rows in the result set
+   and then return a <literal>PGRES_FATAL_ERROR</> <type>PGresult</type> to
+   the application (containing the specified error message, or <quote>out of
+   memory for query result</> if <literal>*</><parameter>errmsgp</parameter>
+   was left as NULL).
+  </para>
+
+  <para>
+   Another option for exiting a row processor is to throw an exception using
+   C's <function>longjmp()</> or C++'s <literal>throw</>.  If this is done,
+   processing of the incoming data can be resumed later by calling
+   <function>PQgetResult</>; the row processor will be invoked as normal for
+   any remaining rows in the current result.
+   As with any usage of <function>PQgetResult</>, the application
+   should continue calling <function>PQgetResult</> until it gets a NULL
+   result before issuing any new query.
+  </para>
+
+  <para>
+   In some cases, an exception may mean that the remainder of the
+   query result is not interesting.  In such cases the application can discard
+   the remaining rows with <function>PQskipResult</>, described below.
+   Another possible recovery option is to close the connection altogether with
+   <function>PQfinish</>.
+  </para>
+
+  <para>
+   <variablelist>
+    <varlistentry id="libpq-pqsetrowprocessor">
+     <term>
+      <function>PQsetRowProcessor</function>
+      <indexterm>
+       <primary>PQsetRowProcessor</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+       Sets a callback function to process each row.
+
+<synopsis>
+void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
+</synopsis>
+      </para>
+
+      <para>
+       The specified row processor function <parameter>func</> is installed as
+       the active row processor for the given connection <parameter>conn</>.
+       Also, <parameter>param</> is installed as the passthrough pointer to
+       pass to it.  Alternatively, if <parameter>func</> is NULL, the standard
+       row processor is reinstalled on the given connection (and
+       <parameter>param</> is ignored).
+      </para>
+
+      <para>
+       Although the row processor can be changed at any time in the life of a
+       connection, it's generally unwise to do so while a query is active.
+       In particular, when using asynchronous mode, be aware that both
+       <function>PQisBusy</> and <function>PQgetResult</> can call the current
+       row processor.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-pqgetrowprocessor">
+     <term>
+      <function>PQgetRowProcessor</function>
+      <indexterm>
+       <primary>PQgetRowProcessor</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+       Fetches the current row processor for the specified connection.
+
+<synopsis>
+PQrowProcessor PQgetRowProcessor(const PGconn *conn, void **param);
+</synopsis>
+      </para>
+
+      <para>
+       In addition to returning the row processor function pointer, the
+       current passthrough pointer will be returned at
+       <literal>*</><parameter>param</>, if <parameter>param</> is not NULL.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-pqskipresult">
+     <term>
+      <function>PQskipResult</function>
+      <indexterm>
+       <primary>PQskipResult</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+       Discard all the remaining rows in the incoming result set.
+
+<synopsis>
+PGresult *PQskipResult(PGconn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       This is a simple convenience function to discard incoming data after a
+       row processor has failed or it's determined that the rest of the result
+       set is not interesting.  <function>PQskipResult</> is exactly
+       equivalent to <function>PQgetResult</> except that it transiently
+       installs a dummy row processor function that just discards data.
+       The returned <type>PGresult</> can be discarded without further ado
+       if it has status <literal>PGRES_TUPLES_OK</>; but other status values
+       should be handled normally.  (In particular,
+       <literal>PGRES_FATAL_ERROR</> indicates a server-reported error that
+       will still need to be dealt with.)
+       As when using <function>PQgetResult</>, one should usually repeat the
+       call until NULL is returned to ensure the connection has reached an
+       idle state.  Another possible usage is to call
+       <function>PQskipResult</> just once, and then resume using
+       <function>PQgetResult</> to process subsequent result sets normally.
+      </para>
+
+      <para>
+       Because <function>PQskipResult</> will wait for server input, it is not
+       very useful in asynchronous applications.  In particular you should not
+       code a loop of <function>PQisBusy</> and <function>PQskipResult</>,
+       because that will result in the installed row processor being called
+       within <function>PQisBusy</>.  To get the proper behavior in an
+       asynchronous application, you'll need to install a dummy row processor
+       (or set a flag to make your normal row processor do nothing) and leave
+       it that way until you have discarded all incoming data via your normal
+       <function>PQisBusy</> and <function>PQgetResult</> loop.
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+  </para>
+
+ </sect1>
+
  <sect1 id="libpq-events">
   <title>Event System</title>
 
index 1af8df699edf641ef4defecbdd375d7ff4b3a515..1251455f1f6d92c74e206b5a9b8dcdeb36ac9b98 100644 (file)
@@ -160,3 +160,6 @@ PQconnectStartParams      157
 PQping                    158
 PQpingParams              159
 PQlibVersion              160
+PQsetRowProcessor         161
+PQgetRowProcessor         162
+PQskipResult              163
index 6a20a1485d17b6fdea79cb922ff26c78335388a9..03fd6e45bb9a9b996f39b6c2625643ac8a288ebe 100644 (file)
@@ -2425,7 +2425,7 @@ keep_going:                                               /* We will come back to here until there is
                                        conn->status = CONNECTION_AUTH_OK;
 
                                        /*
-                                        * Set asyncStatus so that PQsetResult will think that
+                                        * Set asyncStatus so that PQgetResult will think that
                                         * what comes back next is the result of a query.  See
                                         * below.
                                         */
@@ -2686,8 +2686,11 @@ makeEmptyPGconn(void)
        /* Zero all pointers and booleans */
        MemSet(conn, 0, sizeof(PGconn));
 
+       /* install default row processor and notice hooks */
+       PQsetRowProcessor(conn, NULL, NULL);
        conn->noticeHooks.noticeRec = defaultNoticeReceiver;
        conn->noticeHooks.noticeProc = defaultNoticeProcessor;
+
        conn->status = CONNECTION_BAD;
        conn->asyncStatus = PGASYNC_IDLE;
        conn->xactStatus = PQTRANS_IDLE;
@@ -2721,11 +2724,14 @@ makeEmptyPGconn(void)
        conn->inBuffer = (char *) malloc(conn->inBufSize);
        conn->outBufSize = 16 * 1024;
        conn->outBuffer = (char *) malloc(conn->outBufSize);
+       conn->rowBufLen = 32;
+       conn->rowBuf = (PGdataValue *) malloc(conn->rowBufLen * sizeof(PGdataValue));
        initPQExpBuffer(&conn->errorMessage);
        initPQExpBuffer(&conn->workBuffer);
 
        if (conn->inBuffer == NULL ||
                conn->outBuffer == NULL ||
+               conn->rowBuf == NULL ||
                PQExpBufferBroken(&conn->errorMessage) ||
                PQExpBufferBroken(&conn->workBuffer))
        {
@@ -2829,6 +2835,8 @@ freePGconn(PGconn *conn)
                free(conn->inBuffer);
        if (conn->outBuffer)
                free(conn->outBuffer);
+       if (conn->rowBuf)
+               free(conn->rowBuf);
        termPQExpBuffer(&conn->errorMessage);
        termPQExpBuffer(&conn->workBuffer);
 
@@ -2888,7 +2896,7 @@ closePGconn(PGconn *conn)
        conn->status = CONNECTION_BAD;          /* Well, not really _bad_ - just
                                                                                 * absent */
        conn->asyncStatus = PGASYNC_IDLE;
-       pqClearAsyncResult(conn);       /* deallocate result and curTuple */
+       pqClearAsyncResult(conn);       /* deallocate result */
        pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist);
        conn->addrlist = NULL;
        conn->addr_cur = NULL;
index b743566a5ddda2e9f30a4148bfcac78ed898fd84..86f157c338af7e149dea6bff844f500c93068d27 100644 (file)
@@ -50,6 +50,9 @@ static bool static_std_strings = false;
 
 
 static PGEvent *dupEvents(PGEvent *events, int count);
+static bool pqAddTuple(PGresult *res, PGresAttValue *tup);
+static int pqStdRowProcessor(PGresult *res, const PGdataValue *columns,
+                                 const char **errmsgp, void *param);
 static bool PQsendQueryStart(PGconn *conn);
 static int PQsendQueryGuts(PGconn *conn,
                                const char *command,
@@ -61,6 +64,8 @@ static int PQsendQueryGuts(PGconn *conn,
                                const int *paramFormats,
                                int resultFormat);
 static void parseInput(PGconn *conn);
+static int dummyRowProcessor(PGresult *res, const PGdataValue *columns,
+                                 const char **errmsgp, void *param);
 static bool PQexecStart(PGconn *conn);
 static PGresult *PQexecFinish(PGconn *conn);
 static int PQsendDescribe(PGconn *conn, char desc_type,
@@ -694,14 +699,12 @@ PQclear(PGresult *res)
 /*
  * Handy subroutine to deallocate any partially constructed async result.
  */
-
 void
 pqClearAsyncResult(PGconn *conn)
 {
        if (conn->result)
                PQclear(conn->result);
        conn->result = NULL;
-       conn->curTuple = NULL;
 }
 
 /*
@@ -756,7 +759,6 @@ pqPrepareAsyncResult(PGconn *conn)
         */
        res = conn->result;
        conn->result = NULL;            /* handing over ownership to caller */
-       conn->curTuple = NULL;          /* just in case */
        if (!res)
                res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
        else
@@ -832,7 +834,7 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
  *       add a row pointer to the PGresult structure, growing it if necessary
  *       Returns TRUE if OK, FALSE if not enough memory to add the row
  */
-int
+static bool
 pqAddTuple(PGresult *res, PGresAttValue *tup)
 {
        if (res->ntups >= res->tupArrSize)
@@ -978,6 +980,124 @@ pqSaveParameterStatus(PGconn *conn, const char *name, const char *value)
 }
 
 
+/*
+ * PQsetRowProcessor
+ *       Set function that copies row data out from the network buffer,
+ *       along with a passthrough parameter for it.
+ */
+void
+PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param)
+{
+       if (!conn)
+               return;
+
+       if (func)
+       {
+               /* set custom row processor */
+               conn->rowProcessor = func;
+               conn->rowProcessorParam = param;
+       }
+       else
+       {
+               /* set default row processor */
+               conn->rowProcessor = pqStdRowProcessor;
+               conn->rowProcessorParam = conn;
+       }
+}
+
+/*
+ * PQgetRowProcessor
+ *       Get current row processor of PGconn.
+ *       If param is not NULL, also store the passthrough parameter at *param.
+ */
+PQrowProcessor
+PQgetRowProcessor(const PGconn *conn, void **param)
+{
+       if (!conn)
+       {
+               if (param)
+                       *param = NULL;
+               return NULL;
+       }
+
+       if (param)
+               *param = conn->rowProcessorParam;
+       return conn->rowProcessor;
+}
+
+/*
+ * pqStdRowProcessor
+ *       Add the received row to the PGresult structure
+ *       Returns 1 if OK, -1 if error occurred.
+ *
+ * Note: "param" should point to the PGconn, but we don't actually need that
+ * as of the current coding.
+ */
+static int
+pqStdRowProcessor(PGresult *res, const PGdataValue *columns,
+                                 const char **errmsgp, void *param)
+{
+       int                     nfields = res->numAttributes;
+       PGresAttValue *tup;
+       int                     i;
+
+       if (columns == NULL)
+       {
+               /* New result set ... we have nothing to do in this function. */
+               return 1;
+       }
+
+       /*
+        * Basically we just allocate space in the PGresult for each field and
+        * copy the data over.
+        *
+        * Note: on malloc failure, we return -1 leaving *errmsgp still NULL,
+        * which caller will take to mean "out of memory".  This is preferable to
+        * trying to set up such a message here, because evidently there's not
+        * enough memory for gettext() to do anything.
+        */
+       tup = (PGresAttValue *)
+               pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
+       if (tup == NULL)
+               return -1;
+
+       for (i = 0; i < nfields; i++)
+       {
+               int             clen = columns[i].len;
+
+               if (clen < 0)
+               {
+                       /* null field */
+                       tup[i].len = NULL_LEN;
+                       tup[i].value = res->null_field;
+               }
+               else
+               {
+                       bool            isbinary = (res->attDescs[i].format != 0);
+                       char       *val;
+
+                       val = (char *) pqResultAlloc(res, clen + 1, isbinary);
+                       if (val == NULL)
+                               return -1;
+
+                       /* copy and zero-terminate the data (even if it's binary) */
+                       memcpy(val, columns[i].value, clen);
+                       val[clen] = '\0';
+
+                       tup[i].len = clen;
+                       tup[i].value = val;
+               }
+       }
+
+       /* And add the tuple to the PGresult's tuple array */
+       if (!pqAddTuple(res, tup))
+               return -1;
+
+       /* Success */
+       return 1;
+}
+
+
 /*
  * PQsendQuery
  *      Submit a query, but don't wait for it to finish
@@ -1223,7 +1343,6 @@ PQsendQueryStart(PGconn *conn)
 
        /* initialize async result-accumulation state */
        conn->result = NULL;
-       conn->curTuple = NULL;
 
        /* ready to send command message */
        return true;
@@ -1468,6 +1587,9 @@ PQconsumeInput(PGconn *conn)
  * parseInput: if appropriate, parse input data from backend
  * until input is exhausted or a stopping state is reached.
  * Note that this function will NOT attempt to read more data from the backend.
+ *
+ * Note: callers of parseInput must be prepared for a longjmp exit when we are
+ * in PGASYNC_BUSY state, since an external row processor might do that.
  */
 static void
 parseInput(PGconn *conn)
@@ -1615,6 +1737,49 @@ PQgetResult(PGconn *conn)
        return res;
 }
 
+/*
+ * PQskipResult
+ *       Get the next PGresult produced by a query, but discard any data rows.
+ *
+ * This is mainly useful for cleaning up after a longjmp out of a row
+ * processor, when resuming processing of the current query result isn't
+ * wanted.  Note that this is of little value in an async-style application,
+ * since any preceding calls to PQisBusy would have already called the regular
+ * row processor.
+ */
+PGresult *
+PQskipResult(PGconn *conn)
+{
+       PGresult   *res;
+       PQrowProcessor savedRowProcessor;
+
+       if (!conn)
+               return NULL;
+
+       /* temporarily install dummy row processor */
+       savedRowProcessor = conn->rowProcessor;
+       conn->rowProcessor = dummyRowProcessor;
+       /* no need to save/change rowProcessorParam */
+
+       /* fetch the next result */
+       res = PQgetResult(conn);
+
+       /* restore previous row processor */
+       conn->rowProcessor = savedRowProcessor;
+
+       return res;
+}
+
+/*
+ * Do-nothing row processor for PQskipResult
+ */
+static int
+dummyRowProcessor(PGresult *res, const PGdataValue *columns,
+                                 const char **errmsgp, void *param)
+{
+       return 1;
+}
+
 
 /*
  * PQexec
@@ -1721,7 +1886,7 @@ PQexecStart(PGconn *conn)
         * Silently discard any prior query result that application didn't eat.
         * This is probably poor design, but it's here for backward compatibility.
         */
-       while ((result = PQgetResult(conn)) != NULL)
+       while ((result = PQskipResult(conn)) != NULL)
        {
                ExecStatusType resultStatus = result->resultStatus;
 
index 29752270a1d8072fabc73eea0e64383d32b12402..13fd98c2f913d3818758e75bac96822306981b52 100644 (file)
@@ -40,9 +40,7 @@
 #define LO_BUFSIZE               8192
 
 static int     lo_initialize(PGconn *conn);
-
-static Oid
-                       lo_import_internal(PGconn *conn, const char *filename, const Oid oid);
+static Oid     lo_import_internal(PGconn *conn, const char *filename, Oid oid);
 
 /*
  * lo_open
@@ -59,7 +57,7 @@ lo_open(PGconn *conn, Oid lobjId, int mode)
        PQArgBlock      argv[2];
        PGresult   *res;
 
-       if (conn->lobjfuncs == NULL)
+       if (conn == NULL || conn->lobjfuncs == NULL)
        {
                if (lo_initialize(conn) < 0)
                        return -1;
@@ -101,7 +99,7 @@ lo_close(PGconn *conn, int fd)
        int                     retval;
        int                     result_len;
 
-       if (conn->lobjfuncs == NULL)
+       if (conn == NULL || conn->lobjfuncs == NULL)
        {
                if (lo_initialize(conn) < 0)
                        return -1;
@@ -139,7 +137,7 @@ lo_truncate(PGconn *conn, int fd, size_t len)
        int                     retval;
        int                     result_len;
 
-       if (conn->lobjfuncs == NULL)
+       if (conn == NULL || conn->lobjfuncs == NULL)
        {
                if (lo_initialize(conn) < 0)
                        return -1;
@@ -192,7 +190,7 @@ lo_read(PGconn *conn, int fd, char *buf, size_t len)
        PGresult   *res;
        int                     result_len;
 
-       if (conn->lobjfuncs == NULL)
+       if (conn == NULL || conn->lobjfuncs == NULL)
        {
                if (lo_initialize(conn) < 0)
                        return -1;
@@ -234,7 +232,7 @@ lo_write(PGconn *conn, int fd, const char *buf, size_t len)
        int                     result_len;
        int                     retval;
 
-       if (conn->lobjfuncs == NULL)
+       if (conn == NULL || conn->lobjfuncs == NULL)
        {
                if (lo_initialize(conn) < 0)
                        return -1;
@@ -280,7 +278,7 @@ lo_lseek(PGconn *conn, int fd, int offset, int whence)
        int                     retval;
        int                     result_len;
 
-       if (conn->lobjfuncs == NULL)
+       if (conn == NULL || conn->lobjfuncs == NULL)
        {
                if (lo_initialize(conn) < 0)
                        return -1;
@@ -328,7 +326,7 @@ lo_creat(PGconn *conn, int mode)
        int                     retval;
        int                     result_len;
 
-       if (conn->lobjfuncs == NULL)
+       if (conn == NULL || conn->lobjfuncs == NULL)
        {
                if (lo_initialize(conn) < 0)
                        return InvalidOid;
@@ -367,7 +365,7 @@ lo_create(PGconn *conn, Oid lobjId)
        int                     retval;
        int                     result_len;
 
-       if (conn->lobjfuncs == NULL)
+       if (conn == NULL || conn->lobjfuncs == NULL)
        {
                if (lo_initialize(conn) < 0)
                        return InvalidOid;
@@ -413,7 +411,7 @@ lo_tell(PGconn *conn, int fd)
        PGresult   *res;
        int                     result_len;
 
-       if (conn->lobjfuncs == NULL)
+       if (conn == NULL || conn->lobjfuncs == NULL)
        {
                if (lo_initialize(conn) < 0)
                        return -1;
@@ -451,7 +449,7 @@ lo_unlink(PGconn *conn, Oid lobjId)
        int                     result_len;
        int                     retval;
 
-       if (conn->lobjfuncs == NULL)
+       if (conn == NULL || conn->lobjfuncs == NULL)
        {
                if (lo_initialize(conn) < 0)
                        return -1;
@@ -505,7 +503,7 @@ lo_import_with_oid(PGconn *conn, const char *filename, Oid lobjId)
 }
 
 static Oid
-lo_import_internal(PGconn *conn, const char *filename, const Oid oid)
+lo_import_internal(PGconn *conn, const char *filename, Oid oid)
 {
        int                     fd;
        int                     nbytes,
@@ -684,8 +682,13 @@ lo_initialize(PGconn *conn)
        int                     n;
        const char *query;
        const char *fname;
+       PQrowProcessor savedRowProcessor;
+       void       *savedRowProcessorParam;
        Oid                     foid;
 
+       if (!conn)
+               return -1;
+
        /*
         * Allocate the structure to hold the functions OID's
         */
@@ -729,7 +732,16 @@ lo_initialize(PGconn *conn)
                        "or proname = 'loread' "
                        "or proname = 'lowrite'";
 
+       /* Ensure the standard row processor is used to collect the result */
+       savedRowProcessor = conn->rowProcessor;
+       savedRowProcessorParam = conn->rowProcessorParam;
+       PQsetRowProcessor(conn, NULL, NULL);
+
        res = PQexec(conn, query);
+
+       conn->rowProcessor = savedRowProcessor;
+       conn->rowProcessorParam = savedRowProcessorParam;
+
        if (res == NULL)
        {
                free(lobjfuncs);
index ce0eac3f712eab7f2974619f930494c941783874..b5e5519c416aa767921a5549f7947f80e4cf8250 100644 (file)
@@ -218,6 +218,32 @@ pqGetnchar(char *s, size_t len, PGconn *conn)
        return 0;
 }
 
+/*
+ * pqSkipnchar:
+ *     skip over len bytes in input buffer.
+ *
+ * Note: this is primarily useful for its debug output, which should
+ * be exactly the same as for pqGetnchar.  We assume the data in question
+ * will actually be used, but just isn't getting copied anywhere as yet.
+ */
+int
+pqSkipnchar(size_t len, PGconn *conn)
+{
+       if (len > (size_t) (conn->inEnd - conn->inCursor))
+               return EOF;
+
+       if (conn->Pfdebug)
+       {
+               fprintf(conn->Pfdebug, "From backend (%lu)> ", (unsigned long) len);
+               fputnbytes(conn->Pfdebug, conn->inBuffer + conn->inCursor, len);
+               fprintf(conn->Pfdebug, "\n");
+       }
+
+       conn->inCursor += len;
+
+       return 0;
+}
+
 /*
  * pqPutnchar:
  *     write exactly len bytes to the current message
index a7c38993b8b69f23e7c33f1c41158b1ff9a7d0d8..43f9954dd1c01c4a8716a3d69bff5f644a8e49ea 100644 (file)
@@ -49,11 +49,19 @@ static int  getNotify(PGconn *conn);
 PostgresPollingStatusType
 pqSetenvPoll(PGconn *conn)
 {
+       PostgresPollingStatusType result;
        PGresult   *res;
+       PQrowProcessor savedRowProcessor;
+       void       *savedRowProcessorParam;
 
        if (conn == NULL || conn->status == CONNECTION_BAD)
                return PGRES_POLLING_FAILED;
 
+       /* Ensure the standard row processor is used to collect any results */
+       savedRowProcessor = conn->rowProcessor;
+       savedRowProcessorParam = conn->rowProcessorParam;
+       PQsetRowProcessor(conn, NULL, NULL);
+
        /* Check whether there are any data for us */
        switch (conn->setenv_state)
        {
@@ -69,7 +77,10 @@ pqSetenvPoll(PGconn *conn)
                                if (n < 0)
                                        goto error_return;
                                if (n == 0)
-                                       return PGRES_POLLING_READING;
+                               {
+                                       result = PGRES_POLLING_READING;
+                                       goto normal_return;
+                               }
 
                                break;
                        }
@@ -83,7 +94,8 @@ pqSetenvPoll(PGconn *conn)
 
                        /* Should we raise an error if called when not active? */
                case SETENV_STATE_IDLE:
-                       return PGRES_POLLING_OK;
+                       result = PGRES_POLLING_OK;
+                       goto normal_return;
 
                default:
                        printfPQExpBuffer(&conn->errorMessage,
@@ -180,7 +192,10 @@ pqSetenvPoll(PGconn *conn)
                        case SETENV_STATE_CLIENT_ENCODING_WAIT:
                                {
                                        if (PQisBusy(conn))
-                                               return PGRES_POLLING_READING;
+                                       {
+                                               result = PGRES_POLLING_READING;
+                                               goto normal_return;
+                                       }
 
                                        res = PQgetResult(conn);
 
@@ -205,7 +220,10 @@ pqSetenvPoll(PGconn *conn)
                        case SETENV_STATE_OPTION_WAIT:
                                {
                                        if (PQisBusy(conn))
-                                               return PGRES_POLLING_READING;
+                                       {
+                                               result = PGRES_POLLING_READING;
+                                               goto normal_return;
+                                       }
 
                                        res = PQgetResult(conn);
 
@@ -244,13 +262,17 @@ pqSetenvPoll(PGconn *conn)
                                                goto error_return;
 
                                        conn->setenv_state = SETENV_STATE_QUERY1_WAIT;
-                                       return PGRES_POLLING_READING;
+                                       result = PGRES_POLLING_READING;
+                                       goto normal_return;
                                }
 
                        case SETENV_STATE_QUERY1_WAIT:
                                {
                                        if (PQisBusy(conn))
-                                               return PGRES_POLLING_READING;
+                                       {
+                                               result = PGRES_POLLING_READING;
+                                               goto normal_return;
+                                       }
 
                                        res = PQgetResult(conn);
 
@@ -327,13 +349,17 @@ pqSetenvPoll(PGconn *conn)
                                                goto error_return;
 
                                        conn->setenv_state = SETENV_STATE_QUERY2_WAIT;
-                                       return PGRES_POLLING_READING;
+                                       result = PGRES_POLLING_READING;
+                                       goto normal_return;
                                }
 
                        case SETENV_STATE_QUERY2_WAIT:
                                {
                                        if (PQisBusy(conn))
-                                               return PGRES_POLLING_READING;
+                                       {
+                                               result = PGRES_POLLING_READING;
+                                               goto normal_return;
+                                       }
 
                                        res = PQgetResult(conn);
 
@@ -380,7 +406,8 @@ pqSetenvPoll(PGconn *conn)
                                        {
                                                /* Query finished, so we're done */
                                                conn->setenv_state = SETENV_STATE_IDLE;
-                                               return PGRES_POLLING_OK;
+                                               result = PGRES_POLLING_OK;
+                                               goto normal_return;
                                        }
                                        break;
                                }
@@ -398,7 +425,12 @@ pqSetenvPoll(PGconn *conn)
 
 error_return:
        conn->setenv_state = SETENV_STATE_IDLE;
-       return PGRES_POLLING_FAILED;
+       result = PGRES_POLLING_FAILED;
+
+normal_return:
+       conn->rowProcessor = savedRowProcessor;
+       conn->rowProcessorParam = savedRowProcessorParam;
+       return result;
 }
 
 
@@ -406,6 +438,9 @@ error_return:
  * parseInput: if appropriate, parse input data from backend
  * until input is exhausted or a stopping state is reached.
  * Note that this function will NOT attempt to read more data from the backend.
+ *
+ * Note: callers of parseInput must be prepared for a longjmp exit when we are
+ * in PGASYNC_BUSY state, since an external row processor might do that.
  */
 void
 pqParseInput2(PGconn *conn)
@@ -549,6 +584,8 @@ pqParseInput2(PGconn *conn)
                                                /* First 'T' in a query sequence */
                                                if (getRowDescriptions(conn))
                                                        return;
+                                               /* getRowDescriptions() moves inStart itself */
+                                               continue;
                                        }
                                        else
                                        {
@@ -569,6 +606,8 @@ pqParseInput2(PGconn *conn)
                                                /* Read another tuple of a normal query response */
                                                if (getAnotherTuple(conn, FALSE))
                                                        return;
+                                               /* getAnotherTuple() moves inStart itself */
+                                               continue;
                                        }
                                        else
                                        {
@@ -585,6 +624,8 @@ pqParseInput2(PGconn *conn)
                                                /* Read another tuple of a normal query response */
                                                if (getAnotherTuple(conn, TRUE))
                                                        return;
+                                               /* getAnotherTuple() moves inStart itself */
+                                               continue;
                                        }
                                        else
                                        {
@@ -627,27 +668,32 @@ pqParseInput2(PGconn *conn)
 /*
  * parseInput subroutine to read a 'T' (row descriptions) message.
  * We build a PGresult structure containing the attribute data.
- * Returns: 0 if completed message, EOF if not enough data yet.
+ * Returns: 0 if completed message, EOF if error or not enough data
+ * received yet.
  *
- * Note that if we run out of data, we have to release the partially
- * constructed PGresult, and rebuild it again next time.  Fortunately,
- * that shouldn't happen often, since 'T' messages usually fit in a packet.
+ * Note that if we run out of data, we have to suspend and reprocess
+ * the message after more data is received.  Otherwise, conn->inStart
+ * must get advanced past the processed data.
  */
 static int
 getRowDescriptions(PGconn *conn)
 {
-       PGresult   *result = NULL;
+       PGresult   *result;
        int                     nfields;
+       const char *errmsg;
        int                     i;
 
        result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK);
        if (!result)
-               goto failure;
+       {
+               errmsg = NULL;                  /* means "out of memory", see below */
+               goto advance_and_error;
+       }
 
        /* parseInput already read the 'T' label. */
        /* the next two bytes are the number of fields  */
        if (pqGetInt(&(result->numAttributes), 2, conn))
-               goto failure;
+               goto EOFexit;
        nfields = result->numAttributes;
 
        /* allocate space for the attribute descriptors */
@@ -656,7 +702,10 @@ getRowDescriptions(PGconn *conn)
                result->attDescs = (PGresAttDesc *)
                        pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
                if (!result->attDescs)
-                       goto failure;
+               {
+                       errmsg = NULL;          /* means "out of memory", see below */
+                       goto advance_and_error;
+               }
                MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
        }
 
@@ -671,7 +720,7 @@ getRowDescriptions(PGconn *conn)
                        pqGetInt(&typid, 4, conn) ||
                        pqGetInt(&typlen, 2, conn) ||
                        pqGetInt(&atttypmod, 4, conn))
-                       goto failure;
+                       goto EOFexit;
 
                /*
                 * Since pqGetInt treats 2-byte integers as unsigned, we need to
@@ -682,7 +731,10 @@ getRowDescriptions(PGconn *conn)
                result->attDescs[i].name = pqResultStrdup(result,
                                                                                                  conn->workBuffer.data);
                if (!result->attDescs[i].name)
-                       goto failure;
+               {
+                       errmsg = NULL;          /* means "out of memory", see below */
+                       goto advance_and_error;
+               }
                result->attDescs[i].tableid = 0;
                result->attDescs[i].columnid = 0;
                result->attDescs[i].format = 0;
@@ -693,30 +745,90 @@ getRowDescriptions(PGconn *conn)
 
        /* Success! */
        conn->result = result;
-       return 0;
 
-failure:
-       if (result)
+       /*
+        * Advance inStart to show that the "T" message has been processed.  We
+        * must do this before calling the row processor, in case it longjmps.
+        */
+       conn->inStart = conn->inCursor;
+
+       /* Give the row processor a chance to initialize for new result set */
+       errmsg = NULL;
+       switch ((*conn->rowProcessor) (result, NULL, &errmsg,
+                                                                  conn->rowProcessorParam))
+       {
+               case 1:
+                       /* everything is good */
+                       return 0;
+
+               case -1:
+                       /* error, report the errmsg below */
+                       break;
+
+               default:
+                       /* unrecognized return code */
+                       errmsg = libpq_gettext("unrecognized return value from row processor");
+                       break;
+       }
+       goto set_error_result;
+
+advance_and_error:
+       /*
+        * Discard the failed message.  Unfortunately we don't know for sure
+        * where the end is, so just throw away everything in the input buffer.
+        * This is not very desirable but it's the best we can do in protocol v2.
+        */
+       conn->inStart = conn->inEnd;
+
+set_error_result:
+
+       /*
+        * Replace partially constructed result with an error result. First
+        * discard the old result to try to win back some memory.
+        */
+       pqClearAsyncResult(conn);
+
+       /*
+        * If row processor didn't provide an error message, assume "out of
+        * memory" was meant.  The advantage of having this special case is that
+        * freeing the old result first greatly improves the odds that gettext()
+        * will succeed in providing a translation.
+        */
+       if (!errmsg)
+               errmsg = libpq_gettext("out of memory for query result");
+
+       printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
+
+       /*
+        * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
+        * do to recover...
+        */
+       conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
+       conn->asyncStatus = PGASYNC_READY;
+
+EOFexit:
+       if (result && result != conn->result)
                PQclear(result);
        return EOF;
 }
 
 /*
  * parseInput subroutine to read a 'B' or 'D' (row data) message.
- * We add another tuple to the existing PGresult structure.
- * Returns: 0 if completed message, EOF if error or not enough data yet.
+ * We fill rowbuf with column pointers and then call the row processor.
+ * Returns: 0 if completed message, EOF if error or not enough data
+ * received yet.
  *
  * Note that if we run out of data, we have to suspend and reprocess
- * the message after more data is received.  We keep a partially constructed
- * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
+ * the message after more data is received.  Otherwise, conn->inStart
+ * must get advanced past the processed data.
  */
 static int
 getAnotherTuple(PGconn *conn, bool binary)
 {
        PGresult   *result = conn->result;
        int                     nfields = result->numAttributes;
-       PGresAttValue *tup;
-
+       const char *errmsg;
+       PGdataValue *rowbuf;
        /* the backend sends us a bitmap of which attributes are null */
        char            std_bitmap[64]; /* used unless it doesn't fit */
        char       *bitmap = std_bitmap;
@@ -727,28 +839,33 @@ getAnotherTuple(PGconn *conn, bool binary)
        int                     bitcnt;                 /* number of bits examined in current byte */
        int                     vlen;                   /* length of the current field value */
 
-       result->binary = binary;
-
-       /* Allocate tuple space if first time for this data message */
-       if (conn->curTuple == NULL)
+       /* Resize row buffer if needed */
+       rowbuf = conn->rowBuf;
+       if (nfields > conn->rowBufLen)
        {
-               conn->curTuple = (PGresAttValue *)
-                       pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
-               if (conn->curTuple == NULL)
-                       goto outOfMemory;
-               MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-
-               /*
-                * If it's binary, fix the column format indicators.  We assume the
-                * backend will consistently send either B or D, not a mix.
-                */
-               if (binary)
+               rowbuf = (PGdataValue *) realloc(rowbuf,
+                                                                                nfields * sizeof(PGdataValue));
+               if (!rowbuf)
                {
-                       for (i = 0; i < nfields; i++)
-                               result->attDescs[i].format = 1;
+                       errmsg = NULL;          /* means "out of memory", see below */
+                       goto advance_and_error;
                }
+               conn->rowBuf = rowbuf;
+               conn->rowBufLen = nfields;
+       }
+
+       /* Save format specifier */
+       result->binary = binary;
+
+       /*
+        * If it's binary, fix the column format indicators.  We assume the
+        * backend will consistently send either B or D, not a mix.
+        */
+       if (binary)
+       {
+               for (i = 0; i < nfields; i++)
+                       result->attDescs[i].format = 1;
        }
-       tup = conn->curTuple;
 
        /* Get the null-value bitmap */
        nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
@@ -757,7 +874,10 @@ getAnotherTuple(PGconn *conn, bool binary)
        {
                bitmap = (char *) malloc(nbytes);
                if (!bitmap)
-                       goto outOfMemory;
+               {
+                       errmsg = NULL;          /* means "out of memory", see below */
+                       goto advance_and_error;
+               }
        }
 
        if (pqGetnchar(bitmap, nbytes, conn))
@@ -770,35 +890,34 @@ getAnotherTuple(PGconn *conn, bool binary)
 
        for (i = 0; i < nfields; i++)
        {
+               /* get the value length */
                if (!(bmap & 0200))
-               {
-                       /* if the field value is absent, make it a null string */
-                       tup[i].value = result->null_field;
-                       tup[i].len = NULL_LEN;
-               }
+                       vlen = NULL_LEN;
+               else if (pqGetInt(&vlen, 4, conn))
+                       goto EOFexit;
                else
                {
-                       /* get the value length (the first four bytes are for length) */
-                       if (pqGetInt(&vlen, 4, conn))
-                               goto EOFexit;
                        if (!binary)
                                vlen = vlen - 4;
                        if (vlen < 0)
                                vlen = 0;
-                       if (tup[i].value == NULL)
-                       {
-                               tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
-                               if (tup[i].value == NULL)
-                                       goto outOfMemory;
-                       }
-                       tup[i].len = vlen;
-                       /* read in the value */
-                       if (vlen > 0)
-                               if (pqGetnchar((char *) (tup[i].value), vlen, conn))
-                                       goto EOFexit;
-                       /* we have to terminate this ourselves */
-                       tup[i].value[vlen] = '\0';
                }
+               rowbuf[i].len = vlen;
+
+               /*
+                * rowbuf[i].value always points to the next address in the data
+                * buffer even if the value is NULL.  This allows row processors to
+                * estimate data sizes more easily.
+                */
+               rowbuf[i].value = conn->inBuffer + conn->inCursor;
+
+               /* Skip over the data value */
+               if (vlen > 0)
+               {
+                       if (pqSkipnchar(vlen, conn))
+                               goto EOFexit;
+               }
+
                /* advance the bitmap stuff */
                bitcnt++;
                if (bitcnt == BITS_PER_BYTE)
@@ -811,26 +930,63 @@ getAnotherTuple(PGconn *conn, bool binary)
                        bmap <<= 1;
        }
 
-       /* Success!  Store the completed tuple in the result */
-       if (!pqAddTuple(result, tup))
-               goto outOfMemory;
-       /* and reset for a new message */
-       conn->curTuple = NULL;
-
+       /* Release bitmap now if we allocated it */
        if (bitmap != std_bitmap)
                free(bitmap);
-       return 0;
+       bitmap = NULL;
+
+       /*
+        * Advance inStart to show that the "D" message has been processed.  We
+        * must do this before calling the row processor, in case it longjmps.
+        */
+       conn->inStart = conn->inCursor;
+
+       /* Pass the completed row values to rowProcessor */
+       errmsg = NULL;
+       switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
+                                                                  conn->rowProcessorParam))
+       {
+               case 1:
+                       /* everything is good */
+                       return 0;
+
+               case -1:
+                       /* error, report the errmsg below */
+                       break;
 
-outOfMemory:
-       /* Replace partially constructed result with an error result */
+               default:
+                       /* unrecognized return code */
+                       errmsg = libpq_gettext("unrecognized return value from row processor");
+                       break;
+       }
+       goto set_error_result;
+
+advance_and_error:
+       /*
+        * Discard the failed message.  Unfortunately we don't know for sure
+        * where the end is, so just throw away everything in the input buffer.
+        * This is not very desirable but it's the best we can do in protocol v2.
+        */
+       conn->inStart = conn->inEnd;
+
+set_error_result:
 
        /*
-        * we do NOT use pqSaveErrorResult() here, because of the likelihood that
-        * there's not enough memory to concatenate messages...
+        * Replace partially constructed result with an error result. First
+        * discard the old result to try to win back some memory.
         */
        pqClearAsyncResult(conn);
-       printfPQExpBuffer(&conn->errorMessage,
-                                         libpq_gettext("out of memory for query result\n"));
+
+       /*
+        * If row processor didn't provide an error message, assume "out of
+        * memory" was meant.  The advantage of having this special case is that
+        * freeing the old result first greatly improves the odds that gettext()
+        * will succeed in providing a translation.
+        */
+       if (!errmsg)
+               errmsg = libpq_gettext("out of memory for query result");
+
+       printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
 
        /*
         * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
@@ -838,8 +994,6 @@ outOfMemory:
         */
        conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
        conn->asyncStatus = PGASYNC_READY;
-       /* Discard the failed message --- good idea? */
-       conn->inStart = conn->inEnd;
 
 EOFexit:
        if (bitmap != NULL && bitmap != std_bitmap)
@@ -1122,7 +1276,8 @@ pqGetline2(PGconn *conn, char *s, int maxlen)
 {
        int                     result = 1;             /* return value if buffer overflows */
 
-       if (conn->sock < 0)
+       if (conn->sock < 0 ||
+               conn->asyncStatus != PGASYNC_COPY_OUT)
        {
                *s = '\0';
                return EOF;
index 892dcbcb00f15e9a675543b874603058ebf012f1..a773d7a5246f201c2bc11b5c98433222e0b335ad 100644 (file)
@@ -44,7 +44,7 @@
 
 
 static void handleSyncLoss(PGconn *conn, char id, int msgLength);
-static int     getRowDescriptions(PGconn *conn);
+static int     getRowDescriptions(PGconn *conn, int msgLength);
 static int     getParamDescriptions(PGconn *conn);
 static int     getAnotherTuple(PGconn *conn, int msgLength);
 static int     getParameterStatus(PGconn *conn);
@@ -61,6 +61,9 @@ static int build_startup_packet(const PGconn *conn, char *packet,
  * parseInput: if appropriate, parse input data from backend
  * until input is exhausted or a stopping state is reached.
  * Note that this function will NOT attempt to read more data from the backend.
+ *
+ * Note: callers of parseInput must be prepared for a longjmp exit when we are
+ * in PGASYNC_BUSY state, since an external row processor might do that.
  */
 void
 pqParseInput3(PGconn *conn)
@@ -269,15 +272,10 @@ pqParseInput3(PGconn *conn)
                                                conn->queryclass == PGQUERY_DESCRIBE)
                                        {
                                                /* First 'T' in a query sequence */
-                                               if (getRowDescriptions(conn))
+                                               if (getRowDescriptions(conn, msgLength))
                                                        return;
-
-                                               /*
-                                                * If we're doing a Describe, we're ready to pass the
-                                                * result back to the client.
-                                                */
-                                               if (conn->queryclass == PGQUERY_DESCRIBE)
-                                                       conn->asyncStatus = PGASYNC_READY;
+                                               /* getRowDescriptions() moves inStart itself */
+                                               continue;
                                        }
                                        else
                                        {
@@ -327,6 +325,8 @@ pqParseInput3(PGconn *conn)
                                                /* Read another tuple of a normal query response */
                                                if (getAnotherTuple(conn, msgLength))
                                                        return;
+                                               /* getAnotherTuple() moves inStart itself */
+                                               continue;
                                        }
                                        else if (conn->result != NULL &&
                                                         conn->result->resultStatus == PGRES_FATAL_ERROR)
@@ -443,17 +443,20 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
  * parseInput subroutine to read a 'T' (row descriptions) message.
  * We'll build a new PGresult structure (unless called for a Describe
  * command for a prepared statement) containing the attribute data.
- * Returns: 0 if completed message, EOF if not enough data yet.
+ * Returns: 0 if processed message successfully, EOF to suspend parsing
+ * (the latter case is not actually used currently).
+ * In either case, conn->inStart has been advanced past the message.
  *
- * Note that if we run out of data, we have to release the partially
- * constructed PGresult, and rebuild it again next time.  Fortunately,
- * that shouldn't happen often, since 'T' messages usually fit in a packet.
+ * Note: the row processor could also choose to longjmp out of libpq,
+ * in which case the library's state must allow for resumption at the
+ * next message.
  */
 static int
-getRowDescriptions(PGconn *conn)
+getRowDescriptions(PGconn *conn, int msgLength)
 {
        PGresult   *result;
        int                     nfields;
+       const char *errmsg;
        int                     i;
 
        /*
@@ -471,12 +474,19 @@ getRowDescriptions(PGconn *conn)
        else
                result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK);
        if (!result)
-               goto failure;
+       {
+               errmsg = NULL;                  /* means "out of memory", see below */
+               goto advance_and_error;
+       }
 
        /* parseInput already read the 'T' label and message length. */
        /* the next two bytes are the number of fields */
        if (pqGetInt(&(result->numAttributes), 2, conn))
-               goto failure;
+       {
+               /* We should not run out of data here, so complain */
+               errmsg = libpq_gettext("insufficient data in \"T\" message");
+               goto advance_and_error;
+       }
        nfields = result->numAttributes;
 
        /* allocate space for the attribute descriptors */
@@ -485,7 +495,10 @@ getRowDescriptions(PGconn *conn)
                result->attDescs = (PGresAttDesc *)
                        pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
                if (!result->attDescs)
-                       goto failure;
+               {
+                       errmsg = NULL;          /* means "out of memory", see below */
+                       goto advance_and_error;
+               }
                MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
        }
 
@@ -510,7 +523,9 @@ getRowDescriptions(PGconn *conn)
                        pqGetInt(&atttypmod, 4, conn) ||
                        pqGetInt(&format, 2, conn))
                {
-                       goto failure;
+                       /* We should not run out of data here, so complain */
+                       errmsg = libpq_gettext("insufficient data in \"T\" message");
+                       goto advance_and_error;
                }
 
                /*
@@ -524,7 +539,10 @@ getRowDescriptions(PGconn *conn)
                result->attDescs[i].name = pqResultStrdup(result,
                                                                                                  conn->workBuffer.data);
                if (!result->attDescs[i].name)
-                       goto failure;
+               {
+                       errmsg = NULL;          /* means "out of memory", see below */
+                       goto advance_and_error;
+               }
                result->attDescs[i].tableid = tableid;
                result->attDescs[i].columnid = columnid;
                result->attDescs[i].format = format;
@@ -536,24 +554,84 @@ getRowDescriptions(PGconn *conn)
                        result->binary = 0;
        }
 
+       /* Sanity check that we absorbed all the data */
+       if (conn->inCursor != conn->inStart + 5 + msgLength)
+       {
+               errmsg = libpq_gettext("extraneous data in \"T\" message");
+               goto advance_and_error;
+       }
+
        /* Success! */
        conn->result = result;
-       return 0;
 
-failure:
+       /*
+        * Advance inStart to show that the "T" message has been processed.  We
+        * must do this before calling the row processor, in case it longjmps.
+        */
+       conn->inStart = conn->inCursor;
 
        /*
-        * Discard incomplete result, unless it's from getParamDescriptions.
-        *
-        * Note that if we hit a bufferload boundary while handling the
-        * describe-statement case, we'll forget any PGresult space we just
-        * allocated, and then reallocate it on next try.  This will bloat the
-        * PGresult a little bit but the space will be freed at PQclear, so it
-        * doesn't seem worth trying to be smarter.
+        * If we're doing a Describe, we're done, and ready to pass the result
+        * back to the client.
         */
-       if (result != conn->result)
+       if (conn->queryclass == PGQUERY_DESCRIBE)
+       {
+               conn->asyncStatus = PGASYNC_READY;
+               return 0;
+       }
+
+       /* Give the row processor a chance to initialize for new result set */
+       errmsg = NULL;
+       switch ((*conn->rowProcessor) (result, NULL, &errmsg,
+                                                                  conn->rowProcessorParam))
+       {
+               case 1:
+                       /* everything is good */
+                       return 0;
+
+               case -1:
+                       /* error, report the errmsg below */
+                       break;
+
+               default:
+                       /* unrecognized return code */
+                       errmsg = libpq_gettext("unrecognized return value from row processor");
+                       break;
+       }
+       goto set_error_result;
+
+advance_and_error:
+       /* Discard unsaved result, if any */
+       if (result && result != conn->result)
                PQclear(result);
-       return EOF;
+
+       /* Discard the failed message by pretending we read it */
+       conn->inStart += 5 + msgLength;
+
+set_error_result:
+
+       /*
+        * Replace partially constructed result with an error result. First
+        * discard the old result to try to win back some memory.
+        */
+       pqClearAsyncResult(conn);
+
+       /*
+        * If row processor didn't provide an error message, assume "out of
+        * memory" was meant.
+        */
+       if (!errmsg)
+               errmsg = libpq_gettext("out of memory for query result");
+
+       printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
+       pqSaveErrorResult(conn);
+
+       /*
+        * Return zero to allow input parsing to continue.  Subsequent "D"
+        * messages will be ignored until we get to end of data, since an error
+        * result is already set up.
+        */
+       return 0;
 }
 
 /*
@@ -613,47 +691,53 @@ failure:
 
 /*
  * parseInput subroutine to read a 'D' (row data) message.
- * We add another tuple to the existing PGresult structure.
- * Returns: 0 if completed message, EOF if error or not enough data yet.
+ * We fill rowbuf with column pointers and then call the row processor.
+ * Returns: 0 if processed message successfully, EOF to suspend parsing
+ * (the latter case is not actually used currently).
+ * In either case, conn->inStart has been advanced past the message.
  *
- * Note that if we run out of data, we have to suspend and reprocess
- * the message after more data is received.  We keep a partially constructed
- * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
+ * Note: the row processor could also choose to longjmp out of libpq,
+ * in which case the library's state must allow for resumption at the
+ * next message.
  */
 static int
 getAnotherTuple(PGconn *conn, int msgLength)
 {
        PGresult   *result = conn->result;
        int                     nfields = result->numAttributes;
-       PGresAttValue *tup;
+       const char *errmsg;
+       PGdataValue *rowbuf;
        int                     tupnfields;             /* # fields from tuple */
        int                     vlen;                   /* length of the current field value */
        int                     i;
 
-       /* Allocate tuple space if first time for this data message */
-       if (conn->curTuple == NULL)
-       {
-               conn->curTuple = (PGresAttValue *)
-                       pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
-               if (conn->curTuple == NULL)
-                       goto outOfMemory;
-               MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-       }
-       tup = conn->curTuple;
-
        /* Get the field count and make sure it's what we expect */
        if (pqGetInt(&tupnfields, 2, conn))
-               return EOF;
+       {
+               /* We should not run out of data here, so complain */
+               errmsg = libpq_gettext("insufficient data in \"D\" message");
+               goto advance_and_error;
+       }
 
        if (tupnfields != nfields)
        {
-               /* Replace partially constructed result with an error result */
-               printfPQExpBuffer(&conn->errorMessage,
-                                libpq_gettext("unexpected field count in \"D\" message\n"));
-               pqSaveErrorResult(conn);
-               /* Discard the failed message by pretending we read it */
-               conn->inCursor = conn->inStart + 5 + msgLength;
-               return 0;
+               errmsg = libpq_gettext("unexpected field count in \"D\" message");
+               goto advance_and_error;
+       }
+
+       /* Resize row buffer if needed */
+       rowbuf = conn->rowBuf;
+       if (nfields > conn->rowBufLen)
+       {
+               rowbuf = (PGdataValue *) realloc(rowbuf,
+                                                                                nfields * sizeof(PGdataValue));
+               if (!rowbuf)
+               {
+                       errmsg = NULL;          /* means "out of memory", see below */
+                       goto advance_and_error;
+               }
+               conn->rowBuf = rowbuf;
+               conn->rowBufLen = nfields;
        }
 
        /* Scan the fields */
@@ -661,54 +745,94 @@ getAnotherTuple(PGconn *conn, int msgLength)
        {
                /* get the value length */
                if (pqGetInt(&vlen, 4, conn))
-                       return EOF;
-               if (vlen == -1)
                {
-                       /* null field */
-                       tup[i].value = result->null_field;
-                       tup[i].len = NULL_LEN;
-                       continue;
+                       /* We should not run out of data here, so complain */
+                       errmsg = libpq_gettext("insufficient data in \"D\" message");
+                       goto advance_and_error;
                }
-               if (vlen < 0)
-                       vlen = 0;
-               if (tup[i].value == NULL)
-               {
-                       bool            isbinary = (result->attDescs[i].format != 0);
+               rowbuf[i].len = vlen;
 
-                       tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
-                       if (tup[i].value == NULL)
-                               goto outOfMemory;
-               }
-               tup[i].len = vlen;
-               /* read in the value */
+               /*
+                * rowbuf[i].value always points to the next address in the data
+                * buffer even if the value is NULL.  This allows row processors to
+                * estimate data sizes more easily.
+                */
+               rowbuf[i].value = conn->inBuffer + conn->inCursor;
+
+               /* Skip over the data value */
                if (vlen > 0)
-                       if (pqGetnchar((char *) (tup[i].value), vlen, conn))
-                               return EOF;
-               /* we have to terminate this ourselves */
-               tup[i].value[vlen] = '\0';
+               {
+                       if (pqSkipnchar(vlen, conn))
+                       {
+                               /* We should not run out of data here, so complain */
+                               errmsg = libpq_gettext("insufficient data in \"D\" message");
+                               goto advance_and_error;
+                       }
+               }
        }
 
-       /* Success!  Store the completed tuple in the result */
-       if (!pqAddTuple(result, tup))
-               goto outOfMemory;
-       /* and reset for a new message */
-       conn->curTuple = NULL;
+       /* Sanity check that we absorbed all the data */
+       if (conn->inCursor != conn->inStart + 5 + msgLength)
+       {
+               errmsg = libpq_gettext("extraneous data in \"D\" message");
+               goto advance_and_error;
+       }
 
-       return 0;
+       /*
+        * Advance inStart to show that the "D" message has been processed.  We
+        * must do this before calling the row processor, in case it longjmps.
+        */
+       conn->inStart = conn->inCursor;
 
-outOfMemory:
+       /* Pass the completed row values to rowProcessor */
+       errmsg = NULL;
+       switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
+                                                                  conn->rowProcessorParam))
+       {
+               case 1:
+                       /* everything is good */
+                       return 0;
+
+               case -1:
+                       /* error, report the errmsg below */
+                       break;
+
+               default:
+                       /* unrecognized return code */
+                       errmsg = libpq_gettext("unrecognized return value from row processor");
+                       break;
+       }
+       goto set_error_result;
+
+advance_and_error:
+       /* Discard the failed message by pretending we read it */
+       conn->inStart += 5 + msgLength;
+
+set_error_result:
 
        /*
         * Replace partially constructed result with an error result. First
         * discard the old result to try to win back some memory.
         */
        pqClearAsyncResult(conn);
-       printfPQExpBuffer(&conn->errorMessage,
-                                         libpq_gettext("out of memory for query result\n"));
+
+       /*
+        * If row processor didn't provide an error message, assume "out of
+        * memory" was meant.  The advantage of having this special case is that
+        * freeing the old result first greatly improves the odds that gettext()
+        * will succeed in providing a translation.
+        */
+       if (!errmsg)
+               errmsg = libpq_gettext("out of memory for query result");
+
+       printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
        pqSaveErrorResult(conn);
 
-       /* Discard the failed message by pretending we read it */
-       conn->inCursor = conn->inStart + 5 + msgLength;
+       /*
+        * Return zero to allow input parsing to continue.  Subsequent "D"
+        * messages will be ignored until we get to end of data, since an error
+        * result is already set up.
+        */
        return 0;
 }
 
index ef26ab9e0d8b3c510c75f16ec79ace35ad7f3cd0..32b466e245cc1587fbb8e7164314d473aeac1225 100644 (file)
@@ -38,13 +38,14 @@ extern              "C"
 
 /* Application-visible enum types */
 
+/*
+ * Although it is okay to add to these lists, values which become unused
+ * should never be removed, nor should constants be redefined - that would
+ * break compatibility with existing code.
+ */
+
 typedef enum
 {
-       /*
-        * Although it is okay to add to this list, values which become unused
-        * should never be removed, nor should constants be redefined - that would
-        * break compatibility with existing code.
-        */
        CONNECTION_OK,
        CONNECTION_BAD,
        /* Non-blocking mode only below here */
@@ -128,6 +129,17 @@ typedef struct pg_conn PGconn;
  */
 typedef struct pg_result PGresult;
 
+/* PGdataValue represents a data field value being passed to a row processor.
+ * It could be either text or binary data; text data is not zero-terminated.
+ * A SQL NULL is represented by len < 0; then value is still valid but there
+ * are no data bytes there.
+ */
+typedef struct pgDataValue
+{
+       int                     len;                    /* data length in bytes, or <0 if NULL */
+       const char *value;                      /* data value, without zero-termination */
+} PGdataValue;
+
 /* PGcancel encapsulates the information needed to cancel a running
  * query on an existing connection.
  * The contents of this struct are not supposed to be known to applications.
@@ -149,6 +161,10 @@ typedef struct pgNotify
        struct pgNotify *next;          /* list link */
 } PGnotify;
 
+/* Function type for row-processor callback */
+typedef int (*PQrowProcessor) (PGresult *res, const PGdataValue *columns,
+                                                          const char **errmsgp, void *param);
+
 /* Function types for notice-handling callbacks */
 typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
 typedef void (*PQnoticeProcessor) (void *arg, const char *message);
@@ -388,11 +404,16 @@ extern int PQsendQueryPrepared(PGconn *conn,
                                        const int *paramFormats,
                                        int resultFormat);
 extern PGresult *PQgetResult(PGconn *conn);
+extern PGresult *PQskipResult(PGconn *conn);
 
 /* Routines for managing an asynchronous query */
 extern int     PQisBusy(PGconn *conn);
 extern int     PQconsumeInput(PGconn *conn);
 
+/* Override default per-row processing */
+extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
+extern PQrowProcessor PQgetRowProcessor(const PGconn *conn, void **param);
+
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
 
index 2103af88329894d5cecee7f4de0214ab8613bbe4..0b6e6769c018ccc48dfd97d2412a54fc3121290e 100644 (file)
@@ -324,6 +324,10 @@ struct pg_conn
        /* Optional file to write trace info to */
        FILE       *Pfdebug;
 
+       /* Callback procedure for per-row processing */
+       PQrowProcessor rowProcessor;    /* function pointer */
+       void       *rowProcessorParam;  /* passthrough argument */
+
        /* Callback procedures for notice message processing */
        PGNoticeHooks noticeHooks;
 
@@ -396,9 +400,14 @@ struct pg_conn
                                                                 * msg has no length word */
        int                     outMsgEnd;              /* offset to msg end (so far) */
 
+       /* Row processor interface workspace */
+       PGdataValue *rowBuf;            /* array for passing values to rowProcessor */
+       int                     rowBufLen;              /* number of entries allocated in rowBuf */
+
        /* Status for asynchronous result construction */
        PGresult   *result;                     /* result being constructed */
-       PGresAttValue *curTuple;        /* tuple currently being read */
+
+       /* Assorted state for SSL, GSS, etc */
 
 #ifdef USE_SSL
        bool            allow_ssl_try;  /* Allowed to try SSL negotiation */
@@ -435,7 +444,6 @@ struct pg_conn
                                                                 * connection */
 #endif
 
-
        /* Buffer for current error message */
        PQExpBufferData errorMessage;           /* expansible string */
 
@@ -505,7 +513,6 @@ extern void
 pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 /* This lets gcc check the format string for consistency. */
 __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
-extern int     pqAddTuple(PGresult *res, PGresAttValue *tup);
 extern void pqSaveMessageField(PGresult *res, char code,
                                   const char *value);
 extern void pqSaveParameterStatus(PGconn *conn, const char *name,
@@ -558,6 +565,7 @@ extern int  pqGets(PQExpBuffer buf, PGconn *conn);
 extern int     pqGets_append(PQExpBuffer buf, PGconn *conn);
 extern int     pqPuts(const char *s, PGconn *conn);
 extern int     pqGetnchar(char *s, size_t len, PGconn *conn);
+extern int     pqSkipnchar(size_t len, PGconn *conn);
 extern int     pqPutnchar(const char *s, size_t len, PGconn *conn);
 extern int     pqGetInt(int *result, size_t bytes, PGconn *conn);
 extern int     pqPutInt(int value, size_t bytes, PGconn *conn);