4 * Functions returning results from a remote database
6 * Joe Conway <mail@joeconway.com>
8 * Darko Prenosil <Darko.Prenosil@finteh.hr>
9 * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
11 * contrib/dblink/dblink.c
12 * Copyright (c) 2001-2011, PostgreSQL Global Development Group
13 * ALL RIGHTS RESERVED;
15 * Permission to use, copy, modify, and distribute this software and its
16 * documentation for any purpose, without fee, and without a written agreement
17 * is hereby granted, provided that the above copyright notice and this
18 * paragraph and the following two paragraphs appear in all copies.
20 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24 * POSSIBILITY OF SUCH DAMAGE.
26 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
29 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
40 #include "access/genam.h"
41 #include "access/heapam.h"
42 #include "access/tupdesc.h"
43 #include "catalog/indexing.h"
44 #include "catalog/namespace.h"
45 #include "catalog/pg_index.h"
46 #include "catalog/pg_type.h"
47 #include "executor/executor.h"
48 #include "executor/spi.h"
49 #include "foreign/foreign.h"
50 #include "lib/stringinfo.h"
51 #include "mb/pg_wchar.h"
52 #include "miscadmin.h"
53 #include "nodes/execnodes.h"
54 #include "nodes/nodes.h"
55 #include "nodes/pg_list.h"
56 #include "parser/parse_type.h"
57 #include "parser/scansup.h"
58 #include "utils/acl.h"
59 #include "utils/array.h"
60 #include "utils/builtins.h"
61 #include "utils/dynahash.h"
62 #include "utils/fmgroids.h"
63 #include "utils/hsearch.h"
64 #include "utils/lsyscache.h"
65 #include "utils/memutils.h"
66 #include "utils/syscache.h"
67 #include "utils/tqual.h"
73 typedef struct remoteConn
75 PGconn *conn; /* Hold the remote connection */
76 int openCursorCount; /* The number of open cursors */
77 bool newXactForCursor; /* Opened a transaction for a cursor */
81 * Internal declarations
83 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
84 static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
85 static remoteConn *getConnectionByName(const char *name);
86 static HTAB *createConnHash(void);
87 static void createNewConnection(const char *name, remoteConn *rconn);
88 static void deleteConnection(const char *name);
89 static char **get_pkey_attnames(Relation rel, int16 *numatts);
90 static char **get_text_array_contents(ArrayType *array, int *numitems);
91 static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
92 static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
93 static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
94 static char *quote_ident_cstr(char *rawstr);
95 static int get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
96 static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
97 static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
98 static char *generate_relation_name(Relation rel);
99 static void dblink_connstr_check(const char *connstr);
100 static void dblink_security_check(PGconn *conn, remoteConn *rconn);
101 static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
102 static char *get_connect_string(const char *servername);
103 static char *escape_param_str(const char *from);
104 static void validate_pkattnums(Relation rel,
105 int2vector *pkattnums_arg, int32 pknumatts_arg,
106 int **pkattnums, int *pknumatts);
109 static remoteConn *pconn = NULL;
110 static HTAB *remoteConnHash = NULL;
113 * Following is list that holds multiple remote connections.
114 * Calling convention of each dblink function changes to accept
115 * connection name as the first parameter. The connection list is
116 * much like ecpg e.g. a mapping between a name and a PGconn object.
119 typedef struct remoteConnHashEnt
121 char name[NAMEDATALEN];
125 /* initial number of connection hashes */
128 /* general utility */
129 #define xpfree(var_) \
138 #define xpstrdup(var_c, var_) \
141 var_c = pstrdup(var_); \
146 #define DBLINK_RES_INTERNALERROR(p2) \
148 msg = pstrdup(PQerrorMessage(conn)); \
151 elog(ERROR, "%s: %s", p2, msg); \
154 #define DBLINK_CONN_NOT_AVAIL \
158 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
159 errmsg("connection \"%s\" not available", conname))); \
162 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
163 errmsg("connection not available"))); \
166 #define DBLINK_GET_CONN \
168 char *conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
169 rconn = getConnectionByName(conname_or_str); \
172 conn = rconn->conn; \
176 connstr = get_connect_string(conname_or_str); \
177 if (connstr == NULL) \
179 connstr = conname_or_str; \
181 dblink_connstr_check(connstr); \
182 conn = PQconnectdb(connstr); \
183 if (PQstatus(conn) == CONNECTION_BAD) \
185 msg = pstrdup(PQerrorMessage(conn)); \
188 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \
189 errmsg("could not establish connection"), \
190 errdetail("%s", msg))); \
192 dblink_security_check(conn, rconn); \
193 PQsetClientEncoding(conn, GetDatabaseEncodingName()); \
198 #define DBLINK_GET_NAMED_CONN \
200 char *conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
201 rconn = getConnectionByName(conname); \
203 conn = rconn->conn; \
205 DBLINK_CONN_NOT_AVAIL; \
208 #define DBLINK_INIT \
212 pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
213 pconn->conn = NULL; \
214 pconn->openCursorCount = 0; \
215 pconn->newXactForCursor = FALSE; \
220 * Create a persistent connection to another database
222 PG_FUNCTION_INFO_V1(dblink_connect);
224 dblink_connect(PG_FUNCTION_ARGS)
226 char *conname_or_str = NULL;
227 char *connstr = NULL;
228 char *connname = NULL;
231 remoteConn *rconn = NULL;
237 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
238 connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
240 else if (PG_NARGS() == 1)
241 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
244 rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
247 /* first check for valid foreign data server */
248 connstr = get_connect_string(conname_or_str);
250 connstr = conname_or_str;
252 /* check password in connection string if not superuser */
253 dblink_connstr_check(connstr);
254 conn = PQconnectdb(connstr);
256 if (PQstatus(conn) == CONNECTION_BAD)
258 msg = pstrdup(PQerrorMessage(conn));
264 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
265 errmsg("could not establish connection"),
266 errdetail("%s", msg)));
269 /* check password actually used if not superuser */
270 dblink_security_check(conn, rconn);
272 /* attempt to set client encoding to match server encoding */
273 PQsetClientEncoding(conn, GetDatabaseEncodingName());
278 createNewConnection(connname, rconn);
283 PG_RETURN_TEXT_P(cstring_to_text("OK"));
287 * Clear a persistent connection to another database
289 PG_FUNCTION_INFO_V1(dblink_disconnect);
291 dblink_disconnect(PG_FUNCTION_ARGS)
293 char *conname = NULL;
294 remoteConn *rconn = NULL;
301 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
302 rconn = getConnectionByName(conname);
310 DBLINK_CONN_NOT_AVAIL;
315 deleteConnection(conname);
321 PG_RETURN_TEXT_P(cstring_to_text("OK"));
325 * opens a cursor using a persistent connection
327 PG_FUNCTION_INFO_V1(dblink_open);
329 dblink_open(PG_FUNCTION_ARGS)
332 PGresult *res = NULL;
334 char *curname = NULL;
336 char *conname = NULL;
338 remoteConn *rconn = NULL;
339 bool fail = true; /* default to backward compatible behavior */
342 initStringInfo(&buf);
347 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
348 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
351 else if (PG_NARGS() == 3)
353 /* might be text,text,text or text,text,bool */
354 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
356 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
357 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
358 fail = PG_GETARG_BOOL(2);
363 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
364 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
365 sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
366 rconn = getConnectionByName(conname);
369 else if (PG_NARGS() == 4)
371 /* text,text,text,bool */
372 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
373 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
374 sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
375 fail = PG_GETARG_BOOL(3);
376 rconn = getConnectionByName(conname);
379 if (!rconn || !rconn->conn)
380 DBLINK_CONN_NOT_AVAIL;
384 /* If we are not in a transaction, start one */
385 if (PQtransactionStatus(conn) == PQTRANS_IDLE)
387 res = PQexec(conn, "BEGIN");
388 if (PQresultStatus(res) != PGRES_COMMAND_OK)
389 DBLINK_RES_INTERNALERROR("begin error");
391 rconn->newXactForCursor = TRUE;
394 * Since transaction state was IDLE, we force cursor count to
395 * initially be 0. This is needed as a previous ABORT might have wiped
396 * out our transaction without maintaining the cursor count for us.
398 rconn->openCursorCount = 0;
401 /* if we started a transaction, increment cursor count */
402 if (rconn->newXactForCursor)
403 (rconn->openCursorCount)++;
405 appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
406 res = PQexec(conn, buf.data);
407 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
409 dblink_res_error(conname, res, "could not open cursor", fail);
410 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
414 PG_RETURN_TEXT_P(cstring_to_text("OK"));
420 PG_FUNCTION_INFO_V1(dblink_close);
422 dblink_close(PG_FUNCTION_ARGS)
425 PGresult *res = NULL;
426 char *curname = NULL;
427 char *conname = NULL;
430 remoteConn *rconn = NULL;
431 bool fail = true; /* default to backward compatible behavior */
434 initStringInfo(&buf);
439 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
442 else if (PG_NARGS() == 2)
444 /* might be text,text or text,bool */
445 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
447 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
448 fail = PG_GETARG_BOOL(1);
453 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
454 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
455 rconn = getConnectionByName(conname);
461 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
462 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
463 fail = PG_GETARG_BOOL(2);
464 rconn = getConnectionByName(conname);
467 if (!rconn || !rconn->conn)
468 DBLINK_CONN_NOT_AVAIL;
472 appendStringInfo(&buf, "CLOSE %s", curname);
474 /* close the cursor */
475 res = PQexec(conn, buf.data);
476 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
478 dblink_res_error(conname, res, "could not close cursor", fail);
479 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
484 /* if we started a transaction, decrement cursor count */
485 if (rconn->newXactForCursor)
487 (rconn->openCursorCount)--;
489 /* if count is zero, commit the transaction */
490 if (rconn->openCursorCount == 0)
492 rconn->newXactForCursor = FALSE;
494 res = PQexec(conn, "COMMIT");
495 if (PQresultStatus(res) != PGRES_COMMAND_OK)
496 DBLINK_RES_INTERNALERROR("commit error");
501 PG_RETURN_TEXT_P(cstring_to_text("OK"));
505 * Fetch results from an open cursor
507 PG_FUNCTION_INFO_V1(dblink_fetch);
509 dblink_fetch(PG_FUNCTION_ARGS)
511 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
512 PGresult *res = NULL;
513 char *conname = NULL;
514 remoteConn *rconn = NULL;
517 char *curname = NULL;
519 bool fail = true; /* default to backward compatible */
525 /* text,text,int,bool */
526 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
527 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
528 howmany = PG_GETARG_INT32(2);
529 fail = PG_GETARG_BOOL(3);
531 rconn = getConnectionByName(conname);
535 else if (PG_NARGS() == 3)
537 /* text,text,int or text,int,bool */
538 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
540 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
541 howmany = PG_GETARG_INT32(1);
542 fail = PG_GETARG_BOOL(2);
547 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
548 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
549 howmany = PG_GETARG_INT32(2);
551 rconn = getConnectionByName(conname);
556 else if (PG_NARGS() == 2)
559 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
560 howmany = PG_GETARG_INT32(1);
565 DBLINK_CONN_NOT_AVAIL;
567 /* let the caller know we're sending back a tuplestore */
568 rsinfo->returnMode = SFRM_Materialize;
569 rsinfo->setResult = NULL;
570 rsinfo->setDesc = NULL;
572 initStringInfo(&buf);
573 appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
576 * Try to execute the query. Note that since libpq uses malloc, the
577 * PGresult will be long-lived even though we are still in a short-lived
580 res = PQexec(conn, buf.data);
582 (PQresultStatus(res) != PGRES_COMMAND_OK &&
583 PQresultStatus(res) != PGRES_TUPLES_OK))
585 dblink_res_error(conname, res, "could not fetch from cursor", fail);
588 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
590 /* cursor does not exist - closed already or bad name */
593 (errcode(ERRCODE_INVALID_CURSOR_NAME),
594 errmsg("cursor \"%s\" does not exist", curname)));
597 materializeResult(fcinfo, res);
602 * Note: this is the new preferred version of dblink
604 PG_FUNCTION_INFO_V1(dblink_record);
606 dblink_record(PG_FUNCTION_ARGS)
608 return dblink_record_internal(fcinfo, false);
611 PG_FUNCTION_INFO_V1(dblink_send_query);
613 dblink_send_query(PG_FUNCTION_ARGS)
616 char *connstr = NULL;
618 remoteConn *rconn = NULL;
620 bool freeconn = false;
626 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
629 /* shouldn't happen */
630 elog(ERROR, "wrong number of arguments");
632 /* async query send */
633 retval = PQsendQuery(conn, sql);
635 elog(NOTICE, "%s", PQerrorMessage(conn));
637 PG_RETURN_INT32(retval);
640 PG_FUNCTION_INFO_V1(dblink_get_result);
642 dblink_get_result(PG_FUNCTION_ARGS)
644 return dblink_record_internal(fcinfo, true);
648 dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
650 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
652 PGresult *res = NULL;
654 char *connstr = NULL;
656 char *conname = NULL;
657 remoteConn *rconn = NULL;
658 bool fail = true; /* default to backward compatible */
659 bool freeconn = false;
661 /* check to see if caller supports us returning a tuplestore */
662 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
664 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
665 errmsg("set-valued function called in context that cannot accept a set")));
666 if (!(rsinfo->allowedModes & SFRM_Materialize))
668 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
669 errmsg("materialize mode required, but it is not " \
670 "allowed in this context")));
680 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
681 fail = PG_GETARG_BOOL(2);
683 else if (PG_NARGS() == 2)
685 /* text,text or text,bool */
686 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
689 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
690 fail = PG_GETARG_BOOL(1);
695 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
698 else if (PG_NARGS() == 1)
702 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
705 /* shouldn't happen */
706 elog(ERROR, "wrong number of arguments");
710 /* get async result */
715 fail = PG_GETARG_BOOL(1);
717 else if (PG_NARGS() == 1)
723 /* shouldn't happen */
724 elog(ERROR, "wrong number of arguments");
728 DBLINK_CONN_NOT_AVAIL;
730 /* let the caller know we're sending back a tuplestore */
731 rsinfo->returnMode = SFRM_Materialize;
732 rsinfo->setResult = NULL;
733 rsinfo->setDesc = NULL;
735 /* synchronous query, or async result retrieval */
737 res = PQexec(conn, sql);
740 res = PQgetResult(conn);
741 /* NULL means we're all done with the async results */
746 /* if needed, close the connection to the database and cleanup */
751 (PQresultStatus(res) != PGRES_COMMAND_OK &&
752 PQresultStatus(res) != PGRES_TUPLES_OK))
754 dblink_res_error(conname, res, "could not execute query", fail);
758 materializeResult(fcinfo, res);
763 * Materialize the PGresult to return them as the function result.
764 * The res will be released in this function.
767 materializeResult(FunctionCallInfo fcinfo, PGresult *res)
769 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
771 Assert(rsinfo->returnMode == SFRM_Materialize);
776 bool is_sql_cmd = false;
780 if (PQresultStatus(res) == PGRES_COMMAND_OK)
785 * need a tuple descriptor representing one TEXT column to return
786 * the command status string as our result tuple
788 tupdesc = CreateTemplateTupleDesc(1, false);
789 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
796 Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
800 /* get a tuple descriptor for our result type */
801 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
803 case TYPEFUNC_COMPOSITE:
806 case TYPEFUNC_RECORD:
807 /* failed to determine actual type of RECORD */
809 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
810 errmsg("function returning record called in context "
811 "that cannot accept type record")));
814 /* result type isn't composite */
815 elog(ERROR, "return type must be a row type");
819 /* make sure we have a persistent copy of the tupdesc */
820 tupdesc = CreateTupleDescCopy(tupdesc);
821 ntuples = PQntuples(res);
822 nfields = PQnfields(res);
826 * check result and tuple descriptor have the same number of columns
828 if (nfields != tupdesc->natts)
830 (errcode(ERRCODE_DATATYPE_MISMATCH),
831 errmsg("remote query result rowtype does not match "
832 "the specified FROM clause rowtype")));
836 AttInMetadata *attinmeta;
837 Tuplestorestate *tupstore;
838 MemoryContext oldcontext;
842 attinmeta = TupleDescGetAttInMetadata(tupdesc);
844 oldcontext = MemoryContextSwitchTo(
845 rsinfo->econtext->ecxt_per_query_memory);
846 tupstore = tuplestore_begin_heap(true, false, work_mem);
847 rsinfo->setResult = tupstore;
848 rsinfo->setDesc = tupdesc;
849 MemoryContextSwitchTo(oldcontext);
851 values = (char **) palloc(nfields * sizeof(char *));
853 /* put all tuples into the tuplestore */
854 for (row = 0; row < ntuples; row++)
862 for (i = 0; i < nfields; i++)
864 if (PQgetisnull(res, row, i))
867 values[i] = PQgetvalue(res, row, i);
872 values[0] = PQcmdStatus(res);
875 /* build the tuple and put it into the tuplestore. */
876 tuple = BuildTupleFromCStrings(attinmeta, values);
877 tuplestore_puttuple(tupstore, tuple);
880 /* clean up and return the tuplestore */
881 tuplestore_donestoring(tupstore);
888 /* be sure to release the libpq result */
896 * List all open dblink connections by name.
897 * Returns an array of all connection names.
900 PG_FUNCTION_INFO_V1(dblink_get_connections);
902 dblink_get_connections(PG_FUNCTION_ARGS)
904 HASH_SEQ_STATUS status;
905 remoteConnHashEnt *hentry;
906 ArrayBuildState *astate = NULL;
910 hash_seq_init(&status, remoteConnHash);
911 while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
913 /* stash away current value */
914 astate = accumArrayResult(astate,
915 CStringGetTextDatum(hentry->name),
916 false, TEXTOID, CurrentMemoryContext);
921 PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
922 CurrentMemoryContext));
928 * Checks if a given remote connection is busy
930 * Returns 1 if the connection is busy, 0 otherwise
932 * text connection_name - name of the connection to check
935 PG_FUNCTION_INFO_V1(dblink_is_busy);
937 dblink_is_busy(PG_FUNCTION_ARGS)
940 remoteConn *rconn = NULL;
943 DBLINK_GET_NAMED_CONN;
945 PQconsumeInput(conn);
946 PG_RETURN_INT32(PQisBusy(conn));
950 * Cancels a running request on a connection
953 * "OK" if the cancel request has been sent correctly,
954 * an error message otherwise
957 * text connection_name - name of the connection to check
960 PG_FUNCTION_INFO_V1(dblink_cancel_query);
962 dblink_cancel_query(PG_FUNCTION_ARGS)
966 remoteConn *rconn = NULL;
971 DBLINK_GET_NAMED_CONN;
972 cancel = PQgetCancel(conn);
974 res = PQcancel(cancel, errbuf, 256);
975 PQfreeCancel(cancel);
978 PG_RETURN_TEXT_P(cstring_to_text("OK"));
980 PG_RETURN_TEXT_P(cstring_to_text(errbuf));
985 * Get error message from a connection
988 * "OK" if no error, an error message otherwise
991 * text connection_name - name of the connection to check
994 PG_FUNCTION_INFO_V1(dblink_error_message);
996 dblink_error_message(PG_FUNCTION_ARGS)
1000 remoteConn *rconn = NULL;
1003 DBLINK_GET_NAMED_CONN;
1005 msg = PQerrorMessage(conn);
1006 if (msg == NULL || msg[0] == '\0')
1007 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1009 PG_RETURN_TEXT_P(cstring_to_text(msg));
1013 * Execute an SQL non-SELECT command
1015 PG_FUNCTION_INFO_V1(dblink_exec);
1017 dblink_exec(PG_FUNCTION_ARGS)
1020 PGresult *res = NULL;
1021 text *sql_cmd_status = NULL;
1022 PGconn *conn = NULL;
1023 char *connstr = NULL;
1025 char *conname = NULL;
1026 remoteConn *rconn = NULL;
1027 bool freeconn = false;
1028 bool fail = true; /* default to backward compatible behavior */
1032 if (PG_NARGS() == 3)
1034 /* must be text,text,bool */
1036 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1037 fail = PG_GETARG_BOOL(2);
1039 else if (PG_NARGS() == 2)
1041 /* might be text,text or text,bool */
1042 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
1045 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1046 fail = PG_GETARG_BOOL(1);
1051 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1054 else if (PG_NARGS() == 1)
1056 /* must be single text argument */
1058 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1061 /* shouldn't happen */
1062 elog(ERROR, "wrong number of arguments");
1065 DBLINK_CONN_NOT_AVAIL;
1067 res = PQexec(conn, sql);
1069 (PQresultStatus(res) != PGRES_COMMAND_OK &&
1070 PQresultStatus(res) != PGRES_TUPLES_OK))
1072 dblink_res_error(conname, res, "could not execute command", fail);
1075 * and save a copy of the command status string to return as our
1078 sql_cmd_status = cstring_to_text("ERROR");
1080 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1083 * and save a copy of the command status string to return as our
1086 sql_cmd_status = cstring_to_text(PQcmdStatus(res));
1093 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
1094 errmsg("statement returning results not allowed")));
1097 /* if needed, close the connection to the database and cleanup */
1101 PG_RETURN_TEXT_P(sql_cmd_status);
1108 * Return list of primary key fields for the supplied relation,
1109 * or NULL if none exists.
1111 PG_FUNCTION_INFO_V1(dblink_get_pkey);
1113 dblink_get_pkey(PG_FUNCTION_ARGS)
1117 FuncCallContext *funcctx;
1120 AttInMetadata *attinmeta;
1121 MemoryContext oldcontext;
1123 /* stuff done only on the first call of the function */
1124 if (SRF_IS_FIRSTCALL())
1129 /* create a function context for cross-call persistence */
1130 funcctx = SRF_FIRSTCALL_INIT();
1133 * switch to memory context appropriate for multiple function calls
1135 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1137 /* open target relation */
1138 rel = get_rel_from_relname(PG_GETARG_TEXT_P(0), AccessShareLock, ACL_SELECT);
1140 /* get the array of attnums */
1141 results = get_pkey_attnames(rel, &numatts);
1143 relation_close(rel, AccessShareLock);
1146 * need a tuple descriptor representing one INT and one TEXT column
1148 tupdesc = CreateTemplateTupleDesc(2, false);
1149 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
1151 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
1155 * Generate attribute metadata needed later to produce tuples from raw
1158 attinmeta = TupleDescGetAttInMetadata(tupdesc);
1159 funcctx->attinmeta = attinmeta;
1161 if ((results != NULL) && (numatts > 0))
1163 funcctx->max_calls = numatts;
1165 /* got results, keep track of them */
1166 funcctx->user_fctx = results;
1170 /* fast track when no results */
1171 MemoryContextSwitchTo(oldcontext);
1172 SRF_RETURN_DONE(funcctx);
1175 MemoryContextSwitchTo(oldcontext);
1178 /* stuff done on every call of the function */
1179 funcctx = SRF_PERCALL_SETUP();
1182 * initialize per-call variables
1184 call_cntr = funcctx->call_cntr;
1185 max_calls = funcctx->max_calls;
1187 results = (char **) funcctx->user_fctx;
1188 attinmeta = funcctx->attinmeta;
1190 if (call_cntr < max_calls) /* do when there is more left to send */
1196 values = (char **) palloc(2 * sizeof(char *));
1197 values[0] = (char *) palloc(12); /* sign, 10 digits, '\0' */
1199 sprintf(values[0], "%d", call_cntr + 1);
1201 values[1] = results[call_cntr];
1203 /* build the tuple */
1204 tuple = BuildTupleFromCStrings(attinmeta, values);
1206 /* make the tuple into a datum */
1207 result = HeapTupleGetDatum(tuple);
1209 SRF_RETURN_NEXT(funcctx, result);
1213 /* do when there is no more left */
1214 SRF_RETURN_DONE(funcctx);
1220 * dblink_build_sql_insert
1222 * Used to generate an SQL insert statement
1223 * based on an existing tuple in a local relation.
1224 * This is useful for selectively replicating data
1225 * to another server via dblink.
1228 * <relname> - name of local table of interest
1229 * <pkattnums> - an int2vector of attnums which will be used
1230 * to identify the local tuple of interest
1231 * <pknumatts> - number of attnums in pkattnums
1232 * <src_pkattvals_arry> - text array of key values which will be used
1233 * to identify the local tuple of interest
1234 * <tgt_pkattvals_arry> - text array of key values which will be used
1235 * to build the string for execution remotely. These are substituted
1236 * for their counterparts in src_pkattvals_arry
1238 PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
1240 dblink_build_sql_insert(PG_FUNCTION_ARGS)
1242 text *relname_text = PG_GETARG_TEXT_P(0);
1243 int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1244 int32 pknumatts_arg = PG_GETARG_INT32(2);
1245 ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1246 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1250 char **src_pkattvals;
1251 char **tgt_pkattvals;
1257 * Open target relation.
1259 rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1262 * Process pkattnums argument.
1264 validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1265 &pkattnums, &pknumatts);
1268 * Source array is made up of key values that will be used to locate the
1269 * tuple of interest from the local system.
1271 src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1274 * There should be one source array key value for each key attnum
1276 if (src_nitems != pknumatts)
1278 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1279 errmsg("source key array length must match number of key " \
1283 * Target array is made up of key values that will be used to build the
1284 * SQL string for use on the remote system.
1286 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1289 * There should be one target array key value for each key attnum
1291 if (tgt_nitems != pknumatts)
1293 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1294 errmsg("target key array length must match number of key " \
1298 * Prep work is finally done. Go get the SQL string.
1300 sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1303 * Now we can close the relation.
1305 relation_close(rel, AccessShareLock);
1310 PG_RETURN_TEXT_P(cstring_to_text(sql));
1315 * dblink_build_sql_delete
1317 * Used to generate an SQL delete statement.
1318 * This is useful for selectively replicating a
1319 * delete to another server via dblink.
1322 * <relname> - name of remote table of interest
1323 * <pkattnums> - an int2vector of attnums which will be used
1324 * to identify the remote tuple of interest
1325 * <pknumatts> - number of attnums in pkattnums
1326 * <tgt_pkattvals_arry> - text array of key values which will be used
1327 * to build the string for execution remotely.
1329 PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
1331 dblink_build_sql_delete(PG_FUNCTION_ARGS)
1333 text *relname_text = PG_GETARG_TEXT_P(0);
1334 int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1335 int32 pknumatts_arg = PG_GETARG_INT32(2);
1336 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1340 char **tgt_pkattvals;
1345 * Open target relation.
1347 rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1350 * Process pkattnums argument.
1352 validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1353 &pkattnums, &pknumatts);
1356 * Target array is made up of key values that will be used to build the
1357 * SQL string for use on the remote system.
1359 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1362 * There should be one target array key value for each key attnum
1364 if (tgt_nitems != pknumatts)
1366 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1367 errmsg("target key array length must match number of key " \
1371 * Prep work is finally done. Go get the SQL string.
1373 sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
1376 * Now we can close the relation.
1378 relation_close(rel, AccessShareLock);
1383 PG_RETURN_TEXT_P(cstring_to_text(sql));
1388 * dblink_build_sql_update
1390 * Used to generate an SQL update statement
1391 * based on an existing tuple in a local relation.
1392 * This is useful for selectively replicating data
1393 * to another server via dblink.
1396 * <relname> - name of local table of interest
1397 * <pkattnums> - an int2vector of attnums which will be used
1398 * to identify the local tuple of interest
1399 * <pknumatts> - number of attnums in pkattnums
1400 * <src_pkattvals_arry> - text array of key values which will be used
1401 * to identify the local tuple of interest
1402 * <tgt_pkattvals_arry> - text array of key values which will be used
1403 * to build the string for execution remotely. These are substituted
1404 * for their counterparts in src_pkattvals_arry
1406 PG_FUNCTION_INFO_V1(dblink_build_sql_update);
1408 dblink_build_sql_update(PG_FUNCTION_ARGS)
1410 text *relname_text = PG_GETARG_TEXT_P(0);
1411 int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1412 int32 pknumatts_arg = PG_GETARG_INT32(2);
1413 ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1414 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1418 char **src_pkattvals;
1419 char **tgt_pkattvals;
1425 * Open target relation.
1427 rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1430 * Process pkattnums argument.
1432 validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1433 &pkattnums, &pknumatts);
1436 * Source array is made up of key values that will be used to locate the
1437 * tuple of interest from the local system.
1439 src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1442 * There should be one source array key value for each key attnum
1444 if (src_nitems != pknumatts)
1446 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1447 errmsg("source key array length must match number of key " \
1451 * Target array is made up of key values that will be used to build the
1452 * SQL string for use on the remote system.
1454 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1457 * There should be one target array key value for each key attnum
1459 if (tgt_nitems != pknumatts)
1461 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1462 errmsg("target key array length must match number of key " \
1466 * Prep work is finally done. Go get the SQL string.
1468 sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1471 * Now we can close the relation.
1473 relation_close(rel, AccessShareLock);
1478 PG_RETURN_TEXT_P(cstring_to_text(sql));
1482 * dblink_current_query
1483 * return the current query string
1484 * to allow its use in (among other things)
1487 PG_FUNCTION_INFO_V1(dblink_current_query);
1489 dblink_current_query(PG_FUNCTION_ARGS)
1491 /* This is now just an alias for the built-in function current_query() */
1492 PG_RETURN_DATUM(current_query(fcinfo));
1496 * Retrieve async notifications for a connection.
1498 * Returns an setof record of notifications, or an empty set if none recieved.
1499 * Can optionally take a named connection as parameter, but uses the unnamed connection per default.
1502 #define DBLINK_NOTIFY_COLS 3
1504 PG_FUNCTION_INFO_V1(dblink_get_notify);
1506 dblink_get_notify(PG_FUNCTION_ARGS)
1508 PGconn *conn = NULL;
1509 remoteConn *rconn = NULL;
1511 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1513 Tuplestorestate *tupstore;
1514 MemoryContext per_query_ctx;
1515 MemoryContext oldcontext;
1518 if (PG_NARGS() == 1)
1519 DBLINK_GET_NAMED_CONN;
1523 /* create the tuplestore */
1524 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1525 oldcontext = MemoryContextSwitchTo(per_query_ctx);
1527 tupdesc = CreateTemplateTupleDesc(DBLINK_NOTIFY_COLS, false);
1528 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "notify_name",
1530 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "be_pid",
1532 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "extra",
1535 tupstore = tuplestore_begin_heap(true, false, work_mem);
1536 rsinfo->returnMode = SFRM_Materialize;
1537 rsinfo->setResult = tupstore;
1538 rsinfo->setDesc = tupdesc;
1540 MemoryContextSwitchTo(oldcontext);
1542 PQconsumeInput(conn);
1543 while ((notify = PQnotifies(conn)) != NULL)
1545 Datum values[DBLINK_NOTIFY_COLS];
1546 bool nulls[DBLINK_NOTIFY_COLS];
1548 memset(values, 0, sizeof(values));
1549 memset(nulls, 0, sizeof(nulls));
1551 if (notify->relname != NULL)
1552 values[0] = CStringGetTextDatum(notify->relname);
1556 values[1] = Int32GetDatum(notify->be_pid);
1558 if (notify->extra != NULL)
1559 values[2] = CStringGetTextDatum(notify->extra);
1563 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1566 PQconsumeInput(conn);
1569 /* clean up and return the tuplestore */
1570 tuplestore_donestoring(tupstore);
1575 /*************************************************************
1576 * internal functions
1583 * Get the primary key attnames for the given relation.
1584 * Return NULL, and set numatts = 0, if no primary key exists.
1587 get_pkey_attnames(Relation rel, int16 *numatts)
1589 Relation indexRelation;
1592 HeapTuple indexTuple;
1594 char **result = NULL;
1597 /* initialize numatts to 0 in case no primary key exists */
1600 tupdesc = rel->rd_att;
1602 /* Prepare to scan pg_index for entries having indrelid = this rel. */
1603 indexRelation = heap_open(IndexRelationId, AccessShareLock);
1605 Anum_pg_index_indrelid,
1606 BTEqualStrategyNumber, F_OIDEQ,
1607 ObjectIdGetDatum(RelationGetRelid(rel)));
1609 scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
1610 SnapshotNow, 1, &skey);
1612 while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
1614 Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
1616 /* we're only interested if it is the primary key */
1617 if (index->indisprimary)
1619 *numatts = index->indnatts;
1622 result = (char **) palloc(*numatts * sizeof(char *));
1624 for (i = 0; i < *numatts; i++)
1625 result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
1631 systable_endscan(scan);
1632 heap_close(indexRelation, AccessShareLock);
1638 * Deconstruct a text[] into C-strings (note any NULL elements will be
1639 * returned as NULL pointers)
1642 get_text_array_contents(ArrayType *array, int *numitems)
1644 int ndim = ARR_NDIM(array);
1645 int *dims = ARR_DIMS(array);
1656 Assert(ARR_ELEMTYPE(array) == TEXTOID);
1658 *numitems = nitems = ArrayGetNItems(ndim, dims);
1660 get_typlenbyvalalign(ARR_ELEMTYPE(array),
1661 &typlen, &typbyval, &typalign);
1663 values = (char **) palloc(nitems * sizeof(char *));
1665 ptr = ARR_DATA_PTR(array);
1666 bitmap = ARR_NULLBITMAP(array);
1669 for (i = 0; i < nitems; i++)
1671 if (bitmap && (*bitmap & bitmask) == 0)
1677 values[i] = TextDatumGetCString(PointerGetDatum(ptr));
1678 ptr = att_addlength_pointer(ptr, typlen, ptr);
1679 ptr = (char *) att_align_nominal(ptr, typalign);
1682 /* advance bitmap pointer if any */
1686 if (bitmask == 0x100)
1698 get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
1710 initStringInfo(&buf);
1712 /* get relation name including any needed schema prefix and quoting */
1713 relname = generate_relation_name(rel);
1715 tupdesc = rel->rd_att;
1716 natts = tupdesc->natts;
1718 tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
1721 (errcode(ERRCODE_CARDINALITY_VIOLATION),
1722 errmsg("source row not found")));
1724 appendStringInfo(&buf, "INSERT INTO %s(", relname);
1727 for (i = 0; i < natts; i++)
1729 if (tupdesc->attrs[i]->attisdropped)
1733 appendStringInfo(&buf, ",");
1735 appendStringInfoString(&buf,
1736 quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
1740 appendStringInfo(&buf, ") VALUES(");
1743 * Note: i is physical column number (counting from 0).
1746 for (i = 0; i < natts; i++)
1748 if (tupdesc->attrs[i]->attisdropped)
1752 appendStringInfo(&buf, ",");
1754 key = get_attnum_pk_pos(pkattnums, pknumatts, i);
1757 val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
1759 val = SPI_getvalue(tuple, tupdesc, i + 1);
1763 appendStringInfoString(&buf, quote_literal_cstr(val));
1767 appendStringInfo(&buf, "NULL");
1770 appendStringInfo(&buf, ")");
1776 get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
1783 initStringInfo(&buf);
1785 /* get relation name including any needed schema prefix and quoting */
1786 relname = generate_relation_name(rel);
1788 tupdesc = rel->rd_att;
1790 appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
1791 for (i = 0; i < pknumatts; i++)
1793 int pkattnum = pkattnums[i];
1796 appendStringInfo(&buf, " AND ");
1798 appendStringInfoString(&buf,
1799 quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum]->attname)));
1801 if (tgt_pkattvals[i] != NULL)
1802 appendStringInfo(&buf, " = %s",
1803 quote_literal_cstr(tgt_pkattvals[i]));
1805 appendStringInfo(&buf, " IS NULL");
1812 get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
1824 initStringInfo(&buf);
1826 /* get relation name including any needed schema prefix and quoting */
1827 relname = generate_relation_name(rel);
1829 tupdesc = rel->rd_att;
1830 natts = tupdesc->natts;
1832 tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
1835 (errcode(ERRCODE_CARDINALITY_VIOLATION),
1836 errmsg("source row not found")));
1838 appendStringInfo(&buf, "UPDATE %s SET ", relname);
1841 * Note: i is physical column number (counting from 0).
1844 for (i = 0; i < natts; i++)
1846 if (tupdesc->attrs[i]->attisdropped)
1850 appendStringInfo(&buf, ", ");
1852 appendStringInfo(&buf, "%s = ",
1853 quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
1855 key = get_attnum_pk_pos(pkattnums, pknumatts, i);
1858 val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
1860 val = SPI_getvalue(tuple, tupdesc, i + 1);
1864 appendStringInfoString(&buf, quote_literal_cstr(val));
1868 appendStringInfoString(&buf, "NULL");
1872 appendStringInfo(&buf, " WHERE ");
1874 for (i = 0; i < pknumatts; i++)
1876 int pkattnum = pkattnums[i];
1879 appendStringInfo(&buf, " AND ");
1881 appendStringInfo(&buf, "%s",
1882 quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum]->attname)));
1884 val = tgt_pkattvals[i];
1887 appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
1889 appendStringInfo(&buf, " IS NULL");
1896 * Return a properly quoted identifier.
1897 * Uses quote_ident in quote.c
1900 quote_ident_cstr(char *rawstr)
1906 rawstr_text = cstring_to_text(rawstr);
1907 result_text = DatumGetTextP(DirectFunctionCall1(quote_ident,
1908 PointerGetDatum(rawstr_text)));
1909 result = text_to_cstring(result_text);
1915 get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
1920 * Not likely a long list anyway, so just scan for the value
1922 for (i = 0; i < pknumatts; i++)
1923 if (key == pkattnums[i])
1930 get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
1941 * Connect to SPI manager
1943 if ((ret = SPI_connect()) < 0)
1944 /* internal error */
1945 elog(ERROR, "SPI connect failure - returned %d", ret);
1947 initStringInfo(&buf);
1949 /* get relation name including any needed schema prefix and quoting */
1950 relname = generate_relation_name(rel);
1952 tupdesc = rel->rd_att;
1953 natts = tupdesc->natts;
1956 * Build sql statement to look up tuple of interest, ie, the one matching
1957 * src_pkattvals. We used to use "SELECT *" here, but it's simpler to
1958 * generate a result tuple that matches the table's physical structure,
1959 * with NULLs for any dropped columns. Otherwise we have to deal with two
1960 * different tupdescs and everything's very confusing.
1962 appendStringInfoString(&buf, "SELECT ");
1964 for (i = 0; i < natts; i++)
1967 appendStringInfoString(&buf, ", ");
1969 if (tupdesc->attrs[i]->attisdropped)
1970 appendStringInfoString(&buf, "NULL");
1972 appendStringInfoString(&buf,
1973 quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
1976 appendStringInfo(&buf, " FROM %s WHERE ", relname);
1978 for (i = 0; i < pknumatts; i++)
1980 int pkattnum = pkattnums[i];
1983 appendStringInfo(&buf, " AND ");
1985 appendStringInfoString(&buf,
1986 quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum]->attname)));
1988 if (src_pkattvals[i] != NULL)
1989 appendStringInfo(&buf, " = %s",
1990 quote_literal_cstr(src_pkattvals[i]));
1992 appendStringInfo(&buf, " IS NULL");
1996 * Retrieve the desired tuple
1998 ret = SPI_exec(buf.data, 0);
2002 * Only allow one qualifying tuple
2004 if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
2006 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2007 errmsg("source criteria matched more than one record")));
2009 else if (ret == SPI_OK_SELECT && SPI_processed == 1)
2011 SPITupleTable *tuptable = SPI_tuptable;
2013 tuple = SPI_copytuple(tuptable->vals[0]);
2021 * no qualifying tuples
2029 * never reached, but keep compiler quiet
2035 * Open the relation named by relname_text, acquire specified type of lock,
2036 * verify we have specified permissions.
2037 * Caller must close rel when done with it.
2040 get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
2044 AclResult aclresult;
2046 relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
2047 rel = heap_openrv(relvar, lockmode);
2049 aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
2051 if (aclresult != ACLCHECK_OK)
2052 aclcheck_error(aclresult, ACL_KIND_CLASS,
2053 RelationGetRelationName(rel));
2059 * generate_relation_name - copied from ruleutils.c
2060 * Compute the name to display for a relation
2062 * The result includes all necessary quoting and schema-prefixing.
2065 generate_relation_name(Relation rel)
2070 /* Qualify the name if not visible in search path */
2071 if (RelationIsVisible(RelationGetRelid(rel)))
2074 nspname = get_namespace_name(rel->rd_rel->relnamespace);
2076 result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
2083 getConnectionByName(const char *name)
2085 remoteConnHashEnt *hentry;
2088 if (!remoteConnHash)
2089 remoteConnHash = createConnHash();
2091 key = pstrdup(name);
2092 truncate_identifier(key, strlen(key), false);
2093 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2094 key, HASH_FIND, NULL);
2097 return (hentry->rconn);
2103 createConnHash(void)
2107 ctl.keysize = NAMEDATALEN;
2108 ctl.entrysize = sizeof(remoteConnHashEnt);
2110 return hash_create("Remote Con hash", NUMCONN, &ctl, HASH_ELEM);
2114 createNewConnection(const char *name, remoteConn *rconn)
2116 remoteConnHashEnt *hentry;
2120 if (!remoteConnHash)
2121 remoteConnHash = createConnHash();
2123 key = pstrdup(name);
2124 truncate_identifier(key, strlen(key), true);
2125 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
2126 HASH_ENTER, &found);
2130 PQfinish(rconn->conn);
2134 (errcode(ERRCODE_DUPLICATE_OBJECT),
2135 errmsg("duplicate connection name")));
2138 hentry->rconn = rconn;
2139 strlcpy(hentry->name, name, sizeof(hentry->name));
2143 deleteConnection(const char *name)
2145 remoteConnHashEnt *hentry;
2149 if (!remoteConnHash)
2150 remoteConnHash = createConnHash();
2152 key = pstrdup(name);
2153 truncate_identifier(key, strlen(key), false);
2154 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2155 key, HASH_REMOVE, &found);
2159 (errcode(ERRCODE_UNDEFINED_OBJECT),
2160 errmsg("undefined connection name")));
2165 dblink_security_check(PGconn *conn, remoteConn *rconn)
2169 if (!PQconnectionUsedPassword(conn))
2176 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2177 errmsg("password is required"),
2178 errdetail("Non-superuser cannot connect if the server does not request a password."),
2179 errhint("Target server's authentication method must be changed.")));
2185 * For non-superusers, insist that the connstr specify a password. This
2186 * prevents a password from being picked up from .pgpass, a service file,
2187 * the environment, etc. We don't want the postgres user's passwords
2188 * to be accessible to non-superusers.
2191 dblink_connstr_check(const char *connstr)
2195 PQconninfoOption *options;
2196 PQconninfoOption *option;
2197 bool connstr_gives_password = false;
2199 options = PQconninfoParse(connstr, NULL);
2202 for (option = options; option->keyword != NULL; option++)
2204 if (strcmp(option->keyword, "password") == 0)
2206 if (option->val != NULL && option->val[0] != '\0')
2208 connstr_gives_password = true;
2213 PQconninfoFree(options);
2216 if (!connstr_gives_password)
2218 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2219 errmsg("password is required"),
2220 errdetail("Non-superusers must provide a password in the connection string.")));
2225 dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail)
2228 char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
2229 char *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
2230 char *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
2231 char *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
2232 char *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
2234 char *message_primary;
2235 char *message_detail;
2237 char *message_context;
2238 const char *dblink_context_conname = "unnamed";
2245 if (pg_diag_sqlstate)
2246 sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
2247 pg_diag_sqlstate[1],
2248 pg_diag_sqlstate[2],
2249 pg_diag_sqlstate[3],
2250 pg_diag_sqlstate[4]);
2252 sqlstate = ERRCODE_CONNECTION_FAILURE;
2254 xpstrdup(message_primary, pg_diag_message_primary);
2255 xpstrdup(message_detail, pg_diag_message_detail);
2256 xpstrdup(message_hint, pg_diag_message_hint);
2257 xpstrdup(message_context, pg_diag_context);
2263 dblink_context_conname = conname;
2267 message_primary ? errmsg("%s", message_primary) : errmsg("unknown error"),
2268 message_detail ? errdetail("%s", message_detail) : 0,
2269 message_hint ? errhint("%s", message_hint) : 0,
2270 message_context ? errcontext("%s", message_context) : 0,
2271 errcontext("Error occurred on dblink connection named \"%s\": %s.",
2272 dblink_context_conname, dblink_context_msg)));
2276 * Obtain connection string for a foreign server
2279 get_connect_string(const char *servername)
2281 ForeignServer *foreign_server = NULL;
2282 UserMapping *user_mapping;
2284 StringInfo buf = makeStringInfo();
2285 ForeignDataWrapper *fdw;
2286 AclResult aclresult;
2289 /* first gather the server connstr options */
2290 srvname = pstrdup(servername);
2291 truncate_identifier(srvname, strlen(srvname), false);
2292 foreign_server = GetForeignServerByName(srvname, true);
2296 Oid serverid = foreign_server->serverid;
2297 Oid fdwid = foreign_server->fdwid;
2298 Oid userid = GetUserId();
2300 user_mapping = GetUserMapping(userid, serverid);
2301 fdw = GetForeignDataWrapper(fdwid);
2303 /* Check permissions, user must have usage on the server. */
2304 aclresult = pg_foreign_server_aclcheck(serverid, userid, ACL_USAGE);
2305 if (aclresult != ACLCHECK_OK)
2306 aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, foreign_server->servername);
2308 foreach(cell, fdw->options)
2310 DefElem *def = lfirst(cell);
2312 appendStringInfo(buf, "%s='%s' ", def->defname,
2313 escape_param_str(strVal(def->arg)));
2316 foreach(cell, foreign_server->options)
2318 DefElem *def = lfirst(cell);
2320 appendStringInfo(buf, "%s='%s' ", def->defname,
2321 escape_param_str(strVal(def->arg)));
2324 foreach(cell, user_mapping->options)
2327 DefElem *def = lfirst(cell);
2329 appendStringInfo(buf, "%s='%s' ", def->defname,
2330 escape_param_str(strVal(def->arg)));
2340 * Escaping libpq connect parameter strings.
2342 * Replaces "'" with "\'" and "\" with "\\".
2345 escape_param_str(const char *str)
2348 StringInfo buf = makeStringInfo();
2350 for (cp = str; *cp; cp++)
2352 if (*cp == '\\' || *cp == '\'')
2353 appendStringInfoChar(buf, '\\');
2354 appendStringInfoChar(buf, *cp);
2361 * Validate the PK-attnums argument for dblink_build_sql_insert() and related
2362 * functions, and translate to the internal representation.
2364 * The user supplies an int2vector of 1-based logical attnums, plus a count
2365 * argument (the need for the separate count argument is historical, but we
2366 * still check it). We check that each attnum corresponds to a valid,
2367 * non-dropped attribute of the rel. We do *not* prevent attnums from being
2368 * listed twice, though the actual use-case for such things is dubious.
2369 * Note that before Postgres 9.0, the user's attnums were interpreted as
2370 * physical not logical column numbers; this was changed for future-proofing.
2372 * The internal representation is a palloc'd int array of 0-based physical
2376 validate_pkattnums(Relation rel,
2377 int2vector *pkattnums_arg, int32 pknumatts_arg,
2378 int **pkattnums, int *pknumatts)
2380 TupleDesc tupdesc = rel->rd_att;
2381 int natts = tupdesc->natts;
2384 /* Don't take more array elements than there are */
2385 pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
2387 /* Must have at least one pk attnum selected */
2388 if (pknumatts_arg <= 0)
2390 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2391 errmsg("number of key attributes must be > 0")));
2393 /* Allocate output array */
2394 *pkattnums = (int *) palloc(pknumatts_arg * sizeof(int));
2395 *pknumatts = pknumatts_arg;
2397 /* Validate attnums and convert to internal form */
2398 for (i = 0; i < pknumatts_arg; i++)
2400 int pkattnum = pkattnums_arg->values[i];
2404 /* Can throw error immediately if out of range */
2405 if (pkattnum <= 0 || pkattnum > natts)
2407 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2408 errmsg("invalid attribute number %d", pkattnum)));
2410 /* Identify which physical column has this logical number */
2412 for (j = 0; j < natts; j++)
2414 /* dropped columns don't count */
2415 if (tupdesc->attrs[j]->attisdropped)
2418 if (++lnum == pkattnum)
2423 (*pkattnums)[i] = j;
2426 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2427 errmsg("invalid attribute number %d", pkattnum)));