OSDN Git Service

19b98fb73d1d28e052648b75db48978be816f97f
[pg-rex/syncrep.git] / contrib / dblink / dblink.c
1 /*
2  * dblink.c
3  *
4  * Functions returning results from a remote database
5  *
6  * Joe Conway <mail@joeconway.com>
7  * And contributors:
8  * Darko Prenosil <Darko.Prenosil@finteh.hr>
9  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
10  *
11  * contrib/dblink/dblink.c
12  * Copyright (c) 2001-2011, PostgreSQL Global Development Group
13  * ALL RIGHTS RESERVED;
14  *
15  * Permission to use, copy, modify, and distribute this software and its
16  * documentation for any purpose, without fee, and without a written agreement
17  * is hereby granted, provided that the above copyright notice and this
18  * paragraph and the following two paragraphs appear in all copies.
19  *
20  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24  * POSSIBILITY OF SUCH DAMAGE.
25  *
26  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
29  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31  *
32  */
33 #include "postgres.h"
34
35 #include <limits.h>
36
37 #include "libpq-fe.h"
38 #include "fmgr.h"
39 #include "funcapi.h"
40 #include "access/genam.h"
41 #include "access/heapam.h"
42 #include "access/tupdesc.h"
43 #include "catalog/indexing.h"
44 #include "catalog/namespace.h"
45 #include "catalog/pg_index.h"
46 #include "catalog/pg_type.h"
47 #include "executor/executor.h"
48 #include "executor/spi.h"
49 #include "foreign/foreign.h"
50 #include "lib/stringinfo.h"
51 #include "mb/pg_wchar.h"
52 #include "miscadmin.h"
53 #include "nodes/execnodes.h"
54 #include "nodes/nodes.h"
55 #include "nodes/pg_list.h"
56 #include "parser/parse_type.h"
57 #include "parser/scansup.h"
58 #include "utils/acl.h"
59 #include "utils/array.h"
60 #include "utils/builtins.h"
61 #include "utils/dynahash.h"
62 #include "utils/fmgroids.h"
63 #include "utils/hsearch.h"
64 #include "utils/lsyscache.h"
65 #include "utils/memutils.h"
66 #include "utils/syscache.h"
67 #include "utils/tqual.h"
68
69 #include "dblink.h"
70
71 PG_MODULE_MAGIC;
72
73 typedef struct remoteConn
74 {
75         PGconn     *conn;                       /* Hold the remote connection */
76         int                     openCursorCount;        /* The number of open cursors */
77         bool            newXactForCursor;               /* Opened a transaction for a cursor */
78 } remoteConn;
79
80 /*
81  * Internal declarations
82  */
83 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
84 static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
85 static remoteConn *getConnectionByName(const char *name);
86 static HTAB *createConnHash(void);
87 static void createNewConnection(const char *name, remoteConn *rconn);
88 static void deleteConnection(const char *name);
89 static char **get_pkey_attnames(Relation rel, int16 *numatts);
90 static char **get_text_array_contents(ArrayType *array, int *numitems);
91 static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
92 static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
93 static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
94 static char *quote_ident_cstr(char *rawstr);
95 static int      get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
96 static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
97 static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
98 static char *generate_relation_name(Relation rel);
99 static void dblink_connstr_check(const char *connstr);
100 static void dblink_security_check(PGconn *conn, remoteConn *rconn);
101 static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
102 static char *get_connect_string(const char *servername);
103 static char *escape_param_str(const char *from);
104 static void validate_pkattnums(Relation rel,
105                                    int2vector *pkattnums_arg, int32 pknumatts_arg,
106                                    int **pkattnums, int *pknumatts);
107
108 /* Global */
109 static remoteConn *pconn = NULL;
110 static HTAB *remoteConnHash = NULL;
111
/*
 *	The following is the list that holds multiple remote connections.
 *	The calling convention of each dblink function is changed to accept a
 *	connection name as the first parameter. The connection list is
 *	much like ecpg's, i.e. a mapping between a name and a PGconn object.
 */
118
119 typedef struct remoteConnHashEnt
120 {
121         char            name[NAMEDATALEN];
122         remoteConn *rconn;
123 } remoteConnHashEnt;
124
125 /* initial number of connection hashes */
126 #define NUMCONN 16
127
/* general utility */

/*
 * xpfree
 *		pfree() the pointer held in var_ if it is not NULL, then reset it to
 *		NULL so that it cannot be freed or used again by accident.
 *
 * var_ must be a modifiable pointer lvalue; it is evaluated more than once,
 * so it must not have side effects.  The argument is parenthesized so that
 * any lvalue expression expands safely.
 */
#define xpfree(var_) \
	do { \
		if ((var_) != NULL) \
		{ \
			pfree(var_); \
			(var_) = NULL; \
		} \
	} while (0)
137
/*
 * xpstrdup
 *		NULL-tolerant pstrdup(): store a copy of the string var_ in var_c,
 *		or store NULL in var_c when var_ itself is NULL.
 *
 * var_c must be a modifiable pointer lvalue.  var_ is evaluated more than
 * once, so it must not have side effects.  Both arguments are parenthesized
 * so that arbitrary expressions expand safely.
 */
#define xpstrdup(var_c, var_) \
	do { \
		if ((var_) != NULL) \
			(var_c) = pstrdup(var_); \
		else \
			(var_c) = NULL; \
	} while (0)
145
/*
 * DBLINK_RES_INTERNALERROR
 *		Raise an ERROR of the form "<p2>: <libpq error message>".
 *
 * Expects the caller to have locals named "msg", "conn" and "res".  The
 * connection's error text is copied with pstrdup() before "res" is cleared.
 */
#define DBLINK_RES_INTERNALERROR(p2) \
	do { \
		msg = pstrdup(PQerrorMessage(conn)); \
		if (res != NULL) \
			PQclear(res); \
		elog(ERROR, "%s: %s", p2, msg); \
	} while (0)
153
/*
 * DBLINK_CONN_NOT_AVAIL
 *		Raise ERRCODE_CONNECTION_DOES_NOT_EXIST.  The message names the
 *		connection when the caller's "conname" variable is non-NULL and is
 *		generic otherwise.
 *
 * Expects a variable named "conname" to be in scope at the point of use.
 */
#define DBLINK_CONN_NOT_AVAIL \
	do { \
		ereport(ERROR, \
				(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
				 conname ? \
				 errmsg("connection \"%s\" not available", conname) : \
				 errmsg("connection not available"))); \
	} while (0)
165
/*
 * DBLINK_GET_CONN
 *		Resolve the first function argument to a libpq connection.
 *
 * The argument is first looked up as a connection name.  When no such
 * named connection exists, it is treated as a foreign server name or a raw
 * connection string and a new connection is opened; in that case
 * "freeconn" is set to true so the caller knows it must close "conn".
 *
 * Expects the caller to have locals named "rconn", "conn", "connstr",
 * "msg" and "freeconn", and to be inside a PG_FUNCTION_ARGS function.
 */
#define DBLINK_GET_CONN \
	do { \
		char *conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
		rconn = getConnectionByName(conname_or_str); \
		if (rconn != NULL) \
			conn = rconn->conn; \
		else \
		{ \
			connstr = get_connect_string(conname_or_str); \
			if (connstr == NULL) \
				connstr = conname_or_str; \
			dblink_connstr_check(connstr); \
			conn = PQconnectdb(connstr); \
			if (PQstatus(conn) == CONNECTION_BAD) \
			{ \
				msg = pstrdup(PQerrorMessage(conn)); \
				PQfinish(conn); \
				ereport(ERROR, \
						(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \
						 errmsg("could not establish connection"), \
						 errdetail("%s", msg))); \
			} \
			dblink_security_check(conn, rconn); \
			PQsetClientEncoding(conn, GetDatabaseEncodingName()); \
			freeconn = true; \
		} \
	} while (0)
197
/*
 * DBLINK_GET_NAMED_CONN
 *		Resolve the first function argument strictly as a connection name.
 *		Raises "connection ... not available" when no such connection is
 *		registered; never opens a new connection.
 *
 * Expects the caller to have locals named "rconn" and "conn".  The name is
 * held in a "conname" variable local to the macro body.
 */
#define DBLINK_GET_NAMED_CONN \
	do { \
		char *conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
		rconn = getConnectionByName(conname); \
		if (rconn == NULL) \
			DBLINK_CONN_NOT_AVAIL; \
		else \
			conn = rconn->conn; \
	} while (0)
207
/*
 * DBLINK_INIT
 *		Allocate the unnamed-connection record "pconn" in TopMemoryContext
 *		the first time it is needed, with no connection, no open cursors
 *		and no cursor-owned transaction.  Does nothing on later calls.
 */
#define DBLINK_INIT \
	do { \
		if (pconn == NULL) \
		{ \
			pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
			pconn->conn = NULL; \
			pconn->openCursorCount = 0; \
			pconn->newXactForCursor = FALSE; \
		} \
	} while (0)
218
219 /*
220  * Create a persistent connection to another database
221  */
222 PG_FUNCTION_INFO_V1(dblink_connect);
223 Datum
224 dblink_connect(PG_FUNCTION_ARGS)
225 {
226         char       *conname_or_str = NULL;
227         char       *connstr = NULL;
228         char       *connname = NULL;
229         char       *msg;
230         PGconn     *conn = NULL;
231         remoteConn *rconn = NULL;
232
233         DBLINK_INIT;
234
235         if (PG_NARGS() == 2)
236         {
237                 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
238                 connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
239         }
240         else if (PG_NARGS() == 1)
241                 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
242
243         if (connname)
244                 rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
245                                                                                                   sizeof(remoteConn));
246
247         /* first check for valid foreign data server */
248         connstr = get_connect_string(conname_or_str);
249         if (connstr == NULL)
250                 connstr = conname_or_str;
251
252         /* check password in connection string if not superuser */
253         dblink_connstr_check(connstr);
254         conn = PQconnectdb(connstr);
255
256         if (PQstatus(conn) == CONNECTION_BAD)
257         {
258                 msg = pstrdup(PQerrorMessage(conn));
259                 PQfinish(conn);
260                 if (rconn)
261                         pfree(rconn);
262
263                 ereport(ERROR,
264                                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
265                                  errmsg("could not establish connection"),
266                                  errdetail("%s", msg)));
267         }
268
269         /* check password actually used if not superuser */
270         dblink_security_check(conn, rconn);
271
272         /* attempt to set client encoding to match server encoding */
273         PQsetClientEncoding(conn, GetDatabaseEncodingName());
274
275         if (connname)
276         {
277                 rconn->conn = conn;
278                 createNewConnection(connname, rconn);
279         }
280         else
281                 pconn->conn = conn;
282
283         PG_RETURN_TEXT_P(cstring_to_text("OK"));
284 }
285
286 /*
287  * Clear a persistent connection to another database
288  */
289 PG_FUNCTION_INFO_V1(dblink_disconnect);
290 Datum
291 dblink_disconnect(PG_FUNCTION_ARGS)
292 {
293         char       *conname = NULL;
294         remoteConn *rconn = NULL;
295         PGconn     *conn = NULL;
296
297         DBLINK_INIT;
298
299         if (PG_NARGS() == 1)
300         {
301                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
302                 rconn = getConnectionByName(conname);
303                 if (rconn)
304                         conn = rconn->conn;
305         }
306         else
307                 conn = pconn->conn;
308
309         if (!conn)
310                 DBLINK_CONN_NOT_AVAIL;
311
312         PQfinish(conn);
313         if (rconn)
314         {
315                 deleteConnection(conname);
316                 pfree(rconn);
317         }
318         else
319                 pconn->conn = NULL;
320
321         PG_RETURN_TEXT_P(cstring_to_text("OK"));
322 }
323
324 /*
325  * opens a cursor using a persistent connection
326  */
327 PG_FUNCTION_INFO_V1(dblink_open);
328 Datum
329 dblink_open(PG_FUNCTION_ARGS)
330 {
331         char       *msg;
332         PGresult   *res = NULL;
333         PGconn     *conn = NULL;
334         char       *curname = NULL;
335         char       *sql = NULL;
336         char       *conname = NULL;
337         StringInfoData buf;
338         remoteConn *rconn = NULL;
339         bool            fail = true;    /* default to backward compatible behavior */
340
341         DBLINK_INIT;
342         initStringInfo(&buf);
343
344         if (PG_NARGS() == 2)
345         {
346                 /* text,text */
347                 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
348                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
349                 rconn = pconn;
350         }
351         else if (PG_NARGS() == 3)
352         {
353                 /* might be text,text,text or text,text,bool */
354                 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
355                 {
356                         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
357                         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
358                         fail = PG_GETARG_BOOL(2);
359                         rconn = pconn;
360                 }
361                 else
362                 {
363                         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
364                         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
365                         sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
366                         rconn = getConnectionByName(conname);
367                 }
368         }
369         else if (PG_NARGS() == 4)
370         {
371                 /* text,text,text,bool */
372                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
373                 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
374                 sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
375                 fail = PG_GETARG_BOOL(3);
376                 rconn = getConnectionByName(conname);
377         }
378
379         if (!rconn || !rconn->conn)
380                 DBLINK_CONN_NOT_AVAIL;
381         else
382                 conn = rconn->conn;
383
384         /* If we are not in a transaction, start one */
385         if (PQtransactionStatus(conn) == PQTRANS_IDLE)
386         {
387                 res = PQexec(conn, "BEGIN");
388                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
389                         DBLINK_RES_INTERNALERROR("begin error");
390                 PQclear(res);
391                 rconn->newXactForCursor = TRUE;
392
393                 /*
394                  * Since transaction state was IDLE, we force cursor count to
395                  * initially be 0. This is needed as a previous ABORT might have wiped
396                  * out our transaction without maintaining the cursor count for us.
397                  */
398                 rconn->openCursorCount = 0;
399         }
400
401         /* if we started a transaction, increment cursor count */
402         if (rconn->newXactForCursor)
403                 (rconn->openCursorCount)++;
404
405         appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
406         res = PQexec(conn, buf.data);
407         if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
408         {
409                 dblink_res_error(conname, res, "could not open cursor", fail);
410                 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
411         }
412
413         PQclear(res);
414         PG_RETURN_TEXT_P(cstring_to_text("OK"));
415 }
416
417 /*
418  * closes a cursor
419  */
420 PG_FUNCTION_INFO_V1(dblink_close);
421 Datum
422 dblink_close(PG_FUNCTION_ARGS)
423 {
424         PGconn     *conn = NULL;
425         PGresult   *res = NULL;
426         char       *curname = NULL;
427         char       *conname = NULL;
428         StringInfoData buf;
429         char       *msg;
430         remoteConn *rconn = NULL;
431         bool            fail = true;    /* default to backward compatible behavior */
432
433         DBLINK_INIT;
434         initStringInfo(&buf);
435
436         if (PG_NARGS() == 1)
437         {
438                 /* text */
439                 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
440                 rconn = pconn;
441         }
442         else if (PG_NARGS() == 2)
443         {
444                 /* might be text,text or text,bool */
445                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
446                 {
447                         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
448                         fail = PG_GETARG_BOOL(1);
449                         rconn = pconn;
450                 }
451                 else
452                 {
453                         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
454                         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
455                         rconn = getConnectionByName(conname);
456                 }
457         }
458         if (PG_NARGS() == 3)
459         {
460                 /* text,text,bool */
461                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
462                 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
463                 fail = PG_GETARG_BOOL(2);
464                 rconn = getConnectionByName(conname);
465         }
466
467         if (!rconn || !rconn->conn)
468                 DBLINK_CONN_NOT_AVAIL;
469         else
470                 conn = rconn->conn;
471
472         appendStringInfo(&buf, "CLOSE %s", curname);
473
474         /* close the cursor */
475         res = PQexec(conn, buf.data);
476         if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
477         {
478                 dblink_res_error(conname, res, "could not close cursor", fail);
479                 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
480         }
481
482         PQclear(res);
483
484         /* if we started a transaction, decrement cursor count */
485         if (rconn->newXactForCursor)
486         {
487                 (rconn->openCursorCount)--;
488
489                 /* if count is zero, commit the transaction */
490                 if (rconn->openCursorCount == 0)
491                 {
492                         rconn->newXactForCursor = FALSE;
493
494                         res = PQexec(conn, "COMMIT");
495                         if (PQresultStatus(res) != PGRES_COMMAND_OK)
496                                 DBLINK_RES_INTERNALERROR("commit error");
497                         PQclear(res);
498                 }
499         }
500
501         PG_RETURN_TEXT_P(cstring_to_text("OK"));
502 }
503
504 /*
505  * Fetch results from an open cursor
506  */
507 PG_FUNCTION_INFO_V1(dblink_fetch);
508 Datum
509 dblink_fetch(PG_FUNCTION_ARGS)
510 {
511         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
512         PGresult   *res = NULL;
513         char       *conname = NULL;
514         remoteConn *rconn = NULL;
515         PGconn     *conn = NULL;
516         StringInfoData buf;
517         char       *curname = NULL;
518         int                     howmany = 0;
519         bool            fail = true;    /* default to backward compatible */
520
521         DBLINK_INIT;
522
523         if (PG_NARGS() == 4)
524         {
525                 /* text,text,int,bool */
526                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
527                 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
528                 howmany = PG_GETARG_INT32(2);
529                 fail = PG_GETARG_BOOL(3);
530
531                 rconn = getConnectionByName(conname);
532                 if (rconn)
533                         conn = rconn->conn;
534         }
535         else if (PG_NARGS() == 3)
536         {
537                 /* text,text,int or text,int,bool */
538                 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
539                 {
540                         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
541                         howmany = PG_GETARG_INT32(1);
542                         fail = PG_GETARG_BOOL(2);
543                         conn = pconn->conn;
544                 }
545                 else
546                 {
547                         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
548                         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
549                         howmany = PG_GETARG_INT32(2);
550
551                         rconn = getConnectionByName(conname);
552                         if (rconn)
553                                 conn = rconn->conn;
554                 }
555         }
556         else if (PG_NARGS() == 2)
557         {
558                 /* text,int */
559                 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
560                 howmany = PG_GETARG_INT32(1);
561                 conn = pconn->conn;
562         }
563
564         if (!conn)
565                 DBLINK_CONN_NOT_AVAIL;
566
567         /* let the caller know we're sending back a tuplestore */
568         rsinfo->returnMode = SFRM_Materialize;
569         rsinfo->setResult = NULL;
570         rsinfo->setDesc = NULL;
571
572         initStringInfo(&buf);
573         appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
574
575         /*
576          * Try to execute the query.  Note that since libpq uses malloc, the
577          * PGresult will be long-lived even though we are still in a short-lived
578          * memory context.
579          */
580         res = PQexec(conn, buf.data);
581         if (!res ||
582                 (PQresultStatus(res) != PGRES_COMMAND_OK &&
583                  PQresultStatus(res) != PGRES_TUPLES_OK))
584         {
585                 dblink_res_error(conname, res, "could not fetch from cursor", fail);
586                 return (Datum) 0;
587         }
588         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
589         {
590                 /* cursor does not exist - closed already or bad name */
591                 PQclear(res);
592                 ereport(ERROR,
593                                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
594                                  errmsg("cursor \"%s\" does not exist", curname)));
595         }
596
597         materializeResult(fcinfo, res);
598         return (Datum) 0;
599 }
600
601 /*
602  * Note: this is the new preferred version of dblink
603  */
604 PG_FUNCTION_INFO_V1(dblink_record);
605 Datum
606 dblink_record(PG_FUNCTION_ARGS)
607 {
608         return dblink_record_internal(fcinfo, false);
609 }
610
611 PG_FUNCTION_INFO_V1(dblink_send_query);
612 Datum
613 dblink_send_query(PG_FUNCTION_ARGS)
614 {
615         PGconn     *conn = NULL;
616         char       *connstr = NULL;
617         char       *sql = NULL;
618         remoteConn *rconn = NULL;
619         char       *msg;
620         bool            freeconn = false;
621         int                     retval;
622
623         if (PG_NARGS() == 2)
624         {
625                 DBLINK_GET_CONN;
626                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
627         }
628         else
629                 /* shouldn't happen */
630                 elog(ERROR, "wrong number of arguments");
631
632         /* async query send */
633         retval = PQsendQuery(conn, sql);
634         if (retval != 1)
635                 elog(NOTICE, "%s", PQerrorMessage(conn));
636
637         PG_RETURN_INT32(retval);
638 }
639
640 PG_FUNCTION_INFO_V1(dblink_get_result);
641 Datum
642 dblink_get_result(PG_FUNCTION_ARGS)
643 {
644         return dblink_record_internal(fcinfo, true);
645 }
646
647 static Datum
648 dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
649 {
650         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
651         char       *msg;
652         PGresult   *res = NULL;
653         PGconn     *conn = NULL;
654         char       *connstr = NULL;
655         char       *sql = NULL;
656         char       *conname = NULL;
657         remoteConn *rconn = NULL;
658         bool            fail = true;    /* default to backward compatible */
659         bool            freeconn = false;
660
661         /* check to see if caller supports us returning a tuplestore */
662         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
663                 ereport(ERROR,
664                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
665                                  errmsg("set-valued function called in context that cannot accept a set")));
666         if (!(rsinfo->allowedModes & SFRM_Materialize))
667                 ereport(ERROR,
668                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
669                                  errmsg("materialize mode required, but it is not " \
670                                                 "allowed in this context")));
671
672         DBLINK_INIT;
673
674         if (!is_async)
675         {
676                 if (PG_NARGS() == 3)
677                 {
678                         /* text,text,bool */
679                         DBLINK_GET_CONN;
680                         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
681                         fail = PG_GETARG_BOOL(2);
682                 }
683                 else if (PG_NARGS() == 2)
684                 {
685                         /* text,text or text,bool */
686                         if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
687                         {
688                                 conn = pconn->conn;
689                                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
690                                 fail = PG_GETARG_BOOL(1);
691                         }
692                         else
693                         {
694                                 DBLINK_GET_CONN;
695                                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
696                         }
697                 }
698                 else if (PG_NARGS() == 1)
699                 {
700                         /* text */
701                         conn = pconn->conn;
702                         sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
703                 }
704                 else
705                         /* shouldn't happen */
706                         elog(ERROR, "wrong number of arguments");
707         }
708         else    /* is_async */
709         {
710                 /* get async result */
711                 if (PG_NARGS() == 2)
712                 {
713                         /* text,bool */
714                         DBLINK_GET_CONN;
715                         fail = PG_GETARG_BOOL(1);
716                 }
717                 else if (PG_NARGS() == 1)
718                 {
719                         /* text */
720                         DBLINK_GET_CONN;
721                 }
722                 else
723                         /* shouldn't happen */
724                         elog(ERROR, "wrong number of arguments");
725         }
726
727         if (!conn)
728                 DBLINK_CONN_NOT_AVAIL;
729
730         /* let the caller know we're sending back a tuplestore */
731         rsinfo->returnMode = SFRM_Materialize;
732         rsinfo->setResult = NULL;
733         rsinfo->setDesc = NULL;
734
735         /* synchronous query, or async result retrieval */
736         if (!is_async)
737                 res = PQexec(conn, sql);
738         else
739         {
740                 res = PQgetResult(conn);
741                 /* NULL means we're all done with the async results */
742                 if (!res)
743                         return (Datum) 0;
744         }
745
746         /* if needed, close the connection to the database and cleanup */
747         if (freeconn)
748                 PQfinish(conn);
749
750         if (!res ||
751                 (PQresultStatus(res) != PGRES_COMMAND_OK &&
752                  PQresultStatus(res) != PGRES_TUPLES_OK))
753         {
754                 dblink_res_error(conname, res, "could not execute query", fail);
755                 return (Datum) 0;
756         }
757
758         materializeResult(fcinfo, res);
759         return (Datum) 0;
760 }
761
762 /*
763  * Materialize the PGresult to return them as the function result.
764  * The res will be released in this function.
765  */
766 static void
767 materializeResult(FunctionCallInfo fcinfo, PGresult *res)
768 {
769         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
770
771         Assert(rsinfo->returnMode == SFRM_Materialize);
772
773         PG_TRY();
774         {
775                 TupleDesc       tupdesc;
776                 bool            is_sql_cmd = false;
777                 int                     ntuples;
778                 int                     nfields;
779
780                 if (PQresultStatus(res) == PGRES_COMMAND_OK)
781                 {
782                         is_sql_cmd = true;
783
784                         /*
785                          * need a tuple descriptor representing one TEXT column to return
786                          * the command status string as our result tuple
787                          */
788                         tupdesc = CreateTemplateTupleDesc(1, false);
789                         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
790                                                            TEXTOID, -1, 0);
791                         ntuples = 1;
792                         nfields = 1;
793                 }
794                 else
795                 {
796                         Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
797
798                         is_sql_cmd = false;
799
800                         /* get a tuple descriptor for our result type */
801                         switch (get_call_result_type(fcinfo, NULL, &tupdesc))
802                         {
803                                 case TYPEFUNC_COMPOSITE:
804                                         /* success */
805                                         break;
806                                 case TYPEFUNC_RECORD:
807                                         /* failed to determine actual type of RECORD */
808                                         ereport(ERROR,
809                                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
810                                                 errmsg("function returning record called in context "
811                                                            "that cannot accept type record")));
812                                         break;
813                                 default:
814                                         /* result type isn't composite */
815                                         elog(ERROR, "return type must be a row type");
816                                         break;
817                         }
818
819                         /* make sure we have a persistent copy of the tupdesc */
820                         tupdesc = CreateTupleDescCopy(tupdesc);
821                         ntuples = PQntuples(res);
822                         nfields = PQnfields(res);
823                 }
824
825                 /*
826                  * check result and tuple descriptor have the same number of columns
827                  */
828                 if (nfields != tupdesc->natts)
829                         ereport(ERROR,
830                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
831                                          errmsg("remote query result rowtype does not match "
832                                                         "the specified FROM clause rowtype")));
833
834                 if (ntuples > 0)
835                 {
836                         AttInMetadata *attinmeta;
837                         Tuplestorestate *tupstore;
838                         MemoryContext oldcontext;
839                         int                     row;
840                         char      **values;
841
842                         attinmeta = TupleDescGetAttInMetadata(tupdesc);
843
844                         oldcontext = MemoryContextSwitchTo(
845                                                                         rsinfo->econtext->ecxt_per_query_memory);
846                         tupstore = tuplestore_begin_heap(true, false, work_mem);
847                         rsinfo->setResult = tupstore;
848                         rsinfo->setDesc = tupdesc;
849                         MemoryContextSwitchTo(oldcontext);
850
851                         values = (char **) palloc(nfields * sizeof(char *));
852
853                         /* put all tuples into the tuplestore */
854                         for (row = 0; row < ntuples; row++)
855                         {
856                                 HeapTuple       tuple;
857
858                                 if (!is_sql_cmd)
859                                 {
860                                         int                     i;
861
862                                         for (i = 0; i < nfields; i++)
863                                         {
864                                                 if (PQgetisnull(res, row, i))
865                                                         values[i] = NULL;
866                                                 else
867                                                         values[i] = PQgetvalue(res, row, i);
868                                         }
869                                 }
870                                 else
871                                 {
872                                         values[0] = PQcmdStatus(res);
873                                 }
874
875                                 /* build the tuple and put it into the tuplestore. */
876                                 tuple = BuildTupleFromCStrings(attinmeta, values);
877                                 tuplestore_puttuple(tupstore, tuple);
878                         }
879
880                         /* clean up and return the tuplestore */
881                         tuplestore_donestoring(tupstore);
882                 }
883
884                 PQclear(res);
885         }
886         PG_CATCH();
887         {
888                 /* be sure to release the libpq result */
889                 PQclear(res);
890                 PG_RE_THROW();
891         }
892         PG_END_TRY();
893 }
894
895 /*
896  * List all open dblink connections by name.
897  * Returns an array of all connection names.
898  * Takes no params
899  */
900 PG_FUNCTION_INFO_V1(dblink_get_connections);
901 Datum
902 dblink_get_connections(PG_FUNCTION_ARGS)
903 {
904         HASH_SEQ_STATUS status;
905         remoteConnHashEnt *hentry;
906         ArrayBuildState *astate = NULL;
907
908         if (remoteConnHash)
909         {
910                 hash_seq_init(&status, remoteConnHash);
911                 while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
912                 {
913                         /* stash away current value */
914                         astate = accumArrayResult(astate,
915                                                                           CStringGetTextDatum(hentry->name),
916                                                                           false, TEXTOID, CurrentMemoryContext);
917                 }
918         }
919
920         if (astate)
921                 PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
922                                                                                           CurrentMemoryContext));
923         else
924                 PG_RETURN_NULL();
925 }
926
927 /*
928  * Checks if a given remote connection is busy
929  *
930  * Returns 1 if the connection is busy, 0 otherwise
931  * Params:
932  *      text connection_name - name of the connection to check
933  *
934  */
935 PG_FUNCTION_INFO_V1(dblink_is_busy);
936 Datum
937 dblink_is_busy(PG_FUNCTION_ARGS)
938 {
939         PGconn     *conn = NULL;
940         remoteConn *rconn = NULL;
941
942         DBLINK_INIT;
943         DBLINK_GET_NAMED_CONN;
944
945         PQconsumeInput(conn);
946         PG_RETURN_INT32(PQisBusy(conn));
947 }
948
949 /*
950  * Cancels a running request on a connection
951  *
952  * Returns text:
953  *      "OK" if the cancel request has been sent correctly,
954  *              an error message otherwise
955  *
956  * Params:
957  *      text connection_name - name of the connection to check
958  *
959  */
960 PG_FUNCTION_INFO_V1(dblink_cancel_query);
961 Datum
962 dblink_cancel_query(PG_FUNCTION_ARGS)
963 {
964         int                     res = 0;
965         PGconn     *conn = NULL;
966         remoteConn *rconn = NULL;
967         PGcancel   *cancel;
968         char            errbuf[256];
969
970         DBLINK_INIT;
971         DBLINK_GET_NAMED_CONN;
972         cancel = PQgetCancel(conn);
973
974         res = PQcancel(cancel, errbuf, 256);
975         PQfreeCancel(cancel);
976
977         if (res == 1)
978                 PG_RETURN_TEXT_P(cstring_to_text("OK"));
979         else
980                 PG_RETURN_TEXT_P(cstring_to_text(errbuf));
981 }
982
983
984 /*
985  * Get error message from a connection
986  *
987  * Returns text:
988  *      "OK" if no error, an error message otherwise
989  *
990  * Params:
991  *      text connection_name - name of the connection to check
992  *
993  */
994 PG_FUNCTION_INFO_V1(dblink_error_message);
995 Datum
996 dblink_error_message(PG_FUNCTION_ARGS)
997 {
998         char       *msg;
999         PGconn     *conn = NULL;
1000         remoteConn *rconn = NULL;
1001
1002         DBLINK_INIT;
1003         DBLINK_GET_NAMED_CONN;
1004
1005         msg = PQerrorMessage(conn);
1006         if (msg == NULL || msg[0] == '\0')
1007                 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1008         else
1009                 PG_RETURN_TEXT_P(cstring_to_text(msg));
1010 }
1011
1012 /*
1013  * Execute an SQL non-SELECT command
1014  */
1015 PG_FUNCTION_INFO_V1(dblink_exec);
1016 Datum
1017 dblink_exec(PG_FUNCTION_ARGS)
1018 {
1019         char       *msg;
1020         PGresult   *res = NULL;
1021         text       *sql_cmd_status = NULL;
1022         PGconn     *conn = NULL;
1023         char       *connstr = NULL;
1024         char       *sql = NULL;
1025         char       *conname = NULL;
1026         remoteConn *rconn = NULL;
1027         bool            freeconn = false;
1028         bool            fail = true;    /* default to backward compatible behavior */
1029
1030         DBLINK_INIT;
1031
1032         if (PG_NARGS() == 3)
1033         {
1034                 /* must be text,text,bool */
1035                 DBLINK_GET_CONN;
1036                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1037                 fail = PG_GETARG_BOOL(2);
1038         }
1039         else if (PG_NARGS() == 2)
1040         {
1041                 /* might be text,text or text,bool */
1042                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
1043                 {
1044                         conn = pconn->conn;
1045                         sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1046                         fail = PG_GETARG_BOOL(1);
1047                 }
1048                 else
1049                 {
1050                         DBLINK_GET_CONN;
1051                         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1052                 }
1053         }
1054         else if (PG_NARGS() == 1)
1055         {
1056                 /* must be single text argument */
1057                 conn = pconn->conn;
1058                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1059         }
1060         else
1061                 /* shouldn't happen */
1062                 elog(ERROR, "wrong number of arguments");
1063
1064         if (!conn)
1065                 DBLINK_CONN_NOT_AVAIL;
1066
1067         res = PQexec(conn, sql);
1068         if (!res ||
1069                 (PQresultStatus(res) != PGRES_COMMAND_OK &&
1070                  PQresultStatus(res) != PGRES_TUPLES_OK))
1071         {
1072                 dblink_res_error(conname, res, "could not execute command", fail);
1073
1074                 /*
1075                  * and save a copy of the command status string to return as our
1076                  * result tuple
1077                  */
1078                 sql_cmd_status = cstring_to_text("ERROR");
1079         }
1080         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1081         {
1082                 /*
1083                  * and save a copy of the command status string to return as our
1084                  * result tuple
1085                  */
1086                 sql_cmd_status = cstring_to_text(PQcmdStatus(res));
1087                 PQclear(res);
1088         }
1089         else
1090         {
1091                 PQclear(res);
1092                 ereport(ERROR,
1093                                 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
1094                                  errmsg("statement returning results not allowed")));
1095         }
1096
1097         /* if needed, close the connection to the database and cleanup */
1098         if (freeconn)
1099                 PQfinish(conn);
1100
1101         PG_RETURN_TEXT_P(sql_cmd_status);
1102 }
1103
1104
1105 /*
1106  * dblink_get_pkey
1107  *
1108  * Return list of primary key fields for the supplied relation,
1109  * or NULL if none exists.
1110  */
1111 PG_FUNCTION_INFO_V1(dblink_get_pkey);
1112 Datum
1113 dblink_get_pkey(PG_FUNCTION_ARGS)
1114 {
1115         int16           numatts;
1116         char      **results;
1117         FuncCallContext *funcctx;
1118         int32           call_cntr;
1119         int32           max_calls;
1120         AttInMetadata *attinmeta;
1121         MemoryContext oldcontext;
1122
1123         /* stuff done only on the first call of the function */
1124         if (SRF_IS_FIRSTCALL())
1125         {
1126                 Relation        rel;
1127                 TupleDesc       tupdesc;
1128
1129                 /* create a function context for cross-call persistence */
1130                 funcctx = SRF_FIRSTCALL_INIT();
1131
1132                 /*
1133                  * switch to memory context appropriate for multiple function calls
1134                  */
1135                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1136
1137                 /* open target relation */
1138                 rel = get_rel_from_relname(PG_GETARG_TEXT_P(0), AccessShareLock, ACL_SELECT);
1139
1140                 /* get the array of attnums */
1141                 results = get_pkey_attnames(rel, &numatts);
1142
1143                 relation_close(rel, AccessShareLock);
1144
1145                 /*
1146                  * need a tuple descriptor representing one INT and one TEXT column
1147                  */
1148                 tupdesc = CreateTemplateTupleDesc(2, false);
1149                 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
1150                                                    INT4OID, -1, 0);
1151                 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
1152                                                    TEXTOID, -1, 0);
1153
1154                 /*
1155                  * Generate attribute metadata needed later to produce tuples from raw
1156                  * C strings
1157                  */
1158                 attinmeta = TupleDescGetAttInMetadata(tupdesc);
1159                 funcctx->attinmeta = attinmeta;
1160
1161                 if ((results != NULL) && (numatts > 0))
1162                 {
1163                         funcctx->max_calls = numatts;
1164
1165                         /* got results, keep track of them */
1166                         funcctx->user_fctx = results;
1167                 }
1168                 else
1169                 {
1170                         /* fast track when no results */
1171                         MemoryContextSwitchTo(oldcontext);
1172                         SRF_RETURN_DONE(funcctx);
1173                 }
1174
1175                 MemoryContextSwitchTo(oldcontext);
1176         }
1177
1178         /* stuff done on every call of the function */
1179         funcctx = SRF_PERCALL_SETUP();
1180
1181         /*
1182          * initialize per-call variables
1183          */
1184         call_cntr = funcctx->call_cntr;
1185         max_calls = funcctx->max_calls;
1186
1187         results = (char **) funcctx->user_fctx;
1188         attinmeta = funcctx->attinmeta;
1189
1190         if (call_cntr < max_calls)      /* do when there is more left to send */
1191         {
1192                 char      **values;
1193                 HeapTuple       tuple;
1194                 Datum           result;
1195
1196                 values = (char **) palloc(2 * sizeof(char *));
1197                 values[0] = (char *) palloc(12);                /* sign, 10 digits, '\0' */
1198
1199                 sprintf(values[0], "%d", call_cntr + 1);
1200
1201                 values[1] = results[call_cntr];
1202
1203                 /* build the tuple */
1204                 tuple = BuildTupleFromCStrings(attinmeta, values);
1205
1206                 /* make the tuple into a datum */
1207                 result = HeapTupleGetDatum(tuple);
1208
1209                 SRF_RETURN_NEXT(funcctx, result);
1210         }
1211         else
1212         {
1213                 /* do when there is no more left */
1214                 SRF_RETURN_DONE(funcctx);
1215         }
1216 }
1217
1218
1219 /*
1220  * dblink_build_sql_insert
1221  *
1222  * Used to generate an SQL insert statement
1223  * based on an existing tuple in a local relation.
1224  * This is useful for selectively replicating data
1225  * to another server via dblink.
1226  *
1227  * API:
1228  * <relname> - name of local table of interest
1229  * <pkattnums> - an int2vector of attnums which will be used
1230  * to identify the local tuple of interest
1231  * <pknumatts> - number of attnums in pkattnums
1232  * <src_pkattvals_arry> - text array of key values which will be used
1233  * to identify the local tuple of interest
1234  * <tgt_pkattvals_arry> - text array of key values which will be used
1235  * to build the string for execution remotely. These are substituted
1236  * for their counterparts in src_pkattvals_arry
1237  */
1238 PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
1239 Datum
1240 dblink_build_sql_insert(PG_FUNCTION_ARGS)
1241 {
1242         text       *relname_text = PG_GETARG_TEXT_P(0);
1243         int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1244         int32           pknumatts_arg = PG_GETARG_INT32(2);
1245         ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1246         ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1247         Relation        rel;
1248         int                *pkattnums;
1249         int                     pknumatts;
1250         char      **src_pkattvals;
1251         char      **tgt_pkattvals;
1252         int                     src_nitems;
1253         int                     tgt_nitems;
1254         char       *sql;
1255
1256         /*
1257          * Open target relation.
1258          */
1259         rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1260
1261         /*
1262          * Process pkattnums argument.
1263          */
1264         validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1265                                            &pkattnums, &pknumatts);
1266
1267         /*
1268          * Source array is made up of key values that will be used to locate the
1269          * tuple of interest from the local system.
1270          */
1271         src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1272
1273         /*
1274          * There should be one source array key value for each key attnum
1275          */
1276         if (src_nitems != pknumatts)
1277                 ereport(ERROR,
1278                                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1279                                  errmsg("source key array length must match number of key " \
1280                                                 "attributes")));
1281
1282         /*
1283          * Target array is made up of key values that will be used to build the
1284          * SQL string for use on the remote system.
1285          */
1286         tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1287
1288         /*
1289          * There should be one target array key value for each key attnum
1290          */
1291         if (tgt_nitems != pknumatts)
1292                 ereport(ERROR,
1293                                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1294                                  errmsg("target key array length must match number of key " \
1295                                                 "attributes")));
1296
1297         /*
1298          * Prep work is finally done. Go get the SQL string.
1299          */
1300         sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1301
1302         /*
1303          * Now we can close the relation.
1304          */
1305         relation_close(rel, AccessShareLock);
1306
1307         /*
1308          * And send it
1309          */
1310         PG_RETURN_TEXT_P(cstring_to_text(sql));
1311 }
1312
1313
1314 /*
1315  * dblink_build_sql_delete
1316  *
1317  * Used to generate an SQL delete statement.
1318  * This is useful for selectively replicating a
1319  * delete to another server via dblink.
1320  *
1321  * API:
1322  * <relname> - name of remote table of interest
1323  * <pkattnums> - an int2vector of attnums which will be used
1324  * to identify the remote tuple of interest
1325  * <pknumatts> - number of attnums in pkattnums
1326  * <tgt_pkattvals_arry> - text array of key values which will be used
1327  * to build the string for execution remotely.
1328  */
1329 PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
1330 Datum
1331 dblink_build_sql_delete(PG_FUNCTION_ARGS)
1332 {
1333         text       *relname_text = PG_GETARG_TEXT_P(0);
1334         int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1335         int32           pknumatts_arg = PG_GETARG_INT32(2);
1336         ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1337         Relation        rel;
1338         int                *pkattnums;
1339         int                     pknumatts;
1340         char      **tgt_pkattvals;
1341         int                     tgt_nitems;
1342         char       *sql;
1343
1344         /*
1345          * Open target relation.
1346          */
1347         rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1348
1349         /*
1350          * Process pkattnums argument.
1351          */
1352         validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1353                                            &pkattnums, &pknumatts);
1354
1355         /*
1356          * Target array is made up of key values that will be used to build the
1357          * SQL string for use on the remote system.
1358          */
1359         tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1360
1361         /*
1362          * There should be one target array key value for each key attnum
1363          */
1364         if (tgt_nitems != pknumatts)
1365                 ereport(ERROR,
1366                                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1367                                  errmsg("target key array length must match number of key " \
1368                                                 "attributes")));
1369
1370         /*
1371          * Prep work is finally done. Go get the SQL string.
1372          */
1373         sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
1374
1375         /*
1376          * Now we can close the relation.
1377          */
1378         relation_close(rel, AccessShareLock);
1379
1380         /*
1381          * And send it
1382          */
1383         PG_RETURN_TEXT_P(cstring_to_text(sql));
1384 }
1385
1386
1387 /*
1388  * dblink_build_sql_update
1389  *
1390  * Used to generate an SQL update statement
1391  * based on an existing tuple in a local relation.
1392  * This is useful for selectively replicating data
1393  * to another server via dblink.
1394  *
1395  * API:
1396  * <relname> - name of local table of interest
1397  * <pkattnums> - an int2vector of attnums which will be used
1398  * to identify the local tuple of interest
1399  * <pknumatts> - number of attnums in pkattnums
1400  * <src_pkattvals_arry> - text array of key values which will be used
1401  * to identify the local tuple of interest
1402  * <tgt_pkattvals_arry> - text array of key values which will be used
1403  * to build the string for execution remotely. These are substituted
1404  * for their counterparts in src_pkattvals_arry
1405  */
1406 PG_FUNCTION_INFO_V1(dblink_build_sql_update);
1407 Datum
1408 dblink_build_sql_update(PG_FUNCTION_ARGS)
1409 {
1410         text       *relname_text = PG_GETARG_TEXT_P(0);
1411         int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1412         int32           pknumatts_arg = PG_GETARG_INT32(2);
1413         ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1414         ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1415         Relation        rel;
1416         int                *pkattnums;
1417         int                     pknumatts;
1418         char      **src_pkattvals;
1419         char      **tgt_pkattvals;
1420         int                     src_nitems;
1421         int                     tgt_nitems;
1422         char       *sql;
1423
1424         /*
1425          * Open target relation.
1426          */
1427         rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1428
1429         /*
1430          * Process pkattnums argument.
1431          */
1432         validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1433                                            &pkattnums, &pknumatts);
1434
1435         /*
1436          * Source array is made up of key values that will be used to locate the
1437          * tuple of interest from the local system.
1438          */
1439         src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1440
1441         /*
1442          * There should be one source array key value for each key attnum
1443          */
1444         if (src_nitems != pknumatts)
1445                 ereport(ERROR,
1446                                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1447                                  errmsg("source key array length must match number of key " \
1448                                                 "attributes")));
1449
1450         /*
1451          * Target array is made up of key values that will be used to build the
1452          * SQL string for use on the remote system.
1453          */
1454         tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1455
1456         /*
1457          * There should be one target array key value for each key attnum
1458          */
1459         if (tgt_nitems != pknumatts)
1460                 ereport(ERROR,
1461                                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1462                                  errmsg("target key array length must match number of key " \
1463                                                 "attributes")));
1464
1465         /*
1466          * Prep work is finally done. Go get the SQL string.
1467          */
1468         sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1469
1470         /*
1471          * Now we can close the relation.
1472          */
1473         relation_close(rel, AccessShareLock);
1474
1475         /*
1476          * And send it
1477          */
1478         PG_RETURN_TEXT_P(cstring_to_text(sql));
1479 }
1480
1481 /*
1482  * dblink_current_query
1483  * return the current query string
1484  * to allow its use in (among other things)
1485  * rewrite rules
1486  */
1487 PG_FUNCTION_INFO_V1(dblink_current_query);
1488 Datum
1489 dblink_current_query(PG_FUNCTION_ARGS)
1490 {
1491         /* This is now just an alias for the built-in function current_query() */
1492         PG_RETURN_DATUM(current_query(fcinfo));
1493 }
1494
1495 /*
1496  * Retrieve async notifications for a connection.
1497  *
1498  * Returns an setof record of notifications, or an empty set if none recieved.
1499  * Can optionally take a named connection as parameter, but uses the unnamed connection per default.
1500  *
1501  */
1502 #define DBLINK_NOTIFY_COLS              3
1503
1504 PG_FUNCTION_INFO_V1(dblink_get_notify);
1505 Datum
1506 dblink_get_notify(PG_FUNCTION_ARGS)
1507 {
1508         PGconn     *conn = NULL;
1509         remoteConn *rconn = NULL;
1510         PGnotify   *notify;
1511         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1512         TupleDesc       tupdesc;
1513         Tuplestorestate *tupstore;
1514         MemoryContext per_query_ctx;
1515         MemoryContext oldcontext;
1516
1517         DBLINK_INIT;
1518         if (PG_NARGS() == 1)
1519                 DBLINK_GET_NAMED_CONN;
1520         else
1521                 conn = pconn->conn;
1522
1523         /* create the tuplestore */
1524         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1525         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1526
1527         tupdesc = CreateTemplateTupleDesc(DBLINK_NOTIFY_COLS, false);
1528         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "notify_name",
1529                                            TEXTOID, -1, 0);
1530         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "be_pid",
1531                                            INT4OID, -1, 0);
1532         TupleDescInitEntry(tupdesc, (AttrNumber) 3, "extra",
1533                                            TEXTOID, -1, 0);
1534
1535         tupstore = tuplestore_begin_heap(true, false, work_mem);
1536         rsinfo->returnMode = SFRM_Materialize;
1537         rsinfo->setResult = tupstore;
1538         rsinfo->setDesc = tupdesc;
1539
1540         MemoryContextSwitchTo(oldcontext);
1541
1542         PQconsumeInput(conn);
1543         while ((notify = PQnotifies(conn)) != NULL)
1544         {
1545                 Datum           values[DBLINK_NOTIFY_COLS];
1546                 bool            nulls[DBLINK_NOTIFY_COLS];
1547
1548                 memset(values, 0, sizeof(values));
1549                 memset(nulls, 0, sizeof(nulls));
1550
1551                 if (notify->relname != NULL)
1552                         values[0] = CStringGetTextDatum(notify->relname);
1553                 else
1554                         nulls[0] = true;
1555
1556                 values[1] = Int32GetDatum(notify->be_pid);
1557
1558                 if (notify->extra != NULL)
1559                         values[2] = CStringGetTextDatum(notify->extra);
1560                 else
1561                         nulls[2] = true;
1562
1563                 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1564
1565                 PQfreemem(notify);
1566                 PQconsumeInput(conn);
1567         }
1568
1569         /* clean up and return the tuplestore */
1570         tuplestore_donestoring(tupstore);
1571
1572         return (Datum) 0;
1573 }
1574
1575 /*************************************************************
1576  * internal functions
1577  */
1578
1579
1580 /*
1581  * get_pkey_attnames
1582  *
1583  * Get the primary key attnames for the given relation.
1584  * Return NULL, and set numatts = 0, if no primary key exists.
1585  */
1586 static char **
1587 get_pkey_attnames(Relation rel, int16 *numatts)
1588 {
1589         Relation        indexRelation;
1590         ScanKeyData skey;
1591         SysScanDesc scan;
1592         HeapTuple       indexTuple;
1593         int                     i;
1594         char      **result = NULL;
1595         TupleDesc       tupdesc;
1596
1597         /* initialize numatts to 0 in case no primary key exists */
1598         *numatts = 0;
1599
1600         tupdesc = rel->rd_att;
1601
1602         /* Prepare to scan pg_index for entries having indrelid = this rel. */
1603         indexRelation = heap_open(IndexRelationId, AccessShareLock);
1604         ScanKeyInit(&skey,
1605                                 Anum_pg_index_indrelid,
1606                                 BTEqualStrategyNumber, F_OIDEQ,
1607                                 ObjectIdGetDatum(RelationGetRelid(rel)));
1608
1609         scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
1610                                                           SnapshotNow, 1, &skey);
1611
1612         while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
1613         {
1614                 Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
1615
1616                 /* we're only interested if it is the primary key */
1617                 if (index->indisprimary)
1618                 {
1619                         *numatts = index->indnatts;
1620                         if (*numatts > 0)
1621                         {
1622                                 result = (char **) palloc(*numatts * sizeof(char *));
1623
1624                                 for (i = 0; i < *numatts; i++)
1625                                         result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
1626                         }
1627                         break;
1628                 }
1629         }
1630
1631         systable_endscan(scan);
1632         heap_close(indexRelation, AccessShareLock);
1633
1634         return result;
1635 }
1636
1637 /*
1638  * Deconstruct a text[] into C-strings (note any NULL elements will be
1639  * returned as NULL pointers)
1640  */
1641 static char **
1642 get_text_array_contents(ArrayType *array, int *numitems)
1643 {
1644         int                     ndim = ARR_NDIM(array);
1645         int                *dims = ARR_DIMS(array);
1646         int                     nitems;
1647         int16           typlen;
1648         bool            typbyval;
1649         char            typalign;
1650         char      **values;
1651         char       *ptr;
1652         bits8      *bitmap;
1653         int                     bitmask;
1654         int                     i;
1655
1656         Assert(ARR_ELEMTYPE(array) == TEXTOID);
1657
1658         *numitems = nitems = ArrayGetNItems(ndim, dims);
1659
1660         get_typlenbyvalalign(ARR_ELEMTYPE(array),
1661                                                  &typlen, &typbyval, &typalign);
1662
1663         values = (char **) palloc(nitems * sizeof(char *));
1664
1665         ptr = ARR_DATA_PTR(array);
1666         bitmap = ARR_NULLBITMAP(array);
1667         bitmask = 1;
1668
1669         for (i = 0; i < nitems; i++)
1670         {
1671                 if (bitmap && (*bitmap & bitmask) == 0)
1672                 {
1673                         values[i] = NULL;
1674                 }
1675                 else
1676                 {
1677                         values[i] = TextDatumGetCString(PointerGetDatum(ptr));
1678                         ptr = att_addlength_pointer(ptr, typlen, ptr);
1679                         ptr = (char *) att_align_nominal(ptr, typalign);
1680                 }
1681
1682                 /* advance bitmap pointer if any */
1683                 if (bitmap)
1684                 {
1685                         bitmask <<= 1;
1686                         if (bitmask == 0x100)
1687                         {
1688                                 bitmap++;
1689                                 bitmask = 1;
1690                         }
1691                 }
1692         }
1693
1694         return values;
1695 }
1696
1697 static char *
1698 get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
1699 {
1700         char       *relname;
1701         HeapTuple       tuple;
1702         TupleDesc       tupdesc;
1703         int                     natts;
1704         StringInfoData buf;
1705         char       *val;
1706         int                     key;
1707         int                     i;
1708         bool            needComma;
1709
1710         initStringInfo(&buf);
1711
1712         /* get relation name including any needed schema prefix and quoting */
1713         relname = generate_relation_name(rel);
1714
1715         tupdesc = rel->rd_att;
1716         natts = tupdesc->natts;
1717
1718         tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
1719         if (!tuple)
1720                 ereport(ERROR,
1721                                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
1722                                  errmsg("source row not found")));
1723
1724         appendStringInfo(&buf, "INSERT INTO %s(", relname);
1725
1726         needComma = false;
1727         for (i = 0; i < natts; i++)
1728         {
1729                 if (tupdesc->attrs[i]->attisdropped)
1730                         continue;
1731
1732                 if (needComma)
1733                         appendStringInfo(&buf, ",");
1734
1735                 appendStringInfoString(&buf,
1736                                           quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
1737                 needComma = true;
1738         }
1739
1740         appendStringInfo(&buf, ") VALUES(");
1741
1742         /*
1743          * Note: i is physical column number (counting from 0).
1744          */
1745         needComma = false;
1746         for (i = 0; i < natts; i++)
1747         {
1748                 if (tupdesc->attrs[i]->attisdropped)
1749                         continue;
1750
1751                 if (needComma)
1752                         appendStringInfo(&buf, ",");
1753
1754                 key = get_attnum_pk_pos(pkattnums, pknumatts, i);
1755
1756                 if (key >= 0)
1757                         val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
1758                 else
1759                         val = SPI_getvalue(tuple, tupdesc, i + 1);
1760
1761                 if (val != NULL)
1762                 {
1763                         appendStringInfoString(&buf, quote_literal_cstr(val));
1764                         pfree(val);
1765                 }
1766                 else
1767                         appendStringInfo(&buf, "NULL");
1768                 needComma = true;
1769         }
1770         appendStringInfo(&buf, ")");
1771
1772         return (buf.data);
1773 }
1774
1775 static char *
1776 get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
1777 {
1778         char       *relname;
1779         TupleDesc       tupdesc;
1780         StringInfoData buf;
1781         int                     i;
1782
1783         initStringInfo(&buf);
1784
1785         /* get relation name including any needed schema prefix and quoting */
1786         relname = generate_relation_name(rel);
1787
1788         tupdesc = rel->rd_att;
1789
1790         appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
1791         for (i = 0; i < pknumatts; i++)
1792         {
1793                 int                     pkattnum = pkattnums[i];
1794
1795                 if (i > 0)
1796                         appendStringInfo(&buf, " AND ");
1797
1798                 appendStringInfoString(&buf,
1799                            quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum]->attname)));
1800
1801                 if (tgt_pkattvals[i] != NULL)
1802                         appendStringInfo(&buf, " = %s",
1803                                                          quote_literal_cstr(tgt_pkattvals[i]));
1804                 else
1805                         appendStringInfo(&buf, " IS NULL");
1806         }
1807
1808         return (buf.data);
1809 }
1810
1811 static char *
1812 get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
1813 {
1814         char       *relname;
1815         HeapTuple       tuple;
1816         TupleDesc       tupdesc;
1817         int                     natts;
1818         StringInfoData buf;
1819         char       *val;
1820         int                     key;
1821         int                     i;
1822         bool            needComma;
1823
1824         initStringInfo(&buf);
1825
1826         /* get relation name including any needed schema prefix and quoting */
1827         relname = generate_relation_name(rel);
1828
1829         tupdesc = rel->rd_att;
1830         natts = tupdesc->natts;
1831
1832         tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
1833         if (!tuple)
1834                 ereport(ERROR,
1835                                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
1836                                  errmsg("source row not found")));
1837
1838         appendStringInfo(&buf, "UPDATE %s SET ", relname);
1839
1840         /*
1841          * Note: i is physical column number (counting from 0).
1842          */
1843         needComma = false;
1844         for (i = 0; i < natts; i++)
1845         {
1846                 if (tupdesc->attrs[i]->attisdropped)
1847                         continue;
1848
1849                 if (needComma)
1850                         appendStringInfo(&buf, ", ");
1851
1852                 appendStringInfo(&buf, "%s = ",
1853                                           quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
1854
1855                 key = get_attnum_pk_pos(pkattnums, pknumatts, i);
1856
1857                 if (key >= 0)
1858                         val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
1859                 else
1860                         val = SPI_getvalue(tuple, tupdesc, i + 1);
1861
1862                 if (val != NULL)
1863                 {
1864                         appendStringInfoString(&buf, quote_literal_cstr(val));
1865                         pfree(val);
1866                 }
1867                 else
1868                         appendStringInfoString(&buf, "NULL");
1869                 needComma = true;
1870         }
1871
1872         appendStringInfo(&buf, " WHERE ");
1873
1874         for (i = 0; i < pknumatts; i++)
1875         {
1876                 int                     pkattnum = pkattnums[i];
1877
1878                 if (i > 0)
1879                         appendStringInfo(&buf, " AND ");
1880
1881                 appendStringInfo(&buf, "%s",
1882                            quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum]->attname)));
1883
1884                 val = tgt_pkattvals[i];
1885
1886                 if (val != NULL)
1887                         appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
1888                 else
1889                         appendStringInfo(&buf, " IS NULL");
1890         }
1891
1892         return (buf.data);
1893 }
1894
1895 /*
1896  * Return a properly quoted identifier.
1897  * Uses quote_ident in quote.c
1898  */
1899 static char *
1900 quote_ident_cstr(char *rawstr)
1901 {
1902         text       *rawstr_text;
1903         text       *result_text;
1904         char       *result;
1905
1906         rawstr_text = cstring_to_text(rawstr);
1907         result_text = DatumGetTextP(DirectFunctionCall1(quote_ident,
1908                                                                                           PointerGetDatum(rawstr_text)));
1909         result = text_to_cstring(result_text);
1910
1911         return result;
1912 }
1913
/*
 * get_attnum_pk_pos
 *		Find the position of "key" within the pkattnums array.
 *
 * Returns the index of the first of the pknumatts entries equal to key,
 * or -1 if none matches.  A plain linear scan is used because the list is
 * not expected to be long.
 */
static int
get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
{
	int			pos = 0;

	while (pos < pknumatts)
	{
		if (pkattnums[pos] == key)
			return pos;
		pos++;
	}

	return -1;
}
1928
1929 static HeapTuple
1930 get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
1931 {
1932         char       *relname;
1933         TupleDesc       tupdesc;
1934         int                     natts;
1935         StringInfoData buf;
1936         int                     ret;
1937         HeapTuple       tuple;
1938         int                     i;
1939
1940         /*
1941          * Connect to SPI manager
1942          */
1943         if ((ret = SPI_connect()) < 0)
1944                 /* internal error */
1945                 elog(ERROR, "SPI connect failure - returned %d", ret);
1946
1947         initStringInfo(&buf);
1948
1949         /* get relation name including any needed schema prefix and quoting */
1950         relname = generate_relation_name(rel);
1951
1952         tupdesc = rel->rd_att;
1953         natts = tupdesc->natts;
1954
1955         /*
1956          * Build sql statement to look up tuple of interest, ie, the one matching
1957          * src_pkattvals.  We used to use "SELECT *" here, but it's simpler to
1958          * generate a result tuple that matches the table's physical structure,
1959          * with NULLs for any dropped columns.  Otherwise we have to deal with two
1960          * different tupdescs and everything's very confusing.
1961          */
1962         appendStringInfoString(&buf, "SELECT ");
1963
1964         for (i = 0; i < natts; i++)
1965         {
1966                 if (i > 0)
1967                         appendStringInfoString(&buf, ", ");
1968
1969                 if (tupdesc->attrs[i]->attisdropped)
1970                         appendStringInfoString(&buf, "NULL");
1971                 else
1972                         appendStringInfoString(&buf,
1973                                           quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
1974         }
1975
1976         appendStringInfo(&buf, " FROM %s WHERE ", relname);
1977
1978         for (i = 0; i < pknumatts; i++)
1979         {
1980                 int                     pkattnum = pkattnums[i];
1981
1982                 if (i > 0)
1983                         appendStringInfo(&buf, " AND ");
1984
1985                 appendStringInfoString(&buf,
1986                            quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum]->attname)));
1987
1988                 if (src_pkattvals[i] != NULL)
1989                         appendStringInfo(&buf, " = %s",
1990                                                          quote_literal_cstr(src_pkattvals[i]));
1991                 else
1992                         appendStringInfo(&buf, " IS NULL");
1993         }
1994
1995         /*
1996          * Retrieve the desired tuple
1997          */
1998         ret = SPI_exec(buf.data, 0);
1999         pfree(buf.data);
2000
2001         /*
2002          * Only allow one qualifying tuple
2003          */
2004         if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
2005                 ereport(ERROR,
2006                                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2007                                  errmsg("source criteria matched more than one record")));
2008
2009         else if (ret == SPI_OK_SELECT && SPI_processed == 1)
2010         {
2011                 SPITupleTable *tuptable = SPI_tuptable;
2012
2013                 tuple = SPI_copytuple(tuptable->vals[0]);
2014                 SPI_finish();
2015
2016                 return tuple;
2017         }
2018         else
2019         {
2020                 /*
2021                  * no qualifying tuples
2022                  */
2023                 SPI_finish();
2024
2025                 return NULL;
2026         }
2027
2028         /*
2029          * never reached, but keep compiler quiet
2030          */
2031         return NULL;
2032 }
2033
2034 /*
2035  * Open the relation named by relname_text, acquire specified type of lock,
2036  * verify we have specified permissions.
2037  * Caller must close rel when done with it.
2038  */
2039 static Relation
2040 get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
2041 {
2042         RangeVar   *relvar;
2043         Relation        rel;
2044         AclResult       aclresult;
2045
2046         relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
2047         rel = heap_openrv(relvar, lockmode);
2048
2049         aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
2050                                                                   aclmode);
2051         if (aclresult != ACLCHECK_OK)
2052                 aclcheck_error(aclresult, ACL_KIND_CLASS,
2053                                            RelationGetRelationName(rel));
2054
2055         return rel;
2056 }
2057
2058 /*
2059  * generate_relation_name - copied from ruleutils.c
2060  *              Compute the name to display for a relation
2061  *
2062  * The result includes all necessary quoting and schema-prefixing.
2063  */
2064 static char *
2065 generate_relation_name(Relation rel)
2066 {
2067         char       *nspname;
2068         char       *result;
2069
2070         /* Qualify the name if not visible in search path */
2071         if (RelationIsVisible(RelationGetRelid(rel)))
2072                 nspname = NULL;
2073         else
2074                 nspname = get_namespace_name(rel->rd_rel->relnamespace);
2075
2076         result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
2077
2078         return result;
2079 }
2080
2081
2082 static remoteConn *
2083 getConnectionByName(const char *name)
2084 {
2085         remoteConnHashEnt *hentry;
2086         char       *key;
2087
2088         if (!remoteConnHash)
2089                 remoteConnHash = createConnHash();
2090
2091         key = pstrdup(name);
2092         truncate_identifier(key, strlen(key), false);
2093         hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2094                                                                                            key, HASH_FIND, NULL);
2095
2096         if (hentry)
2097                 return (hentry->rconn);
2098
2099         return (NULL);
2100 }
2101
2102 static HTAB *
2103 createConnHash(void)
2104 {
2105         HASHCTL         ctl;
2106
2107         ctl.keysize = NAMEDATALEN;
2108         ctl.entrysize = sizeof(remoteConnHashEnt);
2109
2110         return hash_create("Remote Con hash", NUMCONN, &ctl, HASH_ELEM);
2111 }
2112
2113 static void
2114 createNewConnection(const char *name, remoteConn *rconn)
2115 {
2116         remoteConnHashEnt *hentry;
2117         bool            found;
2118         char       *key;
2119
2120         if (!remoteConnHash)
2121                 remoteConnHash = createConnHash();
2122
2123         key = pstrdup(name);
2124         truncate_identifier(key, strlen(key), true);
2125         hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
2126                                                                                            HASH_ENTER, &found);
2127
2128         if (found)
2129         {
2130                 PQfinish(rconn->conn);
2131                 pfree(rconn);
2132
2133                 ereport(ERROR,
2134                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
2135                                  errmsg("duplicate connection name")));
2136         }
2137
2138         hentry->rconn = rconn;
2139         strlcpy(hentry->name, name, sizeof(hentry->name));
2140 }
2141
2142 static void
2143 deleteConnection(const char *name)
2144 {
2145         remoteConnHashEnt *hentry;
2146         bool            found;
2147         char       *key;
2148
2149         if (!remoteConnHash)
2150                 remoteConnHash = createConnHash();
2151
2152         key = pstrdup(name);
2153         truncate_identifier(key, strlen(key), false);
2154         hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2155                                                                                            key, HASH_REMOVE, &found);
2156
2157         if (!hentry)
2158                 ereport(ERROR,
2159                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
2160                                  errmsg("undefined connection name")));
2161
2162 }
2163
2164 static void
2165 dblink_security_check(PGconn *conn, remoteConn *rconn)
2166 {
2167         if (!superuser())
2168         {
2169                 if (!PQconnectionUsedPassword(conn))
2170                 {
2171                         PQfinish(conn);
2172                         if (rconn)
2173                                 pfree(rconn);
2174
2175                         ereport(ERROR,
2176                                   (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2177                                    errmsg("password is required"),
2178                                    errdetail("Non-superuser cannot connect if the server does not request a password."),
2179                                    errhint("Target server's authentication method must be changed.")));
2180                 }
2181         }
2182 }
2183
2184 /*
2185  * For non-superusers, insist that the connstr specify a password.      This
2186  * prevents a password from being picked up from .pgpass, a service file,
2187  * the environment, etc.  We don't want the postgres user's passwords
2188  * to be accessible to non-superusers.
2189  */
2190 static void
2191 dblink_connstr_check(const char *connstr)
2192 {
2193         if (!superuser())
2194         {
2195                 PQconninfoOption *options;
2196                 PQconninfoOption *option;
2197                 bool            connstr_gives_password = false;
2198
2199                 options = PQconninfoParse(connstr, NULL);
2200                 if (options)
2201                 {
2202                         for (option = options; option->keyword != NULL; option++)
2203                         {
2204                                 if (strcmp(option->keyword, "password") == 0)
2205                                 {
2206                                         if (option->val != NULL && option->val[0] != '\0')
2207                                         {
2208                                                 connstr_gives_password = true;
2209                                                 break;
2210                                         }
2211                                 }
2212                         }
2213                         PQconninfoFree(options);
2214                 }
2215
2216                 if (!connstr_gives_password)
2217                         ereport(ERROR,
2218                                   (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2219                                    errmsg("password is required"),
2220                                    errdetail("Non-superusers must provide a password in the connection string.")));
2221         }
2222 }
2223
2224 static void
2225 dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail)
2226 {
2227         int                     level;
2228         char       *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
2229         char       *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
2230         char       *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
2231         char       *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
2232         char       *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
2233         int                     sqlstate;
2234         char       *message_primary;
2235         char       *message_detail;
2236         char       *message_hint;
2237         char       *message_context;
2238         const char *dblink_context_conname = "unnamed";
2239
2240         if (fail)
2241                 level = ERROR;
2242         else
2243                 level = NOTICE;
2244
2245         if (pg_diag_sqlstate)
2246                 sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
2247                                                                  pg_diag_sqlstate[1],
2248                                                                  pg_diag_sqlstate[2],
2249                                                                  pg_diag_sqlstate[3],
2250                                                                  pg_diag_sqlstate[4]);
2251         else
2252                 sqlstate = ERRCODE_CONNECTION_FAILURE;
2253
2254         xpstrdup(message_primary, pg_diag_message_primary);
2255         xpstrdup(message_detail, pg_diag_message_detail);
2256         xpstrdup(message_hint, pg_diag_message_hint);
2257         xpstrdup(message_context, pg_diag_context);
2258
2259         if (res)
2260                 PQclear(res);
2261
2262         if (conname)
2263                 dblink_context_conname = conname;
2264
2265         ereport(level,
2266                         (errcode(sqlstate),
2267         message_primary ? errmsg("%s", message_primary) : errmsg("unknown error"),
2268                          message_detail ? errdetail("%s", message_detail) : 0,
2269                          message_hint ? errhint("%s", message_hint) : 0,
2270                          message_context ? errcontext("%s", message_context) : 0,
2271                   errcontext("Error occurred on dblink connection named \"%s\": %s.",
2272                                          dblink_context_conname, dblink_context_msg)));
2273 }
2274
2275 /*
2276  * Obtain connection string for a foreign server
2277  */
2278 static char *
2279 get_connect_string(const char *servername)
2280 {
2281         ForeignServer *foreign_server = NULL;
2282         UserMapping *user_mapping;
2283         ListCell   *cell;
2284         StringInfo      buf = makeStringInfo();
2285         ForeignDataWrapper *fdw;
2286         AclResult       aclresult;
2287         char       *srvname;
2288
2289         /* first gather the server connstr options */
2290         srvname = pstrdup(servername);
2291         truncate_identifier(srvname, strlen(srvname), false);
2292         foreign_server = GetForeignServerByName(srvname, true);
2293
2294         if (foreign_server)
2295         {
2296                 Oid                     serverid = foreign_server->serverid;
2297                 Oid                     fdwid = foreign_server->fdwid;
2298                 Oid                     userid = GetUserId();
2299
2300                 user_mapping = GetUserMapping(userid, serverid);
2301                 fdw = GetForeignDataWrapper(fdwid);
2302
2303                 /* Check permissions, user must have usage on the server. */
2304                 aclresult = pg_foreign_server_aclcheck(serverid, userid, ACL_USAGE);
2305                 if (aclresult != ACLCHECK_OK)
2306                         aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, foreign_server->servername);
2307
2308                 foreach(cell, fdw->options)
2309                 {
2310                         DefElem    *def = lfirst(cell);
2311
2312                         appendStringInfo(buf, "%s='%s' ", def->defname,
2313                                                          escape_param_str(strVal(def->arg)));
2314                 }
2315
2316                 foreach(cell, foreign_server->options)
2317                 {
2318                         DefElem    *def = lfirst(cell);
2319
2320                         appendStringInfo(buf, "%s='%s' ", def->defname,
2321                                                          escape_param_str(strVal(def->arg)));
2322                 }
2323
2324                 foreach(cell, user_mapping->options)
2325                 {
2326
2327                         DefElem    *def = lfirst(cell);
2328
2329                         appendStringInfo(buf, "%s='%s' ", def->defname,
2330                                                          escape_param_str(strVal(def->arg)));
2331                 }
2332
2333                 return buf->data;
2334         }
2335         else
2336                 return NULL;
2337 }
2338
2339 /*
2340  * Escaping libpq connect parameter strings.
2341  *
2342  * Replaces "'" with "\'" and "\" with "\\".
2343  */
2344 static char *
2345 escape_param_str(const char *str)
2346 {
2347         const char *cp;
2348         StringInfo      buf = makeStringInfo();
2349
2350         for (cp = str; *cp; cp++)
2351         {
2352                 if (*cp == '\\' || *cp == '\'')
2353                         appendStringInfoChar(buf, '\\');
2354                 appendStringInfoChar(buf, *cp);
2355         }
2356
2357         return buf->data;
2358 }
2359
2360 /*
2361  * Validate the PK-attnums argument for dblink_build_sql_insert() and related
2362  * functions, and translate to the internal representation.
2363  *
2364  * The user supplies an int2vector of 1-based logical attnums, plus a count
2365  * argument (the need for the separate count argument is historical, but we
2366  * still check it).  We check that each attnum corresponds to a valid,
2367  * non-dropped attribute of the rel.  We do *not* prevent attnums from being
2368  * listed twice, though the actual use-case for such things is dubious.
2369  * Note that before Postgres 9.0, the user's attnums were interpreted as
2370  * physical not logical column numbers; this was changed for future-proofing.
2371  *
2372  * The internal representation is a palloc'd int array of 0-based physical
2373  * attnums.
2374  */
2375 static void
2376 validate_pkattnums(Relation rel,
2377                                    int2vector *pkattnums_arg, int32 pknumatts_arg,
2378                                    int **pkattnums, int *pknumatts)
2379 {
2380         TupleDesc       tupdesc = rel->rd_att;
2381         int                     natts = tupdesc->natts;
2382         int                     i;
2383
2384         /* Don't take more array elements than there are */
2385         pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
2386
2387         /* Must have at least one pk attnum selected */
2388         if (pknumatts_arg <= 0)
2389                 ereport(ERROR,
2390                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2391                                  errmsg("number of key attributes must be > 0")));
2392
2393         /* Allocate output array */
2394         *pkattnums = (int *) palloc(pknumatts_arg * sizeof(int));
2395         *pknumatts = pknumatts_arg;
2396
2397         /* Validate attnums and convert to internal form */
2398         for (i = 0; i < pknumatts_arg; i++)
2399         {
2400                 int                     pkattnum = pkattnums_arg->values[i];
2401                 int                     lnum;
2402                 int                     j;
2403
2404                 /* Can throw error immediately if out of range */
2405                 if (pkattnum <= 0 || pkattnum > natts)
2406                         ereport(ERROR,
2407                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2408                                          errmsg("invalid attribute number %d", pkattnum)));
2409
2410                 /* Identify which physical column has this logical number */
2411                 lnum = 0;
2412                 for (j = 0; j < natts; j++)
2413                 {
2414                         /* dropped columns don't count */
2415                         if (tupdesc->attrs[j]->attisdropped)
2416                                 continue;
2417
2418                         if (++lnum == pkattnum)
2419                                 break;
2420                 }
2421
2422                 if (j < natts)
2423                         (*pkattnums)[i] = j;
2424                 else
2425                         ereport(ERROR,
2426                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2427                                          errmsg("invalid attribute number %d", pkattnum)));
2428         }
2429 }