granicus.if.org Git - postgresql/commitdiff
Improve efficiency of dblink by using libpq's new row processor API.
author Tom Lane <tgl@sss.pgh.pa.us>
Wed, 4 Apr 2012 22:39:08 +0000 (18:39 -0400)
committer Tom Lane <tgl@sss.pgh.pa.us>
Wed, 4 Apr 2012 22:39:08 +0000 (18:39 -0400)
This patch provides a test case for libpq's row processor API.
contrib/dblink can deal with very large result sets by dumping them into
a tuplestore (which can spill to disk) --- but until now, the intermediate
storage of the query result in a PGresult meant memory bloat for any large
result.  Now we use a row processor to convert the data to tuple form and
dump it directly into the tuplestore.

A limitation is that this only works for plain dblink() queries, not
dblink_send_query() followed by dblink_get_result().  In the latter
case we don't know the desired tuple rowtype soon enough.  While hack
solutions to that are possible, a different user-level API would
probably be a better answer.

Kyotaro Horiguchi, reviewed by Marko Kreen and Tom Lane

contrib/dblink/dblink.c
doc/src/sgml/dblink.sgml

index 46c7cc5923f18d3f0d614dbd327c37bef46c4af8..8154cae7bdb90870dabf6cc2473645dbc0e968dc 100644 (file)
@@ -63,12 +63,28 @@ typedef struct remoteConn
        bool            newXactForCursor;               /* Opened a transaction for a cursor */
 } remoteConn;
 
+typedef struct storeInfo
+{
+       FunctionCallInfo fcinfo;
+       Tuplestorestate *tuplestore;
+       AttInMetadata *attinmeta;
+       MemoryContext tmpcontext;
+       char      **cstrs;
+} storeInfo;
+
 /*
  * Internal declarations
  */
 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
 static void prepTuplestoreResult(FunctionCallInfo fcinfo);
 static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
+static void materializeQueryResult(FunctionCallInfo fcinfo,
+                                          PGconn *conn,
+                                          const char *conname,
+                                          const char *sql,
+                                          bool fail);
+static int storeHandler(PGresult *res, const PGdataValue *columns,
+                        const char **errmsgp, void *param);
 static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn *rconn);
@@ -629,100 +645,118 @@ dblink_get_result(PG_FUNCTION_ARGS)
 static Datum
 dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 {
-       char       *msg;
-       PGresult   *res = NULL;
-       PGconn     *conn = NULL;
-       char       *connstr = NULL;
-       char       *sql = NULL;
-       char       *conname = NULL;
-       remoteConn *rconn = NULL;
-       bool            fail = true;    /* default to backward compatible */
-       bool            freeconn = false;
+       PGconn     *volatile conn = NULL;
+       volatile bool freeconn = false;
 
        prepTuplestoreResult(fcinfo);
 
        DBLINK_INIT;
 
-       if (!is_async)
+       PG_TRY();
        {
-               if (PG_NARGS() == 3)
-               {
-                       /* text,text,bool */
-                       DBLINK_GET_CONN;
-                       sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
-                       fail = PG_GETARG_BOOL(2);
-               }
-               else if (PG_NARGS() == 2)
+               char       *msg;
+               char       *connstr = NULL;
+               char       *sql = NULL;
+               char       *conname = NULL;
+               remoteConn *rconn = NULL;
+               bool            fail = true;    /* default to backward compatible */
+
+               if (!is_async)
                {
-                       /* text,text or text,bool */
-                       if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
+                       if (PG_NARGS() == 3)
                        {
+                               /* text,text,bool */
+                               DBLINK_GET_CONN;
+                               sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+                               fail = PG_GETARG_BOOL(2);
+                       }
+                       else if (PG_NARGS() == 2)
+                       {
+                               /* text,text or text,bool */
+                               if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
+                               {
+                                       conn = pconn->conn;
+                                       sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
+                                       fail = PG_GETARG_BOOL(1);
+                               }
+                               else
+                               {
+                                       DBLINK_GET_CONN;
+                                       sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+                               }
+                       }
+                       else if (PG_NARGS() == 1)
+                       {
+                               /* text */
                                conn = pconn->conn;
                                sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
-                               fail = PG_GETARG_BOOL(1);
                        }
                        else
+                               /* shouldn't happen */
+                               elog(ERROR, "wrong number of arguments");
+               }
+               else    /* is_async */
+               {
+                       /* get async result */
+                       if (PG_NARGS() == 2)
                        {
-                               DBLINK_GET_CONN;
-                               sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+                               /* text,bool */
+                               DBLINK_GET_NAMED_CONN;
+                               fail = PG_GETARG_BOOL(1);
                        }
+                       else if (PG_NARGS() == 1)
+                       {
+                               /* text */
+                               DBLINK_GET_NAMED_CONN;
+                       }
+                       else
+                               /* shouldn't happen */
+                               elog(ERROR, "wrong number of arguments");
                }
-               else if (PG_NARGS() == 1)
+
+               if (!conn)
+                       DBLINK_CONN_NOT_AVAIL;
+
+               if (!is_async)
                {
-                       /* text */
-                       conn = pconn->conn;
-                       sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
+                       /* synchronous query, use efficient tuple collection method */
+                       materializeQueryResult(fcinfo, conn, conname, sql, fail);
                }
                else
-                       /* shouldn't happen */
-                       elog(ERROR, "wrong number of arguments");
-       }
-       else    /* is_async */
-       {
-               /* get async result */
-               if (PG_NARGS() == 2)
-               {
-                       /* text,bool */
-                       DBLINK_GET_NAMED_CONN;
-                       fail = PG_GETARG_BOOL(1);
-               }
-               else if (PG_NARGS() == 1)
                {
-                       /* text */
-                       DBLINK_GET_NAMED_CONN;
+                       /* async result retrieval, do it the old way */
+                       PGresult   *res = PQgetResult(conn);
+
+                       /* NULL means we're all done with the async results */
+                       if (res)
+                       {
+                               if (PQresultStatus(res) != PGRES_COMMAND_OK &&
+                                       PQresultStatus(res) != PGRES_TUPLES_OK)
+                               {
+                                       dblink_res_error(conname, res, "could not execute query",
+                                                                        fail);
+                                       /* if fail isn't set, we'll return an empty query result */
+                               }
+                               else
+                               {
+                                       materializeResult(fcinfo, res);
+                               }
+                       }
                }
-               else
-                       /* shouldn't happen */
-                       elog(ERROR, "wrong number of arguments");
        }
-
-       if (!conn)
-               DBLINK_CONN_NOT_AVAIL;
-
-       /* synchronous query, or async result retrieval */
-       if (!is_async)
-               res = PQexec(conn, sql);
-       else
+       PG_CATCH();
        {
-               res = PQgetResult(conn);
-               /* NULL means we're all done with the async results */
-               if (!res)
-                       return (Datum) 0;
+               /* if needed, close the connection to the database */
+               if (freeconn)
+                       PQfinish(conn);
+               PG_RE_THROW();
        }
+       PG_END_TRY();
 
-       /* if needed, close the connection to the database and cleanup */
+       /* if needed, close the connection to the database */
        if (freeconn)
                PQfinish(conn);
 
-       if (!res ||
-               (PQresultStatus(res) != PGRES_COMMAND_OK &&
-                PQresultStatus(res) != PGRES_TUPLES_OK))
-       {
-               dblink_res_error(conname, res, "could not execute query", fail);
-               return (Datum) 0;
-       }
-
-       materializeResult(fcinfo, res);
        return (Datum) 0;
 }
 
@@ -890,6 +924,259 @@ materializeResult(FunctionCallInfo fcinfo, PGresult *res)
        PG_END_TRY();
 }
 
+/*
+ * Execute the given SQL command and store its results into a tuplestore
+ * to be returned as the result of the current function.
+ * This is equivalent to PQexec followed by materializeResult, but we make
+ * use of libpq's "row processor" API to reduce per-row overhead.
+ */
+static void
+materializeQueryResult(FunctionCallInfo fcinfo,
+                                          PGconn *conn,
+                                          const char *conname,
+                                          const char *sql,
+                                          bool fail)
+{
+       ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+       PGresult   *volatile res = NULL;
+       storeInfo       sinfo;
+
+       /* prepTuplestoreResult must have been called previously */
+       Assert(rsinfo->returnMode == SFRM_Materialize);
+
+       PG_TRY();
+       {
+               /* initialize storeInfo to empty */
+               memset(&sinfo, 0, sizeof(sinfo));
+               sinfo.fcinfo = fcinfo;
+
+               /* We'll collect tuples using storeHandler */
+               PQsetRowProcessor(conn, storeHandler, &sinfo);
+
+               res = PQexec(conn, sql);
+
+               /* We don't keep the custom row processor installed permanently */
+               PQsetRowProcessor(conn, NULL, NULL);
+
+               if (!res ||
+                       (PQresultStatus(res) != PGRES_COMMAND_OK &&
+                        PQresultStatus(res) != PGRES_TUPLES_OK))
+               {
+                       /*
+                        * dblink_res_error will clear the passed PGresult, so we need
+                        * this ugly dance to avoid doing so twice during error exit
+                        */
+                       PGresult   *res1 = res;
+
+                       res = NULL;
+                       dblink_res_error(conname, res1, "could not execute query", fail);
+                       /* if fail isn't set, we'll return an empty query result */
+               }
+               else if (PQresultStatus(res) == PGRES_COMMAND_OK)
+               {
+                       /*
+                        * storeHandler didn't get called, so we need to convert the
+                        * command status string to a tuple manually
+                        */
+                       TupleDesc       tupdesc;
+                       AttInMetadata *attinmeta;
+                       Tuplestorestate *tupstore;
+                       HeapTuple       tuple;
+                       char       *values[1];
+                       MemoryContext oldcontext;
+
+                       /*
+                        * need a tuple descriptor representing one TEXT column to return
+                        * the command status string as our result tuple
+                        */
+                       tupdesc = CreateTemplateTupleDesc(1, false);
+                       TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+                                                          TEXTOID, -1, 0);
+                       attinmeta = TupleDescGetAttInMetadata(tupdesc);
+
+                       oldcontext = MemoryContextSwitchTo(
+                                                                       rsinfo->econtext->ecxt_per_query_memory);
+                       tupstore = tuplestore_begin_heap(true, false, work_mem);
+                       rsinfo->setResult = tupstore;
+                       rsinfo->setDesc = tupdesc;
+                       MemoryContextSwitchTo(oldcontext);
+
+                       values[0] = PQcmdStatus(res);
+
+                       /* build the tuple and put it into the tuplestore. */
+                       tuple = BuildTupleFromCStrings(attinmeta, values);
+                       tuplestore_puttuple(tupstore, tuple);
+
+                       PQclear(res);
+               }
+               else
+               {
+                       Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+                       /* storeHandler should have created a tuplestore */
+                       Assert(rsinfo->setResult != NULL);
+
+                       PQclear(res);
+               }
+       }
+       PG_CATCH();
+       {
+               /* be sure to unset the custom row processor */
+               PQsetRowProcessor(conn, NULL, NULL);
+               /* be sure to release any libpq result we collected */
+               if (res)
+                       PQclear(res);
+               /* and clear out any pending data in libpq */
+               while ((res = PQskipResult(conn)) != NULL)
+                       PQclear(res);
+               PG_RE_THROW();
+       }
+       PG_END_TRY();
+}
+
+/*
+ * Custom row processor for materializeQueryResult.
+ * Prototype of this function must match PQrowProcessor.
+ */
+static int
+storeHandler(PGresult *res, const PGdataValue *columns,
+                        const char **errmsgp, void *param)
+{
+       storeInfo  *sinfo = (storeInfo *) param;
+       int                     nfields = PQnfields(res);
+       char      **cstrs = sinfo->cstrs;
+       HeapTuple       tuple;
+       char       *pbuf;
+       int                     pbuflen;
+       int                     i;
+       MemoryContext oldcontext;
+
+       if (columns == NULL)
+       {
+               /* Prepare for new result set */
+               ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
+               TupleDesc       tupdesc;
+
+               /*
+                * It's possible to get more than one result set if the query string
+                * contained multiple SQL commands.  In that case, we follow PQexec's
+                * traditional behavior of throwing away all but the last result.
+                */
+               if (sinfo->tuplestore)
+                       tuplestore_end(sinfo->tuplestore);
+               sinfo->tuplestore = NULL;
+
+               /* get a tuple descriptor for our result type */
+               switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
+               {
+                       case TYPEFUNC_COMPOSITE:
+                               /* success */
+                               break;
+                       case TYPEFUNC_RECORD:
+                               /* failed to determine actual type of RECORD */
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                errmsg("function returning record called in context "
+                                                               "that cannot accept type record")));
+                               break;
+                       default:
+                               /* result type isn't composite */
+                               elog(ERROR, "return type must be a row type");
+                               break;
+               }
+
+               /* make sure we have a persistent copy of the tupdesc */
+               tupdesc = CreateTupleDescCopy(tupdesc);
+
+               /* check result and tuple descriptor have the same number of columns */
+               if (nfields != tupdesc->natts)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_DATATYPE_MISMATCH),
+                                        errmsg("remote query result rowtype does not match "
+                                                       "the specified FROM clause rowtype")));
+
+               /* Prepare attinmeta for later data conversions */
+               sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+
+               /* Create a new, empty tuplestore */
+               oldcontext = MemoryContextSwitchTo(
+                                                                       rsinfo->econtext->ecxt_per_query_memory);
+               sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+               rsinfo->setResult = sinfo->tuplestore;
+               rsinfo->setDesc = tupdesc;
+               MemoryContextSwitchTo(oldcontext);
+
+               /*
+                * Set up sufficiently-wide string pointers array; this won't change
+                * in size so it's easy to preallocate.
+                */
+               if (sinfo->cstrs)
+                       pfree(sinfo->cstrs);
+               sinfo->cstrs = (char **) palloc(nfields * sizeof(char *));
+
+               /* Create short-lived memory context for data conversions */
+               if (!sinfo->tmpcontext)
+                       sinfo->tmpcontext =
+                               AllocSetContextCreate(CurrentMemoryContext,
+                                                                         "dblink temporary context",
+                                                                         ALLOCSET_DEFAULT_MINSIZE,
+                                                                         ALLOCSET_DEFAULT_INITSIZE,
+                                                                         ALLOCSET_DEFAULT_MAXSIZE);
+
+               return 1;
+       }
+
+       CHECK_FOR_INTERRUPTS();
+
+       /*
+        * Do the following work in a temp context that we reset after each tuple.
+        * This cleans up not only the data we have direct access to, but any
+        * cruft the I/O functions might leak.
+        */
+       oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
+
+       /*
+        * The strings passed to us are not null-terminated, but the datatype
+        * input functions we're about to call require null termination.  Copy the
+        * strings and add null termination.  As a micro-optimization, allocate
+        * all the strings with one palloc.
+        */
+       pbuflen = nfields;              /* count the null terminators themselves */
+       for (i = 0; i < nfields; i++)
+       {
+               int                     len = columns[i].len;
+
+               if (len > 0)
+                       pbuflen += len;
+       }
+       pbuf = (char *) palloc(pbuflen);
+
+       for (i = 0; i < nfields; i++)
+       {
+               int                     len = columns[i].len;
+
+               if (len < 0)
+                       cstrs[i] = NULL;
+               else
+               {
+                       cstrs[i] = pbuf;
+                       memcpy(pbuf, columns[i].value, len);
+                       pbuf += len;
+                       *pbuf++ = '\0';
+               }
+       }
+
+       /* Convert row to a tuple, and add it to the tuplestore */
+       tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+
+       tuplestore_puttuple(sinfo->tuplestore, tuple);
+
+       /* Clean up */
+       MemoryContextSwitchTo(oldcontext);
+       MemoryContextReset(sinfo->tmpcontext);
+
+       return 1;
+}
+
 /*
  * List all open dblink connections by name.
  * Returns an array of all connection names.
index 855495c54d5e9ab8034bf4a2cfeffd22fda07da9..72ca765be73cff4f8f915c95d75b623c234f3bd4 100644 (file)
@@ -425,14 +425,6 @@ SELECT *
   <refsect1>
    <title>Notes</title>
 
-   <para>
-    <function>dblink</> fetches the entire remote query result before
-    returning any of it to the local system.  If the query is expected
-    to return a large number of rows, it's better to open it as a cursor
-    with <function>dblink_open</> and then fetch a manageable number
-    of rows at a time.
-   </para>
-
    <para>
     A convenient way to use <function>dblink</> with predetermined
     queries is to create a view.
@@ -1432,6 +1424,18 @@ dblink_get_result(text connname [, bool fail_on_error]) returns setof record
     sent, and one additional time to obtain an empty set result,
     before the connection can be used again.
    </para>
+
+   <para>
+    When using <function>dblink_send_query</> and
+    <function>dblink_get_result</>, <application>dblink</> fetches the entire
+    remote query result before returning any of it to the local query
+    processor.  If the query returns a large number of rows, this can result
+    in transient memory bloat in the local session.  It may be better to open
+    such a query as a cursor with <function>dblink_open</> and then fetch a
+    manageable number of rows at a time.  Alternatively, use plain
+    <function>dblink()</>, which avoids memory bloat by spooling large result
+    sets to disk.
+   </para>
   </refsect1>
 
   <refsect1>