From 6f922ef88e43b3084cdddf4b5ffe525a00896a90 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Wed, 4 Apr 2012 18:39:08 -0400 Subject: [PATCH] Improve efficiency of dblink by using libpq's new row processor API. This patch provides a test case for libpq's row processor API. contrib/dblink can deal with very large result sets by dumping them into a tuplestore (which can spill to disk) --- but until now, the intermediate storage of the query result in a PGresult meant memory bloat for any large result. Now we use a row processor to convert the data to tuple form and dump it directly into the tuplestore. A limitation is that this only works for plain dblink() queries, not dblink_send_query() followed by dblink_get_result(). In the latter case we don't know the desired tuple rowtype soon enough. While hack solutions to that are possible, a different user-level API would probably be a better answer. Kyotaro Horiguchi, reviewed by Marko Kreen and Tom Lane --- contrib/dblink/dblink.c | 421 ++++++++++++++++++++++++++++++++------- doc/src/sgml/dblink.sgml | 20 +- 2 files changed, 366 insertions(+), 75 deletions(-) diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 46c7cc5923..8154cae7bd 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -63,12 +63,28 @@ typedef struct remoteConn bool newXactForCursor; /* Opened a transaction for a cursor */ } remoteConn; +typedef struct storeInfo +{ + FunctionCallInfo fcinfo; + Tuplestorestate *tuplestore; + AttInMetadata *attinmeta; + MemoryContext tmpcontext; + char **cstrs; +} storeInfo; + /* * Internal declarations */ static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async); static void prepTuplestoreResult(FunctionCallInfo fcinfo); static void materializeResult(FunctionCallInfo fcinfo, PGresult *res); +static void materializeQueryResult(FunctionCallInfo fcinfo, + PGconn *conn, + const char *conname, + const char *sql, + bool fail); +static int storeHandler(PGresult *res, const PGdataValue *columns, + const char **errmsgp, void *param); static remoteConn *getConnectionByName(const char *name); static HTAB *createConnHash(void); static void createNewConnection(const char *name, remoteConn *rconn); @@ -629,100 +645,118 @@ dblink_get_result(PG_FUNCTION_ARGS) static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) { - char *msg; - PGresult *res = NULL; - PGconn *conn = NULL; - char *connstr = NULL; - char *sql = NULL; - char *conname = NULL; - remoteConn *rconn = NULL; - bool fail = true; /* default to backward compatible */ - bool freeconn = false; + PGconn *volatile conn = NULL; + volatile bool freeconn = false; prepTuplestoreResult(fcinfo); DBLINK_INIT; - if (!is_async) + PG_TRY(); { - if (PG_NARGS() == 3) - { - /* text,text,bool */ - DBLINK_GET_CONN; - sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); - fail = PG_GETARG_BOOL(2); - } - else if (PG_NARGS() == 2) + char *msg; + char *connstr = NULL; + char *sql = NULL; + char *conname = NULL; + remoteConn *rconn = NULL; + bool fail = true; /* default to backward compatible */ + + if (!is_async) { - /* text,text or text,bool */ - if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) + if (PG_NARGS() == 3) { + /* text,text,bool */ + DBLINK_GET_CONN; + sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); + fail = PG_GETARG_BOOL(2); + } + else if (PG_NARGS() == 2) + { + /* text,text or text,bool */ + if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) + { + conn = pconn->conn; + sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); + fail = PG_GETARG_BOOL(1); + } + else + { + DBLINK_GET_CONN; + sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); + } + } + else if (PG_NARGS() == 1) + { + /* text */ conn = pconn->conn; sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); - fail = PG_GETARG_BOOL(1); } else + /* shouldn't happen */ + elog(ERROR, "wrong number of arguments"); + } + else /* is_async */ + { + /* get async result */ + if (PG_NARGS() == 2) { - DBLINK_GET_CONN; - sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); + /* text,bool */ + DBLINK_GET_NAMED_CONN; + fail = PG_GETARG_BOOL(1); } + else if (PG_NARGS() == 1) + { + /* text */ + DBLINK_GET_NAMED_CONN; + } + else + /* shouldn't happen */ + elog(ERROR, "wrong number of arguments"); } - else if (PG_NARGS() == 1) + + if (!conn) + DBLINK_CONN_NOT_AVAIL; + + if (!is_async) { - /* text */ - conn = pconn->conn; - sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); + /* synchronous query, use efficient tuple collection method */ + materializeQueryResult(fcinfo, conn, conname, sql, fail); } else - /* shouldn't happen */ - elog(ERROR, "wrong number of arguments"); - } - else /* is_async */ - { - /* get async result */ - if (PG_NARGS() == 2) - { - /* text,bool */ - DBLINK_GET_NAMED_CONN; - fail = PG_GETARG_BOOL(1); - } - else if (PG_NARGS() == 1) { - /* text */ - DBLINK_GET_NAMED_CONN; + /* async result retrieval, do it the old way */ + PGresult *res = PQgetResult(conn); + + /* NULL means we're all done with the async results */ + if (res) + { + if (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK) + { + dblink_res_error(conname, res, "could not execute query", + fail); + /* if fail isn't set, we'll return an empty query result */ + } + else + { + materializeResult(fcinfo, res); + } + } } - else - /* shouldn't happen */ - elog(ERROR, "wrong number of arguments"); } - - if (!conn) - DBLINK_CONN_NOT_AVAIL; - - /* synchronous query, or async result retrieval */ - if (!is_async) - res = PQexec(conn, sql); - else + PG_CATCH(); { - res = PQgetResult(conn); - /* NULL means we're all done with the async results */ - if (!res) - return (Datum) 0; + /* if needed, close the connection to the database */ + if (freeconn) + PQfinish(conn); + PG_RE_THROW(); } + PG_END_TRY(); - /* if needed, close the connection to the database and cleanup */ + /* if needed, close the connection to the database */ if (freeconn) PQfinish(conn); - if (!res || - (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK)) - { - dblink_res_error(conname, res, "could not execute query", fail); - return (Datum) 0; - } - - materializeResult(fcinfo, res); return (Datum) 0; } @@ -890,6 +924,259 @@ materializeResult(FunctionCallInfo fcinfo, PGresult *res) PG_END_TRY(); } +/* + * Execute the given SQL command and store its results into a tuplestore + * to be returned as the result of the current function. + * This is equivalent to PQexec followed by materializeResult, but we make + * use of libpq's "row processor" API to reduce per-row overhead. + */ +static void +materializeQueryResult(FunctionCallInfo fcinfo, + PGconn *conn, + const char *conname, + const char *sql, + bool fail) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + PGresult *volatile res = NULL; + storeInfo sinfo; + + /* prepTuplestoreResult must have been called previously */ + Assert(rsinfo->returnMode == SFRM_Materialize); + + PG_TRY(); + { + /* initialize storeInfo to empty */ + memset(&sinfo, 0, sizeof(sinfo)); + sinfo.fcinfo = fcinfo; + + /* We'll collect tuples using storeHandler */ + PQsetRowProcessor(conn, storeHandler, &sinfo); + + res = PQexec(conn, sql); + + /* We don't keep the custom row processor installed permanently */ + PQsetRowProcessor(conn, NULL, NULL); + + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) + { + /* + * dblink_res_error will clear the passed PGresult, so we need + * this ugly dance to avoid doing so twice during error exit + */ + PGresult *res1 = res; + + res = NULL; + dblink_res_error(conname, res1, "could not execute query", fail); + /* if fail isn't set, we'll return an empty query result */ + } + else if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + /* + * storeHandler didn't get called, so we need to convert the + * command status string to a tuple manually + */ + TupleDesc tupdesc; + AttInMetadata *attinmeta; + Tuplestorestate *tupstore; + HeapTuple tuple; + char *values[1]; + MemoryContext oldcontext; + + /* + * need a tuple descriptor representing one TEXT column to return + * the command status string as our result tuple + */ + tupdesc = CreateTemplateTupleDesc(1, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", + TEXTOID, -1, 0); + attinmeta = TupleDescGetAttInMetadata(tupdesc); + + oldcontext = MemoryContextSwitchTo( + rsinfo->econtext->ecxt_per_query_memory); + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + MemoryContextSwitchTo(oldcontext); + + values[0] = PQcmdStatus(res); + + /* build the tuple and put it into the tuplestore. */ + tuple = BuildTupleFromCStrings(attinmeta, values); + tuplestore_puttuple(tupstore, tuple); + + PQclear(res); + } + else + { + Assert(PQresultStatus(res) == PGRES_TUPLES_OK); + /* storeHandler should have created a tuplestore */ + Assert(rsinfo->setResult != NULL); + + PQclear(res); + } + } + PG_CATCH(); + { + /* be sure to unset the custom row processor */ + PQsetRowProcessor(conn, NULL, NULL); + /* be sure to release any libpq result we collected */ + if (res) + PQclear(res); + /* and clear out any pending data in libpq */ + while ((res = PQskipResult(conn)) != NULL) + PQclear(res); + PG_RE_THROW(); + } + PG_END_TRY(); +} + +/* + * Custom row processor for materializeQueryResult. + * Prototype of this function must match PQrowProcessor. + */ +static int +storeHandler(PGresult *res, const PGdataValue *columns, + const char **errmsgp, void *param) +{ + storeInfo *sinfo = (storeInfo *) param; + int nfields = PQnfields(res); + char **cstrs = sinfo->cstrs; + HeapTuple tuple; + char *pbuf; + int pbuflen; + int i; + MemoryContext oldcontext; + + if (columns == NULL) + { + /* Prepare for new result set */ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo; + TupleDesc tupdesc; + + /* + * It's possible to get more than one result set if the query string + * contained multiple SQL commands. In that case, we follow PQexec's + * traditional behavior of throwing away all but the last result. + */ + if (sinfo->tuplestore) + tuplestore_end(sinfo->tuplestore); + sinfo->tuplestore = NULL; + + /* get a tuple descriptor for our result type */ + switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc)) + { + case TYPEFUNC_COMPOSITE: + /* success */ + break; + case TYPEFUNC_RECORD: + /* failed to determine actual type of RECORD */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + break; + default: + /* result type isn't composite */ + elog(ERROR, "return type must be a row type"); + break; + } + + /* make sure we have a persistent copy of the tupdesc */ + tupdesc = CreateTupleDescCopy(tupdesc); + + /* check result and tuple descriptor have the same number of columns */ + if (nfields != tupdesc->natts) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + + /* Prepare attinmeta for later data conversions */ + sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc); + + /* Create a new, empty tuplestore */ + oldcontext = MemoryContextSwitchTo( + rsinfo->econtext->ecxt_per_query_memory); + sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->setResult = sinfo->tuplestore; + rsinfo->setDesc = tupdesc; + MemoryContextSwitchTo(oldcontext); + + /* + * Set up sufficiently-wide string pointers array; this won't change + * in size so it's easy to preallocate. + */ + if (sinfo->cstrs) + pfree(sinfo->cstrs); + sinfo->cstrs = (char **) palloc(nfields * sizeof(char *)); + + /* Create short-lived memory context for data conversions */ + if (!sinfo->tmpcontext) + sinfo->tmpcontext = + AllocSetContextCreate(CurrentMemoryContext, + "dblink temporary context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + return 1; + } + + CHECK_FOR_INTERRUPTS(); + + /* + * Do the following work in a temp context that we reset after each tuple. + * This cleans up not only the data we have direct access to, but any + * cruft the I/O functions might leak. + */ + oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext); + + /* + * The strings passed to us are not null-terminated, but the datatype + * input functions we're about to call require null termination. Copy the + * strings and add null termination. As a micro-optimization, allocate + * all the strings with one palloc. + */ + pbuflen = nfields; /* count the null terminators themselves */ + for (i = 0; i < nfields; i++) + { + int len = columns[i].len; + + if (len > 0) + pbuflen += len; + } + pbuf = (char *) palloc(pbuflen); + + for (i = 0; i < nfields; i++) + { + int len = columns[i].len; + + if (len < 0) + cstrs[i] = NULL; + else + { + cstrs[i] = pbuf; + memcpy(pbuf, columns[i].value, len); + pbuf += len; + *pbuf++ = '\0'; + } + } + + /* Convert row to a tuple, and add it to the tuplestore */ + tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs); + + tuplestore_puttuple(sinfo->tuplestore, tuple); + + /* Clean up */ + MemoryContextSwitchTo(oldcontext); + MemoryContextReset(sinfo->tmpcontext); + + return 1; +} + /* * List all open dblink connections by name. * Returns an array of all connection names. diff --git a/doc/src/sgml/dblink.sgml b/doc/src/sgml/dblink.sgml index 855495c54d..72ca765be7 100644 --- a/doc/src/sgml/dblink.sgml +++ b/doc/src/sgml/dblink.sgml @@ -425,14 +425,6 @@ SELECT * Notes - - dblink fetches the entire remote query result before - returning any of it to the local system. If the query is expected - to return a large number of rows, it's better to open it as a cursor - with dblink_open and then fetch a manageable number - of rows at a time. - - A convenient way to use dblink with predetermined queries is to create a view. @@ -1432,6 +1424,18 @@ dblink_get_result(text connname [, bool fail_on_error]) returns setof record sent, and one additional time to obtain an empty set result, before the connection can be used again. + + + When using dblink_send_query and + dblink_get_result, dblink fetches the entire + remote query result before returning any of it to the local query + processor. If the query returns a large number of rows, this can result + in transient memory bloat in the local session. It may be better to open + such a query as a cursor with dblink_open and then fetch a + manageable number of rows at a time. Alternatively, use plain + dblink(), which avoids memory bloat by spooling large result + sets to disk. + -- 2.40.0