</sect1>
+ <sect1 id="libpq-row-processor">
+ <title>Custom Row Processing</title>
+
+ <indexterm zone="libpq-row-processor">
+ <primary>PQrowProcessor</primary>
+ </indexterm>
+
+ <indexterm zone="libpq-row-processor">
+ <primary>row processor</primary>
+ <secondary>in libpq</secondary>
+ </indexterm>
+
+ <para>
+ Ordinarily, when receiving a query result from the server,
+ <application>libpq</> adds each row value to the current
+ <type>PGresult</type> until the entire result set is received; then
+ the <type>PGresult</type> is returned to the application as a unit.
+ This approach is simple to work with, but becomes inefficient for large
+ result sets. To improve performance, an application can register a
+ custom <firstterm>row processor</> function that processes each row
+ as the data is received from the network. The custom row processor could
+ process the data fully, or store it into some application-specific data
+ structure for later processing.
+ </para>
+
+ <caution>
+ <para>
+ The row processor function sees the rows before it is known whether the
+ query will succeed overall, since the server might return some rows before
+ encountering an error. For proper transactional behavior, it must be
+ possible to discard or undo whatever the row processor has done, if the
+ query ultimately fails.
+ </para>
+ </caution>
+
+ <para>
+ When using a custom row processor, row data is not accumulated into the
+ <type>PGresult</type>, so the <type>PGresult</type> ultimately delivered to
+ the application will contain no rows (<function>PQntuples</> =
+ <literal>0</>). However, it still has <function>PQresultStatus</> =
+ <literal>PGRES_TUPLES_OK</>, and it contains correct information about the
+ set of columns in the query result. On the other hand, if the query fails
+ partway through, the returned <type>PGresult</type> has
+ <function>PQresultStatus</> = <literal>PGRES_FATAL_ERROR</>. The
+ application must be prepared to undo any actions of the row processor
+ whenever it gets a <literal>PGRES_FATAL_ERROR</> result.
+ </para>
+
+ <para>
+ A custom row processor is registered for a particular connection by
+ calling <function>PQsetRowProcessor</function>, described below.
+ This row processor will be used for all subsequent query results on that
+ connection until changed again. A row processor function must have a
+ signature matching
+
+<synopsis>
+typedef int (*PQrowProcessor) (PGresult *res, const PGdataValue *columns,
+ const char **errmsgp, void *param);
+</synopsis>
+ where <type>PGdataValue</> is described by
+<synopsis>
+typedef struct pgDataValue
+{
+ int len; /* data length in bytes, or <0 if NULL */
+ const char *value; /* data value, without zero-termination */
+} PGdataValue;
+</synopsis>
+ </para>
+
+ <para>
+ The <parameter>res</> parameter is the <literal>PGRES_TUPLES_OK</>
+ <type>PGresult</type> that will eventually be delivered to the calling
+ application (if no error intervenes). It contains information about
+ the set of columns in the query result, but no row data. In particular the
+ row processor must fetch <literal>PQnfields(res)</> to know the number of
+ data columns.
+ </para>
+
+ <para>
+ Immediately after <application>libpq</> has determined the result set's
+ column information, it will make a call to the row processor with
+ <parameter>columns</parameter> set to NULL, but the other parameters as
+ usual. The row processor can use this call to initialize for a new result
+ set; if it has nothing to do, it can just return <literal>1</>. In
+ subsequent calls, one per received row, <parameter>columns</parameter>
+ is non-NULL and points to an array of <type>PGdataValue</> structs, one per
+ data column.
+ </para>
+
+ <para>
+ <parameter>errmsgp</parameter> is an output parameter used only for error
+ reporting. If the row processor needs to report an error, it can set
+ <literal>*</><parameter>errmsgp</parameter> to point to a suitable message
+ string (and then return <literal>-1</>). As a special case, returning
+ <literal>-1</> without changing <literal>*</><parameter>errmsgp</parameter>
+ from its initial value of NULL is taken to mean <quote>out of memory</>.
+ </para>
+
+ <para>
+ The last parameter, <parameter>param</parameter>, is just a void pointer
+ passed through from <function>PQsetRowProcessor</function>. This can be
+ used for communication between the row processor function and the
+ surrounding application.
+ </para>
+
+ <para>
+ In the <type>PGdataValue</> array passed to a row processor, data values
+ cannot be assumed to be zero-terminated, whether the data format is text
+ or binary. A SQL NULL value is indicated by a negative length field.
+ </para>
+
+ <para>
+ The row processor <emphasis>must</> process the row data values
+ immediately, or else copy them into application-controlled storage.
+ The value pointers passed to the row processor point into
+ <application>libpq</>'s internal data input buffer, which will be
+ overwritten by the next packet fetch.
+ </para>
+
+ <para>
+ The row processor function must return either <literal>1</> or
+ <literal>-1</>.
+ <literal>1</> is the normal, successful result value; <application>libpq</>
+ will continue with receiving row values from the server and passing them to
+ the row processor. <literal>-1</> indicates that the row processor has
+ encountered an error. In that case,
+ <application>libpq</> will discard all remaining rows in the result set
+ and then return a <literal>PGRES_FATAL_ERROR</> <type>PGresult</type> to
+ the application (containing the specified error message, or <quote>out of
+ memory for query result</> if <literal>*</><parameter>errmsgp</parameter>
+ was left as NULL).
+ </para>
+
+ <para>
+ Another option for exiting a row processor is to throw an exception using
+ C's <function>longjmp()</> or C++'s <literal>throw</>. If this is done,
+ processing of the incoming data can be resumed later by calling
+ <function>PQgetResult</>; the row processor will be invoked as normal for
+ any remaining rows in the current result.
+ As with any usage of <function>PQgetResult</>, the application
+ should continue calling <function>PQgetResult</> until it gets a NULL
+ result before issuing any new query.
+ </para>
+
+ <para>
+ In some cases, an exception may mean that the remainder of the
+ query result is not interesting. In such cases the application can discard
+ the remaining rows with <function>PQskipResult</>, described below.
+ Another possible recovery option is to close the connection altogether with
+ <function>PQfinish</>.
+ </para>
+
+ <para>
+ <variablelist>
+ <varlistentry id="libpq-pqsetrowprocessor">
+ <term>
+ <function>PQsetRowProcessor</function>
+ <indexterm>
+ <primary>PQsetRowProcessor</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ Sets a callback function to process each row.
+
+<synopsis>
+void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
+</synopsis>
+ </para>
+
+ <para>
+ The specified row processor function <parameter>func</> is installed as
+ the active row processor for the given connection <parameter>conn</>.
+ Also, <parameter>param</> is installed as the passthrough pointer to
+ pass to it. Alternatively, if <parameter>func</> is NULL, the standard
+ row processor is reinstalled on the given connection (and
+ <parameter>param</> is ignored).
+ </para>
+
+ <para>
+ Although the row processor can be changed at any time in the life of a
+ connection, it's generally unwise to do so while a query is active.
+ In particular, when using asynchronous mode, be aware that both
+ <function>PQisBusy</> and <function>PQgetResult</> can call the current
+ row processor.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="libpq-pqgetrowprocessor">
+ <term>
+ <function>PQgetRowProcessor</function>
+ <indexterm>
+ <primary>PQgetRowProcessor</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ Fetches the current row processor for the specified connection.
+
+<synopsis>
+PQrowProcessor PQgetRowProcessor(const PGconn *conn, void **param);
+</synopsis>
+ </para>
+
+ <para>
+ In addition to returning the row processor function pointer, the
+ current passthrough pointer will be returned at
+ <literal>*</><parameter>param</>, if <parameter>param</> is not NULL.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="libpq-pqskipresult">
+ <term>
+ <function>PQskipResult</function>
+ <indexterm>
+ <primary>PQskipResult</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ Discard all the remaining rows in the incoming result set.
+
+<synopsis>
+PGresult *PQskipResult(PGconn *conn);
+</synopsis>
+ </para>
+
+ <para>
+ This is a simple convenience function to discard incoming data after a
+ row processor has failed or it's determined that the rest of the result
+ set is not interesting. <function>PQskipResult</> is exactly
+ equivalent to <function>PQgetResult</> except that it transiently
+ installs a dummy row processor function that just discards data.
+ The returned <type>PGresult</> can be discarded without further ado
+ if it has status <literal>PGRES_TUPLES_OK</>; but other status values
+ should be handled normally. (In particular,
+ <literal>PGRES_FATAL_ERROR</> indicates a server-reported error that
+ will still need to be dealt with.)
+ As when using <function>PQgetResult</>, one should usually repeat the
+ call until NULL is returned to ensure the connection has reached an
+ idle state. Another possible usage is to call
+ <function>PQskipResult</> just once, and then resume using
+ <function>PQgetResult</> to process subsequent result sets normally.
+ </para>
+
+ <para>
+ Because <function>PQskipResult</> will wait for server input, it is not
+ very useful in asynchronous applications. In particular you should not
+ code a loop of <function>PQisBusy</> and <function>PQskipResult</>,
+ because that will result in the installed row processor being called
+ within <function>PQisBusy</>. To get the proper behavior in an
+ asynchronous application, you'll need to install a dummy row processor
+ (or set a flag to make your normal row processor do nothing) and leave
+ it that way until you have discarded all incoming data via your normal
+ <function>PQisBusy</> and <function>PQgetResult</> loop.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+
+ </sect1>
+
<sect1 id="libpq-events">
<title>Event System</title>
PQping 158
PQpingParams 159
PQlibVersion 160
+PQsetRowProcessor 161
+PQgetRowProcessor 162
+PQskipResult 163
conn->status = CONNECTION_AUTH_OK;
/*
- * Set asyncStatus so that PQsetResult will think that
+ * Set asyncStatus so that PQgetResult will think that
* what comes back next is the result of a query. See
* below.
*/
/* Zero all pointers and booleans */
MemSet(conn, 0, sizeof(PGconn));
+ /* install default row processor and notice hooks */
+ PQsetRowProcessor(conn, NULL, NULL);
conn->noticeHooks.noticeRec = defaultNoticeReceiver;
conn->noticeHooks.noticeProc = defaultNoticeProcessor;
+
conn->status = CONNECTION_BAD;
conn->asyncStatus = PGASYNC_IDLE;
conn->xactStatus = PQTRANS_IDLE;
conn->inBuffer = (char *) malloc(conn->inBufSize);
conn->outBufSize = 16 * 1024;
conn->outBuffer = (char *) malloc(conn->outBufSize);
+ conn->rowBufLen = 32;
+ conn->rowBuf = (PGdataValue *) malloc(conn->rowBufLen * sizeof(PGdataValue));
initPQExpBuffer(&conn->errorMessage);
initPQExpBuffer(&conn->workBuffer);
if (conn->inBuffer == NULL ||
conn->outBuffer == NULL ||
+ conn->rowBuf == NULL ||
PQExpBufferBroken(&conn->errorMessage) ||
PQExpBufferBroken(&conn->workBuffer))
{
free(conn->inBuffer);
if (conn->outBuffer)
free(conn->outBuffer);
+ if (conn->rowBuf)
+ free(conn->rowBuf);
termPQExpBuffer(&conn->errorMessage);
termPQExpBuffer(&conn->workBuffer);
conn->status = CONNECTION_BAD; /* Well, not really _bad_ - just
* absent */
conn->asyncStatus = PGASYNC_IDLE;
- pqClearAsyncResult(conn); /* deallocate result and curTuple */
+ pqClearAsyncResult(conn); /* deallocate result */
pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist);
conn->addrlist = NULL;
conn->addr_cur = NULL;
static PGEvent *dupEvents(PGEvent *events, int count);
+static bool pqAddTuple(PGresult *res, PGresAttValue *tup);
+static int pqStdRowProcessor(PGresult *res, const PGdataValue *columns,
+ const char **errmsgp, void *param);
static bool PQsendQueryStart(PGconn *conn);
static int PQsendQueryGuts(PGconn *conn,
const char *command,
const int *paramFormats,
int resultFormat);
static void parseInput(PGconn *conn);
+static int dummyRowProcessor(PGresult *res, const PGdataValue *columns,
+ const char **errmsgp, void *param);
static bool PQexecStart(PGconn *conn);
static PGresult *PQexecFinish(PGconn *conn);
static int PQsendDescribe(PGconn *conn, char desc_type,
/*
* Handy subroutine to deallocate any partially constructed async result.
*/
-
void
pqClearAsyncResult(PGconn *conn)
{
if (conn->result)
PQclear(conn->result);
conn->result = NULL;
- conn->curTuple = NULL;
}
/*
*/
res = conn->result;
conn->result = NULL; /* handing over ownership to caller */
- conn->curTuple = NULL; /* just in case */
if (!res)
res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
else
* add a row pointer to the PGresult structure, growing it if necessary
* Returns TRUE if OK, FALSE if not enough memory to add the row
*/
-int
+static bool
pqAddTuple(PGresult *res, PGresAttValue *tup)
{
if (res->ntups >= res->tupArrSize)
}
+/*
+ * PQsetRowProcessor
+ * Set function that copies row data out from the network buffer,
+ * along with a passthrough parameter for it.
+ */
+void
+PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param)
+{
+ if (!conn)
+ return;
+
+ if (func)
+ {
+ /* set custom row processor */
+ conn->rowProcessor = func;
+ conn->rowProcessorParam = param;
+ }
+ else
+ {
+ /* set default row processor */
+ conn->rowProcessor = pqStdRowProcessor;
+ conn->rowProcessorParam = conn;
+ }
+}
+
+/*
+ * PQgetRowProcessor
+ * Get current row processor of PGconn.
+ * If param is not NULL, also store the passthrough parameter at *param.
+ */
+PQrowProcessor
+PQgetRowProcessor(const PGconn *conn, void **param)
+{
+ if (!conn)
+ {
+ if (param)
+ *param = NULL;
+ return NULL;
+ }
+
+ if (param)
+ *param = conn->rowProcessorParam;
+ return conn->rowProcessor;
+}
+
+/*
+ * pqStdRowProcessor
+ * Add the received row to the PGresult structure
+ * Returns 1 if OK, -1 if error occurred.
+ *
+ * Note: "param" should point to the PGconn, but we don't actually need that
+ * as of the current coding.
+ */
+static int
+pqStdRowProcessor(PGresult *res, const PGdataValue *columns,
+ const char **errmsgp, void *param)
+{
+ int nfields = res->numAttributes;
+ PGresAttValue *tup;
+ int i;
+
+ if (columns == NULL)
+ {
+ /* New result set ... we have nothing to do in this function. */
+ return 1;
+ }
+
+ /*
+ * Basically we just allocate space in the PGresult for each field and
+ * copy the data over.
+ *
+ * Note: on malloc failure, we return -1 leaving *errmsgp still NULL,
+ * which caller will take to mean "out of memory". This is preferable to
+ * trying to set up such a message here, because evidently there's not
+ * enough memory for gettext() to do anything.
+ */
+ tup = (PGresAttValue *)
+ pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
+ if (tup == NULL)
+ return -1;
+
+ for (i = 0; i < nfields; i++)
+ {
+ int clen = columns[i].len;
+
+ if (clen < 0)
+ {
+ /* null field */
+ tup[i].len = NULL_LEN;
+ tup[i].value = res->null_field;
+ }
+ else
+ {
+ bool isbinary = (res->attDescs[i].format != 0);
+ char *val;
+
+ val = (char *) pqResultAlloc(res, clen + 1, isbinary);
+ if (val == NULL)
+ return -1;
+
+ /* copy and zero-terminate the data (even if it's binary) */
+ memcpy(val, columns[i].value, clen);
+ val[clen] = '\0';
+
+ tup[i].len = clen;
+ tup[i].value = val;
+ }
+ }
+
+ /* And add the tuple to the PGresult's tuple array */
+ if (!pqAddTuple(res, tup))
+ return -1;
+
+ /* Success */
+ return 1;
+}
+
+
/*
* PQsendQuery
* Submit a query, but don't wait for it to finish
/* initialize async result-accumulation state */
conn->result = NULL;
- conn->curTuple = NULL;
/* ready to send command message */
return true;
* parseInput: if appropriate, parse input data from backend
* until input is exhausted or a stopping state is reached.
* Note that this function will NOT attempt to read more data from the backend.
+ *
+ * Note: callers of parseInput must be prepared for a longjmp exit when we are
+ * in PGASYNC_BUSY state, since an external row processor might do that.
*/
static void
parseInput(PGconn *conn)
return res;
}
+/*
+ * PQskipResult
+ * Get the next PGresult produced by a query, but discard any data rows.
+ *
+ * This is mainly useful for cleaning up after a longjmp out of a row
+ * processor, when resuming processing of the current query result isn't
+ * wanted. Note that this is of little value in an async-style application,
+ * since any preceding calls to PQisBusy would have already called the regular
+ * row processor.
+ */
+PGresult *
+PQskipResult(PGconn *conn)
+{
+ PGresult *res;
+ PQrowProcessor savedRowProcessor;
+
+ if (!conn)
+ return NULL;
+
+ /* temporarily install dummy row processor */
+ savedRowProcessor = conn->rowProcessor;
+ conn->rowProcessor = dummyRowProcessor;
+ /* no need to save/change rowProcessorParam */
+
+ /* fetch the next result */
+ res = PQgetResult(conn);
+
+ /* restore previous row processor */
+ conn->rowProcessor = savedRowProcessor;
+
+ return res;
+}
+
+/*
+ * Do-nothing row processor for PQskipResult
+ */
+static int
+dummyRowProcessor(PGresult *res, const PGdataValue *columns,
+ const char **errmsgp, void *param)
+{
+ return 1;
+}
+
/*
* PQexec
* Silently discard any prior query result that application didn't eat.
* This is probably poor design, but it's here for backward compatibility.
*/
- while ((result = PQgetResult(conn)) != NULL)
+ while ((result = PQskipResult(conn)) != NULL)
{
ExecStatusType resultStatus = result->resultStatus;
#define LO_BUFSIZE 8192
static int lo_initialize(PGconn *conn);
-
-static Oid
- lo_import_internal(PGconn *conn, const char *filename, const Oid oid);
+static Oid lo_import_internal(PGconn *conn, const char *filename, Oid oid);
/*
* lo_open
PQArgBlock argv[2];
PGresult *res;
- if (conn->lobjfuncs == NULL)
+ if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return -1;
int retval;
int result_len;
- if (conn->lobjfuncs == NULL)
+ if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return -1;
int retval;
int result_len;
- if (conn->lobjfuncs == NULL)
+ if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return -1;
PGresult *res;
int result_len;
- if (conn->lobjfuncs == NULL)
+ if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return -1;
int result_len;
int retval;
- if (conn->lobjfuncs == NULL)
+ if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return -1;
int retval;
int result_len;
- if (conn->lobjfuncs == NULL)
+ if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return -1;
int retval;
int result_len;
- if (conn->lobjfuncs == NULL)
+ if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return InvalidOid;
int retval;
int result_len;
- if (conn->lobjfuncs == NULL)
+ if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return InvalidOid;
PGresult *res;
int result_len;
- if (conn->lobjfuncs == NULL)
+ if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return -1;
int result_len;
int retval;
- if (conn->lobjfuncs == NULL)
+ if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return -1;
}
static Oid
-lo_import_internal(PGconn *conn, const char *filename, const Oid oid)
+lo_import_internal(PGconn *conn, const char *filename, Oid oid)
{
int fd;
int nbytes,
int n;
const char *query;
const char *fname;
+ PQrowProcessor savedRowProcessor;
+ void *savedRowProcessorParam;
Oid foid;
+ if (!conn)
+ return -1;
+
/*
* Allocate the structure to hold the functions OID's
*/
"or proname = 'loread' "
"or proname = 'lowrite'";
+ /* Ensure the standard row processor is used to collect the result */
+ savedRowProcessor = conn->rowProcessor;
+ savedRowProcessorParam = conn->rowProcessorParam;
+ PQsetRowProcessor(conn, NULL, NULL);
+
res = PQexec(conn, query);
+
+ conn->rowProcessor = savedRowProcessor;
+ conn->rowProcessorParam = savedRowProcessorParam;
+
if (res == NULL)
{
free(lobjfuncs);
return 0;
}
+/*
+ * pqSkipnchar:
+ * skip over len bytes in input buffer.
+ *
+ * Note: this is primarily useful for its debug output, which should
+ * be exactly the same as for pqGetnchar. We assume the data in question
+ * will actually be used, but just isn't getting copied anywhere as yet.
+ */
+int
+pqSkipnchar(size_t len, PGconn *conn)
+{
+ if (len > (size_t) (conn->inEnd - conn->inCursor))
+ return EOF;
+
+ if (conn->Pfdebug)
+ {
+ fprintf(conn->Pfdebug, "From backend (%lu)> ", (unsigned long) len);
+ fputnbytes(conn->Pfdebug, conn->inBuffer + conn->inCursor, len);
+ fprintf(conn->Pfdebug, "\n");
+ }
+
+ conn->inCursor += len;
+
+ return 0;
+}
+
/*
* pqPutnchar:
* write exactly len bytes to the current message
PostgresPollingStatusType
pqSetenvPoll(PGconn *conn)
{
+ PostgresPollingStatusType result;
PGresult *res;
+ PQrowProcessor savedRowProcessor;
+ void *savedRowProcessorParam;
if (conn == NULL || conn->status == CONNECTION_BAD)
return PGRES_POLLING_FAILED;
+ /* Ensure the standard row processor is used to collect any results */
+ savedRowProcessor = conn->rowProcessor;
+ savedRowProcessorParam = conn->rowProcessorParam;
+ PQsetRowProcessor(conn, NULL, NULL);
+
/* Check whether there are any data for us */
switch (conn->setenv_state)
{
if (n < 0)
goto error_return;
if (n == 0)
- return PGRES_POLLING_READING;
+ {
+ result = PGRES_POLLING_READING;
+ goto normal_return;
+ }
break;
}
/* Should we raise an error if called when not active? */
case SETENV_STATE_IDLE:
- return PGRES_POLLING_OK;
+ result = PGRES_POLLING_OK;
+ goto normal_return;
default:
printfPQExpBuffer(&conn->errorMessage,
case SETENV_STATE_CLIENT_ENCODING_WAIT:
{
if (PQisBusy(conn))
- return PGRES_POLLING_READING;
+ {
+ result = PGRES_POLLING_READING;
+ goto normal_return;
+ }
res = PQgetResult(conn);
case SETENV_STATE_OPTION_WAIT:
{
if (PQisBusy(conn))
- return PGRES_POLLING_READING;
+ {
+ result = PGRES_POLLING_READING;
+ goto normal_return;
+ }
res = PQgetResult(conn);
goto error_return;
conn->setenv_state = SETENV_STATE_QUERY1_WAIT;
- return PGRES_POLLING_READING;
+ result = PGRES_POLLING_READING;
+ goto normal_return;
}
case SETENV_STATE_QUERY1_WAIT:
{
if (PQisBusy(conn))
- return PGRES_POLLING_READING;
+ {
+ result = PGRES_POLLING_READING;
+ goto normal_return;
+ }
res = PQgetResult(conn);
goto error_return;
conn->setenv_state = SETENV_STATE_QUERY2_WAIT;
- return PGRES_POLLING_READING;
+ result = PGRES_POLLING_READING;
+ goto normal_return;
}
case SETENV_STATE_QUERY2_WAIT:
{
if (PQisBusy(conn))
- return PGRES_POLLING_READING;
+ {
+ result = PGRES_POLLING_READING;
+ goto normal_return;
+ }
res = PQgetResult(conn);
{
/* Query finished, so we're done */
conn->setenv_state = SETENV_STATE_IDLE;
- return PGRES_POLLING_OK;
+ result = PGRES_POLLING_OK;
+ goto normal_return;
}
break;
}
error_return:
conn->setenv_state = SETENV_STATE_IDLE;
- return PGRES_POLLING_FAILED;
+ result = PGRES_POLLING_FAILED;
+
+normal_return:
+ conn->rowProcessor = savedRowProcessor;
+ conn->rowProcessorParam = savedRowProcessorParam;
+ return result;
}
* parseInput: if appropriate, parse input data from backend
* until input is exhausted or a stopping state is reached.
* Note that this function will NOT attempt to read more data from the backend.
+ *
+ * Note: callers of parseInput must be prepared for a longjmp exit when we are
+ * in PGASYNC_BUSY state, since an external row processor might do that.
*/
void
pqParseInput2(PGconn *conn)
/* First 'T' in a query sequence */
if (getRowDescriptions(conn))
return;
+ /* getRowDescriptions() moves inStart itself */
+ continue;
}
else
{
/* Read another tuple of a normal query response */
if (getAnotherTuple(conn, FALSE))
return;
+ /* getAnotherTuple() moves inStart itself */
+ continue;
}
else
{
/* Read another tuple of a normal query response */
if (getAnotherTuple(conn, TRUE))
return;
+ /* getAnotherTuple() moves inStart itself */
+ continue;
}
else
{
/*
* parseInput subroutine to read a 'T' (row descriptions) message.
* We build a PGresult structure containing the attribute data.
- * Returns: 0 if completed message, EOF if not enough data yet.
+ * Returns: 0 if completed message, EOF if error or not enough data
+ * received yet.
*
- * Note that if we run out of data, we have to release the partially
- * constructed PGresult, and rebuild it again next time. Fortunately,
- * that shouldn't happen often, since 'T' messages usually fit in a packet.
+ * Note that if we run out of data, we have to suspend and reprocess
+ * the message after more data is received. Otherwise, conn->inStart
+ * must get advanced past the processed data.
*/
static int
getRowDescriptions(PGconn *conn)
{
- PGresult *result = NULL;
+ PGresult *result;
int nfields;
+ const char *errmsg;
int i;
result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK);
if (!result)
- goto failure;
+ {
+ errmsg = NULL; /* means "out of memory", see below */
+ goto advance_and_error;
+ }
/* parseInput already read the 'T' label. */
/* the next two bytes are the number of fields */
if (pqGetInt(&(result->numAttributes), 2, conn))
- goto failure;
+ goto EOFexit;
nfields = result->numAttributes;
/* allocate space for the attribute descriptors */
result->attDescs = (PGresAttDesc *)
pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
if (!result->attDescs)
- goto failure;
+ {
+ errmsg = NULL; /* means "out of memory", see below */
+ goto advance_and_error;
+ }
MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
}
pqGetInt(&typid, 4, conn) ||
pqGetInt(&typlen, 2, conn) ||
pqGetInt(&atttypmod, 4, conn))
- goto failure;
+ goto EOFexit;
/*
* Since pqGetInt treats 2-byte integers as unsigned, we need to
result->attDescs[i].name = pqResultStrdup(result,
conn->workBuffer.data);
if (!result->attDescs[i].name)
- goto failure;
+ {
+ errmsg = NULL; /* means "out of memory", see below */
+ goto advance_and_error;
+ }
result->attDescs[i].tableid = 0;
result->attDescs[i].columnid = 0;
result->attDescs[i].format = 0;
/* Success! */
conn->result = result;
- return 0;
-failure:
- if (result)
+ /*
+ * Advance inStart to show that the "T" message has been processed. We
+ * must do this before calling the row processor, in case it longjmps.
+ */
+ conn->inStart = conn->inCursor;
+
+ /* Give the row processor a chance to initialize for new result set */
+ errmsg = NULL;
+ switch ((*conn->rowProcessor) (result, NULL, &errmsg,
+ conn->rowProcessorParam))
+ {
+ case 1:
+ /* everything is good */
+ return 0;
+
+ case -1:
+ /* error, report the errmsg below */
+ break;
+
+ default:
+ /* unrecognized return code */
+ errmsg = libpq_gettext("unrecognized return value from row processor");
+ break;
+ }
+ goto set_error_result;
+
+advance_and_error:
+ /*
+ * Discard the failed message. Unfortunately we don't know for sure
+ * where the end is, so just throw away everything in the input buffer.
+ * This is not very desirable but it's the best we can do in protocol v2.
+ */
+ conn->inStart = conn->inEnd;
+
+set_error_result:
+
+ /*
+ * Replace partially constructed result with an error result. First
+ * discard the old result to try to win back some memory.
+ */
+ pqClearAsyncResult(conn);
+
+ /*
+ * If row processor didn't provide an error message, assume "out of
+ * memory" was meant. The advantage of having this special case is that
+ * freeing the old result first greatly improves the odds that gettext()
+ * will succeed in providing a translation.
+ */
+ if (!errmsg)
+ errmsg = libpq_gettext("out of memory for query result");
+
+ printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
+
+ /*
+ * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
+ * do to recover...
+ */
+ conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
+ conn->asyncStatus = PGASYNC_READY;
+
+EOFexit:
+ if (result && result != conn->result)
PQclear(result);
return EOF;
}
/*
* parseInput subroutine to read a 'B' or 'D' (row data) message.
- * We add another tuple to the existing PGresult structure.
- * Returns: 0 if completed message, EOF if error or not enough data yet.
+ * We fill rowbuf with column pointers and then call the row processor.
+ * Returns: 0 if completed message, EOF if error or not enough data
+ * received yet.
*
* Note that if we run out of data, we have to suspend and reprocess
- * the message after more data is received. We keep a partially constructed
- * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
+ * the message after more data is received. Otherwise, conn->inStart
+ * must get advanced past the processed data.
*/
static int
getAnotherTuple(PGconn *conn, bool binary)
{
PGresult *result = conn->result;
int nfields = result->numAttributes;
- PGresAttValue *tup;
-
+ const char *errmsg;
+ PGdataValue *rowbuf;
/* the backend sends us a bitmap of which attributes are null */
char std_bitmap[64]; /* used unless it doesn't fit */
char *bitmap = std_bitmap;
int bitcnt; /* number of bits examined in current byte */
int vlen; /* length of the current field value */
- result->binary = binary;
-
- /* Allocate tuple space if first time for this data message */
- if (conn->curTuple == NULL)
+ /* Resize row buffer if needed */
+ rowbuf = conn->rowBuf;
+ if (nfields > conn->rowBufLen)
{
- conn->curTuple = (PGresAttValue *)
- pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
- if (conn->curTuple == NULL)
- goto outOfMemory;
- MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-
- /*
- * If it's binary, fix the column format indicators. We assume the
- * backend will consistently send either B or D, not a mix.
- */
- if (binary)
+ rowbuf = (PGdataValue *) realloc(rowbuf,
+ nfields * sizeof(PGdataValue));
+ if (!rowbuf)
{
- for (i = 0; i < nfields; i++)
- result->attDescs[i].format = 1;
+ errmsg = NULL; /* means "out of memory", see below */
+ goto advance_and_error;
}
+ conn->rowBuf = rowbuf;
+ conn->rowBufLen = nfields;
+ }
+
+ /* Save format specifier */
+ result->binary = binary;
+
+ /*
+ * If it's binary, fix the column format indicators. We assume the
+ * backend will consistently send either B or D, not a mix.
+ */
+ if (binary)
+ {
+ for (i = 0; i < nfields; i++)
+ result->attDescs[i].format = 1;
}
- tup = conn->curTuple;
/* Get the null-value bitmap */
nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
{
bitmap = (char *) malloc(nbytes);
if (!bitmap)
- goto outOfMemory;
+ {
+ errmsg = NULL; /* means "out of memory", see below */
+ goto advance_and_error;
+ }
}
if (pqGetnchar(bitmap, nbytes, conn))
for (i = 0; i < nfields; i++)
{
+ /* get the value length */
if (!(bmap & 0200))
- {
- /* if the field value is absent, make it a null string */
- tup[i].value = result->null_field;
- tup[i].len = NULL_LEN;
- }
+ vlen = NULL_LEN;
+ else if (pqGetInt(&vlen, 4, conn))
+ goto EOFexit;
else
{
- /* get the value length (the first four bytes are for length) */
- if (pqGetInt(&vlen, 4, conn))
- goto EOFexit;
if (!binary)
vlen = vlen - 4;
if (vlen < 0)
vlen = 0;
- if (tup[i].value == NULL)
- {
- tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
- if (tup[i].value == NULL)
- goto outOfMemory;
- }
- tup[i].len = vlen;
- /* read in the value */
- if (vlen > 0)
- if (pqGetnchar((char *) (tup[i].value), vlen, conn))
- goto EOFexit;
- /* we have to terminate this ourselves */
- tup[i].value[vlen] = '\0';
}
+ rowbuf[i].len = vlen;
+
+ /*
+ * rowbuf[i].value always points to the next address in the data
+ * buffer even if the value is NULL. This allows row processors to
+ * estimate data sizes more easily.
+ */
+ rowbuf[i].value = conn->inBuffer + conn->inCursor;
+
+ /* Skip over the data value */
+ if (vlen > 0)
+ {
+ if (pqSkipnchar(vlen, conn))
+ goto EOFexit;
+ }
+
/* advance the bitmap stuff */
bitcnt++;
if (bitcnt == BITS_PER_BYTE)
bmap <<= 1;
}
- /* Success! Store the completed tuple in the result */
- if (!pqAddTuple(result, tup))
- goto outOfMemory;
- /* and reset for a new message */
- conn->curTuple = NULL;
-
+ /* Release bitmap now if we allocated it */
if (bitmap != std_bitmap)
free(bitmap);
- return 0;
+ bitmap = NULL;
+
+ /*
+ * Advance inStart to show that the "D" message has been processed. We
+ * must do this before calling the row processor, in case it longjmps.
+ */
+ conn->inStart = conn->inCursor;
+
+ /* Pass the completed row values to rowProcessor */
+ errmsg = NULL;
+ switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
+ conn->rowProcessorParam))
+ {
+ case 1:
+ /* everything is good */
+ return 0;
+
+ case -1:
+ /* error, report the errmsg below */
+ break;
-outOfMemory:
- /* Replace partially constructed result with an error result */
+ default:
+ /* unrecognized return code */
+ errmsg = libpq_gettext("unrecognized return value from row processor");
+ break;
+ }
+ goto set_error_result;
+
+advance_and_error:
+ /*
+ * Discard the failed message. Unfortunately we don't know for sure
+ * where the end is, so just throw away everything in the input buffer.
+ * This is not very desirable but it's the best we can do in protocol v2.
+ */
+ conn->inStart = conn->inEnd;
+
+set_error_result:
/*
- * we do NOT use pqSaveErrorResult() here, because of the likelihood that
- * there's not enough memory to concatenate messages...
+ * Replace partially constructed result with an error result. First
+ * discard the old result to try to win back some memory.
*/
pqClearAsyncResult(conn);
- printfPQExpBuffer(&conn->errorMessage,
- libpq_gettext("out of memory for query result\n"));
+
+ /*
+ * If row processor didn't provide an error message, assume "out of
+ * memory" was meant. The advantage of having this special case is that
+ * freeing the old result first greatly improves the odds that gettext()
+ * will succeed in providing a translation.
+ */
+ if (!errmsg)
+ errmsg = libpq_gettext("out of memory for query result");
+
+ printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
/*
* XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
*/
conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
conn->asyncStatus = PGASYNC_READY;
- /* Discard the failed message --- good idea? */
- conn->inStart = conn->inEnd;
EOFexit:
if (bitmap != NULL && bitmap != std_bitmap)
{
int result = 1; /* return value if buffer overflows */
- if (conn->sock < 0)
+ if (conn->sock < 0 ||
+ conn->asyncStatus != PGASYNC_COPY_OUT)
{
*s = '\0';
return EOF;
static void handleSyncLoss(PGconn *conn, char id, int msgLength);
-static int getRowDescriptions(PGconn *conn);
+static int getRowDescriptions(PGconn *conn, int msgLength);
static int getParamDescriptions(PGconn *conn);
static int getAnotherTuple(PGconn *conn, int msgLength);
static int getParameterStatus(PGconn *conn);
* parseInput: if appropriate, parse input data from backend
* until input is exhausted or a stopping state is reached.
* Note that this function will NOT attempt to read more data from the backend.
+ *
+ * Note: callers of parseInput must be prepared for a longjmp exit when we are
+ * in PGASYNC_BUSY state, since an external row processor might do that.
*/
void
pqParseInput3(PGconn *conn)
conn->queryclass == PGQUERY_DESCRIBE)
{
/* First 'T' in a query sequence */
- if (getRowDescriptions(conn))
+ if (getRowDescriptions(conn, msgLength))
return;
-
- /*
- * If we're doing a Describe, we're ready to pass the
- * result back to the client.
- */
- if (conn->queryclass == PGQUERY_DESCRIBE)
- conn->asyncStatus = PGASYNC_READY;
+ /* getRowDescriptions() moves inStart itself */
+ continue;
}
else
{
/* Read another tuple of a normal query response */
if (getAnotherTuple(conn, msgLength))
return;
+ /* getAnotherTuple() moves inStart itself */
+ continue;
}
else if (conn->result != NULL &&
conn->result->resultStatus == PGRES_FATAL_ERROR)
* parseInput subroutine to read a 'T' (row descriptions) message.
* We'll build a new PGresult structure (unless called for a Describe
* command for a prepared statement) containing the attribute data.
- * Returns: 0 if completed message, EOF if not enough data yet.
+ * Returns: 0 if processed message successfully, EOF to suspend parsing
+ * (the latter case is not actually used currently).
+ * In either case, conn->inStart has been advanced past the message.
*
- * Note that if we run out of data, we have to release the partially
- * constructed PGresult, and rebuild it again next time. Fortunately,
- * that shouldn't happen often, since 'T' messages usually fit in a packet.
+ * Note: the row processor could also choose to longjmp out of libpq,
+ * in which case the library's state must allow for resumption at the
+ * next message.
*/
static int
-getRowDescriptions(PGconn *conn)
+getRowDescriptions(PGconn *conn, int msgLength)
{
PGresult *result;
int nfields;
+ const char *errmsg;
int i;
/*
else
result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK);
if (!result)
- goto failure;
+ {
+ errmsg = NULL; /* means "out of memory", see below */
+ goto advance_and_error;
+ }
/* parseInput already read the 'T' label and message length. */
/* the next two bytes are the number of fields */
if (pqGetInt(&(result->numAttributes), 2, conn))
- goto failure;
+ {
+ /* We should not run out of data here, so complain */
+ errmsg = libpq_gettext("insufficient data in \"T\" message");
+ goto advance_and_error;
+ }
nfields = result->numAttributes;
/* allocate space for the attribute descriptors */
result->attDescs = (PGresAttDesc *)
pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
if (!result->attDescs)
- goto failure;
+ {
+ errmsg = NULL; /* means "out of memory", see below */
+ goto advance_and_error;
+ }
MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
}
pqGetInt(&atttypmod, 4, conn) ||
pqGetInt(&format, 2, conn))
{
- goto failure;
+ /* We should not run out of data here, so complain */
+ errmsg = libpq_gettext("insufficient data in \"T\" message");
+ goto advance_and_error;
}
/*
result->attDescs[i].name = pqResultStrdup(result,
conn->workBuffer.data);
if (!result->attDescs[i].name)
- goto failure;
+ {
+ errmsg = NULL; /* means "out of memory", see below */
+ goto advance_and_error;
+ }
result->attDescs[i].tableid = tableid;
result->attDescs[i].columnid = columnid;
result->attDescs[i].format = format;
result->binary = 0;
}
+ /* Sanity check that we absorbed all the data */
+ if (conn->inCursor != conn->inStart + 5 + msgLength)
+ {
+ errmsg = libpq_gettext("extraneous data in \"T\" message");
+ goto advance_and_error;
+ }
+
/* Success! */
conn->result = result;
- return 0;
-failure:
+ /*
+ * Advance inStart to show that the "T" message has been processed. We
+ * must do this before calling the row processor, in case it longjmps.
+ */
+ conn->inStart = conn->inCursor;
/*
- * Discard incomplete result, unless it's from getParamDescriptions.
- *
- * Note that if we hit a bufferload boundary while handling the
- * describe-statement case, we'll forget any PGresult space we just
- * allocated, and then reallocate it on next try. This will bloat the
- * PGresult a little bit but the space will be freed at PQclear, so it
- * doesn't seem worth trying to be smarter.
+ * If we're doing a Describe, we're done, and ready to pass the result
+ * back to the client.
*/
- if (result != conn->result)
+ if (conn->queryclass == PGQUERY_DESCRIBE)
+ {
+ conn->asyncStatus = PGASYNC_READY;
+ return 0;
+ }
+
+ /* Give the row processor a chance to initialize for new result set */
+ errmsg = NULL;
+ switch ((*conn->rowProcessor) (result, NULL, &errmsg,
+ conn->rowProcessorParam))
+ {
+ case 1:
+ /* everything is good */
+ return 0;
+
+ case -1:
+ /* error, report the errmsg below */
+ break;
+
+ default:
+ /* unrecognized return code */
+ errmsg = libpq_gettext("unrecognized return value from row processor");
+ break;
+ }
+ goto set_error_result;
+
+advance_and_error:
+ /* Discard unsaved result, if any */
+ if (result && result != conn->result)
PQclear(result);
- return EOF;
+
+ /* Discard the failed message by pretending we read it */
+ conn->inStart += 5 + msgLength;
+
+set_error_result:
+
+ /*
+ * Replace partially constructed result with an error result. First
+ * discard the old result to try to win back some memory.
+ */
+ pqClearAsyncResult(conn);
+
+ /*
+ * If row processor didn't provide an error message, assume "out of
+ * memory" was meant.
+ */
+ if (!errmsg)
+ errmsg = libpq_gettext("out of memory for query result");
+
+ printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
+ pqSaveErrorResult(conn);
+
+ /*
+ * Return zero to allow input parsing to continue. Subsequent "D"
+ * messages will be ignored until we get to end of data, since an error
+ * result is already set up.
+ */
+ return 0;
}
/*
/*
* parseInput subroutine to read a 'D' (row data) message.
- * We add another tuple to the existing PGresult structure.
- * Returns: 0 if completed message, EOF if error or not enough data yet.
+ * We fill rowbuf with column pointers and then call the row processor.
+ * Returns: 0 if processed message successfully, EOF to suspend parsing
+ * (the latter case is not actually used currently).
+ * In either case, conn->inStart has been advanced past the message.
*
- * Note that if we run out of data, we have to suspend and reprocess
- * the message after more data is received. We keep a partially constructed
- * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
+ * Note: the row processor could also choose to longjmp out of libpq,
+ * in which case the library's state must allow for resumption at the
+ * next message.
*/
static int
getAnotherTuple(PGconn *conn, int msgLength)
{
PGresult *result = conn->result;
int nfields = result->numAttributes;
- PGresAttValue *tup;
+ const char *errmsg;
+ PGdataValue *rowbuf;
int tupnfields; /* # fields from tuple */
int vlen; /* length of the current field value */
int i;
- /* Allocate tuple space if first time for this data message */
- if (conn->curTuple == NULL)
- {
- conn->curTuple = (PGresAttValue *)
- pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
- if (conn->curTuple == NULL)
- goto outOfMemory;
- MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
- }
- tup = conn->curTuple;
-
/* Get the field count and make sure it's what we expect */
if (pqGetInt(&tupnfields, 2, conn))
- return EOF;
+ {
+ /* We should not run out of data here, so complain */
+ errmsg = libpq_gettext("insufficient data in \"D\" message");
+ goto advance_and_error;
+ }
if (tupnfields != nfields)
{
- /* Replace partially constructed result with an error result */
- printfPQExpBuffer(&conn->errorMessage,
- libpq_gettext("unexpected field count in \"D\" message\n"));
- pqSaveErrorResult(conn);
- /* Discard the failed message by pretending we read it */
- conn->inCursor = conn->inStart + 5 + msgLength;
- return 0;
+ errmsg = libpq_gettext("unexpected field count in \"D\" message");
+ goto advance_and_error;
+ }
+
+ /* Resize row buffer if needed */
+ rowbuf = conn->rowBuf;
+ if (nfields > conn->rowBufLen)
+ {
+ rowbuf = (PGdataValue *) realloc(rowbuf,
+ nfields * sizeof(PGdataValue));
+ if (!rowbuf)
+ {
+ errmsg = NULL; /* means "out of memory", see below */
+ goto advance_and_error;
+ }
+ conn->rowBuf = rowbuf;
+ conn->rowBufLen = nfields;
}
/* Scan the fields */
{
/* get the value length */
if (pqGetInt(&vlen, 4, conn))
- return EOF;
- if (vlen == -1)
{
- /* null field */
- tup[i].value = result->null_field;
- tup[i].len = NULL_LEN;
- continue;
+ /* We should not run out of data here, so complain */
+ errmsg = libpq_gettext("insufficient data in \"D\" message");
+ goto advance_and_error;
}
- if (vlen < 0)
- vlen = 0;
- if (tup[i].value == NULL)
- {
- bool isbinary = (result->attDescs[i].format != 0);
+ rowbuf[i].len = vlen;
- tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
- if (tup[i].value == NULL)
- goto outOfMemory;
- }
- tup[i].len = vlen;
- /* read in the value */
+ /*
+ * rowbuf[i].value always points to the next address in the data
+ * buffer even if the value is NULL. This allows row processors to
+ * estimate data sizes more easily.
+ */
+ rowbuf[i].value = conn->inBuffer + conn->inCursor;
+
+ /* Skip over the data value */
if (vlen > 0)
- if (pqGetnchar((char *) (tup[i].value), vlen, conn))
- return EOF;
- /* we have to terminate this ourselves */
- tup[i].value[vlen] = '\0';
+ {
+ if (pqSkipnchar(vlen, conn))
+ {
+ /* We should not run out of data here, so complain */
+ errmsg = libpq_gettext("insufficient data in \"D\" message");
+ goto advance_and_error;
+ }
+ }
}
- /* Success! Store the completed tuple in the result */
- if (!pqAddTuple(result, tup))
- goto outOfMemory;
- /* and reset for a new message */
- conn->curTuple = NULL;
+ /* Sanity check that we absorbed all the data */
+ if (conn->inCursor != conn->inStart + 5 + msgLength)
+ {
+ errmsg = libpq_gettext("extraneous data in \"D\" message");
+ goto advance_and_error;
+ }
- return 0;
+ /*
+ * Advance inStart to show that the "D" message has been processed. We
+ * must do this before calling the row processor, in case it longjmps.
+ */
+ conn->inStart = conn->inCursor;
-outOfMemory:
+ /* Pass the completed row values to rowProcessor */
+ errmsg = NULL;
+ switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
+ conn->rowProcessorParam))
+ {
+ case 1:
+ /* everything is good */
+ return 0;
+
+ case -1:
+ /* error, report the errmsg below */
+ break;
+
+ default:
+ /* unrecognized return code */
+ errmsg = libpq_gettext("unrecognized return value from row processor");
+ break;
+ }
+ goto set_error_result;
+
+advance_and_error:
+ /* Discard the failed message by pretending we read it */
+ conn->inStart += 5 + msgLength;
+
+set_error_result:
/*
* Replace partially constructed result with an error result. First
* discard the old result to try to win back some memory.
*/
pqClearAsyncResult(conn);
- printfPQExpBuffer(&conn->errorMessage,
- libpq_gettext("out of memory for query result\n"));
+
+ /*
+ * If row processor didn't provide an error message, assume "out of
+ * memory" was meant. The advantage of having this special case is that
+ * freeing the old result first greatly improves the odds that gettext()
+ * will succeed in providing a translation.
+ */
+ if (!errmsg)
+ errmsg = libpq_gettext("out of memory for query result");
+
+ printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
pqSaveErrorResult(conn);
- /* Discard the failed message by pretending we read it */
- conn->inCursor = conn->inStart + 5 + msgLength;
+ /*
+ * Return zero to allow input parsing to continue. Subsequent "D"
+ * messages will be ignored until we get to end of data, since an error
+ * result is already set up.
+ */
return 0;
}
/* Application-visible enum types */
+/*
+ * Although it is okay to add to these lists, values which become unused
+ * should never be removed, nor should constants be redefined - that would
+ * break compatibility with existing code.
+ */
+
typedef enum
{
- /*
- * Although it is okay to add to this list, values which become unused
- * should never be removed, nor should constants be redefined - that would
- * break compatibility with existing code.
- */
CONNECTION_OK,
CONNECTION_BAD,
/* Non-blocking mode only below here */
*/
typedef struct pg_result PGresult;
+/* PGdataValue represents a data field value being passed to a row processor.
+ * It could be either text or binary data; text data is not zero-terminated.
+ * A SQL NULL is represented by len < 0; then value is still valid but there
+ * are no data bytes there.
+ */
+typedef struct pgDataValue
+{
+ int len; /* data length in bytes, or <0 if NULL */
+ const char *value; /* data value, without zero-termination */
+} PGdataValue;
+
/* PGcancel encapsulates the information needed to cancel a running
* query on an existing connection.
* The contents of this struct are not supposed to be known to applications.
struct pgNotify *next; /* list link */
} PGnotify;
+/* Function type for row-processor callback */
+typedef int (*PQrowProcessor) (PGresult *res, const PGdataValue *columns,
+ const char **errmsgp, void *param);
+
/* Function types for notice-handling callbacks */
typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
typedef void (*PQnoticeProcessor) (void *arg, const char *message);
const int *paramFormats,
int resultFormat);
extern PGresult *PQgetResult(PGconn *conn);
+extern PGresult *PQskipResult(PGconn *conn);
/* Routines for managing an asynchronous query */
extern int PQisBusy(PGconn *conn);
extern int PQconsumeInput(PGconn *conn);
+/* Override default per-row processing */
+extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
+extern PQrowProcessor PQgetRowProcessor(const PGconn *conn, void **param);
+
/* LISTEN/NOTIFY support */
extern PGnotify *PQnotifies(PGconn *conn);
/* Optional file to write trace info to */
FILE *Pfdebug;
+ /* Callback procedure for per-row processing */
+ PQrowProcessor rowProcessor; /* function pointer */
+ void *rowProcessorParam; /* passthrough argument */
+
/* Callback procedures for notice message processing */
PGNoticeHooks noticeHooks;
* msg has no length word */
int outMsgEnd; /* offset to msg end (so far) */
+ /* Row processor interface workspace */
+ PGdataValue *rowBuf; /* array for passing values to rowProcessor */
+ int rowBufLen; /* number of entries allocated in rowBuf */
+
/* Status for asynchronous result construction */
PGresult *result; /* result being constructed */
- PGresAttValue *curTuple; /* tuple currently being read */
+
+ /* Assorted state for SSL, GSS, etc */
#ifdef USE_SSL
bool allow_ssl_try; /* Allowed to try SSL negotiation */
* connection */
#endif
-
/* Buffer for current error message */
PQExpBufferData errorMessage; /* expansible string */
pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
/* This lets gcc check the format string for consistency. */
__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
-extern int pqAddTuple(PGresult *res, PGresAttValue *tup);
extern void pqSaveMessageField(PGresult *res, char code,
const char *value);
extern void pqSaveParameterStatus(PGconn *conn, const char *name,
extern int pqGets_append(PQExpBuffer buf, PGconn *conn);
extern int pqPuts(const char *s, PGconn *conn);
extern int pqGetnchar(char *s, size_t len, PGconn *conn);
+extern int pqSkipnchar(size_t len, PGconn *conn);
extern int pqPutnchar(const char *s, size_t len, PGconn *conn);
extern int pqGetInt(int *result, size_t bytes, PGconn *conn);
extern int pqPutInt(int value, size_t bytes, PGconn *conn);