]> granicus.if.org Git - postgresql/blob - contrib/dblink/dblink.c
Check to ensure the number of primary key fields supplied does not
[postgresql] / contrib / dblink / dblink.c
1 /*
2  * dblink.c
3  *
4  * Functions returning results from a remote database
5  *
6  * Joe Conway <mail@joeconway.com>
7  * And contributors:
8  * Darko Prenosil <Darko.Prenosil@finteh.hr>
9  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
10  *
11  * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.88 2010/02/03 23:01:11 joe Exp $
12  * Copyright (c) 2001-2010, PostgreSQL Global Development Group
13  * ALL RIGHTS RESERVED;
14  *
15  * Permission to use, copy, modify, and distribute this software and its
16  * documentation for any purpose, without fee, and without a written agreement
17  * is hereby granted, provided that the above copyright notice and this
18  * paragraph and the following two paragraphs appear in all copies.
19  *
20  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24  * POSSIBILITY OF SUCH DAMAGE.
25  *
26  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
29  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31  *
32  */
33 #include "postgres.h"
34
35 #include <limits.h>
36
37 #include "libpq-fe.h"
38 #include "fmgr.h"
39 #include "funcapi.h"
40 #include "access/genam.h"
41 #include "access/heapam.h"
42 #include "access/tupdesc.h"
43 #include "catalog/indexing.h"
44 #include "catalog/namespace.h"
45 #include "catalog/pg_index.h"
46 #include "catalog/pg_type.h"
47 #include "executor/executor.h"
48 #include "executor/spi.h"
49 #include "foreign/foreign.h"
50 #include "lib/stringinfo.h"
51 #include "mb/pg_wchar.h"
52 #include "miscadmin.h"
53 #include "nodes/execnodes.h"
54 #include "nodes/nodes.h"
55 #include "nodes/pg_list.h"
56 #include "parser/parse_type.h"
57 #include "utils/acl.h"
58 #include "utils/array.h"
59 #include "utils/builtins.h"
60 #include "utils/dynahash.h"
61 #include "utils/fmgroids.h"
62 #include "utils/hsearch.h"
63 #include "utils/lsyscache.h"
64 #include "utils/memutils.h"
65 #include "utils/syscache.h"
66 #include "utils/tqual.h"
67
68 #include "dblink.h"
69
70 PG_MODULE_MAGIC;
71
/*
 * Per-connection state for one remote database link.  The same struct is
 * used for the unnamed connection (pconn) and for each named connection.
 */
72 typedef struct remoteConn
73 {
74         PGconn     *conn;                       /* libpq handle for the remote connection; NULL if none is open */
75         int                     openCursorCount;        /* cursors opened while newXactForCursor is set (see dblink_open) */
76         bool            newXactForCursor;               /* true once dblink_open has issued BEGIN on behalf of a cursor */
77 } remoteConn;
78
79 /*
80  * Internal declarations: forward declarations of this file's static helpers
81  */
82 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
83 static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
84 static remoteConn *getConnectionByName(const char *name);
85 static HTAB *createConnHash(void);
86 static void createNewConnection(const char *name, remoteConn *rconn);
87 static void deleteConnection(const char *name);
88 static char **get_pkey_attnames(Oid relid, int16 *numatts);
89 static char **get_text_array_contents(ArrayType *array, int *numitems);
90 static char *get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
91 static char *get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals);
92 static char *get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
93 static char *quote_literal_cstr(char *rawstr);
94 static char *quote_ident_cstr(char *rawstr);
95 static int16 get_attnum_pk_pos(int2vector *pkattnums, int16 pknumatts, int16 key);
96 static HeapTuple get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals);
97 static Oid      get_relid_from_relname(text *relname_text);
98 static char *generate_relation_name(Oid relid);
99 static void dblink_connstr_check(const char *connstr);
100 static void dblink_security_check(PGconn *conn, remoteConn *rconn);
101 static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
102 static char *get_connect_string(const char *servername);
103 static char *escape_param_str(const char *from);
104 static int get_nondropped_natts(Oid relid);
105
106 /* Global state, private to this file */
107 static remoteConn *pconn = NULL;	/* the unnamed connection; allocated lazily by DBLINK_INIT */
108 static HTAB *remoteConnHash = NULL;	/* presumably the name -> remoteConn table for named connections; confirm in createConnHash() */
109
110 /*
111  *      The following entry type backs the list of named remote connections.
112  *      Each dblink function accepts a connection name as its first
113  *      parameter to select one of them. The connection list is much like
114  *      ecpg's, i.e. a mapping between a name and a PGconn object.
115  */
116
117 typedef struct remoteConnHashEnt
118 {
119         char            name[NAMEDATALEN];	/* connection name */
120         remoteConn *rconn;	/* state of the connection registered under that name */
121 } remoteConnHashEnt;
122
123 /* initial number of connection hash entries (presumably the size hint used by createConnHash() -- confirm) */
124 #define NUMCONN 16
125
126 /* general utility */
/*
 * xpfree
 *		pfree() the pointer variable if it is set, then reset it to NULL so
 *		it cannot be freed twice.  The argument must be an lvalue and is
 *		evaluated more than once.
 */
#define xpfree(var_) \
	do { \
		if ((var_) != NULL) \
		{ \
			pfree(var_); \
			(var_) = NULL; \
		} \
	} while (0)
135
/*
 * xpstrdup
 *		NULL-tolerant pstrdup(): var_c receives a copy of var_, or NULL when
 *		var_ itself is NULL.  var_ is evaluated more than once.
 */
#define xpstrdup(var_c, var_) \
	do { \
		if ((var_) == NULL) \
			(var_c) = NULL; \
		else \
			(var_c) = pstrdup(var_); \
	} while (0)
143
/*
 * DBLINK_RES_INTERNALERROR
 *		Report a failed remote command as an ERROR of the form
 *		"<p2>: <libpq error message>".
 *
 * Uses the caller's "conn", "res" and "msg" variables.  The libpq message
 * is copied before the result is cleared, and the result is cleared
 * before elog(ERROR) is raised.
 */
#define DBLINK_RES_INTERNALERROR(p2) \
	do { \
		msg = pstrdup(PQerrorMessage(conn)); \
		if (res != NULL) \
			PQclear(res); \
		elog(ERROR, "%s: %s", p2, msg); \
	} while (0)
151
/*
 * DBLINK_CONN_NOT_AVAIL
 *		Raise ERRCODE_CONNECTION_DOES_NOT_EXIST.  The message names the
 *		connection when the caller's "conname" variable is non-NULL.
 */
#define DBLINK_CONN_NOT_AVAIL \
	do { \
		if (conname == NULL) \
			ereport(ERROR, \
					(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
					 errmsg("connection not available"))); \
		else \
			ereport(ERROR, \
					(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
					 errmsg("connection \"%s\" not available", conname))); \
	} while (0)
163
/*
 * DBLINK_GET_CONN
 *		Resolve SQL argument 0 to a libpq connection, stored in "conn".
 *
 * The argument is first looked up as the name of an existing named
 * connection.  If there is none, it is passed to get_connect_string()
 * and, when that returns NULL, used as the connection string itself; a
 * new connection is then opened and "freeconn" is set to true.
 *
 * Uses the caller's "rconn", "conn", "connstr", "msg" and "freeconn"
 * variables.
 */
#define DBLINK_GET_CONN \
	do { \
		char *conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
		rconn = getConnectionByName(conname_or_str); \
		if (rconn == NULL) \
		{ \
			connstr = get_connect_string(conname_or_str); \
			if (!connstr) \
				connstr = conname_or_str; \
			dblink_connstr_check(connstr); \
			conn = PQconnectdb(connstr); \
			if (PQstatus(conn) == CONNECTION_BAD) \
			{ \
				msg = pstrdup(PQerrorMessage(conn)); \
				PQfinish(conn); \
				ereport(ERROR, \
						(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \
						 errmsg("could not establish connection"), \
						 errdetail("%s", msg))); \
			} \
			dblink_security_check(conn, rconn); \
			PQsetClientEncoding(conn, GetDatabaseEncodingName()); \
			freeconn = true; \
		} \
		else \
			conn = rconn->conn; \
	} while (0)
195
/*
 * DBLINK_GET_NAMED_CONN
 *		Resolve SQL argument 0 strictly as the name of an existing named
 *		connection and store its handle in "conn"; raise
 *		DBLINK_CONN_NOT_AVAIL if no such connection exists.
 *
 * Uses the caller's "rconn" and "conn" variables.
 */
#define DBLINK_GET_NAMED_CONN \
	do { \
		char *conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
		rconn = getConnectionByName(conname); \
		if (rconn == NULL) \
			DBLINK_CONN_NOT_AVAIL; \
		else \
			conn = rconn->conn; \
	} while (0)
205
/*
 * DBLINK_INIT
 *		Allocate the unnamed-connection state (pconn) in TopMemoryContext
 *		on first use, starting with no connection and no cursor
 *		bookkeeping.  Does nothing once pconn exists.
 */
#define DBLINK_INIT \
	do { \
		if (pconn == NULL) \
		{ \
			pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
			pconn->conn = NULL; \
			pconn->openCursorCount = 0; \
			pconn->newXactForCursor = FALSE; \
		} \
	} while (0)
216
217 /*
218  * Create a persistent connection to another database
219  */
220 PG_FUNCTION_INFO_V1(dblink_connect);
221 Datum
222 dblink_connect(PG_FUNCTION_ARGS)
223 {
224         char       *conname_or_str = NULL;
225         char       *connstr = NULL;
226         char       *connname = NULL;
227         char       *msg;
228         PGconn     *conn = NULL;
229         remoteConn *rconn = NULL;
230
231         DBLINK_INIT;
232
233         if (PG_NARGS() == 2)
234         {
235                 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
236                 connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
237         }
238         else if (PG_NARGS() == 1)
239                 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
240
241         if (connname)
242                 rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
243                                                                                                   sizeof(remoteConn));
244
245         /* first check for valid foreign data server */
246         connstr = get_connect_string(conname_or_str);
247         if (connstr == NULL)
248                 connstr = conname_or_str;
249
250         /* check password in connection string if not superuser */
251         dblink_connstr_check(connstr);
252         conn = PQconnectdb(connstr);
253
254         if (PQstatus(conn) == CONNECTION_BAD)
255         {
256                 msg = pstrdup(PQerrorMessage(conn));
257                 PQfinish(conn);
258                 if (rconn)
259                         pfree(rconn);
260
261                 ereport(ERROR,
262                                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
263                                  errmsg("could not establish connection"),
264                                  errdetail("%s", msg)));
265         }
266
267         /* check password actually used if not superuser */
268         dblink_security_check(conn, rconn);
269
270         /* attempt to set client encoding to match server encoding */
271         PQsetClientEncoding(conn, GetDatabaseEncodingName());
272
273         if (connname)
274         {
275                 rconn->conn = conn;
276                 createNewConnection(connname, rconn);
277         }
278         else
279                 pconn->conn = conn;
280
281         PG_RETURN_TEXT_P(cstring_to_text("OK"));
282 }
283
284 /*
285  * Clear a persistent connection to another database
286  */
287 PG_FUNCTION_INFO_V1(dblink_disconnect);
288 Datum
289 dblink_disconnect(PG_FUNCTION_ARGS)
290 {
291         char       *conname = NULL;
292         remoteConn *rconn = NULL;
293         PGconn     *conn = NULL;
294
295         DBLINK_INIT;
296
297         if (PG_NARGS() == 1)
298         {
299                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
300                 rconn = getConnectionByName(conname);
301                 if (rconn)
302                         conn = rconn->conn;
303         }
304         else
305                 conn = pconn->conn;
306
307         if (!conn)
308                 DBLINK_CONN_NOT_AVAIL;
309
310         PQfinish(conn);
311         if (rconn)
312         {
313                 deleteConnection(conname);
314                 pfree(rconn);
315         }
316         else
317                 pconn->conn = NULL;
318
319         PG_RETURN_TEXT_P(cstring_to_text("OK"));
320 }
321
322 /*
323  * opens a cursor using a persistent connection
324  */
325 PG_FUNCTION_INFO_V1(dblink_open);
326 Datum
327 dblink_open(PG_FUNCTION_ARGS)
328 {
329         char       *msg;
330         PGresult   *res = NULL;
331         PGconn     *conn = NULL;
332         char       *curname = NULL;
333         char       *sql = NULL;
334         char       *conname = NULL;
335         StringInfoData buf;
336         remoteConn *rconn = NULL;
337         bool            fail = true;    /* default to backward compatible behavior */
338
339         DBLINK_INIT;
340         initStringInfo(&buf);
341
342         if (PG_NARGS() == 2)
343         {
344                 /* text,text */
345                 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
346                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
347                 rconn = pconn;
348         }
349         else if (PG_NARGS() == 3)
350         {
351                 /* might be text,text,text or text,text,bool */
352                 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
353                 {
354                         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
355                         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
356                         fail = PG_GETARG_BOOL(2);
357                         rconn = pconn;
358                 }
359                 else
360                 {
361                         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
362                         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
363                         sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
364                         rconn = getConnectionByName(conname);
365                 }
366         }
367         else if (PG_NARGS() == 4)
368         {
369                 /* text,text,text,bool */
370                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
371                 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
372                 sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
373                 fail = PG_GETARG_BOOL(3);
374                 rconn = getConnectionByName(conname);
375         }
376
377         if (!rconn || !rconn->conn)
378                 DBLINK_CONN_NOT_AVAIL;
379         else
380                 conn = rconn->conn;
381
382         /* If we are not in a transaction, start one */
383         if (PQtransactionStatus(conn) == PQTRANS_IDLE)
384         {
385                 res = PQexec(conn, "BEGIN");
386                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
387                         DBLINK_RES_INTERNALERROR("begin error");
388                 PQclear(res);
389                 rconn->newXactForCursor = TRUE;
390
391                 /*
392                  * Since transaction state was IDLE, we force cursor count to
393                  * initially be 0. This is needed as a previous ABORT might have wiped
394                  * out our transaction without maintaining the cursor count for us.
395                  */
396                 rconn->openCursorCount = 0;
397         }
398
399         /* if we started a transaction, increment cursor count */
400         if (rconn->newXactForCursor)
401                 (rconn->openCursorCount)++;
402
403         appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
404         res = PQexec(conn, buf.data);
405         if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
406         {
407                 dblink_res_error(conname, res, "could not open cursor", fail);
408                 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
409         }
410
411         PQclear(res);
412         PG_RETURN_TEXT_P(cstring_to_text("OK"));
413 }
414
415 /*
416  * closes a cursor
417  */
418 PG_FUNCTION_INFO_V1(dblink_close);
419 Datum
420 dblink_close(PG_FUNCTION_ARGS)
421 {
422         PGconn     *conn = NULL;
423         PGresult   *res = NULL;
424         char       *curname = NULL;
425         char       *conname = NULL;
426         StringInfoData buf;
427         char       *msg;
428         remoteConn *rconn = NULL;
429         bool            fail = true;    /* default to backward compatible behavior */
430
431         DBLINK_INIT;
432         initStringInfo(&buf);
433
434         if (PG_NARGS() == 1)
435         {
436                 /* text */
437                 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
438                 rconn = pconn;
439         }
440         else if (PG_NARGS() == 2)
441         {
442                 /* might be text,text or text,bool */
443                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
444                 {
445                         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
446                         fail = PG_GETARG_BOOL(1);
447                         rconn = pconn;
448                 }
449                 else
450                 {
451                         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
452                         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
453                         rconn = getConnectionByName(conname);
454                 }
455         }
456         if (PG_NARGS() == 3)
457         {
458                 /* text,text,bool */
459                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
460                 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
461                 fail = PG_GETARG_BOOL(2);
462                 rconn = getConnectionByName(conname);
463         }
464
465         if (!rconn || !rconn->conn)
466                 DBLINK_CONN_NOT_AVAIL;
467         else
468                 conn = rconn->conn;
469
470         appendStringInfo(&buf, "CLOSE %s", curname);
471
472         /* close the cursor */
473         res = PQexec(conn, buf.data);
474         if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
475         {
476                 dblink_res_error(conname, res, "could not close cursor", fail);
477                 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
478         }
479
480         PQclear(res);
481
482         /* if we started a transaction, decrement cursor count */
483         if (rconn->newXactForCursor)
484         {
485                 (rconn->openCursorCount)--;
486
487                 /* if count is zero, commit the transaction */
488                 if (rconn->openCursorCount == 0)
489                 {
490                         rconn->newXactForCursor = FALSE;
491
492                         res = PQexec(conn, "COMMIT");
493                         if (PQresultStatus(res) != PGRES_COMMAND_OK)
494                                 DBLINK_RES_INTERNALERROR("commit error");
495                         PQclear(res);
496                 }
497         }
498
499         PG_RETURN_TEXT_P(cstring_to_text("OK"));
500 }
501
502 /*
503  * Fetch results from an open cursor
504  */
505 PG_FUNCTION_INFO_V1(dblink_fetch);
506 Datum
507 dblink_fetch(PG_FUNCTION_ARGS)
508 {
509         ReturnSetInfo  *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
510         PGresult           *res = NULL;
511         char               *conname = NULL;
512         remoteConn         *rconn = NULL;
513         PGconn             *conn = NULL;
514         StringInfoData  buf;
515         char               *curname = NULL;
516         int                             howmany = 0;
517         bool                    fail = true;    /* default to backward compatible */
518
519         DBLINK_INIT;
520
521         if (PG_NARGS() == 4)
522         {
523                 /* text,text,int,bool */
524                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
525                 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
526                 howmany = PG_GETARG_INT32(2);
527                 fail = PG_GETARG_BOOL(3);
528
529                 rconn = getConnectionByName(conname);
530                 if (rconn)
531                         conn = rconn->conn;
532         }
533         else if (PG_NARGS() == 3)
534         {
535                 /* text,text,int or text,int,bool */
536                 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
537                 {
538                         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
539                         howmany = PG_GETARG_INT32(1);
540                         fail = PG_GETARG_BOOL(2);
541                         conn = pconn->conn;
542                 }
543                 else
544                 {
545                         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
546                         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
547                         howmany = PG_GETARG_INT32(2);
548
549                         rconn = getConnectionByName(conname);
550                         if (rconn)
551                                 conn = rconn->conn;
552                 }
553         }
554         else if (PG_NARGS() == 2)
555         {
556                 /* text,int */
557                 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
558                 howmany = PG_GETARG_INT32(1);
559                 conn = pconn->conn;
560         }
561
562         if (!conn)
563                 DBLINK_CONN_NOT_AVAIL;
564
565         /* let the caller know we're sending back a tuplestore */
566         rsinfo->returnMode = SFRM_Materialize;
567         rsinfo->setResult = NULL;
568         rsinfo->setDesc = NULL;
569
570         initStringInfo(&buf);
571         appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
572
573         /*
574          * Try to execute the query.  Note that since libpq uses malloc, the
575          * PGresult will be long-lived even though we are still in a
576          * short-lived memory context.
577          */
578         res = PQexec(conn, buf.data);
579         if (!res ||
580                 (PQresultStatus(res) != PGRES_COMMAND_OK &&
581                  PQresultStatus(res) != PGRES_TUPLES_OK))
582         {
583                 dblink_res_error(conname, res, "could not fetch from cursor", fail);
584                 return (Datum) 0;
585         }
586         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
587         {
588                 /* cursor does not exist - closed already or bad name */
589                 PQclear(res);
590                 ereport(ERROR,
591                                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
592                                  errmsg("cursor \"%s\" does not exist", curname)));
593         }
594
595         materializeResult(fcinfo, res);
596         return (Datum) 0;
597 }
598
599 /*
600  * Note: this is the new preferred version of dblink
 *
 * SQL-callable entry point for synchronous queries: it simply forwards to
 * dblink_record_internal() with is_async = false, which executes the
 * query and materializes its result.
601  */
602 PG_FUNCTION_INFO_V1(dblink_record);
603 Datum
604 dblink_record(PG_FUNCTION_ARGS)
605 {
606         return dblink_record_internal(fcinfo, false);
607 }
608
609 PG_FUNCTION_INFO_V1(dblink_send_query);
610 Datum
611 dblink_send_query(PG_FUNCTION_ARGS)
612 {
613         PGconn     *conn = NULL;
614         char       *connstr = NULL;
615         char       *sql = NULL;
616         remoteConn *rconn = NULL;
617         char       *msg;
618         bool            freeconn = false;
619         int                     retval;
620
621         if (PG_NARGS() == 2)
622         {
623                 DBLINK_GET_CONN;
624                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
625         }
626         else
627                 /* shouldn't happen */
628                 elog(ERROR, "wrong number of arguments");
629
630         /* async query send */
631         retval = PQsendQuery(conn, sql);
632         if (retval != 1)
633                 elog(NOTICE, "%s", PQerrorMessage(conn));
634
635         PG_RETURN_INT32(retval);
636 }
637
/*
 * SQL-callable entry point for retrieving an asynchronous result: it
 * simply forwards to dblink_record_internal() with is_async = true, which
 * reads the next result with PQgetResult() instead of executing a query.
 */
638 PG_FUNCTION_INFO_V1(dblink_get_result);
639 Datum
640 dblink_get_result(PG_FUNCTION_ARGS)
641 {
642         return dblink_record_internal(fcinfo, true);
643 }
644
645 static Datum
646 dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
647 {
648         ReturnSetInfo  *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
649         char               *msg;
650         PGresult           *res = NULL;
651         PGconn             *conn = NULL;
652         char               *connstr = NULL;
653         char               *sql = NULL;
654         char               *conname = NULL;
655         remoteConn         *rconn = NULL;
656         bool                    fail = true;    /* default to backward compatible */
657         bool                    freeconn = false;
658
659         /* check to see if caller supports us returning a tuplestore */
660         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
661                 ereport(ERROR,
662                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
663                                  errmsg("set-valued function called in context that cannot accept a set")));
664         if (!(rsinfo->allowedModes & SFRM_Materialize))
665                 ereport(ERROR,
666                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
667                                  errmsg("materialize mode required, but it is not " \
668                                                 "allowed in this context")));
669
670         DBLINK_INIT;
671
672         if (!is_async)
673         {
674                 if (PG_NARGS() == 3)
675                 {
676                         /* text,text,bool */
677                         DBLINK_GET_CONN;
678                         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
679                         fail = PG_GETARG_BOOL(2);
680                 }
681                 else if (PG_NARGS() == 2)
682                 {
683                         /* text,text or text,bool */
684                         if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
685                         {
686                                 conn = pconn->conn;
687                                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
688                                 fail = PG_GETARG_BOOL(1);
689                         }
690                         else
691                         {
692                                 DBLINK_GET_CONN;
693                                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
694                         }
695                 }
696                 else if (PG_NARGS() == 1)
697                 {
698                         /* text */
699                         conn = pconn->conn;
700                         sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
701                 }
702                 else
703                         /* shouldn't happen */
704                         elog(ERROR, "wrong number of arguments");
705         }
706         else    /* is_async */
707         {
708                 /* get async result */
709                 if (PG_NARGS() == 2)
710                 {
711                         /* text,bool */
712                         DBLINK_GET_CONN;
713                         fail = PG_GETARG_BOOL(1);
714                 }
715                 else if (PG_NARGS() == 1)
716                 {
717                         /* text */
718                         DBLINK_GET_CONN;
719                 }
720                 else
721                         /* shouldn't happen */
722                         elog(ERROR, "wrong number of arguments");
723         }
724
725         if (!conn)
726                 DBLINK_CONN_NOT_AVAIL;
727
728         /* let the caller know we're sending back a tuplestore */
729         rsinfo->returnMode = SFRM_Materialize;
730         rsinfo->setResult = NULL;
731         rsinfo->setDesc = NULL;
732
733         /* synchronous query, or async result retrieval */
734         if (!is_async)
735                 res = PQexec(conn, sql);
736         else
737         {
738                 res = PQgetResult(conn);
739                 /* NULL means we're all done with the async results */
740                 if (!res)
741                         return (Datum) 0;
742         }
743
744         /* if needed, close the connection to the database and cleanup */
745         if (freeconn)
746                 PQfinish(conn);
747
748         if (!res ||
749                 (PQresultStatus(res) != PGRES_COMMAND_OK &&
750                  PQresultStatus(res) != PGRES_TUPLES_OK))
751         {
752                 dblink_res_error(conname, res, "could not execute query", fail);
753                 return (Datum) 0;
754         }
755
756         materializeResult(fcinfo, res);
757         return (Datum) 0;
758 }
759
760 /*
761  * Materialize the PGresult to return them as the function result.
762  * The res will be released in this function.
763  */
764 static void
765 materializeResult(FunctionCallInfo fcinfo, PGresult *res)
766 {
767         ReturnSetInfo  *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
768
769         Assert(rsinfo->returnMode == SFRM_Materialize);
770
771         PG_TRY();
772         {
773                 TupleDesc       tupdesc;
774                 bool            is_sql_cmd = false;
775                 int                     ntuples;
776                 int                     nfields;
777
778                 if (PQresultStatus(res) == PGRES_COMMAND_OK)
779                 {
780                         is_sql_cmd = true;
781
782                         /*
783                          * need a tuple descriptor representing one TEXT column to
784                          * return the command status string as our result tuple
785                          */
786                         tupdesc = CreateTemplateTupleDesc(1, false);
787                         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
788                                                            TEXTOID, -1, 0);
789                         ntuples = 1;
790                         nfields = 1;
791                 }
792                 else
793                 {
794                         Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
795
796                         is_sql_cmd = false;
797
798                         /* get a tuple descriptor for our result type */
799                         switch (get_call_result_type(fcinfo, NULL, &tupdesc))
800                         {
801                                 case TYPEFUNC_COMPOSITE:
802                                         /* success */
803                                         break;
804                                 case TYPEFUNC_RECORD:
805                                         /* failed to determine actual type of RECORD */
806                                         ereport(ERROR,
807                                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
808                                                 errmsg("function returning record called in context "
809                                                            "that cannot accept type record")));
810                                         break;
811                                 default:
812                                         /* result type isn't composite */
813                                         elog(ERROR, "return type must be a row type");
814                                         break;
815                         }
816
817                         /* make sure we have a persistent copy of the tupdesc */
818                         tupdesc = CreateTupleDescCopy(tupdesc);
819                         ntuples = PQntuples(res);
820                         nfields = PQnfields(res);
821                 }
822
823                 /*
824                  * check result and tuple descriptor have the same number of columns
825                  */
826                 if (nfields != tupdesc->natts)
827                         ereport(ERROR,
828                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
829                                          errmsg("remote query result rowtype does not match "
830                                                         "the specified FROM clause rowtype")));
831
832                 if (ntuples > 0)
833                 {
834                         AttInMetadata      *attinmeta;
835                         Tuplestorestate    *tupstore;
836                         MemoryContext           oldcontext;
837                         int                                     row;
838                         char                      **values;
839
840                         attinmeta = TupleDescGetAttInMetadata(tupdesc);
841
842                         oldcontext = MemoryContextSwitchTo(
843                                                                 rsinfo->econtext->ecxt_per_query_memory);
844                         tupstore = tuplestore_begin_heap(true, false, work_mem);
845                         rsinfo->setResult = tupstore;
846                         rsinfo->setDesc = tupdesc;
847                         MemoryContextSwitchTo(oldcontext);
848
849                         values = (char **) palloc(nfields * sizeof(char *));
850
851                         /* put all tuples into the tuplestore */
852                         for (row = 0; row < ntuples; row++)
853                         {
854                                 HeapTuple       tuple;
855
856                                 if (!is_sql_cmd)
857                                 {
858                                         int                     i;
859
860                                         for (i = 0; i < nfields; i++)
861                                         {
862                                                 if (PQgetisnull(res, row, i))
863                                                         values[i] = NULL;
864                                                 else
865                                                         values[i] = PQgetvalue(res, row, i);
866                                         }
867                                 }
868                                 else
869                                 {
870                                         values[0] = PQcmdStatus(res);
871                                 }
872
873                                 /* build the tuple and put it into the tuplestore. */
874                                 tuple = BuildTupleFromCStrings(attinmeta, values);
875                                 tuplestore_puttuple(tupstore, tuple);
876                         }
877
878                         /* clean up and return the tuplestore */
879                         tuplestore_donestoring(tupstore);
880                 }
881
882                 PQclear(res);
883         }
884         PG_CATCH();
885         {
886                 /* be sure to release the libpq result */
887                 PQclear(res);
888                 PG_RE_THROW();
889         }
890         PG_END_TRY();
891 }
892
893 /*
894  * List all open dblink connections by name.
895  * Returns an array of all connection names.
896  * Takes no params
897  */
898 PG_FUNCTION_INFO_V1(dblink_get_connections);
899 Datum
900 dblink_get_connections(PG_FUNCTION_ARGS)
901 {
902         HASH_SEQ_STATUS status;
903         remoteConnHashEnt *hentry;
904         ArrayBuildState *astate = NULL;
905
906         if (remoteConnHash)
907         {
908                 hash_seq_init(&status, remoteConnHash);
909                 while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
910                 {
911                         /* stash away current value */
912                         astate = accumArrayResult(astate,
913                                                                           CStringGetTextDatum(hentry->name),
914                                                                           false, TEXTOID, CurrentMemoryContext);
915                 }
916         }
917
918         if (astate)
919                 PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
920                                                                                           CurrentMemoryContext));
921         else
922                 PG_RETURN_NULL();
923 }
924
925 /*
926  * Checks if a given remote connection is busy
927  *
928  * Returns 1 if the connection is busy, 0 otherwise
929  * Params:
930  *      text connection_name - name of the connection to check
931  *
932  */
933 PG_FUNCTION_INFO_V1(dblink_is_busy);
934 Datum
935 dblink_is_busy(PG_FUNCTION_ARGS)
936 {
937         PGconn     *conn = NULL;
938         remoteConn *rconn = NULL;
939
940         DBLINK_INIT;
941         DBLINK_GET_NAMED_CONN;
942
943         PQconsumeInput(conn);
944         PG_RETURN_INT32(PQisBusy(conn));
945 }
946
947 /*
948  * Cancels a running request on a connection
949  *
950  * Returns text:
951  *      "OK" if the cancel request has been sent correctly,
952  *              an error message otherwise
953  *
954  * Params:
955  *      text connection_name - name of the connection to check
956  *
957  */
958 PG_FUNCTION_INFO_V1(dblink_cancel_query);
959 Datum
960 dblink_cancel_query(PG_FUNCTION_ARGS)
961 {
962         int                     res = 0;
963         PGconn     *conn = NULL;
964         remoteConn *rconn = NULL;
965         PGcancel   *cancel;
966         char            errbuf[256];
967
968         DBLINK_INIT;
969         DBLINK_GET_NAMED_CONN;
970         cancel = PQgetCancel(conn);
971
972         res = PQcancel(cancel, errbuf, 256);
973         PQfreeCancel(cancel);
974
975         if (res == 1)
976                 PG_RETURN_TEXT_P(cstring_to_text("OK"));
977         else
978                 PG_RETURN_TEXT_P(cstring_to_text(errbuf));
979 }
980
981
982 /*
983  * Get error message from a connection
984  *
985  * Returns text:
986  *      "OK" if no error, an error message otherwise
987  *
988  * Params:
989  *      text connection_name - name of the connection to check
990  *
991  */
992 PG_FUNCTION_INFO_V1(dblink_error_message);
993 Datum
994 dblink_error_message(PG_FUNCTION_ARGS)
995 {
996         char       *msg;
997         PGconn     *conn = NULL;
998         remoteConn *rconn = NULL;
999
1000         DBLINK_INIT;
1001         DBLINK_GET_NAMED_CONN;
1002
1003         msg = PQerrorMessage(conn);
1004         if (msg == NULL || msg[0] == '\0')
1005                 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1006         else
1007                 PG_RETURN_TEXT_P(cstring_to_text(msg));
1008 }
1009
1010 /*
1011  * Execute an SQL non-SELECT command
1012  */
1013 PG_FUNCTION_INFO_V1(dblink_exec);
1014 Datum
1015 dblink_exec(PG_FUNCTION_ARGS)
1016 {
1017         char       *msg;
1018         PGresult   *res = NULL;
1019         text       *sql_cmd_status = NULL;
1020         TupleDesc       tupdesc = NULL;
1021         PGconn     *conn = NULL;
1022         char       *connstr = NULL;
1023         char       *sql = NULL;
1024         char       *conname = NULL;
1025         remoteConn *rconn = NULL;
1026         bool            freeconn = false;
1027         bool            fail = true;    /* default to backward compatible behavior */
1028
1029         DBLINK_INIT;
1030
1031         if (PG_NARGS() == 3)
1032         {
1033                 /* must be text,text,bool */
1034                 DBLINK_GET_CONN;
1035                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1036                 fail = PG_GETARG_BOOL(2);
1037         }
1038         else if (PG_NARGS() == 2)
1039         {
1040                 /* might be text,text or text,bool */
1041                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
1042                 {
1043                         conn = pconn->conn;
1044                         sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1045                         fail = PG_GETARG_BOOL(1);
1046                 }
1047                 else
1048                 {
1049                         DBLINK_GET_CONN;
1050                         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1051                 }
1052         }
1053         else if (PG_NARGS() == 1)
1054         {
1055                 /* must be single text argument */
1056                 conn = pconn->conn;
1057                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1058         }
1059         else
1060                 /* shouldn't happen */
1061                 elog(ERROR, "wrong number of arguments");
1062
1063         if (!conn)
1064                 DBLINK_CONN_NOT_AVAIL;
1065
1066         res = PQexec(conn, sql);
1067         if (!res ||
1068                 (PQresultStatus(res) != PGRES_COMMAND_OK &&
1069                  PQresultStatus(res) != PGRES_TUPLES_OK))
1070         {
1071                 dblink_res_error(conname, res, "could not execute command", fail);
1072
1073                 /* need a tuple descriptor representing one TEXT column */
1074                 tupdesc = CreateTemplateTupleDesc(1, false);
1075                 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1076                                                    TEXTOID, -1, 0);
1077
1078                 /*
1079                  * and save a copy of the command status string to return as our
1080                  * result tuple
1081                  */
1082                 sql_cmd_status = cstring_to_text("ERROR");
1083         }
1084         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1085         {
1086                 /* need a tuple descriptor representing one TEXT column */
1087                 tupdesc = CreateTemplateTupleDesc(1, false);
1088                 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1089                                                    TEXTOID, -1, 0);
1090
1091                 /*
1092                  * and save a copy of the command status string to return as our
1093                  * result tuple
1094                  */
1095                 sql_cmd_status = cstring_to_text(PQcmdStatus(res));
1096                 PQclear(res);
1097         }
1098         else
1099         {
1100                 PQclear(res);
1101                 ereport(ERROR,
1102                                 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
1103                                  errmsg("statement returning results not allowed")));
1104         }
1105
1106         /* if needed, close the connection to the database and cleanup */
1107         if (freeconn)
1108                 PQfinish(conn);
1109
1110         PG_RETURN_TEXT_P(sql_cmd_status);
1111 }
1112
1113
1114 /*
1115  * dblink_get_pkey
1116  *
1117  * Return list of primary key fields for the supplied relation,
1118  * or NULL if none exists.
1119  */
1120 PG_FUNCTION_INFO_V1(dblink_get_pkey);
1121 Datum
1122 dblink_get_pkey(PG_FUNCTION_ARGS)
1123 {
1124         int16           numatts;
1125         Oid                     relid;
1126         char      **results;
1127         FuncCallContext *funcctx;
1128         int32           call_cntr;
1129         int32           max_calls;
1130         AttInMetadata *attinmeta;
1131         MemoryContext oldcontext;
1132
1133         /* stuff done only on the first call of the function */
1134         if (SRF_IS_FIRSTCALL())
1135         {
1136                 TupleDesc       tupdesc = NULL;
1137
1138                 /* create a function context for cross-call persistence */
1139                 funcctx = SRF_FIRSTCALL_INIT();
1140
1141                 /*
1142                  * switch to memory context appropriate for multiple function calls
1143                  */
1144                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1145
1146                 /* convert relname to rel Oid */
1147                 relid = get_relid_from_relname(PG_GETARG_TEXT_P(0));
1148                 if (!OidIsValid(relid))
1149                         ereport(ERROR,
1150                                         (errcode(ERRCODE_UNDEFINED_TABLE),
1151                                          errmsg("relation \"%s\" does not exist",
1152                                                         text_to_cstring(PG_GETARG_TEXT_PP(0)))));
1153
1154                 /*
1155                  * need a tuple descriptor representing one INT and one TEXT column
1156                  */
1157                 tupdesc = CreateTemplateTupleDesc(2, false);
1158                 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
1159                                                    INT4OID, -1, 0);
1160                 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
1161                                                    TEXTOID, -1, 0);
1162
1163                 /*
1164                  * Generate attribute metadata needed later to produce tuples from raw
1165                  * C strings
1166                  */
1167                 attinmeta = TupleDescGetAttInMetadata(tupdesc);
1168                 funcctx->attinmeta = attinmeta;
1169
1170                 /* get an array of attnums */
1171                 results = get_pkey_attnames(relid, &numatts);
1172
1173                 if ((results != NULL) && (numatts > 0))
1174                 {
1175                         funcctx->max_calls = numatts;
1176
1177                         /* got results, keep track of them */
1178                         funcctx->user_fctx = results;
1179                 }
1180                 else
1181                 {
1182                         /* fast track when no results */
1183                         MemoryContextSwitchTo(oldcontext);
1184                         SRF_RETURN_DONE(funcctx);
1185                 }
1186
1187                 MemoryContextSwitchTo(oldcontext);
1188         }
1189
1190         /* stuff done on every call of the function */
1191         funcctx = SRF_PERCALL_SETUP();
1192
1193         /*
1194          * initialize per-call variables
1195          */
1196         call_cntr = funcctx->call_cntr;
1197         max_calls = funcctx->max_calls;
1198
1199         results = (char **) funcctx->user_fctx;
1200         attinmeta = funcctx->attinmeta;
1201
1202         if (call_cntr < max_calls)      /* do when there is more left to send */
1203         {
1204                 char      **values;
1205                 HeapTuple       tuple;
1206                 Datum           result;
1207
1208                 values = (char **) palloc(2 * sizeof(char *));
1209                 values[0] = (char *) palloc(12);                /* sign, 10 digits, '\0' */
1210
1211                 sprintf(values[0], "%d", call_cntr + 1);
1212
1213                 values[1] = results[call_cntr];
1214
1215                 /* build the tuple */
1216                 tuple = BuildTupleFromCStrings(attinmeta, values);
1217
1218                 /* make the tuple into a datum */
1219                 result = HeapTupleGetDatum(tuple);
1220
1221                 SRF_RETURN_NEXT(funcctx, result);
1222         }
1223         else
1224         {
1225                 /* do when there is no more left */
1226                 SRF_RETURN_DONE(funcctx);
1227         }
1228 }
1229
1230
1231 /*
1232  * dblink_build_sql_insert
1233  *
1234  * Used to generate an SQL insert statement
1235  * based on an existing tuple in a local relation.
1236  * This is useful for selectively replicating data
1237  * to another server via dblink.
1238  *
1239  * API:
1240  * <relname> - name of local table of interest
1241  * <pkattnums> - an int2vector of attnums which will be used
1242  * to identify the local tuple of interest
1243  * <pknumatts> - number of attnums in pkattnums
1244  * <src_pkattvals_arry> - text array of key values which will be used
1245  * to identify the local tuple of interest
1246  * <tgt_pkattvals_arry> - text array of key values which will be used
1247  * to build the string for execution remotely. These are substituted
1248  * for their counterparts in src_pkattvals_arry
1249  */
1250 PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
1251 Datum
1252 dblink_build_sql_insert(PG_FUNCTION_ARGS)
1253 {
1254         text       *relname_text = PG_GETARG_TEXT_P(0);
1255         int2vector *pkattnums = (int2vector *) PG_GETARG_POINTER(1);
1256         int32           pknumatts_tmp = PG_GETARG_INT32(2);
1257         ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1258         ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1259         Oid                     relid;
1260         int16           pknumatts = 0;
1261         char      **src_pkattvals;
1262         char      **tgt_pkattvals;
1263         int                     src_nitems;
1264         int                     tgt_nitems;
1265         char       *sql;
1266         int                     nondropped_natts;
1267
1268         /*
1269          * Convert relname to rel OID.
1270          */
1271         relid = get_relid_from_relname(relname_text);
1272         if (!OidIsValid(relid))
1273                 ereport(ERROR,
1274                                 (errcode(ERRCODE_UNDEFINED_TABLE),
1275                                  errmsg("relation \"%s\" does not exist",
1276                                                 text_to_cstring(relname_text))));
1277
1278         /*
1279          * There should be at least one key attribute
1280          */
1281         if (pknumatts_tmp <= 0)
1282                 ereport(ERROR,
1283                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1284                                  errmsg("number of key attributes must be > 0")));
1285
1286         if (pknumatts_tmp <= SHRT_MAX)
1287                 pknumatts = pknumatts_tmp;
1288         else
1289                 ereport(ERROR,
1290                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1291                                  errmsg("input for number of primary key " \
1292                                                 "attributes too large")));
1293
1294         /*
1295          * ensure we don't ask for more pk attributes than we have
1296          * non-dropped columns
1297          */
1298         nondropped_natts = get_nondropped_natts(relid);
1299         if (pknumatts > nondropped_natts)
1300                 ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
1301                                 errmsg("number of primary key fields exceeds number of specified relation attributes")));
1302
1303         /*
1304          * Source array is made up of key values that will be used to locate the
1305          * tuple of interest from the local system.
1306          */
1307         src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1308
1309         /*
1310          * There should be one source array key value for each key attnum
1311          */
1312         if (src_nitems != pknumatts)
1313                 ereport(ERROR,
1314                                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1315                                  errmsg("source key array length must match number of key " \
1316                                                 "attributes")));
1317
1318         /*
1319          * Target array is made up of key values that will be used to build the
1320          * SQL string for use on the remote system.
1321          */
1322         tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1323
1324         /*
1325          * There should be one target array key value for each key attnum
1326          */
1327         if (tgt_nitems != pknumatts)
1328                 ereport(ERROR,
1329                                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1330                                  errmsg("target key array length must match number of key " \
1331                                                 "attributes")));
1332
1333         /*
1334          * Prep work is finally done. Go get the SQL string.
1335          */
1336         sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1337
1338         /*
1339          * And send it
1340          */
1341         PG_RETURN_TEXT_P(cstring_to_text(sql));
1342 }
1343
1344
1345 /*
1346  * dblink_build_sql_delete
1347  *
1348  * Used to generate an SQL delete statement.
1349  * This is useful for selectively replicating a
1350  * delete to another server via dblink.
1351  *
1352  * API:
1353  * <relname> - name of remote table of interest
1354  * <pkattnums> - an int2vector of attnums which will be used
1355  * to identify the remote tuple of interest
1356  * <pknumatts> - number of attnums in pkattnums
1357  * <tgt_pkattvals_arry> - text array of key values which will be used
1358  * to build the string for execution remotely.
1359  */
1360 PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
1361 Datum
1362 dblink_build_sql_delete(PG_FUNCTION_ARGS)
1363 {
1364         text       *relname_text = PG_GETARG_TEXT_P(0);
1365         int2vector *pkattnums = (int2vector *) PG_GETARG_POINTER(1);
1366         int32           pknumatts_tmp = PG_GETARG_INT32(2);
1367         ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1368         int                     nondropped_natts;
1369         Oid                     relid;
1370         int16           pknumatts = 0;
1371         char      **tgt_pkattvals;
1372         int                     tgt_nitems;
1373         char       *sql;
1374
1375         /*
1376          * Convert relname to rel OID.
1377          */
1378         relid = get_relid_from_relname(relname_text);
1379         if (!OidIsValid(relid))
1380                 ereport(ERROR,
1381                                 (errcode(ERRCODE_UNDEFINED_TABLE),
1382                                  errmsg("relation \"%s\" does not exist",
1383                                                 text_to_cstring(relname_text))));
1384
1385         /*
1386          * There should be at least one key attribute
1387          */
1388         if (pknumatts_tmp <= 0)
1389                 ereport(ERROR,
1390                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1391                                  errmsg("number of key attributes must be > 0")));
1392
1393         if (pknumatts_tmp <= SHRT_MAX)
1394                 pknumatts = pknumatts_tmp;
1395         else
1396                 ereport(ERROR,
1397                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1398                                  errmsg("input for number of primary key " \
1399                                                 "attributes too large")));
1400
1401         /*
1402          * ensure we don't ask for more pk attributes than we have
1403          * non-dropped columns
1404          */
1405         nondropped_natts = get_nondropped_natts(relid);
1406         if (pknumatts > nondropped_natts)
1407                 ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
1408                                 errmsg("number of primary key fields exceeds number of specified relation attributes")));
1409
1410         /*
1411          * Target array is made up of key values that will be used to build the
1412          * SQL string for use on the remote system.
1413          */
1414         tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1415
1416         /*
1417          * There should be one target array key value for each key attnum
1418          */
1419         if (tgt_nitems != pknumatts)
1420                 ereport(ERROR,
1421                                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1422                                  errmsg("target key array length must match number of key " \
1423                                                 "attributes")));
1424
1425         /*
1426          * Prep work is finally done. Go get the SQL string.
1427          */
1428         sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals);
1429
1430         /*
1431          * And send it
1432          */
1433         PG_RETURN_TEXT_P(cstring_to_text(sql));
1434 }
1435
1436
1437 /*
1438  * dblink_build_sql_update
1439  *
1440  * Used to generate an SQL update statement
1441  * based on an existing tuple in a local relation.
1442  * This is useful for selectively replicating data
1443  * to another server via dblink.
1444  *
1445  * API:
1446  * <relname> - name of local table of interest
1447  * <pkattnums> - an int2vector of attnums which will be used
1448  * to identify the local tuple of interest
1449  * <pknumatts> - number of attnums in pkattnums
1450  * <src_pkattvals_arry> - text array of key values which will be used
1451  * to identify the local tuple of interest
1452  * <tgt_pkattvals_arry> - text array of key values which will be used
1453  * to build the string for execution remotely. These are substituted
1454  * for their counterparts in src_pkattvals_arry
1455  */
1456 PG_FUNCTION_INFO_V1(dblink_build_sql_update);
1457 Datum
1458 dblink_build_sql_update(PG_FUNCTION_ARGS)
1459 {
1460         text       *relname_text = PG_GETARG_TEXT_P(0);
1461         int2vector *pkattnums = (int2vector *) PG_GETARG_POINTER(1);
1462         int32           pknumatts_tmp = PG_GETARG_INT32(2);
1463         ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1464         ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1465         int                     nondropped_natts;
1466         Oid                     relid;
1467         int16           pknumatts = 0;
1468         char      **src_pkattvals;
1469         char      **tgt_pkattvals;
1470         int                     src_nitems;
1471         int                     tgt_nitems;
1472         char       *sql;
1473
1474         /*
1475          * Convert relname to rel OID.
1476          */
1477         relid = get_relid_from_relname(relname_text);
1478         if (!OidIsValid(relid))
1479                 ereport(ERROR,
1480                                 (errcode(ERRCODE_UNDEFINED_TABLE),
1481                                  errmsg("relation \"%s\" does not exist",
1482                                                 text_to_cstring(relname_text))));
1483
1484         /*
1485          * There should be one source array key values for each key attnum
1486          */
1487         if (pknumatts_tmp <= 0)
1488                 ereport(ERROR,
1489                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1490                                  errmsg("number of key attributes must be > 0")));
1491
1492         if (pknumatts_tmp <= SHRT_MAX)
1493                 pknumatts = pknumatts_tmp;
1494         else
1495                 ereport(ERROR,
1496                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1497                                  errmsg("input for number of primary key " \
1498                                                 "attributes too large")));
1499
1500         /*
1501          * ensure we don't ask for more pk attributes than we have
1502          * non-dropped columns
1503          */
1504         nondropped_natts = get_nondropped_natts(relid);
1505         if (pknumatts > nondropped_natts)
1506                 ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
1507                                 errmsg("number of primary key fields exceeds number of specified relation attributes")));
1508
1509         /*
1510          * Source array is made up of key values that will be used to locate the
1511          * tuple of interest from the local system.
1512          */
1513         src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1514
1515         /*
1516          * There should be one source array key value for each key attnum
1517          */
1518         if (src_nitems != pknumatts)
1519                 ereport(ERROR,
1520                                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1521                                  errmsg("source key array length must match number of key " \
1522                                                 "attributes")));
1523
1524         /*
1525          * Target array is made up of key values that will be used to build the
1526          * SQL string for use on the remote system.
1527          */
1528         tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1529
1530         /*
1531          * There should be one target array key value for each key attnum
1532          */
1533         if (tgt_nitems != pknumatts)
1534                 ereport(ERROR,
1535                                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1536                                  errmsg("target key array length must match number of key " \
1537                                                 "attributes")));
1538
1539         /*
1540          * Prep work is finally done. Go get the SQL string.
1541          */
1542         sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1543
1544         /*
1545          * And send it
1546          */
1547         PG_RETURN_TEXT_P(cstring_to_text(sql));
1548 }
1549
1550 /*
1551  * dblink_current_query
1552  * return the current query string
1553  * to allow its use in (among other things)
1554  * rewrite rules
1555  */
1556 PG_FUNCTION_INFO_V1(dblink_current_query);
1557 Datum
1558 dblink_current_query(PG_FUNCTION_ARGS)
1559 {
1560         /* This is now just an alias for the built-in function current_query() */
1561         PG_RETURN_DATUM(current_query(fcinfo));
1562 }
1563
1564 /*
1565  * Retrieve async notifications for a connection. 
1566  *
1567  * Returns an setof record of notifications, or an empty set if none recieved.
1568  * Can optionally take a named connection as parameter, but uses the unnamed connection per default.
1569  *
1570  */
1571 #define DBLINK_NOTIFY_COLS              3
1572
1573 PG_FUNCTION_INFO_V1(dblink_get_notify);
1574 Datum
1575 dblink_get_notify(PG_FUNCTION_ARGS)
1576 {
1577         PGconn                     *conn = NULL;
1578         remoteConn                 *rconn = NULL;
1579         PGnotify                   *notify;
1580         ReturnSetInfo      *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1581         TupleDesc                       tupdesc;
1582         Tuplestorestate    *tupstore;
1583         MemoryContext           per_query_ctx;
1584         MemoryContext           oldcontext;
1585
1586         DBLINK_INIT;
1587         if (PG_NARGS() == 1)
1588                 DBLINK_GET_NAMED_CONN;
1589         else
1590                 conn = pconn->conn;
1591
1592         /* create the tuplestore */
1593         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1594         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1595
1596         tupdesc = CreateTemplateTupleDesc(DBLINK_NOTIFY_COLS, false);
1597         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "notify_name",
1598                                            TEXTOID, -1, 0);
1599         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "be_pid",
1600                                            INT4OID, -1, 0);
1601         TupleDescInitEntry(tupdesc, (AttrNumber) 3, "extra",
1602                                            TEXTOID, -1, 0);
1603
1604         tupstore = tuplestore_begin_heap(true, false, work_mem);
1605         rsinfo->returnMode = SFRM_Materialize;
1606         rsinfo->setResult = tupstore;
1607         rsinfo->setDesc = tupdesc;
1608
1609         MemoryContextSwitchTo(oldcontext);
1610
1611         PQconsumeInput(conn);
1612         while ((notify = PQnotifies(conn)) != NULL)
1613         {
1614                 Datum           values[DBLINK_NOTIFY_COLS];
1615                 bool            nulls[DBLINK_NOTIFY_COLS];
1616
1617                 memset(values, 0, sizeof(values));
1618                 memset(nulls, 0, sizeof(nulls));
1619
1620                 if (notify->relname != NULL)
1621                         values[0] = CStringGetTextDatum(notify->relname);
1622                 else
1623                         nulls[0] = true;
1624
1625                 values[1] = Int32GetDatum(notify->be_pid);
1626
1627                 if (notify->extra != NULL)
1628                         values[2] = CStringGetTextDatum(notify->extra);
1629                 else
1630                         nulls[2] = true;
1631
1632                 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1633
1634                 PQfreemem(notify);
1635                 PQconsumeInput(conn);
1636         }
1637
1638         /* clean up and return the tuplestore */
1639         tuplestore_donestoring(tupstore);
1640
1641         return (Datum) 0;
1642 }
1643
1644 /*************************************************************
1645  * internal functions
1646  */
1647
1648
1649 /*
1650  * get_pkey_attnames
1651  *
1652  * Get the primary key attnames for the given relation.
1653  * Return NULL, and set numatts = 0, if no primary key exists.
1654  */
1655 static char **
1656 get_pkey_attnames(Oid relid, int16 *numatts)
1657 {
1658         Relation        indexRelation;
1659         ScanKeyData skey;
1660         SysScanDesc scan;
1661         HeapTuple       indexTuple;
1662         int                     i;
1663         char      **result = NULL;
1664         Relation        rel;
1665         TupleDesc       tupdesc;
1666         AclResult       aclresult;
1667
1668         /* initialize numatts to 0 in case no primary key exists */
1669         *numatts = 0;
1670
1671         /* open relation using relid, check permissions, get tupdesc */
1672         rel = relation_open(relid, AccessShareLock);
1673
1674         aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1675                                                                   ACL_SELECT);
1676         if (aclresult != ACLCHECK_OK)
1677                 aclcheck_error(aclresult, ACL_KIND_CLASS,
1678                                            RelationGetRelationName(rel));
1679
1680         tupdesc = rel->rd_att;
1681
1682         /* Prepare to scan pg_index for entries having indrelid = this rel. */
1683         indexRelation = heap_open(IndexRelationId, AccessShareLock);
1684         ScanKeyInit(&skey,
1685                                 Anum_pg_index_indrelid,
1686                                 BTEqualStrategyNumber, F_OIDEQ,
1687                                 ObjectIdGetDatum(relid));
1688
1689         scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
1690                                                           SnapshotNow, 1, &skey);
1691
1692         while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
1693         {
1694                 Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
1695
1696                 /* we're only interested if it is the primary key */
1697                 if (index->indisprimary)
1698                 {
1699                         *numatts = index->indnatts;
1700                         if (*numatts > 0)
1701                         {
1702                                 result = (char **) palloc(*numatts * sizeof(char *));
1703
1704                                 for (i = 0; i < *numatts; i++)
1705                                         result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
1706                         }
1707                         break;
1708                 }
1709         }
1710
1711         systable_endscan(scan);
1712         heap_close(indexRelation, AccessShareLock);
1713         relation_close(rel, AccessShareLock);
1714
1715         return result;
1716 }
1717
1718 /*
1719  * Deconstruct a text[] into C-strings (note any NULL elements will be
1720  * returned as NULL pointers)
1721  */
1722 static char **
1723 get_text_array_contents(ArrayType *array, int *numitems)
1724 {
1725         int                     ndim = ARR_NDIM(array);
1726         int                *dims = ARR_DIMS(array);
1727         int                     nitems;
1728         int16           typlen;
1729         bool            typbyval;
1730         char            typalign;
1731         char      **values;
1732         char       *ptr;
1733         bits8      *bitmap;
1734         int                     bitmask;
1735         int                     i;
1736
1737         Assert(ARR_ELEMTYPE(array) == TEXTOID);
1738
1739         *numitems = nitems = ArrayGetNItems(ndim, dims);
1740
1741         get_typlenbyvalalign(ARR_ELEMTYPE(array),
1742                                                  &typlen, &typbyval, &typalign);
1743
1744         values = (char **) palloc(nitems * sizeof(char *));
1745
1746         ptr = ARR_DATA_PTR(array);
1747         bitmap = ARR_NULLBITMAP(array);
1748         bitmask = 1;
1749
1750         for (i = 0; i < nitems; i++)
1751         {
1752                 if (bitmap && (*bitmap & bitmask) == 0)
1753                 {
1754                         values[i] = NULL;
1755                 }
1756                 else
1757                 {
1758                         values[i] = TextDatumGetCString(PointerGetDatum(ptr));
1759                         ptr = att_addlength_pointer(ptr, typlen, ptr);
1760                         ptr = (char *) att_align_nominal(ptr, typalign);
1761                 }
1762
1763                 /* advance bitmap pointer if any */
1764                 if (bitmap)
1765                 {
1766                         bitmask <<= 1;
1767                         if (bitmask == 0x100)
1768                         {
1769                                 bitmap++;
1770                                 bitmask = 1;
1771                         }
1772                 }
1773         }
1774
1775         return values;
1776 }
1777
1778 static char *
1779 get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
1780 {
1781         Relation        rel;
1782         char       *relname;
1783         HeapTuple       tuple;
1784         TupleDesc       tupdesc;
1785         int                     natts;
1786         StringInfoData buf;
1787         char       *val;
1788         int16           key;
1789         int                     i;
1790         bool            needComma;
1791
1792         initStringInfo(&buf);
1793
1794         /* get relation name including any needed schema prefix and quoting */
1795         relname = generate_relation_name(relid);
1796
1797         /*
1798          * Open relation using relid
1799          */
1800         rel = relation_open(relid, AccessShareLock);
1801         tupdesc = rel->rd_att;
1802         natts = tupdesc->natts;
1803
1804         tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
1805         if (!tuple)
1806                 ereport(ERROR,
1807                                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
1808                                  errmsg("source row not found")));
1809
1810         appendStringInfo(&buf, "INSERT INTO %s(", relname);
1811
1812         needComma = false;
1813         for (i = 0; i < natts; i++)
1814         {
1815                 if (tupdesc->attrs[i]->attisdropped)
1816                         continue;
1817
1818                 if (needComma)
1819                         appendStringInfo(&buf, ",");
1820
1821                 appendStringInfoString(&buf,
1822                                           quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
1823                 needComma = true;
1824         }
1825
1826         appendStringInfo(&buf, ") VALUES(");
1827
1828         /*
1829          * remember attvals are 1 based
1830          */
1831         needComma = false;
1832         for (i = 0; i < natts; i++)
1833         {
1834                 if (tupdesc->attrs[i]->attisdropped)
1835                         continue;
1836
1837                 if (needComma)
1838                         appendStringInfo(&buf, ",");
1839
1840                 if (tgt_pkattvals != NULL)
1841                         key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
1842                 else
1843                         key = -1;
1844
1845                 if (key > -1)
1846                         val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
1847                 else
1848                         val = SPI_getvalue(tuple, tupdesc, i + 1);
1849
1850                 if (val != NULL)
1851                 {
1852                         appendStringInfoString(&buf, quote_literal_cstr(val));
1853                         pfree(val);
1854                 }
1855                 else
1856                         appendStringInfo(&buf, "NULL");
1857                 needComma = true;
1858         }
1859         appendStringInfo(&buf, ")");
1860
1861         relation_close(rel, AccessShareLock);
1862         return (buf.data);
1863 }
1864
1865 static char *
1866 get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals)
1867 {
1868         Relation        rel;
1869         char       *relname;
1870         TupleDesc       tupdesc;
1871         int                     natts;
1872         StringInfoData buf;
1873         int                     i;
1874
1875         initStringInfo(&buf);
1876
1877         /* get relation name including any needed schema prefix and quoting */
1878         relname = generate_relation_name(relid);
1879
1880         /*
1881          * Open relation using relid
1882          */
1883         rel = relation_open(relid, AccessShareLock);
1884         tupdesc = rel->rd_att;
1885         natts = tupdesc->natts;
1886
1887         appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
1888         for (i = 0; i < pknumatts; i++)
1889         {
1890                 int16           pkattnum = pkattnums->values[i];
1891
1892                 if (i > 0)
1893                         appendStringInfo(&buf, " AND ");
1894
1895                 appendStringInfoString(&buf,
1896                    quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));
1897
1898                 if (tgt_pkattvals == NULL)
1899                         /* internal error */
1900                         elog(ERROR, "target key array must not be NULL");
1901
1902                 if (tgt_pkattvals[i] != NULL)
1903                         appendStringInfo(&buf, " = %s",
1904                                                          quote_literal_cstr(tgt_pkattvals[i]));
1905                 else
1906                         appendStringInfo(&buf, " IS NULL");
1907         }
1908
1909         relation_close(rel, AccessShareLock);
1910         return (buf.data);
1911 }
1912
1913 static char *
1914 get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
1915 {
1916         Relation        rel;
1917         char       *relname;
1918         HeapTuple       tuple;
1919         TupleDesc       tupdesc;
1920         int                     natts;
1921         StringInfoData buf;
1922         char       *val;
1923         int16           key;
1924         int                     i;
1925         bool            needComma;
1926
1927         initStringInfo(&buf);
1928
1929         /* get relation name including any needed schema prefix and quoting */
1930         relname = generate_relation_name(relid);
1931
1932         /*
1933          * Open relation using relid
1934          */
1935         rel = relation_open(relid, AccessShareLock);
1936         tupdesc = rel->rd_att;
1937         natts = tupdesc->natts;
1938
1939         tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
1940         if (!tuple)
1941                 ereport(ERROR,
1942                                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
1943                                  errmsg("source row not found")));
1944
1945         appendStringInfo(&buf, "UPDATE %s SET ", relname);
1946
1947         needComma = false;
1948         for (i = 0; i < natts; i++)
1949         {
1950                 if (tupdesc->attrs[i]->attisdropped)
1951                         continue;
1952
1953                 if (needComma)
1954                         appendStringInfo(&buf, ", ");
1955
1956                 appendStringInfo(&buf, "%s = ",
1957                                           quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
1958
1959                 if (tgt_pkattvals != NULL)
1960                         key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
1961                 else
1962                         key = -1;
1963
1964                 if (key > -1)
1965                         val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
1966                 else
1967                         val = SPI_getvalue(tuple, tupdesc, i + 1);
1968
1969                 if (val != NULL)
1970                 {
1971                         appendStringInfoString(&buf, quote_literal_cstr(val));
1972                         pfree(val);
1973                 }
1974                 else
1975                         appendStringInfoString(&buf, "NULL");
1976                 needComma = true;
1977         }
1978
1979         appendStringInfo(&buf, " WHERE ");
1980
1981         for (i = 0; i < pknumatts; i++)
1982         {
1983                 int16           pkattnum = pkattnums->values[i];
1984
1985                 if (i > 0)
1986                         appendStringInfo(&buf, " AND ");
1987
1988                 appendStringInfo(&buf, "%s",
1989                    quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));
1990
1991                 if (tgt_pkattvals != NULL)
1992                         val = tgt_pkattvals[i] ? pstrdup(tgt_pkattvals[i]) : NULL;
1993                 else
1994                         val = SPI_getvalue(tuple, tupdesc, pkattnum);
1995
1996                 if (val != NULL)
1997                 {
1998                         appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
1999                         pfree(val);
2000                 }
2001                 else
2002                         appendStringInfo(&buf, " IS NULL");
2003         }
2004
2005         relation_close(rel, AccessShareLock);
2006         return (buf.data);
2007 }
2008
2009 /*
2010  * Return a properly quoted literal value.
2011  * Uses quote_literal in quote.c
2012  */
2013 static char *
2014 quote_literal_cstr(char *rawstr)
2015 {
2016         text       *rawstr_text;
2017         text       *result_text;
2018         char       *result;
2019
2020         rawstr_text = cstring_to_text(rawstr);
2021         result_text = DatumGetTextP(DirectFunctionCall1(quote_literal,
2022                                                                                           PointerGetDatum(rawstr_text)));
2023         result = text_to_cstring(result_text);
2024
2025         return result;
2026 }
2027
2028 /*
2029  * Return a properly quoted identifier.
2030  * Uses quote_ident in quote.c
2031  */
2032 static char *
2033 quote_ident_cstr(char *rawstr)
2034 {
2035         text       *rawstr_text;
2036         text       *result_text;
2037         char       *result;
2038
2039         rawstr_text = cstring_to_text(rawstr);
2040         result_text = DatumGetTextP(DirectFunctionCall1(quote_ident,
2041                                                                                           PointerGetDatum(rawstr_text)));
2042         result = text_to_cstring(result_text);
2043
2044         return result;
2045 }
2046
2047 static int16
2048 get_attnum_pk_pos(int2vector *pkattnums, int16 pknumatts, int16 key)
2049 {
2050         int                     i;
2051
2052         /*
2053          * Not likely a long list anyway, so just scan for the value
2054          */
2055         for (i = 0; i < pknumatts; i++)
2056                 if (key == pkattnums->values[i])
2057                         return i;
2058
2059         return -1;
2060 }
2061
2062 static HeapTuple
2063 get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals)
2064 {
2065         Relation        rel;
2066         char       *relname;
2067         TupleDesc       tupdesc;
2068         StringInfoData buf;
2069         int                     ret;
2070         HeapTuple       tuple;
2071         int                     i;
2072
2073         initStringInfo(&buf);
2074
2075         /* get relation name including any needed schema prefix and quoting */
2076         relname = generate_relation_name(relid);
2077
2078         /*
2079          * Open relation using relid
2080          */
2081         rel = relation_open(relid, AccessShareLock);
2082         tupdesc = CreateTupleDescCopy(rel->rd_att);
2083         relation_close(rel, AccessShareLock);
2084
2085         /*
2086          * Connect to SPI manager
2087          */
2088         if ((ret = SPI_connect()) < 0)
2089                 /* internal error */
2090                 elog(ERROR, "SPI connect failure - returned %d", ret);
2091
2092         /*
2093          * Build sql statement to look up tuple of interest Use src_pkattvals as
2094          * the criteria.
2095          */
2096         appendStringInfo(&buf, "SELECT * FROM %s WHERE ", relname);
2097
2098         for (i = 0; i < pknumatts; i++)
2099         {
2100                 int16           pkattnum = pkattnums->values[i];
2101
2102                 if (i > 0)
2103                         appendStringInfo(&buf, " AND ");
2104
2105                 appendStringInfoString(&buf,
2106                    quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));
2107
2108                 if (src_pkattvals[i] != NULL)
2109                         appendStringInfo(&buf, " = %s",
2110                                                          quote_literal_cstr(src_pkattvals[i]));
2111                 else
2112                         appendStringInfo(&buf, " IS NULL");
2113         }
2114
2115         /*
2116          * Retrieve the desired tuple
2117          */
2118         ret = SPI_exec(buf.data, 0);
2119         pfree(buf.data);
2120
2121         /*
2122          * Only allow one qualifying tuple
2123          */
2124         if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
2125                 ereport(ERROR,
2126                                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2127                                  errmsg("source criteria matched more than one record")));
2128
2129         else if (ret == SPI_OK_SELECT && SPI_processed == 1)
2130         {
2131                 SPITupleTable *tuptable = SPI_tuptable;
2132
2133                 tuple = SPI_copytuple(tuptable->vals[0]);
2134                 SPI_finish();
2135
2136                 return tuple;
2137         }
2138         else
2139         {
2140                 /*
2141                  * no qualifying tuples
2142                  */
2143                 SPI_finish();
2144
2145                 return NULL;
2146         }
2147
2148         /*
2149          * never reached, but keep compiler quiet
2150          */
2151         return NULL;
2152 }
2153
2154 static Oid
2155 get_relid_from_relname(text *relname_text)
2156 {
2157         RangeVar   *relvar;
2158         Relation        rel;
2159         Oid                     relid;
2160
2161         relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
2162         rel = heap_openrv(relvar, AccessShareLock);
2163         relid = RelationGetRelid(rel);
2164         relation_close(rel, AccessShareLock);
2165
2166         return relid;
2167 }
2168
2169 /*
2170  * generate_relation_name - copied from ruleutils.c
2171  *              Compute the name to display for a relation specified by OID
2172  *
2173  * The result includes all necessary quoting and schema-prefixing.
2174  */
2175 static char *
2176 generate_relation_name(Oid relid)
2177 {
2178         HeapTuple       tp;
2179         Form_pg_class reltup;
2180         char       *nspname;
2181         char       *result;
2182
2183         tp = SearchSysCache(RELOID,
2184                                                 ObjectIdGetDatum(relid),
2185                                                 0, 0, 0);
2186         if (!HeapTupleIsValid(tp))
2187                 elog(ERROR, "cache lookup failed for relation %u", relid);
2188
2189         reltup = (Form_pg_class) GETSTRUCT(tp);
2190
2191         /* Qualify the name if not visible in search path */
2192         if (RelationIsVisible(relid))
2193                 nspname = NULL;
2194         else
2195                 nspname = get_namespace_name(reltup->relnamespace);
2196
2197         result = quote_qualified_identifier(nspname, NameStr(reltup->relname));
2198
2199         ReleaseSysCache(tp);
2200
2201         return result;
2202 }
2203
2204
2205 static remoteConn *
2206 getConnectionByName(const char *name)
2207 {
2208         remoteConnHashEnt *hentry;
2209         char            key[NAMEDATALEN];
2210
2211         if (!remoteConnHash)
2212                 remoteConnHash = createConnHash();
2213
2214         MemSet(key, 0, NAMEDATALEN);
2215         snprintf(key, NAMEDATALEN - 1, "%s", name);
2216         hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2217                                                                                            key, HASH_FIND, NULL);
2218
2219         if (hentry)
2220                 return (hentry->rconn);
2221
2222         return (NULL);
2223 }
2224
2225 static HTAB *
2226 createConnHash(void)
2227 {
2228         HASHCTL         ctl;
2229
2230         ctl.keysize = NAMEDATALEN;
2231         ctl.entrysize = sizeof(remoteConnHashEnt);
2232
2233         return hash_create("Remote Con hash", NUMCONN, &ctl, HASH_ELEM);
2234 }
2235
2236 static void
2237 createNewConnection(const char *name, remoteConn *rconn)
2238 {
2239         remoteConnHashEnt *hentry;
2240         bool            found;
2241         char            key[NAMEDATALEN];
2242
2243         if (!remoteConnHash)
2244                 remoteConnHash = createConnHash();
2245
2246         MemSet(key, 0, NAMEDATALEN);
2247         snprintf(key, NAMEDATALEN - 1, "%s", name);
2248         hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
2249                                                                                            HASH_ENTER, &found);
2250
2251         if (found)
2252                 ereport(ERROR,
2253                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
2254                                  errmsg("duplicate connection name")));
2255
2256         hentry->rconn = rconn;
2257         strlcpy(hentry->name, name, sizeof(hentry->name));
2258 }
2259
2260 static void
2261 deleteConnection(const char *name)
2262 {
2263         remoteConnHashEnt *hentry;
2264         bool            found;
2265         char            key[NAMEDATALEN];
2266
2267         if (!remoteConnHash)
2268                 remoteConnHash = createConnHash();
2269
2270         MemSet(key, 0, NAMEDATALEN);
2271         snprintf(key, NAMEDATALEN - 1, "%s", name);
2272
2273         hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2274                                                                                            key, HASH_REMOVE, &found);
2275
2276         if (!hentry)
2277                 ereport(ERROR,
2278                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
2279                                  errmsg("undefined connection name")));
2280
2281 }
2282
2283 static void
2284 dblink_security_check(PGconn *conn, remoteConn *rconn)
2285 {
2286         if (!superuser())
2287         {
2288                 if (!PQconnectionUsedPassword(conn))
2289                 {
2290                         PQfinish(conn);
2291                         if (rconn)
2292                                 pfree(rconn);
2293
2294                         ereport(ERROR,
2295                                   (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2296                                    errmsg("password is required"),
2297                                    errdetail("Non-superuser cannot connect if the server does not request a password."),
2298                                    errhint("Target server's authentication method must be changed.")));
2299                 }
2300         }
2301 }
2302
2303 /*
2304  * For non-superusers, insist that the connstr specify a password.      This
2305  * prevents a password from being picked up from .pgpass, a service file,
2306  * the environment, etc.  We don't want the postgres user's passwords
2307  * to be accessible to non-superusers.
2308  */
2309 static void
2310 dblink_connstr_check(const char *connstr)
2311 {
2312         if (!superuser())
2313         {
2314                 PQconninfoOption *options;
2315                 PQconninfoOption *option;
2316                 bool            connstr_gives_password = false;
2317
2318                 options = PQconninfoParse(connstr, NULL);
2319                 if (options)
2320                 {
2321                         for (option = options; option->keyword != NULL; option++)
2322                         {
2323                                 if (strcmp(option->keyword, "password") == 0)
2324                                 {
2325                                         if (option->val != NULL && option->val[0] != '\0')
2326                                         {
2327                                                 connstr_gives_password = true;
2328                                                 break;
2329                                         }
2330                                 }
2331                         }
2332                         PQconninfoFree(options);
2333                 }
2334
2335                 if (!connstr_gives_password)
2336                         ereport(ERROR,
2337                                   (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2338                                    errmsg("password is required"),
2339                                    errdetail("Non-superusers must provide a password in the connection string.")));
2340         }
2341 }
2342
2343 static void
2344 dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail)
2345 {
2346         int                     level;
2347         char       *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
2348         char       *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
2349         char       *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
2350         char       *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
2351         char       *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
2352         int                     sqlstate;
2353         char       *message_primary;
2354         char       *message_detail;
2355         char       *message_hint;
2356         char       *message_context;
2357         const char *dblink_context_conname = "unnamed";
2358
2359         if (fail)
2360                 level = ERROR;
2361         else
2362                 level = NOTICE;
2363
2364         if (pg_diag_sqlstate)
2365                 sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
2366                                                                  pg_diag_sqlstate[1],
2367                                                                  pg_diag_sqlstate[2],
2368                                                                  pg_diag_sqlstate[3],
2369                                                                  pg_diag_sqlstate[4]);
2370         else
2371                 sqlstate = ERRCODE_CONNECTION_FAILURE;
2372
2373         xpstrdup(message_primary, pg_diag_message_primary);
2374         xpstrdup(message_detail, pg_diag_message_detail);
2375         xpstrdup(message_hint, pg_diag_message_hint);
2376         xpstrdup(message_context, pg_diag_context);
2377
2378         if (res)
2379                 PQclear(res);
2380
2381         if (conname)
2382                 dblink_context_conname = conname;
2383
2384         ereport(level,
2385                         (errcode(sqlstate),
2386         message_primary ? errmsg("%s", message_primary) : errmsg("unknown error"),
2387                          message_detail ? errdetail("%s", message_detail) : 0,
2388                          message_hint ? errhint("%s", message_hint) : 0,
2389                          message_context ? errcontext("%s", message_context) : 0,
2390                   errcontext("Error occurred on dblink connection named \"%s\": %s.",
2391                                          dblink_context_conname, dblink_context_msg)));
2392 }
2393
2394 /*
2395  * Obtain connection string for a foreign server
2396  */
2397 static char *
2398 get_connect_string(const char *servername)
2399 {
2400         ForeignServer *foreign_server = NULL;
2401         UserMapping *user_mapping;
2402         ListCell   *cell;
2403         StringInfo      buf = makeStringInfo();
2404         ForeignDataWrapper *fdw;
2405         AclResult       aclresult;
2406
2407         /* first gather the server connstr options */
2408         if (strlen(servername) < NAMEDATALEN)
2409                 foreign_server = GetForeignServerByName(servername, true);
2410
2411         if (foreign_server)
2412         {
2413                 Oid                     serverid = foreign_server->serverid;
2414                 Oid                     fdwid = foreign_server->fdwid;
2415                 Oid                     userid = GetUserId();
2416
2417                 user_mapping = GetUserMapping(userid, serverid);
2418                 fdw = GetForeignDataWrapper(fdwid);
2419
2420                 /* Check permissions, user must have usage on the server. */
2421                 aclresult = pg_foreign_server_aclcheck(serverid, userid, ACL_USAGE);
2422                 if (aclresult != ACLCHECK_OK)
2423                         aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, foreign_server->servername);
2424
2425                 foreach(cell, fdw->options)
2426                 {
2427                         DefElem    *def = lfirst(cell);
2428
2429                         appendStringInfo(buf, "%s='%s' ", def->defname,
2430                                                          escape_param_str(strVal(def->arg)));
2431                 }
2432
2433                 foreach(cell, foreign_server->options)
2434                 {
2435                         DefElem    *def = lfirst(cell);
2436
2437                         appendStringInfo(buf, "%s='%s' ", def->defname,
2438                                                          escape_param_str(strVal(def->arg)));
2439                 }
2440
2441                 foreach(cell, user_mapping->options)
2442                 {
2443
2444                         DefElem    *def = lfirst(cell);
2445
2446                         appendStringInfo(buf, "%s='%s' ", def->defname,
2447                                                          escape_param_str(strVal(def->arg)));
2448                 }
2449
2450                 return buf->data;
2451         }
2452         else
2453                 return NULL;
2454 }
2455
2456 /*
2457  * Escaping libpq connect parameter strings.
2458  *
2459  * Replaces "'" with "\'" and "\" with "\\".
2460  */
2461 static char *
2462 escape_param_str(const char *str)
2463 {
2464         const char *cp;
2465         StringInfo      buf = makeStringInfo();
2466
2467         for (cp = str; *cp; cp++)
2468         {
2469                 if (*cp == '\\' || *cp == '\'')
2470                         appendStringInfoChar(buf, '\\');
2471                 appendStringInfoChar(buf, *cp);
2472         }
2473
2474         return buf->data;
2475 }
2476
2477 static int
2478 get_nondropped_natts(Oid relid)
2479 {
2480         int                     nondropped_natts = 0;
2481         TupleDesc       tupdesc;
2482         Relation        rel;
2483         int                     natts;
2484         int                     i;
2485
2486         rel = relation_open(relid, AccessShareLock);
2487         tupdesc = rel->rd_att;
2488         natts = tupdesc->natts;
2489
2490         for (i = 0; i < natts; i++)
2491         {
2492                 if (tupdesc->attrs[i]->attisdropped)
2493                         continue;
2494                 nondropped_natts++;
2495         }
2496
2497         relation_close(rel, AccessShareLock);
2498         return nondropped_natts;
2499 }
2500