From: Tom Lane Date: Wed, 4 Apr 2012 22:27:56 +0000 (-0400) Subject: Add a "row processor" API to libpq for better handling of large results. X-Git-Tag: REL9_2_BETA1~190 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=92785dac2ee7026948962cd61c4cd84a2d052772;p=postgresql Add a "row processor" API to libpq for better handling of large results. Traditionally libpq has collected an entire query result before passing it back to the application. That provides a simple and transactional API, but it's pretty inefficient for large result sets. This patch allows the application to process each row on-the-fly instead of accumulating the rows into the PGresult. Error recovery becomes a bit more complex, but often that tradeoff is well worth making. Kyotaro Horiguchi, reviewed by Marko Kreen and Tom Lane --- diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 96064bbb0d..0ec501e5bd 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -5581,6 +5581,274 @@ defaultNoticeProcessor(void *arg, const char *message) + + Custom Row Processing + + + PQrowProcessor + + + + row processor + in libpq + + + + Ordinarily, when receiving a query result from the server, + libpq adds each row value to the current + PGresult until the entire result set is received; then + the PGresult is returned to the application as a unit. + This approach is simple to work with, but becomes inefficient for large + result sets. To improve performance, an application can register a + custom row processor function that processes each row + as the data is received from the network. The custom row processor could + process the data fully, or store it into some application-specific data + structure for later processing. + + + + + The row processor function sees the rows before it is known whether the + query will succeed overall, since the server might return some rows before + encountering an error. 
For proper transactional behavior, it must be + possible to discard or undo whatever the row processor has done, if the + query ultimately fails. + + + + + When using a custom row processor, row data is not accumulated into the + PGresult, so the PGresult ultimately delivered to + the application will contain no rows (PQntuples = + 0). However, it still has PQresultStatus = + PGRES_TUPLES_OK, and it contains correct information about the + set of columns in the query result. On the other hand, if the query fails + partway through, the returned PGresult has + PQresultStatus = PGRES_FATAL_ERROR. The + application must be prepared to undo any actions of the row processor + whenever it gets a PGRES_FATAL_ERROR result. + + + + A custom row processor is registered for a particular connection by + calling PQsetRowProcessor, described below. + This row processor will be used for all subsequent query results on that + connection until changed again. A row processor function must have a + signature matching + + +typedef int (*PQrowProcessor) (PGresult *res, const PGdataValue *columns, + const char **errmsgp, void *param); + + where PGdataValue is described by + +typedef struct pgDataValue +{ + int len; /* data length in bytes, or <0 if NULL */ + const char *value; /* data value, without zero-termination */ +} PGdataValue; + + + + + The res parameter is the PGRES_TUPLES_OK + PGresult that will eventually be delivered to the calling + application (if no error intervenes). It contains information about + the set of columns in the query result, but no row data. In particular the + row processor must fetch PQnfields(res) to know the number of + data columns. + + + + Immediately after libpq has determined the result set's + column information, it will make a call to the row processor with + columns set to NULL, but the other parameters as + usual. The row processor can use this call to initialize for a new result + set; if it has nothing to do, it can just return 1. 
In + subsequent calls, one per received row, columns + is non-NULL and points to an array of PGdataValue structs, one per + data column. + + + + errmsgp is an output parameter used only for error + reporting. If the row processor needs to report an error, it can set + *errmsgp to point to a suitable message + string (and then return -1). As a special case, returning + -1 without changing *errmsgp + from its initial value of NULL is taken to mean out of memory. + + + + The last parameter, param, is just a void pointer + passed through from PQsetRowProcessor. This can be + used for communication between the row processor function and the + surrounding application. + + + + In the PGdataValue array passed to a row processor, data values + cannot be assumed to be zero-terminated, whether the data format is text + or binary. A SQL NULL value is indicated by a negative length field. + + + + The row processor must process the row data values + immediately, or else copy them into application-controlled storage. + The value pointers passed to the row processor point into + libpq's internal data input buffer, which will be + overwritten by the next packet fetch. + + + + The row processor function must return either 1 or + -1. + 1 is the normal, successful result value; libpq + will continue with receiving row values from the server and passing them to + the row processor. -1 indicates that the row processor has + encountered an error. In that case, + libpq will discard all remaining rows in the result set + and then return a PGRES_FATAL_ERROR PGresult to + the application (containing the specified error message, or out of + memory for query result if *errmsgp + was left as NULL). + + + + Another option for exiting a row processor is to throw an exception using + C's longjmp() or C++'s throw. 
If this is done, + processing of the incoming data can be resumed later by calling + PQgetResult; the row processor will be invoked as normal for + any remaining rows in the current result. + As with any usage of PQgetResult, the application + should continue calling PQgetResult until it gets a NULL + result before issuing any new query. + + + + In some cases, an exception may mean that the remainder of the + query result is not interesting. In such cases the application can discard + the remaining rows with PQskipResult, described below. + Another possible recovery option is to close the connection altogether with + PQfinish. + + + + + + + PQsetRowProcessor + + PQsetRowProcessor + + + + + + Sets a callback function to process each row. + + +void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param); + + + + + The specified row processor function func is installed as + the active row processor for the given connection conn. + Also, param is installed as the passthrough pointer to + pass to it. Alternatively, if func is NULL, the standard + row processor is reinstalled on the given connection (and + param is ignored). + + + + Although the row processor can be changed at any time in the life of a + connection, it's generally unwise to do so while a query is active. + In particular, when using asynchronous mode, be aware that both + PQisBusy and PQgetResult can call the current + row processor. + + + + + + + PQgetRowProcessor + + PQgetRowProcessor + + + + + + Fetches the current row processor for the specified connection. + + +PQrowProcessor PQgetRowProcessor(const PGconn *conn, void **param); + + + + + In addition to returning the row processor function pointer, the + current passthrough pointer will be returned at + *param, if param is not NULL. + + + + + + + PQskipResult + + PQskipResult + + + + + + Discard all the remaining rows in the incoming result set. 
+ + +PGresult *PQskipResult(PGconn *conn); + + + + + This is a simple convenience function to discard incoming data after a + row processor has failed or it's determined that the rest of the result + set is not interesting. PQskipResult is exactly + equivalent to PQgetResult except that it transiently + installs a dummy row processor function that just discards data. + The returned PGresult can be discarded without further ado + if it has status PGRES_TUPLES_OK; but other status values + should be handled normally. (In particular, + PGRES_FATAL_ERROR indicates a server-reported error that + will still need to be dealt with.) + As when using PQgetResult, one should usually repeat the + call until NULL is returned to ensure the connection has reached an + idle state. Another possible usage is to call + PQskipResult just once, and then resume using + PQgetResult to process subsequent result sets normally. + + + + Because PQskipResult will wait for server input, it is not + very useful in asynchronous applications. In particular you should not + code a loop of PQisBusy and PQskipResult, + because that will result in the installed row processor being called + within PQisBusy. To get the proper behavior in an + asynchronous application, you'll need to install a dummy row processor + (or set a flag to make your normal row processor do nothing) and leave + it that way until you have discarded all incoming data via your normal + PQisBusy and PQgetResult loop. 
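The flag-based approach described above for asynchronous applications might look roughly like the following. This is a minimal sketch, not part of the patch. `PGdataValue` is copied from the documentation text, while `MockResult`, `RowState`, and `demo_row_processor` are hypothetical names introduced here. A real row processor would take a `PGresult *` from `libpq-fe.h` and call `PQnfields(res)` instead of reading a mock field. The sketch copies column 0 into application-owned, zero-terminated storage, because the value pointers refer to libpq's input buffer and are not zero-terminated, and it ignores rows once the application sets a discard flag.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Copied from the documentation text above. */
typedef struct pgDataValue
{
    int         len;    /* data length in bytes, or <0 if NULL */
    const char *value;  /* data value, without zero-termination */
} PGdataValue;

/* Hypothetical stand-in for libpq's opaque PGresult (assumption). */
typedef struct
{
    int nfields;        /* what PQnfields(res) would report */
} MockResult;

/* Hypothetical application state, passed through as "param". */
typedef struct
{
    int   discard;      /* nonzero: silently ignore incoming rows */
    long  nrows;        /* rows accepted in the current result set */
    char *last_first;   /* malloc'd copy of column 0 of the last row, or NULL */
} RowState;

/* Sketch of a row processor following the 1 / -1 return convention. */
static int
demo_row_processor(MockResult *res, const PGdataValue *columns,
                   const char **errmsgp, void *param)
{
    RowState *st = (RowState *) param;
    char     *copy;

    assert(st != NULL);

    if (columns == NULL)
    {
        /* Initialization call for a new result set: reset our state. */
        st->nrows = 0;
        free(st->last_first);
        st->last_first = NULL;
        return 1;
    }

    if (st->discard)
        return 1;       /* the application asked us to drop the rest */

    if (res->nfields < 1)
    {
        *errmsgp = "result has no columns";
        return -1;
    }

    free(st->last_first);
    st->last_first = NULL;

    if (columns[0].len >= 0)
    {
        /* Copy now: the buffer is overwritten by the next packet fetch. */
        copy = malloc((size_t) columns[0].len + 1);
        if (copy == NULL)
            return -1;  /* *errmsgp left NULL, which means out of memory */
        memcpy(copy, columns[0].value, (size_t) columns[0].len);
        copy[columns[0].len] = '\0';
        st->last_first = copy;
    }

    st->nrows++;
    return 1;
}
```

With the API as documented in this patch, such a function would be registered with `PQsetRowProcessor(conn, func, &state)`. The application would set `state.discard` and keep running its normal `PQisBusy` and `PQgetResult` loop until all incoming data has been consumed.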
+ + + + + + + + Event System diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index 1af8df699e..1251455f1f 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -160,3 +160,6 @@ PQconnectStartParams 157 PQping 158 PQpingParams 159 PQlibVersion 160 +PQsetRowProcessor 161 +PQgetRowProcessor 162 +PQskipResult 163 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 6a20a1485d..03fd6e45bb 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -2425,7 +2425,7 @@ keep_going: /* We will come back to here until there is conn->status = CONNECTION_AUTH_OK; /* - * Set asyncStatus so that PQsetResult will think that + * Set asyncStatus so that PQgetResult will think that * what comes back next is the result of a query. See * below. */ @@ -2686,8 +2686,11 @@ makeEmptyPGconn(void) /* Zero all pointers and booleans */ MemSet(conn, 0, sizeof(PGconn)); + /* install default row processor and notice hooks */ + PQsetRowProcessor(conn, NULL, NULL); conn->noticeHooks.noticeRec = defaultNoticeReceiver; conn->noticeHooks.noticeProc = defaultNoticeProcessor; + conn->status = CONNECTION_BAD; conn->asyncStatus = PGASYNC_IDLE; conn->xactStatus = PQTRANS_IDLE; @@ -2721,11 +2724,14 @@ makeEmptyPGconn(void) conn->inBuffer = (char *) malloc(conn->inBufSize); conn->outBufSize = 16 * 1024; conn->outBuffer = (char *) malloc(conn->outBufSize); + conn->rowBufLen = 32; + conn->rowBuf = (PGdataValue *) malloc(conn->rowBufLen * sizeof(PGdataValue)); initPQExpBuffer(&conn->errorMessage); initPQExpBuffer(&conn->workBuffer); if (conn->inBuffer == NULL || conn->outBuffer == NULL || + conn->rowBuf == NULL || PQExpBufferBroken(&conn->errorMessage) || PQExpBufferBroken(&conn->workBuffer)) { @@ -2829,6 +2835,8 @@ freePGconn(PGconn *conn) free(conn->inBuffer); if (conn->outBuffer) free(conn->outBuffer); + if (conn->rowBuf) + free(conn->rowBuf); 
termPQExpBuffer(&conn->errorMessage); termPQExpBuffer(&conn->workBuffer); @@ -2888,7 +2896,7 @@ closePGconn(PGconn *conn) conn->status = CONNECTION_BAD; /* Well, not really _bad_ - just * absent */ conn->asyncStatus = PGASYNC_IDLE; - pqClearAsyncResult(conn); /* deallocate result and curTuple */ + pqClearAsyncResult(conn); /* deallocate result */ pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist); conn->addrlist = NULL; conn->addr_cur = NULL; diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index b743566a5d..86f157c338 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -50,6 +50,9 @@ static bool static_std_strings = false; static PGEvent *dupEvents(PGEvent *events, int count); +static bool pqAddTuple(PGresult *res, PGresAttValue *tup); +static int pqStdRowProcessor(PGresult *res, const PGdataValue *columns, + const char **errmsgp, void *param); static bool PQsendQueryStart(PGconn *conn); static int PQsendQueryGuts(PGconn *conn, const char *command, @@ -61,6 +64,8 @@ static int PQsendQueryGuts(PGconn *conn, const int *paramFormats, int resultFormat); static void parseInput(PGconn *conn); +static int dummyRowProcessor(PGresult *res, const PGdataValue *columns, + const char **errmsgp, void *param); static bool PQexecStart(PGconn *conn); static PGresult *PQexecFinish(PGconn *conn); static int PQsendDescribe(PGconn *conn, char desc_type, @@ -694,14 +699,12 @@ PQclear(PGresult *res) /* * Handy subroutine to deallocate any partially constructed async result. 
*/ - void pqClearAsyncResult(PGconn *conn) { if (conn->result) PQclear(conn->result); conn->result = NULL; - conn->curTuple = NULL; } /* @@ -756,7 +759,6 @@ pqPrepareAsyncResult(PGconn *conn) */ res = conn->result; conn->result = NULL; /* handing over ownership to caller */ - conn->curTuple = NULL; /* just in case */ if (!res) res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR); else @@ -832,7 +834,7 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...) * add a row pointer to the PGresult structure, growing it if necessary * Returns TRUE if OK, FALSE if not enough memory to add the row */ -int +static bool pqAddTuple(PGresult *res, PGresAttValue *tup) { if (res->ntups >= res->tupArrSize) @@ -978,6 +980,124 @@ pqSaveParameterStatus(PGconn *conn, const char *name, const char *value) } +/* + * PQsetRowProcessor + * Set function that copies row data out from the network buffer, + * along with a passthrough parameter for it. + */ +void +PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param) +{ + if (!conn) + return; + + if (func) + { + /* set custom row processor */ + conn->rowProcessor = func; + conn->rowProcessorParam = param; + } + else + { + /* set default row processor */ + conn->rowProcessor = pqStdRowProcessor; + conn->rowProcessorParam = conn; + } +} + +/* + * PQgetRowProcessor + * Get current row processor of PGconn. + * If param is not NULL, also store the passthrough parameter at *param. + */ +PQrowProcessor +PQgetRowProcessor(const PGconn *conn, void **param) +{ + if (!conn) + { + if (param) + *param = NULL; + return NULL; + } + + if (param) + *param = conn->rowProcessorParam; + return conn->rowProcessor; +} + +/* + * pqStdRowProcessor + * Add the received row to the PGresult structure + * Returns 1 if OK, -1 if error occurred. + * + * Note: "param" should point to the PGconn, but we don't actually need that + * as of the current coding. 
+ */ +static int +pqStdRowProcessor(PGresult *res, const PGdataValue *columns, + const char **errmsgp, void *param) +{ + int nfields = res->numAttributes; + PGresAttValue *tup; + int i; + + if (columns == NULL) + { + /* New result set ... we have nothing to do in this function. */ + return 1; + } + + /* + * Basically we just allocate space in the PGresult for each field and + * copy the data over. + * + * Note: on malloc failure, we return -1 leaving *errmsgp still NULL, + * which caller will take to mean "out of memory". This is preferable to + * trying to set up such a message here, because evidently there's not + * enough memory for gettext() to do anything. + */ + tup = (PGresAttValue *) + pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE); + if (tup == NULL) + return -1; + + for (i = 0; i < nfields; i++) + { + int clen = columns[i].len; + + if (clen < 0) + { + /* null field */ + tup[i].len = NULL_LEN; + tup[i].value = res->null_field; + } + else + { + bool isbinary = (res->attDescs[i].format != 0); + char *val; + + val = (char *) pqResultAlloc(res, clen + 1, isbinary); + if (val == NULL) + return -1; + + /* copy and zero-terminate the data (even if it's binary) */ + memcpy(val, columns[i].value, clen); + val[clen] = '\0'; + + tup[i].len = clen; + tup[i].value = val; + } + } + + /* And add the tuple to the PGresult's tuple array */ + if (!pqAddTuple(res, tup)) + return -1; + + /* Success */ + return 1; +} + + /* * PQsendQuery * Submit a query, but don't wait for it to finish @@ -1223,7 +1343,6 @@ PQsendQueryStart(PGconn *conn) /* initialize async result-accumulation state */ conn->result = NULL; - conn->curTuple = NULL; /* ready to send command message */ return true; @@ -1468,6 +1587,9 @@ PQconsumeInput(PGconn *conn) * parseInput: if appropriate, parse input data from backend * until input is exhausted or a stopping state is reached. * Note that this function will NOT attempt to read more data from the backend. 
+ * + * Note: callers of parseInput must be prepared for a longjmp exit when we are + * in PGASYNC_BUSY state, since an external row processor might do that. */ static void parseInput(PGconn *conn) @@ -1615,6 +1737,49 @@ PQgetResult(PGconn *conn) return res; } +/* + * PQskipResult + * Get the next PGresult produced by a query, but discard any data rows. + * + * This is mainly useful for cleaning up after a longjmp out of a row + * processor, when resuming processing of the current query result isn't + * wanted. Note that this is of little value in an async-style application, + * since any preceding calls to PQisBusy would have already called the regular + * row processor. + */ +PGresult * +PQskipResult(PGconn *conn) +{ + PGresult *res; + PQrowProcessor savedRowProcessor; + + if (!conn) + return NULL; + + /* temporarily install dummy row processor */ + savedRowProcessor = conn->rowProcessor; + conn->rowProcessor = dummyRowProcessor; + /* no need to save/change rowProcessorParam */ + + /* fetch the next result */ + res = PQgetResult(conn); + + /* restore previous row processor */ + conn->rowProcessor = savedRowProcessor; + + return res; +} + +/* + * Do-nothing row processor for PQskipResult + */ +static int +dummyRowProcessor(PGresult *res, const PGdataValue *columns, + const char **errmsgp, void *param) +{ + return 1; +} + /* * PQexec @@ -1721,7 +1886,7 @@ PQexecStart(PGconn *conn) * Silently discard any prior query result that application didn't eat. * This is probably poor design, but it's here for backward compatibility. 
*/ - while ((result = PQgetResult(conn)) != NULL) + while ((result = PQskipResult(conn)) != NULL) { ExecStatusType resultStatus = result->resultStatus; diff --git a/src/interfaces/libpq/fe-lobj.c b/src/interfaces/libpq/fe-lobj.c index 29752270a1..13fd98c2f9 100644 --- a/src/interfaces/libpq/fe-lobj.c +++ b/src/interfaces/libpq/fe-lobj.c @@ -40,9 +40,7 @@ #define LO_BUFSIZE 8192 static int lo_initialize(PGconn *conn); - -static Oid - lo_import_internal(PGconn *conn, const char *filename, const Oid oid); +static Oid lo_import_internal(PGconn *conn, const char *filename, Oid oid); /* * lo_open @@ -59,7 +57,7 @@ lo_open(PGconn *conn, Oid lobjId, int mode) PQArgBlock argv[2]; PGresult *res; - if (conn->lobjfuncs == NULL) + if (conn == NULL || conn->lobjfuncs == NULL) { if (lo_initialize(conn) < 0) return -1; @@ -101,7 +99,7 @@ lo_close(PGconn *conn, int fd) int retval; int result_len; - if (conn->lobjfuncs == NULL) + if (conn == NULL || conn->lobjfuncs == NULL) { if (lo_initialize(conn) < 0) return -1; @@ -139,7 +137,7 @@ lo_truncate(PGconn *conn, int fd, size_t len) int retval; int result_len; - if (conn->lobjfuncs == NULL) + if (conn == NULL || conn->lobjfuncs == NULL) { if (lo_initialize(conn) < 0) return -1; @@ -192,7 +190,7 @@ lo_read(PGconn *conn, int fd, char *buf, size_t len) PGresult *res; int result_len; - if (conn->lobjfuncs == NULL) + if (conn == NULL || conn->lobjfuncs == NULL) { if (lo_initialize(conn) < 0) return -1; @@ -234,7 +232,7 @@ lo_write(PGconn *conn, int fd, const char *buf, size_t len) int result_len; int retval; - if (conn->lobjfuncs == NULL) + if (conn == NULL || conn->lobjfuncs == NULL) { if (lo_initialize(conn) < 0) return -1; @@ -280,7 +278,7 @@ lo_lseek(PGconn *conn, int fd, int offset, int whence) int retval; int result_len; - if (conn->lobjfuncs == NULL) + if (conn == NULL || conn->lobjfuncs == NULL) { if (lo_initialize(conn) < 0) return -1; @@ -328,7 +326,7 @@ lo_creat(PGconn *conn, int mode) int retval; int result_len; - if 
(conn->lobjfuncs == NULL) + if (conn == NULL || conn->lobjfuncs == NULL) { if (lo_initialize(conn) < 0) return InvalidOid; @@ -367,7 +365,7 @@ lo_create(PGconn *conn, Oid lobjId) int retval; int result_len; - if (conn->lobjfuncs == NULL) + if (conn == NULL || conn->lobjfuncs == NULL) { if (lo_initialize(conn) < 0) return InvalidOid; @@ -413,7 +411,7 @@ lo_tell(PGconn *conn, int fd) PGresult *res; int result_len; - if (conn->lobjfuncs == NULL) + if (conn == NULL || conn->lobjfuncs == NULL) { if (lo_initialize(conn) < 0) return -1; @@ -451,7 +449,7 @@ lo_unlink(PGconn *conn, Oid lobjId) int result_len; int retval; - if (conn->lobjfuncs == NULL) + if (conn == NULL || conn->lobjfuncs == NULL) { if (lo_initialize(conn) < 0) return -1; @@ -505,7 +503,7 @@ lo_import_with_oid(PGconn *conn, const char *filename, Oid lobjId) } static Oid -lo_import_internal(PGconn *conn, const char *filename, const Oid oid) +lo_import_internal(PGconn *conn, const char *filename, Oid oid) { int fd; int nbytes, @@ -684,8 +682,13 @@ lo_initialize(PGconn *conn) int n; const char *query; const char *fname; + PQrowProcessor savedRowProcessor; + void *savedRowProcessorParam; Oid foid; + if (!conn) + return -1; + /* * Allocate the structure to hold the functions OID's */ @@ -729,7 +732,16 @@ lo_initialize(PGconn *conn) "or proname = 'loread' " "or proname = 'lowrite'"; + /* Ensure the standard row processor is used to collect the result */ + savedRowProcessor = conn->rowProcessor; + savedRowProcessorParam = conn->rowProcessorParam; + PQsetRowProcessor(conn, NULL, NULL); + res = PQexec(conn, query); + + conn->rowProcessor = savedRowProcessor; + conn->rowProcessorParam = savedRowProcessorParam; + if (res == NULL) { free(lobjfuncs); diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c index ce0eac3f71..b5e5519c41 100644 --- a/src/interfaces/libpq/fe-misc.c +++ b/src/interfaces/libpq/fe-misc.c @@ -218,6 +218,32 @@ pqGetnchar(char *s, size_t len, PGconn *conn) return 0; } +/* + * 
pqSkipnchar: + * skip over len bytes in input buffer. + * + * Note: this is primarily useful for its debug output, which should + * be exactly the same as for pqGetnchar. We assume the data in question + * will actually be used, but just isn't getting copied anywhere as yet. + */ +int +pqSkipnchar(size_t len, PGconn *conn) +{ + if (len > (size_t) (conn->inEnd - conn->inCursor)) + return EOF; + + if (conn->Pfdebug) + { + fprintf(conn->Pfdebug, "From backend (%lu)> ", (unsigned long) len); + fputnbytes(conn->Pfdebug, conn->inBuffer + conn->inCursor, len); + fprintf(conn->Pfdebug, "\n"); + } + + conn->inCursor += len; + + return 0; +} + /* * pqPutnchar: * write exactly len bytes to the current message diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c index a7c38993b8..43f9954dd1 100644 --- a/src/interfaces/libpq/fe-protocol2.c +++ b/src/interfaces/libpq/fe-protocol2.c @@ -49,11 +49,19 @@ static int getNotify(PGconn *conn); PostgresPollingStatusType pqSetenvPoll(PGconn *conn) { + PostgresPollingStatusType result; PGresult *res; + PQrowProcessor savedRowProcessor; + void *savedRowProcessorParam; if (conn == NULL || conn->status == CONNECTION_BAD) return PGRES_POLLING_FAILED; + /* Ensure the standard row processor is used to collect any results */ + savedRowProcessor = conn->rowProcessor; + savedRowProcessorParam = conn->rowProcessorParam; + PQsetRowProcessor(conn, NULL, NULL); + /* Check whether there are any data for us */ switch (conn->setenv_state) { @@ -69,7 +77,10 @@ pqSetenvPoll(PGconn *conn) if (n < 0) goto error_return; if (n == 0) - return PGRES_POLLING_READING; + { + result = PGRES_POLLING_READING; + goto normal_return; + } break; } @@ -83,7 +94,8 @@ pqSetenvPoll(PGconn *conn) /* Should we raise an error if called when not active? 
*/ case SETENV_STATE_IDLE: - return PGRES_POLLING_OK; + result = PGRES_POLLING_OK; + goto normal_return; default: printfPQExpBuffer(&conn->errorMessage, @@ -180,7 +192,10 @@ pqSetenvPoll(PGconn *conn) case SETENV_STATE_CLIENT_ENCODING_WAIT: { if (PQisBusy(conn)) - return PGRES_POLLING_READING; + { + result = PGRES_POLLING_READING; + goto normal_return; + } res = PQgetResult(conn); @@ -205,7 +220,10 @@ pqSetenvPoll(PGconn *conn) case SETENV_STATE_OPTION_WAIT: { if (PQisBusy(conn)) - return PGRES_POLLING_READING; + { + result = PGRES_POLLING_READING; + goto normal_return; + } res = PQgetResult(conn); @@ -244,13 +262,17 @@ pqSetenvPoll(PGconn *conn) goto error_return; conn->setenv_state = SETENV_STATE_QUERY1_WAIT; - return PGRES_POLLING_READING; + result = PGRES_POLLING_READING; + goto normal_return; } case SETENV_STATE_QUERY1_WAIT: { if (PQisBusy(conn)) - return PGRES_POLLING_READING; + { + result = PGRES_POLLING_READING; + goto normal_return; + } res = PQgetResult(conn); @@ -327,13 +349,17 @@ pqSetenvPoll(PGconn *conn) goto error_return; conn->setenv_state = SETENV_STATE_QUERY2_WAIT; - return PGRES_POLLING_READING; + result = PGRES_POLLING_READING; + goto normal_return; } case SETENV_STATE_QUERY2_WAIT: { if (PQisBusy(conn)) - return PGRES_POLLING_READING; + { + result = PGRES_POLLING_READING; + goto normal_return; + } res = PQgetResult(conn); @@ -380,7 +406,8 @@ pqSetenvPoll(PGconn *conn) { /* Query finished, so we're done */ conn->setenv_state = SETENV_STATE_IDLE; - return PGRES_POLLING_OK; + result = PGRES_POLLING_OK; + goto normal_return; } break; } @@ -398,7 +425,12 @@ pqSetenvPoll(PGconn *conn) error_return: conn->setenv_state = SETENV_STATE_IDLE; - return PGRES_POLLING_FAILED; + result = PGRES_POLLING_FAILED; + +normal_return: + conn->rowProcessor = savedRowProcessor; + conn->rowProcessorParam = savedRowProcessorParam; + return result; } @@ -406,6 +438,9 @@ error_return: * parseInput: if appropriate, parse input data from backend * until input is exhausted or 
a stopping state is reached. * Note that this function will NOT attempt to read more data from the backend. + * + * Note: callers of parseInput must be prepared for a longjmp exit when we are + * in PGASYNC_BUSY state, since an external row processor might do that. */ void pqParseInput2(PGconn *conn) @@ -549,6 +584,8 @@ pqParseInput2(PGconn *conn) /* First 'T' in a query sequence */ if (getRowDescriptions(conn)) return; + /* getRowDescriptions() moves inStart itself */ + continue; } else { @@ -569,6 +606,8 @@ pqParseInput2(PGconn *conn) /* Read another tuple of a normal query response */ if (getAnotherTuple(conn, FALSE)) return; + /* getAnotherTuple() moves inStart itself */ + continue; } else { @@ -585,6 +624,8 @@ pqParseInput2(PGconn *conn) /* Read another tuple of a normal query response */ if (getAnotherTuple(conn, TRUE)) return; + /* getAnotherTuple() moves inStart itself */ + continue; } else { @@ -627,27 +668,32 @@ pqParseInput2(PGconn *conn) /* * parseInput subroutine to read a 'T' (row descriptions) message. * We build a PGresult structure containing the attribute data. - * Returns: 0 if completed message, EOF if not enough data yet. + * Returns: 0 if completed message, EOF if error or not enough data + * received yet. * - * Note that if we run out of data, we have to release the partially - * constructed PGresult, and rebuild it again next time. Fortunately, - * that shouldn't happen often, since 'T' messages usually fit in a packet. + * Note that if we run out of data, we have to suspend and reprocess + * the message after more data is received. Otherwise, conn->inStart + * must get advanced past the processed data. 
*/ static int getRowDescriptions(PGconn *conn) { - PGresult *result = NULL; + PGresult *result; int nfields; + const char *errmsg; int i; result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK); if (!result) - goto failure; + { + errmsg = NULL; /* means "out of memory", see below */ + goto advance_and_error; + } /* parseInput already read the 'T' label. */ /* the next two bytes are the number of fields */ if (pqGetInt(&(result->numAttributes), 2, conn)) - goto failure; + goto EOFexit; nfields = result->numAttributes; /* allocate space for the attribute descriptors */ @@ -656,7 +702,10 @@ getRowDescriptions(PGconn *conn) result->attDescs = (PGresAttDesc *) pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE); if (!result->attDescs) - goto failure; + { + errmsg = NULL; /* means "out of memory", see below */ + goto advance_and_error; + } MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc)); } @@ -671,7 +720,7 @@ getRowDescriptions(PGconn *conn) pqGetInt(&typid, 4, conn) || pqGetInt(&typlen, 2, conn) || pqGetInt(&atttypmod, 4, conn)) - goto failure; + goto EOFexit; /* * Since pqGetInt treats 2-byte integers as unsigned, we need to @@ -682,7 +731,10 @@ getRowDescriptions(PGconn *conn) result->attDescs[i].name = pqResultStrdup(result, conn->workBuffer.data); if (!result->attDescs[i].name) - goto failure; + { + errmsg = NULL; /* means "out of memory", see below */ + goto advance_and_error; + } result->attDescs[i].tableid = 0; result->attDescs[i].columnid = 0; result->attDescs[i].format = 0; @@ -693,30 +745,90 @@ getRowDescriptions(PGconn *conn) /* Success! */ conn->result = result; - return 0; -failure: - if (result) + /* + * Advance inStart to show that the "T" message has been processed. We + * must do this before calling the row processor, in case it longjmps. 
+ */ + conn->inStart = conn->inCursor; + + /* Give the row processor a chance to initialize for new result set */ + errmsg = NULL; + switch ((*conn->rowProcessor) (result, NULL, &errmsg, + conn->rowProcessorParam)) + { + case 1: + /* everything is good */ + return 0; + + case -1: + /* error, report the errmsg below */ + break; + + default: + /* unrecognized return code */ + errmsg = libpq_gettext("unrecognized return value from row processor"); + break; + } + goto set_error_result; + +advance_and_error: + /* + * Discard the failed message. Unfortunately we don't know for sure + * where the end is, so just throw away everything in the input buffer. + * This is not very desirable but it's the best we can do in protocol v2. + */ + conn->inStart = conn->inEnd; + +set_error_result: + + /* + * Replace partially constructed result with an error result. First + * discard the old result to try to win back some memory. + */ + pqClearAsyncResult(conn); + + /* + * If row processor didn't provide an error message, assume "out of + * memory" was meant. The advantage of having this special case is that + * freeing the old result first greatly improves the odds that gettext() + * will succeed in providing a translation. + */ + if (!errmsg) + errmsg = libpq_gettext("out of memory for query result"); + + printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg); + + /* + * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can + * do to recover... + */ + conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR); + conn->asyncStatus = PGASYNC_READY; + +EOFexit: + if (result && result != conn->result) PQclear(result); return EOF; } /* * parseInput subroutine to read a 'B' or 'D' (row data) message. - * We add another tuple to the existing PGresult structure. - * Returns: 0 if completed message, EOF if error or not enough data yet. + * We fill rowbuf with column pointers and then call the row processor. 
+ * Returns: 0 if completed message, EOF if error or not enough data
+ * received yet.
  *
  * Note that if we run out of data, we have to suspend and reprocess
- * the message after more data is received. We keep a partially constructed
- * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
+ * the message after more data is received. Otherwise, conn->inStart
+ * must get advanced past the processed data.
  */
 static int
 getAnotherTuple(PGconn *conn, bool binary)
 {
 	PGresult   *result = conn->result;
 	int			nfields = result->numAttributes;
-	PGresAttValue *tup;
-
+	const char *errmsg;
+	PGdataValue *rowbuf;
 	/* the backend sends us a bitmap of which attributes are null */
 	char		std_bitmap[64]; /* used unless it doesn't fit */
 	char	   *bitmap = std_bitmap;
@@ -727,28 +839,33 @@ getAnotherTuple(PGconn *conn, bool binary)
 	int			bitcnt;			/* number of bits examined in current byte */
 	int			vlen;			/* length of the current field value */

-	result->binary = binary;
-
-	/* Allocate tuple space if first time for this data message */
-	if (conn->curTuple == NULL)
+	/* Resize row buffer if needed */
+	rowbuf = conn->rowBuf;
+	if (nfields > conn->rowBufLen)
 	{
-		conn->curTuple = (PGresAttValue *)
-			pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
-		if (conn->curTuple == NULL)
-			goto outOfMemory;
-		MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-
-		/*
-		 * If it's binary, fix the column format indicators. We assume the
-		 * backend will consistently send either B or D, not a mix.
-		 */
-		if (binary)
+		rowbuf = (PGdataValue *) realloc(rowbuf,
+										 nfields * sizeof(PGdataValue));
+		if (!rowbuf)
 		{
-			for (i = 0; i < nfields; i++)
-				result->attDescs[i].format = 1;
+			errmsg = NULL;		/* means "out of memory", see below */
+			goto advance_and_error;
 		}
+		conn->rowBuf = rowbuf;
+		conn->rowBufLen = nfields;
+	}
+
+	/* Save format specifier */
+	result->binary = binary;
+
+	/*
+	 * If it's binary, fix the column format indicators. We assume the
+	 * backend will consistently send either B or D, not a mix.
+	 */
+	if (binary)
+	{
+		for (i = 0; i < nfields; i++)
+			result->attDescs[i].format = 1;
 	}
-	tup = conn->curTuple;

 	/* Get the null-value bitmap */
 	nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
@@ -757,7 +874,10 @@ getAnotherTuple(PGconn *conn, bool binary)
 	{
 		bitmap = (char *) malloc(nbytes);
 		if (!bitmap)
-			goto outOfMemory;
+		{
+			errmsg = NULL;		/* means "out of memory", see below */
+			goto advance_and_error;
+		}
 	}

 	if (pqGetnchar(bitmap, nbytes, conn))
@@ -770,35 +890,34 @@ getAnotherTuple(PGconn *conn, bool binary)

 	for (i = 0; i < nfields; i++)
 	{
+		/* get the value length */
 		if (!(bmap & 0200))
-		{
-			/* if the field value is absent, make it a null string */
-			tup[i].value = result->null_field;
-			tup[i].len = NULL_LEN;
-		}
+			vlen = NULL_LEN;
+		else if (pqGetInt(&vlen, 4, conn))
+			goto EOFexit;
 		else
 		{
-			/* get the value length (the first four bytes are for length) */
-			if (pqGetInt(&vlen, 4, conn))
-				goto EOFexit;
 			if (!binary)
 				vlen = vlen - 4;
 			if (vlen < 0)
 				vlen = 0;
-			if (tup[i].value == NULL)
-			{
-				tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
-				if (tup[i].value == NULL)
-					goto outOfMemory;
-			}
-			tup[i].len = vlen;
-			/* read in the value */
-			if (vlen > 0)
-				if (pqGetnchar((char *) (tup[i].value), vlen, conn))
-					goto EOFexit;
-			/* we have to terminate this ourselves */
-			tup[i].value[vlen] = '\0';
 		}
+		rowbuf[i].len = vlen;
+
+		/*
+		 * rowbuf[i].value always points to the next address in the data
+		 * buffer even if the value is NULL. This allows row processors to
+		 * estimate data sizes more easily.
+		 */
+		rowbuf[i].value = conn->inBuffer + conn->inCursor;
+
+		/* Skip over the data value */
+		if (vlen > 0)
+		{
+			if (pqSkipnchar(vlen, conn))
+				goto EOFexit;
+		}
+
 		/* advance the bitmap stuff */
 		bitcnt++;
 		if (bitcnt == BITS_PER_BYTE)
@@ -811,26 +930,63 @@ getAnotherTuple(PGconn *conn, bool binary)
 			bmap <<= 1;
 	}

-	/* Success! Store the completed tuple in the result */
-	if (!pqAddTuple(result, tup))
-		goto outOfMemory;
-	/* and reset for a new message */
-	conn->curTuple = NULL;
-
+	/* Release bitmap now if we allocated it */
 	if (bitmap != std_bitmap)
 		free(bitmap);
-	return 0;
+	bitmap = NULL;
+
+	/*
+	 * Advance inStart to show that the "D" message has been processed. We
+	 * must do this before calling the row processor, in case it longjmps.
+	 */
+	conn->inStart = conn->inCursor;
+
+	/* Pass the completed row values to rowProcessor */
+	errmsg = NULL;
+	switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
+								   conn->rowProcessorParam))
+	{
+		case 1:
+			/* everything is good */
+			return 0;
+
+		case -1:
+			/* error, report the errmsg below */
+			break;

-outOfMemory:
-	/* Replace partially constructed result with an error result */
+		default:
+			/* unrecognized return code */
+			errmsg = libpq_gettext("unrecognized return value from row processor");
+			break;
+	}
+	goto set_error_result;
+
+advance_and_error:
+	/*
+	 * Discard the failed message. Unfortunately we don't know for sure
+	 * where the end is, so just throw away everything in the input buffer.
+	 * This is not very desirable but it's the best we can do in protocol v2.
+	 */
+	conn->inStart = conn->inEnd;
+
+set_error_result:

 	/*
-	 * we do NOT use pqSaveErrorResult() here, because of the likelihood that
-	 * there's not enough memory to concatenate messages...
+	 * Replace partially constructed result with an error result. First
+	 * discard the old result to try to win back some memory.
 	 */
 	pqClearAsyncResult(conn);
-	printfPQExpBuffer(&conn->errorMessage,
-					  libpq_gettext("out of memory for query result\n"));
+
+	/*
+	 * If row processor didn't provide an error message, assume "out of
+	 * memory" was meant. The advantage of having this special case is that
+	 * freeing the old result first greatly improves the odds that gettext()
+	 * will succeed in providing a translation.
+	 */
+	if (!errmsg)
+		errmsg = libpq_gettext("out of memory for query result");
+
+	printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);

 	/*
 	 * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
@@ -838,8 +994,6 @@ outOfMemory:
 	 */
 	conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
 	conn->asyncStatus = PGASYNC_READY;
-	/* Discard the failed message --- good idea? */
-	conn->inStart = conn->inEnd;

 EOFexit:
 	if (bitmap != NULL && bitmap != std_bitmap)
@@ -1122,7 +1276,8 @@ pqGetline2(PGconn *conn, char *s, int maxlen)
 {
 	int			result = 1;		/* return value if buffer overflows */

-	if (conn->sock < 0)
+	if (conn->sock < 0 ||
+		conn->asyncStatus != PGASYNC_COPY_OUT)
 	{
 		*s = '\0';
 		return EOF;
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 892dcbcb00..a773d7a524 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -44,7 +44,7 @@


 static void handleSyncLoss(PGconn *conn, char id, int msgLength);
-static int	getRowDescriptions(PGconn *conn);
+static int	getRowDescriptions(PGconn *conn, int msgLength);
 static int	getParamDescriptions(PGconn *conn);
 static int	getAnotherTuple(PGconn *conn, int msgLength);
 static int	getParameterStatus(PGconn *conn);
@@ -61,6 +61,9 @@ static int build_startup_packet(const PGconn *conn, char *packet,
  * parseInput: if appropriate, parse input data from backend
  * until input is exhausted or a stopping state is reached.
  * Note that this function will NOT attempt to read more data from the backend.
+ *
+ * Note: callers of parseInput must be prepared for a longjmp exit when we are
+ * in PGASYNC_BUSY state, since an external row processor might do that.
  */
 void
 pqParseInput3(PGconn *conn)
@@ -269,15 +272,10 @@ pqParseInput3(PGconn *conn)
 						conn->queryclass == PGQUERY_DESCRIBE)
 					{
 						/* First 'T' in a query sequence */
-						if (getRowDescriptions(conn))
+						if (getRowDescriptions(conn, msgLength))
 							return;
-
-						/*
-						 * If we're doing a Describe, we're ready to pass the
-						 * result back to the client.
-						 */
-						if (conn->queryclass == PGQUERY_DESCRIBE)
-							conn->asyncStatus = PGASYNC_READY;
+						/* getRowDescriptions() moves inStart itself */
+						continue;
 					}
 					else
 					{
@@ -327,6 +325,8 @@ pqParseInput3(PGconn *conn)
 						/* Read another tuple of a normal query response */
 						if (getAnotherTuple(conn, msgLength))
 							return;
+						/* getAnotherTuple() moves inStart itself */
+						continue;
 					}
 					else if (conn->result != NULL &&
 							 conn->result->resultStatus == PGRES_FATAL_ERROR)
@@ -443,17 +443,20 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
  * parseInput subroutine to read a 'T' (row descriptions) message.
  * We'll build a new PGresult structure (unless called for a Describe
  * command for a prepared statement) containing the attribute data.
- * Returns: 0 if completed message, EOF if not enough data yet.
+ * Returns: 0 if processed message successfully, EOF to suspend parsing
+ * (the latter case is not actually used currently).
+ * In either case, conn->inStart has been advanced past the message.
  *
- * Note that if we run out of data, we have to release the partially
- * constructed PGresult, and rebuild it again next time. Fortunately,
- * that shouldn't happen often, since 'T' messages usually fit in a packet.
+ * Note: the row processor could also choose to longjmp out of libpq,
+ * in which case the library's state must allow for resumption at the
+ * next message.
  */
 static int
-getRowDescriptions(PGconn *conn)
+getRowDescriptions(PGconn *conn, int msgLength)
 {
 	PGresult   *result;
 	int			nfields;
+	const char *errmsg;
 	int			i;

 	/*
@@ -471,12 +474,19 @@ getRowDescriptions(PGconn *conn)
 	else
 		result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK);
 	if (!result)
-		goto failure;
+	{
+		errmsg = NULL;			/* means "out of memory", see below */
+		goto advance_and_error;
+	}

 	/* parseInput already read the 'T' label and message length. */
 	/* the next two bytes are the number of fields */
 	if (pqGetInt(&(result->numAttributes), 2, conn))
-		goto failure;
+	{
+		/* We should not run out of data here, so complain */
+		errmsg = libpq_gettext("insufficient data in \"T\" message");
+		goto advance_and_error;
+	}
 	nfields = result->numAttributes;

 	/* allocate space for the attribute descriptors */
@@ -485,7 +495,10 @@ getRowDescriptions(PGconn *conn)
 		result->attDescs = (PGresAttDesc *)
 			pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
 		if (!result->attDescs)
-			goto failure;
+		{
+			errmsg = NULL;		/* means "out of memory", see below */
+			goto advance_and_error;
+		}
 		MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
 	}

@@ -510,7 +523,9 @@ getRowDescriptions(PGconn *conn)
 			pqGetInt(&atttypmod, 4, conn) ||
 			pqGetInt(&format, 2, conn))
 		{
-			goto failure;
+			/* We should not run out of data here, so complain */
+			errmsg = libpq_gettext("insufficient data in \"T\" message");
+			goto advance_and_error;
 		}

 		/*
@@ -524,7 +539,10 @@ getRowDescriptions(PGconn *conn)
 		result->attDescs[i].name = pqResultStrdup(result,
 												  conn->workBuffer.data);
 		if (!result->attDescs[i].name)
-			goto failure;
+		{
+			errmsg = NULL;		/* means "out of memory", see below */
+			goto advance_and_error;
+		}
 		result->attDescs[i].tableid = tableid;
 		result->attDescs[i].columnid = columnid;
 		result->attDescs[i].format = format;
@@ -536,24 +554,84 @@ getRowDescriptions(PGconn *conn)
 			result->binary = 0;
 	}

+	/* Sanity check that we absorbed all the data */
+	if (conn->inCursor != conn->inStart + 5 + msgLength)
+	{
+		errmsg = libpq_gettext("extraneous data in \"T\" message");
+		goto advance_and_error;
+	}
+
 	/* Success! */
 	conn->result = result;
-	return 0;

-failure:
+	/*
+	 * Advance inStart to show that the "T" message has been processed. We
+	 * must do this before calling the row processor, in case it longjmps.
+	 */
+	conn->inStart = conn->inCursor;

 	/*
-	 * Discard incomplete result, unless it's from getParamDescriptions.
-	 *
-	 * Note that if we hit a bufferload boundary while handling the
-	 * describe-statement case, we'll forget any PGresult space we just
-	 * allocated, and then reallocate it on next try. This will bloat the
-	 * PGresult a little bit but the space will be freed at PQclear, so it
-	 * doesn't seem worth trying to be smarter.
+	 * If we're doing a Describe, we're done, and ready to pass the result
+	 * back to the client.
 	 */
-	if (result != conn->result)
+	if (conn->queryclass == PGQUERY_DESCRIBE)
+	{
+		conn->asyncStatus = PGASYNC_READY;
+		return 0;
+	}
+
+	/* Give the row processor a chance to initialize for new result set */
+	errmsg = NULL;
+	switch ((*conn->rowProcessor) (result, NULL, &errmsg,
+								   conn->rowProcessorParam))
+	{
+		case 1:
+			/* everything is good */
+			return 0;
+
+		case -1:
+			/* error, report the errmsg below */
+			break;
+
+		default:
+			/* unrecognized return code */
+			errmsg = libpq_gettext("unrecognized return value from row processor");
+			break;
+	}
+	goto set_error_result;
+
+advance_and_error:
+	/* Discard unsaved result, if any */
+	if (result && result != conn->result)
 		PQclear(result);
-	return EOF;
+
+	/* Discard the failed message by pretending we read it */
+	conn->inStart += 5 + msgLength;
+
+set_error_result:
+
+	/*
+	 * Replace partially constructed result with an error result. First
+	 * discard the old result to try to win back some memory.
+	 */
+	pqClearAsyncResult(conn);
+
+	/*
+	 * If row processor didn't provide an error message, assume "out of
+	 * memory" was meant.
+	 */
+	if (!errmsg)
+		errmsg = libpq_gettext("out of memory for query result");
+
+	printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
+	pqSaveErrorResult(conn);
+
+	/*
+	 * Return zero to allow input parsing to continue. Subsequent "D"
+	 * messages will be ignored until we get to end of data, since an error
+	 * result is already set up.
+	 */
+	return 0;
 }

 /*
@@ -613,47 +691,53 @@ failure:

 /*
  * parseInput subroutine to read a 'D' (row data) message.
- * We add another tuple to the existing PGresult structure.
- * Returns: 0 if completed message, EOF if error or not enough data yet.
+ * We fill rowbuf with column pointers and then call the row processor.
+ * Returns: 0 if processed message successfully, EOF to suspend parsing
+ * (the latter case is not actually used currently).
+ * In either case, conn->inStart has been advanced past the message.
  *
- * Note that if we run out of data, we have to suspend and reprocess
- * the message after more data is received. We keep a partially constructed
- * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
+ * Note: the row processor could also choose to longjmp out of libpq,
+ * in which case the library's state must allow for resumption at the
+ * next message.
  */
 static int
 getAnotherTuple(PGconn *conn, int msgLength)
 {
 	PGresult   *result = conn->result;
 	int			nfields = result->numAttributes;
-	PGresAttValue *tup;
+	const char *errmsg;
+	PGdataValue *rowbuf;
 	int			tupnfields;		/* # fields from tuple */
 	int			vlen;			/* length of the current field value */
 	int			i;

-	/* Allocate tuple space if first time for this data message */
-	if (conn->curTuple == NULL)
-	{
-		conn->curTuple = (PGresAttValue *)
-			pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
-		if (conn->curTuple == NULL)
-			goto outOfMemory;
-		MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-	}
-	tup = conn->curTuple;
-
 	/* Get the field count and make sure it's what we expect */
 	if (pqGetInt(&tupnfields, 2, conn))
-		return EOF;
+	{
+		/* We should not run out of data here, so complain */
+		errmsg = libpq_gettext("insufficient data in \"D\" message");
+		goto advance_and_error;
+	}

 	if (tupnfields != nfields)
 	{
-		/* Replace partially constructed result with an error result */
-		printfPQExpBuffer(&conn->errorMessage,
-				 libpq_gettext("unexpected field count in \"D\" message\n"));
-		pqSaveErrorResult(conn);
-		/* Discard the failed message by pretending we read it */
-		conn->inCursor = conn->inStart + 5 + msgLength;
-		return 0;
+		errmsg = libpq_gettext("unexpected field count in \"D\" message");
+		goto advance_and_error;
+	}
+
+	/* Resize row buffer if needed */
+	rowbuf = conn->rowBuf;
+	if (nfields > conn->rowBufLen)
+	{
+		rowbuf = (PGdataValue *) realloc(rowbuf,
+										 nfields * sizeof(PGdataValue));
+		if (!rowbuf)
+		{
+			errmsg = NULL;		/* means "out of memory", see below */
+			goto advance_and_error;
+		}
+		conn->rowBuf = rowbuf;
+		conn->rowBufLen = nfields;
 	}

 	/* Scan the fields */
@@ -661,54 +745,94 @@ getAnotherTuple(PGconn *conn, int msgLength)
 	{
 		/* get the value length */
 		if (pqGetInt(&vlen, 4, conn))
-			return EOF;
-		if (vlen == -1)
 		{
-			/* null field */
-			tup[i].value = result->null_field;
-			tup[i].len = NULL_LEN;
-			continue;
+			/* We should not run out of data here, so complain */
+			errmsg = libpq_gettext("insufficient data in \"D\" message");
+			goto advance_and_error;
 		}
-		if (vlen < 0)
-			vlen = 0;
-		if (tup[i].value == NULL)
-		{
-			bool		isbinary = (result->attDescs[i].format != 0);
+		rowbuf[i].len = vlen;

-			tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
-			if (tup[i].value == NULL)
-				goto outOfMemory;
-		}
-		tup[i].len = vlen;
-		/* read in the value */
+		/*
+		 * rowbuf[i].value always points to the next address in the data
+		 * buffer even if the value is NULL. This allows row processors to
+		 * estimate data sizes more easily.
+		 */
+		rowbuf[i].value = conn->inBuffer + conn->inCursor;
+
+		/* Skip over the data value */
 		if (vlen > 0)
-			if (pqGetnchar((char *) (tup[i].value), vlen, conn))
-				return EOF;
-		/* we have to terminate this ourselves */
-		tup[i].value[vlen] = '\0';
+		{
+			if (pqSkipnchar(vlen, conn))
+			{
+				/* We should not run out of data here, so complain */
+				errmsg = libpq_gettext("insufficient data in \"D\" message");
+				goto advance_and_error;
+			}
+		}
 	}

-	/* Success! Store the completed tuple in the result */
-	if (!pqAddTuple(result, tup))
-		goto outOfMemory;
-	/* and reset for a new message */
-	conn->curTuple = NULL;
+	/* Sanity check that we absorbed all the data */
+	if (conn->inCursor != conn->inStart + 5 + msgLength)
+	{
+		errmsg = libpq_gettext("extraneous data in \"D\" message");
+		goto advance_and_error;
+	}

-	return 0;
+	/*
+	 * Advance inStart to show that the "D" message has been processed. We
+	 * must do this before calling the row processor, in case it longjmps.
+	 */
+	conn->inStart = conn->inCursor;

-outOfMemory:
+	/* Pass the completed row values to rowProcessor */
+	errmsg = NULL;
+	switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
+								   conn->rowProcessorParam))
+	{
+		case 1:
+			/* everything is good */
+			return 0;
+
+		case -1:
+			/* error, report the errmsg below */
+			break;
+
+		default:
+			/* unrecognized return code */
+			errmsg = libpq_gettext("unrecognized return value from row processor");
+			break;
+	}
+	goto set_error_result;
+
+advance_and_error:
+	/* Discard the failed message by pretending we read it */
+	conn->inStart += 5 + msgLength;
+
+set_error_result:

 	/*
 	 * Replace partially constructed result with an error result. First
 	 * discard the old result to try to win back some memory.
 	 */
 	pqClearAsyncResult(conn);
-	printfPQExpBuffer(&conn->errorMessage,
-					  libpq_gettext("out of memory for query result\n"));
+
+	/*
+	 * If row processor didn't provide an error message, assume "out of
+	 * memory" was meant. The advantage of having this special case is that
+	 * freeing the old result first greatly improves the odds that gettext()
+	 * will succeed in providing a translation.
+	 */
+	if (!errmsg)
+		errmsg = libpq_gettext("out of memory for query result");
+
+	printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
 	pqSaveErrorResult(conn);

-	/* Discard the failed message by pretending we read it */
-	conn->inCursor = conn->inStart + 5 + msgLength;
+	/*
+	 * Return zero to allow input parsing to continue. Subsequent "D"
+	 * messages will be ignored until we get to end of data, since an error
+	 * result is already set up.
+	 */
 	return 0;
 }

diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index ef26ab9e0d..32b466e245 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -38,13 +38,14 @@ extern "C"

 /* Application-visible enum types */

+/*
+ * Although it is okay to add to these lists, values which become unused
+ * should never be removed, nor should constants be redefined - that would
+ * break compatibility with existing code.
+ */
+
 typedef enum
 {
-	/*
-	 * Although it is okay to add to this list, values which become unused
-	 * should never be removed, nor should constants be redefined - that would
-	 * break compatibility with existing code.
-	 */
 	CONNECTION_OK,
 	CONNECTION_BAD,
 	/* Non-blocking mode only below here */
@@ -128,6 +129,17 @@ typedef struct pg_conn PGconn;
  */
 typedef struct pg_result PGresult;

+/* PGdataValue represents a data field value being passed to a row processor.
+ * It could be either text or binary data; text data is not zero-terminated.
+ * A SQL NULL is represented by len < 0; then value is still valid but there
+ * are no data bytes there.
+ */
+typedef struct pgDataValue
+{
+	int			len;			/* data length in bytes, or <0 if NULL */
+	const char *value;			/* data value, without zero-termination */
+} PGdataValue;
+
 /* PGcancel encapsulates the information needed to cancel a running
  * query on an existing connection.
  * The contents of this struct are not supposed to be known to applications.
@@ -149,6 +161,10 @@ typedef struct pgNotify
 	struct pgNotify *next;		/* list link */
 } PGnotify;

+/* Function type for row-processor callback */
+typedef int (*PQrowProcessor) (PGresult *res, const PGdataValue *columns,
+										   const char **errmsgp, void *param);
+
 /* Function types for notice-handling callbacks */
 typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
 typedef void (*PQnoticeProcessor) (void *arg, const char *message);
@@ -388,11 +404,16 @@ extern int PQsendQueryPrepared(PGconn *conn,
 					const int *paramFormats,
 					int resultFormat);
 extern PGresult *PQgetResult(PGconn *conn);
+extern PGresult *PQskipResult(PGconn *conn);

 /* Routines for managing an asynchronous query */
 extern int	PQisBusy(PGconn *conn);
 extern int	PQconsumeInput(PGconn *conn);

+/* Override default per-row processing */
+extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
+extern PQrowProcessor PQgetRowProcessor(const PGconn *conn, void **param);
+
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);

diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 2103af8832..0b6e6769c0 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -324,6 +324,10 @@ struct pg_conn
 	/* Optional file to write trace info to */
 	FILE	   *Pfdebug;

+	/* Callback procedure for per-row processing */
+	PQrowProcessor rowProcessor;	/* function pointer */
+	void	   *rowProcessorParam;	/* passthrough argument */
+
 	/* Callback procedures for notice message processing */
 	PGNoticeHooks noticeHooks;

@@ -396,9 +400,14 @@ struct pg_conn
 								 * msg has no length word */
 	int			outMsgEnd;		/* offset to msg end (so far) */

+	/* Row processor interface workspace */
+	PGdataValue *rowBuf;		/* array for passing values to rowProcessor */
+	int			rowBufLen;		/* number of entries allocated in rowBuf */
+
 	/* Status for asynchronous result construction */
 	PGresult   *result;			/* result being constructed */
-	PGresAttValue *curTuple;	/* tuple currently being read */
+
+	/* Assorted state for SSL, GSS, etc */

 #ifdef USE_SSL
 	bool		allow_ssl_try;	/* Allowed to try SSL negotiation */
@@ -435,7 +444,6 @@ struct pg_conn
 								 * connection */
 #endif

-
 	/* Buffer for current error message */
 	PQExpBufferData errorMessage;		/* expansible string */

@@ -505,7 +513,6 @@ extern void pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 /* This lets gcc check the format string for consistency. */
 __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));

-extern int	pqAddTuple(PGresult *res, PGresAttValue *tup);
 extern void pqSaveMessageField(PGresult *res, char code,
 				   const char *value);
 extern void pqSaveParameterStatus(PGconn *conn, const char *name,
@@ -558,6 +565,7 @@ extern int	pqGets(PQExpBuffer buf, PGconn *conn);
 extern int	pqGets_append(PQExpBuffer buf, PGconn *conn);
 extern int	pqPuts(const char *s, PGconn *conn);
 extern int	pqGetnchar(char *s, size_t len, PGconn *conn);
+extern int	pqSkipnchar(size_t len, PGconn *conn);
 extern int	pqPutnchar(const char *s, size_t len, PGconn *conn);
 extern int	pqGetInt(int *result, size_t bytes, PGconn *conn);
 extern int	pqPutInt(int value, size_t bytes, PGconn *conn);
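
Illustrative sketch (not part of the patch): the callback contract used by getRowDescriptions() and getAnotherTuple() above can be exercised without libpq. The code below declares local stand-ins for PGresult and PGdataValue so that it compiles on its own. RowStats, stats_row_processor and demo_run are hypothetical names introduced only for this example, and a real row processor would presumably obtain the column count from PQnfields(res) rather than from its own state. The processor returns 1 on success and -1 with *errmsgp set on failure, matching the switch statements in the patch, and it relies on len rather than zero-termination because the value pointers refer to unterminated data.

```c
#include <assert.h>
#include <stddef.h>

/* Local stand-ins so the sketch compiles without libpq (assumption). */
typedef struct pg_result PGresult;	/* opaque, never dereferenced here */

typedef struct pgDataValue
{
	int			len;		/* data length in bytes, or <0 if NULL */
	const char *value;		/* data value, without zero-termination */
} PGdataValue;

/* Hypothetical application state handed through the "param" argument. */
typedef struct RowStats
{
	int			nfields;	/* real code would ask PQnfields(res) instead */
	long		rows;		/* rows seen in the current result set */
	long		nulls;		/* SQL NULL fields seen */
	long		bytes;		/* total bytes of non-NULL field data */
} RowStats;

/* Same signature as the PQrowProcessor typedef added by the patch. */
static int
stats_row_processor(PGresult *res, const PGdataValue *columns,
					const char **errmsgp, void *param)
{
	RowStats   *st = (RowStats *) param;
	int			i;

	(void) res;				/* unused in this sketch */

	if (st == NULL)
	{
		/* -1 reports an error; the caller uses *errmsgp as the message */
		*errmsgp = "row processor called without state";
		return -1;
	}

	if (columns == NULL)
	{
		/* Initialization call made once per result set: reset counters */
		st->rows = st->nulls = st->bytes = 0;
		return 1;
	}

	for (i = 0; i < st->nfields; i++)
	{
		if (columns[i].len < 0)
			st->nulls++;	/* NULL: value is valid but has no data bytes */
		else
			st->bytes += columns[i].len;
	}
	st->rows++;
	return 1;				/* 1 means "everything is good" */
}

/* Drives the callback in the same order the parsing code does. */
static int
demo_run(RowStats *st)
{
	static const char buf[] = "42hello";
	PGdataValue row1[2] = {{2, buf}, {5, buf + 2}};
	PGdataValue row2[2] = {{-1, buf}, {5, buf + 2}};
	const char *errmsg = NULL;

	if (stats_row_processor(NULL, NULL, &errmsg, st) != 1)
		return -1;
	if (stats_row_processor(NULL, row1, &errmsg, st) != 1)
		return -1;
	if (stats_row_processor(NULL, row2, &errmsg, st) != 1)
		return -1;
	return 1;
}
```

With the API declared in libpq-fe.h above, such a function would be registered by calling PQsetRowProcessor(conn, stats_row_processor, &stats). Since rows arrive before it is known whether the query succeeds, an application should treat the counters as provisional until the final PGresult status is known.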