]> granicus.if.org Git - postgresql/blob - contrib/dblink/dblink.c
Replace heapam.h includes with {table, relation}.h where applicable.
[postgresql] / contrib / dblink / dblink.c
1 /*
2  * dblink.c
3  *
4  * Functions returning results from a remote database
5  *
6  * Joe Conway <mail@joeconway.com>
7  * And contributors:
8  * Darko Prenosil <Darko.Prenosil@finteh.hr>
9  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
10  *
11  * contrib/dblink/dblink.c
12  * Copyright (c) 2001-2019, PostgreSQL Global Development Group
13  * ALL RIGHTS RESERVED;
14  *
15  * Permission to use, copy, modify, and distribute this software and its
16  * documentation for any purpose, without fee, and without a written agreement
17  * is hereby granted, provided that the above copyright notice and this
18  * paragraph and the following two paragraphs appear in all copies.
19  *
20  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24  * POSSIBILITY OF SUCH DAMAGE.
25  *
26  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
29  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31  *
32  */
33 #include "postgres.h"
34
35 #include <limits.h>
36
37 #include "libpq-fe.h"
38
39 #include "access/htup_details.h"
40 #include "access/relation.h"
41 #include "access/reloptions.h"
42 #include "access/table.h"
43 #include "catalog/indexing.h"
44 #include "catalog/namespace.h"
45 #include "catalog/pg_foreign_data_wrapper.h"
46 #include "catalog/pg_foreign_server.h"
47 #include "catalog/pg_type.h"
48 #include "catalog/pg_user_mapping.h"
49 #include "executor/spi.h"
50 #include "foreign/foreign.h"
51 #include "funcapi.h"
52 #include "lib/stringinfo.h"
53 #include "mb/pg_wchar.h"
54 #include "miscadmin.h"
55 #include "parser/scansup.h"
56 #include "utils/acl.h"
57 #include "utils/builtins.h"
58 #include "utils/fmgroids.h"
59 #include "utils/guc.h"
60 #include "utils/lsyscache.h"
61 #include "utils/memutils.h"
62 #include "utils/rel.h"
63 #include "utils/tqual.h"
64 #include "utils/varlena.h"
65
66 PG_MODULE_MAGIC;
67
68 typedef struct remoteConn
69 {
70         PGconn     *conn;                       /* Hold the remote connection */
71         int                     openCursorCount;        /* The number of open cursors */
72         bool            newXactForCursor;       /* Opened a transaction for a cursor */
73 } remoteConn;
74
75 typedef struct storeInfo
76 {
77         FunctionCallInfo fcinfo;
78         Tuplestorestate *tuplestore;
79         AttInMetadata *attinmeta;
80         MemoryContext tmpcontext;
81         char      **cstrs;
82         /* temp storage for results to avoid leaks on exception */
83         PGresult   *last_res;
84         PGresult   *cur_res;
85 } storeInfo;
86
87 /*
88  * Internal declarations
89  */
90 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
91 static void prepTuplestoreResult(FunctionCallInfo fcinfo);
92 static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
93                                   PGresult *res);
94 static void materializeQueryResult(FunctionCallInfo fcinfo,
95                                            PGconn *conn,
96                                            const char *conname,
97                                            const char *sql,
98                                            bool fail);
99 static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
100 static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first);
101 static remoteConn *getConnectionByName(const char *name);
102 static HTAB *createConnHash(void);
103 static void createNewConnection(const char *name, remoteConn *rconn);
104 static void deleteConnection(const char *name);
105 static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
106 static char **get_text_array_contents(ArrayType *array, int *numitems);
107 static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
108 static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
109 static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
110 static char *quote_ident_cstr(char *rawstr);
111 static int      get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
112 static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
113 static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
114 static char *generate_relation_name(Relation rel);
115 static void dblink_connstr_check(const char *connstr);
116 static void dblink_security_check(PGconn *conn, remoteConn *rconn);
117 static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
118                                  bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
119 static char *get_connect_string(const char *servername);
120 static char *escape_param_str(const char *from);
121 static void validate_pkattnums(Relation rel,
122                                    int2vector *pkattnums_arg, int32 pknumatts_arg,
123                                    int **pkattnums, int *pknumatts);
124 static bool is_valid_dblink_option(const PQconninfoOption *options,
125                                            const char *option, Oid context);
126 static int      applyRemoteGucs(PGconn *conn);
127 static void restoreLocalGucs(int nestlevel);
128
129 /* Global */
130 static remoteConn *pconn = NULL;
131 static HTAB *remoteConnHash = NULL;
132
133 /*
134  *      Following is list that holds multiple remote connections.
135  *      Calling convention of each dblink function changes to accept
136  *      connection name as the first parameter. The connection list is
137  *      much like ecpg e.g. a mapping between a name and a PGconn object.
138  */
139
140 typedef struct remoteConnHashEnt
141 {
142         char            name[NAMEDATALEN];
143         remoteConn *rconn;
144 } remoteConnHashEnt;
145
146 /* initial number of connection hashes */
147 #define NUMCONN 16
148
149 static char *
150 xpstrdup(const char *in)
151 {
152         if (in == NULL)
153                 return NULL;
154         return pstrdup(in);
155 }
156
157 static void
158 pg_attribute_noreturn()
159 dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
160 {
161         char       *msg = pchomp(PQerrorMessage(conn));
162
163         if (res)
164                 PQclear(res);
165         elog(ERROR, "%s: %s", p2, msg);
166 }
167
168 static void
169 pg_attribute_noreturn()
170 dblink_conn_not_avail(const char *conname)
171 {
172         if (conname)
173                 ereport(ERROR,
174                                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
175                                  errmsg("connection \"%s\" not available", conname)));
176         else
177                 ereport(ERROR,
178                                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
179                                  errmsg("connection not available")));
180 }
181
182 static void
183 dblink_get_conn(char *conname_or_str,
184                                 PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
185 {
186         remoteConn *rconn = getConnectionByName(conname_or_str);
187         PGconn     *conn;
188         char       *conname;
189         bool            freeconn;
190
191         if (rconn)
192         {
193                 conn = rconn->conn;
194                 conname = conname_or_str;
195                 freeconn = false;
196         }
197         else
198         {
199                 const char *connstr;
200
201                 connstr = get_connect_string(conname_or_str);
202                 if (connstr == NULL)
203                         connstr = conname_or_str;
204                 dblink_connstr_check(connstr);
205                 conn = PQconnectdb(connstr);
206                 if (PQstatus(conn) == CONNECTION_BAD)
207                 {
208                         char       *msg = pchomp(PQerrorMessage(conn));
209
210                         PQfinish(conn);
211                         ereport(ERROR,
212                                         (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
213                                          errmsg("could not establish connection"),
214                                          errdetail_internal("%s", msg)));
215                 }
216                 dblink_security_check(conn, rconn);
217                 if (PQclientEncoding(conn) != GetDatabaseEncoding())
218                         PQsetClientEncoding(conn, GetDatabaseEncodingName());
219                 freeconn = true;
220                 conname = NULL;
221         }
222
223         *conn_p = conn;
224         *conname_p = conname;
225         *freeconn_p = freeconn;
226 }
227
228 static PGconn *
229 dblink_get_named_conn(const char *conname)
230 {
231         remoteConn *rconn = getConnectionByName(conname);
232
233         if (rconn)
234                 return rconn->conn;
235
236         dblink_conn_not_avail(conname);
237         return NULL;                            /* keep compiler quiet */
238 }
239
240 static void
241 dblink_init(void)
242 {
243         if (!pconn)
244         {
245                 pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
246                 pconn->conn = NULL;
247                 pconn->openCursorCount = 0;
248                 pconn->newXactForCursor = false;
249         }
250 }
251
252 /*
253  * Create a persistent connection to another database
254  */
255 PG_FUNCTION_INFO_V1(dblink_connect);
256 Datum
257 dblink_connect(PG_FUNCTION_ARGS)
258 {
259         char       *conname_or_str = NULL;
260         char       *connstr = NULL;
261         char       *connname = NULL;
262         char       *msg;
263         PGconn     *conn = NULL;
264         remoteConn *rconn = NULL;
265
266         dblink_init();
267
268         if (PG_NARGS() == 2)
269         {
270                 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
271                 connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
272         }
273         else if (PG_NARGS() == 1)
274                 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
275
276         if (connname)
277                 rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
278                                                                                                   sizeof(remoteConn));
279
280         /* first check for valid foreign data server */
281         connstr = get_connect_string(conname_or_str);
282         if (connstr == NULL)
283                 connstr = conname_or_str;
284
285         /* check password in connection string if not superuser */
286         dblink_connstr_check(connstr);
287         conn = PQconnectdb(connstr);
288
289         if (PQstatus(conn) == CONNECTION_BAD)
290         {
291                 msg = pchomp(PQerrorMessage(conn));
292                 PQfinish(conn);
293                 if (rconn)
294                         pfree(rconn);
295
296                 ereport(ERROR,
297                                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
298                                  errmsg("could not establish connection"),
299                                  errdetail_internal("%s", msg)));
300         }
301
302         /* check password actually used if not superuser */
303         dblink_security_check(conn, rconn);
304
305         /* attempt to set client encoding to match server encoding, if needed */
306         if (PQclientEncoding(conn) != GetDatabaseEncoding())
307                 PQsetClientEncoding(conn, GetDatabaseEncodingName());
308
309         if (connname)
310         {
311                 rconn->conn = conn;
312                 createNewConnection(connname, rconn);
313         }
314         else
315         {
316                 if (pconn->conn)
317                         PQfinish(pconn->conn);
318                 pconn->conn = conn;
319         }
320
321         PG_RETURN_TEXT_P(cstring_to_text("OK"));
322 }
323
324 /*
325  * Clear a persistent connection to another database
326  */
327 PG_FUNCTION_INFO_V1(dblink_disconnect);
328 Datum
329 dblink_disconnect(PG_FUNCTION_ARGS)
330 {
331         char       *conname = NULL;
332         remoteConn *rconn = NULL;
333         PGconn     *conn = NULL;
334
335         dblink_init();
336
337         if (PG_NARGS() == 1)
338         {
339                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
340                 rconn = getConnectionByName(conname);
341                 if (rconn)
342                         conn = rconn->conn;
343         }
344         else
345                 conn = pconn->conn;
346
347         if (!conn)
348                 dblink_conn_not_avail(conname);
349
350         PQfinish(conn);
351         if (rconn)
352         {
353                 deleteConnection(conname);
354                 pfree(rconn);
355         }
356         else
357                 pconn->conn = NULL;
358
359         PG_RETURN_TEXT_P(cstring_to_text("OK"));
360 }
361
362 /*
363  * opens a cursor using a persistent connection
364  */
365 PG_FUNCTION_INFO_V1(dblink_open);
366 Datum
367 dblink_open(PG_FUNCTION_ARGS)
368 {
369         PGresult   *res = NULL;
370         PGconn     *conn;
371         char       *curname = NULL;
372         char       *sql = NULL;
373         char       *conname = NULL;
374         StringInfoData buf;
375         remoteConn *rconn = NULL;
376         bool            fail = true;    /* default to backward compatible behavior */
377
378         dblink_init();
379         initStringInfo(&buf);
380
381         if (PG_NARGS() == 2)
382         {
383                 /* text,text */
384                 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
385                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
386                 rconn = pconn;
387         }
388         else if (PG_NARGS() == 3)
389         {
390                 /* might be text,text,text or text,text,bool */
391                 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
392                 {
393                         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
394                         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
395                         fail = PG_GETARG_BOOL(2);
396                         rconn = pconn;
397                 }
398                 else
399                 {
400                         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
401                         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
402                         sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
403                         rconn = getConnectionByName(conname);
404                 }
405         }
406         else if (PG_NARGS() == 4)
407         {
408                 /* text,text,text,bool */
409                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
410                 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
411                 sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
412                 fail = PG_GETARG_BOOL(3);
413                 rconn = getConnectionByName(conname);
414         }
415
416         if (!rconn || !rconn->conn)
417                 dblink_conn_not_avail(conname);
418
419         conn = rconn->conn;
420
421         /* If we are not in a transaction, start one */
422         if (PQtransactionStatus(conn) == PQTRANS_IDLE)
423         {
424                 res = PQexec(conn, "BEGIN");
425                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
426                         dblink_res_internalerror(conn, res, "begin error");
427                 PQclear(res);
428                 rconn->newXactForCursor = true;
429
430                 /*
431                  * Since transaction state was IDLE, we force cursor count to
432                  * initially be 0. This is needed as a previous ABORT might have wiped
433                  * out our transaction without maintaining the cursor count for us.
434                  */
435                 rconn->openCursorCount = 0;
436         }
437
438         /* if we started a transaction, increment cursor count */
439         if (rconn->newXactForCursor)
440                 (rconn->openCursorCount)++;
441
442         appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
443         res = PQexec(conn, buf.data);
444         if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
445         {
446                 dblink_res_error(conn, conname, res, fail,
447                                                  "while opening cursor \"%s\"", curname);
448                 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
449         }
450
451         PQclear(res);
452         PG_RETURN_TEXT_P(cstring_to_text("OK"));
453 }
454
455 /*
456  * closes a cursor
457  */
458 PG_FUNCTION_INFO_V1(dblink_close);
459 Datum
460 dblink_close(PG_FUNCTION_ARGS)
461 {
462         PGconn     *conn;
463         PGresult   *res = NULL;
464         char       *curname = NULL;
465         char       *conname = NULL;
466         StringInfoData buf;
467         remoteConn *rconn = NULL;
468         bool            fail = true;    /* default to backward compatible behavior */
469
470         dblink_init();
471         initStringInfo(&buf);
472
473         if (PG_NARGS() == 1)
474         {
475                 /* text */
476                 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
477                 rconn = pconn;
478         }
479         else if (PG_NARGS() == 2)
480         {
481                 /* might be text,text or text,bool */
482                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
483                 {
484                         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
485                         fail = PG_GETARG_BOOL(1);
486                         rconn = pconn;
487                 }
488                 else
489                 {
490                         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
491                         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
492                         rconn = getConnectionByName(conname);
493                 }
494         }
495         if (PG_NARGS() == 3)
496         {
497                 /* text,text,bool */
498                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
499                 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
500                 fail = PG_GETARG_BOOL(2);
501                 rconn = getConnectionByName(conname);
502         }
503
504         if (!rconn || !rconn->conn)
505                 dblink_conn_not_avail(conname);
506
507         conn = rconn->conn;
508
509         appendStringInfo(&buf, "CLOSE %s", curname);
510
511         /* close the cursor */
512         res = PQexec(conn, buf.data);
513         if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
514         {
515                 dblink_res_error(conn, conname, res, fail,
516                                                  "while closing cursor \"%s\"", curname);
517                 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
518         }
519
520         PQclear(res);
521
522         /* if we started a transaction, decrement cursor count */
523         if (rconn->newXactForCursor)
524         {
525                 (rconn->openCursorCount)--;
526
527                 /* if count is zero, commit the transaction */
528                 if (rconn->openCursorCount == 0)
529                 {
530                         rconn->newXactForCursor = false;
531
532                         res = PQexec(conn, "COMMIT");
533                         if (PQresultStatus(res) != PGRES_COMMAND_OK)
534                                 dblink_res_internalerror(conn, res, "commit error");
535                         PQclear(res);
536                 }
537         }
538
539         PG_RETURN_TEXT_P(cstring_to_text("OK"));
540 }
541
542 /*
543  * Fetch results from an open cursor
544  */
545 PG_FUNCTION_INFO_V1(dblink_fetch);
546 Datum
547 dblink_fetch(PG_FUNCTION_ARGS)
548 {
549         PGresult   *res = NULL;
550         char       *conname = NULL;
551         remoteConn *rconn = NULL;
552         PGconn     *conn = NULL;
553         StringInfoData buf;
554         char       *curname = NULL;
555         int                     howmany = 0;
556         bool            fail = true;    /* default to backward compatible */
557
558         prepTuplestoreResult(fcinfo);
559
560         dblink_init();
561
562         if (PG_NARGS() == 4)
563         {
564                 /* text,text,int,bool */
565                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
566                 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
567                 howmany = PG_GETARG_INT32(2);
568                 fail = PG_GETARG_BOOL(3);
569
570                 rconn = getConnectionByName(conname);
571                 if (rconn)
572                         conn = rconn->conn;
573         }
574         else if (PG_NARGS() == 3)
575         {
576                 /* text,text,int or text,int,bool */
577                 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
578                 {
579                         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
580                         howmany = PG_GETARG_INT32(1);
581                         fail = PG_GETARG_BOOL(2);
582                         conn = pconn->conn;
583                 }
584                 else
585                 {
586                         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
587                         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
588                         howmany = PG_GETARG_INT32(2);
589
590                         rconn = getConnectionByName(conname);
591                         if (rconn)
592                                 conn = rconn->conn;
593                 }
594         }
595         else if (PG_NARGS() == 2)
596         {
597                 /* text,int */
598                 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
599                 howmany = PG_GETARG_INT32(1);
600                 conn = pconn->conn;
601         }
602
603         if (!conn)
604                 dblink_conn_not_avail(conname);
605
606         initStringInfo(&buf);
607         appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
608
609         /*
610          * Try to execute the query.  Note that since libpq uses malloc, the
611          * PGresult will be long-lived even though we are still in a short-lived
612          * memory context.
613          */
614         res = PQexec(conn, buf.data);
615         if (!res ||
616                 (PQresultStatus(res) != PGRES_COMMAND_OK &&
617                  PQresultStatus(res) != PGRES_TUPLES_OK))
618         {
619                 dblink_res_error(conn, conname, res, fail,
620                                                  "while fetching from cursor \"%s\"", curname);
621                 return (Datum) 0;
622         }
623         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
624         {
625                 /* cursor does not exist - closed already or bad name */
626                 PQclear(res);
627                 ereport(ERROR,
628                                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
629                                  errmsg("cursor \"%s\" does not exist", curname)));
630         }
631
632         materializeResult(fcinfo, conn, res);
633         return (Datum) 0;
634 }
635
636 /*
637  * Note: this is the new preferred version of dblink
638  */
639 PG_FUNCTION_INFO_V1(dblink_record);
640 Datum
641 dblink_record(PG_FUNCTION_ARGS)
642 {
643         return dblink_record_internal(fcinfo, false);
644 }
645
646 PG_FUNCTION_INFO_V1(dblink_send_query);
647 Datum
648 dblink_send_query(PG_FUNCTION_ARGS)
649 {
650         PGconn     *conn;
651         char       *sql;
652         int                     retval;
653
654         if (PG_NARGS() == 2)
655         {
656                 conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
657                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
658         }
659         else
660                 /* shouldn't happen */
661                 elog(ERROR, "wrong number of arguments");
662
663         /* async query send */
664         retval = PQsendQuery(conn, sql);
665         if (retval != 1)
666                 elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
667
668         PG_RETURN_INT32(retval);
669 }
670
671 PG_FUNCTION_INFO_V1(dblink_get_result);
672 Datum
673 dblink_get_result(PG_FUNCTION_ARGS)
674 {
675         return dblink_record_internal(fcinfo, true);
676 }
677
678 static Datum
679 dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
680 {
681         PGconn     *volatile conn = NULL;
682         volatile bool freeconn = false;
683
684         prepTuplestoreResult(fcinfo);
685
686         dblink_init();
687
688         PG_TRY();
689         {
690                 char       *sql = NULL;
691                 char       *conname = NULL;
692                 bool            fail = true;    /* default to backward compatible */
693
694                 if (!is_async)
695                 {
696                         if (PG_NARGS() == 3)
697                         {
698                                 /* text,text,bool */
699                                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
700                                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
701                                 fail = PG_GETARG_BOOL(2);
702                                 dblink_get_conn(conname, &conn, &conname, &freeconn);
703                         }
704                         else if (PG_NARGS() == 2)
705                         {
706                                 /* text,text or text,bool */
707                                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
708                                 {
709                                         sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
710                                         fail = PG_GETARG_BOOL(1);
711                                         conn = pconn->conn;
712                                 }
713                                 else
714                                 {
715                                         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
716                                         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
717                                         dblink_get_conn(conname, &conn, &conname, &freeconn);
718                                 }
719                         }
720                         else if (PG_NARGS() == 1)
721                         {
722                                 /* text */
723                                 conn = pconn->conn;
724                                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
725                         }
726                         else
727                                 /* shouldn't happen */
728                                 elog(ERROR, "wrong number of arguments");
729                 }
730                 else                                    /* is_async */
731                 {
732                         /* get async result */
733                         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
734
735                         if (PG_NARGS() == 2)
736                         {
737                                 /* text,bool */
738                                 fail = PG_GETARG_BOOL(1);
739                                 conn = dblink_get_named_conn(conname);
740                         }
741                         else if (PG_NARGS() == 1)
742                         {
743                                 /* text */
744                                 conn = dblink_get_named_conn(conname);
745                         }
746                         else
747                                 /* shouldn't happen */
748                                 elog(ERROR, "wrong number of arguments");
749                 }
750
751                 if (!conn)
752                         dblink_conn_not_avail(conname);
753
754                 if (!is_async)
755                 {
756                         /* synchronous query, use efficient tuple collection method */
757                         materializeQueryResult(fcinfo, conn, conname, sql, fail);
758                 }
759                 else
760                 {
761                         /* async result retrieval, do it the old way */
762                         PGresult   *res = PQgetResult(conn);
763
764                         /* NULL means we're all done with the async results */
765                         if (res)
766                         {
767                                 if (PQresultStatus(res) != PGRES_COMMAND_OK &&
768                                         PQresultStatus(res) != PGRES_TUPLES_OK)
769                                 {
770                                         dblink_res_error(conn, conname, res, fail,
771                                                                          "while executing query");
772                                         /* if fail isn't set, we'll return an empty query result */
773                                 }
774                                 else
775                                 {
776                                         materializeResult(fcinfo, conn, res);
777                                 }
778                         }
779                 }
780         }
781         PG_CATCH();
782         {
783                 /* if needed, close the connection to the database */
784                 if (freeconn)
785                         PQfinish(conn);
786                 PG_RE_THROW();
787         }
788         PG_END_TRY();
789
790         /* if needed, close the connection to the database */
791         if (freeconn)
792                 PQfinish(conn);
793
794         return (Datum) 0;
795 }
796
797 /*
798  * Verify function caller can handle a tuplestore result, and set up for that.
799  *
800  * Note: if the caller returns without actually creating a tuplestore, the
801  * executor will treat the function result as an empty set.
802  */
803 static void
804 prepTuplestoreResult(FunctionCallInfo fcinfo)
805 {
806         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
807
808         /* check to see if query supports us returning a tuplestore */
809         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
810                 ereport(ERROR,
811                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
812                                  errmsg("set-valued function called in context that cannot accept a set")));
813         if (!(rsinfo->allowedModes & SFRM_Materialize))
814                 ereport(ERROR,
815                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
816                                  errmsg("materialize mode required, but it is not allowed in this context")));
817
818         /* let the executor know we're sending back a tuplestore */
819         rsinfo->returnMode = SFRM_Materialize;
820
821         /* caller must fill these to return a non-empty result */
822         rsinfo->setResult = NULL;
823         rsinfo->setDesc = NULL;
824 }
825
826 /*
827  * Copy the contents of the PGresult into a tuplestore to be returned
828  * as the result of the current function.
829  * The PGresult will be released in this function.
830  */
831 static void
832 materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
833 {
834         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
835
836         /* prepTuplestoreResult must have been called previously */
837         Assert(rsinfo->returnMode == SFRM_Materialize);
838
839         PG_TRY();
840         {
841                 TupleDesc       tupdesc;
842                 bool            is_sql_cmd;
843                 int                     ntuples;
844                 int                     nfields;
845
846                 if (PQresultStatus(res) == PGRES_COMMAND_OK)
847                 {
848                         is_sql_cmd = true;
849
850                         /*
851                          * need a tuple descriptor representing one TEXT column to return
852                          * the command status string as our result tuple
853                          */
854                         tupdesc = CreateTemplateTupleDesc(1);
855                         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
856                                                            TEXTOID, -1, 0);
857                         ntuples = 1;
858                         nfields = 1;
859                 }
860                 else
861                 {
862                         Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
863
864                         is_sql_cmd = false;
865
866                         /* get a tuple descriptor for our result type */
867                         switch (get_call_result_type(fcinfo, NULL, &tupdesc))
868                         {
869                                 case TYPEFUNC_COMPOSITE:
870                                         /* success */
871                                         break;
872                                 case TYPEFUNC_RECORD:
873                                         /* failed to determine actual type of RECORD */
874                                         ereport(ERROR,
875                                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
876                                                          errmsg("function returning record called in context "
877                                                                         "that cannot accept type record")));
878                                         break;
879                                 default:
880                                         /* result type isn't composite */
881                                         elog(ERROR, "return type must be a row type");
882                                         break;
883                         }
884
885                         /* make sure we have a persistent copy of the tupdesc */
886                         tupdesc = CreateTupleDescCopy(tupdesc);
887                         ntuples = PQntuples(res);
888                         nfields = PQnfields(res);
889                 }
890
891                 /*
892                  * check result and tuple descriptor have the same number of columns
893                  */
894                 if (nfields != tupdesc->natts)
895                         ereport(ERROR,
896                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
897                                          errmsg("remote query result rowtype does not match "
898                                                         "the specified FROM clause rowtype")));
899
900                 if (ntuples > 0)
901                 {
902                         AttInMetadata *attinmeta;
903                         int                     nestlevel = -1;
904                         Tuplestorestate *tupstore;
905                         MemoryContext oldcontext;
906                         int                     row;
907                         char      **values;
908
909                         attinmeta = TupleDescGetAttInMetadata(tupdesc);
910
911                         /* Set GUCs to ensure we read GUC-sensitive data types correctly */
912                         if (!is_sql_cmd)
913                                 nestlevel = applyRemoteGucs(conn);
914
915                         oldcontext = MemoryContextSwitchTo(
916                                                                                            rsinfo->econtext->ecxt_per_query_memory);
917                         tupstore = tuplestore_begin_heap(true, false, work_mem);
918                         rsinfo->setResult = tupstore;
919                         rsinfo->setDesc = tupdesc;
920                         MemoryContextSwitchTo(oldcontext);
921
922                         values = (char **) palloc(nfields * sizeof(char *));
923
924                         /* put all tuples into the tuplestore */
925                         for (row = 0; row < ntuples; row++)
926                         {
927                                 HeapTuple       tuple;
928
929                                 if (!is_sql_cmd)
930                                 {
931                                         int                     i;
932
933                                         for (i = 0; i < nfields; i++)
934                                         {
935                                                 if (PQgetisnull(res, row, i))
936                                                         values[i] = NULL;
937                                                 else
938                                                         values[i] = PQgetvalue(res, row, i);
939                                         }
940                                 }
941                                 else
942                                 {
943                                         values[0] = PQcmdStatus(res);
944                                 }
945
946                                 /* build the tuple and put it into the tuplestore. */
947                                 tuple = BuildTupleFromCStrings(attinmeta, values);
948                                 tuplestore_puttuple(tupstore, tuple);
949                         }
950
951                         /* clean up GUC settings, if we changed any */
952                         restoreLocalGucs(nestlevel);
953
954                         /* clean up and return the tuplestore */
955                         tuplestore_donestoring(tupstore);
956                 }
957
958                 PQclear(res);
959         }
960         PG_CATCH();
961         {
962                 /* be sure to release the libpq result */
963                 PQclear(res);
964                 PG_RE_THROW();
965         }
966         PG_END_TRY();
967 }
968
969 /*
970  * Execute the given SQL command and store its results into a tuplestore
971  * to be returned as the result of the current function.
972  *
973  * This is equivalent to PQexec followed by materializeResult, but we make
974  * use of libpq's single-row mode to avoid accumulating the whole result
975  * inside libpq before it gets transferred to the tuplestore.
976  */
977 static void
978 materializeQueryResult(FunctionCallInfo fcinfo,
979                                            PGconn *conn,
980                                            const char *conname,
981                                            const char *sql,
982                                            bool fail)
983 {
984         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
985         PGresult   *volatile res = NULL;
986         volatile storeInfo sinfo;
987
988         /* prepTuplestoreResult must have been called previously */
989         Assert(rsinfo->returnMode == SFRM_Materialize);
990
991         /* initialize storeInfo to empty */
992         memset((void *) &sinfo, 0, sizeof(sinfo));
993         sinfo.fcinfo = fcinfo;
994
995         PG_TRY();
996         {
997                 /* Create short-lived memory context for data conversions */
998                 sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
999                                                                                                  "dblink temporary context",
1000                                                                                                  ALLOCSET_DEFAULT_SIZES);
1001
1002                 /* execute query, collecting any tuples into the tuplestore */
1003                 res = storeQueryResult(&sinfo, conn, sql);
1004
1005                 if (!res ||
1006                         (PQresultStatus(res) != PGRES_COMMAND_OK &&
1007                          PQresultStatus(res) != PGRES_TUPLES_OK))
1008                 {
1009                         /*
1010                          * dblink_res_error will clear the passed PGresult, so we need
1011                          * this ugly dance to avoid doing so twice during error exit
1012                          */
1013                         PGresult   *res1 = res;
1014
1015                         res = NULL;
1016                         dblink_res_error(conn, conname, res1, fail,
1017                                                          "while executing query");
1018                         /* if fail isn't set, we'll return an empty query result */
1019                 }
1020                 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1021                 {
1022                         /*
1023                          * storeRow didn't get called, so we need to convert the command
1024                          * status string to a tuple manually
1025                          */
1026                         TupleDesc       tupdesc;
1027                         AttInMetadata *attinmeta;
1028                         Tuplestorestate *tupstore;
1029                         HeapTuple       tuple;
1030                         char       *values[1];
1031                         MemoryContext oldcontext;
1032
1033                         /*
1034                          * need a tuple descriptor representing one TEXT column to return
1035                          * the command status string as our result tuple
1036                          */
1037                         tupdesc = CreateTemplateTupleDesc(1);
1038                         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1039                                                            TEXTOID, -1, 0);
1040                         attinmeta = TupleDescGetAttInMetadata(tupdesc);
1041
1042                         oldcontext = MemoryContextSwitchTo(
1043                                                                                            rsinfo->econtext->ecxt_per_query_memory);
1044                         tupstore = tuplestore_begin_heap(true, false, work_mem);
1045                         rsinfo->setResult = tupstore;
1046                         rsinfo->setDesc = tupdesc;
1047                         MemoryContextSwitchTo(oldcontext);
1048
1049                         values[0] = PQcmdStatus(res);
1050
1051                         /* build the tuple and put it into the tuplestore. */
1052                         tuple = BuildTupleFromCStrings(attinmeta, values);
1053                         tuplestore_puttuple(tupstore, tuple);
1054
1055                         PQclear(res);
1056                         res = NULL;
1057                 }
1058                 else
1059                 {
1060                         Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
1061                         /* storeRow should have created a tuplestore */
1062                         Assert(rsinfo->setResult != NULL);
1063
1064                         PQclear(res);
1065                         res = NULL;
1066                 }
1067
1068                 /* clean up data conversion short-lived memory context */
1069                 if (sinfo.tmpcontext != NULL)
1070                         MemoryContextDelete(sinfo.tmpcontext);
1071                 sinfo.tmpcontext = NULL;
1072
1073                 PQclear(sinfo.last_res);
1074                 sinfo.last_res = NULL;
1075                 PQclear(sinfo.cur_res);
1076                 sinfo.cur_res = NULL;
1077         }
1078         PG_CATCH();
1079         {
1080                 /* be sure to release any libpq result we collected */
1081                 PQclear(res);
1082                 PQclear(sinfo.last_res);
1083                 PQclear(sinfo.cur_res);
1084                 /* and clear out any pending data in libpq */
1085                 while ((res = PQgetResult(conn)) != NULL)
1086                         PQclear(res);
1087                 PG_RE_THROW();
1088         }
1089         PG_END_TRY();
1090 }
1091
1092 /*
1093  * Execute query, and send any result rows to sinfo->tuplestore.
1094  */
1095 static PGresult *
1096 storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
1097 {
1098         bool            first = true;
1099         int                     nestlevel = -1;
1100         PGresult   *res;
1101
1102         if (!PQsendQuery(conn, sql))
1103                 elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
1104
1105         if (!PQsetSingleRowMode(conn))  /* shouldn't fail */
1106                 elog(ERROR, "failed to set single-row mode for dblink query");
1107
1108         for (;;)
1109         {
1110                 CHECK_FOR_INTERRUPTS();
1111
1112                 sinfo->cur_res = PQgetResult(conn);
1113                 if (!sinfo->cur_res)
1114                         break;
1115
1116                 if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
1117                 {
1118                         /* got one row from possibly-bigger resultset */
1119
1120                         /*
1121                          * Set GUCs to ensure we read GUC-sensitive data types correctly.
1122                          * We shouldn't do this until we have a row in hand, to ensure
1123                          * libpq has seen any earlier ParameterStatus protocol messages.
1124                          */
1125                         if (first && nestlevel < 0)
1126                                 nestlevel = applyRemoteGucs(conn);
1127
1128                         storeRow(sinfo, sinfo->cur_res, first);
1129
1130                         PQclear(sinfo->cur_res);
1131                         sinfo->cur_res = NULL;
1132                         first = false;
1133                 }
1134                 else
1135                 {
1136                         /* if empty resultset, fill tuplestore header */
1137                         if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
1138                                 storeRow(sinfo, sinfo->cur_res, first);
1139
1140                         /* store completed result at last_res */
1141                         PQclear(sinfo->last_res);
1142                         sinfo->last_res = sinfo->cur_res;
1143                         sinfo->cur_res = NULL;
1144                         first = true;
1145                 }
1146         }
1147
1148         /* clean up GUC settings, if we changed any */
1149         restoreLocalGucs(nestlevel);
1150
1151         /* return last_res */
1152         res = sinfo->last_res;
1153         sinfo->last_res = NULL;
1154         return res;
1155 }
1156
1157 /*
1158  * Send single row to sinfo->tuplestore.
1159  *
1160  * If "first" is true, create the tuplestore using PGresult's metadata
1161  * (in this case the PGresult might contain either zero or one row).
1162  */
1163 static void
1164 storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
1165 {
1166         int                     nfields = PQnfields(res);
1167         HeapTuple       tuple;
1168         int                     i;
1169         MemoryContext oldcontext;
1170
1171         if (first)
1172         {
1173                 /* Prepare for new result set */
1174                 ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
1175                 TupleDesc       tupdesc;
1176
1177                 /*
1178                  * It's possible to get more than one result set if the query string
1179                  * contained multiple SQL commands.  In that case, we follow PQexec's
1180                  * traditional behavior of throwing away all but the last result.
1181                  */
1182                 if (sinfo->tuplestore)
1183                         tuplestore_end(sinfo->tuplestore);
1184                 sinfo->tuplestore = NULL;
1185
1186                 /* get a tuple descriptor for our result type */
1187                 switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
1188                 {
1189                         case TYPEFUNC_COMPOSITE:
1190                                 /* success */
1191                                 break;
1192                         case TYPEFUNC_RECORD:
1193                                 /* failed to determine actual type of RECORD */
1194                                 ereport(ERROR,
1195                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1196                                                  errmsg("function returning record called in context "
1197                                                                 "that cannot accept type record")));
1198                                 break;
1199                         default:
1200                                 /* result type isn't composite */
1201                                 elog(ERROR, "return type must be a row type");
1202                                 break;
1203                 }
1204
1205                 /* make sure we have a persistent copy of the tupdesc */
1206                 tupdesc = CreateTupleDescCopy(tupdesc);
1207
1208                 /* check result and tuple descriptor have the same number of columns */
1209                 if (nfields != tupdesc->natts)
1210                         ereport(ERROR,
1211                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1212                                          errmsg("remote query result rowtype does not match "
1213                                                         "the specified FROM clause rowtype")));
1214
1215                 /* Prepare attinmeta for later data conversions */
1216                 sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
1217
1218                 /* Create a new, empty tuplestore */
1219                 oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1220                 sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1221                 rsinfo->setResult = sinfo->tuplestore;
1222                 rsinfo->setDesc = tupdesc;
1223                 MemoryContextSwitchTo(oldcontext);
1224
1225                 /* Done if empty resultset */
1226                 if (PQntuples(res) == 0)
1227                         return;
1228
1229                 /*
1230                  * Set up sufficiently-wide string pointers array; this won't change
1231                  * in size so it's easy to preallocate.
1232                  */
1233                 if (sinfo->cstrs)
1234                         pfree(sinfo->cstrs);
1235                 sinfo->cstrs = (char **) palloc(nfields * sizeof(char *));
1236         }
1237
1238         /* Should have a single-row result if we get here */
1239         Assert(PQntuples(res) == 1);
1240
1241         /*
1242          * Do the following work in a temp context that we reset after each tuple.
1243          * This cleans up not only the data we have direct access to, but any
1244          * cruft the I/O functions might leak.
1245          */
1246         oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
1247
1248         /*
1249          * Fill cstrs with null-terminated strings of column values.
1250          */
1251         for (i = 0; i < nfields; i++)
1252         {
1253                 if (PQgetisnull(res, 0, i))
1254                         sinfo->cstrs[i] = NULL;
1255                 else
1256                         sinfo->cstrs[i] = PQgetvalue(res, 0, i);
1257         }
1258
1259         /* Convert row to a tuple, and add it to the tuplestore */
1260         tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
1261
1262         tuplestore_puttuple(sinfo->tuplestore, tuple);
1263
1264         /* Clean up */
1265         MemoryContextSwitchTo(oldcontext);
1266         MemoryContextReset(sinfo->tmpcontext);
1267 }
1268
1269 /*
1270  * List all open dblink connections by name.
1271  * Returns an array of all connection names.
1272  * Takes no params
1273  */
1274 PG_FUNCTION_INFO_V1(dblink_get_connections);
1275 Datum
1276 dblink_get_connections(PG_FUNCTION_ARGS)
1277 {
1278         HASH_SEQ_STATUS status;
1279         remoteConnHashEnt *hentry;
1280         ArrayBuildState *astate = NULL;
1281
1282         if (remoteConnHash)
1283         {
1284                 hash_seq_init(&status, remoteConnHash);
1285                 while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
1286                 {
1287                         /* stash away current value */
1288                         astate = accumArrayResult(astate,
1289                                                                           CStringGetTextDatum(hentry->name),
1290                                                                           false, TEXTOID, CurrentMemoryContext);
1291                 }
1292         }
1293
1294         if (astate)
1295                 PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
1296                                                                                           CurrentMemoryContext));
1297         else
1298                 PG_RETURN_NULL();
1299 }
1300
1301 /*
1302  * Checks if a given remote connection is busy
1303  *
1304  * Returns 1 if the connection is busy, 0 otherwise
1305  * Params:
1306  *      text connection_name - name of the connection to check
1307  *
1308  */
1309 PG_FUNCTION_INFO_V1(dblink_is_busy);
1310 Datum
1311 dblink_is_busy(PG_FUNCTION_ARGS)
1312 {
1313         PGconn     *conn;
1314
1315         dblink_init();
1316         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1317
1318         PQconsumeInput(conn);
1319         PG_RETURN_INT32(PQisBusy(conn));
1320 }
1321
1322 /*
1323  * Cancels a running request on a connection
1324  *
1325  * Returns text:
1326  *      "OK" if the cancel request has been sent correctly,
1327  *              an error message otherwise
1328  *
1329  * Params:
1330  *      text connection_name - name of the connection to check
1331  *
1332  */
1333 PG_FUNCTION_INFO_V1(dblink_cancel_query);
1334 Datum
1335 dblink_cancel_query(PG_FUNCTION_ARGS)
1336 {
1337         int                     res;
1338         PGconn     *conn;
1339         PGcancel   *cancel;
1340         char            errbuf[256];
1341
1342         dblink_init();
1343         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1344         cancel = PQgetCancel(conn);
1345
1346         res = PQcancel(cancel, errbuf, 256);
1347         PQfreeCancel(cancel);
1348
1349         if (res == 1)
1350                 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1351         else
1352                 PG_RETURN_TEXT_P(cstring_to_text(errbuf));
1353 }
1354
1355
1356 /*
1357  * Get error message from a connection
1358  *
1359  * Returns text:
1360  *      "OK" if no error, an error message otherwise
1361  *
1362  * Params:
1363  *      text connection_name - name of the connection to check
1364  *
1365  */
1366 PG_FUNCTION_INFO_V1(dblink_error_message);
1367 Datum
1368 dblink_error_message(PG_FUNCTION_ARGS)
1369 {
1370         char       *msg;
1371         PGconn     *conn;
1372
1373         dblink_init();
1374         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1375
1376         msg = PQerrorMessage(conn);
1377         if (msg == NULL || msg[0] == '\0')
1378                 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1379         else
1380                 PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
1381 }
1382
1383 /*
1384  * Execute an SQL non-SELECT command
1385  */
1386 PG_FUNCTION_INFO_V1(dblink_exec);
1387 Datum
1388 dblink_exec(PG_FUNCTION_ARGS)
1389 {
1390         text       *volatile sql_cmd_status = NULL;
1391         PGconn     *volatile conn = NULL;
1392         volatile bool freeconn = false;
1393
1394         dblink_init();
1395
1396         PG_TRY();
1397         {
1398                 PGresult   *res = NULL;
1399                 char       *sql = NULL;
1400                 char       *conname = NULL;
1401                 bool            fail = true;    /* default to backward compatible behavior */
1402
1403                 if (PG_NARGS() == 3)
1404                 {
1405                         /* must be text,text,bool */
1406                         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1407                         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1408                         fail = PG_GETARG_BOOL(2);
1409                         dblink_get_conn(conname, &conn, &conname, &freeconn);
1410                 }
1411                 else if (PG_NARGS() == 2)
1412                 {
1413                         /* might be text,text or text,bool */
1414                         if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
1415                         {
1416                                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1417                                 fail = PG_GETARG_BOOL(1);
1418                                 conn = pconn->conn;
1419                         }
1420                         else
1421                         {
1422                                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1423                                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1424                                 dblink_get_conn(conname, &conn, &conname, &freeconn);
1425                         }
1426                 }
1427                 else if (PG_NARGS() == 1)
1428                 {
1429                         /* must be single text argument */
1430                         conn = pconn->conn;
1431                         sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1432                 }
1433                 else
1434                         /* shouldn't happen */
1435                         elog(ERROR, "wrong number of arguments");
1436
1437                 if (!conn)
1438                         dblink_conn_not_avail(conname);
1439
1440                 res = PQexec(conn, sql);
1441                 if (!res ||
1442                         (PQresultStatus(res) != PGRES_COMMAND_OK &&
1443                          PQresultStatus(res) != PGRES_TUPLES_OK))
1444                 {
1445                         dblink_res_error(conn, conname, res, fail,
1446                                                          "while executing command");
1447
1448                         /*
1449                          * and save a copy of the command status string to return as our
1450                          * result tuple
1451                          */
1452                         sql_cmd_status = cstring_to_text("ERROR");
1453                 }
1454                 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1455                 {
1456                         /*
1457                          * and save a copy of the command status string to return as our
1458                          * result tuple
1459                          */
1460                         sql_cmd_status = cstring_to_text(PQcmdStatus(res));
1461                         PQclear(res);
1462                 }
1463                 else
1464                 {
1465                         PQclear(res);
1466                         ereport(ERROR,
1467                                         (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
1468                                          errmsg("statement returning results not allowed")));
1469                 }
1470         }
1471         PG_CATCH();
1472         {
1473                 /* if needed, close the connection to the database */
1474                 if (freeconn)
1475                         PQfinish(conn);
1476                 PG_RE_THROW();
1477         }
1478         PG_END_TRY();
1479
1480         /* if needed, close the connection to the database */
1481         if (freeconn)
1482                 PQfinish(conn);
1483
1484         PG_RETURN_TEXT_P(sql_cmd_status);
1485 }
1486
1487
1488 /*
1489  * dblink_get_pkey
1490  *
1491  * Return list of primary key fields for the supplied relation,
1492  * or NULL if none exists.
1493  */
1494 PG_FUNCTION_INFO_V1(dblink_get_pkey);
1495 Datum
1496 dblink_get_pkey(PG_FUNCTION_ARGS)
1497 {
1498         int16           indnkeyatts;
1499         char      **results;
1500         FuncCallContext *funcctx;
1501         int32           call_cntr;
1502         int32           max_calls;
1503         AttInMetadata *attinmeta;
1504         MemoryContext oldcontext;
1505
1506         /* stuff done only on the first call of the function */
1507         if (SRF_IS_FIRSTCALL())
1508         {
1509                 Relation        rel;
1510                 TupleDesc       tupdesc;
1511
1512                 /* create a function context for cross-call persistence */
1513                 funcctx = SRF_FIRSTCALL_INIT();
1514
1515                 /*
1516                  * switch to memory context appropriate for multiple function calls
1517                  */
1518                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1519
1520                 /* open target relation */
1521                 rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
1522
1523                 /* get the array of attnums */
1524                 results = get_pkey_attnames(rel, &indnkeyatts);
1525
1526                 relation_close(rel, AccessShareLock);
1527
1528                 /*
1529                  * need a tuple descriptor representing one INT and one TEXT column
1530                  */
1531                 tupdesc = CreateTemplateTupleDesc(2);
1532                 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
1533                                                    INT4OID, -1, 0);
1534                 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
1535                                                    TEXTOID, -1, 0);
1536
1537                 /*
1538                  * Generate attribute metadata needed later to produce tuples from raw
1539                  * C strings
1540                  */
1541                 attinmeta = TupleDescGetAttInMetadata(tupdesc);
1542                 funcctx->attinmeta = attinmeta;
1543
1544                 if ((results != NULL) && (indnkeyatts > 0))
1545                 {
1546                         funcctx->max_calls = indnkeyatts;
1547
1548                         /* got results, keep track of them */
1549                         funcctx->user_fctx = results;
1550                 }
1551                 else
1552                 {
1553                         /* fast track when no results */
1554                         MemoryContextSwitchTo(oldcontext);
1555                         SRF_RETURN_DONE(funcctx);
1556                 }
1557
1558                 MemoryContextSwitchTo(oldcontext);
1559         }
1560
1561         /* stuff done on every call of the function */
1562         funcctx = SRF_PERCALL_SETUP();
1563
1564         /*
1565          * initialize per-call variables
1566          */
1567         call_cntr = funcctx->call_cntr;
1568         max_calls = funcctx->max_calls;
1569
1570         results = (char **) funcctx->user_fctx;
1571         attinmeta = funcctx->attinmeta;
1572
1573         if (call_cntr < max_calls)      /* do when there is more left to send */
1574         {
1575                 char      **values;
1576                 HeapTuple       tuple;
1577                 Datum           result;
1578
1579                 values = (char **) palloc(2 * sizeof(char *));
1580                 values[0] = psprintf("%d", call_cntr + 1);
1581                 values[1] = results[call_cntr];
1582
1583                 /* build the tuple */
1584                 tuple = BuildTupleFromCStrings(attinmeta, values);
1585
1586                 /* make the tuple into a datum */
1587                 result = HeapTupleGetDatum(tuple);
1588
1589                 SRF_RETURN_NEXT(funcctx, result);
1590         }
1591         else
1592         {
1593                 /* do when there is no more left */
1594                 SRF_RETURN_DONE(funcctx);
1595         }
1596 }
1597
1598
1599 /*
1600  * dblink_build_sql_insert
1601  *
1602  * Used to generate an SQL insert statement
1603  * based on an existing tuple in a local relation.
1604  * This is useful for selectively replicating data
1605  * to another server via dblink.
1606  *
1607  * API:
1608  * <relname> - name of local table of interest
1609  * <pkattnums> - an int2vector of attnums which will be used
1610  * to identify the local tuple of interest
1611  * <pknumatts> - number of attnums in pkattnums
1612  * <src_pkattvals_arry> - text array of key values which will be used
1613  * to identify the local tuple of interest
1614  * <tgt_pkattvals_arry> - text array of key values which will be used
1615  * to build the string for execution remotely. These are substituted
1616  * for their counterparts in src_pkattvals_arry
1617  */
1618 PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
1619 Datum
1620 dblink_build_sql_insert(PG_FUNCTION_ARGS)
1621 {
1622         text       *relname_text = PG_GETARG_TEXT_PP(0);
1623         int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1624         int32           pknumatts_arg = PG_GETARG_INT32(2);
1625         ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1626         ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1627         Relation        rel;
1628         int                *pkattnums;
1629         int                     pknumatts;
1630         char      **src_pkattvals;
1631         char      **tgt_pkattvals;
1632         int                     src_nitems;
1633         int                     tgt_nitems;
1634         char       *sql;
1635
1636         /*
1637          * Open target relation.
1638          */
1639         rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1640
1641         /*
1642          * Process pkattnums argument.
1643          */
1644         validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1645                                            &pkattnums, &pknumatts);
1646
1647         /*
1648          * Source array is made up of key values that will be used to locate the
1649          * tuple of interest from the local system.
1650          */
1651         src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1652
1653         /*
1654          * There should be one source array key value for each key attnum
1655          */
1656         if (src_nitems != pknumatts)
1657                 ereport(ERROR,
1658                                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1659                                  errmsg("source key array length must match number of key " \
1660                                                 "attributes")));
1661
1662         /*
1663          * Target array is made up of key values that will be used to build the
1664          * SQL string for use on the remote system.
1665          */
1666         tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1667
1668         /*
1669          * There should be one target array key value for each key attnum
1670          */
1671         if (tgt_nitems != pknumatts)
1672                 ereport(ERROR,
1673                                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1674                                  errmsg("target key array length must match number of key " \
1675                                                 "attributes")));
1676
1677         /*
1678          * Prep work is finally done. Go get the SQL string.
1679          */
1680         sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1681
1682         /*
1683          * Now we can close the relation.
1684          */
1685         relation_close(rel, AccessShareLock);
1686
1687         /*
1688          * And send it
1689          */
1690         PG_RETURN_TEXT_P(cstring_to_text(sql));
1691 }
1692
1693
1694 /*
1695  * dblink_build_sql_delete
1696  *
1697  * Used to generate an SQL delete statement.
1698  * This is useful for selectively replicating a
1699  * delete to another server via dblink.
1700  *
1701  * API:
1702  * <relname> - name of remote table of interest
1703  * <pkattnums> - an int2vector of attnums which will be used
1704  * to identify the remote tuple of interest
1705  * <pknumatts> - number of attnums in pkattnums
1706  * <tgt_pkattvals_arry> - text array of key values which will be used
1707  * to build the string for execution remotely.
1708  */
1709 PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
1710 Datum
1711 dblink_build_sql_delete(PG_FUNCTION_ARGS)
1712 {
1713         text       *relname_text = PG_GETARG_TEXT_PP(0);
1714         int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1715         int32           pknumatts_arg = PG_GETARG_INT32(2);
1716         ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1717         Relation        rel;
1718         int                *pkattnums;
1719         int                     pknumatts;
1720         char      **tgt_pkattvals;
1721         int                     tgt_nitems;
1722         char       *sql;
1723
1724         /*
1725          * Open target relation.
1726          */
1727         rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1728
1729         /*
1730          * Process pkattnums argument.
1731          */
1732         validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1733                                            &pkattnums, &pknumatts);
1734
1735         /*
1736          * Target array is made up of key values that will be used to build the
1737          * SQL string for use on the remote system.
1738          */
1739         tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1740
1741         /*
1742          * There should be one target array key value for each key attnum
1743          */
1744         if (tgt_nitems != pknumatts)
1745                 ereport(ERROR,
1746                                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1747                                  errmsg("target key array length must match number of key " \
1748                                                 "attributes")));
1749
1750         /*
1751          * Prep work is finally done. Go get the SQL string.
1752          */
1753         sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
1754
1755         /*
1756          * Now we can close the relation.
1757          */
1758         relation_close(rel, AccessShareLock);
1759
1760         /*
1761          * And send it
1762          */
1763         PG_RETURN_TEXT_P(cstring_to_text(sql));
1764 }
1765
1766
1767 /*
1768  * dblink_build_sql_update
1769  *
1770  * Used to generate an SQL update statement
1771  * based on an existing tuple in a local relation.
1772  * This is useful for selectively replicating data
1773  * to another server via dblink.
1774  *
1775  * API:
1776  * <relname> - name of local table of interest
1777  * <pkattnums> - an int2vector of attnums which will be used
1778  * to identify the local tuple of interest
1779  * <pknumatts> - number of attnums in pkattnums
1780  * <src_pkattvals_arry> - text array of key values which will be used
1781  * to identify the local tuple of interest
1782  * <tgt_pkattvals_arry> - text array of key values which will be used
1783  * to build the string for execution remotely. These are substituted
1784  * for their counterparts in src_pkattvals_arry
1785  */
1786 PG_FUNCTION_INFO_V1(dblink_build_sql_update);
1787 Datum
1788 dblink_build_sql_update(PG_FUNCTION_ARGS)
1789 {
1790         text       *relname_text = PG_GETARG_TEXT_PP(0);
1791         int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1792         int32           pknumatts_arg = PG_GETARG_INT32(2);
1793         ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1794         ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1795         Relation        rel;
1796         int                *pkattnums;
1797         int                     pknumatts;
1798         char      **src_pkattvals;
1799         char      **tgt_pkattvals;
1800         int                     src_nitems;
1801         int                     tgt_nitems;
1802         char       *sql;
1803
1804         /*
1805          * Open target relation.
1806          */
1807         rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1808
1809         /*
1810          * Process pkattnums argument.
1811          */
1812         validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1813                                            &pkattnums, &pknumatts);
1814
1815         /*
1816          * Source array is made up of key values that will be used to locate the
1817          * tuple of interest from the local system.
1818          */
1819         src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1820
1821         /*
1822          * There should be one source array key value for each key attnum
1823          */
1824         if (src_nitems != pknumatts)
1825                 ereport(ERROR,
1826                                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1827                                  errmsg("source key array length must match number of key " \
1828                                                 "attributes")));
1829
1830         /*
1831          * Target array is made up of key values that will be used to build the
1832          * SQL string for use on the remote system.
1833          */
1834         tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1835
1836         /*
1837          * There should be one target array key value for each key attnum
1838          */
1839         if (tgt_nitems != pknumatts)
1840                 ereport(ERROR,
1841                                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1842                                  errmsg("target key array length must match number of key " \
1843                                                 "attributes")));
1844
1845         /*
1846          * Prep work is finally done. Go get the SQL string.
1847          */
1848         sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1849
1850         /*
1851          * Now we can close the relation.
1852          */
1853         relation_close(rel, AccessShareLock);
1854
1855         /*
1856          * And send it
1857          */
1858         PG_RETURN_TEXT_P(cstring_to_text(sql));
1859 }
1860
1861 /*
1862  * dblink_current_query
1863  * return the current query string
1864  * to allow its use in (among other things)
1865  * rewrite rules
1866  */
1867 PG_FUNCTION_INFO_V1(dblink_current_query);
1868 Datum
1869 dblink_current_query(PG_FUNCTION_ARGS)
1870 {
1871         /* This is now just an alias for the built-in function current_query() */
1872         PG_RETURN_DATUM(current_query(fcinfo));
1873 }
1874
1875 /*
1876  * Retrieve async notifications for a connection.
1877  *
1878  * Returns a setof record of notifications, or an empty set if none received.
1879  * Can optionally take a named connection as parameter, but uses the unnamed
1880  * connection per default.
1881  *
1882  */
1883 #define DBLINK_NOTIFY_COLS              3
1884
1885 PG_FUNCTION_INFO_V1(dblink_get_notify);
1886 Datum
1887 dblink_get_notify(PG_FUNCTION_ARGS)
1888 {
1889         PGconn     *conn;
1890         PGnotify   *notify;
1891         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1892         TupleDesc       tupdesc;
1893         Tuplestorestate *tupstore;
1894         MemoryContext per_query_ctx;
1895         MemoryContext oldcontext;
1896
1897         prepTuplestoreResult(fcinfo);
1898
1899         dblink_init();
1900         if (PG_NARGS() == 1)
1901                 conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1902         else
1903                 conn = pconn->conn;
1904
1905         /* create the tuplestore in per-query memory */
1906         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1907         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1908
1909         tupdesc = CreateTemplateTupleDesc(DBLINK_NOTIFY_COLS);
1910         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "notify_name",
1911                                            TEXTOID, -1, 0);
1912         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "be_pid",
1913                                            INT4OID, -1, 0);
1914         TupleDescInitEntry(tupdesc, (AttrNumber) 3, "extra",
1915                                            TEXTOID, -1, 0);
1916
1917         tupstore = tuplestore_begin_heap(true, false, work_mem);
1918         rsinfo->setResult = tupstore;
1919         rsinfo->setDesc = tupdesc;
1920
1921         MemoryContextSwitchTo(oldcontext);
1922
1923         PQconsumeInput(conn);
1924         while ((notify = PQnotifies(conn)) != NULL)
1925         {
1926                 Datum           values[DBLINK_NOTIFY_COLS];
1927                 bool            nulls[DBLINK_NOTIFY_COLS];
1928
1929                 memset(values, 0, sizeof(values));
1930                 memset(nulls, 0, sizeof(nulls));
1931
1932                 if (notify->relname != NULL)
1933                         values[0] = CStringGetTextDatum(notify->relname);
1934                 else
1935                         nulls[0] = true;
1936
1937                 values[1] = Int32GetDatum(notify->be_pid);
1938
1939                 if (notify->extra != NULL)
1940                         values[2] = CStringGetTextDatum(notify->extra);
1941                 else
1942                         nulls[2] = true;
1943
1944                 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1945
1946                 PQfreemem(notify);
1947                 PQconsumeInput(conn);
1948         }
1949
1950         /* clean up and return the tuplestore */
1951         tuplestore_donestoring(tupstore);
1952
1953         return (Datum) 0;
1954 }
1955
1956 /*
1957  * Validate the options given to a dblink foreign server or user mapping.
1958  * Raise an error if any option is invalid.
1959  *
1960  * We just check the names of options here, so semantic errors in options,
1961  * such as invalid numeric format, will be detected at the attempt to connect.
1962  */
1963 PG_FUNCTION_INFO_V1(dblink_fdw_validator);
1964 Datum
1965 dblink_fdw_validator(PG_FUNCTION_ARGS)
1966 {
1967         List       *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
1968         Oid                     context = PG_GETARG_OID(1);
1969         ListCell   *cell;
1970
1971         static const PQconninfoOption *options = NULL;
1972
1973         /*
1974          * Get list of valid libpq options.
1975          *
1976          * To avoid unnecessary work, we get the list once and use it throughout
1977          * the lifetime of this backend process.  We don't need to care about
1978          * memory context issues, because PQconndefaults allocates with malloc.
1979          */
1980         if (!options)
1981         {
1982                 options = PQconndefaults();
1983                 if (!options)                   /* assume reason for failure is OOM */
1984                         ereport(ERROR,
1985                                         (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
1986                                          errmsg("out of memory"),
1987                                          errdetail("Could not get libpq's default connection options.")));
1988         }
1989
1990         /* Validate each supplied option. */
1991         foreach(cell, options_list)
1992         {
1993                 DefElem    *def = (DefElem *) lfirst(cell);
1994
1995                 if (!is_valid_dblink_option(options, def->defname, context))
1996                 {
1997                         /*
1998                          * Unknown option, or invalid option for the context specified, so
1999                          * complain about it.  Provide a hint with list of valid options
2000                          * for the context.
2001                          */
2002                         StringInfoData buf;
2003                         const PQconninfoOption *opt;
2004
2005                         initStringInfo(&buf);
2006                         for (opt = options; opt->keyword; opt++)
2007                         {
2008                                 if (is_valid_dblink_option(options, opt->keyword, context))
2009                                         appendStringInfo(&buf, "%s%s",
2010                                                                          (buf.len > 0) ? ", " : "",
2011                                                                          opt->keyword);
2012                         }
2013                         ereport(ERROR,
2014                                         (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
2015                                          errmsg("invalid option \"%s\"", def->defname),
2016                                          errhint("Valid options in this context are: %s",
2017                                                          buf.data)));
2018                 }
2019         }
2020
2021         PG_RETURN_VOID();
2022 }
2023
2024
2025 /*************************************************************
2026  * internal functions
2027  */
2028
2029
2030 /*
2031  * get_pkey_attnames
2032  *
2033  * Get the primary key attnames for the given relation.
2034  * Return NULL, and set indnkeyatts = 0, if no primary key exists.
2035  */
2036 static char **
2037 get_pkey_attnames(Relation rel, int16 *indnkeyatts)
2038 {
2039         Relation        indexRelation;
2040         ScanKeyData skey;
2041         SysScanDesc scan;
2042         HeapTuple       indexTuple;
2043         int                     i;
2044         char      **result = NULL;
2045         TupleDesc       tupdesc;
2046
2047         /* initialize indnkeyatts to 0 in case no primary key exists */
2048         *indnkeyatts = 0;
2049
2050         tupdesc = rel->rd_att;
2051
2052         /* Prepare to scan pg_index for entries having indrelid = this rel. */
2053         indexRelation = heap_open(IndexRelationId, AccessShareLock);
2054         ScanKeyInit(&skey,
2055                                 Anum_pg_index_indrelid,
2056                                 BTEqualStrategyNumber, F_OIDEQ,
2057                                 ObjectIdGetDatum(RelationGetRelid(rel)));
2058
2059         scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
2060                                                           NULL, 1, &skey);
2061
2062         while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
2063         {
2064                 Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
2065
2066                 /* we're only interested if it is the primary key */
2067                 if (index->indisprimary)
2068                 {
2069                         *indnkeyatts = index->indnkeyatts;
2070                         if (*indnkeyatts > 0)
2071                         {
2072                                 result = (char **) palloc(*indnkeyatts * sizeof(char *));
2073
2074                                 for (i = 0; i < *indnkeyatts; i++)
2075                                         result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
2076                         }
2077                         break;
2078                 }
2079         }
2080
2081         systable_endscan(scan);
2082         heap_close(indexRelation, AccessShareLock);
2083
2084         return result;
2085 }
2086
2087 /*
2088  * Deconstruct a text[] into C-strings (note any NULL elements will be
2089  * returned as NULL pointers)
2090  */
2091 static char **
2092 get_text_array_contents(ArrayType *array, int *numitems)
2093 {
2094         int                     ndim = ARR_NDIM(array);
2095         int                *dims = ARR_DIMS(array);
2096         int                     nitems;
2097         int16           typlen;
2098         bool            typbyval;
2099         char            typalign;
2100         char      **values;
2101         char       *ptr;
2102         bits8      *bitmap;
2103         int                     bitmask;
2104         int                     i;
2105
2106         Assert(ARR_ELEMTYPE(array) == TEXTOID);
2107
2108         *numitems = nitems = ArrayGetNItems(ndim, dims);
2109
2110         get_typlenbyvalalign(ARR_ELEMTYPE(array),
2111                                                  &typlen, &typbyval, &typalign);
2112
2113         values = (char **) palloc(nitems * sizeof(char *));
2114
2115         ptr = ARR_DATA_PTR(array);
2116         bitmap = ARR_NULLBITMAP(array);
2117         bitmask = 1;
2118
2119         for (i = 0; i < nitems; i++)
2120         {
2121                 if (bitmap && (*bitmap & bitmask) == 0)
2122                 {
2123                         values[i] = NULL;
2124                 }
2125                 else
2126                 {
2127                         values[i] = TextDatumGetCString(PointerGetDatum(ptr));
2128                         ptr = att_addlength_pointer(ptr, typlen, ptr);
2129                         ptr = (char *) att_align_nominal(ptr, typalign);
2130                 }
2131
2132                 /* advance bitmap pointer if any */
2133                 if (bitmap)
2134                 {
2135                         bitmask <<= 1;
2136                         if (bitmask == 0x100)
2137                         {
2138                                 bitmap++;
2139                                 bitmask = 1;
2140                         }
2141                 }
2142         }
2143
2144         return values;
2145 }
2146
2147 static char *
2148 get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2149 {
2150         char       *relname;
2151         HeapTuple       tuple;
2152         TupleDesc       tupdesc;
2153         int                     natts;
2154         StringInfoData buf;
2155         char       *val;
2156         int                     key;
2157         int                     i;
2158         bool            needComma;
2159
2160         initStringInfo(&buf);
2161
2162         /* get relation name including any needed schema prefix and quoting */
2163         relname = generate_relation_name(rel);
2164
2165         tupdesc = rel->rd_att;
2166         natts = tupdesc->natts;
2167
2168         tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2169         if (!tuple)
2170                 ereport(ERROR,
2171                                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2172                                  errmsg("source row not found")));
2173
2174         appendStringInfo(&buf, "INSERT INTO %s(", relname);
2175
2176         needComma = false;
2177         for (i = 0; i < natts; i++)
2178         {
2179                 Form_pg_attribute att = TupleDescAttr(tupdesc, i);
2180
2181                 if (att->attisdropped)
2182                         continue;
2183
2184                 if (needComma)
2185                         appendStringInfoChar(&buf, ',');
2186
2187                 appendStringInfoString(&buf,
2188                                                            quote_ident_cstr(NameStr(att->attname)));
2189                 needComma = true;
2190         }
2191
2192         appendStringInfoString(&buf, ") VALUES(");
2193
2194         /*
2195          * Note: i is physical column number (counting from 0).
2196          */
2197         needComma = false;
2198         for (i = 0; i < natts; i++)
2199         {
2200                 if (TupleDescAttr(tupdesc, i)->attisdropped)
2201                         continue;
2202
2203                 if (needComma)
2204                         appendStringInfoChar(&buf, ',');
2205
2206                 key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2207
2208                 if (key >= 0)
2209                         val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2210                 else
2211                         val = SPI_getvalue(tuple, tupdesc, i + 1);
2212
2213                 if (val != NULL)
2214                 {
2215                         appendStringInfoString(&buf, quote_literal_cstr(val));
2216                         pfree(val);
2217                 }
2218                 else
2219                         appendStringInfoString(&buf, "NULL");
2220                 needComma = true;
2221         }
2222         appendStringInfoChar(&buf, ')');
2223
2224         return buf.data;
2225 }
2226
2227 static char *
2228 get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
2229 {
2230         char       *relname;
2231         TupleDesc       tupdesc;
2232         StringInfoData buf;
2233         int                     i;
2234
2235         initStringInfo(&buf);
2236
2237         /* get relation name including any needed schema prefix and quoting */
2238         relname = generate_relation_name(rel);
2239
2240         tupdesc = rel->rd_att;
2241
2242         appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
2243         for (i = 0; i < pknumatts; i++)
2244         {
2245                 int                     pkattnum = pkattnums[i];
2246                 Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2247
2248                 if (i > 0)
2249                         appendStringInfoString(&buf, " AND ");
2250
2251                 appendStringInfoString(&buf,
2252                                                            quote_ident_cstr(NameStr(attr->attname)));
2253
2254                 if (tgt_pkattvals[i] != NULL)
2255                         appendStringInfo(&buf, " = %s",
2256                                                          quote_literal_cstr(tgt_pkattvals[i]));
2257                 else
2258                         appendStringInfoString(&buf, " IS NULL");
2259         }
2260
2261         return buf.data;
2262 }
2263
2264 static char *
2265 get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2266 {
2267         char       *relname;
2268         HeapTuple       tuple;
2269         TupleDesc       tupdesc;
2270         int                     natts;
2271         StringInfoData buf;
2272         char       *val;
2273         int                     key;
2274         int                     i;
2275         bool            needComma;
2276
2277         initStringInfo(&buf);
2278
2279         /* get relation name including any needed schema prefix and quoting */
2280         relname = generate_relation_name(rel);
2281
2282         tupdesc = rel->rd_att;
2283         natts = tupdesc->natts;
2284
2285         tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2286         if (!tuple)
2287                 ereport(ERROR,
2288                                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2289                                  errmsg("source row not found")));
2290
2291         appendStringInfo(&buf, "UPDATE %s SET ", relname);
2292
2293         /*
2294          * Note: i is physical column number (counting from 0).
2295          */
2296         needComma = false;
2297         for (i = 0; i < natts; i++)
2298         {
2299                 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2300
2301                 if (attr->attisdropped)
2302                         continue;
2303
2304                 if (needComma)
2305                         appendStringInfoString(&buf, ", ");
2306
2307                 appendStringInfo(&buf, "%s = ",
2308                                                  quote_ident_cstr(NameStr(attr->attname)));
2309
2310                 key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2311
2312                 if (key >= 0)
2313                         val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2314                 else
2315                         val = SPI_getvalue(tuple, tupdesc, i + 1);
2316
2317                 if (val != NULL)
2318                 {
2319                         appendStringInfoString(&buf, quote_literal_cstr(val));
2320                         pfree(val);
2321                 }
2322                 else
2323                         appendStringInfoString(&buf, "NULL");
2324                 needComma = true;
2325         }
2326
2327         appendStringInfoString(&buf, " WHERE ");
2328
2329         for (i = 0; i < pknumatts; i++)
2330         {
2331                 int                     pkattnum = pkattnums[i];
2332                 Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2333
2334                 if (i > 0)
2335                         appendStringInfoString(&buf, " AND ");
2336
2337                 appendStringInfoString(&buf,
2338                                                            quote_ident_cstr(NameStr(attr->attname)));
2339
2340                 val = tgt_pkattvals[i];
2341
2342                 if (val != NULL)
2343                         appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
2344                 else
2345                         appendStringInfoString(&buf, " IS NULL");
2346         }
2347
2348         return buf.data;
2349 }
2350
2351 /*
2352  * Return a properly quoted identifier.
2353  * Uses quote_ident in quote.c
2354  */
2355 static char *
2356 quote_ident_cstr(char *rawstr)
2357 {
2358         text       *rawstr_text;
2359         text       *result_text;
2360         char       *result;
2361
2362         rawstr_text = cstring_to_text(rawstr);
2363         result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
2364                                                                                                          PointerGetDatum(rawstr_text)));
2365         result = text_to_cstring(result_text);
2366
2367         return result;
2368 }
2369
/*
 * get_attnum_pk_pos
 *
 * Find the position of "key" within the first pknumatts entries of
 * pkattnums.
 *
 * Returns the index of the first matching entry, or -1 if there is none.
 * The list is not expected to be long, so a plain linear scan is used.
 */
static int
get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
{
	int			pos = 0;

	while (pos < pknumatts)
	{
		if (pkattnums[pos] == key)
			return pos;
		pos++;
	}

	return -1;
}
2384
2385 static HeapTuple
2386 get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
2387 {
2388         char       *relname;
2389         TupleDesc       tupdesc;
2390         int                     natts;
2391         StringInfoData buf;
2392         int                     ret;
2393         HeapTuple       tuple;
2394         int                     i;
2395
2396         /*
2397          * Connect to SPI manager
2398          */
2399         if ((ret = SPI_connect()) < 0)
2400                 /* internal error */
2401                 elog(ERROR, "SPI connect failure - returned %d", ret);
2402
2403         initStringInfo(&buf);
2404
2405         /* get relation name including any needed schema prefix and quoting */
2406         relname = generate_relation_name(rel);
2407
2408         tupdesc = rel->rd_att;
2409         natts = tupdesc->natts;
2410
2411         /*
2412          * Build sql statement to look up tuple of interest, ie, the one matching
2413          * src_pkattvals.  We used to use "SELECT *" here, but it's simpler to
2414          * generate a result tuple that matches the table's physical structure,
2415          * with NULLs for any dropped columns.  Otherwise we have to deal with two
2416          * different tupdescs and everything's very confusing.
2417          */
2418         appendStringInfoString(&buf, "SELECT ");
2419
2420         for (i = 0; i < natts; i++)
2421         {
2422                 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2423
2424                 if (i > 0)
2425                         appendStringInfoString(&buf, ", ");
2426
2427                 if (attr->attisdropped)
2428                         appendStringInfoString(&buf, "NULL");
2429                 else
2430                         appendStringInfoString(&buf,
2431                                                                    quote_ident_cstr(NameStr(attr->attname)));
2432         }
2433
2434         appendStringInfo(&buf, " FROM %s WHERE ", relname);
2435
2436         for (i = 0; i < pknumatts; i++)
2437         {
2438                 int                     pkattnum = pkattnums[i];
2439                 Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2440
2441                 if (i > 0)
2442                         appendStringInfoString(&buf, " AND ");
2443
2444                 appendStringInfoString(&buf,
2445                                                            quote_ident_cstr(NameStr(attr->attname)));
2446
2447                 if (src_pkattvals[i] != NULL)
2448                         appendStringInfo(&buf, " = %s",
2449                                                          quote_literal_cstr(src_pkattvals[i]));
2450                 else
2451                         appendStringInfoString(&buf, " IS NULL");
2452         }
2453
2454         /*
2455          * Retrieve the desired tuple
2456          */
2457         ret = SPI_exec(buf.data, 0);
2458         pfree(buf.data);
2459
2460         /*
2461          * Only allow one qualifying tuple
2462          */
2463         if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
2464                 ereport(ERROR,
2465                                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2466                                  errmsg("source criteria matched more than one record")));
2467
2468         else if (ret == SPI_OK_SELECT && SPI_processed == 1)
2469         {
2470                 SPITupleTable *tuptable = SPI_tuptable;
2471
2472                 tuple = SPI_copytuple(tuptable->vals[0]);
2473                 SPI_finish();
2474
2475                 return tuple;
2476         }
2477         else
2478         {
2479                 /*
2480                  * no qualifying tuples
2481                  */
2482                 SPI_finish();
2483
2484                 return NULL;
2485         }
2486
2487         /*
2488          * never reached, but keep compiler quiet
2489          */
2490         return NULL;
2491 }
2492
2493 /*
2494  * Open the relation named by relname_text, acquire specified type of lock,
2495  * verify we have specified permissions.
2496  * Caller must close rel when done with it.
2497  */
2498 static Relation
2499 get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
2500 {
2501         RangeVar   *relvar;
2502         Relation        rel;
2503         AclResult       aclresult;
2504
2505         relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
2506         rel = heap_openrv(relvar, lockmode);
2507
2508         aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
2509                                                                   aclmode);
2510         if (aclresult != ACLCHECK_OK)
2511                 aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
2512                                            RelationGetRelationName(rel));
2513
2514         return rel;
2515 }
2516
2517 /*
2518  * generate_relation_name - copied from ruleutils.c
2519  *              Compute the name to display for a relation
2520  *
2521  * The result includes all necessary quoting and schema-prefixing.
2522  */
2523 static char *
2524 generate_relation_name(Relation rel)
2525 {
2526         char       *nspname;
2527         char       *result;
2528
2529         /* Qualify the name if not visible in search path */
2530         if (RelationIsVisible(RelationGetRelid(rel)))
2531                 nspname = NULL;
2532         else
2533                 nspname = get_namespace_name(rel->rd_rel->relnamespace);
2534
2535         result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
2536
2537         return result;
2538 }
2539
2540
2541 static remoteConn *
2542 getConnectionByName(const char *name)
2543 {
2544         remoteConnHashEnt *hentry;
2545         char       *key;
2546
2547         if (!remoteConnHash)
2548                 remoteConnHash = createConnHash();
2549
2550         key = pstrdup(name);
2551         truncate_identifier(key, strlen(key), false);
2552         hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2553                                                                                            key, HASH_FIND, NULL);
2554
2555         if (hentry)
2556                 return hentry->rconn;
2557
2558         return NULL;
2559 }
2560
2561 static HTAB *
2562 createConnHash(void)
2563 {
2564         HASHCTL         ctl;
2565
2566         ctl.keysize = NAMEDATALEN;
2567         ctl.entrysize = sizeof(remoteConnHashEnt);
2568
2569         return hash_create("Remote Con hash", NUMCONN, &ctl, HASH_ELEM);
2570 }
2571
2572 static void
2573 createNewConnection(const char *name, remoteConn *rconn)
2574 {
2575         remoteConnHashEnt *hentry;
2576         bool            found;
2577         char       *key;
2578
2579         if (!remoteConnHash)
2580                 remoteConnHash = createConnHash();
2581
2582         key = pstrdup(name);
2583         truncate_identifier(key, strlen(key), true);
2584         hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
2585                                                                                            HASH_ENTER, &found);
2586
2587         if (found)
2588         {
2589                 PQfinish(rconn->conn);
2590                 pfree(rconn);
2591
2592                 ereport(ERROR,
2593                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
2594                                  errmsg("duplicate connection name")));
2595         }
2596
2597         hentry->rconn = rconn;
2598         strlcpy(hentry->name, name, sizeof(hentry->name));
2599 }
2600
2601 static void
2602 deleteConnection(const char *name)
2603 {
2604         remoteConnHashEnt *hentry;
2605         bool            found;
2606         char       *key;
2607
2608         if (!remoteConnHash)
2609                 remoteConnHash = createConnHash();
2610
2611         key = pstrdup(name);
2612         truncate_identifier(key, strlen(key), false);
2613         hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2614                                                                                            key, HASH_REMOVE, &found);
2615
2616         if (!hentry)
2617                 ereport(ERROR,
2618                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
2619                                  errmsg("undefined connection name")));
2620
2621 }
2622
2623 static void
2624 dblink_security_check(PGconn *conn, remoteConn *rconn)
2625 {
2626         if (!superuser())
2627         {
2628                 if (!PQconnectionUsedPassword(conn))
2629                 {
2630                         PQfinish(conn);
2631                         if (rconn)
2632                                 pfree(rconn);
2633
2634                         ereport(ERROR,
2635                                         (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2636                                          errmsg("password is required"),
2637                                          errdetail("Non-superuser cannot connect if the server does not request a password."),
2638                                          errhint("Target server's authentication method must be changed.")));
2639                 }
2640         }
2641 }
2642
2643 /*
2644  * For non-superusers, insist that the connstr specify a password.  This
2645  * prevents a password from being picked up from .pgpass, a service file,
2646  * the environment, etc.  We don't want the postgres user's passwords
2647  * to be accessible to non-superusers.
2648  */
2649 static void
2650 dblink_connstr_check(const char *connstr)
2651 {
2652         if (!superuser())
2653         {
2654                 PQconninfoOption *options;
2655                 PQconninfoOption *option;
2656                 bool            connstr_gives_password = false;
2657
2658                 options = PQconninfoParse(connstr, NULL);
2659                 if (options)
2660                 {
2661                         for (option = options; option->keyword != NULL; option++)
2662                         {
2663                                 if (strcmp(option->keyword, "password") == 0)
2664                                 {
2665                                         if (option->val != NULL && option->val[0] != '\0')
2666                                         {
2667                                                 connstr_gives_password = true;
2668                                                 break;
2669                                         }
2670                                 }
2671                         }
2672                         PQconninfoFree(options);
2673                 }
2674
2675                 if (!connstr_gives_password)
2676                         ereport(ERROR,
2677                                         (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2678                                          errmsg("password is required"),
2679                                          errdetail("Non-superusers must provide a password in the connection string.")));
2680         }
2681 }
2682
2683 /*
2684  * Report an error received from the remote server
2685  *
2686  * res: the received error result (will be freed)
2687  * fail: true for ERROR ereport, false for NOTICE
2688  * fmt and following args: sprintf-style format and values for errcontext;
2689  * the resulting string should be worded like "while <some action>"
2690  */
2691 static void
2692 dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
2693                                  bool fail, const char *fmt,...)
2694 {
2695         int                     level;
2696         char       *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
2697         char       *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
2698         char       *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
2699         char       *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
2700         char       *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
2701         int                     sqlstate;
2702         char       *message_primary;
2703         char       *message_detail;
2704         char       *message_hint;
2705         char       *message_context;
2706         va_list         ap;
2707         char            dblink_context_msg[512];
2708
2709         if (fail)
2710                 level = ERROR;
2711         else
2712                 level = NOTICE;
2713
2714         if (pg_diag_sqlstate)
2715                 sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
2716                                                                  pg_diag_sqlstate[1],
2717                                                                  pg_diag_sqlstate[2],
2718                                                                  pg_diag_sqlstate[3],
2719                                                                  pg_diag_sqlstate[4]);
2720         else
2721                 sqlstate = ERRCODE_CONNECTION_FAILURE;
2722
2723         message_primary = xpstrdup(pg_diag_message_primary);
2724         message_detail = xpstrdup(pg_diag_message_detail);
2725         message_hint = xpstrdup(pg_diag_message_hint);
2726         message_context = xpstrdup(pg_diag_context);
2727
2728         /*
2729          * If we don't get a message from the PGresult, try the PGconn.  This is
2730          * needed because for connection-level failures, PQexec may just return
2731          * NULL, not a PGresult at all.
2732          */
2733         if (message_primary == NULL)
2734                 message_primary = pchomp(PQerrorMessage(conn));
2735
2736         /*
2737          * Now that we've copied all the data we need out of the PGresult, it's
2738          * safe to free it.  We must do this to avoid PGresult leakage.  We're
2739          * leaking all the strings too, but those are in palloc'd memory that will
2740          * get cleaned up eventually.
2741          */
2742         if (res)
2743                 PQclear(res);
2744
2745         /*
2746          * Format the basic errcontext string.  Below, we'll add on something
2747          * about the connection name.  That's a violation of the translatability
2748          * guidelines about constructing error messages out of parts, but since
2749          * there's no translation support for dblink, there's no need to worry
2750          * about that (yet).
2751          */
2752         va_start(ap, fmt);
2753         vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
2754         va_end(ap);
2755
2756         ereport(level,
2757                         (errcode(sqlstate),
2758                          message_primary ? errmsg_internal("%s", message_primary) :
2759                          errmsg("could not obtain message string for remote error"),
2760                          message_detail ? errdetail_internal("%s", message_detail) : 0,
2761                          message_hint ? errhint("%s", message_hint) : 0,
2762                          message_context ? (errcontext("%s", message_context)) : 0,
2763                          conname ?
2764                          (errcontext("%s on dblink connection named \"%s\"",
2765                                                  dblink_context_msg, conname)) :
2766                          (errcontext("%s on unnamed dblink connection",
2767                                                  dblink_context_msg))));
2768 }
2769
2770 /*
2771  * Obtain connection string for a foreign server
2772  */
2773 static char *
2774 get_connect_string(const char *servername)
2775 {
2776         ForeignServer *foreign_server = NULL;
2777         UserMapping *user_mapping;
2778         ListCell   *cell;
2779         StringInfoData buf;
2780         ForeignDataWrapper *fdw;
2781         AclResult       aclresult;
2782         char       *srvname;
2783
2784         static const PQconninfoOption *options = NULL;
2785
2786         initStringInfo(&buf);
2787
2788         /*
2789          * Get list of valid libpq options.
2790          *
2791          * To avoid unnecessary work, we get the list once and use it throughout
2792          * the lifetime of this backend process.  We don't need to care about
2793          * memory context issues, because PQconndefaults allocates with malloc.
2794          */
2795         if (!options)
2796         {
2797                 options = PQconndefaults();
2798                 if (!options)                   /* assume reason for failure is OOM */
2799                         ereport(ERROR,
2800                                         (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
2801                                          errmsg("out of memory"),
2802                                          errdetail("Could not get libpq's default connection options.")));
2803         }
2804
2805         /* first gather the server connstr options */
2806         srvname = pstrdup(servername);
2807         truncate_identifier(srvname, strlen(srvname), false);
2808         foreign_server = GetForeignServerByName(srvname, true);
2809
2810         if (foreign_server)
2811         {
2812                 Oid                     serverid = foreign_server->serverid;
2813                 Oid                     fdwid = foreign_server->fdwid;
2814                 Oid                     userid = GetUserId();
2815
2816                 user_mapping = GetUserMapping(userid, serverid);
2817                 fdw = GetForeignDataWrapper(fdwid);
2818
2819                 /* Check permissions, user must have usage on the server. */
2820                 aclresult = pg_foreign_server_aclcheck(serverid, userid, ACL_USAGE);
2821                 if (aclresult != ACLCHECK_OK)
2822                         aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
2823
2824                 foreach(cell, fdw->options)
2825                 {
2826                         DefElem    *def = lfirst(cell);
2827
2828                         if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
2829                                 appendStringInfo(&buf, "%s='%s' ", def->defname,
2830                                                                  escape_param_str(strVal(def->arg)));
2831                 }
2832
2833                 foreach(cell, foreign_server->options)
2834                 {
2835                         DefElem    *def = lfirst(cell);
2836
2837                         if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
2838                                 appendStringInfo(&buf, "%s='%s' ", def->defname,
2839                                                                  escape_param_str(strVal(def->arg)));
2840                 }
2841
2842                 foreach(cell, user_mapping->options)
2843                 {
2844
2845                         DefElem    *def = lfirst(cell);
2846
2847                         if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
2848                                 appendStringInfo(&buf, "%s='%s' ", def->defname,
2849                                                                  escape_param_str(strVal(def->arg)));
2850                 }
2851
2852                 return buf.data;
2853         }
2854         else
2855                 return NULL;
2856 }
2857
2858 /*
2859  * Escaping libpq connect parameter strings.
2860  *
2861  * Replaces "'" with "\'" and "\" with "\\".
2862  */
2863 static char *
2864 escape_param_str(const char *str)
2865 {
2866         const char *cp;
2867         StringInfoData buf;
2868
2869         initStringInfo(&buf);
2870
2871         for (cp = str; *cp; cp++)
2872         {
2873                 if (*cp == '\\' || *cp == '\'')
2874                         appendStringInfoChar(&buf, '\\');
2875                 appendStringInfoChar(&buf, *cp);
2876         }
2877
2878         return buf.data;
2879 }
2880
2881 /*
2882  * Validate the PK-attnums argument for dblink_build_sql_insert() and related
2883  * functions, and translate to the internal representation.
2884  *
2885  * The user supplies an int2vector of 1-based logical attnums, plus a count
2886  * argument (the need for the separate count argument is historical, but we
2887  * still check it).  We check that each attnum corresponds to a valid,
2888  * non-dropped attribute of the rel.  We do *not* prevent attnums from being
2889  * listed twice, though the actual use-case for such things is dubious.
2890  * Note that before Postgres 9.0, the user's attnums were interpreted as
2891  * physical not logical column numbers; this was changed for future-proofing.
2892  *
2893  * The internal representation is a palloc'd int array of 0-based physical
2894  * attnums.
2895  */
2896 static void
2897 validate_pkattnums(Relation rel,
2898                                    int2vector *pkattnums_arg, int32 pknumatts_arg,
2899                                    int **pkattnums, int *pknumatts)
2900 {
2901         TupleDesc       tupdesc = rel->rd_att;
2902         int                     natts = tupdesc->natts;
2903         int                     i;
2904
2905         /* Don't take more array elements than there are */
2906         pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
2907
2908         /* Must have at least one pk attnum selected */
2909         if (pknumatts_arg <= 0)
2910                 ereport(ERROR,
2911                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2912                                  errmsg("number of key attributes must be > 0")));
2913
2914         /* Allocate output array */
2915         *pkattnums = (int *) palloc(pknumatts_arg * sizeof(int));
2916         *pknumatts = pknumatts_arg;
2917
2918         /* Validate attnums and convert to internal form */
2919         for (i = 0; i < pknumatts_arg; i++)
2920         {
2921                 int                     pkattnum = pkattnums_arg->values[i];
2922                 int                     lnum;
2923                 int                     j;
2924
2925                 /* Can throw error immediately if out of range */
2926                 if (pkattnum <= 0 || pkattnum > natts)
2927                         ereport(ERROR,
2928                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2929                                          errmsg("invalid attribute number %d", pkattnum)));
2930
2931                 /* Identify which physical column has this logical number */
2932                 lnum = 0;
2933                 for (j = 0; j < natts; j++)
2934                 {
2935                         /* dropped columns don't count */
2936                         if (TupleDescAttr(tupdesc, j)->attisdropped)
2937                                 continue;
2938
2939                         if (++lnum == pkattnum)
2940                                 break;
2941                 }
2942
2943                 if (j < natts)
2944                         (*pkattnums)[i] = j;
2945                 else
2946                         ereport(ERROR,
2947                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2948                                          errmsg("invalid attribute number %d", pkattnum)));
2949         }
2950 }
2951
2952 /*
2953  * Check if the specified connection option is valid.
2954  *
2955  * We basically allow whatever libpq thinks is an option, with these
2956  * restrictions:
2957  *              debug options: disallowed
2958  *              "client_encoding": disallowed
2959  *              "user": valid only in USER MAPPING options
2960  *              secure options (eg password): valid only in USER MAPPING options
2961  *              others: valid only in FOREIGN SERVER options
2962  *
2963  * We disallow client_encoding because it would be overridden anyway via
2964  * PQclientEncoding; allowing it to be specified would merely promote
2965  * confusion.
2966  */
2967 static bool
2968 is_valid_dblink_option(const PQconninfoOption *options, const char *option,
2969                                            Oid context)
2970 {
2971         const PQconninfoOption *opt;
2972
2973         /* Look up the option in libpq result */
2974         for (opt = options; opt->keyword; opt++)
2975         {
2976                 if (strcmp(opt->keyword, option) == 0)
2977                         break;
2978         }
2979         if (opt->keyword == NULL)
2980                 return false;
2981
2982         /* Disallow debug options (particularly "replication") */
2983         if (strchr(opt->dispchar, 'D'))
2984                 return false;
2985
2986         /* Disallow "client_encoding" */
2987         if (strcmp(opt->keyword, "client_encoding") == 0)
2988                 return false;
2989
2990         /*
2991          * If the option is "user" or marked secure, it should be specified only
2992          * in USER MAPPING.  Others should be specified only in SERVER.
2993          */
2994         if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
2995         {
2996                 if (context != UserMappingRelationId)
2997                         return false;
2998         }
2999         else
3000         {
3001                 if (context != ForeignServerRelationId)
3002                         return false;
3003         }
3004
3005         return true;
3006 }
3007
3008 /*
3009  * Copy the remote session's values of GUCs that affect datatype I/O
3010  * and apply them locally in a new GUC nesting level.  Returns the new
3011  * nestlevel (which is needed by restoreLocalGucs to undo the settings),
3012  * or -1 if no new nestlevel was needed.
3013  *
3014  * We use the equivalent of a function SET option to allow the settings to
3015  * persist only until the caller calls restoreLocalGucs.  If an error is
3016  * thrown in between, guc.c will take care of undoing the settings.
3017  */
3018 static int
3019 applyRemoteGucs(PGconn *conn)
3020 {
3021         static const char *const GUCsAffectingIO[] = {
3022                 "DateStyle",
3023                 "IntervalStyle"
3024         };
3025
3026         int                     nestlevel = -1;
3027         int                     i;
3028
3029         for (i = 0; i < lengthof(GUCsAffectingIO); i++)
3030         {
3031                 const char *gucName = GUCsAffectingIO[i];
3032                 const char *remoteVal = PQparameterStatus(conn, gucName);
3033                 const char *localVal;
3034
3035                 /*
3036                  * If the remote server is pre-8.4, it won't have IntervalStyle, but
3037                  * that's okay because its output format won't be ambiguous.  So just
3038                  * skip the GUC if we don't get a value for it.  (We might eventually
3039                  * need more complicated logic with remote-version checks here.)
3040                  */
3041                 if (remoteVal == NULL)
3042                         continue;
3043
3044                 /*
3045                  * Avoid GUC-setting overhead if the remote and local GUCs already
3046                  * have the same value.
3047                  */
3048                 localVal = GetConfigOption(gucName, false, false);
3049                 Assert(localVal != NULL);
3050
3051                 if (strcmp(remoteVal, localVal) == 0)
3052                         continue;
3053
3054                 /* Create new GUC nest level if we didn't already */
3055                 if (nestlevel < 0)
3056                         nestlevel = NewGUCNestLevel();
3057
3058                 /* Apply the option (this will throw error on failure) */
3059                 (void) set_config_option(gucName, remoteVal,
3060                                                                  PGC_USERSET, PGC_S_SESSION,
3061                                                                  GUC_ACTION_SAVE, true, 0, false);
3062         }
3063
3064         return nestlevel;
3065 }
3066
3067 /*
3068  * Restore local GUCs after they have been overlaid with remote settings.
3069  */
3070 static void
3071 restoreLocalGucs(int nestlevel)
3072 {
3073         /* Do nothing if no new nestlevel was created */
3074         if (nestlevel > 0)
3075                 AtEOXact_GUC(true, nestlevel);
3076 }