bool newXactForCursor; /* Opened a transaction for a cursor */
} remoteConn;
+typedef struct storeInfo
+{
+ FunctionCallInfo fcinfo;
+ Tuplestorestate *tuplestore;
+ AttInMetadata *attinmeta;
+ MemoryContext tmpcontext;
+ char **cstrs;
+} storeInfo;
+
/*
* Internal declarations
*/
static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
static void prepTuplestoreResult(FunctionCallInfo fcinfo);
static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
+static void materializeQueryResult(FunctionCallInfo fcinfo,
+ PGconn *conn,
+ const char *conname,
+ const char *sql,
+ bool fail);
+static int storeHandler(PGresult *res, const PGdataValue *columns,
+ const char **errmsgp, void *param);
static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void);
static void createNewConnection(const char *name, remoteConn *rconn);
static Datum
dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
{
- char *msg;
- PGresult *res = NULL;
- PGconn *conn = NULL;
- char *connstr = NULL;
- char *sql = NULL;
- char *conname = NULL;
- remoteConn *rconn = NULL;
- bool fail = true; /* default to backward compatible */
- bool freeconn = false;
+ PGconn *volatile conn = NULL;
+ volatile bool freeconn = false;
prepTuplestoreResult(fcinfo);
DBLINK_INIT;
- if (!is_async)
+ PG_TRY();
{
- if (PG_NARGS() == 3)
- {
- /* text,text,bool */
- DBLINK_GET_CONN;
- sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
- fail = PG_GETARG_BOOL(2);
- }
- else if (PG_NARGS() == 2)
+ char *msg;
+ char *connstr = NULL;
+ char *sql = NULL;
+ char *conname = NULL;
+ remoteConn *rconn = NULL;
+ bool fail = true; /* default to backward compatible */
+
+ if (!is_async)
{
- /* text,text or text,bool */
- if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
+ if (PG_NARGS() == 3)
{
+ /* text,text,bool */
+ DBLINK_GET_CONN;
+ sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+ fail = PG_GETARG_BOOL(2);
+ }
+ else if (PG_NARGS() == 2)
+ {
+ /* text,text or text,bool */
+ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
+ {
+ conn = pconn->conn;
+ sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ fail = PG_GETARG_BOOL(1);
+ }
+ else
+ {
+ DBLINK_GET_CONN;
+ sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+ }
+ }
+ else if (PG_NARGS() == 1)
+ {
+ /* text */
conn = pconn->conn;
sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
- fail = PG_GETARG_BOOL(1);
}
else
+ /* shouldn't happen */
+ elog(ERROR, "wrong number of arguments");
+ }
+ else /* is_async */
+ {
+ /* get async result */
+ if (PG_NARGS() == 2)
{
- DBLINK_GET_CONN;
- sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+ /* text,bool */
+ DBLINK_GET_NAMED_CONN;
+ fail = PG_GETARG_BOOL(1);
}
+ else if (PG_NARGS() == 1)
+ {
+ /* text */
+ DBLINK_GET_NAMED_CONN;
+ }
+ else
+ /* shouldn't happen */
+ elog(ERROR, "wrong number of arguments");
}
- else if (PG_NARGS() == 1)
+
+ if (!conn)
+ DBLINK_CONN_NOT_AVAIL;
+
+ if (!is_async)
{
- /* text */
- conn = pconn->conn;
- sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ /* synchronous query, use efficient tuple collection method */
+ materializeQueryResult(fcinfo, conn, conname, sql, fail);
}
else
- /* shouldn't happen */
- elog(ERROR, "wrong number of arguments");
- }
- else /* is_async */
- {
- /* get async result */
- if (PG_NARGS() == 2)
- {
- /* text,bool */
- DBLINK_GET_NAMED_CONN;
- fail = PG_GETARG_BOOL(1);
- }
- else if (PG_NARGS() == 1)
{
- /* text */
- DBLINK_GET_NAMED_CONN;
+ /* async result retrieval, do it the old way */
+ PGresult *res = PQgetResult(conn);
+
+ /* NULL means we're all done with the async results */
+ if (res)
+ {
+ if (PQresultStatus(res) != PGRES_COMMAND_OK &&
+ PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ dblink_res_error(conname, res, "could not execute query",
+ fail);
+ /* if fail isn't set, we'll return an empty query result */
+ }
+ else
+ {
+ materializeResult(fcinfo, res);
+ }
+ }
}
- else
- /* shouldn't happen */
- elog(ERROR, "wrong number of arguments");
}
-
- if (!conn)
- DBLINK_CONN_NOT_AVAIL;
-
- /* synchronous query, or async result retrieval */
- if (!is_async)
- res = PQexec(conn, sql);
- else
+ PG_CATCH();
{
- res = PQgetResult(conn);
- /* NULL means we're all done with the async results */
- if (!res)
- return (Datum) 0;
+ /* if needed, close the connection to the database */
+ if (freeconn)
+ PQfinish(conn);
+ PG_RE_THROW();
}
+ PG_END_TRY();
- /* if needed, close the connection to the database and cleanup */
+ /* if needed, close the connection to the database */
if (freeconn)
PQfinish(conn);
- if (!res ||
- (PQresultStatus(res) != PGRES_COMMAND_OK &&
- PQresultStatus(res) != PGRES_TUPLES_OK))
- {
- dblink_res_error(conname, res, "could not execute query", fail);
- return (Datum) 0;
- }
-
- materializeResult(fcinfo, res);
return (Datum) 0;
}
PG_END_TRY();
}
+/*
+ * Execute the given SQL command and store its results into a tuplestore
+ * to be returned as the result of the current function.
+ * This is equivalent to PQexec followed by materializeResult, but we make
+ * use of libpq's "row processor" API to reduce per-row overhead.
+ */
+static void
+materializeQueryResult(FunctionCallInfo fcinfo,
+ PGconn *conn,
+ const char *conname,
+ const char *sql,
+ bool fail)
+{
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ PGresult *volatile res = NULL;
+ storeInfo sinfo;
+
+ /* prepTuplestoreResult must have been called previously */
+ Assert(rsinfo->returnMode == SFRM_Materialize);
+
+ PG_TRY();
+ {
+ /* initialize storeInfo to empty */
+ memset(&sinfo, 0, sizeof(sinfo));
+ sinfo.fcinfo = fcinfo;
+
+ /* We'll collect tuples using storeHandler */
+ PQsetRowProcessor(conn, storeHandler, &sinfo);
+
+ res = PQexec(conn, sql);
+
+ /* We don't keep the custom row processor installed permanently */
+ PQsetRowProcessor(conn, NULL, NULL);
+
+ if (!res ||
+ (PQresultStatus(res) != PGRES_COMMAND_OK &&
+ PQresultStatus(res) != PGRES_TUPLES_OK))
+ {
+ /*
+ * dblink_res_error will clear the passed PGresult, so we need
+ * this ugly dance to avoid doing so twice during error exit
+ */
+ PGresult *res1 = res;
+
+ res = NULL;
+ dblink_res_error(conname, res1, "could not execute query", fail);
+ /* if fail isn't set, we'll return an empty query result */
+ }
+ else if (PQresultStatus(res) == PGRES_COMMAND_OK)
+ {
+ /*
+ * storeHandler didn't get called, so we need to convert the
+ * command status string to a tuple manually
+ */
+ TupleDesc tupdesc;
+ AttInMetadata *attinmeta;
+ Tuplestorestate *tupstore;
+ HeapTuple tuple;
+ char *values[1];
+ MemoryContext oldcontext;
+
+ /*
+ * need a tuple descriptor representing one TEXT column to return
+ * the command status string as our result tuple
+ */
+ tupdesc = CreateTemplateTupleDesc(1, false);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+ TEXTOID, -1, 0);
+ attinmeta = TupleDescGetAttInMetadata(tupdesc);
+
+ oldcontext = MemoryContextSwitchTo(
+ rsinfo->econtext->ecxt_per_query_memory);
+ tupstore = tuplestore_begin_heap(true, false, work_mem);
+ rsinfo->setResult = tupstore;
+ rsinfo->setDesc = tupdesc;
+ MemoryContextSwitchTo(oldcontext);
+
+ values[0] = PQcmdStatus(res);
+
+ /* build the tuple and put it into the tuplestore. */
+ tuple = BuildTupleFromCStrings(attinmeta, values);
+ tuplestore_puttuple(tupstore, tuple);
+
+ PQclear(res);
+ }
+ else
+ {
+ Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+ /* storeHandler should have created a tuplestore */
+ Assert(rsinfo->setResult != NULL);
+
+ PQclear(res);
+ }
+ }
+ PG_CATCH();
+ {
+ /* be sure to unset the custom row processor */
+ PQsetRowProcessor(conn, NULL, NULL);
+ /* be sure to release any libpq result we collected */
+ if (res)
+ PQclear(res);
+ /* and clear out any pending data in libpq */
+ while ((res = PQskipResult(conn)) != NULL)
+ PQclear(res);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+}
+
+/*
+ * Custom row processor for materializeQueryResult.
+ * Prototype of this function must match PQrowProcessor.
+ */
+static int
+storeHandler(PGresult *res, const PGdataValue *columns,
+ const char **errmsgp, void *param)
+{
+ storeInfo *sinfo = (storeInfo *) param;
+ int nfields = PQnfields(res);
+ char **cstrs = sinfo->cstrs;
+ HeapTuple tuple;
+ char *pbuf;
+ int pbuflen;
+ int i;
+ MemoryContext oldcontext;
+
+ if (columns == NULL)
+ {
+ /* Prepare for new result set */
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
+ TupleDesc tupdesc;
+
+ /*
+ * It's possible to get more than one result set if the query string
+ * contained multiple SQL commands. In that case, we follow PQexec's
+ * traditional behavior of throwing away all but the last result.
+ */
+ if (sinfo->tuplestore)
+ tuplestore_end(sinfo->tuplestore);
+ sinfo->tuplestore = NULL;
+
+ /* get a tuple descriptor for our result type */
+ switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
+ {
+ case TYPEFUNC_COMPOSITE:
+ /* success */
+ break;
+ case TYPEFUNC_RECORD:
+ /* failed to determine actual type of RECORD */
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("function returning record called in context "
+ "that cannot accept type record")));
+ break;
+ default:
+ /* result type isn't composite */
+ elog(ERROR, "return type must be a row type");
+ break;
+ }
+
+ /* make sure we have a persistent copy of the tupdesc */
+ tupdesc = CreateTupleDescCopy(tupdesc);
+
+ /* check result and tuple descriptor have the same number of columns */
+ if (nfields != tupdesc->natts)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query result rowtype does not match "
+ "the specified FROM clause rowtype")));
+
+ /* Prepare attinmeta for later data conversions */
+ sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+
+ /* Create a new, empty tuplestore */
+ oldcontext = MemoryContextSwitchTo(
+ rsinfo->econtext->ecxt_per_query_memory);
+ sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+ rsinfo->setResult = sinfo->tuplestore;
+ rsinfo->setDesc = tupdesc;
+ MemoryContextSwitchTo(oldcontext);
+
+ /*
+ * Set up sufficiently-wide string pointers array; this won't change
+ * in size so it's easy to preallocate.
+ */
+ if (sinfo->cstrs)
+ pfree(sinfo->cstrs);
+ sinfo->cstrs = (char **) palloc(nfields * sizeof(char *));
+
+ /* Create short-lived memory context for data conversions */
+ if (!sinfo->tmpcontext)
+ sinfo->tmpcontext =
+ AllocSetContextCreate(CurrentMemoryContext,
+ "dblink temporary context",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ return 1;
+ }
+
+ CHECK_FOR_INTERRUPTS();
+
+ /*
+ * Do the following work in a temp context that we reset after each tuple.
+ * This cleans up not only the data we have direct access to, but any
+ * cruft the I/O functions might leak.
+ */
+ oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
+
+ /*
+ * The strings passed to us are not null-terminated, but the datatype
+ * input functions we're about to call require null termination. Copy the
+ * strings and add null termination. As a micro-optimization, allocate
+ * all the strings with one palloc.
+ */
+ pbuflen = nfields; /* count the null terminators themselves */
+ for (i = 0; i < nfields; i++)
+ {
+ int len = columns[i].len;
+
+ if (len > 0)
+ pbuflen += len;
+ }
+ pbuf = (char *) palloc(pbuflen);
+
+ for (i = 0; i < nfields; i++)
+ {
+ int len = columns[i].len;
+
+ if (len < 0)
+ cstrs[i] = NULL;
+ else
+ {
+ cstrs[i] = pbuf;
+ memcpy(pbuf, columns[i].value, len);
+ pbuf += len;
+ *pbuf++ = '\0';
+ }
+ }
+
+ /* Convert row to a tuple, and add it to the tuplestore */
+ tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+
+ tuplestore_puttuple(sinfo->tuplestore, tuple);
+
+ /* Clean up */
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextReset(sinfo->tmpcontext);
+
+ return 1;
+}
+
/*
* List all open dblink connections by name.
* Returns an array of all connection names.