README: remove duplicate download link & mention related softw.
[pgsql.git] / contrib / dblink / dblink.c
blob195b278f55903bf48cd12fe784e2d27beea23036
1 /*
2 * dblink.c
4 * Functions returning results from a remote database
6 * Joe Conway <mail@joeconway.com>
7 * And contributors:
8 * Darko Prenosil <Darko.Prenosil@finteh.hr>
9 * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
11 * contrib/dblink/dblink.c
12 * Copyright (c) 2001-2023, PostgreSQL Global Development Group
13 * ALL RIGHTS RESERVED;
15 * Permission to use, copy, modify, and distribute this software and its
16 * documentation for any purpose, without fee, and without a written agreement
17 * is hereby granted, provided that the above copyright notice and this
18 * paragraph and the following two paragraphs appear in all copies.
20 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24 * POSSIBILITY OF SUCH DAMAGE.
26 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
29 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
33 #include "postgres.h"
35 #include <limits.h>
37 #include "access/htup_details.h"
38 #include "access/relation.h"
39 #include "access/reloptions.h"
40 #include "access/table.h"
41 #include "catalog/namespace.h"
42 #include "catalog/pg_foreign_data_wrapper.h"
43 #include "catalog/pg_foreign_server.h"
44 #include "catalog/pg_type.h"
45 #include "catalog/pg_user_mapping.h"
46 #include "executor/spi.h"
47 #include "foreign/foreign.h"
48 #include "funcapi.h"
49 #include "lib/stringinfo.h"
50 #include "libpq-fe.h"
51 #include "libpq/libpq-be.h"
52 #include "libpq/libpq-be-fe-helpers.h"
53 #include "mb/pg_wchar.h"
54 #include "miscadmin.h"
55 #include "parser/scansup.h"
56 #include "utils/acl.h"
57 #include "utils/builtins.h"
58 #include "utils/fmgroids.h"
59 #include "utils/guc.h"
60 #include "utils/lsyscache.h"
61 #include "utils/memutils.h"
62 #include "utils/rel.h"
63 #include "utils/varlena.h"
65 PG_MODULE_MAGIC;
67 typedef struct remoteConn
69 PGconn *conn; /* Hold the remote connection */
70 int openCursorCount; /* The number of open cursors */
71 bool newXactForCursor; /* Opened a transaction for a cursor */
72 } remoteConn;
74 typedef struct storeInfo
76 FunctionCallInfo fcinfo;
77 Tuplestorestate *tuplestore;
78 AttInMetadata *attinmeta;
79 MemoryContext tmpcontext;
80 char **cstrs;
81 /* temp storage for results to avoid leaks on exception */
82 PGresult *last_res;
83 PGresult *cur_res;
84 } storeInfo;
87 * Internal declarations
89 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
90 static void prepTuplestoreResult(FunctionCallInfo fcinfo);
91 static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
92 PGresult *res);
93 static void materializeQueryResult(FunctionCallInfo fcinfo,
94 PGconn *conn,
95 const char *conname,
96 const char *sql,
97 bool fail);
98 static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
99 static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first);
100 static remoteConn *getConnectionByName(const char *name);
101 static HTAB *createConnHash(void);
102 static void createNewConnection(const char *name, remoteConn *rconn);
103 static void deleteConnection(const char *name);
104 static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
105 static char **get_text_array_contents(ArrayType *array, int *numitems);
106 static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
107 static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
108 static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
109 static char *quote_ident_cstr(char *rawstr);
110 static int get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
111 static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
112 static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
113 static char *generate_relation_name(Relation rel);
114 static void dblink_connstr_check(const char *connstr);
115 static bool dblink_connstr_has_pw(const char *connstr);
116 static void dblink_security_check(PGconn *conn, remoteConn *rconn, const char *connstr);
117 static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
118 bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
119 static char *get_connect_string(const char *servername);
120 static char *escape_param_str(const char *str);
121 static void validate_pkattnums(Relation rel,
122 int2vector *pkattnums_arg, int32 pknumatts_arg,
123 int **pkattnums, int *pknumatts);
124 static bool is_valid_dblink_option(const PQconninfoOption *options,
125 const char *option, Oid context);
126 static int applyRemoteGucs(PGconn *conn);
127 static void restoreLocalGucs(int nestlevel);
129 /* Global */
130 static remoteConn *pconn = NULL;
131 static HTAB *remoteConnHash = NULL;
133 /* custom wait event values, retrieved from shared memory */
134 static uint32 dblink_we_connect = 0;
135 static uint32 dblink_we_get_conn = 0;
138 * Following is list that holds multiple remote connections.
139 * Calling convention of each dblink function changes to accept
140 * connection name as the first parameter. The connection list is
141 * much like ecpg e.g. a mapping between a name and a PGconn object.
144 typedef struct remoteConnHashEnt
146 char name[NAMEDATALEN];
147 remoteConn *rconn;
148 } remoteConnHashEnt;
150 /* initial number of connection hashes */
151 #define NUMCONN 16
153 static char *
154 xpstrdup(const char *in)
156 if (in == NULL)
157 return NULL;
158 return pstrdup(in);
161 static void
162 pg_attribute_noreturn()
163 dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
165 char *msg = pchomp(PQerrorMessage(conn));
167 PQclear(res);
168 elog(ERROR, "%s: %s", p2, msg);
171 static void
172 pg_attribute_noreturn()
173 dblink_conn_not_avail(const char *conname)
175 if (conname)
176 ereport(ERROR,
177 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
178 errmsg("connection \"%s\" not available", conname)));
179 else
180 ereport(ERROR,
181 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
182 errmsg("connection not available")));
185 static void
186 dblink_get_conn(char *conname_or_str,
187 PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
189 remoteConn *rconn = getConnectionByName(conname_or_str);
190 PGconn *conn;
191 char *conname;
192 bool freeconn;
194 if (rconn)
196 conn = rconn->conn;
197 conname = conname_or_str;
198 freeconn = false;
200 else
202 const char *connstr;
204 connstr = get_connect_string(conname_or_str);
205 if (connstr == NULL)
206 connstr = conname_or_str;
207 dblink_connstr_check(connstr);
209 /* first time, allocate or get the custom wait event */
210 if (dblink_we_get_conn == 0)
211 dblink_we_get_conn = WaitEventExtensionNew("DblinkGetConnect");
213 /* OK to make connection */
214 conn = libpqsrv_connect(connstr, dblink_we_get_conn);
216 if (PQstatus(conn) == CONNECTION_BAD)
218 char *msg = pchomp(PQerrorMessage(conn));
220 libpqsrv_disconnect(conn);
221 ereport(ERROR,
222 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
223 errmsg("could not establish connection"),
224 errdetail_internal("%s", msg)));
226 dblink_security_check(conn, rconn, connstr);
227 if (PQclientEncoding(conn) != GetDatabaseEncoding())
228 PQsetClientEncoding(conn, GetDatabaseEncodingName());
229 freeconn = true;
230 conname = NULL;
233 *conn_p = conn;
234 *conname_p = conname;
235 *freeconn_p = freeconn;
238 static PGconn *
239 dblink_get_named_conn(const char *conname)
241 remoteConn *rconn = getConnectionByName(conname);
243 if (rconn)
244 return rconn->conn;
246 dblink_conn_not_avail(conname);
247 return NULL; /* keep compiler quiet */
250 static void
251 dblink_init(void)
253 if (!pconn)
255 pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
256 pconn->conn = NULL;
257 pconn->openCursorCount = 0;
258 pconn->newXactForCursor = false;
263 * Create a persistent connection to another database
265 PG_FUNCTION_INFO_V1(dblink_connect);
266 Datum
267 dblink_connect(PG_FUNCTION_ARGS)
269 char *conname_or_str = NULL;
270 char *connstr = NULL;
271 char *connname = NULL;
272 char *msg;
273 PGconn *conn = NULL;
274 remoteConn *rconn = NULL;
276 dblink_init();
278 if (PG_NARGS() == 2)
280 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
281 connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
283 else if (PG_NARGS() == 1)
284 conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
286 if (connname)
288 rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
289 sizeof(remoteConn));
290 rconn->conn = NULL;
291 rconn->openCursorCount = 0;
292 rconn->newXactForCursor = false;
295 /* first check for valid foreign data server */
296 connstr = get_connect_string(conname_or_str);
297 if (connstr == NULL)
298 connstr = conname_or_str;
300 /* check password in connection string if not superuser */
301 dblink_connstr_check(connstr);
303 /* first time, allocate or get the custom wait event */
304 if (dblink_we_connect == 0)
305 dblink_we_connect = WaitEventExtensionNew("DblinkConnect");
307 /* OK to make connection */
308 conn = libpqsrv_connect(connstr, dblink_we_connect);
310 if (PQstatus(conn) == CONNECTION_BAD)
312 msg = pchomp(PQerrorMessage(conn));
313 libpqsrv_disconnect(conn);
314 if (rconn)
315 pfree(rconn);
317 ereport(ERROR,
318 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
319 errmsg("could not establish connection"),
320 errdetail_internal("%s", msg)));
323 /* check password actually used if not superuser */
324 dblink_security_check(conn, rconn, connstr);
326 /* attempt to set client encoding to match server encoding, if needed */
327 if (PQclientEncoding(conn) != GetDatabaseEncoding())
328 PQsetClientEncoding(conn, GetDatabaseEncodingName());
330 if (connname)
332 rconn->conn = conn;
333 createNewConnection(connname, rconn);
335 else
337 if (pconn->conn)
338 libpqsrv_disconnect(pconn->conn);
339 pconn->conn = conn;
342 PG_RETURN_TEXT_P(cstring_to_text("OK"));
346 * Clear a persistent connection to another database
348 PG_FUNCTION_INFO_V1(dblink_disconnect);
349 Datum
350 dblink_disconnect(PG_FUNCTION_ARGS)
352 char *conname = NULL;
353 remoteConn *rconn = NULL;
354 PGconn *conn = NULL;
356 dblink_init();
358 if (PG_NARGS() == 1)
360 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
361 rconn = getConnectionByName(conname);
362 if (rconn)
363 conn = rconn->conn;
365 else
366 conn = pconn->conn;
368 if (!conn)
369 dblink_conn_not_avail(conname);
371 libpqsrv_disconnect(conn);
372 if (rconn)
374 deleteConnection(conname);
375 pfree(rconn);
377 else
378 pconn->conn = NULL;
380 PG_RETURN_TEXT_P(cstring_to_text("OK"));
384 * opens a cursor using a persistent connection
386 PG_FUNCTION_INFO_V1(dblink_open);
387 Datum
388 dblink_open(PG_FUNCTION_ARGS)
390 PGresult *res = NULL;
391 PGconn *conn;
392 char *curname = NULL;
393 char *sql = NULL;
394 char *conname = NULL;
395 StringInfoData buf;
396 remoteConn *rconn = NULL;
397 bool fail = true; /* default to backward compatible behavior */
399 dblink_init();
400 initStringInfo(&buf);
402 if (PG_NARGS() == 2)
404 /* text,text */
405 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
406 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
407 rconn = pconn;
409 else if (PG_NARGS() == 3)
411 /* might be text,text,text or text,text,bool */
412 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
414 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
415 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
416 fail = PG_GETARG_BOOL(2);
417 rconn = pconn;
419 else
421 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
422 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
423 sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
424 rconn = getConnectionByName(conname);
427 else if (PG_NARGS() == 4)
429 /* text,text,text,bool */
430 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
431 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
432 sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
433 fail = PG_GETARG_BOOL(3);
434 rconn = getConnectionByName(conname);
437 if (!rconn || !rconn->conn)
438 dblink_conn_not_avail(conname);
440 conn = rconn->conn;
442 /* If we are not in a transaction, start one */
443 if (PQtransactionStatus(conn) == PQTRANS_IDLE)
445 res = PQexec(conn, "BEGIN");
446 if (PQresultStatus(res) != PGRES_COMMAND_OK)
447 dblink_res_internalerror(conn, res, "begin error");
448 PQclear(res);
449 rconn->newXactForCursor = true;
452 * Since transaction state was IDLE, we force cursor count to
453 * initially be 0. This is needed as a previous ABORT might have wiped
454 * out our transaction without maintaining the cursor count for us.
456 rconn->openCursorCount = 0;
459 /* if we started a transaction, increment cursor count */
460 if (rconn->newXactForCursor)
461 (rconn->openCursorCount)++;
463 appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
464 res = PQexec(conn, buf.data);
465 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
467 dblink_res_error(conn, conname, res, fail,
468 "while opening cursor \"%s\"", curname);
469 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
472 PQclear(res);
473 PG_RETURN_TEXT_P(cstring_to_text("OK"));
477 * closes a cursor
479 PG_FUNCTION_INFO_V1(dblink_close);
480 Datum
481 dblink_close(PG_FUNCTION_ARGS)
483 PGconn *conn;
484 PGresult *res = NULL;
485 char *curname = NULL;
486 char *conname = NULL;
487 StringInfoData buf;
488 remoteConn *rconn = NULL;
489 bool fail = true; /* default to backward compatible behavior */
491 dblink_init();
492 initStringInfo(&buf);
494 if (PG_NARGS() == 1)
496 /* text */
497 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
498 rconn = pconn;
500 else if (PG_NARGS() == 2)
502 /* might be text,text or text,bool */
503 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
505 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
506 fail = PG_GETARG_BOOL(1);
507 rconn = pconn;
509 else
511 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
512 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
513 rconn = getConnectionByName(conname);
516 if (PG_NARGS() == 3)
518 /* text,text,bool */
519 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
520 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
521 fail = PG_GETARG_BOOL(2);
522 rconn = getConnectionByName(conname);
525 if (!rconn || !rconn->conn)
526 dblink_conn_not_avail(conname);
528 conn = rconn->conn;
530 appendStringInfo(&buf, "CLOSE %s", curname);
532 /* close the cursor */
533 res = PQexec(conn, buf.data);
534 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
536 dblink_res_error(conn, conname, res, fail,
537 "while closing cursor \"%s\"", curname);
538 PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
541 PQclear(res);
543 /* if we started a transaction, decrement cursor count */
544 if (rconn->newXactForCursor)
546 (rconn->openCursorCount)--;
548 /* if count is zero, commit the transaction */
549 if (rconn->openCursorCount == 0)
551 rconn->newXactForCursor = false;
553 res = PQexec(conn, "COMMIT");
554 if (PQresultStatus(res) != PGRES_COMMAND_OK)
555 dblink_res_internalerror(conn, res, "commit error");
556 PQclear(res);
560 PG_RETURN_TEXT_P(cstring_to_text("OK"));
564 * Fetch results from an open cursor
566 PG_FUNCTION_INFO_V1(dblink_fetch);
567 Datum
568 dblink_fetch(PG_FUNCTION_ARGS)
570 PGresult *res = NULL;
571 char *conname = NULL;
572 remoteConn *rconn = NULL;
573 PGconn *conn = NULL;
574 StringInfoData buf;
575 char *curname = NULL;
576 int howmany = 0;
577 bool fail = true; /* default to backward compatible */
579 prepTuplestoreResult(fcinfo);
581 dblink_init();
583 if (PG_NARGS() == 4)
585 /* text,text,int,bool */
586 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
587 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
588 howmany = PG_GETARG_INT32(2);
589 fail = PG_GETARG_BOOL(3);
591 rconn = getConnectionByName(conname);
592 if (rconn)
593 conn = rconn->conn;
595 else if (PG_NARGS() == 3)
597 /* text,text,int or text,int,bool */
598 if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
600 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
601 howmany = PG_GETARG_INT32(1);
602 fail = PG_GETARG_BOOL(2);
603 conn = pconn->conn;
605 else
607 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
608 curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
609 howmany = PG_GETARG_INT32(2);
611 rconn = getConnectionByName(conname);
612 if (rconn)
613 conn = rconn->conn;
616 else if (PG_NARGS() == 2)
618 /* text,int */
619 curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
620 howmany = PG_GETARG_INT32(1);
621 conn = pconn->conn;
624 if (!conn)
625 dblink_conn_not_avail(conname);
627 initStringInfo(&buf);
628 appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
631 * Try to execute the query. Note that since libpq uses malloc, the
632 * PGresult will be long-lived even though we are still in a short-lived
633 * memory context.
635 res = PQexec(conn, buf.data);
636 if (!res ||
637 (PQresultStatus(res) != PGRES_COMMAND_OK &&
638 PQresultStatus(res) != PGRES_TUPLES_OK))
640 dblink_res_error(conn, conname, res, fail,
641 "while fetching from cursor \"%s\"", curname);
642 return (Datum) 0;
644 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
646 /* cursor does not exist - closed already or bad name */
647 PQclear(res);
648 ereport(ERROR,
649 (errcode(ERRCODE_INVALID_CURSOR_NAME),
650 errmsg("cursor \"%s\" does not exist", curname)));
653 materializeResult(fcinfo, conn, res);
654 return (Datum) 0;
658 * Note: this is the new preferred version of dblink
660 PG_FUNCTION_INFO_V1(dblink_record);
661 Datum
662 dblink_record(PG_FUNCTION_ARGS)
664 return dblink_record_internal(fcinfo, false);
667 PG_FUNCTION_INFO_V1(dblink_send_query);
668 Datum
669 dblink_send_query(PG_FUNCTION_ARGS)
671 PGconn *conn;
672 char *sql;
673 int retval;
675 if (PG_NARGS() == 2)
677 conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
678 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
680 else
681 /* shouldn't happen */
682 elog(ERROR, "wrong number of arguments");
684 /* async query send */
685 retval = PQsendQuery(conn, sql);
686 if (retval != 1)
687 elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
689 PG_RETURN_INT32(retval);
692 PG_FUNCTION_INFO_V1(dblink_get_result);
693 Datum
694 dblink_get_result(PG_FUNCTION_ARGS)
696 return dblink_record_internal(fcinfo, true);
699 static Datum
700 dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
702 PGconn *volatile conn = NULL;
703 volatile bool freeconn = false;
705 prepTuplestoreResult(fcinfo);
707 dblink_init();
709 PG_TRY();
711 char *sql = NULL;
712 char *conname = NULL;
713 bool fail = true; /* default to backward compatible */
715 if (!is_async)
717 if (PG_NARGS() == 3)
719 /* text,text,bool */
720 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
721 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
722 fail = PG_GETARG_BOOL(2);
723 dblink_get_conn(conname, &conn, &conname, &freeconn);
725 else if (PG_NARGS() == 2)
727 /* text,text or text,bool */
728 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
730 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
731 fail = PG_GETARG_BOOL(1);
732 conn = pconn->conn;
734 else
736 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
737 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
738 dblink_get_conn(conname, &conn, &conname, &freeconn);
741 else if (PG_NARGS() == 1)
743 /* text */
744 conn = pconn->conn;
745 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
747 else
748 /* shouldn't happen */
749 elog(ERROR, "wrong number of arguments");
751 else /* is_async */
753 /* get async result */
754 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
756 if (PG_NARGS() == 2)
758 /* text,bool */
759 fail = PG_GETARG_BOOL(1);
760 conn = dblink_get_named_conn(conname);
762 else if (PG_NARGS() == 1)
764 /* text */
765 conn = dblink_get_named_conn(conname);
767 else
768 /* shouldn't happen */
769 elog(ERROR, "wrong number of arguments");
772 if (!conn)
773 dblink_conn_not_avail(conname);
775 if (!is_async)
777 /* synchronous query, use efficient tuple collection method */
778 materializeQueryResult(fcinfo, conn, conname, sql, fail);
780 else
782 /* async result retrieval, do it the old way */
783 PGresult *res = PQgetResult(conn);
785 /* NULL means we're all done with the async results */
786 if (res)
788 if (PQresultStatus(res) != PGRES_COMMAND_OK &&
789 PQresultStatus(res) != PGRES_TUPLES_OK)
791 dblink_res_error(conn, conname, res, fail,
792 "while executing query");
793 /* if fail isn't set, we'll return an empty query result */
795 else
797 materializeResult(fcinfo, conn, res);
802 PG_FINALLY();
804 /* if needed, close the connection to the database */
805 if (freeconn)
806 libpqsrv_disconnect(conn);
808 PG_END_TRY();
810 return (Datum) 0;
814 * Verify function caller can handle a tuplestore result, and set up for that.
816 * Note: if the caller returns without actually creating a tuplestore, the
817 * executor will treat the function result as an empty set.
819 static void
820 prepTuplestoreResult(FunctionCallInfo fcinfo)
822 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
824 /* check to see if query supports us returning a tuplestore */
825 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
826 ereport(ERROR,
827 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
828 errmsg("set-valued function called in context that cannot accept a set")));
829 if (!(rsinfo->allowedModes & SFRM_Materialize))
830 ereport(ERROR,
831 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
832 errmsg("materialize mode required, but it is not allowed in this context")));
834 /* let the executor know we're sending back a tuplestore */
835 rsinfo->returnMode = SFRM_Materialize;
837 /* caller must fill these to return a non-empty result */
838 rsinfo->setResult = NULL;
839 rsinfo->setDesc = NULL;
843 * Copy the contents of the PGresult into a tuplestore to be returned
844 * as the result of the current function.
845 * The PGresult will be released in this function.
847 static void
848 materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
850 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
852 /* prepTuplestoreResult must have been called previously */
853 Assert(rsinfo->returnMode == SFRM_Materialize);
855 PG_TRY();
857 TupleDesc tupdesc;
858 bool is_sql_cmd;
859 int ntuples;
860 int nfields;
862 if (PQresultStatus(res) == PGRES_COMMAND_OK)
864 is_sql_cmd = true;
867 * need a tuple descriptor representing one TEXT column to return
868 * the command status string as our result tuple
870 tupdesc = CreateTemplateTupleDesc(1);
871 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
872 TEXTOID, -1, 0);
873 ntuples = 1;
874 nfields = 1;
876 else
878 Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
880 is_sql_cmd = false;
882 /* get a tuple descriptor for our result type */
883 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
885 case TYPEFUNC_COMPOSITE:
886 /* success */
887 break;
888 case TYPEFUNC_RECORD:
889 /* failed to determine actual type of RECORD */
890 ereport(ERROR,
891 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
892 errmsg("function returning record called in context "
893 "that cannot accept type record")));
894 break;
895 default:
896 /* result type isn't composite */
897 elog(ERROR, "return type must be a row type");
898 break;
901 /* make sure we have a persistent copy of the tupdesc */
902 tupdesc = CreateTupleDescCopy(tupdesc);
903 ntuples = PQntuples(res);
904 nfields = PQnfields(res);
908 * check result and tuple descriptor have the same number of columns
910 if (nfields != tupdesc->natts)
911 ereport(ERROR,
912 (errcode(ERRCODE_DATATYPE_MISMATCH),
913 errmsg("remote query result rowtype does not match "
914 "the specified FROM clause rowtype")));
916 if (ntuples > 0)
918 AttInMetadata *attinmeta;
919 int nestlevel = -1;
920 Tuplestorestate *tupstore;
921 MemoryContext oldcontext;
922 int row;
923 char **values;
925 attinmeta = TupleDescGetAttInMetadata(tupdesc);
927 /* Set GUCs to ensure we read GUC-sensitive data types correctly */
928 if (!is_sql_cmd)
929 nestlevel = applyRemoteGucs(conn);
931 oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
932 tupstore = tuplestore_begin_heap(true, false, work_mem);
933 rsinfo->setResult = tupstore;
934 rsinfo->setDesc = tupdesc;
935 MemoryContextSwitchTo(oldcontext);
937 values = palloc_array(char *, nfields);
939 /* put all tuples into the tuplestore */
940 for (row = 0; row < ntuples; row++)
942 HeapTuple tuple;
944 if (!is_sql_cmd)
946 int i;
948 for (i = 0; i < nfields; i++)
950 if (PQgetisnull(res, row, i))
951 values[i] = NULL;
952 else
953 values[i] = PQgetvalue(res, row, i);
956 else
958 values[0] = PQcmdStatus(res);
961 /* build the tuple and put it into the tuplestore. */
962 tuple = BuildTupleFromCStrings(attinmeta, values);
963 tuplestore_puttuple(tupstore, tuple);
966 /* clean up GUC settings, if we changed any */
967 restoreLocalGucs(nestlevel);
970 PG_FINALLY();
972 /* be sure to release the libpq result */
973 PQclear(res);
975 PG_END_TRY();
979 * Execute the given SQL command and store its results into a tuplestore
980 * to be returned as the result of the current function.
982 * This is equivalent to PQexec followed by materializeResult, but we make
983 * use of libpq's single-row mode to avoid accumulating the whole result
984 * inside libpq before it gets transferred to the tuplestore.
986 static void
987 materializeQueryResult(FunctionCallInfo fcinfo,
988 PGconn *conn,
989 const char *conname,
990 const char *sql,
991 bool fail)
993 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
994 PGresult *volatile res = NULL;
995 volatile storeInfo sinfo = {0};
997 /* prepTuplestoreResult must have been called previously */
998 Assert(rsinfo->returnMode == SFRM_Materialize);
1000 sinfo.fcinfo = fcinfo;
1002 PG_TRY();
1004 /* Create short-lived memory context for data conversions */
1005 sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
1006 "dblink temporary context",
1007 ALLOCSET_DEFAULT_SIZES);
1009 /* execute query, collecting any tuples into the tuplestore */
1010 res = storeQueryResult(&sinfo, conn, sql);
1012 if (!res ||
1013 (PQresultStatus(res) != PGRES_COMMAND_OK &&
1014 PQresultStatus(res) != PGRES_TUPLES_OK))
1017 * dblink_res_error will clear the passed PGresult, so we need
1018 * this ugly dance to avoid doing so twice during error exit
1020 PGresult *res1 = res;
1022 res = NULL;
1023 dblink_res_error(conn, conname, res1, fail,
1024 "while executing query");
1025 /* if fail isn't set, we'll return an empty query result */
1027 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1030 * storeRow didn't get called, so we need to convert the command
1031 * status string to a tuple manually
1033 TupleDesc tupdesc;
1034 AttInMetadata *attinmeta;
1035 Tuplestorestate *tupstore;
1036 HeapTuple tuple;
1037 char *values[1];
1038 MemoryContext oldcontext;
1041 * need a tuple descriptor representing one TEXT column to return
1042 * the command status string as our result tuple
1044 tupdesc = CreateTemplateTupleDesc(1);
1045 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1046 TEXTOID, -1, 0);
1047 attinmeta = TupleDescGetAttInMetadata(tupdesc);
1049 oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1050 tupstore = tuplestore_begin_heap(true, false, work_mem);
1051 rsinfo->setResult = tupstore;
1052 rsinfo->setDesc = tupdesc;
1053 MemoryContextSwitchTo(oldcontext);
1055 values[0] = PQcmdStatus(res);
1057 /* build the tuple and put it into the tuplestore. */
1058 tuple = BuildTupleFromCStrings(attinmeta, values);
1059 tuplestore_puttuple(tupstore, tuple);
1061 PQclear(res);
1062 res = NULL;
1064 else
1066 Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
1067 /* storeRow should have created a tuplestore */
1068 Assert(rsinfo->setResult != NULL);
1070 PQclear(res);
1071 res = NULL;
1074 /* clean up data conversion short-lived memory context */
1075 if (sinfo.tmpcontext != NULL)
1076 MemoryContextDelete(sinfo.tmpcontext);
1077 sinfo.tmpcontext = NULL;
1079 PQclear(sinfo.last_res);
1080 sinfo.last_res = NULL;
1081 PQclear(sinfo.cur_res);
1082 sinfo.cur_res = NULL;
1084 PG_CATCH();
1086 /* be sure to release any libpq result we collected */
1087 PQclear(res);
1088 PQclear(sinfo.last_res);
1089 PQclear(sinfo.cur_res);
1090 /* and clear out any pending data in libpq */
1091 while ((res = PQgetResult(conn)) != NULL)
1092 PQclear(res);
1093 PG_RE_THROW();
1095 PG_END_TRY();
1099 * Execute query, and send any result rows to sinfo->tuplestore.
1101 static PGresult *
1102 storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
1104 bool first = true;
1105 int nestlevel = -1;
1106 PGresult *res;
1108 if (!PQsendQuery(conn, sql))
1109 elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
1111 if (!PQsetSingleRowMode(conn)) /* shouldn't fail */
1112 elog(ERROR, "failed to set single-row mode for dblink query");
1114 for (;;)
1116 CHECK_FOR_INTERRUPTS();
1118 sinfo->cur_res = PQgetResult(conn);
1119 if (!sinfo->cur_res)
1120 break;
1122 if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
1124 /* got one row from possibly-bigger resultset */
1127 * Set GUCs to ensure we read GUC-sensitive data types correctly.
1128 * We shouldn't do this until we have a row in hand, to ensure
1129 * libpq has seen any earlier ParameterStatus protocol messages.
1131 if (first && nestlevel < 0)
1132 nestlevel = applyRemoteGucs(conn);
1134 storeRow(sinfo, sinfo->cur_res, first);
1136 PQclear(sinfo->cur_res);
1137 sinfo->cur_res = NULL;
1138 first = false;
1140 else
1142 /* if empty resultset, fill tuplestore header */
1143 if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
1144 storeRow(sinfo, sinfo->cur_res, first);
1146 /* store completed result at last_res */
1147 PQclear(sinfo->last_res);
1148 sinfo->last_res = sinfo->cur_res;
1149 sinfo->cur_res = NULL;
1150 first = true;
1154 /* clean up GUC settings, if we changed any */
1155 restoreLocalGucs(nestlevel);
1157 /* return last_res */
1158 res = sinfo->last_res;
1159 sinfo->last_res = NULL;
1160 return res;
1164 * Send single row to sinfo->tuplestore.
1166 * If "first" is true, create the tuplestore using PGresult's metadata
1167 * (in this case the PGresult might contain either zero or one row).
1169 static void
1170 storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
1172 int nfields = PQnfields(res);
1173 HeapTuple tuple;
1174 int i;
1175 MemoryContext oldcontext;
1177 if (first)
1179 /* Prepare for new result set */
1180 ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
1181 TupleDesc tupdesc;
1184 * It's possible to get more than one result set if the query string
1185 * contained multiple SQL commands. In that case, we follow PQexec's
1186 * traditional behavior of throwing away all but the last result.
1188 if (sinfo->tuplestore)
1189 tuplestore_end(sinfo->tuplestore);
1190 sinfo->tuplestore = NULL;
1192 /* get a tuple descriptor for our result type */
1193 switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
1195 case TYPEFUNC_COMPOSITE:
1196 /* success */
1197 break;
1198 case TYPEFUNC_RECORD:
1199 /* failed to determine actual type of RECORD */
1200 ereport(ERROR,
1201 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1202 errmsg("function returning record called in context "
1203 "that cannot accept type record")));
1204 break;
1205 default:
1206 /* result type isn't composite */
1207 elog(ERROR, "return type must be a row type");
1208 break;
1211 /* make sure we have a persistent copy of the tupdesc */
1212 tupdesc = CreateTupleDescCopy(tupdesc);
1214 /* check result and tuple descriptor have the same number of columns */
1215 if (nfields != tupdesc->natts)
1216 ereport(ERROR,
1217 (errcode(ERRCODE_DATATYPE_MISMATCH),
1218 errmsg("remote query result rowtype does not match "
1219 "the specified FROM clause rowtype")));
1221 /* Prepare attinmeta for later data conversions */
1222 sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
1224 /* Create a new, empty tuplestore */
1225 oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1226 sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1227 rsinfo->setResult = sinfo->tuplestore;
1228 rsinfo->setDesc = tupdesc;
1229 MemoryContextSwitchTo(oldcontext);
1231 /* Done if empty resultset */
1232 if (PQntuples(res) == 0)
1233 return;
1236 * Set up sufficiently-wide string pointers array; this won't change
1237 * in size so it's easy to preallocate.
1239 if (sinfo->cstrs)
1240 pfree(sinfo->cstrs);
1241 sinfo->cstrs = palloc_array(char *, nfields);
1244 /* Should have a single-row result if we get here */
1245 Assert(PQntuples(res) == 1);
1248 * Do the following work in a temp context that we reset after each tuple.
1249 * This cleans up not only the data we have direct access to, but any
1250 * cruft the I/O functions might leak.
1252 oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
1255 * Fill cstrs with null-terminated strings of column values.
1257 for (i = 0; i < nfields; i++)
1259 if (PQgetisnull(res, 0, i))
1260 sinfo->cstrs[i] = NULL;
1261 else
1262 sinfo->cstrs[i] = PQgetvalue(res, 0, i);
1265 /* Convert row to a tuple, and add it to the tuplestore */
1266 tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
1268 tuplestore_puttuple(sinfo->tuplestore, tuple);
1270 /* Clean up */
1271 MemoryContextSwitchTo(oldcontext);
1272 MemoryContextReset(sinfo->tmpcontext);
1276 * List all open dblink connections by name.
1277 * Returns an array of all connection names.
1278 * Takes no params
1280 PG_FUNCTION_INFO_V1(dblink_get_connections);
1281 Datum
1282 dblink_get_connections(PG_FUNCTION_ARGS)
1284 HASH_SEQ_STATUS status;
1285 remoteConnHashEnt *hentry;
1286 ArrayBuildState *astate = NULL;
1288 if (remoteConnHash)
1290 hash_seq_init(&status, remoteConnHash);
1291 while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
1293 /* stash away current value */
1294 astate = accumArrayResult(astate,
1295 CStringGetTextDatum(hentry->name),
1296 false, TEXTOID, CurrentMemoryContext);
1300 if (astate)
1301 PG_RETURN_DATUM(makeArrayResult(astate,
1302 CurrentMemoryContext));
1303 else
1304 PG_RETURN_NULL();
1308 * Checks if a given remote connection is busy
1310 * Returns 1 if the connection is busy, 0 otherwise
1311 * Params:
1312 * text connection_name - name of the connection to check
1315 PG_FUNCTION_INFO_V1(dblink_is_busy);
1316 Datum
1317 dblink_is_busy(PG_FUNCTION_ARGS)
1319 PGconn *conn;
1321 dblink_init();
1322 conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1324 PQconsumeInput(conn);
1325 PG_RETURN_INT32(PQisBusy(conn));
1329 * Cancels a running request on a connection
1331 * Returns text:
1332 * "OK" if the cancel request has been sent correctly,
1333 * an error message otherwise
1335 * Params:
1336 * text connection_name - name of the connection to check
1339 PG_FUNCTION_INFO_V1(dblink_cancel_query);
1340 Datum
1341 dblink_cancel_query(PG_FUNCTION_ARGS)
1343 int res;
1344 PGconn *conn;
1345 PGcancel *cancel;
1346 char errbuf[256];
1348 dblink_init();
1349 conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1350 cancel = PQgetCancel(conn);
1352 res = PQcancel(cancel, errbuf, 256);
1353 PQfreeCancel(cancel);
1355 if (res == 1)
1356 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1357 else
1358 PG_RETURN_TEXT_P(cstring_to_text(errbuf));
1363 * Get error message from a connection
1365 * Returns text:
1366 * "OK" if no error, an error message otherwise
1368 * Params:
1369 * text connection_name - name of the connection to check
1372 PG_FUNCTION_INFO_V1(dblink_error_message);
1373 Datum
1374 dblink_error_message(PG_FUNCTION_ARGS)
1376 char *msg;
1377 PGconn *conn;
1379 dblink_init();
1380 conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1382 msg = PQerrorMessage(conn);
1383 if (msg == NULL || msg[0] == '\0')
1384 PG_RETURN_TEXT_P(cstring_to_text("OK"));
1385 else
1386 PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
1390 * Execute an SQL non-SELECT command
1392 PG_FUNCTION_INFO_V1(dblink_exec);
1393 Datum
1394 dblink_exec(PG_FUNCTION_ARGS)
1396 text *volatile sql_cmd_status = NULL;
1397 PGconn *volatile conn = NULL;
1398 volatile bool freeconn = false;
1400 dblink_init();
1402 PG_TRY();
1404 PGresult *res = NULL;
1405 char *sql = NULL;
1406 char *conname = NULL;
1407 bool fail = true; /* default to backward compatible behavior */
1409 if (PG_NARGS() == 3)
1411 /* must be text,text,bool */
1412 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1413 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1414 fail = PG_GETARG_BOOL(2);
1415 dblink_get_conn(conname, &conn, &conname, &freeconn);
1417 else if (PG_NARGS() == 2)
1419 /* might be text,text or text,bool */
1420 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
1422 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1423 fail = PG_GETARG_BOOL(1);
1424 conn = pconn->conn;
1426 else
1428 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1429 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1430 dblink_get_conn(conname, &conn, &conname, &freeconn);
1433 else if (PG_NARGS() == 1)
1435 /* must be single text argument */
1436 conn = pconn->conn;
1437 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1439 else
1440 /* shouldn't happen */
1441 elog(ERROR, "wrong number of arguments");
1443 if (!conn)
1444 dblink_conn_not_avail(conname);
1446 res = PQexec(conn, sql);
1447 if (!res ||
1448 (PQresultStatus(res) != PGRES_COMMAND_OK &&
1449 PQresultStatus(res) != PGRES_TUPLES_OK))
1451 dblink_res_error(conn, conname, res, fail,
1452 "while executing command");
1455 * and save a copy of the command status string to return as our
1456 * result tuple
1458 sql_cmd_status = cstring_to_text("ERROR");
1460 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1463 * and save a copy of the command status string to return as our
1464 * result tuple
1466 sql_cmd_status = cstring_to_text(PQcmdStatus(res));
1467 PQclear(res);
1469 else
1471 PQclear(res);
1472 ereport(ERROR,
1473 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
1474 errmsg("statement returning results not allowed")));
1477 PG_FINALLY();
1479 /* if needed, close the connection to the database */
1480 if (freeconn)
1481 libpqsrv_disconnect(conn);
1483 PG_END_TRY();
1485 PG_RETURN_TEXT_P(sql_cmd_status);
1490 * dblink_get_pkey
1492 * Return list of primary key fields for the supplied relation,
1493 * or NULL if none exists.
1495 PG_FUNCTION_INFO_V1(dblink_get_pkey);
1496 Datum
1497 dblink_get_pkey(PG_FUNCTION_ARGS)
1499 int16 indnkeyatts;
1500 char **results;
1501 FuncCallContext *funcctx;
1502 int32 call_cntr;
1503 int32 max_calls;
1504 AttInMetadata *attinmeta;
1505 MemoryContext oldcontext;
1507 /* stuff done only on the first call of the function */
1508 if (SRF_IS_FIRSTCALL())
1510 Relation rel;
1511 TupleDesc tupdesc;
1513 /* create a function context for cross-call persistence */
1514 funcctx = SRF_FIRSTCALL_INIT();
1517 * switch to memory context appropriate for multiple function calls
1519 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1521 /* open target relation */
1522 rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
1524 /* get the array of attnums */
1525 results = get_pkey_attnames(rel, &indnkeyatts);
1527 relation_close(rel, AccessShareLock);
1530 * need a tuple descriptor representing one INT and one TEXT column
1532 tupdesc = CreateTemplateTupleDesc(2);
1533 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
1534 INT4OID, -1, 0);
1535 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
1536 TEXTOID, -1, 0);
1539 * Generate attribute metadata needed later to produce tuples from raw
1540 * C strings
1542 attinmeta = TupleDescGetAttInMetadata(tupdesc);
1543 funcctx->attinmeta = attinmeta;
1545 if ((results != NULL) && (indnkeyatts > 0))
1547 funcctx->max_calls = indnkeyatts;
1549 /* got results, keep track of them */
1550 funcctx->user_fctx = results;
1552 else
1554 /* fast track when no results */
1555 MemoryContextSwitchTo(oldcontext);
1556 SRF_RETURN_DONE(funcctx);
1559 MemoryContextSwitchTo(oldcontext);
1562 /* stuff done on every call of the function */
1563 funcctx = SRF_PERCALL_SETUP();
1566 * initialize per-call variables
1568 call_cntr = funcctx->call_cntr;
1569 max_calls = funcctx->max_calls;
1571 results = (char **) funcctx->user_fctx;
1572 attinmeta = funcctx->attinmeta;
1574 if (call_cntr < max_calls) /* do when there is more left to send */
1576 char **values;
1577 HeapTuple tuple;
1578 Datum result;
1580 values = palloc_array(char *, 2);
1581 values[0] = psprintf("%d", call_cntr + 1);
1582 values[1] = results[call_cntr];
1584 /* build the tuple */
1585 tuple = BuildTupleFromCStrings(attinmeta, values);
1587 /* make the tuple into a datum */
1588 result = HeapTupleGetDatum(tuple);
1590 SRF_RETURN_NEXT(funcctx, result);
1592 else
1594 /* do when there is no more left */
1595 SRF_RETURN_DONE(funcctx);
1601 * dblink_build_sql_insert
1603 * Used to generate an SQL insert statement
1604 * based on an existing tuple in a local relation.
1605 * This is useful for selectively replicating data
1606 * to another server via dblink.
1608 * API:
1609 * <relname> - name of local table of interest
1610 * <pkattnums> - an int2vector of attnums which will be used
1611 * to identify the local tuple of interest
1612 * <pknumatts> - number of attnums in pkattnums
1613 * <src_pkattvals_arry> - text array of key values which will be used
1614 * to identify the local tuple of interest
1615 * <tgt_pkattvals_arry> - text array of key values which will be used
1616 * to build the string for execution remotely. These are substituted
1617 * for their counterparts in src_pkattvals_arry
1619 PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
1620 Datum
1621 dblink_build_sql_insert(PG_FUNCTION_ARGS)
1623 text *relname_text = PG_GETARG_TEXT_PP(0);
1624 int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1625 int32 pknumatts_arg = PG_GETARG_INT32(2);
1626 ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1627 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1628 Relation rel;
1629 int *pkattnums;
1630 int pknumatts;
1631 char **src_pkattvals;
1632 char **tgt_pkattvals;
1633 int src_nitems;
1634 int tgt_nitems;
1635 char *sql;
1638 * Open target relation.
1640 rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1643 * Process pkattnums argument.
1645 validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1646 &pkattnums, &pknumatts);
1649 * Source array is made up of key values that will be used to locate the
1650 * tuple of interest from the local system.
1652 src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1655 * There should be one source array key value for each key attnum
1657 if (src_nitems != pknumatts)
1658 ereport(ERROR,
1659 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1660 errmsg("source key array length must match number of key attributes")));
1663 * Target array is made up of key values that will be used to build the
1664 * SQL string for use on the remote system.
1666 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1669 * There should be one target array key value for each key attnum
1671 if (tgt_nitems != pknumatts)
1672 ereport(ERROR,
1673 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1674 errmsg("target key array length must match number of key attributes")));
1677 * Prep work is finally done. Go get the SQL string.
1679 sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1682 * Now we can close the relation.
1684 relation_close(rel, AccessShareLock);
1687 * And send it
1689 PG_RETURN_TEXT_P(cstring_to_text(sql));
1694 * dblink_build_sql_delete
1696 * Used to generate an SQL delete statement.
1697 * This is useful for selectively replicating a
1698 * delete to another server via dblink.
1700 * API:
1701 * <relname> - name of remote table of interest
1702 * <pkattnums> - an int2vector of attnums which will be used
1703 * to identify the remote tuple of interest
1704 * <pknumatts> - number of attnums in pkattnums
1705 * <tgt_pkattvals_arry> - text array of key values which will be used
1706 * to build the string for execution remotely.
1708 PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
1709 Datum
1710 dblink_build_sql_delete(PG_FUNCTION_ARGS)
1712 text *relname_text = PG_GETARG_TEXT_PP(0);
1713 int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1714 int32 pknumatts_arg = PG_GETARG_INT32(2);
1715 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1716 Relation rel;
1717 int *pkattnums;
1718 int pknumatts;
1719 char **tgt_pkattvals;
1720 int tgt_nitems;
1721 char *sql;
1724 * Open target relation.
1726 rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1729 * Process pkattnums argument.
1731 validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1732 &pkattnums, &pknumatts);
1735 * Target array is made up of key values that will be used to build the
1736 * SQL string for use on the remote system.
1738 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1741 * There should be one target array key value for each key attnum
1743 if (tgt_nitems != pknumatts)
1744 ereport(ERROR,
1745 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1746 errmsg("target key array length must match number of key attributes")));
1749 * Prep work is finally done. Go get the SQL string.
1751 sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
1754 * Now we can close the relation.
1756 relation_close(rel, AccessShareLock);
1759 * And send it
1761 PG_RETURN_TEXT_P(cstring_to_text(sql));
1766 * dblink_build_sql_update
1768 * Used to generate an SQL update statement
1769 * based on an existing tuple in a local relation.
1770 * This is useful for selectively replicating data
1771 * to another server via dblink.
1773 * API:
1774 * <relname> - name of local table of interest
1775 * <pkattnums> - an int2vector of attnums which will be used
1776 * to identify the local tuple of interest
1777 * <pknumatts> - number of attnums in pkattnums
1778 * <src_pkattvals_arry> - text array of key values which will be used
1779 * to identify the local tuple of interest
1780 * <tgt_pkattvals_arry> - text array of key values which will be used
1781 * to build the string for execution remotely. These are substituted
1782 * for their counterparts in src_pkattvals_arry
1784 PG_FUNCTION_INFO_V1(dblink_build_sql_update);
1785 Datum
1786 dblink_build_sql_update(PG_FUNCTION_ARGS)
1788 text *relname_text = PG_GETARG_TEXT_PP(0);
1789 int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1790 int32 pknumatts_arg = PG_GETARG_INT32(2);
1791 ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1792 ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1793 Relation rel;
1794 int *pkattnums;
1795 int pknumatts;
1796 char **src_pkattvals;
1797 char **tgt_pkattvals;
1798 int src_nitems;
1799 int tgt_nitems;
1800 char *sql;
1803 * Open target relation.
1805 rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1808 * Process pkattnums argument.
1810 validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1811 &pkattnums, &pknumatts);
1814 * Source array is made up of key values that will be used to locate the
1815 * tuple of interest from the local system.
1817 src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1820 * There should be one source array key value for each key attnum
1822 if (src_nitems != pknumatts)
1823 ereport(ERROR,
1824 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1825 errmsg("source key array length must match number of key attributes")));
1828 * Target array is made up of key values that will be used to build the
1829 * SQL string for use on the remote system.
1831 tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1834 * There should be one target array key value for each key attnum
1836 if (tgt_nitems != pknumatts)
1837 ereport(ERROR,
1838 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1839 errmsg("target key array length must match number of key attributes")));
1842 * Prep work is finally done. Go get the SQL string.
1844 sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1847 * Now we can close the relation.
1849 relation_close(rel, AccessShareLock);
1852 * And send it
1854 PG_RETURN_TEXT_P(cstring_to_text(sql));
1858 * dblink_current_query
1859 * return the current query string
1860 * to allow its use in (among other things)
1861 * rewrite rules
1863 PG_FUNCTION_INFO_V1(dblink_current_query);
1864 Datum
1865 dblink_current_query(PG_FUNCTION_ARGS)
1867 /* This is now just an alias for the built-in function current_query() */
1868 PG_RETURN_DATUM(current_query(fcinfo));
1872 * Retrieve async notifications for a connection.
1874 * Returns a setof record of notifications, or an empty set if none received.
1875 * Can optionally take a named connection as parameter, but uses the unnamed
1876 * connection per default.
1879 #define DBLINK_NOTIFY_COLS 3
1881 PG_FUNCTION_INFO_V1(dblink_get_notify);
1882 Datum
1883 dblink_get_notify(PG_FUNCTION_ARGS)
1885 PGconn *conn;
1886 PGnotify *notify;
1887 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1889 dblink_init();
1890 if (PG_NARGS() == 1)
1891 conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1892 else
1893 conn = pconn->conn;
1895 InitMaterializedSRF(fcinfo, 0);
1897 PQconsumeInput(conn);
1898 while ((notify = PQnotifies(conn)) != NULL)
1900 Datum values[DBLINK_NOTIFY_COLS];
1901 bool nulls[DBLINK_NOTIFY_COLS];
1903 memset(values, 0, sizeof(values));
1904 memset(nulls, 0, sizeof(nulls));
1906 if (notify->relname != NULL)
1907 values[0] = CStringGetTextDatum(notify->relname);
1908 else
1909 nulls[0] = true;
1911 values[1] = Int32GetDatum(notify->be_pid);
1913 if (notify->extra != NULL)
1914 values[2] = CStringGetTextDatum(notify->extra);
1915 else
1916 nulls[2] = true;
1918 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
1920 PQfreemem(notify);
1921 PQconsumeInput(conn);
1924 return (Datum) 0;
1928 * Validate the options given to a dblink foreign server or user mapping.
1929 * Raise an error if any option is invalid.
1931 * We just check the names of options here, so semantic errors in options,
1932 * such as invalid numeric format, will be detected at the attempt to connect.
1934 PG_FUNCTION_INFO_V1(dblink_fdw_validator);
1935 Datum
1936 dblink_fdw_validator(PG_FUNCTION_ARGS)
1938 List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
1939 Oid context = PG_GETARG_OID(1);
1940 ListCell *cell;
1942 static const PQconninfoOption *options = NULL;
1945 * Get list of valid libpq options.
1947 * To avoid unnecessary work, we get the list once and use it throughout
1948 * the lifetime of this backend process. We don't need to care about
1949 * memory context issues, because PQconndefaults allocates with malloc.
1951 if (!options)
1953 options = PQconndefaults();
1954 if (!options) /* assume reason for failure is OOM */
1955 ereport(ERROR,
1956 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
1957 errmsg("out of memory"),
1958 errdetail("Could not get libpq's default connection options.")));
1961 /* Validate each supplied option. */
1962 foreach(cell, options_list)
1964 DefElem *def = (DefElem *) lfirst(cell);
1966 if (!is_valid_dblink_option(options, def->defname, context))
1969 * Unknown option, or invalid option for the context specified, so
1970 * complain about it. Provide a hint with a valid option that
1971 * looks similar, if there is one.
1973 const PQconninfoOption *opt;
1974 const char *closest_match;
1975 ClosestMatchState match_state;
1976 bool has_valid_options = false;
1978 initClosestMatch(&match_state, def->defname, 4);
1979 for (opt = options; opt->keyword; opt++)
1981 if (is_valid_dblink_option(options, opt->keyword, context))
1983 has_valid_options = true;
1984 updateClosestMatch(&match_state, opt->keyword);
1988 closest_match = getClosestMatch(&match_state);
1989 ereport(ERROR,
1990 (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
1991 errmsg("invalid option \"%s\"", def->defname),
1992 has_valid_options ? closest_match ?
1993 errhint("Perhaps you meant the option \"%s\".",
1994 closest_match) : 0 :
1995 errhint("There are no valid options in this context.")));
1999 PG_RETURN_VOID();
/*************************************************************
 * internal functions
 */
2009 * get_pkey_attnames
2011 * Get the primary key attnames for the given relation.
2012 * Return NULL, and set indnkeyatts = 0, if no primary key exists.
2014 static char **
2015 get_pkey_attnames(Relation rel, int16 *indnkeyatts)
2017 Relation indexRelation;
2018 ScanKeyData skey;
2019 SysScanDesc scan;
2020 HeapTuple indexTuple;
2021 int i;
2022 char **result = NULL;
2023 TupleDesc tupdesc;
2025 /* initialize indnkeyatts to 0 in case no primary key exists */
2026 *indnkeyatts = 0;
2028 tupdesc = rel->rd_att;
2030 /* Prepare to scan pg_index for entries having indrelid = this rel. */
2031 indexRelation = table_open(IndexRelationId, AccessShareLock);
2032 ScanKeyInit(&skey,
2033 Anum_pg_index_indrelid,
2034 BTEqualStrategyNumber, F_OIDEQ,
2035 ObjectIdGetDatum(RelationGetRelid(rel)));
2037 scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
2038 NULL, 1, &skey);
2040 while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
2042 Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
2044 /* we're only interested if it is the primary key */
2045 if (index->indisprimary)
2047 *indnkeyatts = index->indnkeyatts;
2048 if (*indnkeyatts > 0)
2050 result = palloc_array(char *, *indnkeyatts);
2052 for (i = 0; i < *indnkeyatts; i++)
2053 result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
2055 break;
2059 systable_endscan(scan);
2060 table_close(indexRelation, AccessShareLock);
2062 return result;
2066 * Deconstruct a text[] into C-strings (note any NULL elements will be
2067 * returned as NULL pointers)
2069 static char **
2070 get_text_array_contents(ArrayType *array, int *numitems)
2072 int ndim = ARR_NDIM(array);
2073 int *dims = ARR_DIMS(array);
2074 int nitems;
2075 int16 typlen;
2076 bool typbyval;
2077 char typalign;
2078 char **values;
2079 char *ptr;
2080 bits8 *bitmap;
2081 int bitmask;
2082 int i;
2084 Assert(ARR_ELEMTYPE(array) == TEXTOID);
2086 *numitems = nitems = ArrayGetNItems(ndim, dims);
2088 get_typlenbyvalalign(ARR_ELEMTYPE(array),
2089 &typlen, &typbyval, &typalign);
2091 values = palloc_array(char *, nitems);
2093 ptr = ARR_DATA_PTR(array);
2094 bitmap = ARR_NULLBITMAP(array);
2095 bitmask = 1;
2097 for (i = 0; i < nitems; i++)
2099 if (bitmap && (*bitmap & bitmask) == 0)
2101 values[i] = NULL;
2103 else
2105 values[i] = TextDatumGetCString(PointerGetDatum(ptr));
2106 ptr = att_addlength_pointer(ptr, typlen, ptr);
2107 ptr = (char *) att_align_nominal(ptr, typalign);
2110 /* advance bitmap pointer if any */
2111 if (bitmap)
2113 bitmask <<= 1;
2114 if (bitmask == 0x100)
2116 bitmap++;
2117 bitmask = 1;
2122 return values;
2125 static char *
2126 get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2128 char *relname;
2129 HeapTuple tuple;
2130 TupleDesc tupdesc;
2131 int natts;
2132 StringInfoData buf;
2133 char *val;
2134 int key;
2135 int i;
2136 bool needComma;
2138 initStringInfo(&buf);
2140 /* get relation name including any needed schema prefix and quoting */
2141 relname = generate_relation_name(rel);
2143 tupdesc = rel->rd_att;
2144 natts = tupdesc->natts;
2146 tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2147 if (!tuple)
2148 ereport(ERROR,
2149 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2150 errmsg("source row not found")));
2152 appendStringInfo(&buf, "INSERT INTO %s(", relname);
2154 needComma = false;
2155 for (i = 0; i < natts; i++)
2157 Form_pg_attribute att = TupleDescAttr(tupdesc, i);
2159 if (att->attisdropped)
2160 continue;
2162 if (needComma)
2163 appendStringInfoChar(&buf, ',');
2165 appendStringInfoString(&buf,
2166 quote_ident_cstr(NameStr(att->attname)));
2167 needComma = true;
2170 appendStringInfoString(&buf, ") VALUES(");
2173 * Note: i is physical column number (counting from 0).
2175 needComma = false;
2176 for (i = 0; i < natts; i++)
2178 if (TupleDescAttr(tupdesc, i)->attisdropped)
2179 continue;
2181 if (needComma)
2182 appendStringInfoChar(&buf, ',');
2184 key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2186 if (key >= 0)
2187 val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2188 else
2189 val = SPI_getvalue(tuple, tupdesc, i + 1);
2191 if (val != NULL)
2193 appendStringInfoString(&buf, quote_literal_cstr(val));
2194 pfree(val);
2196 else
2197 appendStringInfoString(&buf, "NULL");
2198 needComma = true;
2200 appendStringInfoChar(&buf, ')');
2202 return buf.data;
2205 static char *
2206 get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
2208 char *relname;
2209 TupleDesc tupdesc;
2210 StringInfoData buf;
2211 int i;
2213 initStringInfo(&buf);
2215 /* get relation name including any needed schema prefix and quoting */
2216 relname = generate_relation_name(rel);
2218 tupdesc = rel->rd_att;
2220 appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
2221 for (i = 0; i < pknumatts; i++)
2223 int pkattnum = pkattnums[i];
2224 Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2226 if (i > 0)
2227 appendStringInfoString(&buf, " AND ");
2229 appendStringInfoString(&buf,
2230 quote_ident_cstr(NameStr(attr->attname)));
2232 if (tgt_pkattvals[i] != NULL)
2233 appendStringInfo(&buf, " = %s",
2234 quote_literal_cstr(tgt_pkattvals[i]));
2235 else
2236 appendStringInfoString(&buf, " IS NULL");
2239 return buf.data;
2242 static char *
2243 get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2245 char *relname;
2246 HeapTuple tuple;
2247 TupleDesc tupdesc;
2248 int natts;
2249 StringInfoData buf;
2250 char *val;
2251 int key;
2252 int i;
2253 bool needComma;
2255 initStringInfo(&buf);
2257 /* get relation name including any needed schema prefix and quoting */
2258 relname = generate_relation_name(rel);
2260 tupdesc = rel->rd_att;
2261 natts = tupdesc->natts;
2263 tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2264 if (!tuple)
2265 ereport(ERROR,
2266 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2267 errmsg("source row not found")));
2269 appendStringInfo(&buf, "UPDATE %s SET ", relname);
2272 * Note: i is physical column number (counting from 0).
2274 needComma = false;
2275 for (i = 0; i < natts; i++)
2277 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2279 if (attr->attisdropped)
2280 continue;
2282 if (needComma)
2283 appendStringInfoString(&buf, ", ");
2285 appendStringInfo(&buf, "%s = ",
2286 quote_ident_cstr(NameStr(attr->attname)));
2288 key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2290 if (key >= 0)
2291 val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2292 else
2293 val = SPI_getvalue(tuple, tupdesc, i + 1);
2295 if (val != NULL)
2297 appendStringInfoString(&buf, quote_literal_cstr(val));
2298 pfree(val);
2300 else
2301 appendStringInfoString(&buf, "NULL");
2302 needComma = true;
2305 appendStringInfoString(&buf, " WHERE ");
2307 for (i = 0; i < pknumatts; i++)
2309 int pkattnum = pkattnums[i];
2310 Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2312 if (i > 0)
2313 appendStringInfoString(&buf, " AND ");
2315 appendStringInfoString(&buf,
2316 quote_ident_cstr(NameStr(attr->attname)));
2318 val = tgt_pkattvals[i];
2320 if (val != NULL)
2321 appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
2322 else
2323 appendStringInfoString(&buf, " IS NULL");
2326 return buf.data;
2330 * Return a properly quoted identifier.
2331 * Uses quote_ident in quote.c
2333 static char *
2334 quote_ident_cstr(char *rawstr)
2336 text *rawstr_text;
2337 text *result_text;
2338 char *result;
2340 rawstr_text = cstring_to_text(rawstr);
2341 result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
2342 PointerGetDatum(rawstr_text)));
2343 result = text_to_cstring(result_text);
2345 return result;
/*
 * Find "key" in the first pknumatts entries of pkattnums.
 *
 * Returns the zero-based index of the first matching entry, or -1 if the
 * value is not present (including when pknumatts is zero or negative).
 */
static int
get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
{
	int			i;

	/*
	 * Not likely a long list anyway, so just scan for the value
	 */
	for (i = 0; i < pknumatts; i++)
		if (key == pkattnums[i])
			return i;

	return -1;
}
2363 static HeapTuple
2364 get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
2366 char *relname;
2367 TupleDesc tupdesc;
2368 int natts;
2369 StringInfoData buf;
2370 int ret;
2371 HeapTuple tuple;
2372 int i;
2375 * Connect to SPI manager
2377 if ((ret = SPI_connect()) < 0)
2378 /* internal error */
2379 elog(ERROR, "SPI connect failure - returned %d", ret);
2381 initStringInfo(&buf);
2383 /* get relation name including any needed schema prefix and quoting */
2384 relname = generate_relation_name(rel);
2386 tupdesc = rel->rd_att;
2387 natts = tupdesc->natts;
2390 * Build sql statement to look up tuple of interest, ie, the one matching
2391 * src_pkattvals. We used to use "SELECT *" here, but it's simpler to
2392 * generate a result tuple that matches the table's physical structure,
2393 * with NULLs for any dropped columns. Otherwise we have to deal with two
2394 * different tupdescs and everything's very confusing.
2396 appendStringInfoString(&buf, "SELECT ");
2398 for (i = 0; i < natts; i++)
2400 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2402 if (i > 0)
2403 appendStringInfoString(&buf, ", ");
2405 if (attr->attisdropped)
2406 appendStringInfoString(&buf, "NULL");
2407 else
2408 appendStringInfoString(&buf,
2409 quote_ident_cstr(NameStr(attr->attname)));
2412 appendStringInfo(&buf, " FROM %s WHERE ", relname);
2414 for (i = 0; i < pknumatts; i++)
2416 int pkattnum = pkattnums[i];
2417 Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2419 if (i > 0)
2420 appendStringInfoString(&buf, " AND ");
2422 appendStringInfoString(&buf,
2423 quote_ident_cstr(NameStr(attr->attname)));
2425 if (src_pkattvals[i] != NULL)
2426 appendStringInfo(&buf, " = %s",
2427 quote_literal_cstr(src_pkattvals[i]));
2428 else
2429 appendStringInfoString(&buf, " IS NULL");
2433 * Retrieve the desired tuple
2435 ret = SPI_exec(buf.data, 0);
2436 pfree(buf.data);
2439 * Only allow one qualifying tuple
2441 if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
2442 ereport(ERROR,
2443 (errcode(ERRCODE_CARDINALITY_VIOLATION),
2444 errmsg("source criteria matched more than one record")));
2446 else if (ret == SPI_OK_SELECT && SPI_processed == 1)
2448 SPITupleTable *tuptable = SPI_tuptable;
2450 tuple = SPI_copytuple(tuptable->vals[0]);
2451 SPI_finish();
2453 return tuple;
2455 else
2458 * no qualifying tuples
2460 SPI_finish();
2462 return NULL;
2466 * never reached, but keep compiler quiet
2468 return NULL;
2472 * Open the relation named by relname_text, acquire specified type of lock,
2473 * verify we have specified permissions.
2474 * Caller must close rel when done with it.
2476 static Relation
2477 get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
2479 RangeVar *relvar;
2480 Relation rel;
2481 AclResult aclresult;
2483 relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
2484 rel = table_openrv(relvar, lockmode);
2486 aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
2487 aclmode);
2488 if (aclresult != ACLCHECK_OK)
2489 aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
2490 RelationGetRelationName(rel));
2492 return rel;
2496 * generate_relation_name - copied from ruleutils.c
2497 * Compute the name to display for a relation
2499 * The result includes all necessary quoting and schema-prefixing.
2501 static char *
2502 generate_relation_name(Relation rel)
2504 char *nspname;
2505 char *result;
2507 /* Qualify the name if not visible in search path */
2508 if (RelationIsVisible(RelationGetRelid(rel)))
2509 nspname = NULL;
2510 else
2511 nspname = get_namespace_name(rel->rd_rel->relnamespace);
2513 result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
2515 return result;
2519 static remoteConn *
2520 getConnectionByName(const char *name)
2522 remoteConnHashEnt *hentry;
2523 char *key;
2525 if (!remoteConnHash)
2526 remoteConnHash = createConnHash();
2528 key = pstrdup(name);
2529 truncate_identifier(key, strlen(key), false);
2530 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2531 key, HASH_FIND, NULL);
2533 if (hentry)
2534 return hentry->rconn;
2536 return NULL;
2539 static HTAB *
2540 createConnHash(void)
2542 HASHCTL ctl;
2544 ctl.keysize = NAMEDATALEN;
2545 ctl.entrysize = sizeof(remoteConnHashEnt);
2547 return hash_create("Remote Con hash", NUMCONN, &ctl,
2548 HASH_ELEM | HASH_STRINGS);
2551 static void
2552 createNewConnection(const char *name, remoteConn *rconn)
2554 remoteConnHashEnt *hentry;
2555 bool found;
2556 char *key;
2558 if (!remoteConnHash)
2559 remoteConnHash = createConnHash();
2561 key = pstrdup(name);
2562 truncate_identifier(key, strlen(key), true);
2563 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
2564 HASH_ENTER, &found);
2566 if (found)
2568 libpqsrv_disconnect(rconn->conn);
2569 pfree(rconn);
2571 ereport(ERROR,
2572 (errcode(ERRCODE_DUPLICATE_OBJECT),
2573 errmsg("duplicate connection name")));
2576 hentry->rconn = rconn;
2577 strlcpy(hentry->name, name, sizeof(hentry->name));
2580 static void
2581 deleteConnection(const char *name)
2583 remoteConnHashEnt *hentry;
2584 bool found;
2585 char *key;
2587 if (!remoteConnHash)
2588 remoteConnHash = createConnHash();
2590 key = pstrdup(name);
2591 truncate_identifier(key, strlen(key), false);
2592 hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2593 key, HASH_REMOVE, &found);
2595 if (!hentry)
2596 ereport(ERROR,
2597 (errcode(ERRCODE_UNDEFINED_OBJECT),
2598 errmsg("undefined connection name")));
2602 * We need to make sure that the connection made used credentials
2603 * which were provided by the user, so check what credentials were
2604 * used to connect and then make sure that they came from the user.
2606 static void
2607 dblink_security_check(PGconn *conn, remoteConn *rconn, const char *connstr)
2609 /* Superuser bypasses security check */
2610 if (superuser())
2611 return;
2613 /* If password was used to connect, make sure it was one provided */
2614 if (PQconnectionUsedPassword(conn) && dblink_connstr_has_pw(connstr))
2615 return;
2617 #ifdef ENABLE_GSS
2618 /* If GSSAPI creds used to connect, make sure it was one delegated */
2619 if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
2620 return;
2621 #endif
2623 /* Otherwise, fail out */
2624 libpqsrv_disconnect(conn);
2625 if (rconn)
2626 pfree(rconn);
2628 ereport(ERROR,
2629 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2630 errmsg("password or GSSAPI delegated credentials required"),
2631 errdetail("Non-superusers may only connect using credentials they provide, eg: password in connection string or delegated GSSAPI credentials"),
2632 errhint("Ensure provided credentials match target server's authentication method.")));
2636 * Function to check if the connection string includes an explicit
2637 * password, needed to ensure that non-superuser password-based auth
2638 * is using a provided password and not one picked up from the
2639 * environment.
2641 static bool
2642 dblink_connstr_has_pw(const char *connstr)
2644 PQconninfoOption *options;
2645 PQconninfoOption *option;
2646 bool connstr_gives_password = false;
2648 options = PQconninfoParse(connstr, NULL);
2649 if (options)
2651 for (option = options; option->keyword != NULL; option++)
2653 if (strcmp(option->keyword, "password") == 0)
2655 if (option->val != NULL && option->val[0] != '\0')
2657 connstr_gives_password = true;
2658 break;
2662 PQconninfoFree(options);
2665 return connstr_gives_password;
2669 * For non-superusers, insist that the connstr specify a password, except
2670 * if GSSAPI credentials have been delegated (and we check that they are used
2671 * for the connection in dblink_security_check later). This prevents a
2672 * password or GSSAPI credentials from being picked up from .pgpass, a
2673 * service file, the environment, etc. We don't want the postgres user's
2674 * passwords or Kerberos credentials to be accessible to non-superusers.
2676 static void
2677 dblink_connstr_check(const char *connstr)
2679 if (superuser())
2680 return;
2682 if (dblink_connstr_has_pw(connstr))
2683 return;
2685 #ifdef ENABLE_GSS
2686 if (be_gssapi_get_delegation(MyProcPort))
2687 return;
2688 #endif
2690 ereport(ERROR,
2691 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2692 errmsg("password or GSSAPI delegated credentials required"),
2693 errdetail("Non-superusers must provide a password in the connection string or send delegated GSSAPI credentials.")));
2697 * Report an error received from the remote server
2699 * res: the received error result (will be freed)
2700 * fail: true for ERROR ereport, false for NOTICE
2701 * fmt and following args: sprintf-style format and values for errcontext;
2702 * the resulting string should be worded like "while <some action>"
2704 static void
2705 dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
2706 bool fail, const char *fmt,...)
2708 int level;
2709 char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
2710 char *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
2711 char *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
2712 char *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
2713 char *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
2714 int sqlstate;
2715 char *message_primary;
2716 char *message_detail;
2717 char *message_hint;
2718 char *message_context;
2719 va_list ap;
2720 char dblink_context_msg[512];
2722 if (fail)
2723 level = ERROR;
2724 else
2725 level = NOTICE;
2727 if (pg_diag_sqlstate)
2728 sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
2729 pg_diag_sqlstate[1],
2730 pg_diag_sqlstate[2],
2731 pg_diag_sqlstate[3],
2732 pg_diag_sqlstate[4]);
2733 else
2734 sqlstate = ERRCODE_CONNECTION_FAILURE;
2736 message_primary = xpstrdup(pg_diag_message_primary);
2737 message_detail = xpstrdup(pg_diag_message_detail);
2738 message_hint = xpstrdup(pg_diag_message_hint);
2739 message_context = xpstrdup(pg_diag_context);
2742 * If we don't get a message from the PGresult, try the PGconn. This is
2743 * needed because for connection-level failures, PQexec may just return
2744 * NULL, not a PGresult at all.
2746 if (message_primary == NULL)
2747 message_primary = pchomp(PQerrorMessage(conn));
2750 * Now that we've copied all the data we need out of the PGresult, it's
2751 * safe to free it. We must do this to avoid PGresult leakage. We're
2752 * leaking all the strings too, but those are in palloc'd memory that will
2753 * get cleaned up eventually.
2755 PQclear(res);
2758 * Format the basic errcontext string. Below, we'll add on something
2759 * about the connection name. That's a violation of the translatability
2760 * guidelines about constructing error messages out of parts, but since
2761 * there's no translation support for dblink, there's no need to worry
2762 * about that (yet).
2764 va_start(ap, fmt);
2765 vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
2766 va_end(ap);
2768 ereport(level,
2769 (errcode(sqlstate),
2770 (message_primary != NULL && message_primary[0] != '\0') ?
2771 errmsg_internal("%s", message_primary) :
2772 errmsg("could not obtain message string for remote error"),
2773 message_detail ? errdetail_internal("%s", message_detail) : 0,
2774 message_hint ? errhint("%s", message_hint) : 0,
2775 message_context ? (errcontext("%s", message_context)) : 0,
2776 conname ?
2777 (errcontext("%s on dblink connection named \"%s\"",
2778 dblink_context_msg, conname)) :
2779 (errcontext("%s on unnamed dblink connection",
2780 dblink_context_msg))));
2784 * Obtain connection string for a foreign server
2786 static char *
2787 get_connect_string(const char *servername)
2789 ForeignServer *foreign_server = NULL;
2790 UserMapping *user_mapping;
2791 ListCell *cell;
2792 StringInfoData buf;
2793 ForeignDataWrapper *fdw;
2794 AclResult aclresult;
2795 char *srvname;
2797 static const PQconninfoOption *options = NULL;
2799 initStringInfo(&buf);
2802 * Get list of valid libpq options.
2804 * To avoid unnecessary work, we get the list once and use it throughout
2805 * the lifetime of this backend process. We don't need to care about
2806 * memory context issues, because PQconndefaults allocates with malloc.
2808 if (!options)
2810 options = PQconndefaults();
2811 if (!options) /* assume reason for failure is OOM */
2812 ereport(ERROR,
2813 (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
2814 errmsg("out of memory"),
2815 errdetail("Could not get libpq's default connection options.")));
2818 /* first gather the server connstr options */
2819 srvname = pstrdup(servername);
2820 truncate_identifier(srvname, strlen(srvname), false);
2821 foreign_server = GetForeignServerByName(srvname, true);
2823 if (foreign_server)
2825 Oid serverid = foreign_server->serverid;
2826 Oid fdwid = foreign_server->fdwid;
2827 Oid userid = GetUserId();
2829 user_mapping = GetUserMapping(userid, serverid);
2830 fdw = GetForeignDataWrapper(fdwid);
2832 /* Check permissions, user must have usage on the server. */
2833 aclresult = object_aclcheck(ForeignServerRelationId, serverid, userid, ACL_USAGE);
2834 if (aclresult != ACLCHECK_OK)
2835 aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
2837 foreach(cell, fdw->options)
2839 DefElem *def = lfirst(cell);
2841 if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
2842 appendStringInfo(&buf, "%s='%s' ", def->defname,
2843 escape_param_str(strVal(def->arg)));
2846 foreach(cell, foreign_server->options)
2848 DefElem *def = lfirst(cell);
2850 if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
2851 appendStringInfo(&buf, "%s='%s' ", def->defname,
2852 escape_param_str(strVal(def->arg)));
2855 foreach(cell, user_mapping->options)
2858 DefElem *def = lfirst(cell);
2860 if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
2861 appendStringInfo(&buf, "%s='%s' ", def->defname,
2862 escape_param_str(strVal(def->arg)));
2865 return buf.data;
2867 else
2868 return NULL;
2872 * Escaping libpq connect parameter strings.
2874 * Replaces "'" with "\'" and "\" with "\\".
2876 static char *
2877 escape_param_str(const char *str)
2879 const char *cp;
2880 StringInfoData buf;
2882 initStringInfo(&buf);
2884 for (cp = str; *cp; cp++)
2886 if (*cp == '\\' || *cp == '\'')
2887 appendStringInfoChar(&buf, '\\');
2888 appendStringInfoChar(&buf, *cp);
2891 return buf.data;
2895 * Validate the PK-attnums argument for dblink_build_sql_insert() and related
2896 * functions, and translate to the internal representation.
2898 * The user supplies an int2vector of 1-based logical attnums, plus a count
2899 * argument (the need for the separate count argument is historical, but we
2900 * still check it). We check that each attnum corresponds to a valid,
2901 * non-dropped attribute of the rel. We do *not* prevent attnums from being
2902 * listed twice, though the actual use-case for such things is dubious.
2903 * Note that before Postgres 9.0, the user's attnums were interpreted as
2904 * physical not logical column numbers; this was changed for future-proofing.
2906 * The internal representation is a palloc'd int array of 0-based physical
2907 * attnums.
2909 static void
2910 validate_pkattnums(Relation rel,
2911 int2vector *pkattnums_arg, int32 pknumatts_arg,
2912 int **pkattnums, int *pknumatts)
2914 TupleDesc tupdesc = rel->rd_att;
2915 int natts = tupdesc->natts;
2916 int i;
2918 /* Don't take more array elements than there are */
2919 pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
2921 /* Must have at least one pk attnum selected */
2922 if (pknumatts_arg <= 0)
2923 ereport(ERROR,
2924 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2925 errmsg("number of key attributes must be > 0")));
2927 /* Allocate output array */
2928 *pkattnums = palloc_array(int, pknumatts_arg);
2929 *pknumatts = pknumatts_arg;
2931 /* Validate attnums and convert to internal form */
2932 for (i = 0; i < pknumatts_arg; i++)
2934 int pkattnum = pkattnums_arg->values[i];
2935 int lnum;
2936 int j;
2938 /* Can throw error immediately if out of range */
2939 if (pkattnum <= 0 || pkattnum > natts)
2940 ereport(ERROR,
2941 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2942 errmsg("invalid attribute number %d", pkattnum)));
2944 /* Identify which physical column has this logical number */
2945 lnum = 0;
2946 for (j = 0; j < natts; j++)
2948 /* dropped columns don't count */
2949 if (TupleDescAttr(tupdesc, j)->attisdropped)
2950 continue;
2952 if (++lnum == pkattnum)
2953 break;
2956 if (j < natts)
2957 (*pkattnums)[i] = j;
2958 else
2959 ereport(ERROR,
2960 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2961 errmsg("invalid attribute number %d", pkattnum)));
2966 * Check if the specified connection option is valid.
2968 * We basically allow whatever libpq thinks is an option, with these
2969 * restrictions:
2970 * debug options: disallowed
2971 * "client_encoding": disallowed
2972 * "user": valid only in USER MAPPING options
2973 * secure options (eg password): valid only in USER MAPPING options
2974 * others: valid only in FOREIGN SERVER options
2976 * We disallow client_encoding because it would be overridden anyway via
2977 * PQclientEncoding; allowing it to be specified would merely promote
2978 * confusion.
2980 static bool
2981 is_valid_dblink_option(const PQconninfoOption *options, const char *option,
2982 Oid context)
2984 const PQconninfoOption *opt;
2986 /* Look up the option in libpq result */
2987 for (opt = options; opt->keyword; opt++)
2989 if (strcmp(opt->keyword, option) == 0)
2990 break;
2992 if (opt->keyword == NULL)
2993 return false;
2995 /* Disallow debug options (particularly "replication") */
2996 if (strchr(opt->dispchar, 'D'))
2997 return false;
2999 /* Disallow "client_encoding" */
3000 if (strcmp(opt->keyword, "client_encoding") == 0)
3001 return false;
3004 * If the option is "user" or marked secure, it should be specified only
3005 * in USER MAPPING. Others should be specified only in SERVER.
3007 if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
3009 if (context != UserMappingRelationId)
3010 return false;
3012 else
3014 if (context != ForeignServerRelationId)
3015 return false;
3018 return true;
3022 * Copy the remote session's values of GUCs that affect datatype I/O
3023 * and apply them locally in a new GUC nesting level. Returns the new
3024 * nestlevel (which is needed by restoreLocalGucs to undo the settings),
3025 * or -1 if no new nestlevel was needed.
3027 * We use the equivalent of a function SET option to allow the settings to
3028 * persist only until the caller calls restoreLocalGucs. If an error is
3029 * thrown in between, guc.c will take care of undoing the settings.
3031 static int
3032 applyRemoteGucs(PGconn *conn)
3034 static const char *const GUCsAffectingIO[] = {
3035 "DateStyle",
3036 "IntervalStyle"
3039 int nestlevel = -1;
3040 int i;
3042 for (i = 0; i < lengthof(GUCsAffectingIO); i++)
3044 const char *gucName = GUCsAffectingIO[i];
3045 const char *remoteVal = PQparameterStatus(conn, gucName);
3046 const char *localVal;
3049 * If the remote server is pre-8.4, it won't have IntervalStyle, but
3050 * that's okay because its output format won't be ambiguous. So just
3051 * skip the GUC if we don't get a value for it. (We might eventually
3052 * need more complicated logic with remote-version checks here.)
3054 if (remoteVal == NULL)
3055 continue;
3058 * Avoid GUC-setting overhead if the remote and local GUCs already
3059 * have the same value.
3061 localVal = GetConfigOption(gucName, false, false);
3062 Assert(localVal != NULL);
3064 if (strcmp(remoteVal, localVal) == 0)
3065 continue;
3067 /* Create new GUC nest level if we didn't already */
3068 if (nestlevel < 0)
3069 nestlevel = NewGUCNestLevel();
3071 /* Apply the option (this will throw error on failure) */
3072 (void) set_config_option(gucName, remoteVal,
3073 PGC_USERSET, PGC_S_SESSION,
3074 GUC_ACTION_SAVE, true, 0, false);
3077 return nestlevel;
3081 * Restore local GUCs after they have been overlaid with remote settings.
3083 static void
3084 restoreLocalGucs(int nestlevel)
3086 /* Do nothing if no new nestlevel was created */
3087 if (nestlevel > 0)
3088 AtEOXact_GUC(true, nestlevel);