1 /*-------------------------------------------------------------------------
4 * foreign-data wrapper for PostgreSQL
6 * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
11 *-------------------------------------------------------------------------
15 #include "catalog/pg_operator.h"
16 #include "catalog/pg_proc.h"
19 #include "mb/pg_wchar.h"
20 #include "miscadmin.h"
21 #include "nodes/nodeFuncs.h"
22 #include "nodes/makefuncs.h"
23 #include "optimizer/clauses.h"
24 #include "parser/scansup.h"
25 #include "utils/builtins.h"
26 #include "utils/lsyscache.h"
27 #include "utils/memutils.h"
28 #include "utils/syscache.h"
35 * WHERE caluse optimization level
37 #define EVAL_QUAL_LOCAL 0 /* evaluate none in foreign, all in local */
38 #define EVAL_QUAL_BOTH 1 /* evaluate some in foreign, all in local */
39 #define EVAL_QUAL_FOREIGN 2 /* evaluate some in foreign, rest in local */
41 #define OPTIMIZE_WHERE_CLAUSE EVAL_QUAL_FOREIGN
43 extern Datum
postgresql_fdw_handler(PG_FUNCTION_ARGS
);
45 static FSConnection
* pgConnectServer(ForeignServer
*server
, UserMapping
*user
);
46 static void pgFreeFSConnection(FSConnection
*conn
);
47 static void pgOpen(ForeignScanState
*scanstate
);
48 static void pgIterate(ForeignScanState
*scanstate
);
49 static void pgClose(ForeignScanState
*scanstate
);
50 static void pgReOpen(ForeignScanState
*scanstate
);
52 /* deparse SQL from the request */
53 static bool is_immutable_func(Oid funcid
);
54 static bool is_foreign_qual(ExprState
*state
);
55 static bool foreign_qual_walker(Node
*node
, void *context
);
56 static char *deparseSql(ForeignScanState
*scanstate
);
57 static int flatten_deflist(List
*options
,
58 const char **keywords
, const char **values
);
59 static void check_conn_params(const char **keywords
, const char **values
);
60 static remoteConn
*createRemoteConn(PGconn
*conn
);
61 static void dblink_connstr_check(const char *connstr
);
64 * Concrete cursor for a foreign table on a PostgreSQL
66 typedef struct pgFdwReply
68 char *sql
; /* SQL text sent to foreign server */
69 Tuplestorestate
*tupstore
; /* result set */
72 FdwRoutine postgresql_fdw_routine
=
83 * return foreign-data wrapper handler object to execute foreign-data wrapper
86 PG_FUNCTION_INFO_V1(postgresql_fdw_handler
);
88 postgresql_fdw_handler(PG_FUNCTION_ARGS
)
90 PG_RETURN_POINTER(&postgresql_fdw_routine
);
94 * Connect to foreign PostgreSQL server with libpq.
97 pgConnectServer(ForeignServer
*server
, UserMapping
*user
)
100 const char **keywords
;
104 elog(DEBUG2
, "%s(%s) called", PG_FUNCNAME_MACRO
, server
->servername
);
107 * construct connection params from options of ForeignDataWrapper,
108 * ForeignServer and UserMapping. Assuming all GENERIC OPTIONS are
109 * conneciton information.
111 * TODO: allow non-connection options and ignore them during constructing
114 n
= list_length(server
->options
) + list_length(user
->options
) + 1;
115 keywords
= (const char **) palloc(sizeof(char *) * n
);
116 values
= (const char **) palloc(sizeof(char *) * n
);
118 n
+= flatten_deflist(server
->options
, keywords
+ n
, values
+ n
);
119 n
+= flatten_deflist(user
->options
, keywords
+ n
, values
+ n
);
120 keywords
[n
] = values
[n
] = NULL
;
122 /* verify connection parameters and do connect */
123 check_conn_params(keywords
, values
);
124 conn
= PQconnectdbParams(keywords
, values
, 0);
128 return (FSConnection
*) createRemoteConn(conn
);
132 * Disconnect from the foreign server and free memory.
133 * The cache entry of the hash table will be remove by executor.
136 pgFreeFSConnection(FSConnection
*conn
)
138 Assert(conn
!= NULL
);
140 elog(DEBUG2
, "%s() called", PG_FUNCNAME_MACRO
);
142 PQfinish(((remoteConn
*) conn
)->conn
);
146 * Check whether the function is IMMUTABLE.
149 is_immutable_func(Oid funcid
)
155 tp
= SearchSysCache1(PROCOID
, funcid
);
156 if (!HeapTupleIsValid(tp
))
157 elog(ERROR
, "cache lookup failed for function %u", funcid
);
160 /* print function name and its immutability */
163 datum
= SysCacheGetAttr(PROCOID
, tp
, Anum_pg_proc_proname
, &isnull
);
164 proname
= pstrdup(DatumGetName(datum
)->data
);
165 elog(DEBUG1
, "func %s(%u) is%s immutable", proname
, funcid
,
166 (DatumGetChar(datum
) == PROVOLATILE_IMMUTABLE
) ? "" : " not");
171 datum
= SysCacheGetAttr(PROCOID
, tp
, Anum_pg_proc_provolatile
, &isnull
);
174 return (DatumGetChar(datum
) == PROVOLATILE_IMMUTABLE
);
178 * Check whether the ExprState node should be evaluated in foreign server.
180 * An expression which consists of expressions below will be evaluated in
181 * the foreign server.
183 * - variable (foreign table column)
184 * - external parameter (parameter of prepared statement)
186 * - bool expression (AND/OR/NOT)
187 * - NULL test (IS [NOT] NULL)
190 * - It is required that the meaning of the operator be the same as the
191 * local server in the foreign server.
194 * - It is required that the meaning of the operator be the same as the
195 * local server in the foreign server.
196 * - scalar array operator (ANY/ALL)
199 is_foreign_qual(ExprState
*state
)
201 return !foreign_qual_walker((Node
*) state
->expr
, NULL
);
205 * return true if node cannot be evaluatated in foreign server.
208 foreign_qual_walker(Node
*node
, void *context
)
213 switch (nodeTag(node
))
216 /* TODO: pass internal parameters to the foreign server */
217 if (((Param
*) node
)->paramkind
!= PARAM_EXTERN
)
223 * An operator which uses IMMUTABLE function can be evaluated in
224 * foreign server . It is not necessary to worry about oprrest
225 * and oprjoin here because they are invoked by planner but not
226 * executor. DistinctExpr is a typedef of OpExpr.
228 if (!is_immutable_func(((OpExpr
*) node
)->opfuncid
))
231 case T_ScalarArrayOpExpr
:
232 if (!is_immutable_func(((ScalarArrayOpExpr
*) node
)->opfuncid
))
236 /* IMMUTABLE function can be evaluated in foreign server */
237 if (!is_immutable_func(((FuncExpr
*) node
)->funcid
))
241 case T_PlaceHolderVar
:
242 case T_AppendRelInfo
:
243 case T_PlaceHolderInfo
:
244 /* TODO: research whether those complex nodes are evaluatable. */
250 return expression_tree_walker(node
, foreign_qual_walker
, context
);
254 * Deparse SQL string from query request.
256 * The expressions in Plan.qual are deparsed when it satisfies is_foreign_qual()
260 deparseSql(ForeignScanState
*scanstate
)
262 EState
*estate
= scanstate
->ss
.ps
.state
;
269 char *nspname
= NULL
;
270 char *relname
= NULL
;
271 const char *nspname_q
;
272 const char *relname_q
;
273 const char *aliasname_q
;
279 elog(DEBUG2
, "%s(%u) called", __FUNCTION__
, __LINE__
);
281 /* extract ForeignScan and RangeTblEntry */
282 scan
= (ForeignScan
*)scanstate
->ss
.ps
.plan
;
283 rte
= list_nth(estate
->es_range_table
, scan
->scan
.scanrelid
- 1);
285 /* prepare to deparse plan */
286 initStringInfo(&sql
);
287 context
= deparse_context_for_planstate((Node
*)&scanstate
->ss
.ps
, NULL
,
288 estate
->es_range_table
);
291 * Scanning multiple relations in a ForeignScan node is not supported.
295 prefix
= list_length(estate
->es_range_table
) > 1;
298 /* The alias of relation is used in both SELECT clause and FROM clause. */
299 aliasname_q
= quote_identifier(rte
->eref
->aliasname
);
301 /* deparse SELECT clause */
302 appendStringInfo(&sql
, "SELECT ");
305 * TODO: omit (deparse to "NULL") columns which are not used in the
308 * We must parse nodes parents of this ForeignScan node to determine unused
309 * columns because some columns may be used only in parent Sort/Agg/Limit
312 tupdesc
= scanstate
->ss
.ss_currentRelation
->rd_att
;
314 for (i
= 0; i
< tupdesc
->natts
; i
++)
316 /* skip dropped attributes */
317 if (tupdesc
->attrs
[i
]->attisdropped
)
321 appendStringInfoString(&sql
, ", ");
324 appendStringInfo(&sql
, "%s.%s",
325 aliasname_q
, tupdesc
->attrs
[i
]->attname
.data
);
327 appendStringInfo(&sql
, "%s", tupdesc
->attrs
[i
]->attname
.data
);
331 /* if target list is composed only of system attributes, add dummy column */
333 appendStringInfo(&sql
, "NULL");
335 /* deparse FROM clause */
336 appendStringInfo(&sql
, " FROM ");
339 * If the foreign table has generic option "nspname" and/or "relname", use
340 * them in the foreign query. Otherwise, use local catalog names.
341 * Each identifier must be quoted if they are case sensitive.
343 table
= GetForeignTable(rte
->relid
);
344 foreach(lc
, table
->options
)
346 DefElem
*opt
= lfirst(lc
);
347 if (strcmp(opt
->defname
, "nspname") == 0)
348 nspname
= pstrdup(strVal(opt
->arg
));
349 else if (strcmp(opt
->defname
, "relname") == 0)
350 relname
= pstrdup(strVal(opt
->arg
));
353 nspname
= get_namespace_name(get_rel_namespace(rte
->relid
));
355 relname
= get_rel_name(rte
->relid
);
356 nspname_q
= quote_identifier(nspname
);
357 relname_q
= quote_identifier(relname
);
358 appendStringInfo(&sql
, "%s.%s %s", nspname_q
, relname_q
, aliasname_q
);
361 if (nspname_q
!= nspname
)
362 pfree((char *) nspname_q
);
363 if (relname_q
!= relname
)
364 pfree((char * ) relname_q
);
365 if (aliasname_q
!= rte
->eref
->aliasname
)
366 pfree((char *) aliasname_q
);
369 * deparse WHERE cluase
371 * The expressions which satisfy is_foreign_qual() are deparsed into WHERE
372 * clause of result SQL string, and they could be removed from qual of
373 * PlanState to avoid duplicate evaluation at ExecScan().
375 * The Plan.qual is never changed, so multiple use of the Plan with
376 * PREPARE/EXECUTE work properly.
378 #if OPTIMIZE_WHERE_CLAUSE > EVAL_QUAL_LOCAL
379 if (scanstate
->ss
.ps
.plan
->qual
)
381 List
*local_qual
= NIL
;
382 List
*foreign_qual
= NIL
;
383 List
*foreign_expr
= NIL
;
387 * Divide qual of PlanState into two lists, one for local evaluation
388 * and one for foreign evaluation.
390 foreach (lc
, scanstate
->ss
.ps
.qual
)
392 ExprState
*state
= lfirst(lc
);
394 if (is_foreign_qual(state
))
396 elog(DEBUG1
, "foreign qual: %s", nodeToString(state
->expr
));
397 foreign_qual
= lappend(foreign_qual
, state
);
398 foreign_expr
= lappend(foreign_expr
, state
->expr
);
402 elog(DEBUG1
, "local qual: %s", nodeToString(state
->expr
));
403 local_qual
= lappend(local_qual
, state
);
406 #if OPTIMIZE_WHERE_CLAUSE == EVAL_QUAL_FOREIGN
408 * If the optimization level is EVAL_QUAL_FOREIGN, replace the original
409 * qual with the list of ExprStates which should be evaluated in the
412 scanstate
->ss
.ps
.qual
= local_qual
;
416 * Deparse quals to be evaluated in the foreign server if any.
417 * TODO: modify deparse_expression() to deparse conditions which use
418 * internal parameters.
420 if (foreign_expr
!= NIL
)
423 node
= (Node
*) make_ands_explicit(foreign_expr
);
424 appendStringInfo(&sql
, " WHERE ");
425 appendStringInfo(&sql
,
426 deparse_expression(node
, context
, prefix
, false));
428 * The contents of the list MUST NOT be free-ed because they are
429 * referenced from Plan.qual list.
431 list_free(foreign_expr
);
436 elog(DEBUG1
, "deparsed SQL is \"%s\"", sql
.data
);
443 * - deparse SQL statement from ForeignScanState and EState
446 pgOpen(ForeignScanState
*scanstate
)
450 elog(DEBUG2
, "%s(%u) called", __FUNCTION__
, __LINE__
);
452 /* FWD-specific portion */
453 reply
= (pgFdwReply
*) palloc0(sizeof(*reply
));
454 reply
->sql
= deparseSql(scanstate
);
455 scanstate
->reply
= (FdwReply
*) reply
;
459 * return tuples one by one.
460 * - execute SQL statement which was deparsed in pgOpen()
462 * The all of result are fetched at once when pgIterate() is called first after
463 * pgOpen() or pgReOpen().
464 * pgIterate() takes out a tuple from tupstore with tupslot and returns it.
467 pgIterate(ForeignScanState
*scanstate
)
469 pgFdwReply
*reply
= (pgFdwReply
*) scanstate
->reply
;
470 TupleTableSlot
*slot
= scanstate
->ss
.ss_ScanTupleSlot
;
472 elog(DEBUG2
, "%s(%u) called", __FUNCTION__
, __LINE__
);
475 * Execute query with current parameters.
477 if (reply
->tupstore
== NULL
)
479 PGconn
*conn
= ((remoteConn
*) scanstate
->conn
)->conn
;
481 ParamListInfo info
= scanstate
->ss
.ps
.state
->es_param_list_info
;
482 int numParams
= info
? info
->numParams
: 0;
484 const char **values
= NULL
;
486 /* construct parameter array in text format */
487 /* TODO: omit unused parameter */
492 types
= palloc0(sizeof(Oid
) * numParams
);
493 values
= palloc0(sizeof(char *) * numParams
);
494 for (i
= 0; i
< numParams
; i
++)
496 types
[i
] = info
->params
[i
].ptype
;
497 if (info
->params
[i
].isnull
)
505 /* TODO: cache FmgrInfo to use it again after pgReOpen() */
506 /* TODO: send parameters in binary format rather than text */
507 getTypeOutputInfo(types
[i
], &out_func_oid
, &isvarlena
);
508 fmgr_info(out_func_oid
, &func
);
510 OutputFunctionCall(&func
, info
->params
[i
].value
);
516 * Execute query with the parameters.
517 * TODO: support internal parameters(PARAM_EXTERN)
518 * TODO: support cursor mode for huge result sets.
520 res
= PQexecParams(conn
, reply
->sql
,
521 numParams
, types
, values
, NULL
, NULL
, 0);
526 for (i
= 0; i
< numParams
; i
++)
527 pfree((char *) values
[i
]);
530 if (!res
|| PQresultStatus(res
) != PGRES_TUPLES_OK
)
534 errmsg("could not execute foreign query"),
535 errdetail("%s", PQerrorMessage(conn
)),
536 errhint("%s", reply
->sql
)));
539 /* Note: use PG_TRY to ensure freeing PGresult. */
542 TupleDesc tupdesc
= ExecGetScanType((ScanState
*) scanstate
);
544 /* create tuplestore to store results */
545 reply
->tupstore
= tuplestore_begin_heap(true, false, work_mem
);
547 storeResult(reply
->tupstore
, false, tupdesc
, res
);
559 /* store the next tuple into the slot from the tuplestore */
560 if (tuplestore_gettupleslot(reply
->tupstore
, true, false, slot
))
563 * Because the tuples stored in the tupstore are minimal tuples,
564 * they have to be materialized to retrieve system attributes.
566 ExecMaterializeSlot(slot
);
570 /* TODO: if cursor mode, reset tuple slot and fetch the next batch. */
575 * Finish scanning foreign table and dispose objects used for this scan.
578 pgClose(ForeignScanState
*scanstate
)
580 pgFdwReply
*reply
= (pgFdwReply
*) scanstate
->reply
;
582 elog(DEBUG2
, "%s(%u) called", __FUNCTION__
, __LINE__
);
587 if (reply
->tupstore
!= NULL
)
588 tuplestore_end(reply
->tupstore
);
591 * reply->conn is not free-ed here because foreign connections are
592 * managed by executor, not FDW.
595 scanstate
->reply
= NULL
;
599 * Execute query with new parameter.
602 pgReOpen(ForeignScanState
*scanstate
)
604 pgFdwReply
*reply
= (pgFdwReply
*) scanstate
->reply
;
606 elog(DEBUG2
, "%s(%u) called", __FUNCTION__
, __LINE__
);
608 /* Free Tuplestore to execute query again */
609 /* TODO: reuse tupstore through the scan to avoid overhead */
612 tuplestore_end(reply
->tupstore
);
613 reply
->tupstore
= NULL
;
614 elog(DEBUG1
, "tuplestore disposed");
619 * Flattern options into keywords and values buffers.
622 flatten_deflist(List
*options
, const char **keywords
, const char **values
)
627 foreach(cell
, options
)
629 DefElem
*def
= lfirst(cell
);
631 keywords
[n
] = def
->defname
;
632 values
[n
] = strVal(def
->arg
);
639 * For non-superusers, insist that the connstr specify a password. This
640 * prevents a password from being picked up from .pgpass, a service file,
641 * the environment, etc. We don't want the postgres user's passwords
642 * to be accessible to non-superusers.
645 check_conn_params(const char **keywords
, const char **values
)
649 /* no check required if superuser */
653 /* ok if params contain a non-empty password */
654 for (i
= 0; keywords
[i
] != NULL
; i
++)
656 if (strcmp(keywords
[i
], "password") == 0 && values
[i
][0] != '\0')
661 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
),
662 errmsg("password is required"),
663 errdetail("Non-superusers must provide a password in the connection string.")));
667 * Returns the connection which has name `conname' and is against PostgreSQL
668 * server from connection cache.
669 * If the connection which has the name was not found, returns NULL.
670 * If the connection which has the name was found but the connection is not
671 * against PostgreSQL, quit with error.
674 getConnectionByName(const char *conname
)
682 name
= pstrdup(conname
);
683 truncate_identifier(name
, strlen(name
), true);
686 name
= UNNAMED_CONN_NAME
;
688 conn
= GetFSConnectionByName(name
, &routine
);
692 if (routine
!= &postgresql_fdw_routine
)
694 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
695 errmsg("connection \"%s\" is not a postgresql_fdw", conname
)));
697 return (remoteConn
*) conn
;
701 * Create a new connection to a PostgreSQL server and register the connection
702 * to the FDW connection cache.
703 * If the same name connection found, abort even if an unnamed connection was
707 createNewConnection(const char *conname
, const char *conninfo
)
709 ForeignServer
*server
;
713 AssertArg(conninfo
!= NULL
);
717 name
= pstrdup(conname
);
718 truncate_identifier(name
, strlen(name
), true);
721 name
= UNNAMED_CONN_NAME
;
723 /* raise an error if the same name connection found. */
724 if (GetFSConnectionByName(name
, NULL
) != NULL
)
726 (errcode(ERRCODE_DUPLICATE_OBJECT
),
727 errmsg("duplicate connection name")));
729 /* first check for valid foreign data server */
730 srvname
= pstrdup(conninfo
);
731 truncate_identifier(srvname
, strlen(srvname
), false);
732 server
= GetForeignServerByName(srvname
, true);
737 ForeignDataWrapper
*wrapper
;
739 user
= GetUserMapping(GetOuterUserId(), server
->serverid
);
740 wrapper
= GetForeignDataWrapper(server
->fdwid
);
741 if (OidIsValid(wrapper
->fdwhandler
))
745 routine
= GetFdwRoutine(wrapper
->fdwhandler
);
746 (void) ConnectToForeignServer(routine
, server
, user
, name
);
751 * For backward compatibility, assume all of the options are
752 * for postgresql connections if the fdw doesn't have a handler.
754 RegisterFSConnection(
755 pgConnectServer(server
, user
),
756 &postgresql_fdw_routine
, name
);
761 /* check password in connection string if not superuser */
762 dblink_connstr_check(conninfo
);
764 RegisterFSConnection(
765 (FSConnection
*) createRemoteConn(PQconnectdb(conninfo
)),
766 &postgresql_fdw_routine
, name
);
771 * For non-superusers, insist that the connstr specify a password. This
772 * prevents a password from being picked up from .pgpass, a service file,
773 * the environment, etc. We don't want the postgres user's passwords
774 * to be accessible to non-superusers.
777 dblink_connstr_check(const char *connstr
)
781 PQconninfoOption
*options
;
782 PQconninfoOption
*option
;
783 bool connstr_gives_password
= false;
785 options
= PQconninfoParse(connstr
, NULL
);
788 for (option
= options
; option
->keyword
!= NULL
; option
++)
790 if (strcmp(option
->keyword
, "password") == 0)
792 if (option
->val
!= NULL
&& option
->val
[0] != '\0')
794 connstr_gives_password
= true;
799 PQconninfoFree(options
);
802 if (!connstr_gives_password
)
804 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
),
805 errmsg("password is required"),
806 errdetail("Non-superusers must provide a password in the connection string.")));
811 * Create a remoteConn object from a PGconn.
812 * Also check that password was specified explicitly if the user was a
816 createRemoteConn(PGconn
*conn
)
820 if (PQstatus(conn
) != CONNECTION_OK
)
822 char *message
= pstrdup(PQerrorMessage(conn
));
825 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION
),
826 errmsg("could not establish connection"),
827 errdetail("%s", message
)));
830 if (!superuser() && !PQconnectionUsedPassword(conn
))
834 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
),
835 errmsg("password is required"),
836 errdetail("Non-superuser cannot connect if the server does not request a password."),
837 errhint("Target server's authentication method must be changed.")));
840 /* attempt to set client encoding to match server encoding */
841 PQsetClientEncoding(conn
, GetDatabaseEncodingName());
843 /* Cosntruct remoteConn */
844 rconn
= MemoryContextAlloc(TopMemoryContext
, sizeof(remoteConn
));
846 rconn
->openCursorCount
= 0;
847 rconn
->newXactForCursor
= false;