4 * Functions returning results from a remote database
6 * Joe Conway <mail@joeconway.com>
8 * Darko Prenosil <Darko.Prenosil@finteh.hr>
9 * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
11 * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.88 2010/02/03 23:01:11 joe Exp $
12 * Copyright (c) 2001-2010, PostgreSQL Global Development Group
13 * ALL RIGHTS RESERVED;
15 * Permission to use, copy, modify, and distribute this software and its
16 * documentation for any purpose, without fee, and without a written agreement
17 * is hereby granted, provided that the above copyright notice and this
18 * paragraph and the following two paragraphs appear in all copies.
20 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24 * POSSIBILITY OF SUCH DAMAGE.
26 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
29 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
40 #include "access/genam.h"
41 #include "access/heapam.h"
42 #include "access/tupdesc.h"
43 #include "catalog/indexing.h"
44 #include "catalog/namespace.h"
45 #include "catalog/pg_index.h"
46 #include "catalog/pg_type.h"
47 #include "executor/executor.h"
48 #include "executor/spi.h"
49 #include "foreign/foreign.h"
50 #include "lib/stringinfo.h"
51 #include "mb/pg_wchar.h"
52 #include "miscadmin.h"
53 #include "nodes/execnodes.h"
54 #include "nodes/nodes.h"
55 #include "nodes/pg_list.h"
56 #include "parser/parse_type.h"
57 #include "utils/acl.h"
58 #include "utils/array.h"
59 #include "utils/builtins.h"
60 #include "utils/dynahash.h"
61 #include "utils/fmgroids.h"
62 #include "utils/hsearch.h"
63 #include "utils/lsyscache.h"
64 #include "utils/memutils.h"
65 #include "utils/syscache.h"
66 #include "utils/tqual.h"
/*
 * Per-connection state for one remote database link: the libpq handle plus
 * the bookkeeping dblink_open/dblink_close use to decide when a transaction
 * that dblink_open started can be committed again.
 */
72 typedef struct remoteConn
74 PGconn *conn; /* libpq handle for the remote connection */
75 int openCursorCount; /* Cursors counted while newXactForCursor is set */
76 bool newXactForCursor; /* TRUE once dblink_open has issued BEGIN itself */
80 * Internal declarations
82 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
83 static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
84 static remoteConn *getConnectionByName(const char *name);
85 static HTAB *createConnHash(void);
86 static void createNewConnection(const char *name, remoteConn *rconn);
87 static void deleteConnection(const char *name);
88 static char **get_pkey_attnames(Oid relid, int16 *numatts);
89 static char **get_text_array_contents(ArrayType *array, int *numitems);
90 static char *get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
91 static char *get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals);
92 static char *get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
93 static char *quote_literal_cstr(char *rawstr);
94 static char *quote_ident_cstr(char *rawstr);
95 static int16 get_attnum_pk_pos(int2vector *pkattnums, int16 pknumatts, int16 key);
96 static HeapTuple get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals);
97 static Oid get_relid_from_relname(text *relname_text);
98 static char *generate_relation_name(Oid relid);
99 static void dblink_connstr_check(const char *connstr);
100 static void dblink_security_check(PGconn *conn, remoteConn *rconn);
101 static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
102 static char *get_connect_string(const char *servername);
103 static char *escape_param_str(const char *from);
104 static int get_nondropped_natts(Oid relid);
/*
 * File-scope connection state.  pconn is allocated in TopMemoryContext by
 * DBLINK_INIT; remoteConnHash is the table dblink_get_connections walks to
 * list connection names.
 * NOTE(review): pconn presumably backs the unnamed connection -- confirm.
 */
107 static remoteConn *pconn = NULL;
108 static HTAB *remoteConnHash = NULL;
111 * The following is a list that holds multiple remote connections.
112 * Calling convention of each dblink function changes to accept
113 * connection name as the first parameter. The connection list is
114 * much like ecpg e.g. a mapping between a name and a PGconn object.
/*
 * One entry of remoteConnHash.  name is the connection name, stored as a
 * C string in a NAMEDATALEN-byte buffer.
 * NOTE(review): any further members are not shown in this listing.
 */
117 typedef struct remoteConnHashEnt
119 char name[NAMEDATALEN];
123 /* initial number of connection hashes */
126 /* general utility */
/*
 * Helper macros.  All of them expand to statements that use variables of
 * the calling function by name (conn, rconn, connstr, msg, conname), so the
 * caller must declare those before invoking them.
 *
 * xpstrdup(var_c, var_)       - stores pstrdup(var_) in var_c.
 * DBLINK_RES_INTERNALERROR(p2)- copies PQerrorMessage(conn) and raises
 *                               elog(ERROR) with p2 as the message prefix.
 * DBLINK_CONN_NOT_AVAIL       - raises ERRCODE_CONNECTION_DOES_NOT_EXIST,
 *                               with or without conname in the message.
 * DBLINK_GET_CONN             - treats argument 0 as a connection name
 *                               first; the other path resolves it through
 *                               get_connect_string (falling back to the raw
 *                               text), runs dblink_connstr_check, connects
 *                               with PQconnectdb, errors on CONNECTION_BAD,
 *                               runs dblink_security_check and sets the
 *                               client encoding to the database encoding.
 * DBLINK_GET_NAMED_CONN       - looks argument 0 up by name and either
 *                               takes rconn->conn or raises
 *                               DBLINK_CONN_NOT_AVAIL.
 * DBLINK_INIT                 - allocates pconn in TopMemoryContext and
 *                               clears its fields.
 *
 * NOTE(review): the embedded line numbers skip, so the bodies below are
 * incomplete (xpfree's body and most guards are not shown); the summary
 * above covers only the statements that are visible.
 */
127 #define xpfree(var_) \
136 #define xpstrdup(var_c, var_) \
139 var_c = pstrdup(var_); \
144 #define DBLINK_RES_INTERNALERROR(p2) \
146 msg = pstrdup(PQerrorMessage(conn)); \
149 elog(ERROR, "%s: %s", p2, msg); \
152 #define DBLINK_CONN_NOT_AVAIL \
156 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
157 errmsg("connection \"%s\" not available", conname))); \
160 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
161 errmsg("connection not available"))); \
164 #define DBLINK_GET_CONN \
166 char *conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
167 rconn = getConnectionByName(conname_or_str); \
170 conn = rconn->conn; \
174 connstr = get_connect_string(conname_or_str); \
175 if (connstr == NULL) \
177 connstr = conname_or_str; \
179 dblink_connstr_check(connstr); \
180 conn = PQconnectdb(connstr); \
181 if (PQstatus(conn) == CONNECTION_BAD) \
183 msg = pstrdup(PQerrorMessage(conn)); \
186 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \
187 errmsg("could not establish connection"), \
188 errdetail("%s", msg))); \
190 dblink_security_check(conn, rconn); \
191 PQsetClientEncoding(conn, GetDatabaseEncodingName()); \
196 #define DBLINK_GET_NAMED_CONN \
198 char *conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
199 rconn = getConnectionByName(conname); \
201 conn = rconn->conn; \
203 DBLINK_CONN_NOT_AVAIL; \
206 #define DBLINK_INIT \
210 pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
211 pconn->conn = NULL; \
212 pconn->openCursorCount = 0; \
213 pconn->newXactForCursor = FALSE; \
218 * Create a persistent connection to another database
/*
 * dblink_connect: SQL-callable entry point that opens a libpq connection.
 *
 * Two arguments: arg 0 is the connection name and arg 1 the connection
 * string or foreign server name.  One argument: arg 0 is the connection
 * string or foreign server name.
 *
 * Raises ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION, with the libpq
 * message as detail, when PQstatus reports CONNECTION_BAD.  Otherwise
 * returns the text value "OK".
 *
 * NOTE(review): the embedded line numbers skip, so some statements
 * (including the guards around several lines below) are not shown.
 */
220 PG_FUNCTION_INFO_V1(dblink_connect);
222 dblink_connect(PG_FUNCTION_ARGS)
224 char *conname_or_str = NULL;
225 char *connstr = NULL;
226 char *connname = NULL;
229 remoteConn *rconn = NULL;
235 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
236 connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
238 else if (PG_NARGS() == 1)
239 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
242 rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
245 /* first check for valid foreign data server */
246 connstr = get_connect_string(conname_or_str);
/*
 * Fall back to the raw argument as the connection string; DBLINK_GET_CONN
 * guards the same assignment with connstr == NULL.
 */
248 connstr = conname_or_str;
250 /* check password in connection string if not superuser */
251 dblink_connstr_check(connstr);
252 conn = PQconnectdb(connstr);
254 if (PQstatus(conn) == CONNECTION_BAD)
/* copy the message into palloc'd memory before reporting it */
256 msg = pstrdup(PQerrorMessage(conn));
262 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
263 errmsg("could not establish connection"),
264 errdetail("%s", msg)));
267 /* check password actually used if not superuser */
268 dblink_security_check(conn, rconn);
270 /* attempt to set client encoding to match server encoding */
271 PQsetClientEncoding(conn, GetDatabaseEncodingName());
/* register rconn under connname (guard for the unnamed case not shown) */
276 createNewConnection(connname, rconn);
281 PG_RETURN_TEXT_P(cstring_to_text("OK"));
285 * Clear a persistent connection to another database
/*
 * dblink_disconnect: SQL-callable entry point that drops a connection.
 *
 * The visible path reads the connection name from arg 0, looks it up with
 * getConnectionByName, raises DBLINK_CONN_NOT_AVAIL when no usable
 * connection is found, removes the name with deleteConnection and returns
 * the text value "OK".
 *
 * NOTE(review): the statements that close the libpq connection and handle
 * the no-argument form are not shown in this listing.
 */
287 PG_FUNCTION_INFO_V1(dblink_disconnect);
289 dblink_disconnect(PG_FUNCTION_ARGS)
291 char *conname = NULL;
292 remoteConn *rconn = NULL;
299 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
300 rconn = getConnectionByName(conname);
308 DBLINK_CONN_NOT_AVAIL;
313 deleteConnection(conname);
319 PG_RETURN_TEXT_P(cstring_to_text("OK"));
323 * opens a cursor using a persistent connection
/*
 * dblink_open: SQL-callable entry point that declares a cursor remotely.
 *
 * Argument forms handled below:
 *   (curname, sql)
 *   (curname, sql, fail)          - third argument is of type bool
 *   (conname, curname, sql)       - third argument is not bool
 *   (conname, curname, sql, fail)
 *
 * If the remote session is idle (PQTRANS_IDLE) a BEGIN is sent first, and
 * newXactForCursor / openCursorCount record that this function owns the
 * transaction.  The cursor is then created with
 * "DECLARE <curname> CURSOR FOR <sql>".
 *
 * Returns the text value "OK", or "ERROR" after dblink_res_error has
 * reported a failed DECLARE.  Raises DBLINK_CONN_NOT_AVAIL when the
 * connection cannot be resolved.
 *
 * NOTE(review): curname and sql are spliced into the statement verbatim;
 * no identifier quoting is visible here.
 * NOTE(review): the embedded line numbers skip, so some statements are not
 * shown in this listing.
 */
325 PG_FUNCTION_INFO_V1(dblink_open);
327 dblink_open(PG_FUNCTION_ARGS)
330 PGresult *res = NULL;
332 char *curname = NULL;
334 char *conname = NULL;
336 remoteConn *rconn = NULL;
337 bool fail = true; /* default to backward compatible behavior */
340 initStringInfo(&buf);
345 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
346 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
349 else if (PG_NARGS() == 3)
351 /* might be text,text,text or text,text,bool */
352 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
354 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
355 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
356 fail = PG_GETARG_BOOL(2);
361 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
362 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
363 sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
364 rconn = getConnectionByName(conname);
367 else if (PG_NARGS() == 4)
369 /* text,text,text,bool */
370 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
371 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
372 sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
373 fail = PG_GETARG_BOOL(3);
374 rconn = getConnectionByName(conname);
377 if (!rconn || !rconn->conn)
378 DBLINK_CONN_NOT_AVAIL;
382 /* If we are not in a transaction, start one */
383 if (PQtransactionStatus(conn) == PQTRANS_IDLE)
385 res = PQexec(conn, "BEGIN");
386 if (PQresultStatus(res) != PGRES_COMMAND_OK)
387 DBLINK_RES_INTERNALERROR("begin error");
/* remember that the transaction belongs to us so dblink_close can commit it */
389 rconn->newXactForCursor = TRUE;
392 * Since transaction state was IDLE, we force cursor count to
393 * initially be 0. This is needed as a previous ABORT might have wiped
394 * out our transaction without maintaining the cursor count for us.
396 rconn->openCursorCount = 0;
399 /* if we started a transaction, increment cursor count */
400 if (rconn->newXactForCursor)
401 (rconn->openCursorCount)++;
403 appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
404 res = PQexec(conn, buf.data);
405 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
/* dblink_res_error receives fail; when control comes back we report "ERROR" */
407 dblink_res_error(conname, res, "could not open cursor", fail);
408 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
412 PG_RETURN_TEXT_P(cstring_to_text("OK"));
/*
 * dblink_close: SQL-callable entry point that closes a remote cursor.
 *
 * Argument forms handled below:
 *   (curname)
 *   (curname, fail)            - second argument is of type bool
 *   (conname, curname)         - second argument is not bool
 *   (conname, curname, fail)
 *
 * Sends "CLOSE <curname>".  When dblink_open started the surrounding
 * transaction (newXactForCursor), the cursor count is decremented and a
 * COMMIT is sent once it reaches zero.
 *
 * Returns the text value "OK", or "ERROR" after dblink_res_error has
 * reported a failed CLOSE.  Raises DBLINK_CONN_NOT_AVAIL when the
 * connection cannot be resolved.
 *
 * NOTE(review): curname is spliced into the statement verbatim.
 * NOTE(review): the embedded line numbers skip, so some statements are not
 * shown in this listing.
 */
418 PG_FUNCTION_INFO_V1(dblink_close);
420 dblink_close(PG_FUNCTION_ARGS)
423 PGresult *res = NULL;
424 char *curname = NULL;
425 char *conname = NULL;
428 remoteConn *rconn = NULL;
429 bool fail = true; /* default to backward compatible behavior */
432 initStringInfo(&buf);
437 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
440 else if (PG_NARGS() == 2)
442 /* might be text,text or text,bool */
443 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
445 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
446 fail = PG_GETARG_BOOL(1);
451 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
452 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
453 rconn = getConnectionByName(conname);
459 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
460 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
461 fail = PG_GETARG_BOOL(2);
462 rconn = getConnectionByName(conname);
465 if (!rconn || !rconn->conn)
466 DBLINK_CONN_NOT_AVAIL;
470 appendStringInfo(&buf, "CLOSE %s", curname);
472 /* close the cursor */
473 res = PQexec(conn, buf.data);
474 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
476 dblink_res_error(conname, res, "could not close cursor", fail);
477 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
482 /* if we started a transaction, decrement cursor count */
483 if (rconn->newXactForCursor)
485 (rconn->openCursorCount)--;
487 /* if count is zero, commit the transaction */
488 if (rconn->openCursorCount == 0)
/* clear the flag first so the next dblink_open starts a fresh transaction */
490 rconn->newXactForCursor = FALSE;
492 res = PQexec(conn, "COMMIT");
493 if (PQresultStatus(res) != PGRES_COMMAND_OK)
494 DBLINK_RES_INTERNALERROR("commit error");
499 PG_RETURN_TEXT_P(cstring_to_text("OK"));
503 * Fetch results from an open cursor
/*
 * dblink_fetch: SQL-callable set-returning entry point that fetches rows
 * from a remote cursor.
 *
 * Argument forms handled below:
 *   (conname, curname, howmany, fail)
 *   (curname, howmany, fail)      - third argument is of type bool
 *   (conname, curname, howmany)   - third argument is not bool
 *   (curname, howmany)
 *
 * Sends "FETCH <howmany> FROM <curname>".  A status other than
 * PGRES_COMMAND_OK / PGRES_TUPLES_OK goes to dblink_res_error;
 * PGRES_COMMAND_OK is reported as ERRCODE_INVALID_CURSOR_NAME; otherwise
 * the rows are handed to materializeResult, which fills the tuplestore
 * returned through rsinfo.
 *
 * NOTE(review): unlike dblink_record_internal, no validation of rsinfo or
 * of its allowedModes is visible here before rsinfo is dereferenced --
 * confirm against the full source.
 * NOTE(review): curname is spliced into the statement verbatim.
 * NOTE(review): the embedded line numbers skip, so some statements are not
 * shown in this listing.
 */
505 PG_FUNCTION_INFO_V1(dblink_fetch);
507 dblink_fetch(PG_FUNCTION_ARGS)
509 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
510 PGresult *res = NULL;
511 char *conname = NULL;
512 remoteConn *rconn = NULL;
515 char *curname = NULL;
517 bool fail = true; /* default to backward compatible */
523 /* text,text,int,bool */
524 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
525 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
526 howmany = PG_GETARG_INT32(2);
527 fail = PG_GETARG_BOOL(3);
529 rconn = getConnectionByName(conname);
533 else if (PG_NARGS() == 3)
535 /* text,text,int or text,int,bool */
536 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
538 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
539 howmany = PG_GETARG_INT32(1);
540 fail = PG_GETARG_BOOL(2);
545 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
546 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
547 howmany = PG_GETARG_INT32(2);
549 rconn = getConnectionByName(conname);
554 else if (PG_NARGS() == 2)
557 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
558 howmany = PG_GETARG_INT32(1);
563 DBLINK_CONN_NOT_AVAIL;
565 /* let the caller know we're sending back a tuplestore */
566 rsinfo->returnMode = SFRM_Materialize;
567 rsinfo->setResult = NULL;
568 rsinfo->setDesc = NULL;
570 initStringInfo(&buf);
571 appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
574 * Try to execute the query. Note that since libpq uses malloc, the
575 * PGresult will be long-lived even though we are still in a
576 * short-lived memory context.
578 res = PQexec(conn, buf.data);
580 (PQresultStatus(res) != PGRES_COMMAND_OK &&
581 PQresultStatus(res) != PGRES_TUPLES_OK))
583 dblink_res_error(conname, res, "could not fetch from cursor", fail);
/* a bare command status instead of a row set is treated as "no such cursor" */
586 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
588 /* cursor does not exist - closed already or bad name */
591 (errcode(ERRCODE_INVALID_CURSOR_NAME),
592 errmsg("cursor \"%s\" does not exist", curname)));
595 materializeResult(fcinfo, res);
600 * Note: this is the new preferred version of dblink
/*
 * dblink_record: SQL-callable wrapper that runs dblink_record_internal in
 * synchronous mode (is_async = false).
 */
602 PG_FUNCTION_INFO_V1(dblink_record);
604 dblink_record(PG_FUNCTION_ARGS)
606 return dblink_record_internal(fcinfo, false);
/*
 * dblink_send_query: SQL-callable entry point that submits a query without
 * waiting for its result.
 *
 * arg 1 is the SQL text; any other argument count raises
 * "wrong number of arguments".  The query is handed to PQsendQuery and the
 * function returns PQsendQuery's return value as int4.  A libpq error
 * message is emitted at NOTICE level rather than raised as an error.
 *
 * NOTE(review): the statements that resolve the connection and the guard
 * around the NOTICE are not shown in this listing.
 */
609 PG_FUNCTION_INFO_V1(dblink_send_query);
611 dblink_send_query(PG_FUNCTION_ARGS)
614 char *connstr = NULL;
616 remoteConn *rconn = NULL;
618 bool freeconn = false;
624 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
627 /* shouldn't happen */
628 elog(ERROR, "wrong number of arguments");
630 /* async query send */
631 retval = PQsendQuery(conn, sql);
633 elog(NOTICE, "%s", PQerrorMessage(conn));
635 PG_RETURN_INT32(retval);
/*
 * dblink_get_result: SQL-callable wrapper that runs dblink_record_internal
 * in asynchronous-result mode (is_async = true).
 */
638 PG_FUNCTION_INFO_V1(dblink_get_result);
640 dblink_get_result(PG_FUNCTION_ARGS)
642 return dblink_record_internal(fcinfo, true);
/*
 * dblink_record_internal: shared implementation behind dblink_record and
 * dblink_get_result.
 *
 * fcinfo   - call info of the SQL-level function; its resultinfo must be a
 *            ReturnSetInfo that allows SFRM_Materialize, otherwise
 *            ERRCODE_FEATURE_NOT_SUPPORTED is raised.
 * is_async - false: run the SQL text synchronously with PQexec;
 *            true: collect the next pending result with PQgetResult.
 *
 * Synchronous argument forms handled below:
 *   3 args - sql is arg 1, fail is arg 2
 *   2 args - (sql, fail) when arg 1 is bool, otherwise sql is arg 1
 *   1 arg  - (sql)
 * Asynchronous forms: two arguments with fail as arg 1, or one argument.
 *
 * A result whose status is neither PGRES_COMMAND_OK nor PGRES_TUPLES_OK is
 * passed to dblink_res_error; otherwise materializeResult stores the rows
 * in the tuplestore returned through rsinfo.
 *
 * NOTE(review): the embedded line numbers skip, so the connection lookup
 * and several guards are not shown in this listing.
 */
646 dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
648 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
650 PGresult *res = NULL;
652 char *connstr = NULL;
654 char *conname = NULL;
655 remoteConn *rconn = NULL;
656 bool fail = true; /* default to backward compatible */
657 bool freeconn = false;
659 /* check to see if caller supports us returning a tuplestore */
660 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
662 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
663 errmsg("set-valued function called in context that cannot accept a set")));
664 if (!(rsinfo->allowedModes & SFRM_Materialize))
666 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
667 errmsg("materialize mode required, but it is not " \
668 "allowed in this context")));
678 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
679 fail = PG_GETARG_BOOL(2);
681 else if (PG_NARGS() == 2)
683 /* text,text or text,bool */
684 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
687 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
688 fail = PG_GETARG_BOOL(1);
693 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
696 else if (PG_NARGS() == 1)
700 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
703 /* shouldn't happen */
704 elog(ERROR, "wrong number of arguments");
708 /* get async result */
713 fail = PG_GETARG_BOOL(1);
715 else if (PG_NARGS() == 1)
721 /* shouldn't happen */
722 elog(ERROR, "wrong number of arguments");
726 DBLINK_CONN_NOT_AVAIL;
728 /* let the caller know we're sending back a tuplestore */
729 rsinfo->returnMode = SFRM_Materialize;
730 rsinfo->setResult = NULL;
731 rsinfo->setDesc = NULL;
733 /* synchronous query, or async result retrieval */
735 res = PQexec(conn, sql);
738 res = PQgetResult(conn);
739 /* NULL means we're all done with the async results */
744 /* if needed, close the connection to the database and cleanup */
749 (PQresultStatus(res) != PGRES_COMMAND_OK &&
750 PQresultStatus(res) != PGRES_TUPLES_OK))
752 dblink_res_error(conname, res, "could not execute query", fail);
756 materializeResult(fcinfo, res);
761 * Materialize the PGresult to return them as the function result.
762 * The res will be released in this function.
/*
 * materializeResult: copy a libpq result into the tuplestore that is handed
 * back through fcinfo's ReturnSetInfo.
 *
 * fcinfo - call info whose resultinfo is a ReturnSetInfo already switched
 *          to SFRM_Materialize (asserted below).
 * res    - result with status PGRES_COMMAND_OK or PGRES_TUPLES_OK.
 *
 * For PGRES_COMMAND_OK the descriptor is a single column named "status"
 * and the stored value is PQcmdStatus(res).  For PGRES_TUPLES_OK the
 * descriptor comes from get_call_result_type, and a column-count mismatch
 * with the remote result raises ERRCODE_DATATYPE_MISMATCH.  Field values
 * are passed as C strings to BuildTupleFromCStrings.
 *
 * The tuplestore is created while rsinfo's per-query memory context is
 * current, so it is allocated there.
 *
 * NOTE(review): the embedded line numbers skip; the NULL-field assignment,
 * the call that releases res and several guards are not shown here.
 */
765 materializeResult(FunctionCallInfo fcinfo, PGresult *res)
767 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
769 Assert(rsinfo->returnMode == SFRM_Materialize);
774 bool is_sql_cmd = false;
778 if (PQresultStatus(res) == PGRES_COMMAND_OK)
783 * need a tuple descriptor representing one TEXT column to
784 * return the command status string as our result tuple
786 tupdesc = CreateTemplateTupleDesc(1, false);
787 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
794 Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
798 /* get a tuple descriptor for our result type */
799 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
801 case TYPEFUNC_COMPOSITE:
804 case TYPEFUNC_RECORD:
805 /* failed to determine actual type of RECORD */
807 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
808 errmsg("function returning record called in context "
809 "that cannot accept type record")));
812 /* result type isn't composite */
813 elog(ERROR, "return type must be a row type");
817 /* make sure we have a persistent copy of the tupdesc */
818 tupdesc = CreateTupleDescCopy(tupdesc);
819 ntuples = PQntuples(res);
820 nfields = PQnfields(res);
824 * check result and tuple descriptor have the same number of columns
826 if (nfields != tupdesc->natts)
828 (errcode(ERRCODE_DATATYPE_MISMATCH),
829 errmsg("remote query result rowtype does not match "
830 "the specified FROM clause rowtype")));
834 AttInMetadata *attinmeta;
835 Tuplestorestate *tupstore;
836 MemoryContext oldcontext;
840 attinmeta = TupleDescGetAttInMetadata(tupdesc);
/* create the tuplestore with the per-query context current, then switch back */
842 oldcontext = MemoryContextSwitchTo(
843 rsinfo->econtext->ecxt_per_query_memory);
844 tupstore = tuplestore_begin_heap(true, false, work_mem);
845 rsinfo->setResult = tupstore;
846 rsinfo->setDesc = tupdesc;
847 MemoryContextSwitchTo(oldcontext);
/* one reusable array of field pointers, refilled for every row */
849 values = (char **) palloc(nfields * sizeof(char *));
851 /* put all tuples into the tuplestore */
852 for (row = 0; row < ntuples; row++)
860 for (i = 0; i < nfields; i++)
862 if (PQgetisnull(res, row, i))
865 values[i] = PQgetvalue(res, row, i);
870 values[0] = PQcmdStatus(res);
873 /* build the tuple and put it into the tuplestore. */
874 tuple = BuildTupleFromCStrings(attinmeta, values);
875 tuplestore_puttuple(tupstore, tuple);
878 /* clean up and return the tuplestore */
879 tuplestore_donestoring(tupstore);
886 /* be sure to release the libpq result */
894 * List all open dblink connections by name.
895 * Returns an array of all connection names.
/*
 * dblink_get_connections: SQL-callable entry point that returns the names
 * stored in remoteConnHash as a text array.
 *
 * Walks the hash table with hash_seq_init/hash_seq_search, appends each
 * entry's name as a text datum via accumArrayResult, and builds the array
 * in CurrentMemoryContext.
 *
 * NOTE(review): what happens when the table is missing or empty is not
 * shown in this listing.
 */
898 PG_FUNCTION_INFO_V1(dblink_get_connections);
900 dblink_get_connections(PG_FUNCTION_ARGS)
902 HASH_SEQ_STATUS status;
903 remoteConnHashEnt *hentry;
904 ArrayBuildState *astate = NULL;
908 hash_seq_init(&status, remoteConnHash);
909 while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
911 /* stash away current value */
912 astate = accumArrayResult(astate,
913 CStringGetTextDatum(hentry->name),
914 false, TEXTOID, CurrentMemoryContext);
919 PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
920 CurrentMemoryContext));
926 * Checks if a given remote connection is busy
928 * Returns 1 if the connection is busy, 0 otherwise
930 * text connection_name - name of the connection to check
/*
 * dblink_is_busy: SQL-callable entry point.  Resolves the named connection
 * from arg 0 (DBLINK_GET_NAMED_CONN raises if it is unavailable), lets
 * libpq read any pending input, and returns PQisBusy(conn) as int4.
 *
 * NOTE(review): the return value of PQconsumeInput is not checked.
 */
933 PG_FUNCTION_INFO_V1(dblink_is_busy);
935 dblink_is_busy(PG_FUNCTION_ARGS)
938 remoteConn *rconn = NULL;
941 DBLINK_GET_NAMED_CONN;
943 PQconsumeInput(conn);
944 PG_RETURN_INT32(PQisBusy(conn));
948 * Cancels a running request on a connection
951 * "OK" if the cancel request has been sent correctly,
952 * an error message otherwise
955 * text connection_name - name of the connection to check
/*
 * dblink_cancel_query: SQL-callable entry point that asks the remote server
 * to cancel the query running on the named connection (arg 0).
 *
 * Obtains a cancel object with PQgetCancel, sends the request with
 * PQcancel, and frees the object.  Returns the text value "OK" or the
 * contents of errbuf.
 *
 * NOTE(review): the condition choosing between the two returns and the
 * declaration of errbuf are not shown; the literal 256 passed to PQcancel
 * must match errbuf's size -- confirm.
 */
958 PG_FUNCTION_INFO_V1(dblink_cancel_query);
960 dblink_cancel_query(PG_FUNCTION_ARGS)
964 remoteConn *rconn = NULL;
969 DBLINK_GET_NAMED_CONN;
970 cancel = PQgetCancel(conn);
972 res = PQcancel(cancel, errbuf, 256);
973 PQfreeCancel(cancel);
976 PG_RETURN_TEXT_P(cstring_to_text("OK"));
978 PG_RETURN_TEXT_P(cstring_to_text(errbuf));
983 * Get error message from a connection
986 * "OK" if no error, an error message otherwise
989 * text connection_name - name of the connection to check
/*
 * dblink_error_message: SQL-callable entry point.  Resolves the named
 * connection from arg 0 and returns libpq's current error message for it
 * as text, or "OK" when that message is NULL or empty.
 */
992 PG_FUNCTION_INFO_V1(dblink_error_message);
994 dblink_error_message(PG_FUNCTION_ARGS)
998 remoteConn *rconn = NULL;
1001 DBLINK_GET_NAMED_CONN;
1003 msg = PQerrorMessage(conn);
1004 if (msg == NULL || msg[0] == '\0')
1005 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1007 PG_RETURN_TEXT_P(cstring_to_text(msg));
1011 * Execute an SQL non-SELECT command
/*
 * dblink_exec: SQL-callable entry point that runs a command which returns
 * no rows and reports its command status.
 *
 * Argument forms handled below:
 *   3 args - sql is arg 1, fail is arg 2
 *   2 args - (sql, fail) when arg 1 is bool, otherwise sql is arg 1
 *   1 arg  - (sql)
 *
 * Result handling after PQexec:
 *   - status neither PGRES_COMMAND_OK nor PGRES_TUPLES_OK: passed to
 *     dblink_res_error, and the function's result becomes "ERROR";
 *   - PGRES_COMMAND_OK: the result is PQcmdStatus(res);
 *   - otherwise: ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
 *     ("statement returning results not allowed").
 *
 * NOTE(review): tupdesc is built in two branches but no later use of it is
 * visible in the lines shown -- confirm whether it is needed.
 * NOTE(review): the embedded line numbers skip, so the connection lookup
 * and cleanup statements are not shown in this listing.
 */
1013 PG_FUNCTION_INFO_V1(dblink_exec);
1015 dblink_exec(PG_FUNCTION_ARGS)
1018 PGresult *res = NULL;
1019 text *sql_cmd_status = NULL;
1020 TupleDesc tupdesc = NULL;
1021 PGconn *conn = NULL;
1022 char *connstr = NULL;
1024 char *conname = NULL;
1025 remoteConn *rconn = NULL;
1026 bool freeconn = false;
1027 bool fail = true; /* default to backward compatible behavior */
1031 if (PG_NARGS() == 3)
1033 /* must be text,text,bool */
1035 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1036 fail = PG_GETARG_BOOL(2);
1038 else if (PG_NARGS() == 2)
1040 /* might be text,text or text,bool */
1041 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
1044 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1045 fail = PG_GETARG_BOOL(1);
1050 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1053 else if (PG_NARGS() == 1)
1055 /* must be single text argument */
1057 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1060 /* shouldn't happen */
1061 elog(ERROR, "wrong number of arguments");
1064 DBLINK_CONN_NOT_AVAIL;
1066 res = PQexec(conn, sql);
1068 (PQresultStatus(res) != PGRES_COMMAND_OK &&
1069 PQresultStatus(res) != PGRES_TUPLES_OK))
1071 dblink_res_error(conname, res, "could not execute command", fail);
1073 /* need a tuple descriptor representing one TEXT column */
1074 tupdesc = CreateTemplateTupleDesc(1, false);
1075 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1079 * and save a copy of the command status string to return as our
1082 sql_cmd_status = cstring_to_text("ERROR");
1084 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1086 /* need a tuple descriptor representing one TEXT column */
1087 tupdesc = CreateTemplateTupleDesc(1, false);
1088 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1092 * and save a copy of the command status string to return as our
1095 sql_cmd_status = cstring_to_text(PQcmdStatus(res));
1102 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
1103 errmsg("statement returning results not allowed")));
1106 /* if needed, close the connection to the database and cleanup */
1110 PG_RETURN_TEXT_P(sql_cmd_status);
1117 * Return list of primary key fields for the supplied relation,
1118 * or an empty set if none exists.
/*
 * dblink_get_pkey: SQL-callable set-returning entry point that yields one
 * (position, colname) row per primary key column of the relation named in
 * arg 0.
 *
 * First call: resolves the relation name (ERRCODE_UNDEFINED_TABLE if it
 * does not exist), builds the two-column tuple descriptor, and stores the
 * key column names from get_pkey_attnames in the multi-call memory context.
 * With no key columns the set ends immediately.
 *
 * Every call: emits row number call_cntr, with position = call_cntr + 1
 * and colname = results[call_cntr], until max_calls rows have been sent.
 *
 * NOTE(review): the embedded line numbers skip, so some declarations and
 * statements are not shown in this listing.
 */
1120 PG_FUNCTION_INFO_V1(dblink_get_pkey);
1122 dblink_get_pkey(PG_FUNCTION_ARGS)
1127 FuncCallContext *funcctx;
1130 AttInMetadata *attinmeta;
1131 MemoryContext oldcontext;
1133 /* stuff done only on the first call of the function */
1134 if (SRF_IS_FIRSTCALL())
1136 TupleDesc tupdesc = NULL;
1138 /* create a function context for cross-call persistence */
1139 funcctx = SRF_FIRSTCALL_INIT();
1142 * switch to memory context appropriate for multiple function calls
1144 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1146 /* convert relname to rel Oid */
1147 relid = get_relid_from_relname(PG_GETARG_TEXT_P(0));
1148 if (!OidIsValid(relid))
1150 (errcode(ERRCODE_UNDEFINED_TABLE),
1151 errmsg("relation \"%s\" does not exist",
1152 text_to_cstring(PG_GETARG_TEXT_PP(0)))));
1155 * need a tuple descriptor representing one INT and one TEXT column
1157 tupdesc = CreateTemplateTupleDesc(2, false);
1158 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
1160 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
1164 * Generate attribute metadata needed later to produce tuples from raw
1167 attinmeta = TupleDescGetAttInMetadata(tupdesc);
1168 funcctx->attinmeta = attinmeta;
1170 /* get an array of attnums */
1171 results = get_pkey_attnames(relid, &numatts);
1173 if ((results != NULL) && (numatts > 0))
/* one output row per key column */
1175 funcctx->max_calls = numatts;
1177 /* got results, keep track of them */
1178 funcctx->user_fctx = results;
1182 /* fast track when no results */
1183 MemoryContextSwitchTo(oldcontext);
1184 SRF_RETURN_DONE(funcctx);
1187 MemoryContextSwitchTo(oldcontext);
1190 /* stuff done on every call of the function */
1191 funcctx = SRF_PERCALL_SETUP();
1194 * initialize per-call variables
1196 call_cntr = funcctx->call_cntr;
1197 max_calls = funcctx->max_calls;
1199 results = (char **) funcctx->user_fctx;
1200 attinmeta = funcctx->attinmeta;
1202 if (call_cntr < max_calls) /* do when there is more left to send */
1208 values = (char **) palloc(2 * sizeof(char *));
1209 values[0] = (char *) palloc(12); /* sign, 10 digits, '\0' */
/* position is 1-based */
1211 sprintf(values[0], "%d", call_cntr + 1);
1213 values[1] = results[call_cntr];
1215 /* build the tuple */
1216 tuple = BuildTupleFromCStrings(attinmeta, values);
1218 /* make the tuple into a datum */
1219 result = HeapTupleGetDatum(tuple);
1221 SRF_RETURN_NEXT(funcctx, result);
1225 /* do when there is no more left */
1226 SRF_RETURN_DONE(funcctx);
1232 * dblink_build_sql_insert
1234 * Used to generate an SQL insert statement
1235 * based on an existing tuple in a local relation.
1236 * This is useful for selectively replicating data
1237 * to another server via dblink.
1240 * <relname> - name of local table of interest
1241 * <pkattnums> - an int2vector of attnums which will be used
1242 * to identify the local tuple of interest
1243 * <pknumatts> - number of attnums in pkattnums
1244 * <src_pkattvals_arry> - text array of key values which will be used
1245 * to identify the local tuple of interest
1246 * <tgt_pkattvals_arry> - text array of key values which will be used
1247 * to build the string for execution remotely. These are substituted
1248 * for their counterparts in src_pkattvals_arry
/*
 * dblink_build_sql_insert: SQL-callable entry point that returns, as text,
 * the statement produced by get_sql_insert for one local row.
 *
 * Arguments: 0 relation name, 1 int2vector of key attribute numbers,
 * 2 number of key attributes, 3 source key values (text array),
 * 4 target key values (text array).
 *
 * Validation performed before get_sql_insert is called:
 *   - the relation must exist (ERRCODE_UNDEFINED_TABLE);
 *   - the key count must be > 0 and no larger than SHRT_MAX
 *     (ERRCODE_INVALID_PARAMETER_VALUE);
 *   - the key count must not exceed the relation's non-dropped columns;
 *   - both text arrays must contain exactly that many items
 *     (ERRCODE_ARRAY_SUBSCRIPT_ERROR).
 *
 * NOTE(review): no check of the individual values inside pkattnums is
 * visible in this function -- confirm they are validated elsewhere.
 * NOTE(review): the embedded line numbers skip, so some declarations and
 * statements are not shown in this listing.
 */
1250 PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
1252 dblink_build_sql_insert(PG_FUNCTION_ARGS)
1254 text *relname_text = PG_GETARG_TEXT_P(0);
1255 int2vector *pkattnums = (int2vector *) PG_GETARG_POINTER(1);
1256 int32 pknumatts_tmp = PG_GETARG_INT32(2);
1257 ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1258 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1260 int16 pknumatts = 0;
1261 char **src_pkattvals;
1262 char **tgt_pkattvals;
1266 int nondropped_natts;
1269 * Convert relname to rel OID.
1271 relid = get_relid_from_relname(relname_text);
1272 if (!OidIsValid(relid))
1274 (errcode(ERRCODE_UNDEFINED_TABLE),
1275 errmsg("relation \"%s\" does not exist",
1276 text_to_cstring(relname_text))));
1279 * There should be at least one key attribute
1281 if (pknumatts_tmp <= 0)
1283 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1284 errmsg("number of key attributes must be > 0")));
/* range-check before narrowing the int32 argument to int16 */
1286 if (pknumatts_tmp <= SHRT_MAX)
1287 pknumatts = pknumatts_tmp;
1290 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1291 errmsg("input for number of primary key " \
1292 "attributes too large")));
1295 * ensure we don't ask for more pk attributes than we have
1296 * non-dropped columns
1298 nondropped_natts = get_nondropped_natts(relid);
1299 if (pknumatts > nondropped_natts)
1300 ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
1301 errmsg("number of primary key fields exceeds number of specified relation attributes")));
1304 * Source array is made up of key values that will be used to locate the
1305 * tuple of interest from the local system.
1307 src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1310 * There should be one source array key value for each key attnum
1312 if (src_nitems != pknumatts)
1314 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1315 errmsg("source key array length must match number of key " \
1319 * Target array is made up of key values that will be used to build the
1320 * SQL string for use on the remote system.
1322 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1325 * There should be one target array key value for each key attnum
1327 if (tgt_nitems != pknumatts)
1329 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1330 errmsg("target key array length must match number of key " \
1334 * Prep work is finally done. Go get the SQL string.
1336 sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1341 PG_RETURN_TEXT_P(cstring_to_text(sql));
1346 * dblink_build_sql_delete
1348 * Used to generate an SQL delete statement.
1349 * This is useful for selectively replicating a
1350 * delete to another server via dblink.
1353 * <relname> - name of remote table of interest
1354 * <pkattnums> - an int2vector of attnums which will be used
1355 * to identify the remote tuple of interest
1356 * <pknumatts> - number of attnums in pkattnums
1357 * <tgt_pkattvals_arry> - text array of key values which will be used
1358 * to build the string for execution remotely.
/*
 * dblink_build_sql_delete: SQL-callable entry point that returns, as text,
 * the statement produced by get_sql_delete.
 *
 * Arguments: 0 relation name, 1 int2vector of key attribute numbers,
 * 2 number of key attributes, 3 target key values (text array).
 *
 * Validation performed before get_sql_delete is called:
 *   - the relation must exist (ERRCODE_UNDEFINED_TABLE);
 *   - the key count must be > 0 and no larger than SHRT_MAX
 *     (ERRCODE_INVALID_PARAMETER_VALUE);
 *   - the key count must not exceed the relation's non-dropped columns;
 *   - the target array must contain exactly that many items
 *     (ERRCODE_ARRAY_SUBSCRIPT_ERROR).
 *
 * NOTE(review): no check of the individual values inside pkattnums is
 * visible in this function -- confirm they are validated elsewhere.
 * NOTE(review): the embedded line numbers skip, so some declarations and
 * statements are not shown in this listing.
 */
1360 PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
1362 dblink_build_sql_delete(PG_FUNCTION_ARGS)
1364 text *relname_text = PG_GETARG_TEXT_P(0);
1365 int2vector *pkattnums = (int2vector *) PG_GETARG_POINTER(1);
1366 int32 pknumatts_tmp = PG_GETARG_INT32(2);
1367 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1368 int nondropped_natts;
1370 int16 pknumatts = 0;
1371 char **tgt_pkattvals;
1376 * Convert relname to rel OID.
1378 relid = get_relid_from_relname(relname_text);
1379 if (!OidIsValid(relid))
1381 (errcode(ERRCODE_UNDEFINED_TABLE),
1382 errmsg("relation \"%s\" does not exist",
1383 text_to_cstring(relname_text))));
1386 * There should be at least one key attribute
1388 if (pknumatts_tmp <= 0)
1390 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1391 errmsg("number of key attributes must be > 0")));
/* range-check before narrowing the int32 argument to int16 */
1393 if (pknumatts_tmp <= SHRT_MAX)
1394 pknumatts = pknumatts_tmp;
1397 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1398 errmsg("input for number of primary key " \
1399 "attributes too large")));
1402 * ensure we don't ask for more pk attributes than we have
1403 * non-dropped columns
1405 nondropped_natts = get_nondropped_natts(relid);
1406 if (pknumatts > nondropped_natts)
1407 ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
1408 errmsg("number of primary key fields exceeds number of specified relation attributes")));
1411 * Target array is made up of key values that will be used to build the
1412 * SQL string for use on the remote system.
1414 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1417 * There should be one target array key value for each key attnum
1419 if (tgt_nitems != pknumatts)
1421 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1422 errmsg("target key array length must match number of key " \
1426 * Prep work is finally done. Go get the SQL string.
1428 sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals);
1433 PG_RETURN_TEXT_P(cstring_to_text(sql));
1438 * dblink_build_sql_update
1440 * Used to generate an SQL update statement
1441 * based on an existing tuple in a local relation.
1442 * This is useful for selectively replicating data
1443 * to another server via dblink.
1446 * <relname> - name of local table of interest
1447 * <pkattnums> - an int2vector of attnums which will be used
1448 * to identify the local tuple of interest
1449 * <pknumatts> - number of attnums in pkattnums
1450 * <src_pkattvals_arry> - text array of key values which will be used
1451 * to identify the local tuple of interest
1452 * <tgt_pkattvals_arry> - text array of key values which will be used
1453 * to build the string for execution remotely. These are substituted
1454 * for their counterparts in src_pkattvals_arry
1456 PG_FUNCTION_INFO_V1(dblink_build_sql_update);
1458 dblink_build_sql_update(PG_FUNCTION_ARGS)
1460 text *relname_text = PG_GETARG_TEXT_P(0);
1461 int2vector *pkattnums = (int2vector *) PG_GETARG_POINTER(1);
1462 int32 pknumatts_tmp = PG_GETARG_INT32(2);
1463 ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1464 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1465 int nondropped_natts;
1467 int16 pknumatts = 0;
1468 char **src_pkattvals;
1469 char **tgt_pkattvals;
1475 * Convert relname to rel OID.
1477 relid = get_relid_from_relname(relname_text);
1478 if (!OidIsValid(relid))
1480 (errcode(ERRCODE_UNDEFINED_TABLE),
1481 errmsg("relation \"%s\" does not exist",
1482 text_to_cstring(relname_text))));
1485 * There should be one source array key values for each key attnum
1487 if (pknumatts_tmp <= 0)
1489 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1490 errmsg("number of key attributes must be > 0")));
1492 if (pknumatts_tmp <= SHRT_MAX)
1493 pknumatts = pknumatts_tmp;
1496 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1497 errmsg("input for number of primary key " \
1498 "attributes too large")));
1501 * ensure we don't ask for more pk attributes than we have
1502 * non-dropped columns
1504 nondropped_natts = get_nondropped_natts(relid);
1505 if (pknumatts > nondropped_natts)
1506 ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
1507 errmsg("number of primary key fields exceeds number of specified relation attributes")));
1510 * Source array is made up of key values that will be used to locate the
1511 * tuple of interest from the local system.
1513 src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1516 * There should be one source array key value for each key attnum
1518 if (src_nitems != pknumatts)
1520 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1521 errmsg("source key array length must match number of key " \
1525 * Target array is made up of key values that will be used to build the
1526 * SQL string for use on the remote system.
1528 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1531 * There should be one target array key value for each key attnum
1533 if (tgt_nitems != pknumatts)
1535 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1536 errmsg("target key array length must match number of key " \
1540 * Prep work is finally done. Go get the SQL string.
1542 sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1547 PG_RETURN_TEXT_P(cstring_to_text(sql));
/*
 * dblink_current_query(): SQL-callable entry point that takes the standard
 * fmgr call arguments and returns, unchanged, the Datum produced by calling
 * current_query() with the same fcinfo.
 */
1551 * dblink_current_query
1552 * return the current query string
1553 * to allow its use in (among other things)
1556 PG_FUNCTION_INFO_V1(dblink_current_query);
1558 dblink_current_query(PG_FUNCTION_ARGS)
1560 /* This is now just an alias for the built-in function current_query() */
1561 PG_RETURN_DATUM(current_query(fcinfo));
1565 * Retrieve async notifications for a connection.
1567 * Returns a setof record of notifications, or an empty set if none received.
1568 * Can optionally take a named connection as parameter, but uses the unnamed connection per default.
/*
 * dblink_get_notify(): return the pending async notifications of a dblink
 * connection as a materialized set of (notify_name, be_pid, extra) rows.
 *
 * With one SQL argument the named connection is looked up via
 * DBLINK_GET_NAMED_CONN.  The result tuple descriptor and the tuplestore are
 * created while the per-query memory context is current, and are handed back
 * through rsinfo (returnMode = SFRM_Materialize, setResult, setDesc).
 *
 * NOTE(review): this copy of the function appears to be missing several
 * short lines (e.g. some declarations, the type arguments of the
 * TupleDescInitEntry calls and the else-branches of the NULL tests);
 * confirm against the pristine file before relying on details.
 */
/* Number of columns in each returned notification row. */
1571 #define DBLINK_NOTIFY_COLS 3
1573 PG_FUNCTION_INFO_V1(dblink_get_notify);
1575 dblink_get_notify(PG_FUNCTION_ARGS)
1577 PGconn *conn = NULL;
1578 remoteConn *rconn = NULL;
1580 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1582 Tuplestorestate *tupstore;
1583 MemoryContext per_query_ctx;
1584 MemoryContext oldcontext;
/* one argument means the caller named the connection to poll */
1587 if (PG_NARGS() == 1)
1588 DBLINK_GET_NAMED_CONN;
1592 /* create the tuplestore */
1593 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1594 oldcontext = MemoryContextSwitchTo(per_query_ctx);
/* three-column row shape: notify_name, be_pid, extra */
1596 tupdesc = CreateTemplateTupleDesc(DBLINK_NOTIFY_COLS, false);
1597 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "notify_name",
1599 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "be_pid",
1601 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "extra",
1604 tupstore = tuplestore_begin_heap(true, false, work_mem);
1605 rsinfo->returnMode = SFRM_Materialize;
1606 rsinfo->setResult = tupstore;
1607 rsinfo->setDesc = tupdesc;
/* back to the caller's memory context for the per-row work */
1609 MemoryContextSwitchTo(oldcontext);
/* read whatever has arrived on the socket, then drain queued notifications */
1611 PQconsumeInput(conn);
1612 while ((notify = PQnotifies(conn)) != NULL)
1614 Datum values[DBLINK_NOTIFY_COLS];
1615 bool nulls[DBLINK_NOTIFY_COLS];
/* start from all-zero values and all-false null flags */
1617 memset(values, 0, sizeof(values));
1618 memset(nulls, 0, sizeof(nulls));
1620 if (notify->relname != NULL)
1621 values[0] = CStringGetTextDatum(notify->relname);
1625 values[1] = Int32GetDatum(notify->be_pid);
1627 if (notify->extra != NULL)
1628 values[2] = CStringGetTextDatum(notify->extra);
1632 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
/* pick up input that arrived while this row was being stored */
1635 PQconsumeInput(conn);
1638 /* clean up and return the tuplestore */
1639 tuplestore_donestoring(tupstore);
1644 /*************************************************************
1645 * internal functions
/*
 * get_pkey_attnames(relid, numatts): look up the column names of the
 * primary key of relation 'relid'.
 *
 * The relation is opened with AccessShareLock and an ACL check is made for
 * the current user (GetUserId()); a failed check is reported through
 * aclcheck_error().  pg_index is then scanned for rows whose indrelid equals
 * relid, and for the row flagged indisprimary the names of the indexed
 * columns are fetched with SPI_fname() into a palloc'd array.  *numatts
 * receives the index's indnatts.  Both relations are closed before return.
 *
 * NOTE(review): the scan-key initialisation call, the exit from the scan
 * loop and the final return are not visible in this copy of the function.
 */
1652 * Get the primary key attnames for the given relation.
1653 * Return NULL, and set numatts = 0, if no primary key exists.
1656 get_pkey_attnames(Oid relid, int16 *numatts)
1658 Relation indexRelation;
1661 HeapTuple indexTuple;
1663 char **result = NULL;
1666 AclResult aclresult;
1668 /* initialize numatts to 0 in case no primary key exists */
1671 /* open relation using relid, check permissions, get tupdesc */
1672 rel = relation_open(relid, AccessShareLock);
1674 aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1676 if (aclresult != ACLCHECK_OK)
1677 aclcheck_error(aclresult, ACL_KIND_CLASS,
1678 RelationGetRelationName(rel));
1680 tupdesc = rel->rd_att;
1682 /* Prepare to scan pg_index for entries having indrelid = this rel. */
1683 indexRelation = heap_open(IndexRelationId, AccessShareLock);
1685 Anum_pg_index_indrelid,
1686 BTEqualStrategyNumber, F_OIDEQ,
1687 ObjectIdGetDatum(relid));
1689 scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
1690 SnapshotNow, 1, &skey);
1692 while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
1694 Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
1696 /* we're only interested if it is the primary key */
1697 if (index->indisprimary)
1699 *numatts = index->indnatts;
/* one result slot per key column */
1702 result = (char **) palloc(*numatts * sizeof(char *));
/* indkey.values[i] is the attnum passed to SPI_fname for key position i */
1704 for (i = 0; i < *numatts; i++)
1705 result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
/* release the catalog scan, then both relations */
1711 systable_endscan(scan);
1712 heap_close(indexRelation, AccessShareLock);
1713 relation_close(rel, AccessShareLock);
/*
 * get_text_array_contents(array, numitems): convert a text[] value into a
 * palloc'd array of C strings.
 *
 * The element count is computed from the array's dimensions with
 * ArrayGetNItems() and also stored in *numitems.  The element data is walked
 * with a running pointer: each non-null element is converted with
 * TextDatumGetCString(), then the pointer is advanced by the element's
 * length and re-aligned using the element type's typlen/typalign.  The
 * array's null bitmap, when present, is consulted one bit per element.
 *
 * NOTE(review): the handling of null elements and most of the bitmap
 * stepping logic are not visible in this copy; only the "== 0" test and the
 * "0x100" wrap test remain.
 */
1719 * Deconstruct a text[] into C-strings (note any NULL elements will be
1720 * returned as NULL pointers)
1723 get_text_array_contents(ArrayType *array, int *numitems)
1725 int ndim = ARR_NDIM(array);
1726 int *dims = ARR_DIMS(array);
/* callers are expected to pass a text[] only */
1737 Assert(ARR_ELEMTYPE(array) == TEXTOID);
1739 *numitems = nitems = ArrayGetNItems(ndim, dims);
/* storage properties of the element type, needed to step through the data */
1741 get_typlenbyvalalign(ARR_ELEMTYPE(array),
1742 &typlen, &typbyval, &typalign);
1744 values = (char **) palloc(nitems * sizeof(char *));
1746 ptr = ARR_DATA_PTR(array);
1747 bitmap = ARR_NULLBITMAP(array);
1750 for (i = 0; i < nitems; i++)
/* a clear bit in the null bitmap is tested for here */
1752 if (bitmap && (*bitmap & bitmask) == 0)
1758 values[i] = TextDatumGetCString(PointerGetDatum(ptr));
/* step over this element and align for the next one */
1759 ptr = att_addlength_pointer(ptr, typlen, ptr);
1760 ptr = (char *) att_align_nominal(ptr, typalign);
1763 /* advance bitmap pointer if any */
1767 if (bitmask == 0x100)
/*
 * get_sql_insert(): build the text of an INSERT statement that recreates a
 * local row.
 *
 * The row is fetched with get_tuple_of_interest() using src_pkattvals; a
 * missing row is reported as "source row not found".  The statement lists
 * the quoted names of the relation's columns, skipping those flagged
 * attisdropped, followed by a VALUES list.  When tgt_pkattvals is supplied,
 * get_attnum_pk_pos() is used to find whether a column is one of the key
 * columns, in which case the value is taken from tgt_pkattvals instead of
 * from the fetched tuple.  Values are emitted through quote_literal_cstr(),
 * or as the keyword NULL.
 *
 * NOTE(review): local declarations, the comma bookkeeping, several
 * if/else lines and the return statement are not visible in this copy.
 */
1779 get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
1792 initStringInfo(&buf);
1794 /* get relation name including any needed schema prefix and quoting */
1795 relname = generate_relation_name(relid);
1798 * Open relation using relid
1800 rel = relation_open(relid, AccessShareLock);
1801 tupdesc = rel->rd_att;
1802 natts = tupdesc->natts;
/* fetch the local row that supplies the non-key column values */
1804 tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
1807 (errcode(ERRCODE_CARDINALITY_VIOLATION),
1808 errmsg("source row not found")));
1810 appendStringInfo(&buf, "INSERT INTO %s(", relname);
/* first pass: column name list */
1813 for (i = 0; i < natts; i++)
1815 if (tupdesc->attrs[i]->attisdropped)
1819 appendStringInfo(&buf, ",");
1821 appendStringInfoString(&buf,
1822 quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
1826 appendStringInfo(&buf, ") VALUES(");
1829 * remember attvals are 1 based
/* second pass: one value per column, in the same order as the names */
1832 for (i = 0; i < natts; i++)
1834 if (tupdesc->attrs[i]->attisdropped)
1838 appendStringInfo(&buf, ",");
/* i + 1 converts the 0-based loop index to a 1-based attribute number */
1840 if (tgt_pkattvals != NULL)
1841 key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
1846 val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
1848 val = SPI_getvalue(tuple, tupdesc, i + 1);
1852 appendStringInfoString(&buf, quote_literal_cstr(val));
1856 appendStringInfo(&buf, "NULL");
1859 appendStringInfo(&buf, ")");
1861 relation_close(rel, AccessShareLock);
1866 get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals)
1875 initStringInfo(&buf);
1877 /* get relation name including any needed schema prefix and quoting */
1878 relname = generate_relation_name(relid);
1881 * Open relation using relid
1883 rel = relation_open(relid, AccessShareLock);
1884 tupdesc = rel->rd_att;
1885 natts = tupdesc->natts;
1887 appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
1888 for (i = 0; i < pknumatts; i++)
1890 int16 pkattnum = pkattnums->values[i];
1893 appendStringInfo(&buf, " AND ");
1895 appendStringInfoString(&buf,
1896 quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));
1898 if (tgt_pkattvals == NULL)
1899 /* internal error */
1900 elog(ERROR, "target key array must not be NULL");
1902 if (tgt_pkattvals[i] != NULL)
1903 appendStringInfo(&buf, " = %s",
1904 quote_literal_cstr(tgt_pkattvals[i]));
1906 appendStringInfo(&buf, " IS NULL");
1909 relation_close(rel, AccessShareLock);
1914 get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
1927 initStringInfo(&buf);
1929 /* get relation name including any needed schema prefix and quoting */
1930 relname = generate_relation_name(relid);
1933 * Open relation using relid
1935 rel = relation_open(relid, AccessShareLock);
1936 tupdesc = rel->rd_att;
1937 natts = tupdesc->natts;
1939 tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
1942 (errcode(ERRCODE_CARDINALITY_VIOLATION),
1943 errmsg("source row not found")));
1945 appendStringInfo(&buf, "UPDATE %s SET ", relname);
1948 for (i = 0; i < natts; i++)
1950 if (tupdesc->attrs[i]->attisdropped)
1954 appendStringInfo(&buf, ", ");
1956 appendStringInfo(&buf, "%s = ",
1957 quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
1959 if (tgt_pkattvals != NULL)
1960 key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
1965 val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
1967 val = SPI_getvalue(tuple, tupdesc, i + 1);
1971 appendStringInfoString(&buf, quote_literal_cstr(val));
1975 appendStringInfoString(&buf, "NULL");
1979 appendStringInfo(&buf, " WHERE ");
1981 for (i = 0; i < pknumatts; i++)
1983 int16 pkattnum = pkattnums->values[i];
1986 appendStringInfo(&buf, " AND ");
1988 appendStringInfo(&buf, "%s",
1989 quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));
1991 if (tgt_pkattvals != NULL)
1992 val = tgt_pkattvals[i] ? pstrdup(tgt_pkattvals[i]) : NULL;
1994 val = SPI_getvalue(tuple, tupdesc, pkattnum);
1998 appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
2002 appendStringInfo(&buf, " IS NULL");
2005 relation_close(rel, AccessShareLock);
2010 * Return a properly quoted literal value.
2011 * Uses quote_literal in quote.c
2014 quote_literal_cstr(char *rawstr)
2020 rawstr_text = cstring_to_text(rawstr);
2021 result_text = DatumGetTextP(DirectFunctionCall1(quote_literal,
2022 PointerGetDatum(rawstr_text)));
2023 result = text_to_cstring(result_text);
2029 * Return a properly quoted identifier.
2030 * Uses quote_ident in quote.c
2033 quote_ident_cstr(char *rawstr)
2039 rawstr_text = cstring_to_text(rawstr);
2040 result_text = DatumGetTextP(DirectFunctionCall1(quote_ident,
2041 PointerGetDatum(rawstr_text)));
2042 result = text_to_cstring(result_text);
2048 get_attnum_pk_pos(int2vector *pkattnums, int16 pknumatts, int16 key)
2053 * Not likely a long list anyway, so just scan for the value
2055 for (i = 0; i < pknumatts; i++)
2056 if (key == pkattnums->values[i])
2063 get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals)
2073 initStringInfo(&buf);
2075 /* get relation name including any needed schema prefix and quoting */
2076 relname = generate_relation_name(relid);
2079 * Open relation using relid
2081 rel = relation_open(relid, AccessShareLock);
2082 tupdesc = CreateTupleDescCopy(rel->rd_att);
2083 relation_close(rel, AccessShareLock);
2086 * Connect to SPI manager
2088 if ((ret = SPI_connect()) < 0)
2089 /* internal error */
2090 elog(ERROR, "SPI connect failure - returned %d", ret);
2093 * Build sql statement to look up tuple of interest Use src_pkattvals as
2096 appendStringInfo(&buf, "SELECT * FROM %s WHERE ", relname);
2098 for (i = 0; i < pknumatts; i++)
2100 int16 pkattnum = pkattnums->values[i];
2103 appendStringInfo(&buf, " AND ");
2105 appendStringInfoString(&buf,
2106 quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));
2108 if (src_pkattvals[i] != NULL)
2109 appendStringInfo(&buf, " = %s",
2110 quote_literal_cstr(src_pkattvals[i]));
2112 appendStringInfo(&buf, " IS NULL");
2116 * Retrieve the desired tuple
2118 ret = SPI_exec(buf.data, 0);
2122 * Only allow one qualifying tuple
2124 if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
2126 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2127 errmsg("source criteria matched more than one record")));
2129 else if (ret == SPI_OK_SELECT && SPI_processed == 1)
2131 SPITupleTable *tuptable = SPI_tuptable;
2133 tuple = SPI_copytuple(tuptable->vals[0]);
2141 * no qualifying tuples
2149 * never reached, but keep compiler quiet
2155 get_relid_from_relname(text *relname_text)
2161 relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
2162 rel = heap_openrv(relvar, AccessShareLock);
2163 relid = RelationGetRelid(rel);
2164 relation_close(rel, AccessShareLock);
2170 * generate_relation_name - copied from ruleutils.c
2171 * Compute the name to display for a relation specified by OID
2173 * The result includes all necessary quoting and schema-prefixing.
2176 generate_relation_name(Oid relid)
2179 Form_pg_class reltup;
2183 tp = SearchSysCache(RELOID,
2184 ObjectIdGetDatum(relid),
2186 if (!HeapTupleIsValid(tp))
2187 elog(ERROR, "cache lookup failed for relation %u", relid);
2189 reltup = (Form_pg_class) GETSTRUCT(tp);
2191 /* Qualify the name if not visible in search path */
2192 if (RelationIsVisible(relid))
2195 nspname = get_namespace_name(reltup->relnamespace);
2197 result = quote_qualified_identifier(nspname, NameStr(reltup->relname));
2199 ReleaseSysCache(tp);
2206 getConnectionByName(const char *name)
2208 remoteConnHashEnt *hentry;
2209 char key[NAMEDATALEN];
2211 if (!remoteConnHash)
2212 remoteConnHash = createConnHash();
2214 MemSet(key, 0, NAMEDATALEN);
2215 snprintf(key, NAMEDATALEN - 1, "%s", name);
2216 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2217 key, HASH_FIND, NULL);
2220 return (hentry->rconn);
2226 createConnHash(void)
2230 ctl.keysize = NAMEDATALEN;
2231 ctl.entrysize = sizeof(remoteConnHashEnt);
2233 return hash_create("Remote Con hash", NUMCONN, &ctl, HASH_ELEM);
2237 createNewConnection(const char *name, remoteConn *rconn)
2239 remoteConnHashEnt *hentry;
2241 char key[NAMEDATALEN];
2243 if (!remoteConnHash)
2244 remoteConnHash = createConnHash();
2246 MemSet(key, 0, NAMEDATALEN);
2247 snprintf(key, NAMEDATALEN - 1, "%s", name);
2248 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
2249 HASH_ENTER, &found);
2253 (errcode(ERRCODE_DUPLICATE_OBJECT),
2254 errmsg("duplicate connection name")));
2256 hentry->rconn = rconn;
2257 strlcpy(hentry->name, name, sizeof(hentry->name));
2261 deleteConnection(const char *name)
2263 remoteConnHashEnt *hentry;
2265 char key[NAMEDATALEN];
2267 if (!remoteConnHash)
2268 remoteConnHash = createConnHash();
2270 MemSet(key, 0, NAMEDATALEN);
2271 snprintf(key, NAMEDATALEN - 1, "%s", name);
2273 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2274 key, HASH_REMOVE, &found);
2278 (errcode(ERRCODE_UNDEFINED_OBJECT),
2279 errmsg("undefined connection name")));
/*
 * dblink_security_check(conn, rconn): reject a connection that was
 * established without a password.
 *
 * The visible test asks libpq, via PQconnectionUsedPassword(), whether the
 * connection used a password, and the visible error report is "password is
 * required" with a detail and hint aimed at non-superusers.
 *
 * NOTE(review): the condition that limits this check to non-superusers and
 * any cleanup of conn/rconn before the error is raised are not visible in
 * this copy; confirm against the pristine file.
 */
2284 dblink_security_check(PGconn *conn, remoteConn *rconn)
2288 if (!PQconnectionUsedPassword(conn))
2295 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2296 errmsg("password is required"),
2297 errdetail("Non-superuser cannot connect if the server does not request a password."),
2298 errhint("Target server's authentication method must be changed.")));
/*
 * dblink_connstr_check(connstr): require that a connection string carries
 * its own password.
 *
 * The string is parsed with PQconninfoParse(); the resulting option list is
 * scanned for a "password" keyword whose value is non-NULL and non-empty,
 * and the list is released with PQconninfoFree().  If no such option was
 * seen, the error "password is required" is raised.
 *
 * NOTE(review): the test that exempts superusers and the check of the
 * PQconninfoParse() result are not visible in this copy.
 */
2304 * For non-superusers, insist that the connstr specify a password. This
2305 * prevents a password from being picked up from .pgpass, a service file,
2306 * the environment, etc. We don't want the postgres user's passwords
2307 * to be accessible to non-superusers.
2310 dblink_connstr_check(const char *connstr)
2314 PQconninfoOption *options;
2315 PQconninfoOption *option;
2316 bool connstr_gives_password = false;
2318 options = PQconninfoParse(connstr, NULL);
/* the option array is terminated by an entry with a NULL keyword */
2321 for (option = options; option->keyword != NULL; option++)
2323 if (strcmp(option->keyword, "password") == 0)
/* an empty password does not count as supplying one */
2325 if (option->val != NULL && option->val[0] != '\0')
2327 connstr_gives_password = true;
2332 PQconninfoFree(options);
2335 if (!connstr_gives_password)
2337 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2338 errmsg("password is required"),
2339 errdetail("Non-superusers must provide a password in the connection string.")));
/*
 * dblink_res_error(conname, res, dblink_context_msg, fail): re-report an
 * error carried by a libpq result as a local error report.
 *
 * The SQLSTATE, primary message, detail, hint and context fields are read
 * from 'res' with PQresultErrorField().  A five-character SQLSTATE is
 * repacked with MAKE_SQLSTATE; ERRCODE_CONNECTION_FAILURE is the code
 * assigned when the result carries none.  The message fields are duplicated
 * with xpstrdup before being reported.  The report always adds a context
 * line naming the dblink connection ("unnamed" unless conname is used) and
 * the caller-supplied dblink_context_msg.
 *
 * NOTE(review): how 'fail' selects the report level, and the release of
 * 'res', are not visible in this copy of the function.
 */
2344 dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail)
2347 char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
2348 char *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
2349 char *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
2350 char *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
2351 char *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
2353 char *message_primary;
2354 char *message_detail;
2356 char *message_context;
2357 const char *dblink_context_conname = "unnamed";
/* rebuild the remote SQLSTATE from its five characters */
2364 if (pg_diag_sqlstate)
2365 sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
2366 pg_diag_sqlstate[1],
2367 pg_diag_sqlstate[2],
2368 pg_diag_sqlstate[3],
2369 pg_diag_sqlstate[4]);
2371 sqlstate = ERRCODE_CONNECTION_FAILURE;
/* take private copies of the message fields read from the result */
2373 xpstrdup(message_primary, pg_diag_message_primary);
2374 xpstrdup(message_detail, pg_diag_message_detail);
2375 xpstrdup(message_hint, pg_diag_message_hint);
2376 xpstrdup(message_context, pg_diag_context);
2382 dblink_context_conname = conname;
/* optional fields are reported only when the remote side supplied them */
2386 message_primary ? errmsg("%s", message_primary) : errmsg("unknown error"),
2387 message_detail ? errdetail("%s", message_detail) : 0,
2388 message_hint ? errhint("%s", message_hint) : 0,
2389 message_context ? errcontext("%s", message_context) : 0,
2390 errcontext("Error occurred on dblink connection named \"%s\": %s.",
2391 dblink_context_conname, dblink_context_msg)));
/*
 * get_connect_string(servername): build a libpq connection string from the
 * options of a foreign server.
 *
 * The server is looked up by name only when the name is shorter than
 * NAMEDATALEN.  For a server that is found, the current user's mapping and
 * the server's foreign-data wrapper are fetched, and USAGE privilege on the
 * server is checked (failure is reported through aclcheck_error()).  The
 * options of the wrapper, the server and the user mapping are then appended
 * to the buffer, in that order, as "name='value' " pairs with each value
 * passed through escape_param_str().
 *
 * NOTE(review): the test on the lookup result and the return statements
 * are not visible in this copy of the function.
 */
2395 * Obtain connection string for a foreign server
2398 get_connect_string(const char *servername)
2400 ForeignServer *foreign_server = NULL;
2401 UserMapping *user_mapping;
2403 StringInfo buf = makeStringInfo();
2404 ForeignDataWrapper *fdw;
2405 AclResult aclresult;
2407 /* first gather the server connstr options */
2408 if (strlen(servername) < NAMEDATALEN)
2409 foreign_server = GetForeignServerByName(servername, true);
2413 Oid serverid = foreign_server->serverid;
2414 Oid fdwid = foreign_server->fdwid;
2415 Oid userid = GetUserId();
2417 user_mapping = GetUserMapping(userid, serverid);
2418 fdw = GetForeignDataWrapper(fdwid);
2420 /* Check permissions, user must have usage on the server. */
2421 aclresult = pg_foreign_server_aclcheck(serverid, userid, ACL_USAGE);
2422 if (aclresult != ACLCHECK_OK)
2423 aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, foreign_server->servername);
/* wrapper-level options are appended first */
2425 foreach(cell, fdw->options)
2427 DefElem *def = lfirst(cell);
2429 appendStringInfo(buf, "%s='%s' ", def->defname,
2430 escape_param_str(strVal(def->arg)));
/* then the options of the server itself */
2433 foreach(cell, foreign_server->options)
2435 DefElem *def = lfirst(cell);
2437 appendStringInfo(buf, "%s='%s' ", def->defname,
2438 escape_param_str(strVal(def->arg)));
/* and finally the options of the user mapping */
2441 foreach(cell, user_mapping->options)
2444 DefElem *def = lfirst(cell);
2446 appendStringInfo(buf, "%s='%s' ", def->defname,
2447 escape_param_str(strVal(def->arg)));
2457 * Escaping libpq connect parameter strings.
2459 * Replaces "'" with "\'" and "\" with "\\".
2462 escape_param_str(const char *str)
2465 StringInfo buf = makeStringInfo();
2467 for (cp = str; *cp; cp++)
2469 if (*cp == '\\' || *cp == '\'')
2470 appendStringInfoChar(buf, '\\');
2471 appendStringInfoChar(buf, *cp);
2478 get_nondropped_natts(Oid relid)
2480 int nondropped_natts = 0;
2486 rel = relation_open(relid, AccessShareLock);
2487 tupdesc = rel->rd_att;
2488 natts = tupdesc->natts;
2490 for (i = 0; i < natts; i++)
2492 if (tupdesc->attrs[i]->attisdropped)
2497 relation_close(rel, AccessShareLock);
2498 return nondropped_natts;