4 * Functions returning results from a remote database
6 * Joe Conway <mail@joeconway.com>
8 * Darko Prenosil <Darko.Prenosil@finteh.hr>
9 * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
11 * contrib/dblink/dblink.c
12 * Copyright (c) 2001-2023, PostgreSQL Global Development Group
13 * ALL RIGHTS RESERVED;
15 * Permission to use, copy, modify, and distribute this software and its
16 * documentation for any purpose, without fee, and without a written agreement
17 * is hereby granted, provided that the above copyright notice and this
18 * paragraph and the following two paragraphs appear in all copies.
20 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24 * POSSIBILITY OF SUCH DAMAGE.
26 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
29 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
37 #include "access/htup_details.h"
38 #include "access/relation.h"
39 #include "access/reloptions.h"
40 #include "access/table.h"
41 #include "catalog/namespace.h"
42 #include "catalog/pg_foreign_data_wrapper.h"
43 #include "catalog/pg_foreign_server.h"
44 #include "catalog/pg_type.h"
45 #include "catalog/pg_user_mapping.h"
46 #include "executor/spi.h"
47 #include "foreign/foreign.h"
49 #include "lib/stringinfo.h"
51 #include "libpq/libpq-be.h"
52 #include "libpq/libpq-be-fe-helpers.h"
53 #include "mb/pg_wchar.h"
54 #include "miscadmin.h"
55 #include "parser/scansup.h"
56 #include "utils/acl.h"
57 #include "utils/builtins.h"
58 #include "utils/fmgroids.h"
59 #include "utils/guc.h"
60 #include "utils/lsyscache.h"
61 #include "utils/memutils.h"
62 #include "utils/rel.h"
63 #include "utils/varlena.h"
67 typedef struct remoteConn
69 PGconn
*conn
; /* Hold the remote connection */
70 int openCursorCount
; /* The number of open cursors */
71 bool newXactForCursor
; /* Opened a transaction for a cursor */
74 typedef struct storeInfo
76 FunctionCallInfo fcinfo
;
77 Tuplestorestate
*tuplestore
;
78 AttInMetadata
*attinmeta
;
79 MemoryContext tmpcontext
;
81 /* temp storage for results to avoid leaks on exception */
87 * Internal declarations
89 static Datum
dblink_record_internal(FunctionCallInfo fcinfo
, bool is_async
);
90 static void prepTuplestoreResult(FunctionCallInfo fcinfo
);
91 static void materializeResult(FunctionCallInfo fcinfo
, PGconn
*conn
,
93 static void materializeQueryResult(FunctionCallInfo fcinfo
,
98 static PGresult
*storeQueryResult(volatile storeInfo
*sinfo
, PGconn
*conn
, const char *sql
);
99 static void storeRow(volatile storeInfo
*sinfo
, PGresult
*res
, bool first
);
100 static remoteConn
*getConnectionByName(const char *name
);
101 static HTAB
*createConnHash(void);
102 static void createNewConnection(const char *name
, remoteConn
*rconn
);
103 static void deleteConnection(const char *name
);
104 static char **get_pkey_attnames(Relation rel
, int16
*indnkeyatts
);
105 static char **get_text_array_contents(ArrayType
*array
, int *numitems
);
106 static char *get_sql_insert(Relation rel
, int *pkattnums
, int pknumatts
, char **src_pkattvals
, char **tgt_pkattvals
);
107 static char *get_sql_delete(Relation rel
, int *pkattnums
, int pknumatts
, char **tgt_pkattvals
);
108 static char *get_sql_update(Relation rel
, int *pkattnums
, int pknumatts
, char **src_pkattvals
, char **tgt_pkattvals
);
109 static char *quote_ident_cstr(char *rawstr
);
110 static int get_attnum_pk_pos(int *pkattnums
, int pknumatts
, int key
);
111 static HeapTuple
get_tuple_of_interest(Relation rel
, int *pkattnums
, int pknumatts
, char **src_pkattvals
);
112 static Relation
get_rel_from_relname(text
*relname_text
, LOCKMODE lockmode
, AclMode aclmode
);
113 static char *generate_relation_name(Relation rel
);
114 static void dblink_connstr_check(const char *connstr
);
115 static bool dblink_connstr_has_pw(const char *connstr
);
116 static void dblink_security_check(PGconn
*conn
, remoteConn
*rconn
, const char *connstr
);
117 static void dblink_res_error(PGconn
*conn
, const char *conname
, PGresult
*res
,
118 bool fail
, const char *fmt
,...) pg_attribute_printf(5, 6);
119 static char *get_connect_string(const char *servername
);
120 static char *escape_param_str(const char *str
);
121 static void validate_pkattnums(Relation rel
,
122 int2vector
*pkattnums_arg
, int32 pknumatts_arg
,
123 int **pkattnums
, int *pknumatts
);
124 static bool is_valid_dblink_option(const PQconninfoOption
*options
,
125 const char *option
, Oid context
);
126 static int applyRemoteGucs(PGconn
*conn
);
127 static void restoreLocalGucs(int nestlevel
);
130 static remoteConn
*pconn
= NULL
;
131 static HTAB
*remoteConnHash
= NULL
;
133 /* custom wait event values, retrieved from shared memory */
134 static uint32 dblink_we_connect
= 0;
135 static uint32 dblink_we_get_conn
= 0;
138 * Following is list that holds multiple remote connections.
139 * Calling convention of each dblink function changes to accept
140 * connection name as the first parameter. The connection list is
141 * much like ecpg e.g. a mapping between a name and a PGconn object.
144 typedef struct remoteConnHashEnt
146 char name
[NAMEDATALEN
];
150 /* initial number of connection hashes */
154 xpstrdup(const char *in
)
162 pg_attribute_noreturn()
163 dblink_res_internalerror(PGconn
*conn
, PGresult
*res
, const char *p2
)
165 char *msg
= pchomp(PQerrorMessage(conn
));
168 elog(ERROR
, "%s: %s", p2
, msg
);
172 pg_attribute_noreturn()
173 dblink_conn_not_avail(const char *conname
)
177 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST
),
178 errmsg("connection \"%s\" not available", conname
)));
181 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST
),
182 errmsg("connection not available")));
186 dblink_get_conn(char *conname_or_str
,
187 PGconn
*volatile *conn_p
, char **conname_p
, volatile bool *freeconn_p
)
189 remoteConn
*rconn
= getConnectionByName(conname_or_str
);
197 conname
= conname_or_str
;
204 connstr
= get_connect_string(conname_or_str
);
206 connstr
= conname_or_str
;
207 dblink_connstr_check(connstr
);
209 /* first time, allocate or get the custom wait event */
210 if (dblink_we_get_conn
== 0)
211 dblink_we_get_conn
= WaitEventExtensionNew("DblinkGetConnect");
213 /* OK to make connection */
214 conn
= libpqsrv_connect(connstr
, dblink_we_get_conn
);
216 if (PQstatus(conn
) == CONNECTION_BAD
)
218 char *msg
= pchomp(PQerrorMessage(conn
));
220 libpqsrv_disconnect(conn
);
222 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION
),
223 errmsg("could not establish connection"),
224 errdetail_internal("%s", msg
)));
226 dblink_security_check(conn
, rconn
, connstr
);
227 if (PQclientEncoding(conn
) != GetDatabaseEncoding())
228 PQsetClientEncoding(conn
, GetDatabaseEncodingName());
234 *conname_p
= conname
;
235 *freeconn_p
= freeconn
;
239 dblink_get_named_conn(const char *conname
)
241 remoteConn
*rconn
= getConnectionByName(conname
);
246 dblink_conn_not_avail(conname
);
247 return NULL
; /* keep compiler quiet */
255 pconn
= (remoteConn
*) MemoryContextAlloc(TopMemoryContext
, sizeof(remoteConn
));
257 pconn
->openCursorCount
= 0;
258 pconn
->newXactForCursor
= false;
263 * Create a persistent connection to another database
265 PG_FUNCTION_INFO_V1(dblink_connect
);
267 dblink_connect(PG_FUNCTION_ARGS
)
269 char *conname_or_str
= NULL
;
270 char *connstr
= NULL
;
271 char *connname
= NULL
;
274 remoteConn
*rconn
= NULL
;
280 conname_or_str
= text_to_cstring(PG_GETARG_TEXT_PP(1));
281 connname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
283 else if (PG_NARGS() == 1)
284 conname_or_str
= text_to_cstring(PG_GETARG_TEXT_PP(0));
288 rconn
= (remoteConn
*) MemoryContextAlloc(TopMemoryContext
,
291 rconn
->openCursorCount
= 0;
292 rconn
->newXactForCursor
= false;
295 /* first check for valid foreign data server */
296 connstr
= get_connect_string(conname_or_str
);
298 connstr
= conname_or_str
;
300 /* check password in connection string if not superuser */
301 dblink_connstr_check(connstr
);
303 /* first time, allocate or get the custom wait event */
304 if (dblink_we_connect
== 0)
305 dblink_we_connect
= WaitEventExtensionNew("DblinkConnect");
307 /* OK to make connection */
308 conn
= libpqsrv_connect(connstr
, dblink_we_connect
);
310 if (PQstatus(conn
) == CONNECTION_BAD
)
312 msg
= pchomp(PQerrorMessage(conn
));
313 libpqsrv_disconnect(conn
);
318 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION
),
319 errmsg("could not establish connection"),
320 errdetail_internal("%s", msg
)));
323 /* check password actually used if not superuser */
324 dblink_security_check(conn
, rconn
, connstr
);
326 /* attempt to set client encoding to match server encoding, if needed */
327 if (PQclientEncoding(conn
) != GetDatabaseEncoding())
328 PQsetClientEncoding(conn
, GetDatabaseEncodingName());
333 createNewConnection(connname
, rconn
);
338 libpqsrv_disconnect(pconn
->conn
);
342 PG_RETURN_TEXT_P(cstring_to_text("OK"));
346 * Clear a persistent connection to another database
348 PG_FUNCTION_INFO_V1(dblink_disconnect
);
350 dblink_disconnect(PG_FUNCTION_ARGS
)
352 char *conname
= NULL
;
353 remoteConn
*rconn
= NULL
;
360 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
361 rconn
= getConnectionByName(conname
);
369 dblink_conn_not_avail(conname
);
371 libpqsrv_disconnect(conn
);
374 deleteConnection(conname
);
380 PG_RETURN_TEXT_P(cstring_to_text("OK"));
384 * opens a cursor using a persistent connection
386 PG_FUNCTION_INFO_V1(dblink_open
);
388 dblink_open(PG_FUNCTION_ARGS
)
390 PGresult
*res
= NULL
;
392 char *curname
= NULL
;
394 char *conname
= NULL
;
396 remoteConn
*rconn
= NULL
;
397 bool fail
= true; /* default to backward compatible behavior */
400 initStringInfo(&buf
);
405 curname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
406 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
409 else if (PG_NARGS() == 3)
411 /* might be text,text,text or text,text,bool */
412 if (get_fn_expr_argtype(fcinfo
->flinfo
, 2) == BOOLOID
)
414 curname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
415 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
416 fail
= PG_GETARG_BOOL(2);
421 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
422 curname
= text_to_cstring(PG_GETARG_TEXT_PP(1));
423 sql
= text_to_cstring(PG_GETARG_TEXT_PP(2));
424 rconn
= getConnectionByName(conname
);
427 else if (PG_NARGS() == 4)
429 /* text,text,text,bool */
430 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
431 curname
= text_to_cstring(PG_GETARG_TEXT_PP(1));
432 sql
= text_to_cstring(PG_GETARG_TEXT_PP(2));
433 fail
= PG_GETARG_BOOL(3);
434 rconn
= getConnectionByName(conname
);
437 if (!rconn
|| !rconn
->conn
)
438 dblink_conn_not_avail(conname
);
442 /* If we are not in a transaction, start one */
443 if (PQtransactionStatus(conn
) == PQTRANS_IDLE
)
445 res
= PQexec(conn
, "BEGIN");
446 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
447 dblink_res_internalerror(conn
, res
, "begin error");
449 rconn
->newXactForCursor
= true;
452 * Since transaction state was IDLE, we force cursor count to
453 * initially be 0. This is needed as a previous ABORT might have wiped
454 * out our transaction without maintaining the cursor count for us.
456 rconn
->openCursorCount
= 0;
459 /* if we started a transaction, increment cursor count */
460 if (rconn
->newXactForCursor
)
461 (rconn
->openCursorCount
)++;
463 appendStringInfo(&buf
, "DECLARE %s CURSOR FOR %s", curname
, sql
);
464 res
= PQexec(conn
, buf
.data
);
465 if (!res
|| PQresultStatus(res
) != PGRES_COMMAND_OK
)
467 dblink_res_error(conn
, conname
, res
, fail
,
468 "while opening cursor \"%s\"", curname
);
469 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
473 PG_RETURN_TEXT_P(cstring_to_text("OK"));
479 PG_FUNCTION_INFO_V1(dblink_close
);
481 dblink_close(PG_FUNCTION_ARGS
)
484 PGresult
*res
= NULL
;
485 char *curname
= NULL
;
486 char *conname
= NULL
;
488 remoteConn
*rconn
= NULL
;
489 bool fail
= true; /* default to backward compatible behavior */
492 initStringInfo(&buf
);
497 curname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
500 else if (PG_NARGS() == 2)
502 /* might be text,text or text,bool */
503 if (get_fn_expr_argtype(fcinfo
->flinfo
, 1) == BOOLOID
)
505 curname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
506 fail
= PG_GETARG_BOOL(1);
511 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
512 curname
= text_to_cstring(PG_GETARG_TEXT_PP(1));
513 rconn
= getConnectionByName(conname
);
519 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
520 curname
= text_to_cstring(PG_GETARG_TEXT_PP(1));
521 fail
= PG_GETARG_BOOL(2);
522 rconn
= getConnectionByName(conname
);
525 if (!rconn
|| !rconn
->conn
)
526 dblink_conn_not_avail(conname
);
530 appendStringInfo(&buf
, "CLOSE %s", curname
);
532 /* close the cursor */
533 res
= PQexec(conn
, buf
.data
);
534 if (!res
|| PQresultStatus(res
) != PGRES_COMMAND_OK
)
536 dblink_res_error(conn
, conname
, res
, fail
,
537 "while closing cursor \"%s\"", curname
);
538 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
543 /* if we started a transaction, decrement cursor count */
544 if (rconn
->newXactForCursor
)
546 (rconn
->openCursorCount
)--;
548 /* if count is zero, commit the transaction */
549 if (rconn
->openCursorCount
== 0)
551 rconn
->newXactForCursor
= false;
553 res
= PQexec(conn
, "COMMIT");
554 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
555 dblink_res_internalerror(conn
, res
, "commit error");
560 PG_RETURN_TEXT_P(cstring_to_text("OK"));
564 * Fetch results from an open cursor
566 PG_FUNCTION_INFO_V1(dblink_fetch
);
568 dblink_fetch(PG_FUNCTION_ARGS
)
570 PGresult
*res
= NULL
;
571 char *conname
= NULL
;
572 remoteConn
*rconn
= NULL
;
575 char *curname
= NULL
;
577 bool fail
= true; /* default to backward compatible */
579 prepTuplestoreResult(fcinfo
);
585 /* text,text,int,bool */
586 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
587 curname
= text_to_cstring(PG_GETARG_TEXT_PP(1));
588 howmany
= PG_GETARG_INT32(2);
589 fail
= PG_GETARG_BOOL(3);
591 rconn
= getConnectionByName(conname
);
595 else if (PG_NARGS() == 3)
597 /* text,text,int or text,int,bool */
598 if (get_fn_expr_argtype(fcinfo
->flinfo
, 2) == BOOLOID
)
600 curname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
601 howmany
= PG_GETARG_INT32(1);
602 fail
= PG_GETARG_BOOL(2);
607 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
608 curname
= text_to_cstring(PG_GETARG_TEXT_PP(1));
609 howmany
= PG_GETARG_INT32(2);
611 rconn
= getConnectionByName(conname
);
616 else if (PG_NARGS() == 2)
619 curname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
620 howmany
= PG_GETARG_INT32(1);
625 dblink_conn_not_avail(conname
);
627 initStringInfo(&buf
);
628 appendStringInfo(&buf
, "FETCH %d FROM %s", howmany
, curname
);
631 * Try to execute the query. Note that since libpq uses malloc, the
632 * PGresult will be long-lived even though we are still in a short-lived
635 res
= PQexec(conn
, buf
.data
);
637 (PQresultStatus(res
) != PGRES_COMMAND_OK
&&
638 PQresultStatus(res
) != PGRES_TUPLES_OK
))
640 dblink_res_error(conn
, conname
, res
, fail
,
641 "while fetching from cursor \"%s\"", curname
);
644 else if (PQresultStatus(res
) == PGRES_COMMAND_OK
)
646 /* cursor does not exist - closed already or bad name */
649 (errcode(ERRCODE_INVALID_CURSOR_NAME
),
650 errmsg("cursor \"%s\" does not exist", curname
)));
653 materializeResult(fcinfo
, conn
, res
);
658 * Note: this is the new preferred version of dblink
660 PG_FUNCTION_INFO_V1(dblink_record
);
662 dblink_record(PG_FUNCTION_ARGS
)
664 return dblink_record_internal(fcinfo
, false);
667 PG_FUNCTION_INFO_V1(dblink_send_query
);
669 dblink_send_query(PG_FUNCTION_ARGS
)
677 conn
= dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
678 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
681 /* shouldn't happen */
682 elog(ERROR
, "wrong number of arguments");
684 /* async query send */
685 retval
= PQsendQuery(conn
, sql
);
687 elog(NOTICE
, "could not send query: %s", pchomp(PQerrorMessage(conn
)));
689 PG_RETURN_INT32(retval
);
692 PG_FUNCTION_INFO_V1(dblink_get_result
);
694 dblink_get_result(PG_FUNCTION_ARGS
)
696 return dblink_record_internal(fcinfo
, true);
700 dblink_record_internal(FunctionCallInfo fcinfo
, bool is_async
)
702 PGconn
*volatile conn
= NULL
;
703 volatile bool freeconn
= false;
705 prepTuplestoreResult(fcinfo
);
712 char *conname
= NULL
;
713 bool fail
= true; /* default to backward compatible */
720 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
721 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
722 fail
= PG_GETARG_BOOL(2);
723 dblink_get_conn(conname
, &conn
, &conname
, &freeconn
);
725 else if (PG_NARGS() == 2)
727 /* text,text or text,bool */
728 if (get_fn_expr_argtype(fcinfo
->flinfo
, 1) == BOOLOID
)
730 sql
= text_to_cstring(PG_GETARG_TEXT_PP(0));
731 fail
= PG_GETARG_BOOL(1);
736 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
737 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
738 dblink_get_conn(conname
, &conn
, &conname
, &freeconn
);
741 else if (PG_NARGS() == 1)
745 sql
= text_to_cstring(PG_GETARG_TEXT_PP(0));
748 /* shouldn't happen */
749 elog(ERROR
, "wrong number of arguments");
753 /* get async result */
754 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
759 fail
= PG_GETARG_BOOL(1);
760 conn
= dblink_get_named_conn(conname
);
762 else if (PG_NARGS() == 1)
765 conn
= dblink_get_named_conn(conname
);
768 /* shouldn't happen */
769 elog(ERROR
, "wrong number of arguments");
773 dblink_conn_not_avail(conname
);
777 /* synchronous query, use efficient tuple collection method */
778 materializeQueryResult(fcinfo
, conn
, conname
, sql
, fail
);
782 /* async result retrieval, do it the old way */
783 PGresult
*res
= PQgetResult(conn
);
785 /* NULL means we're all done with the async results */
788 if (PQresultStatus(res
) != PGRES_COMMAND_OK
&&
789 PQresultStatus(res
) != PGRES_TUPLES_OK
)
791 dblink_res_error(conn
, conname
, res
, fail
,
792 "while executing query");
793 /* if fail isn't set, we'll return an empty query result */
797 materializeResult(fcinfo
, conn
, res
);
804 /* if needed, close the connection to the database */
806 libpqsrv_disconnect(conn
);
814 * Verify function caller can handle a tuplestore result, and set up for that.
816 * Note: if the caller returns without actually creating a tuplestore, the
817 * executor will treat the function result as an empty set.
820 prepTuplestoreResult(FunctionCallInfo fcinfo
)
822 ReturnSetInfo
*rsinfo
= (ReturnSetInfo
*) fcinfo
->resultinfo
;
824 /* check to see if query supports us returning a tuplestore */
825 if (rsinfo
== NULL
|| !IsA(rsinfo
, ReturnSetInfo
))
827 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
828 errmsg("set-valued function called in context that cannot accept a set")));
829 if (!(rsinfo
->allowedModes
& SFRM_Materialize
))
831 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
832 errmsg("materialize mode required, but it is not allowed in this context")));
834 /* let the executor know we're sending back a tuplestore */
835 rsinfo
->returnMode
= SFRM_Materialize
;
837 /* caller must fill these to return a non-empty result */
838 rsinfo
->setResult
= NULL
;
839 rsinfo
->setDesc
= NULL
;
843 * Copy the contents of the PGresult into a tuplestore to be returned
844 * as the result of the current function.
845 * The PGresult will be released in this function.
848 materializeResult(FunctionCallInfo fcinfo
, PGconn
*conn
, PGresult
*res
)
850 ReturnSetInfo
*rsinfo
= (ReturnSetInfo
*) fcinfo
->resultinfo
;
852 /* prepTuplestoreResult must have been called previously */
853 Assert(rsinfo
->returnMode
== SFRM_Materialize
);
862 if (PQresultStatus(res
) == PGRES_COMMAND_OK
)
867 * need a tuple descriptor representing one TEXT column to return
868 * the command status string as our result tuple
870 tupdesc
= CreateTemplateTupleDesc(1);
871 TupleDescInitEntry(tupdesc
, (AttrNumber
) 1, "status",
878 Assert(PQresultStatus(res
) == PGRES_TUPLES_OK
);
882 /* get a tuple descriptor for our result type */
883 switch (get_call_result_type(fcinfo
, NULL
, &tupdesc
))
885 case TYPEFUNC_COMPOSITE
:
888 case TYPEFUNC_RECORD
:
889 /* failed to determine actual type of RECORD */
891 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
892 errmsg("function returning record called in context "
893 "that cannot accept type record")));
896 /* result type isn't composite */
897 elog(ERROR
, "return type must be a row type");
901 /* make sure we have a persistent copy of the tupdesc */
902 tupdesc
= CreateTupleDescCopy(tupdesc
);
903 ntuples
= PQntuples(res
);
904 nfields
= PQnfields(res
);
908 * check result and tuple descriptor have the same number of columns
910 if (nfields
!= tupdesc
->natts
)
912 (errcode(ERRCODE_DATATYPE_MISMATCH
),
913 errmsg("remote query result rowtype does not match "
914 "the specified FROM clause rowtype")));
918 AttInMetadata
*attinmeta
;
920 Tuplestorestate
*tupstore
;
921 MemoryContext oldcontext
;
925 attinmeta
= TupleDescGetAttInMetadata(tupdesc
);
927 /* Set GUCs to ensure we read GUC-sensitive data types correctly */
929 nestlevel
= applyRemoteGucs(conn
);
931 oldcontext
= MemoryContextSwitchTo(rsinfo
->econtext
->ecxt_per_query_memory
);
932 tupstore
= tuplestore_begin_heap(true, false, work_mem
);
933 rsinfo
->setResult
= tupstore
;
934 rsinfo
->setDesc
= tupdesc
;
935 MemoryContextSwitchTo(oldcontext
);
937 values
= palloc_array(char *, nfields
);
939 /* put all tuples into the tuplestore */
940 for (row
= 0; row
< ntuples
; row
++)
948 for (i
= 0; i
< nfields
; i
++)
950 if (PQgetisnull(res
, row
, i
))
953 values
[i
] = PQgetvalue(res
, row
, i
);
958 values
[0] = PQcmdStatus(res
);
961 /* build the tuple and put it into the tuplestore. */
962 tuple
= BuildTupleFromCStrings(attinmeta
, values
);
963 tuplestore_puttuple(tupstore
, tuple
);
966 /* clean up GUC settings, if we changed any */
967 restoreLocalGucs(nestlevel
);
972 /* be sure to release the libpq result */
979 * Execute the given SQL command and store its results into a tuplestore
980 * to be returned as the result of the current function.
982 * This is equivalent to PQexec followed by materializeResult, but we make
983 * use of libpq's single-row mode to avoid accumulating the whole result
984 * inside libpq before it gets transferred to the tuplestore.
987 materializeQueryResult(FunctionCallInfo fcinfo
,
993 ReturnSetInfo
*rsinfo
= (ReturnSetInfo
*) fcinfo
->resultinfo
;
994 PGresult
*volatile res
= NULL
;
995 volatile storeInfo sinfo
= {0};
997 /* prepTuplestoreResult must have been called previously */
998 Assert(rsinfo
->returnMode
== SFRM_Materialize
);
1000 sinfo
.fcinfo
= fcinfo
;
1004 /* Create short-lived memory context for data conversions */
1005 sinfo
.tmpcontext
= AllocSetContextCreate(CurrentMemoryContext
,
1006 "dblink temporary context",
1007 ALLOCSET_DEFAULT_SIZES
);
1009 /* execute query, collecting any tuples into the tuplestore */
1010 res
= storeQueryResult(&sinfo
, conn
, sql
);
1013 (PQresultStatus(res
) != PGRES_COMMAND_OK
&&
1014 PQresultStatus(res
) != PGRES_TUPLES_OK
))
1017 * dblink_res_error will clear the passed PGresult, so we need
1018 * this ugly dance to avoid doing so twice during error exit
1020 PGresult
*res1
= res
;
1023 dblink_res_error(conn
, conname
, res1
, fail
,
1024 "while executing query");
1025 /* if fail isn't set, we'll return an empty query result */
1027 else if (PQresultStatus(res
) == PGRES_COMMAND_OK
)
1030 * storeRow didn't get called, so we need to convert the command
1031 * status string to a tuple manually
1034 AttInMetadata
*attinmeta
;
1035 Tuplestorestate
*tupstore
;
1038 MemoryContext oldcontext
;
1041 * need a tuple descriptor representing one TEXT column to return
1042 * the command status string as our result tuple
1044 tupdesc
= CreateTemplateTupleDesc(1);
1045 TupleDescInitEntry(tupdesc
, (AttrNumber
) 1, "status",
1047 attinmeta
= TupleDescGetAttInMetadata(tupdesc
);
1049 oldcontext
= MemoryContextSwitchTo(rsinfo
->econtext
->ecxt_per_query_memory
);
1050 tupstore
= tuplestore_begin_heap(true, false, work_mem
);
1051 rsinfo
->setResult
= tupstore
;
1052 rsinfo
->setDesc
= tupdesc
;
1053 MemoryContextSwitchTo(oldcontext
);
1055 values
[0] = PQcmdStatus(res
);
1057 /* build the tuple and put it into the tuplestore. */
1058 tuple
= BuildTupleFromCStrings(attinmeta
, values
);
1059 tuplestore_puttuple(tupstore
, tuple
);
1066 Assert(PQresultStatus(res
) == PGRES_TUPLES_OK
);
1067 /* storeRow should have created a tuplestore */
1068 Assert(rsinfo
->setResult
!= NULL
);
1074 /* clean up data conversion short-lived memory context */
1075 if (sinfo
.tmpcontext
!= NULL
)
1076 MemoryContextDelete(sinfo
.tmpcontext
);
1077 sinfo
.tmpcontext
= NULL
;
1079 PQclear(sinfo
.last_res
);
1080 sinfo
.last_res
= NULL
;
1081 PQclear(sinfo
.cur_res
);
1082 sinfo
.cur_res
= NULL
;
1086 /* be sure to release any libpq result we collected */
1088 PQclear(sinfo
.last_res
);
1089 PQclear(sinfo
.cur_res
);
1090 /* and clear out any pending data in libpq */
1091 while ((res
= PQgetResult(conn
)) != NULL
)
1099 * Execute query, and send any result rows to sinfo->tuplestore.
1102 storeQueryResult(volatile storeInfo
*sinfo
, PGconn
*conn
, const char *sql
)
1108 if (!PQsendQuery(conn
, sql
))
1109 elog(ERROR
, "could not send query: %s", pchomp(PQerrorMessage(conn
)));
1111 if (!PQsetSingleRowMode(conn
)) /* shouldn't fail */
1112 elog(ERROR
, "failed to set single-row mode for dblink query");
1116 CHECK_FOR_INTERRUPTS();
1118 sinfo
->cur_res
= PQgetResult(conn
);
1119 if (!sinfo
->cur_res
)
1122 if (PQresultStatus(sinfo
->cur_res
) == PGRES_SINGLE_TUPLE
)
1124 /* got one row from possibly-bigger resultset */
1127 * Set GUCs to ensure we read GUC-sensitive data types correctly.
1128 * We shouldn't do this until we have a row in hand, to ensure
1129 * libpq has seen any earlier ParameterStatus protocol messages.
1131 if (first
&& nestlevel
< 0)
1132 nestlevel
= applyRemoteGucs(conn
);
1134 storeRow(sinfo
, sinfo
->cur_res
, first
);
1136 PQclear(sinfo
->cur_res
);
1137 sinfo
->cur_res
= NULL
;
1142 /* if empty resultset, fill tuplestore header */
1143 if (first
&& PQresultStatus(sinfo
->cur_res
) == PGRES_TUPLES_OK
)
1144 storeRow(sinfo
, sinfo
->cur_res
, first
);
1146 /* store completed result at last_res */
1147 PQclear(sinfo
->last_res
);
1148 sinfo
->last_res
= sinfo
->cur_res
;
1149 sinfo
->cur_res
= NULL
;
1154 /* clean up GUC settings, if we changed any */
1155 restoreLocalGucs(nestlevel
);
1157 /* return last_res */
1158 res
= sinfo
->last_res
;
1159 sinfo
->last_res
= NULL
;
1164 * Send single row to sinfo->tuplestore.
1166 * If "first" is true, create the tuplestore using PGresult's metadata
1167 * (in this case the PGresult might contain either zero or one row).
1170 storeRow(volatile storeInfo
*sinfo
, PGresult
*res
, bool first
)
1172 int nfields
= PQnfields(res
);
1175 MemoryContext oldcontext
;
1179 /* Prepare for new result set */
1180 ReturnSetInfo
*rsinfo
= (ReturnSetInfo
*) sinfo
->fcinfo
->resultinfo
;
1184 * It's possible to get more than one result set if the query string
1185 * contained multiple SQL commands. In that case, we follow PQexec's
1186 * traditional behavior of throwing away all but the last result.
1188 if (sinfo
->tuplestore
)
1189 tuplestore_end(sinfo
->tuplestore
);
1190 sinfo
->tuplestore
= NULL
;
1192 /* get a tuple descriptor for our result type */
1193 switch (get_call_result_type(sinfo
->fcinfo
, NULL
, &tupdesc
))
1195 case TYPEFUNC_COMPOSITE
:
1198 case TYPEFUNC_RECORD
:
1199 /* failed to determine actual type of RECORD */
1201 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
1202 errmsg("function returning record called in context "
1203 "that cannot accept type record")));
1206 /* result type isn't composite */
1207 elog(ERROR
, "return type must be a row type");
1211 /* make sure we have a persistent copy of the tupdesc */
1212 tupdesc
= CreateTupleDescCopy(tupdesc
);
1214 /* check result and tuple descriptor have the same number of columns */
1215 if (nfields
!= tupdesc
->natts
)
1217 (errcode(ERRCODE_DATATYPE_MISMATCH
),
1218 errmsg("remote query result rowtype does not match "
1219 "the specified FROM clause rowtype")));
1221 /* Prepare attinmeta for later data conversions */
1222 sinfo
->attinmeta
= TupleDescGetAttInMetadata(tupdesc
);
1224 /* Create a new, empty tuplestore */
1225 oldcontext
= MemoryContextSwitchTo(rsinfo
->econtext
->ecxt_per_query_memory
);
1226 sinfo
->tuplestore
= tuplestore_begin_heap(true, false, work_mem
);
1227 rsinfo
->setResult
= sinfo
->tuplestore
;
1228 rsinfo
->setDesc
= tupdesc
;
1229 MemoryContextSwitchTo(oldcontext
);
1231 /* Done if empty resultset */
1232 if (PQntuples(res
) == 0)
1236 * Set up sufficiently-wide string pointers array; this won't change
1237 * in size so it's easy to preallocate.
1240 pfree(sinfo
->cstrs
);
1241 sinfo
->cstrs
= palloc_array(char *, nfields
);
1244 /* Should have a single-row result if we get here */
1245 Assert(PQntuples(res
) == 1);
1248 * Do the following work in a temp context that we reset after each tuple.
1249 * This cleans up not only the data we have direct access to, but any
1250 * cruft the I/O functions might leak.
1252 oldcontext
= MemoryContextSwitchTo(sinfo
->tmpcontext
);
1255 * Fill cstrs with null-terminated strings of column values.
1257 for (i
= 0; i
< nfields
; i
++)
1259 if (PQgetisnull(res
, 0, i
))
1260 sinfo
->cstrs
[i
] = NULL
;
1262 sinfo
->cstrs
[i
] = PQgetvalue(res
, 0, i
);
1265 /* Convert row to a tuple, and add it to the tuplestore */
1266 tuple
= BuildTupleFromCStrings(sinfo
->attinmeta
, sinfo
->cstrs
);
1268 tuplestore_puttuple(sinfo
->tuplestore
, tuple
);
1271 MemoryContextSwitchTo(oldcontext
);
1272 MemoryContextReset(sinfo
->tmpcontext
);
1276 * List all open dblink connections by name.
1277 * Returns an array of all connection names.
1280 PG_FUNCTION_INFO_V1(dblink_get_connections
);
1282 dblink_get_connections(PG_FUNCTION_ARGS
)
1284 HASH_SEQ_STATUS status
;
1285 remoteConnHashEnt
*hentry
;
1286 ArrayBuildState
*astate
= NULL
;
1290 hash_seq_init(&status
, remoteConnHash
);
1291 while ((hentry
= (remoteConnHashEnt
*) hash_seq_search(&status
)) != NULL
)
1293 /* stash away current value */
1294 astate
= accumArrayResult(astate
,
1295 CStringGetTextDatum(hentry
->name
),
1296 false, TEXTOID
, CurrentMemoryContext
);
1301 PG_RETURN_DATUM(makeArrayResult(astate
,
1302 CurrentMemoryContext
));
1308 * Checks if a given remote connection is busy
1310 * Returns 1 if the connection is busy, 0 otherwise
1312 * text connection_name - name of the connection to check
1315 PG_FUNCTION_INFO_V1(dblink_is_busy
);
1317 dblink_is_busy(PG_FUNCTION_ARGS
)
1322 conn
= dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1324 PQconsumeInput(conn
);
1325 PG_RETURN_INT32(PQisBusy(conn
));
1329 * Cancels a running request on a connection
1332 * "OK" if the cancel request has been sent correctly,
1333 * an error message otherwise
1336 * text connection_name - name of the connection to check
1339 PG_FUNCTION_INFO_V1(dblink_cancel_query
);
1341 dblink_cancel_query(PG_FUNCTION_ARGS
)
1349 conn
= dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1350 cancel
= PQgetCancel(conn
);
1352 res
= PQcancel(cancel
, errbuf
, 256);
1353 PQfreeCancel(cancel
);
1356 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1358 PG_RETURN_TEXT_P(cstring_to_text(errbuf
));
1363 * Get error message from a connection
1366 * "OK" if no error, an error message otherwise
1369 * text connection_name - name of the connection to check
1372 PG_FUNCTION_INFO_V1(dblink_error_message
);
1374 dblink_error_message(PG_FUNCTION_ARGS
)
1380 conn
= dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1382 msg
= PQerrorMessage(conn
);
1383 if (msg
== NULL
|| msg
[0] == '\0')
1384 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1386 PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg
)));
1390 * Execute an SQL non-SELECT command
1392 PG_FUNCTION_INFO_V1(dblink_exec
);
1394 dblink_exec(PG_FUNCTION_ARGS
)
1396 text
*volatile sql_cmd_status
= NULL
;
1397 PGconn
*volatile conn
= NULL
;
1398 volatile bool freeconn
= false;
1404 PGresult
*res
= NULL
;
1406 char *conname
= NULL
;
1407 bool fail
= true; /* default to backward compatible behavior */
1409 if (PG_NARGS() == 3)
1411 /* must be text,text,bool */
1412 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
1413 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
1414 fail
= PG_GETARG_BOOL(2);
1415 dblink_get_conn(conname
, &conn
, &conname
, &freeconn
);
1417 else if (PG_NARGS() == 2)
1419 /* might be text,text or text,bool */
1420 if (get_fn_expr_argtype(fcinfo
->flinfo
, 1) == BOOLOID
)
1422 sql
= text_to_cstring(PG_GETARG_TEXT_PP(0));
1423 fail
= PG_GETARG_BOOL(1);
1428 conname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
1429 sql
= text_to_cstring(PG_GETARG_TEXT_PP(1));
1430 dblink_get_conn(conname
, &conn
, &conname
, &freeconn
);
1433 else if (PG_NARGS() == 1)
1435 /* must be single text argument */
1437 sql
= text_to_cstring(PG_GETARG_TEXT_PP(0));
1440 /* shouldn't happen */
1441 elog(ERROR
, "wrong number of arguments");
1444 dblink_conn_not_avail(conname
);
1446 res
= PQexec(conn
, sql
);
1448 (PQresultStatus(res
) != PGRES_COMMAND_OK
&&
1449 PQresultStatus(res
) != PGRES_TUPLES_OK
))
1451 dblink_res_error(conn
, conname
, res
, fail
,
1452 "while executing command");
1455 * and save a copy of the command status string to return as our
1458 sql_cmd_status
= cstring_to_text("ERROR");
1460 else if (PQresultStatus(res
) == PGRES_COMMAND_OK
)
1463 * and save a copy of the command status string to return as our
1466 sql_cmd_status
= cstring_to_text(PQcmdStatus(res
));
1473 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
),
1474 errmsg("statement returning results not allowed")));
1479 /* if needed, close the connection to the database */
1481 libpqsrv_disconnect(conn
);
1485 PG_RETURN_TEXT_P(sql_cmd_status
);
1492 * Return list of primary key fields for the supplied relation,
1493 * or NULL if none exists.
1495 PG_FUNCTION_INFO_V1(dblink_get_pkey
);
1497 dblink_get_pkey(PG_FUNCTION_ARGS
)
1501 FuncCallContext
*funcctx
;
1504 AttInMetadata
*attinmeta
;
1505 MemoryContext oldcontext
;
1507 /* stuff done only on the first call of the function */
1508 if (SRF_IS_FIRSTCALL())
1513 /* create a function context for cross-call persistence */
1514 funcctx
= SRF_FIRSTCALL_INIT();
1517 * switch to memory context appropriate for multiple function calls
1519 oldcontext
= MemoryContextSwitchTo(funcctx
->multi_call_memory_ctx
);
1521 /* open target relation */
1522 rel
= get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock
, ACL_SELECT
);
1524 /* get the array of attnums */
1525 results
= get_pkey_attnames(rel
, &indnkeyatts
);
1527 relation_close(rel
, AccessShareLock
);
1530 * need a tuple descriptor representing one INT and one TEXT column
1532 tupdesc
= CreateTemplateTupleDesc(2);
1533 TupleDescInitEntry(tupdesc
, (AttrNumber
) 1, "position",
1535 TupleDescInitEntry(tupdesc
, (AttrNumber
) 2, "colname",
1539 * Generate attribute metadata needed later to produce tuples from raw
1542 attinmeta
= TupleDescGetAttInMetadata(tupdesc
);
1543 funcctx
->attinmeta
= attinmeta
;
1545 if ((results
!= NULL
) && (indnkeyatts
> 0))
1547 funcctx
->max_calls
= indnkeyatts
;
1549 /* got results, keep track of them */
1550 funcctx
->user_fctx
= results
;
1554 /* fast track when no results */
1555 MemoryContextSwitchTo(oldcontext
);
1556 SRF_RETURN_DONE(funcctx
);
1559 MemoryContextSwitchTo(oldcontext
);
1562 /* stuff done on every call of the function */
1563 funcctx
= SRF_PERCALL_SETUP();
1566 * initialize per-call variables
1568 call_cntr
= funcctx
->call_cntr
;
1569 max_calls
= funcctx
->max_calls
;
1571 results
= (char **) funcctx
->user_fctx
;
1572 attinmeta
= funcctx
->attinmeta
;
1574 if (call_cntr
< max_calls
) /* do when there is more left to send */
1580 values
= palloc_array(char *, 2);
1581 values
[0] = psprintf("%d", call_cntr
+ 1);
1582 values
[1] = results
[call_cntr
];
1584 /* build the tuple */
1585 tuple
= BuildTupleFromCStrings(attinmeta
, values
);
1587 /* make the tuple into a datum */
1588 result
= HeapTupleGetDatum(tuple
);
1590 SRF_RETURN_NEXT(funcctx
, result
);
1594 /* do when there is no more left */
1595 SRF_RETURN_DONE(funcctx
);
1601 * dblink_build_sql_insert
1603 * Used to generate an SQL insert statement
1604 * based on an existing tuple in a local relation.
1605 * This is useful for selectively replicating data
1606 * to another server via dblink.
1609 * <relname> - name of local table of interest
1610 * <pkattnums> - an int2vector of attnums which will be used
1611 * to identify the local tuple of interest
1612 * <pknumatts> - number of attnums in pkattnums
1613 * <src_pkattvals_arry> - text array of key values which will be used
1614 * to identify the local tuple of interest
1615 * <tgt_pkattvals_arry> - text array of key values which will be used
1616 * to build the string for execution remotely. These are substituted
1617 * for their counterparts in src_pkattvals_arry
1619 PG_FUNCTION_INFO_V1(dblink_build_sql_insert
);
1621 dblink_build_sql_insert(PG_FUNCTION_ARGS
)
1623 text
*relname_text
= PG_GETARG_TEXT_PP(0);
1624 int2vector
*pkattnums_arg
= (int2vector
*) PG_GETARG_POINTER(1);
1625 int32 pknumatts_arg
= PG_GETARG_INT32(2);
1626 ArrayType
*src_pkattvals_arry
= PG_GETARG_ARRAYTYPE_P(3);
1627 ArrayType
*tgt_pkattvals_arry
= PG_GETARG_ARRAYTYPE_P(4);
1631 char **src_pkattvals
;
1632 char **tgt_pkattvals
;
1638 * Open target relation.
1640 rel
= get_rel_from_relname(relname_text
, AccessShareLock
, ACL_SELECT
);
1643 * Process pkattnums argument.
1645 validate_pkattnums(rel
, pkattnums_arg
, pknumatts_arg
,
1646 &pkattnums
, &pknumatts
);
1649 * Source array is made up of key values that will be used to locate the
1650 * tuple of interest from the local system.
1652 src_pkattvals
= get_text_array_contents(src_pkattvals_arry
, &src_nitems
);
1655 * There should be one source array key value for each key attnum
1657 if (src_nitems
!= pknumatts
)
1659 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR
),
1660 errmsg("source key array length must match number of key attributes")));
1663 * Target array is made up of key values that will be used to build the
1664 * SQL string for use on the remote system.
1666 tgt_pkattvals
= get_text_array_contents(tgt_pkattvals_arry
, &tgt_nitems
);
1669 * There should be one target array key value for each key attnum
1671 if (tgt_nitems
!= pknumatts
)
1673 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR
),
1674 errmsg("target key array length must match number of key attributes")));
1677 * Prep work is finally done. Go get the SQL string.
1679 sql
= get_sql_insert(rel
, pkattnums
, pknumatts
, src_pkattvals
, tgt_pkattvals
);
1682 * Now we can close the relation.
1684 relation_close(rel
, AccessShareLock
);
1689 PG_RETURN_TEXT_P(cstring_to_text(sql
));
1694 * dblink_build_sql_delete
1696 * Used to generate an SQL delete statement.
1697 * This is useful for selectively replicating a
1698 * delete to another server via dblink.
1701 * <relname> - name of remote table of interest
1702 * <pkattnums> - an int2vector of attnums which will be used
1703 * to identify the remote tuple of interest
1704 * <pknumatts> - number of attnums in pkattnums
1705 * <tgt_pkattvals_arry> - text array of key values which will be used
1706 * to build the string for execution remotely.
1708 PG_FUNCTION_INFO_V1(dblink_build_sql_delete
);
1710 dblink_build_sql_delete(PG_FUNCTION_ARGS
)
1712 text
*relname_text
= PG_GETARG_TEXT_PP(0);
1713 int2vector
*pkattnums_arg
= (int2vector
*) PG_GETARG_POINTER(1);
1714 int32 pknumatts_arg
= PG_GETARG_INT32(2);
1715 ArrayType
*tgt_pkattvals_arry
= PG_GETARG_ARRAYTYPE_P(3);
1719 char **tgt_pkattvals
;
1724 * Open target relation.
1726 rel
= get_rel_from_relname(relname_text
, AccessShareLock
, ACL_SELECT
);
1729 * Process pkattnums argument.
1731 validate_pkattnums(rel
, pkattnums_arg
, pknumatts_arg
,
1732 &pkattnums
, &pknumatts
);
1735 * Target array is made up of key values that will be used to build the
1736 * SQL string for use on the remote system.
1738 tgt_pkattvals
= get_text_array_contents(tgt_pkattvals_arry
, &tgt_nitems
);
1741 * There should be one target array key value for each key attnum
1743 if (tgt_nitems
!= pknumatts
)
1745 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR
),
1746 errmsg("target key array length must match number of key attributes")));
1749 * Prep work is finally done. Go get the SQL string.
1751 sql
= get_sql_delete(rel
, pkattnums
, pknumatts
, tgt_pkattvals
);
1754 * Now we can close the relation.
1756 relation_close(rel
, AccessShareLock
);
1761 PG_RETURN_TEXT_P(cstring_to_text(sql
));
1766 * dblink_build_sql_update
1768 * Used to generate an SQL update statement
1769 * based on an existing tuple in a local relation.
1770 * This is useful for selectively replicating data
1771 * to another server via dblink.
1774 * <relname> - name of local table of interest
1775 * <pkattnums> - an int2vector of attnums which will be used
1776 * to identify the local tuple of interest
1777 * <pknumatts> - number of attnums in pkattnums
1778 * <src_pkattvals_arry> - text array of key values which will be used
1779 * to identify the local tuple of interest
1780 * <tgt_pkattvals_arry> - text array of key values which will be used
1781 * to build the string for execution remotely. These are substituted
1782 * for their counterparts in src_pkattvals_arry
1784 PG_FUNCTION_INFO_V1(dblink_build_sql_update
);
1786 dblink_build_sql_update(PG_FUNCTION_ARGS
)
1788 text
*relname_text
= PG_GETARG_TEXT_PP(0);
1789 int2vector
*pkattnums_arg
= (int2vector
*) PG_GETARG_POINTER(1);
1790 int32 pknumatts_arg
= PG_GETARG_INT32(2);
1791 ArrayType
*src_pkattvals_arry
= PG_GETARG_ARRAYTYPE_P(3);
1792 ArrayType
*tgt_pkattvals_arry
= PG_GETARG_ARRAYTYPE_P(4);
1796 char **src_pkattvals
;
1797 char **tgt_pkattvals
;
1803 * Open target relation.
1805 rel
= get_rel_from_relname(relname_text
, AccessShareLock
, ACL_SELECT
);
1808 * Process pkattnums argument.
1810 validate_pkattnums(rel
, pkattnums_arg
, pknumatts_arg
,
1811 &pkattnums
, &pknumatts
);
1814 * Source array is made up of key values that will be used to locate the
1815 * tuple of interest from the local system.
1817 src_pkattvals
= get_text_array_contents(src_pkattvals_arry
, &src_nitems
);
1820 * There should be one source array key value for each key attnum
1822 if (src_nitems
!= pknumatts
)
1824 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR
),
1825 errmsg("source key array length must match number of key attributes")));
1828 * Target array is made up of key values that will be used to build the
1829 * SQL string for use on the remote system.
1831 tgt_pkattvals
= get_text_array_contents(tgt_pkattvals_arry
, &tgt_nitems
);
1834 * There should be one target array key value for each key attnum
1836 if (tgt_nitems
!= pknumatts
)
1838 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR
),
1839 errmsg("target key array length must match number of key attributes")));
1842 * Prep work is finally done. Go get the SQL string.
1844 sql
= get_sql_update(rel
, pkattnums
, pknumatts
, src_pkattvals
, tgt_pkattvals
);
1847 * Now we can close the relation.
1849 relation_close(rel
, AccessShareLock
);
1854 PG_RETURN_TEXT_P(cstring_to_text(sql
));
1858 * dblink_current_query
1859 * return the current query string
1860 * to allow its use in (among other things)
1863 PG_FUNCTION_INFO_V1(dblink_current_query
);
1865 dblink_current_query(PG_FUNCTION_ARGS
)
1867 /* This is now just an alias for the built-in function current_query() */
1868 PG_RETURN_DATUM(current_query(fcinfo
));
1872 * Retrieve async notifications for a connection.
1874 * Returns a setof record of notifications, or an empty set if none received.
1875 * Can optionally take a named connection as parameter, but uses the unnamed
1876 * connection per default.
1879 #define DBLINK_NOTIFY_COLS 3
1881 PG_FUNCTION_INFO_V1(dblink_get_notify
);
1883 dblink_get_notify(PG_FUNCTION_ARGS
)
1887 ReturnSetInfo
*rsinfo
= (ReturnSetInfo
*) fcinfo
->resultinfo
;
1890 if (PG_NARGS() == 1)
1891 conn
= dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1895 InitMaterializedSRF(fcinfo
, 0);
1897 PQconsumeInput(conn
);
1898 while ((notify
= PQnotifies(conn
)) != NULL
)
1900 Datum values
[DBLINK_NOTIFY_COLS
];
1901 bool nulls
[DBLINK_NOTIFY_COLS
];
1903 memset(values
, 0, sizeof(values
));
1904 memset(nulls
, 0, sizeof(nulls
));
1906 if (notify
->relname
!= NULL
)
1907 values
[0] = CStringGetTextDatum(notify
->relname
);
1911 values
[1] = Int32GetDatum(notify
->be_pid
);
1913 if (notify
->extra
!= NULL
)
1914 values
[2] = CStringGetTextDatum(notify
->extra
);
1918 tuplestore_putvalues(rsinfo
->setResult
, rsinfo
->setDesc
, values
, nulls
);
1921 PQconsumeInput(conn
);
1928 * Validate the options given to a dblink foreign server or user mapping.
1929 * Raise an error if any option is invalid.
1931 * We just check the names of options here, so semantic errors in options,
1932 * such as invalid numeric format, will be detected at the attempt to connect.
1934 PG_FUNCTION_INFO_V1(dblink_fdw_validator
);
1936 dblink_fdw_validator(PG_FUNCTION_ARGS
)
1938 List
*options_list
= untransformRelOptions(PG_GETARG_DATUM(0));
1939 Oid context
= PG_GETARG_OID(1);
1942 static const PQconninfoOption
*options
= NULL
;
1945 * Get list of valid libpq options.
1947 * To avoid unnecessary work, we get the list once and use it throughout
1948 * the lifetime of this backend process. We don't need to care about
1949 * memory context issues, because PQconndefaults allocates with malloc.
1953 options
= PQconndefaults();
1954 if (!options
) /* assume reason for failure is OOM */
1956 (errcode(ERRCODE_FDW_OUT_OF_MEMORY
),
1957 errmsg("out of memory"),
1958 errdetail("Could not get libpq's default connection options.")));
1961 /* Validate each supplied option. */
1962 foreach(cell
, options_list
)
1964 DefElem
*def
= (DefElem
*) lfirst(cell
);
1966 if (!is_valid_dblink_option(options
, def
->defname
, context
))
1969 * Unknown option, or invalid option for the context specified, so
1970 * complain about it. Provide a hint with a valid option that
1971 * looks similar, if there is one.
1973 const PQconninfoOption
*opt
;
1974 const char *closest_match
;
1975 ClosestMatchState match_state
;
1976 bool has_valid_options
= false;
1978 initClosestMatch(&match_state
, def
->defname
, 4);
1979 for (opt
= options
; opt
->keyword
; opt
++)
1981 if (is_valid_dblink_option(options
, opt
->keyword
, context
))
1983 has_valid_options
= true;
1984 updateClosestMatch(&match_state
, opt
->keyword
);
1988 closest_match
= getClosestMatch(&match_state
);
1990 (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND
),
1991 errmsg("invalid option \"%s\"", def
->defname
),
1992 has_valid_options
? closest_match
?
1993 errhint("Perhaps you meant the option \"%s\".",
1994 closest_match
) : 0 :
1995 errhint("There are no valid options in this context.")));
/*************************************************************
 * internal functions
 */
2011 * Get the primary key attnames for the given relation.
2012 * Return NULL, and set indnkeyatts = 0, if no primary key exists.
2015 get_pkey_attnames(Relation rel
, int16
*indnkeyatts
)
2017 Relation indexRelation
;
2020 HeapTuple indexTuple
;
2022 char **result
= NULL
;
2025 /* initialize indnkeyatts to 0 in case no primary key exists */
2028 tupdesc
= rel
->rd_att
;
2030 /* Prepare to scan pg_index for entries having indrelid = this rel. */
2031 indexRelation
= table_open(IndexRelationId
, AccessShareLock
);
2033 Anum_pg_index_indrelid
,
2034 BTEqualStrategyNumber
, F_OIDEQ
,
2035 ObjectIdGetDatum(RelationGetRelid(rel
)));
2037 scan
= systable_beginscan(indexRelation
, IndexIndrelidIndexId
, true,
2040 while (HeapTupleIsValid(indexTuple
= systable_getnext(scan
)))
2042 Form_pg_index index
= (Form_pg_index
) GETSTRUCT(indexTuple
);
2044 /* we're only interested if it is the primary key */
2045 if (index
->indisprimary
)
2047 *indnkeyatts
= index
->indnkeyatts
;
2048 if (*indnkeyatts
> 0)
2050 result
= palloc_array(char *, *indnkeyatts
);
2052 for (i
= 0; i
< *indnkeyatts
; i
++)
2053 result
[i
] = SPI_fname(tupdesc
, index
->indkey
.values
[i
]);
2059 systable_endscan(scan
);
2060 table_close(indexRelation
, AccessShareLock
);
2066 * Deconstruct a text[] into C-strings (note any NULL elements will be
2067 * returned as NULL pointers)
2070 get_text_array_contents(ArrayType
*array
, int *numitems
)
2072 int ndim
= ARR_NDIM(array
);
2073 int *dims
= ARR_DIMS(array
);
2084 Assert(ARR_ELEMTYPE(array
) == TEXTOID
);
2086 *numitems
= nitems
= ArrayGetNItems(ndim
, dims
);
2088 get_typlenbyvalalign(ARR_ELEMTYPE(array
),
2089 &typlen
, &typbyval
, &typalign
);
2091 values
= palloc_array(char *, nitems
);
2093 ptr
= ARR_DATA_PTR(array
);
2094 bitmap
= ARR_NULLBITMAP(array
);
2097 for (i
= 0; i
< nitems
; i
++)
2099 if (bitmap
&& (*bitmap
& bitmask
) == 0)
2105 values
[i
] = TextDatumGetCString(PointerGetDatum(ptr
));
2106 ptr
= att_addlength_pointer(ptr
, typlen
, ptr
);
2107 ptr
= (char *) att_align_nominal(ptr
, typalign
);
2110 /* advance bitmap pointer if any */
2114 if (bitmask
== 0x100)
2126 get_sql_insert(Relation rel
, int *pkattnums
, int pknumatts
, char **src_pkattvals
, char **tgt_pkattvals
)
2138 initStringInfo(&buf
);
2140 /* get relation name including any needed schema prefix and quoting */
2141 relname
= generate_relation_name(rel
);
2143 tupdesc
= rel
->rd_att
;
2144 natts
= tupdesc
->natts
;
2146 tuple
= get_tuple_of_interest(rel
, pkattnums
, pknumatts
, src_pkattvals
);
2149 (errcode(ERRCODE_CARDINALITY_VIOLATION
),
2150 errmsg("source row not found")));
2152 appendStringInfo(&buf
, "INSERT INTO %s(", relname
);
2155 for (i
= 0; i
< natts
; i
++)
2157 Form_pg_attribute att
= TupleDescAttr(tupdesc
, i
);
2159 if (att
->attisdropped
)
2163 appendStringInfoChar(&buf
, ',');
2165 appendStringInfoString(&buf
,
2166 quote_ident_cstr(NameStr(att
->attname
)));
2170 appendStringInfoString(&buf
, ") VALUES(");
2173 * Note: i is physical column number (counting from 0).
2176 for (i
= 0; i
< natts
; i
++)
2178 if (TupleDescAttr(tupdesc
, i
)->attisdropped
)
2182 appendStringInfoChar(&buf
, ',');
2184 key
= get_attnum_pk_pos(pkattnums
, pknumatts
, i
);
2187 val
= tgt_pkattvals
[key
] ? pstrdup(tgt_pkattvals
[key
]) : NULL
;
2189 val
= SPI_getvalue(tuple
, tupdesc
, i
+ 1);
2193 appendStringInfoString(&buf
, quote_literal_cstr(val
));
2197 appendStringInfoString(&buf
, "NULL");
2200 appendStringInfoChar(&buf
, ')');
2206 get_sql_delete(Relation rel
, int *pkattnums
, int pknumatts
, char **tgt_pkattvals
)
2213 initStringInfo(&buf
);
2215 /* get relation name including any needed schema prefix and quoting */
2216 relname
= generate_relation_name(rel
);
2218 tupdesc
= rel
->rd_att
;
2220 appendStringInfo(&buf
, "DELETE FROM %s WHERE ", relname
);
2221 for (i
= 0; i
< pknumatts
; i
++)
2223 int pkattnum
= pkattnums
[i
];
2224 Form_pg_attribute attr
= TupleDescAttr(tupdesc
, pkattnum
);
2227 appendStringInfoString(&buf
, " AND ");
2229 appendStringInfoString(&buf
,
2230 quote_ident_cstr(NameStr(attr
->attname
)));
2232 if (tgt_pkattvals
[i
] != NULL
)
2233 appendStringInfo(&buf
, " = %s",
2234 quote_literal_cstr(tgt_pkattvals
[i
]));
2236 appendStringInfoString(&buf
, " IS NULL");
2243 get_sql_update(Relation rel
, int *pkattnums
, int pknumatts
, char **src_pkattvals
, char **tgt_pkattvals
)
2255 initStringInfo(&buf
);
2257 /* get relation name including any needed schema prefix and quoting */
2258 relname
= generate_relation_name(rel
);
2260 tupdesc
= rel
->rd_att
;
2261 natts
= tupdesc
->natts
;
2263 tuple
= get_tuple_of_interest(rel
, pkattnums
, pknumatts
, src_pkattvals
);
2266 (errcode(ERRCODE_CARDINALITY_VIOLATION
),
2267 errmsg("source row not found")));
2269 appendStringInfo(&buf
, "UPDATE %s SET ", relname
);
2272 * Note: i is physical column number (counting from 0).
2275 for (i
= 0; i
< natts
; i
++)
2277 Form_pg_attribute attr
= TupleDescAttr(tupdesc
, i
);
2279 if (attr
->attisdropped
)
2283 appendStringInfoString(&buf
, ", ");
2285 appendStringInfo(&buf
, "%s = ",
2286 quote_ident_cstr(NameStr(attr
->attname
)));
2288 key
= get_attnum_pk_pos(pkattnums
, pknumatts
, i
);
2291 val
= tgt_pkattvals
[key
] ? pstrdup(tgt_pkattvals
[key
]) : NULL
;
2293 val
= SPI_getvalue(tuple
, tupdesc
, i
+ 1);
2297 appendStringInfoString(&buf
, quote_literal_cstr(val
));
2301 appendStringInfoString(&buf
, "NULL");
2305 appendStringInfoString(&buf
, " WHERE ");
2307 for (i
= 0; i
< pknumatts
; i
++)
2309 int pkattnum
= pkattnums
[i
];
2310 Form_pg_attribute attr
= TupleDescAttr(tupdesc
, pkattnum
);
2313 appendStringInfoString(&buf
, " AND ");
2315 appendStringInfoString(&buf
,
2316 quote_ident_cstr(NameStr(attr
->attname
)));
2318 val
= tgt_pkattvals
[i
];
2321 appendStringInfo(&buf
, " = %s", quote_literal_cstr(val
));
2323 appendStringInfoString(&buf
, " IS NULL");
2330 * Return a properly quoted identifier.
2331 * Uses quote_ident in quote.c
2334 quote_ident_cstr(char *rawstr
)
2340 rawstr_text
= cstring_to_text(rawstr
);
2341 result_text
= DatumGetTextPP(DirectFunctionCall1(quote_ident
,
2342 PointerGetDatum(rawstr_text
)));
2343 result
= text_to_cstring(result_text
);
/*
 * get_attnum_pk_pos
 *
 * Return the index within pkattnums[0 .. pknumatts-1] whose value equals
 * key, or -1 if key does not occur in the array.
 */
static int
get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
{
	int			i;

	/*
	 * Not likely a long list anyway, so just scan for the value
	 */
	for (i = 0; i < pknumatts; i++)
		if (key == pkattnums[i])
			return i;

	return -1;
}
2364 get_tuple_of_interest(Relation rel
, int *pkattnums
, int pknumatts
, char **src_pkattvals
)
2375 * Connect to SPI manager
2377 if ((ret
= SPI_connect()) < 0)
2378 /* internal error */
2379 elog(ERROR
, "SPI connect failure - returned %d", ret
);
2381 initStringInfo(&buf
);
2383 /* get relation name including any needed schema prefix and quoting */
2384 relname
= generate_relation_name(rel
);
2386 tupdesc
= rel
->rd_att
;
2387 natts
= tupdesc
->natts
;
2390 * Build sql statement to look up tuple of interest, ie, the one matching
2391 * src_pkattvals. We used to use "SELECT *" here, but it's simpler to
2392 * generate a result tuple that matches the table's physical structure,
2393 * with NULLs for any dropped columns. Otherwise we have to deal with two
2394 * different tupdescs and everything's very confusing.
2396 appendStringInfoString(&buf
, "SELECT ");
2398 for (i
= 0; i
< natts
; i
++)
2400 Form_pg_attribute attr
= TupleDescAttr(tupdesc
, i
);
2403 appendStringInfoString(&buf
, ", ");
2405 if (attr
->attisdropped
)
2406 appendStringInfoString(&buf
, "NULL");
2408 appendStringInfoString(&buf
,
2409 quote_ident_cstr(NameStr(attr
->attname
)));
2412 appendStringInfo(&buf
, " FROM %s WHERE ", relname
);
2414 for (i
= 0; i
< pknumatts
; i
++)
2416 int pkattnum
= pkattnums
[i
];
2417 Form_pg_attribute attr
= TupleDescAttr(tupdesc
, pkattnum
);
2420 appendStringInfoString(&buf
, " AND ");
2422 appendStringInfoString(&buf
,
2423 quote_ident_cstr(NameStr(attr
->attname
)));
2425 if (src_pkattvals
[i
] != NULL
)
2426 appendStringInfo(&buf
, " = %s",
2427 quote_literal_cstr(src_pkattvals
[i
]));
2429 appendStringInfoString(&buf
, " IS NULL");
2433 * Retrieve the desired tuple
2435 ret
= SPI_exec(buf
.data
, 0);
2439 * Only allow one qualifying tuple
2441 if ((ret
== SPI_OK_SELECT
) && (SPI_processed
> 1))
2443 (errcode(ERRCODE_CARDINALITY_VIOLATION
),
2444 errmsg("source criteria matched more than one record")));
2446 else if (ret
== SPI_OK_SELECT
&& SPI_processed
== 1)
2448 SPITupleTable
*tuptable
= SPI_tuptable
;
2450 tuple
= SPI_copytuple(tuptable
->vals
[0]);
2458 * no qualifying tuples
2466 * never reached, but keep compiler quiet
2472 * Open the relation named by relname_text, acquire specified type of lock,
2473 * verify we have specified permissions.
2474 * Caller must close rel when done with it.
2477 get_rel_from_relname(text
*relname_text
, LOCKMODE lockmode
, AclMode aclmode
)
2481 AclResult aclresult
;
2483 relvar
= makeRangeVarFromNameList(textToQualifiedNameList(relname_text
));
2484 rel
= table_openrv(relvar
, lockmode
);
2486 aclresult
= pg_class_aclcheck(RelationGetRelid(rel
), GetUserId(),
2488 if (aclresult
!= ACLCHECK_OK
)
2489 aclcheck_error(aclresult
, get_relkind_objtype(rel
->rd_rel
->relkind
),
2490 RelationGetRelationName(rel
));
2496 * generate_relation_name - copied from ruleutils.c
2497 * Compute the name to display for a relation
2499 * The result includes all necessary quoting and schema-prefixing.
2502 generate_relation_name(Relation rel
)
2507 /* Qualify the name if not visible in search path */
2508 if (RelationIsVisible(RelationGetRelid(rel
)))
2511 nspname
= get_namespace_name(rel
->rd_rel
->relnamespace
);
2513 result
= quote_qualified_identifier(nspname
, RelationGetRelationName(rel
));
2520 getConnectionByName(const char *name
)
2522 remoteConnHashEnt
*hentry
;
2525 if (!remoteConnHash
)
2526 remoteConnHash
= createConnHash();
2528 key
= pstrdup(name
);
2529 truncate_identifier(key
, strlen(key
), false);
2530 hentry
= (remoteConnHashEnt
*) hash_search(remoteConnHash
,
2531 key
, HASH_FIND
, NULL
);
2534 return hentry
->rconn
;
2540 createConnHash(void)
2544 ctl
.keysize
= NAMEDATALEN
;
2545 ctl
.entrysize
= sizeof(remoteConnHashEnt
);
2547 return hash_create("Remote Con hash", NUMCONN
, &ctl
,
2548 HASH_ELEM
| HASH_STRINGS
);
2552 createNewConnection(const char *name
, remoteConn
*rconn
)
2554 remoteConnHashEnt
*hentry
;
2558 if (!remoteConnHash
)
2559 remoteConnHash
= createConnHash();
2561 key
= pstrdup(name
);
2562 truncate_identifier(key
, strlen(key
), true);
2563 hentry
= (remoteConnHashEnt
*) hash_search(remoteConnHash
, key
,
2564 HASH_ENTER
, &found
);
2568 libpqsrv_disconnect(rconn
->conn
);
2572 (errcode(ERRCODE_DUPLICATE_OBJECT
),
2573 errmsg("duplicate connection name")));
2576 hentry
->rconn
= rconn
;
2577 strlcpy(hentry
->name
, name
, sizeof(hentry
->name
));
2581 deleteConnection(const char *name
)
2583 remoteConnHashEnt
*hentry
;
2587 if (!remoteConnHash
)
2588 remoteConnHash
= createConnHash();
2590 key
= pstrdup(name
);
2591 truncate_identifier(key
, strlen(key
), false);
2592 hentry
= (remoteConnHashEnt
*) hash_search(remoteConnHash
,
2593 key
, HASH_REMOVE
, &found
);
2597 (errcode(ERRCODE_UNDEFINED_OBJECT
),
2598 errmsg("undefined connection name")));
2602 * We need to make sure that the connection made used credentials
2603 * which were provided by the user, so check what credentials were
2604 * used to connect and then make sure that they came from the user.
2607 dblink_security_check(PGconn
*conn
, remoteConn
*rconn
, const char *connstr
)
2609 /* Superuser bypasses security check */
2613 /* If password was used to connect, make sure it was one provided */
2614 if (PQconnectionUsedPassword(conn
) && dblink_connstr_has_pw(connstr
))
2618 /* If GSSAPI creds used to connect, make sure it was one delegated */
2619 if (PQconnectionUsedGSSAPI(conn
) && be_gssapi_get_delegation(MyProcPort
))
2623 /* Otherwise, fail out */
2624 libpqsrv_disconnect(conn
);
2629 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
),
2630 errmsg("password or GSSAPI delegated credentials required"),
2631 errdetail("Non-superusers may only connect using credentials they provide, eg: password in connection string or delegated GSSAPI credentials"),
2632 errhint("Ensure provided credentials match target server's authentication method.")));
2636 * Function to check if the connection string includes an explicit
2637 * password, needed to ensure that non-superuser password-based auth
2638 * is using a provided password and not one picked up from the
2642 dblink_connstr_has_pw(const char *connstr
)
2644 PQconninfoOption
*options
;
2645 PQconninfoOption
*option
;
2646 bool connstr_gives_password
= false;
2648 options
= PQconninfoParse(connstr
, NULL
);
2651 for (option
= options
; option
->keyword
!= NULL
; option
++)
2653 if (strcmp(option
->keyword
, "password") == 0)
2655 if (option
->val
!= NULL
&& option
->val
[0] != '\0')
2657 connstr_gives_password
= true;
2662 PQconninfoFree(options
);
2665 return connstr_gives_password
;
2669 * For non-superusers, insist that the connstr specify a password, except
2670 * if GSSAPI credentials have been delegated (and we check that they are used
2671 * for the connection in dblink_security_check later). This prevents a
2672 * password or GSSAPI credentials from being picked up from .pgpass, a
2673 * service file, the environment, etc. We don't want the postgres user's
2674 * passwords or Kerberos credentials to be accessible to non-superusers.
2677 dblink_connstr_check(const char *connstr
)
2682 if (dblink_connstr_has_pw(connstr
))
2686 if (be_gssapi_get_delegation(MyProcPort
))
2691 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
),
2692 errmsg("password or GSSAPI delegated credentials required"),
2693 errdetail("Non-superusers must provide a password in the connection string or send delegated GSSAPI credentials.")));
2697 * Report an error received from the remote server
2699 * res: the received error result (will be freed)
2700 * fail: true for ERROR ereport, false for NOTICE
2701 * fmt and following args: sprintf-style format and values for errcontext;
2702 * the resulting string should be worded like "while <some action>"
2705 dblink_res_error(PGconn
*conn
, const char *conname
, PGresult
*res
,
2706 bool fail
, const char *fmt
,...)
2709 char *pg_diag_sqlstate
= PQresultErrorField(res
, PG_DIAG_SQLSTATE
);
2710 char *pg_diag_message_primary
= PQresultErrorField(res
, PG_DIAG_MESSAGE_PRIMARY
);
2711 char *pg_diag_message_detail
= PQresultErrorField(res
, PG_DIAG_MESSAGE_DETAIL
);
2712 char *pg_diag_message_hint
= PQresultErrorField(res
, PG_DIAG_MESSAGE_HINT
);
2713 char *pg_diag_context
= PQresultErrorField(res
, PG_DIAG_CONTEXT
);
2715 char *message_primary
;
2716 char *message_detail
;
2718 char *message_context
;
2720 char dblink_context_msg
[512];
2727 if (pg_diag_sqlstate
)
2728 sqlstate
= MAKE_SQLSTATE(pg_diag_sqlstate
[0],
2729 pg_diag_sqlstate
[1],
2730 pg_diag_sqlstate
[2],
2731 pg_diag_sqlstate
[3],
2732 pg_diag_sqlstate
[4]);
2734 sqlstate
= ERRCODE_CONNECTION_FAILURE
;
2736 message_primary
= xpstrdup(pg_diag_message_primary
);
2737 message_detail
= xpstrdup(pg_diag_message_detail
);
2738 message_hint
= xpstrdup(pg_diag_message_hint
);
2739 message_context
= xpstrdup(pg_diag_context
);
2742 * If we don't get a message from the PGresult, try the PGconn. This is
2743 * needed because for connection-level failures, PQexec may just return
2744 * NULL, not a PGresult at all.
2746 if (message_primary
== NULL
)
2747 message_primary
= pchomp(PQerrorMessage(conn
));
2750 * Now that we've copied all the data we need out of the PGresult, it's
2751 * safe to free it. We must do this to avoid PGresult leakage. We're
2752 * leaking all the strings too, but those are in palloc'd memory that will
2753 * get cleaned up eventually.
2758 * Format the basic errcontext string. Below, we'll add on something
2759 * about the connection name. That's a violation of the translatability
2760 * guidelines about constructing error messages out of parts, but since
2761 * there's no translation support for dblink, there's no need to worry
2765 vsnprintf(dblink_context_msg
, sizeof(dblink_context_msg
), fmt
, ap
);
2770 (message_primary
!= NULL
&& message_primary
[0] != '\0') ?
2771 errmsg_internal("%s", message_primary
) :
2772 errmsg("could not obtain message string for remote error"),
2773 message_detail
? errdetail_internal("%s", message_detail
) : 0,
2774 message_hint
? errhint("%s", message_hint
) : 0,
2775 message_context
? (errcontext("%s", message_context
)) : 0,
2777 (errcontext("%s on dblink connection named \"%s\"",
2778 dblink_context_msg
, conname
)) :
2779 (errcontext("%s on unnamed dblink connection",
2780 dblink_context_msg
))));
2784 * Obtain connection string for a foreign server
2787 get_connect_string(const char *servername
)
2789 ForeignServer
*foreign_server
= NULL
;
2790 UserMapping
*user_mapping
;
2793 ForeignDataWrapper
*fdw
;
2794 AclResult aclresult
;
2797 static const PQconninfoOption
*options
= NULL
;
2799 initStringInfo(&buf
);
2802 * Get list of valid libpq options.
2804 * To avoid unnecessary work, we get the list once and use it throughout
2805 * the lifetime of this backend process. We don't need to care about
2806 * memory context issues, because PQconndefaults allocates with malloc.
2810 options
= PQconndefaults();
2811 if (!options
) /* assume reason for failure is OOM */
2813 (errcode(ERRCODE_FDW_OUT_OF_MEMORY
),
2814 errmsg("out of memory"),
2815 errdetail("Could not get libpq's default connection options.")));
2818 /* first gather the server connstr options */
2819 srvname
= pstrdup(servername
);
2820 truncate_identifier(srvname
, strlen(srvname
), false);
2821 foreign_server
= GetForeignServerByName(srvname
, true);
2825 Oid serverid
= foreign_server
->serverid
;
2826 Oid fdwid
= foreign_server
->fdwid
;
2827 Oid userid
= GetUserId();
2829 user_mapping
= GetUserMapping(userid
, serverid
);
2830 fdw
= GetForeignDataWrapper(fdwid
);
2832 /* Check permissions, user must have usage on the server. */
2833 aclresult
= object_aclcheck(ForeignServerRelationId
, serverid
, userid
, ACL_USAGE
);
2834 if (aclresult
!= ACLCHECK_OK
)
2835 aclcheck_error(aclresult
, OBJECT_FOREIGN_SERVER
, foreign_server
->servername
);
2837 foreach(cell
, fdw
->options
)
2839 DefElem
*def
= lfirst(cell
);
2841 if (is_valid_dblink_option(options
, def
->defname
, ForeignDataWrapperRelationId
))
2842 appendStringInfo(&buf
, "%s='%s' ", def
->defname
,
2843 escape_param_str(strVal(def
->arg
)));
2846 foreach(cell
, foreign_server
->options
)
2848 DefElem
*def
= lfirst(cell
);
2850 if (is_valid_dblink_option(options
, def
->defname
, ForeignServerRelationId
))
2851 appendStringInfo(&buf
, "%s='%s' ", def
->defname
,
2852 escape_param_str(strVal(def
->arg
)));
2855 foreach(cell
, user_mapping
->options
)
2858 DefElem
*def
= lfirst(cell
);
2860 if (is_valid_dblink_option(options
, def
->defname
, UserMappingRelationId
))
2861 appendStringInfo(&buf
, "%s='%s' ", def
->defname
,
2862 escape_param_str(strVal(def
->arg
)));
2872 * Escaping libpq connect parameter strings.
2874 * Replaces "'" with "\'" and "\" with "\\".
2877 escape_param_str(const char *str
)
2882 initStringInfo(&buf
);
2884 for (cp
= str
; *cp
; cp
++)
2886 if (*cp
== '\\' || *cp
== '\'')
2887 appendStringInfoChar(&buf
, '\\');
2888 appendStringInfoChar(&buf
, *cp
);
2895 * Validate the PK-attnums argument for dblink_build_sql_insert() and related
2896 * functions, and translate to the internal representation.
2898 * The user supplies an int2vector of 1-based logical attnums, plus a count
2899 * argument (the need for the separate count argument is historical, but we
2900 * still check it). We check that each attnum corresponds to a valid,
2901 * non-dropped attribute of the rel. We do *not* prevent attnums from being
2902 * listed twice, though the actual use-case for such things is dubious.
2903 * Note that before Postgres 9.0, the user's attnums were interpreted as
2904 * physical not logical column numbers; this was changed for future-proofing.
2906 * The internal representation is a palloc'd int array of 0-based physical
2910 validate_pkattnums(Relation rel
,
2911 int2vector
*pkattnums_arg
, int32 pknumatts_arg
,
2912 int **pkattnums
, int *pknumatts
)
2914 TupleDesc tupdesc
= rel
->rd_att
;
2915 int natts
= tupdesc
->natts
;
2918 /* Don't take more array elements than there are */
2919 pknumatts_arg
= Min(pknumatts_arg
, pkattnums_arg
->dim1
);
2921 /* Must have at least one pk attnum selected */
2922 if (pknumatts_arg
<= 0)
2924 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
2925 errmsg("number of key attributes must be > 0")));
2927 /* Allocate output array */
2928 *pkattnums
= palloc_array(int, pknumatts_arg
);
2929 *pknumatts
= pknumatts_arg
;
2931 /* Validate attnums and convert to internal form */
2932 for (i
= 0; i
< pknumatts_arg
; i
++)
2934 int pkattnum
= pkattnums_arg
->values
[i
];
2938 /* Can throw error immediately if out of range */
2939 if (pkattnum
<= 0 || pkattnum
> natts
)
2941 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
2942 errmsg("invalid attribute number %d", pkattnum
)));
2944 /* Identify which physical column has this logical number */
2946 for (j
= 0; j
< natts
; j
++)
2948 /* dropped columns don't count */
2949 if (TupleDescAttr(tupdesc
, j
)->attisdropped
)
2952 if (++lnum
== pkattnum
)
2957 (*pkattnums
)[i
] = j
;
2960 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
2961 errmsg("invalid attribute number %d", pkattnum
)));
2966 * Check if the specified connection option is valid.
2968 * We basically allow whatever libpq thinks is an option, with these
2970 * debug options: disallowed
2971 * "client_encoding": disallowed
2972 * "user": valid only in USER MAPPING options
2973 * secure options (eg password): valid only in USER MAPPING options
2974 * others: valid only in FOREIGN SERVER options
2976 * We disallow client_encoding because it would be overridden anyway via
2977 * PQclientEncoding; allowing it to be specified would merely promote
2981 is_valid_dblink_option(const PQconninfoOption
*options
, const char *option
,
2984 const PQconninfoOption
*opt
;
2986 /* Look up the option in libpq result */
2987 for (opt
= options
; opt
->keyword
; opt
++)
2989 if (strcmp(opt
->keyword
, option
) == 0)
2992 if (opt
->keyword
== NULL
)
2995 /* Disallow debug options (particularly "replication") */
2996 if (strchr(opt
->dispchar
, 'D'))
2999 /* Disallow "client_encoding" */
3000 if (strcmp(opt
->keyword
, "client_encoding") == 0)
3004 * If the option is "user" or marked secure, it should be specified only
3005 * in USER MAPPING. Others should be specified only in SERVER.
3007 if (strcmp(opt
->keyword
, "user") == 0 || strchr(opt
->dispchar
, '*'))
3009 if (context
!= UserMappingRelationId
)
3014 if (context
!= ForeignServerRelationId
)
3022 * Copy the remote session's values of GUCs that affect datatype I/O
3023 * and apply them locally in a new GUC nesting level. Returns the new
3024 * nestlevel (which is needed by restoreLocalGucs to undo the settings),
3025 * or -1 if no new nestlevel was needed.
3027 * We use the equivalent of a function SET option to allow the settings to
3028 * persist only until the caller calls restoreLocalGucs. If an error is
3029 * thrown in between, guc.c will take care of undoing the settings.
3032 applyRemoteGucs(PGconn
*conn
)
3034 static const char *const GUCsAffectingIO
[] = {
3042 for (i
= 0; i
< lengthof(GUCsAffectingIO
); i
++)
3044 const char *gucName
= GUCsAffectingIO
[i
];
3045 const char *remoteVal
= PQparameterStatus(conn
, gucName
);
3046 const char *localVal
;
3049 * If the remote server is pre-8.4, it won't have IntervalStyle, but
3050 * that's okay because its output format won't be ambiguous. So just
3051 * skip the GUC if we don't get a value for it. (We might eventually
3052 * need more complicated logic with remote-version checks here.)
3054 if (remoteVal
== NULL
)
3058 * Avoid GUC-setting overhead if the remote and local GUCs already
3059 * have the same value.
3061 localVal
= GetConfigOption(gucName
, false, false);
3062 Assert(localVal
!= NULL
);
3064 if (strcmp(remoteVal
, localVal
) == 0)
3067 /* Create new GUC nest level if we didn't already */
3069 nestlevel
= NewGUCNestLevel();
3071 /* Apply the option (this will throw error on failure) */
3072 (void) set_config_option(gucName
, remoteVal
,
3073 PGC_USERSET
, PGC_S_SESSION
,
3074 GUC_ACTION_SAVE
, true, 0, false);
3081 * Restore local GUCs after they have been overlaid with remote settings.
3084 restoreLocalGucs(int nestlevel
)
3086 /* Do nothing if no new nestlevel was created */
3088 AtEOXact_GUC(true, nestlevel
);