Restore initdb's old behavior of always setting the lc_xxx GUCs.
[pgsql.git] / contrib / postgres_fdw / postgres_fdw.c
blobc5cada55fb732942ba586aeecefc395b646ba55c
/*-------------------------------------------------------------------------
 *
 * postgres_fdw.c
 *		  Foreign-data wrapper for remote PostgreSQL servers
 *
 * Portions Copyright (c) 2012-2023, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		  contrib/postgres_fdw/postgres_fdw.c
 *
 *-------------------------------------------------------------------------
 */
13 #include "postgres.h"
15 #include <limits.h>
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "access/table.h"
20 #include "catalog/pg_class.h"
21 #include "catalog/pg_opfamily.h"
22 #include "commands/defrem.h"
23 #include "commands/explain.h"
24 #include "commands/vacuum.h"
25 #include "executor/execAsync.h"
26 #include "foreign/fdwapi.h"
27 #include "funcapi.h"
28 #include "miscadmin.h"
29 #include "nodes/makefuncs.h"
30 #include "nodes/nodeFuncs.h"
31 #include "optimizer/appendinfo.h"
32 #include "optimizer/clauses.h"
33 #include "optimizer/cost.h"
34 #include "optimizer/inherit.h"
35 #include "optimizer/optimizer.h"
36 #include "optimizer/pathnode.h"
37 #include "optimizer/paths.h"
38 #include "optimizer/planmain.h"
39 #include "optimizer/prep.h"
40 #include "optimizer/restrictinfo.h"
41 #include "optimizer/tlist.h"
42 #include "parser/parsetree.h"
43 #include "postgres_fdw.h"
44 #include "storage/latch.h"
45 #include "utils/builtins.h"
46 #include "utils/float.h"
47 #include "utils/guc.h"
48 #include "utils/lsyscache.h"
49 #include "utils/memutils.h"
50 #include "utils/rel.h"
51 #include "utils/sampling.h"
52 #include "utils/selfuncs.h"
54 PG_MODULE_MAGIC;
/* Default CPU cost to start up a foreign query. */
#define DEFAULT_FDW_STARTUP_COST	100.0

/* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
#define DEFAULT_FDW_TUPLE_COST		0.01

/* If no remote estimates, assume a sort costs 20% extra */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
/*
 * Indexes of FDW-private information stored in fdw_private lists.
 *
 * These items are indexed with the enum FdwScanPrivateIndex, so an item
 * can be fetched with list_nth().  For example, to get the SELECT statement:
 *		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
 *
 * The enumerators carry no explicit values, so they number the list
 * positions consecutively starting at 0; their order here is the order of
 * the items in the list.
 */
enum FdwScanPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwScanPrivateSelectSql,
	/* Integer list of attribute numbers retrieved by the SELECT */
	FdwScanPrivateRetrievedAttrs,
	/* Integer representing the desired fetch_size */
	FdwScanPrivateFetchSize,

	/*
	 * String describing join i.e. names of relations being joined and types
	 * of join, added when the scan is join
	 */
	FdwScanPrivateRelations
};
/*
 * Similarly, this enum describes what's kept in the fdw_private list for
 * a ModifyTable node referencing a postgres_fdw foreign table.  We store:
 *
 * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
 * 2) Integer list of target attribute numbers for INSERT/UPDATE
 *	  (NIL for a DELETE)
 * 3) Length till the end of VALUES clause for INSERT
 *	  (-1 for a DELETE/UPDATE)
 * 4) Boolean flag showing if the remote query has a RETURNING clause
 * 5) Integer list of attribute numbers retrieved by RETURNING, if any
 *
 * The enumerators below are listed in that same order and number the list
 * positions from 0.
 */
enum FdwModifyPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwModifyPrivateUpdateSql,
	/* Integer list of target attribute numbers for INSERT/UPDATE */
	FdwModifyPrivateTargetAttnums,
	/* Length till the end of VALUES clause (as an Integer node) */
	FdwModifyPrivateLen,
	/* has-returning flag (as a Boolean node) */
	FdwModifyPrivateHasReturning,
	/* Integer list of attribute numbers retrieved by RETURNING */
	FdwModifyPrivateRetrievedAttrs
};
/*
 * Similarly, this enum describes what's kept in the fdw_private list for
 * a ForeignScan node that modifies a foreign table directly.  We store:
 *
 * 1) UPDATE/DELETE statement text to be sent to the remote server
 * 2) Boolean flag showing if the remote query has a RETURNING clause
 * 3) Integer list of attribute numbers retrieved by RETURNING, if any
 * 4) Boolean flag showing if we set the command es_processed
 *
 * The enumerators below are listed in that same order and number the list
 * positions from 0.
 */
enum FdwDirectModifyPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwDirectModifyPrivateUpdateSql,
	/* has-returning flag (as a Boolean node) */
	FdwDirectModifyPrivateHasReturning,
	/* Integer list of attribute numbers retrieved by RETURNING */
	FdwDirectModifyPrivateRetrievedAttrs,
	/* set-processed flag (as a Boolean node) */
	FdwDirectModifyPrivateSetProcessed
};
136 * Execution state of a foreign scan using postgres_fdw.
138 typedef struct PgFdwScanState
140 Relation rel; /* relcache entry for the foreign table. NULL
141 * for a foreign join scan. */
142 TupleDesc tupdesc; /* tuple descriptor of scan */
143 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
145 /* extracted fdw_private data */
146 char *query; /* text of SELECT command */
147 List *retrieved_attrs; /* list of retrieved attribute numbers */
149 /* for remote query execution */
150 PGconn *conn; /* connection for the scan */
151 PgFdwConnState *conn_state; /* extra per-connection state */
152 unsigned int cursor_number; /* quasi-unique ID for my cursor */
153 bool cursor_exists; /* have we created the cursor? */
154 int numParams; /* number of parameters passed to query */
155 FmgrInfo *param_flinfo; /* output conversion functions for them */
156 List *param_exprs; /* executable expressions for param values */
157 const char **param_values; /* textual values of query parameters */
159 /* for storing result tuples */
160 HeapTuple *tuples; /* array of currently-retrieved tuples */
161 int num_tuples; /* # of tuples in array */
162 int next_tuple; /* index of next one to return */
164 /* batch-level state, for optimizing rewinds and avoiding useless fetch */
165 int fetch_ct_2; /* Min(# of fetches done, 2) */
166 bool eof_reached; /* true if last fetch reached EOF */
168 /* for asynchronous execution */
169 bool async_capable; /* engage asynchronous-capable logic? */
171 /* working memory contexts */
172 MemoryContext batch_cxt; /* context holding current batch of tuples */
173 MemoryContext temp_cxt; /* context for per-tuple temporary data */
175 int fetch_size; /* number of tuples per fetch */
176 } PgFdwScanState;
179 * Execution state of a foreign insert/update/delete operation.
181 typedef struct PgFdwModifyState
183 Relation rel; /* relcache entry for the foreign table */
184 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
186 /* for remote query execution */
187 PGconn *conn; /* connection for the scan */
188 PgFdwConnState *conn_state; /* extra per-connection state */
189 char *p_name; /* name of prepared statement, if created */
191 /* extracted fdw_private data */
192 char *query; /* text of INSERT/UPDATE/DELETE command */
193 char *orig_query; /* original text of INSERT command */
194 List *target_attrs; /* list of target attribute numbers */
195 int values_end; /* length up to the end of VALUES */
196 int batch_size; /* value of FDW option "batch_size" */
197 bool has_returning; /* is there a RETURNING clause? */
198 List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
200 /* info about parameters for prepared statement */
201 AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
202 int p_nums; /* number of parameters to transmit */
203 FmgrInfo *p_flinfo; /* output conversion functions for them */
205 /* batch operation stuff */
206 int num_slots; /* number of slots to insert */
208 /* working memory context */
209 MemoryContext temp_cxt; /* context for per-tuple temporary data */
211 /* for update row movement if subplan result rel */
212 struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if
213 * created */
214 } PgFdwModifyState;
217 * Execution state of a foreign scan that modifies a foreign table directly.
219 typedef struct PgFdwDirectModifyState
221 Relation rel; /* relcache entry for the foreign table */
222 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
224 /* extracted fdw_private data */
225 char *query; /* text of UPDATE/DELETE command */
226 bool has_returning; /* is there a RETURNING clause? */
227 List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
228 bool set_processed; /* do we set the command es_processed? */
230 /* for remote query execution */
231 PGconn *conn; /* connection for the update */
232 PgFdwConnState *conn_state; /* extra per-connection state */
233 int numParams; /* number of parameters passed to query */
234 FmgrInfo *param_flinfo; /* output conversion functions for them */
235 List *param_exprs; /* executable expressions for param values */
236 const char **param_values; /* textual values of query parameters */
238 /* for storing result tuples */
239 PGresult *result; /* result for query */
240 int num_tuples; /* # of result tuples */
241 int next_tuple; /* index of next one to return */
242 Relation resultRel; /* relcache entry for the target relation */
243 AttrNumber *attnoMap; /* array of attnums of input user columns */
244 AttrNumber ctidAttno; /* attnum of input ctid column */
245 AttrNumber oidAttno; /* attnum of input oid column */
246 bool hasSystemCols; /* are there system columns of resultRel? */
248 /* working memory context */
249 MemoryContext temp_cxt; /* context for per-tuple temporary data */
250 } PgFdwDirectModifyState;
253 * Workspace for analyzing a foreign table.
255 typedef struct PgFdwAnalyzeState
257 Relation rel; /* relcache entry for the foreign table */
258 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
259 List *retrieved_attrs; /* attr numbers retrieved by query */
261 /* collected sample rows */
262 HeapTuple *rows; /* array of size targrows */
263 int targrows; /* target # of sample rows */
264 int numrows; /* # of sample rows collected */
266 /* for random sampling */
267 double samplerows; /* # of rows fetched */
268 double rowstoskip; /* # of rows to skip before next sample */
269 ReservoirStateData rstate; /* state for reservoir sampling */
271 /* working memory contexts */
272 MemoryContext anl_cxt; /* context for per-analyze lifespan data */
273 MemoryContext temp_cxt; /* context for per-tuple temporary data */
274 } PgFdwAnalyzeState;
/*
 * This enum describes what's kept in the fdw_private list for a ForeignPath.
 * We store:
 *
 * 1) Boolean flag showing if the remote query has the final sort
 * 2) Boolean flag showing if the remote query has the LIMIT clause
 *
 * The enumerators below are listed in that same order and number the list
 * positions from 0.
 */
enum FdwPathPrivateIndex
{
	/* has-final-sort flag (as a Boolean node) */
	FdwPathPrivateHasFinalSort,
	/* has-limit flag (as a Boolean node) */
	FdwPathPrivateHasLimit
};
291 /* Struct for extra information passed to estimate_path_cost_size() */
292 typedef struct
294 PathTarget *target;
295 bool has_final_sort;
296 bool has_limit;
297 double limit_tuples;
298 int64 count_est;
299 int64 offset_est;
300 } PgFdwPathExtraData;
303 * Identify the attribute where data conversion fails.
305 typedef struct ConversionLocation
307 AttrNumber cur_attno; /* attribute number being processed, or 0 */
308 Relation rel; /* foreign table being processed, or NULL */
309 ForeignScanState *fsstate; /* plan node being processed, or NULL */
310 } ConversionLocation;
312 /* Callback argument for ec_member_matches_foreign */
313 typedef struct
315 Expr *current; /* current expr, or NULL if not yet found */
316 List *already_used; /* expressions already dealt with */
317 } ec_member_foreign_arg;
320 * SQL functions
322 PG_FUNCTION_INFO_V1(postgres_fdw_handler);
325 * FDW callback routines
327 static void postgresGetForeignRelSize(PlannerInfo *root,
328 RelOptInfo *baserel,
329 Oid foreigntableid);
330 static void postgresGetForeignPaths(PlannerInfo *root,
331 RelOptInfo *baserel,
332 Oid foreigntableid);
333 static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
334 RelOptInfo *foreignrel,
335 Oid foreigntableid,
336 ForeignPath *best_path,
337 List *tlist,
338 List *scan_clauses,
339 Plan *outer_plan);
340 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
341 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
342 static void postgresReScanForeignScan(ForeignScanState *node);
343 static void postgresEndForeignScan(ForeignScanState *node);
344 static void postgresAddForeignUpdateTargets(PlannerInfo *root,
345 Index rtindex,
346 RangeTblEntry *target_rte,
347 Relation target_relation);
348 static List *postgresPlanForeignModify(PlannerInfo *root,
349 ModifyTable *plan,
350 Index resultRelation,
351 int subplan_index);
352 static void postgresBeginForeignModify(ModifyTableState *mtstate,
353 ResultRelInfo *resultRelInfo,
354 List *fdw_private,
355 int subplan_index,
356 int eflags);
357 static TupleTableSlot *postgresExecForeignInsert(EState *estate,
358 ResultRelInfo *resultRelInfo,
359 TupleTableSlot *slot,
360 TupleTableSlot *planSlot);
361 static TupleTableSlot **postgresExecForeignBatchInsert(EState *estate,
362 ResultRelInfo *resultRelInfo,
363 TupleTableSlot **slots,
364 TupleTableSlot **planSlots,
365 int *numSlots);
366 static int postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo);
367 static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
368 ResultRelInfo *resultRelInfo,
369 TupleTableSlot *slot,
370 TupleTableSlot *planSlot);
371 static TupleTableSlot *postgresExecForeignDelete(EState *estate,
372 ResultRelInfo *resultRelInfo,
373 TupleTableSlot *slot,
374 TupleTableSlot *planSlot);
375 static void postgresEndForeignModify(EState *estate,
376 ResultRelInfo *resultRelInfo);
377 static void postgresBeginForeignInsert(ModifyTableState *mtstate,
378 ResultRelInfo *resultRelInfo);
379 static void postgresEndForeignInsert(EState *estate,
380 ResultRelInfo *resultRelInfo);
381 static int postgresIsForeignRelUpdatable(Relation rel);
382 static bool postgresPlanDirectModify(PlannerInfo *root,
383 ModifyTable *plan,
384 Index resultRelation,
385 int subplan_index);
386 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
387 static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node);
388 static void postgresEndDirectModify(ForeignScanState *node);
389 static void postgresExplainForeignScan(ForeignScanState *node,
390 ExplainState *es);
391 static void postgresExplainForeignModify(ModifyTableState *mtstate,
392 ResultRelInfo *rinfo,
393 List *fdw_private,
394 int subplan_index,
395 ExplainState *es);
396 static void postgresExplainDirectModify(ForeignScanState *node,
397 ExplainState *es);
398 static void postgresExecForeignTruncate(List *rels,
399 DropBehavior behavior,
400 bool restart_seqs);
401 static bool postgresAnalyzeForeignTable(Relation relation,
402 AcquireSampleRowsFunc *func,
403 BlockNumber *totalpages);
404 static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt,
405 Oid serverOid);
406 static void postgresGetForeignJoinPaths(PlannerInfo *root,
407 RelOptInfo *joinrel,
408 RelOptInfo *outerrel,
409 RelOptInfo *innerrel,
410 JoinType jointype,
411 JoinPathExtraData *extra);
412 static bool postgresRecheckForeignScan(ForeignScanState *node,
413 TupleTableSlot *slot);
414 static void postgresGetForeignUpperPaths(PlannerInfo *root,
415 UpperRelationKind stage,
416 RelOptInfo *input_rel,
417 RelOptInfo *output_rel,
418 void *extra);
419 static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
420 static void postgresForeignAsyncRequest(AsyncRequest *areq);
421 static void postgresForeignAsyncConfigureWait(AsyncRequest *areq);
422 static void postgresForeignAsyncNotify(AsyncRequest *areq);
425 * Helper functions
427 static void estimate_path_cost_size(PlannerInfo *root,
428 RelOptInfo *foreignrel,
429 List *param_join_conds,
430 List *pathkeys,
431 PgFdwPathExtraData *fpextra,
432 double *p_rows, int *p_width,
433 Cost *p_startup_cost, Cost *p_total_cost);
434 static void get_remote_estimate(const char *sql,
435 PGconn *conn,
436 double *rows,
437 int *width,
438 Cost *startup_cost,
439 Cost *total_cost);
440 static void adjust_foreign_grouping_path_cost(PlannerInfo *root,
441 List *pathkeys,
442 double retrieved_rows,
443 double width,
444 double limit_tuples,
445 Cost *p_startup_cost,
446 Cost *p_run_cost);
447 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
448 EquivalenceClass *ec, EquivalenceMember *em,
449 void *arg);
450 static void create_cursor(ForeignScanState *node);
451 static void fetch_more_data(ForeignScanState *node);
452 static void close_cursor(PGconn *conn, unsigned int cursor_number,
453 PgFdwConnState *conn_state);
454 static PgFdwModifyState *create_foreign_modify(EState *estate,
455 RangeTblEntry *rte,
456 ResultRelInfo *resultRelInfo,
457 CmdType operation,
458 Plan *subplan,
459 char *query,
460 List *target_attrs,
461 int values_end,
462 bool has_returning,
463 List *retrieved_attrs);
464 static TupleTableSlot **execute_foreign_modify(EState *estate,
465 ResultRelInfo *resultRelInfo,
466 CmdType operation,
467 TupleTableSlot **slots,
468 TupleTableSlot **planSlots,
469 int *numSlots);
470 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
471 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
472 ItemPointer tupleid,
473 TupleTableSlot **slots,
474 int numSlots);
475 static void store_returning_result(PgFdwModifyState *fmstate,
476 TupleTableSlot *slot, PGresult *res);
477 static void finish_foreign_modify(PgFdwModifyState *fmstate);
478 static void deallocate_query(PgFdwModifyState *fmstate);
479 static List *build_remote_returning(Index rtindex, Relation rel,
480 List *returningList);
481 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
482 static void execute_dml_stmt(ForeignScanState *node);
483 static TupleTableSlot *get_returning_data(ForeignScanState *node);
484 static void init_returning_filter(PgFdwDirectModifyState *dmstate,
485 List *fdw_scan_tlist,
486 Index rtindex);
487 static TupleTableSlot *apply_returning_filter(PgFdwDirectModifyState *dmstate,
488 ResultRelInfo *resultRelInfo,
489 TupleTableSlot *slot,
490 EState *estate);
491 static void prepare_query_params(PlanState *node,
492 List *fdw_exprs,
493 int numParams,
494 FmgrInfo **param_flinfo,
495 List **param_exprs,
496 const char ***param_values);
497 static void process_query_params(ExprContext *econtext,
498 FmgrInfo *param_flinfo,
499 List *param_exprs,
500 const char **param_values);
501 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
502 HeapTuple *rows, int targrows,
503 double *totalrows,
504 double *totaldeadrows);
505 static void analyze_row_processor(PGresult *res, int row,
506 PgFdwAnalyzeState *astate);
507 static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch);
508 static void fetch_more_data_begin(AsyncRequest *areq);
509 static void complete_pending_request(AsyncRequest *areq);
510 static HeapTuple make_tuple_from_result_row(PGresult *res,
511 int row,
512 Relation rel,
513 AttInMetadata *attinmeta,
514 List *retrieved_attrs,
515 ForeignScanState *fsstate,
516 MemoryContext temp_context);
517 static void conversion_error_callback(void *arg);
518 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
519 JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
520 JoinPathExtraData *extra);
521 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
522 Node *havingQual);
523 static List *get_useful_pathkeys_for_relation(PlannerInfo *root,
524 RelOptInfo *rel);
525 static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
526 static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
527 Path *epq_path);
528 static void add_foreign_grouping_paths(PlannerInfo *root,
529 RelOptInfo *input_rel,
530 RelOptInfo *grouped_rel,
531 GroupPathExtraData *extra);
532 static void add_foreign_ordered_paths(PlannerInfo *root,
533 RelOptInfo *input_rel,
534 RelOptInfo *ordered_rel);
535 static void add_foreign_final_paths(PlannerInfo *root,
536 RelOptInfo *input_rel,
537 RelOptInfo *final_rel,
538 FinalPathExtraData *extra);
539 static void apply_server_options(PgFdwRelationInfo *fpinfo);
540 static void apply_table_options(PgFdwRelationInfo *fpinfo);
541 static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
542 const PgFdwRelationInfo *fpinfo_o,
543 const PgFdwRelationInfo *fpinfo_i);
544 static int get_batch_size_option(Relation rel);
548 * Foreign-data wrapper handler function: return a struct with pointers
549 * to my callback routines.
551 Datum
552 postgres_fdw_handler(PG_FUNCTION_ARGS)
554 FdwRoutine *routine = makeNode(FdwRoutine);
556 /* Functions for scanning foreign tables */
557 routine->GetForeignRelSize = postgresGetForeignRelSize;
558 routine->GetForeignPaths = postgresGetForeignPaths;
559 routine->GetForeignPlan = postgresGetForeignPlan;
560 routine->BeginForeignScan = postgresBeginForeignScan;
561 routine->IterateForeignScan = postgresIterateForeignScan;
562 routine->ReScanForeignScan = postgresReScanForeignScan;
563 routine->EndForeignScan = postgresEndForeignScan;
565 /* Functions for updating foreign tables */
566 routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
567 routine->PlanForeignModify = postgresPlanForeignModify;
568 routine->BeginForeignModify = postgresBeginForeignModify;
569 routine->ExecForeignInsert = postgresExecForeignInsert;
570 routine->ExecForeignBatchInsert = postgresExecForeignBatchInsert;
571 routine->GetForeignModifyBatchSize = postgresGetForeignModifyBatchSize;
572 routine->ExecForeignUpdate = postgresExecForeignUpdate;
573 routine->ExecForeignDelete = postgresExecForeignDelete;
574 routine->EndForeignModify = postgresEndForeignModify;
575 routine->BeginForeignInsert = postgresBeginForeignInsert;
576 routine->EndForeignInsert = postgresEndForeignInsert;
577 routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
578 routine->PlanDirectModify = postgresPlanDirectModify;
579 routine->BeginDirectModify = postgresBeginDirectModify;
580 routine->IterateDirectModify = postgresIterateDirectModify;
581 routine->EndDirectModify = postgresEndDirectModify;
583 /* Function for EvalPlanQual rechecks */
584 routine->RecheckForeignScan = postgresRecheckForeignScan;
585 /* Support functions for EXPLAIN */
586 routine->ExplainForeignScan = postgresExplainForeignScan;
587 routine->ExplainForeignModify = postgresExplainForeignModify;
588 routine->ExplainDirectModify = postgresExplainDirectModify;
590 /* Support function for TRUNCATE */
591 routine->ExecForeignTruncate = postgresExecForeignTruncate;
593 /* Support functions for ANALYZE */
594 routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
596 /* Support functions for IMPORT FOREIGN SCHEMA */
597 routine->ImportForeignSchema = postgresImportForeignSchema;
599 /* Support functions for join push-down */
600 routine->GetForeignJoinPaths = postgresGetForeignJoinPaths;
602 /* Support functions for upper relation push-down */
603 routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
605 /* Support functions for asynchronous execution */
606 routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
607 routine->ForeignAsyncRequest = postgresForeignAsyncRequest;
608 routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
609 routine->ForeignAsyncNotify = postgresForeignAsyncNotify;
611 PG_RETURN_POINTER(routine);
615 * postgresGetForeignRelSize
616 * Estimate # of rows and width of the result of the scan
618 * We should consider the effect of all baserestrictinfo clauses here, but
619 * not any join clauses.
621 static void
622 postgresGetForeignRelSize(PlannerInfo *root,
623 RelOptInfo *baserel,
624 Oid foreigntableid)
626 PgFdwRelationInfo *fpinfo;
627 ListCell *lc;
630 * We use PgFdwRelationInfo to pass various information to subsequent
631 * functions.
633 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
634 baserel->fdw_private = (void *) fpinfo;
636 /* Base foreign tables need to be pushed down always. */
637 fpinfo->pushdown_safe = true;
639 /* Look up foreign-table catalog info. */
640 fpinfo->table = GetForeignTable(foreigntableid);
641 fpinfo->server = GetForeignServer(fpinfo->table->serverid);
644 * Extract user-settable option values. Note that per-table settings of
645 * use_remote_estimate, fetch_size and async_capable override per-server
646 * settings of them, respectively.
648 fpinfo->use_remote_estimate = false;
649 fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
650 fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
651 fpinfo->shippable_extensions = NIL;
652 fpinfo->fetch_size = 100;
653 fpinfo->async_capable = false;
655 apply_server_options(fpinfo);
656 apply_table_options(fpinfo);
659 * If the table or the server is configured to use remote estimates,
660 * identify which user to do remote access as during planning. This
661 * should match what ExecCheckPermissions() does. If we fail due to lack
662 * of permissions, the query would have failed at runtime anyway.
664 if (fpinfo->use_remote_estimate)
666 Oid userid;
668 userid = OidIsValid(baserel->userid) ? baserel->userid : GetUserId();
669 fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
671 else
672 fpinfo->user = NULL;
675 * Identify which baserestrictinfo clauses can be sent to the remote
676 * server and which can't.
678 classifyConditions(root, baserel, baserel->baserestrictinfo,
679 &fpinfo->remote_conds, &fpinfo->local_conds);
682 * Identify which attributes will need to be retrieved from the remote
683 * server. These include all attrs needed for joins or final output, plus
684 * all attrs used in the local_conds. (Note: if we end up using a
685 * parameterized scan, it's possible that some of the join clauses will be
686 * sent to the remote and thus we wouldn't really need to retrieve the
687 * columns used in them. Doesn't seem worth detecting that case though.)
689 fpinfo->attrs_used = NULL;
690 pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
691 &fpinfo->attrs_used);
692 foreach(lc, fpinfo->local_conds)
694 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
696 pull_varattnos((Node *) rinfo->clause, baserel->relid,
697 &fpinfo->attrs_used);
701 * Compute the selectivity and cost of the local_conds, so we don't have
702 * to do it over again for each path. The best we can do for these
703 * conditions is to estimate selectivity on the basis of local statistics.
705 fpinfo->local_conds_sel = clauselist_selectivity(root,
706 fpinfo->local_conds,
707 baserel->relid,
708 JOIN_INNER,
709 NULL);
711 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
714 * Set # of retrieved rows and cached relation costs to some negative
715 * value, so that we can detect when they are set to some sensible values,
716 * during one (usually the first) of the calls to estimate_path_cost_size.
718 fpinfo->retrieved_rows = -1;
719 fpinfo->rel_startup_cost = -1;
720 fpinfo->rel_total_cost = -1;
723 * If the table or the server is configured to use remote estimates,
724 * connect to the foreign server and execute EXPLAIN to estimate the
725 * number of rows selected by the restriction clauses, as well as the
726 * average row width. Otherwise, estimate using whatever statistics we
727 * have locally, in a way similar to ordinary tables.
729 if (fpinfo->use_remote_estimate)
732 * Get cost/size estimates with help of remote server. Save the
733 * values in fpinfo so we don't need to do it again to generate the
734 * basic foreign path.
736 estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
737 &fpinfo->rows, &fpinfo->width,
738 &fpinfo->startup_cost, &fpinfo->total_cost);
740 /* Report estimated baserel size to planner. */
741 baserel->rows = fpinfo->rows;
742 baserel->reltarget->width = fpinfo->width;
744 else
747 * If the foreign table has never been ANALYZEd, it will have
748 * reltuples < 0, meaning "unknown". We can't do much if we're not
749 * allowed to consult the remote server, but we can use a hack similar
750 * to plancat.c's treatment of empty relations: use a minimum size
751 * estimate of 10 pages, and divide by the column-datatype-based width
752 * estimate to get the corresponding number of tuples.
754 if (baserel->tuples < 0)
756 baserel->pages = 10;
757 baserel->tuples =
758 (10 * BLCKSZ) / (baserel->reltarget->width +
759 MAXALIGN(SizeofHeapTupleHeader));
762 /* Estimate baserel size as best we can with local statistics. */
763 set_baserel_size_estimates(root, baserel);
765 /* Fill in basically-bogus cost estimates for use later. */
766 estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
767 &fpinfo->rows, &fpinfo->width,
768 &fpinfo->startup_cost, &fpinfo->total_cost);
772 * fpinfo->relation_name gets the numeric rangetable index of the foreign
773 * table RTE. (If this query gets EXPLAIN'd, we'll convert that to a
774 * human-readable string at that time.)
776 fpinfo->relation_name = psprintf("%u", baserel->relid);
778 /* No outer and inner relations. */
779 fpinfo->make_outerrel_subquery = false;
780 fpinfo->make_innerrel_subquery = false;
781 fpinfo->lower_subquery_rels = NULL;
782 /* Set the relation index. */
783 fpinfo->relation_index = baserel->relid;
787 * get_useful_ecs_for_relation
788 * Determine which EquivalenceClasses might be involved in useful
789 * orderings of this relation.
791 * This function is in some respects a mirror image of the core function
792 * pathkeys_useful_for_merging: for a regular table, we know what indexes
793 * we have and want to test whether any of them are useful. For a foreign
794 * table, we don't know what indexes are present on the remote side but
795 * want to speculate about which ones we'd like to use if they existed.
797 * This function returns a list of potentially-useful equivalence classes,
798 * but it does not guarantee that an EquivalenceMember exists which contains
799 * Vars only from the given relation. For example, given ft1 JOIN t1 ON
800 * ft1.x + t1.x = 0, this function will say that the equivalence class
801 * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
802 * t1 is local (or on a different server), it will turn out that no useful
803 * ORDER BY clause can be generated. It's not our job to figure that out
804 * here; we're only interested in identifying relevant ECs.
806 static List *
807 get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)
809 List *useful_eclass_list = NIL;
810 ListCell *lc;
811 Relids relids;
814 * First, consider whether any active EC is potentially useful for a merge
815 * join against this relation.
817 if (rel->has_eclass_joins)
819 foreach(lc, root->eq_classes)
821 EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
823 if (eclass_useful_for_merging(root, cur_ec, rel))
824 useful_eclass_list = lappend(useful_eclass_list, cur_ec);
829 * Next, consider whether there are any non-EC derivable join clauses that
830 * are merge-joinable. If the joininfo list is empty, we can exit
831 * quickly.
833 if (rel->joininfo == NIL)
834 return useful_eclass_list;
836 /* If this is a child rel, we must use the topmost parent rel to search. */
837 if (IS_OTHER_REL(rel))
839 Assert(!bms_is_empty(rel->top_parent_relids));
840 relids = rel->top_parent_relids;
842 else
843 relids = rel->relids;
845 /* Check each join clause in turn. */
846 foreach(lc, rel->joininfo)
848 RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
850 /* Consider only mergejoinable clauses */
851 if (restrictinfo->mergeopfamilies == NIL)
852 continue;
854 /* Make sure we've got canonical ECs. */
855 update_mergeclause_eclasses(root, restrictinfo);
858 * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
859 * that left_ec and right_ec will be initialized, per comments in
860 * distribute_qual_to_rels.
862 * We want to identify which side of this merge-joinable clause
863 * contains columns from the relation produced by this RelOptInfo. We
864 * test for overlap, not containment, because there could be extra
865 * relations on either side. For example, suppose we've got something
866 * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
867 * A.y = D.y. The input rel might be the joinrel between A and B, and
868 * we'll consider the join clause A.y = D.y. relids contains a
869 * relation not involved in the join class (B) and the equivalence
870 * class for the left-hand side of the clause contains a relation not
871 * involved in the input rel (C). Despite the fact that we have only
872 * overlap and not containment in either direction, A.y is potentially
873 * useful as a sort column.
875 * Note that it's even possible that relids overlaps neither side of
876 * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
877 * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
878 * but overlaps neither side of B. In that case, we just skip this
879 * join clause, since it doesn't suggest a useful sort order for this
880 * relation.
882 if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
883 useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
884 restrictinfo->right_ec);
885 else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
886 useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
887 restrictinfo->left_ec);
890 return useful_eclass_list;
894 * get_useful_pathkeys_for_relation
895 * Determine which orderings of a relation might be useful.
897 * Getting data in sorted order can be useful either because the requested
898 * order matches the final output ordering for the overall query we're
899 * planning, or because it enables an efficient merge join. Here, we try
900 * to figure out which pathkeys to consider.
902 static List *
903 get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
905 List *useful_pathkeys_list = NIL;
906 List *useful_eclass_list;
907 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
908 EquivalenceClass *query_ec = NULL;
909 ListCell *lc;
912 * Pushing the query_pathkeys to the remote server is always worth
913 * considering, because it might let us avoid a local sort.
915 fpinfo->qp_is_pushdown_safe = false;
916 if (root->query_pathkeys)
918 bool query_pathkeys_ok = true;
920 foreach(lc, root->query_pathkeys)
922 PathKey *pathkey = (PathKey *) lfirst(lc);
925 * The planner and executor don't have any clever strategy for
926 * taking data sorted by a prefix of the query's pathkeys and
927 * getting it to be sorted by all of those pathkeys. We'll just
928 * end up resorting the entire data set. So, unless we can push
929 * down all of the query pathkeys, forget it.
931 if (!is_foreign_pathkey(root, rel, pathkey))
933 query_pathkeys_ok = false;
934 break;
938 if (query_pathkeys_ok)
940 useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
941 fpinfo->qp_is_pushdown_safe = true;
946 * Even if we're not using remote estimates, having the remote side do the
947 * sort generally won't be any worse than doing it locally, and it might
948 * be much better if the remote side can generate data in the right order
949 * without needing a sort at all. However, what we're going to do next is
950 * try to generate pathkeys that seem promising for possible merge joins,
951 * and that's more speculative. A wrong choice might hurt quite a bit, so
952 * bail out if we can't use remote estimates.
954 if (!fpinfo->use_remote_estimate)
955 return useful_pathkeys_list;
957 /* Get the list of interesting EquivalenceClasses. */
958 useful_eclass_list = get_useful_ecs_for_relation(root, rel);
960 /* Extract unique EC for query, if any, so we don't consider it again. */
961 if (list_length(root->query_pathkeys) == 1)
963 PathKey *query_pathkey = linitial(root->query_pathkeys);
965 query_ec = query_pathkey->pk_eclass;
969 * As a heuristic, the only pathkeys we consider here are those of length
970 * one. It's surely possible to consider more, but since each one we
971 * choose to consider will generate a round-trip to the remote side, we
972 * need to be a bit cautious here. It would sure be nice to have a local
973 * cache of information about remote index definitions...
975 foreach(lc, useful_eclass_list)
977 EquivalenceClass *cur_ec = lfirst(lc);
978 PathKey *pathkey;
980 /* If redundant with what we did above, skip it. */
981 if (cur_ec == query_ec)
982 continue;
984 /* Can't push down the sort if the EC's opfamily is not shippable. */
985 if (!is_shippable(linitial_oid(cur_ec->ec_opfamilies),
986 OperatorFamilyRelationId, fpinfo))
987 continue;
989 /* If no pushable expression for this rel, skip it. */
990 if (find_em_for_rel(root, cur_ec, rel) == NULL)
991 continue;
993 /* Looks like we can generate a pathkey, so let's do it. */
994 pathkey = make_canonical_pathkey(root, cur_ec,
995 linitial_oid(cur_ec->ec_opfamilies),
996 BTLessStrategyNumber,
997 false);
998 useful_pathkeys_list = lappend(useful_pathkeys_list,
999 list_make1(pathkey));
1002 return useful_pathkeys_list;
1006 * postgresGetForeignPaths
1007 * Create possible scan paths for a scan on the foreign table
1009 static void
1010 postgresGetForeignPaths(PlannerInfo *root,
1011 RelOptInfo *baserel,
1012 Oid foreigntableid)
1014 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
1015 ForeignPath *path;
1016 List *ppi_list;
1017 ListCell *lc;
1020 * Create simplest ForeignScan path node and add it to baserel. This path
1021 * corresponds to SeqScan path of regular tables (though depending on what
1022 * baserestrict conditions we were able to send to remote, there might
1023 * actually be an indexscan happening there). We already did all the work
1024 * to estimate cost and size of this path.
1026 * Although this path uses no join clauses, it could still have required
1027 * parameterization due to LATERAL refs in its tlist.
1029 path = create_foreignscan_path(root, baserel,
1030 NULL, /* default pathtarget */
1031 fpinfo->rows,
1032 fpinfo->startup_cost,
1033 fpinfo->total_cost,
1034 NIL, /* no pathkeys */
1035 baserel->lateral_relids,
1036 NULL, /* no extra plan */
1037 NIL); /* no fdw_private list */
1038 add_path(baserel, (Path *) path);
1040 /* Add paths with pathkeys */
1041 add_paths_with_pathkeys_for_rel(root, baserel, NULL);
1044 * If we're not using remote estimates, stop here. We have no way to
1045 * estimate whether any join clauses would be worth sending across, so
1046 * don't bother building parameterized paths.
1048 if (!fpinfo->use_remote_estimate)
1049 return;
1052 * Thumb through all join clauses for the rel to identify which outer
1053 * relations could supply one or more safe-to-send-to-remote join clauses.
1054 * We'll build a parameterized path for each such outer relation.
1056 * It's convenient to manage this by representing each candidate outer
1057 * relation by the ParamPathInfo node for it. We can then use the
1058 * ppi_clauses list in the ParamPathInfo node directly as a list of the
1059 * interesting join clauses for that rel. This takes care of the
1060 * possibility that there are multiple safe join clauses for such a rel,
1061 * and also ensures that we account for unsafe join clauses that we'll
1062 * still have to enforce locally (since the parameterized-path machinery
1063 * insists that we handle all movable clauses).
1065 ppi_list = NIL;
1066 foreach(lc, baserel->joininfo)
1068 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1069 Relids required_outer;
1070 ParamPathInfo *param_info;
1072 /* Check if clause can be moved to this rel */
1073 if (!join_clause_is_movable_to(rinfo, baserel))
1074 continue;
1076 /* See if it is safe to send to remote */
1077 if (!is_foreign_expr(root, baserel, rinfo->clause))
1078 continue;
1080 /* Calculate required outer rels for the resulting path */
1081 required_outer = bms_union(rinfo->clause_relids,
1082 baserel->lateral_relids);
1083 /* We do not want the foreign rel itself listed in required_outer */
1084 required_outer = bms_del_member(required_outer, baserel->relid);
1087 * required_outer probably can't be empty here, but if it were, we
1088 * couldn't make a parameterized path.
1090 if (bms_is_empty(required_outer))
1091 continue;
1093 /* Get the ParamPathInfo */
1094 param_info = get_baserel_parampathinfo(root, baserel,
1095 required_outer);
1096 Assert(param_info != NULL);
1099 * Add it to list unless we already have it. Testing pointer equality
1100 * is OK since get_baserel_parampathinfo won't make duplicates.
1102 ppi_list = list_append_unique_ptr(ppi_list, param_info);
1106 * The above scan examined only "generic" join clauses, not those that
1107 * were absorbed into EquivalenceClauses. See if we can make anything out
1108 * of EquivalenceClauses.
1110 if (baserel->has_eclass_joins)
1113 * We repeatedly scan the eclass list looking for column references
1114 * (or expressions) belonging to the foreign rel. Each time we find
1115 * one, we generate a list of equivalence joinclauses for it, and then
1116 * see if any are safe to send to the remote. Repeat till there are
1117 * no more candidate EC members.
1119 ec_member_foreign_arg arg;
1121 arg.already_used = NIL;
1122 for (;;)
1124 List *clauses;
1126 /* Make clauses, skipping any that join to lateral_referencers */
1127 arg.current = NULL;
1128 clauses = generate_implied_equalities_for_column(root,
1129 baserel,
1130 ec_member_matches_foreign,
1131 (void *) &arg,
1132 baserel->lateral_referencers);
1134 /* Done if there are no more expressions in the foreign rel */
1135 if (arg.current == NULL)
1137 Assert(clauses == NIL);
1138 break;
1141 /* Scan the extracted join clauses */
1142 foreach(lc, clauses)
1144 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1145 Relids required_outer;
1146 ParamPathInfo *param_info;
1148 /* Check if clause can be moved to this rel */
1149 if (!join_clause_is_movable_to(rinfo, baserel))
1150 continue;
1152 /* See if it is safe to send to remote */
1153 if (!is_foreign_expr(root, baserel, rinfo->clause))
1154 continue;
1156 /* Calculate required outer rels for the resulting path */
1157 required_outer = bms_union(rinfo->clause_relids,
1158 baserel->lateral_relids);
1159 required_outer = bms_del_member(required_outer, baserel->relid);
1160 if (bms_is_empty(required_outer))
1161 continue;
1163 /* Get the ParamPathInfo */
1164 param_info = get_baserel_parampathinfo(root, baserel,
1165 required_outer);
1166 Assert(param_info != NULL);
1168 /* Add it to list unless we already have it */
1169 ppi_list = list_append_unique_ptr(ppi_list, param_info);
1172 /* Try again, now ignoring the expression we found this time */
1173 arg.already_used = lappend(arg.already_used, arg.current);
1178 * Now build a path for each useful outer relation.
1180 foreach(lc, ppi_list)
1182 ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1183 double rows;
1184 int width;
1185 Cost startup_cost;
1186 Cost total_cost;
1188 /* Get a cost estimate from the remote */
1189 estimate_path_cost_size(root, baserel,
1190 param_info->ppi_clauses, NIL, NULL,
1191 &rows, &width,
1192 &startup_cost, &total_cost);
1195 * ppi_rows currently won't get looked at by anything, but still we
1196 * may as well ensure that it matches our idea of the rowcount.
1198 param_info->ppi_rows = rows;
1200 /* Make the path */
1201 path = create_foreignscan_path(root, baserel,
1202 NULL, /* default pathtarget */
1203 rows,
1204 startup_cost,
1205 total_cost,
1206 NIL, /* no pathkeys */
1207 param_info->ppi_req_outer,
1208 NULL,
1209 NIL); /* no fdw_private list */
1210 add_path(baserel, (Path *) path);
1215 * postgresGetForeignPlan
1216 * Create ForeignScan plan node which implements selected best path
1218 static ForeignScan *
1219 postgresGetForeignPlan(PlannerInfo *root,
1220 RelOptInfo *foreignrel,
1221 Oid foreigntableid,
1222 ForeignPath *best_path,
1223 List *tlist,
1224 List *scan_clauses,
1225 Plan *outer_plan)
1227 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1228 Index scan_relid;
1229 List *fdw_private;
1230 List *remote_exprs = NIL;
1231 List *local_exprs = NIL;
1232 List *params_list = NIL;
1233 List *fdw_scan_tlist = NIL;
1234 List *fdw_recheck_quals = NIL;
1235 List *retrieved_attrs;
1236 StringInfoData sql;
1237 bool has_final_sort = false;
1238 bool has_limit = false;
1239 ListCell *lc;
1242 * Get FDW private data created by postgresGetForeignUpperPaths(), if any.
1244 if (best_path->fdw_private)
1246 has_final_sort = boolVal(list_nth(best_path->fdw_private,
1247 FdwPathPrivateHasFinalSort));
1248 has_limit = boolVal(list_nth(best_path->fdw_private,
1249 FdwPathPrivateHasLimit));
1252 if (IS_SIMPLE_REL(foreignrel))
1255 * For base relations, set scan_relid as the relid of the relation.
1257 scan_relid = foreignrel->relid;
1260 * In a base-relation scan, we must apply the given scan_clauses.
1262 * Separate the scan_clauses into those that can be executed remotely
1263 * and those that can't. baserestrictinfo clauses that were
1264 * previously determined to be safe or unsafe by classifyConditions
1265 * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1266 * else in the scan_clauses list will be a join clause, which we have
1267 * to check for remote-safety.
1269 * Note: the join clauses we see here should be the exact same ones
1270 * previously examined by postgresGetForeignPaths. Possibly it'd be
1271 * worth passing forward the classification work done then, rather
1272 * than repeating it here.
1274 * This code must match "extract_actual_clauses(scan_clauses, false)"
1275 * except for the additional decision about remote versus local
1276 * execution.
1278 foreach(lc, scan_clauses)
1280 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
1282 /* Ignore any pseudoconstants, they're dealt with elsewhere */
1283 if (rinfo->pseudoconstant)
1284 continue;
1286 if (list_member_ptr(fpinfo->remote_conds, rinfo))
1287 remote_exprs = lappend(remote_exprs, rinfo->clause);
1288 else if (list_member_ptr(fpinfo->local_conds, rinfo))
1289 local_exprs = lappend(local_exprs, rinfo->clause);
1290 else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1291 remote_exprs = lappend(remote_exprs, rinfo->clause);
1292 else
1293 local_exprs = lappend(local_exprs, rinfo->clause);
1297 * For a base-relation scan, we have to support EPQ recheck, which
1298 * should recheck all the remote quals.
1300 fdw_recheck_quals = remote_exprs;
1302 else
1305 * Join relation or upper relation - set scan_relid to 0.
1307 scan_relid = 0;
1310 * For a join rel, baserestrictinfo is NIL and we are not considering
1311 * parameterization right now, so there should be no scan_clauses for
1312 * a joinrel or an upper rel either.
1314 Assert(!scan_clauses);
1317 * Instead we get the conditions to apply from the fdw_private
1318 * structure.
1320 remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1321 local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1324 * We leave fdw_recheck_quals empty in this case, since we never need
1325 * to apply EPQ recheck clauses. In the case of a joinrel, EPQ
1326 * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1327 * If we're planning an upperrel (ie, remote grouping or aggregation)
1328 * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1329 * allowed, and indeed we *can't* put the remote clauses into
1330 * fdw_recheck_quals because the unaggregated Vars won't be available
1331 * locally.
1334 /* Build the list of columns to be fetched from the foreign server. */
1335 fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1338 * Ensure that the outer plan produces a tuple whose descriptor
1339 * matches our scan tuple slot. Also, remove the local conditions
1340 * from outer plan's quals, lest they be evaluated twice, once by the
1341 * local plan and once by the scan.
1343 if (outer_plan)
1346 * Right now, we only consider grouping and aggregation beyond
1347 * joins. Queries involving aggregates or grouping do not require
1348 * EPQ mechanism, hence should not have an outer plan here.
1350 Assert(!IS_UPPER_REL(foreignrel));
1353 * First, update the plan's qual list if possible. In some cases
1354 * the quals might be enforced below the topmost plan level, in
1355 * which case we'll fail to remove them; it's not worth working
1356 * harder than this.
1358 foreach(lc, local_exprs)
1360 Node *qual = lfirst(lc);
1362 outer_plan->qual = list_delete(outer_plan->qual, qual);
1365 * For an inner join the local conditions of foreign scan plan
1366 * can be part of the joinquals as well. (They might also be
1367 * in the mergequals or hashquals, but we can't touch those
1368 * without breaking the plan.)
1370 if (IsA(outer_plan, NestLoop) ||
1371 IsA(outer_plan, MergeJoin) ||
1372 IsA(outer_plan, HashJoin))
1374 Join *join_plan = (Join *) outer_plan;
1376 if (join_plan->jointype == JOIN_INNER)
1377 join_plan->joinqual = list_delete(join_plan->joinqual,
1378 qual);
1383 * Now fix the subplan's tlist --- this might result in inserting
1384 * a Result node atop the plan tree.
1386 outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist,
1387 best_path->path.parallel_safe);
1392 * Build the query string to be sent for execution, and identify
1393 * expressions to be sent as parameters.
1395 initStringInfo(&sql);
1396 deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1397 remote_exprs, best_path->path.pathkeys,
1398 has_final_sort, has_limit, false,
1399 &retrieved_attrs, &params_list);
1401 /* Remember remote_exprs for possible use by postgresPlanDirectModify */
1402 fpinfo->final_remote_exprs = remote_exprs;
1405 * Build the fdw_private list that will be available to the executor.
1406 * Items in the list must match order in enum FdwScanPrivateIndex.
1408 fdw_private = list_make3(makeString(sql.data),
1409 retrieved_attrs,
1410 makeInteger(fpinfo->fetch_size));
1411 if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1412 fdw_private = lappend(fdw_private,
1413 makeString(fpinfo->relation_name));
1416 * Create the ForeignScan node for the given relation.
1418 * Note that the remote parameter expressions are stored in the fdw_exprs
1419 * field of the finished plan node; we can't keep them in private state
1420 * because then they wouldn't be subject to later planner processing.
1422 return make_foreignscan(tlist,
1423 local_exprs,
1424 scan_relid,
1425 params_list,
1426 fdw_private,
1427 fdw_scan_tlist,
1428 fdw_recheck_quals,
1429 outer_plan);
1433 * Construct a tuple descriptor for the scan tuples handled by a foreign join.
1435 static TupleDesc
1436 get_tupdesc_for_join_scan_tuples(ForeignScanState *node)
1438 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1439 EState *estate = node->ss.ps.state;
1440 TupleDesc tupdesc;
1443 * The core code has already set up a scan tuple slot based on
1444 * fsplan->fdw_scan_tlist, and this slot's tupdesc is mostly good enough,
1445 * but there's one case where it isn't. If we have any whole-row row
1446 * identifier Vars, they may have vartype RECORD, and we need to replace
1447 * that with the associated table's actual composite type. This ensures
1448 * that when we read those ROW() expression values from the remote server,
1449 * we can convert them to a composite type the local server knows.
1451 tupdesc = CreateTupleDescCopy(node->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1452 for (int i = 0; i < tupdesc->natts; i++)
1454 Form_pg_attribute att = TupleDescAttr(tupdesc, i);
1455 Var *var;
1456 RangeTblEntry *rte;
1457 Oid reltype;
1459 /* Nothing to do if it's not a generic RECORD attribute */
1460 if (att->atttypid != RECORDOID || att->atttypmod >= 0)
1461 continue;
1464 * If we can't identify the referenced table, do nothing. This'll
1465 * likely lead to failure later, but perhaps we can muddle through.
1467 var = (Var *) list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
1468 i)->expr;
1469 if (!IsA(var, Var) || var->varattno != 0)
1470 continue;
1471 rte = list_nth(estate->es_range_table, var->varno - 1);
1472 if (rte->rtekind != RTE_RELATION)
1473 continue;
1474 reltype = get_rel_type_id(rte->relid);
1475 if (!OidIsValid(reltype))
1476 continue;
1477 att->atttypid = reltype;
1478 /* shouldn't need to change anything else */
1480 return tupdesc;
1484 * postgresBeginForeignScan
1485 * Initiate an executor scan of a foreign PostgreSQL table.
1487 static void
1488 postgresBeginForeignScan(ForeignScanState *node, int eflags)
1490 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1491 EState *estate = node->ss.ps.state;
1492 PgFdwScanState *fsstate;
1493 RangeTblEntry *rte;
1494 Oid userid;
1495 ForeignTable *table;
1496 UserMapping *user;
1497 int rtindex;
1498 int numParams;
1501 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1503 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1504 return;
1507 * We'll save private state in node->fdw_state.
1509 fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1510 node->fdw_state = (void *) fsstate;
1513 * Identify which user to do the remote access as. This should match what
1514 * ExecCheckPermissions() does.
1516 userid = OidIsValid(fsplan->checkAsUser) ? fsplan->checkAsUser : GetUserId();
1517 if (fsplan->scan.scanrelid > 0)
1518 rtindex = fsplan->scan.scanrelid;
1519 else
1520 rtindex = bms_next_member(fsplan->fs_base_relids, -1);
1521 rte = exec_rt_fetch(rtindex, estate);
1523 /* Get info about foreign table. */
1524 table = GetForeignTable(rte->relid);
1525 user = GetUserMapping(userid, table->serverid);
1528 * Get connection to the foreign server. Connection manager will
1529 * establish new connection if necessary.
1531 fsstate->conn = GetConnection(user, false, &fsstate->conn_state);
1533 /* Assign a unique ID for my cursor */
1534 fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1535 fsstate->cursor_exists = false;
1537 /* Get private info created by planner functions. */
1538 fsstate->query = strVal(list_nth(fsplan->fdw_private,
1539 FdwScanPrivateSelectSql));
1540 fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1541 FdwScanPrivateRetrievedAttrs);
1542 fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1543 FdwScanPrivateFetchSize));
1545 /* Create contexts for batches of tuples and per-tuple temp workspace. */
1546 fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1547 "postgres_fdw tuple data",
1548 ALLOCSET_DEFAULT_SIZES);
1549 fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1550 "postgres_fdw temporary data",
1551 ALLOCSET_SMALL_SIZES);
1554 * Get info we'll need for converting data fetched from the foreign server
1555 * into local representation and error reporting during that process.
1557 if (fsplan->scan.scanrelid > 0)
1559 fsstate->rel = node->ss.ss_currentRelation;
1560 fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1562 else
1564 fsstate->rel = NULL;
1565 fsstate->tupdesc = get_tupdesc_for_join_scan_tuples(node);
1568 fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1571 * Prepare for processing of parameters used in remote query, if any.
1573 numParams = list_length(fsplan->fdw_exprs);
1574 fsstate->numParams = numParams;
1575 if (numParams > 0)
1576 prepare_query_params((PlanState *) node,
1577 fsplan->fdw_exprs,
1578 numParams,
1579 &fsstate->param_flinfo,
1580 &fsstate->param_exprs,
1581 &fsstate->param_values);
1583 /* Set the async-capable flag */
1584 fsstate->async_capable = node->ss.ps.async_capable;
1588 * postgresIterateForeignScan
1589 * Retrieve next row from the result set, or clear tuple slot to indicate
1590 * EOF.
1592 static TupleTableSlot *
1593 postgresIterateForeignScan(ForeignScanState *node)
1595 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1596 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1599 * In sync mode, if this is the first call after Begin or ReScan, we need
1600 * to create the cursor on the remote side. In async mode, we would have
1601 * already created the cursor before we get here, even if this is the
1602 * first call after Begin or ReScan.
1604 if (!fsstate->cursor_exists)
1605 create_cursor(node);
1608 * Get some more tuples, if we've run out.
1610 if (fsstate->next_tuple >= fsstate->num_tuples)
1612 /* In async mode, just clear tuple slot. */
1613 if (fsstate->async_capable)
1614 return ExecClearTuple(slot);
1615 /* No point in another fetch if we already detected EOF, though. */
1616 if (!fsstate->eof_reached)
1617 fetch_more_data(node);
1618 /* If we didn't get any tuples, must be end of data. */
1619 if (fsstate->next_tuple >= fsstate->num_tuples)
1620 return ExecClearTuple(slot);
1624 * Return the next tuple.
1626 ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
1627 slot,
1628 false);
1630 return slot;
1634 * postgresReScanForeignScan
1635 * Restart the scan.
1637 static void
1638 postgresReScanForeignScan(ForeignScanState *node)
1640 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1641 char sql[64];
1642 PGresult *res;
1644 /* If we haven't created the cursor yet, nothing to do. */
1645 if (!fsstate->cursor_exists)
1646 return;
1649 * If the node is async-capable, and an asynchronous fetch for it has
1650 * begun, the asynchronous fetch might not have yet completed. Check if
1651 * the node is async-capable, and an asynchronous fetch for it is still in
1652 * progress; if so, complete the asynchronous fetch before restarting the
1653 * scan.
1655 if (fsstate->async_capable &&
1656 fsstate->conn_state->pendingAreq &&
1657 fsstate->conn_state->pendingAreq->requestee == (PlanState *) node)
1658 fetch_more_data(node);
1661 * If any internal parameters affecting this node have changed, we'd
1662 * better destroy and recreate the cursor. Otherwise, rewinding it should
1663 * be good enough. If we've only fetched zero or one batch, we needn't
1664 * even rewind the cursor, just rescan what we have.
1666 if (node->ss.ps.chgParam != NULL)
1668 fsstate->cursor_exists = false;
1669 snprintf(sql, sizeof(sql), "CLOSE c%u",
1670 fsstate->cursor_number);
1672 else if (fsstate->fetch_ct_2 > 1)
1674 snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1675 fsstate->cursor_number);
1677 else
1679 /* Easy: just rescan what we already have in memory, if anything */
1680 fsstate->next_tuple = 0;
1681 return;
1685 * We don't use a PG_TRY block here, so be careful not to throw error
1686 * without releasing the PGresult.
1688 res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
1689 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1690 pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1691 PQclear(res);
1693 /* Now force a fresh FETCH. */
1694 fsstate->tuples = NULL;
1695 fsstate->num_tuples = 0;
1696 fsstate->next_tuple = 0;
1697 fsstate->fetch_ct_2 = 0;
1698 fsstate->eof_reached = false;
1702 * postgresEndForeignScan
1703 * Finish scanning foreign table and dispose objects used for this scan
1705 static void
1706 postgresEndForeignScan(ForeignScanState *node)
1708 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1710 /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1711 if (fsstate == NULL)
1712 return;
1714 /* Close the cursor if open, to prevent accumulation of cursors */
1715 if (fsstate->cursor_exists)
1716 close_cursor(fsstate->conn, fsstate->cursor_number,
1717 fsstate->conn_state);
1719 /* Release remote connection */
1720 ReleaseConnection(fsstate->conn);
1721 fsstate->conn = NULL;
1723 /* MemoryContexts will be deleted automatically. */
1727 * postgresAddForeignUpdateTargets
1728 * Add resjunk column(s) needed for update/delete on a foreign table
1730 static void
1731 postgresAddForeignUpdateTargets(PlannerInfo *root,
1732 Index rtindex,
1733 RangeTblEntry *target_rte,
1734 Relation target_relation)
1736 Var *var;
1739 * In postgres_fdw, what we need is the ctid, same as for a regular table.
1742 /* Make a Var representing the desired value */
1743 var = makeVar(rtindex,
1744 SelfItemPointerAttributeNumber,
1745 TIDOID,
1747 InvalidOid,
1750 /* Register it as a row-identity column needed by this target rel */
1751 add_row_identity_var(root, var, rtindex, "ctid");
1755 * postgresPlanForeignModify
1756 * Plan an insert/update/delete operation on a foreign table
1758 static List *
1759 postgresPlanForeignModify(PlannerInfo *root,
1760 ModifyTable *plan,
1761 Index resultRelation,
1762 int subplan_index)
1764 CmdType operation = plan->operation;
1765 RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1766 Relation rel;
1767 StringInfoData sql;
1768 List *targetAttrs = NIL;
1769 List *withCheckOptionList = NIL;
1770 List *returningList = NIL;
1771 List *retrieved_attrs = NIL;
1772 bool doNothing = false;
1773 int values_end_len = -1;
1775 initStringInfo(&sql);
1778 * Core code already has some lock on each rel being planned, so we can
1779 * use NoLock here.
1781 rel = table_open(rte->relid, NoLock);
1784 * In an INSERT, we transmit all columns that are defined in the foreign
1785 * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1786 * foreign table, we transmit all columns like INSERT; else we transmit
1787 * only columns that were explicitly targets of the UPDATE, so as to avoid
1788 * unnecessary data transmission. (We can't do that for INSERT since we
1789 * would miss sending default values for columns not listed in the source
1790 * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1791 * those triggers might change values for non-target columns, in which
1792 * case we would miss sending changed values for those columns.)
1794 if (operation == CMD_INSERT ||
1795 (operation == CMD_UPDATE &&
1796 rel->trigdesc &&
1797 rel->trigdesc->trig_update_before_row))
1799 TupleDesc tupdesc = RelationGetDescr(rel);
1800 int attnum;
1802 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1804 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1806 if (!attr->attisdropped)
1807 targetAttrs = lappend_int(targetAttrs, attnum);
1810 else if (operation == CMD_UPDATE)
1812 int col;
1813 RelOptInfo *rel = find_base_rel(root, resultRelation);
1814 Bitmapset *allUpdatedCols = get_rel_all_updated_cols(root, rel);
1816 col = -1;
1817 while ((col = bms_next_member(allUpdatedCols, col)) >= 0)
1819 /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1820 AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
1822 if (attno <= InvalidAttrNumber) /* shouldn't happen */
1823 elog(ERROR, "system-column update is not supported");
1824 targetAttrs = lappend_int(targetAttrs, attno);
1829 * Extract the relevant WITH CHECK OPTION list if any.
1831 if (plan->withCheckOptionLists)
1832 withCheckOptionList = (List *) list_nth(plan->withCheckOptionLists,
1833 subplan_index);
1836 * Extract the relevant RETURNING list if any.
1838 if (plan->returningLists)
1839 returningList = (List *) list_nth(plan->returningLists, subplan_index);
1842 * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1843 * should have already been rejected in the optimizer, as presently there
1844 * is no way to recognize an arbiter index on a foreign table. Only DO
1845 * NOTHING is supported without an inference specification.
1847 if (plan->onConflictAction == ONCONFLICT_NOTHING)
1848 doNothing = true;
1849 else if (plan->onConflictAction != ONCONFLICT_NONE)
1850 elog(ERROR, "unexpected ON CONFLICT specification: %d",
1851 (int) plan->onConflictAction);
1854 * Construct the SQL command string.
1856 switch (operation)
1858 case CMD_INSERT:
1859 deparseInsertSql(&sql, rte, resultRelation, rel,
1860 targetAttrs, doNothing,
1861 withCheckOptionList, returningList,
1862 &retrieved_attrs, &values_end_len);
1863 break;
1864 case CMD_UPDATE:
1865 deparseUpdateSql(&sql, rte, resultRelation, rel,
1866 targetAttrs,
1867 withCheckOptionList, returningList,
1868 &retrieved_attrs);
1869 break;
1870 case CMD_DELETE:
1871 deparseDeleteSql(&sql, rte, resultRelation, rel,
1872 returningList,
1873 &retrieved_attrs);
1874 break;
1875 default:
1876 elog(ERROR, "unexpected operation: %d", (int) operation);
1877 break;
1880 table_close(rel, NoLock);
1883 * Build the fdw_private list that will be available to the executor.
1884 * Items in the list must match enum FdwModifyPrivateIndex, above.
1886 return list_make5(makeString(sql.data),
1887 targetAttrs,
1888 makeInteger(values_end_len),
1889 makeBoolean((retrieved_attrs != NIL)),
1890 retrieved_attrs);
1894 * postgresBeginForeignModify
1895 * Begin an insert/update/delete operation on a foreign table
1897 static void
1898 postgresBeginForeignModify(ModifyTableState *mtstate,
1899 ResultRelInfo *resultRelInfo,
1900 List *fdw_private,
1901 int subplan_index,
1902 int eflags)
1904 PgFdwModifyState *fmstate;
1905 char *query;
1906 List *target_attrs;
1907 bool has_returning;
1908 int values_end_len;
1909 List *retrieved_attrs;
1910 RangeTblEntry *rte;
1913 * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1914 * stays NULL.
1916 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1917 return;
1919 /* Deconstruct fdw_private data. */
1920 query = strVal(list_nth(fdw_private,
1921 FdwModifyPrivateUpdateSql));
1922 target_attrs = (List *) list_nth(fdw_private,
1923 FdwModifyPrivateTargetAttnums);
1924 values_end_len = intVal(list_nth(fdw_private,
1925 FdwModifyPrivateLen));
1926 has_returning = boolVal(list_nth(fdw_private,
1927 FdwModifyPrivateHasReturning));
1928 retrieved_attrs = (List *) list_nth(fdw_private,
1929 FdwModifyPrivateRetrievedAttrs);
1931 /* Find RTE. */
1932 rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex,
1933 mtstate->ps.state);
1935 /* Construct an execution state. */
1936 fmstate = create_foreign_modify(mtstate->ps.state,
1937 rte,
1938 resultRelInfo,
1939 mtstate->operation,
1940 outerPlanState(mtstate)->plan,
1941 query,
1942 target_attrs,
1943 values_end_len,
1944 has_returning,
1945 retrieved_attrs);
1947 resultRelInfo->ri_FdwState = fmstate;
1951 * postgresExecForeignInsert
1952 * Insert one row into a foreign table
1954 static TupleTableSlot *
1955 postgresExecForeignInsert(EState *estate,
1956 ResultRelInfo *resultRelInfo,
1957 TupleTableSlot *slot,
1958 TupleTableSlot *planSlot)
1960 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1961 TupleTableSlot **rslot;
1962 int numSlots = 1;
1965 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1966 * postgresBeginForeignInsert())
1968 if (fmstate->aux_fmstate)
1969 resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
1970 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
1971 &slot, &planSlot, &numSlots);
1972 /* Revert that change */
1973 if (fmstate->aux_fmstate)
1974 resultRelInfo->ri_FdwState = fmstate;
1976 return rslot ? *rslot : NULL;
1980 * postgresExecForeignBatchInsert
1981 * Insert multiple rows into a foreign table
1983 static TupleTableSlot **
1984 postgresExecForeignBatchInsert(EState *estate,
1985 ResultRelInfo *resultRelInfo,
1986 TupleTableSlot **slots,
1987 TupleTableSlot **planSlots,
1988 int *numSlots)
1990 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1991 TupleTableSlot **rslot;
1994 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1995 * postgresBeginForeignInsert())
1997 if (fmstate->aux_fmstate)
1998 resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
1999 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
2000 slots, planSlots, numSlots);
2001 /* Revert that change */
2002 if (fmstate->aux_fmstate)
2003 resultRelInfo->ri_FdwState = fmstate;
2005 return rslot;
2009 * postgresGetForeignModifyBatchSize
2010 * Determine the maximum number of tuples that can be inserted in bulk
2012 * Returns the batch size specified for server or table. When batching is not
2013 * allowed (e.g. for tables with BEFORE/AFTER ROW triggers or with RETURNING
2014 * clause), returns 1.
2016 static int
2017 postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo)
2019 int batch_size;
2020 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2022 /* should be called only once */
2023 Assert(resultRelInfo->ri_BatchSize == 0);
2026 * Should never get called when the insert is being performed on a table
2027 * that is also among the target relations of an UPDATE operation, because
2028 * postgresBeginForeignInsert() currently rejects such insert attempts.
2030 Assert(fmstate == NULL || fmstate->aux_fmstate == NULL);
2033 * In EXPLAIN without ANALYZE, ri_FdwState is NULL, so we have to lookup
2034 * the option directly in server/table options. Otherwise just use the
2035 * value we determined earlier.
2037 if (fmstate)
2038 batch_size = fmstate->batch_size;
2039 else
2040 batch_size = get_batch_size_option(resultRelInfo->ri_RelationDesc);
2043 * Disable batching when we have to use RETURNING, there are any
2044 * BEFORE/AFTER ROW INSERT triggers on the foreign table, or there are any
2045 * WITH CHECK OPTION constraints from parent views.
2047 * When there are any BEFORE ROW INSERT triggers on the table, we can't
2048 * support it, because such triggers might query the table we're inserting
2049 * into and act differently if the tuples that have already been processed
2050 * and prepared for insertion are not there.
2052 if (resultRelInfo->ri_projectReturning != NULL ||
2053 resultRelInfo->ri_WithCheckOptions != NIL ||
2054 (resultRelInfo->ri_TrigDesc &&
2055 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2056 resultRelInfo->ri_TrigDesc->trig_insert_after_row)))
2057 return 1;
2060 * If the foreign table has no columns, disable batching as the INSERT
2061 * syntax doesn't allow batching multiple empty rows into a zero-column
2062 * table in a single statement. This is needed for COPY FROM, in which
2063 * case fmstate must be non-NULL.
2065 if (fmstate && list_length(fmstate->target_attrs) == 0)
2066 return 1;
2069 * Otherwise use the batch size specified for server/table. The number of
2070 * parameters in a batch is limited to 65535 (uint16), so make sure we
2071 * don't exceed this limit by using the maximum batch_size possible.
2073 if (fmstate && fmstate->p_nums > 0)
2074 batch_size = Min(batch_size, PQ_QUERY_PARAM_MAX_LIMIT / fmstate->p_nums);
2076 return batch_size;
2080 * postgresExecForeignUpdate
2081 * Update one row in a foreign table
2083 static TupleTableSlot *
2084 postgresExecForeignUpdate(EState *estate,
2085 ResultRelInfo *resultRelInfo,
2086 TupleTableSlot *slot,
2087 TupleTableSlot *planSlot)
2089 TupleTableSlot **rslot;
2090 int numSlots = 1;
2092 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
2093 &slot, &planSlot, &numSlots);
2095 return rslot ? rslot[0] : NULL;
2099 * postgresExecForeignDelete
2100 * Delete one row from a foreign table
2102 static TupleTableSlot *
2103 postgresExecForeignDelete(EState *estate,
2104 ResultRelInfo *resultRelInfo,
2105 TupleTableSlot *slot,
2106 TupleTableSlot *planSlot)
2108 TupleTableSlot **rslot;
2109 int numSlots = 1;
2111 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
2112 &slot, &planSlot, &numSlots);
2114 return rslot ? rslot[0] : NULL;
2118 * postgresEndForeignModify
2119 * Finish an insert/update/delete operation on a foreign table
2121 static void
2122 postgresEndForeignModify(EState *estate,
2123 ResultRelInfo *resultRelInfo)
2125 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2127 /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
2128 if (fmstate == NULL)
2129 return;
2131 /* Destroy the execution state */
2132 finish_foreign_modify(fmstate);
2136 * postgresBeginForeignInsert
2137 * Begin an insert operation on a foreign table
2139 static void
2140 postgresBeginForeignInsert(ModifyTableState *mtstate,
2141 ResultRelInfo *resultRelInfo)
2143 PgFdwModifyState *fmstate;
2144 ModifyTable *plan = castNode(ModifyTable, mtstate->ps.plan);
2145 EState *estate = mtstate->ps.state;
2146 Index resultRelation;
2147 Relation rel = resultRelInfo->ri_RelationDesc;
2148 RangeTblEntry *rte;
2149 TupleDesc tupdesc = RelationGetDescr(rel);
2150 int attnum;
2151 int values_end_len;
2152 StringInfoData sql;
2153 List *targetAttrs = NIL;
2154 List *retrieved_attrs = NIL;
2155 bool doNothing = false;
2158 * If the foreign table we are about to insert routed rows into is also an
2159 * UPDATE subplan result rel that will be updated later, proceeding with
2160 * the INSERT will result in the later UPDATE incorrectly modifying those
2161 * routed rows, so prevent the INSERT --- it would be nice if we could
2162 * handle this case; but for now, throw an error for safety.
2164 if (plan && plan->operation == CMD_UPDATE &&
2165 (resultRelInfo->ri_usesFdwDirectModify ||
2166 resultRelInfo->ri_FdwState))
2167 ereport(ERROR,
2168 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2169 errmsg("cannot route tuples into foreign table to be updated \"%s\"",
2170 RelationGetRelationName(rel))));
2172 initStringInfo(&sql);
2174 /* We transmit all columns that are defined in the foreign table. */
2175 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
2177 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
2179 if (!attr->attisdropped)
2180 targetAttrs = lappend_int(targetAttrs, attnum);
2183 /* Check if we add the ON CONFLICT clause to the remote query. */
2184 if (plan)
2186 OnConflictAction onConflictAction = plan->onConflictAction;
2188 /* We only support DO NOTHING without an inference specification. */
2189 if (onConflictAction == ONCONFLICT_NOTHING)
2190 doNothing = true;
2191 else if (onConflictAction != ONCONFLICT_NONE)
2192 elog(ERROR, "unexpected ON CONFLICT specification: %d",
2193 (int) onConflictAction);
2197 * If the foreign table is a partition that doesn't have a corresponding
2198 * RTE entry, we need to create a new RTE describing the foreign table for
2199 * use by deparseInsertSql and create_foreign_modify() below, after first
2200 * copying the parent's RTE and modifying some fields to describe the
2201 * foreign partition to work on. However, if this is invoked by UPDATE,
2202 * the existing RTE may already correspond to this partition if it is one
2203 * of the UPDATE subplan target rels; in that case, we can just use the
2204 * existing RTE as-is.
2206 if (resultRelInfo->ri_RangeTableIndex == 0)
2208 ResultRelInfo *rootResultRelInfo = resultRelInfo->ri_RootResultRelInfo;
2210 rte = exec_rt_fetch(rootResultRelInfo->ri_RangeTableIndex, estate);
2211 rte = copyObject(rte);
2212 rte->relid = RelationGetRelid(rel);
2213 rte->relkind = RELKIND_FOREIGN_TABLE;
2216 * For UPDATE, we must use the RT index of the first subplan target
2217 * rel's RTE, because the core code would have built expressions for
2218 * the partition, such as RETURNING, using that RT index as varno of
2219 * Vars contained in those expressions.
2221 if (plan && plan->operation == CMD_UPDATE &&
2222 rootResultRelInfo->ri_RangeTableIndex == plan->rootRelation)
2223 resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex;
2224 else
2225 resultRelation = rootResultRelInfo->ri_RangeTableIndex;
2227 else
2229 resultRelation = resultRelInfo->ri_RangeTableIndex;
2230 rte = exec_rt_fetch(resultRelation, estate);
2233 /* Construct the SQL command string. */
2234 deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
2235 resultRelInfo->ri_WithCheckOptions,
2236 resultRelInfo->ri_returningList,
2237 &retrieved_attrs, &values_end_len);
2239 /* Construct an execution state. */
2240 fmstate = create_foreign_modify(mtstate->ps.state,
2241 rte,
2242 resultRelInfo,
2243 CMD_INSERT,
2244 NULL,
2245 sql.data,
2246 targetAttrs,
2247 values_end_len,
2248 retrieved_attrs != NIL,
2249 retrieved_attrs);
2252 * If the given resultRelInfo already has PgFdwModifyState set, it means
2253 * the foreign table is an UPDATE subplan result rel; in which case, store
2254 * the resulting state into the aux_fmstate of the PgFdwModifyState.
2256 if (resultRelInfo->ri_FdwState)
2258 Assert(plan && plan->operation == CMD_UPDATE);
2259 Assert(resultRelInfo->ri_usesFdwDirectModify == false);
2260 ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate;
2262 else
2263 resultRelInfo->ri_FdwState = fmstate;
2267 * postgresEndForeignInsert
2268 * Finish an insert operation on a foreign table
2270 static void
2271 postgresEndForeignInsert(EState *estate,
2272 ResultRelInfo *resultRelInfo)
2274 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2276 Assert(fmstate != NULL);
2279 * If the fmstate has aux_fmstate set, get the aux_fmstate (see
2280 * postgresBeginForeignInsert())
2282 if (fmstate->aux_fmstate)
2283 fmstate = fmstate->aux_fmstate;
2285 /* Destroy the execution state */
2286 finish_foreign_modify(fmstate);
2290 * postgresIsForeignRelUpdatable
2291 * Determine whether a foreign table supports INSERT, UPDATE and/or
2292 * DELETE.
2294 static int
2295 postgresIsForeignRelUpdatable(Relation rel)
2297 bool updatable;
2298 ForeignTable *table;
2299 ForeignServer *server;
2300 ListCell *lc;
2303 * By default, all postgres_fdw foreign tables are assumed updatable. This
2304 * can be overridden by a per-server setting, which in turn can be
2305 * overridden by a per-table setting.
2307 updatable = true;
2309 table = GetForeignTable(RelationGetRelid(rel));
2310 server = GetForeignServer(table->serverid);
2312 foreach(lc, server->options)
2314 DefElem *def = (DefElem *) lfirst(lc);
2316 if (strcmp(def->defname, "updatable") == 0)
2317 updatable = defGetBoolean(def);
2319 foreach(lc, table->options)
2321 DefElem *def = (DefElem *) lfirst(lc);
2323 if (strcmp(def->defname, "updatable") == 0)
2324 updatable = defGetBoolean(def);
2328 * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2330 return updatable ?
2331 (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2335 * postgresRecheckForeignScan
2336 * Execute a local join execution plan for a foreign join
2338 static bool
2339 postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
2341 Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2342 PlanState *outerPlan = outerPlanState(node);
2343 TupleTableSlot *result;
2345 /* For base foreign relations, it suffices to set fdw_recheck_quals */
2346 if (scanrelid > 0)
2347 return true;
2349 Assert(outerPlan != NULL);
2351 /* Execute a local join execution plan */
2352 result = ExecProcNode(outerPlan);
2353 if (TupIsNull(result))
2354 return false;
2356 /* Store result in the given slot */
2357 ExecCopySlot(slot, result);
2359 return true;
2363 * find_modifytable_subplan
2364 * Helper routine for postgresPlanDirectModify to find the
2365 * ModifyTable subplan node that scans the specified RTI.
2367 * Returns NULL if the subplan couldn't be identified. That's not a fatal
2368 * error condition, we just abandon trying to do the update directly.
2370 static ForeignScan *
2371 find_modifytable_subplan(PlannerInfo *root,
2372 ModifyTable *plan,
2373 Index rtindex,
2374 int subplan_index)
2376 Plan *subplan = outerPlan(plan);
2379 * The cases we support are (1) the desired ForeignScan is the immediate
2380 * child of ModifyTable, or (2) it is the subplan_index'th child of an
2381 * Append node that is the immediate child of ModifyTable. There is no
2382 * point in looking further down, as that would mean that local joins are
2383 * involved, so we can't do the update directly.
2385 * There could be a Result atop the Append too, acting to compute the
2386 * UPDATE targetlist values. We ignore that here; the tlist will be
2387 * checked by our caller.
2389 * In principle we could examine all the children of the Append, but it's
2390 * currently unlikely that the core planner would generate such a plan
2391 * with the children out-of-order. Moreover, such a search risks costing
2392 * O(N^2) time when there are a lot of children.
2394 if (IsA(subplan, Append))
2396 Append *appendplan = (Append *) subplan;
2398 if (subplan_index < list_length(appendplan->appendplans))
2399 subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index);
2401 else if (IsA(subplan, Result) &&
2402 outerPlan(subplan) != NULL &&
2403 IsA(outerPlan(subplan), Append))
2405 Append *appendplan = (Append *) outerPlan(subplan);
2407 if (subplan_index < list_length(appendplan->appendplans))
2408 subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index);
2411 /* Now, have we got a ForeignScan on the desired rel? */
2412 if (IsA(subplan, ForeignScan))
2414 ForeignScan *fscan = (ForeignScan *) subplan;
2416 if (bms_is_member(rtindex, fscan->fs_base_relids))
2417 return fscan;
2420 return NULL;
2424 * postgresPlanDirectModify
2425 * Consider a direct foreign table modification
2427 * Decide whether it is safe to modify a foreign table directly, and if so,
2428 * rewrite subplan accordingly.
2430 static bool
2431 postgresPlanDirectModify(PlannerInfo *root,
2432 ModifyTable *plan,
2433 Index resultRelation,
2434 int subplan_index)
2436 CmdType operation = plan->operation;
2437 RelOptInfo *foreignrel;
2438 RangeTblEntry *rte;
2439 PgFdwRelationInfo *fpinfo;
2440 Relation rel;
2441 StringInfoData sql;
2442 ForeignScan *fscan;
2443 List *processed_tlist = NIL;
2444 List *targetAttrs = NIL;
2445 List *remote_exprs;
2446 List *params_list = NIL;
2447 List *returningList = NIL;
2448 List *retrieved_attrs = NIL;
2451 * Decide whether it is safe to modify a foreign table directly.
2455 * The table modification must be an UPDATE or DELETE.
2457 if (operation != CMD_UPDATE && operation != CMD_DELETE)
2458 return false;
2461 * Try to locate the ForeignScan subplan that's scanning resultRelation.
2463 fscan = find_modifytable_subplan(root, plan, resultRelation, subplan_index);
2464 if (!fscan)
2465 return false;
2468 * It's unsafe to modify a foreign table directly if there are any quals
2469 * that should be evaluated locally.
2471 if (fscan->scan.plan.qual != NIL)
2472 return false;
2474 /* Safe to fetch data about the target foreign rel */
2475 if (fscan->scan.scanrelid == 0)
2477 foreignrel = find_join_rel(root, fscan->fs_relids);
2478 /* We should have a rel for this foreign join. */
2479 Assert(foreignrel);
2481 else
2482 foreignrel = root->simple_rel_array[resultRelation];
2483 rte = root->simple_rte_array[resultRelation];
2484 fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2487 * It's unsafe to update a foreign table directly, if any expressions to
2488 * assign to the target columns are unsafe to evaluate remotely.
2490 if (operation == CMD_UPDATE)
2492 ListCell *lc,
2493 *lc2;
2496 * The expressions of concern are the first N columns of the processed
2497 * targetlist, where N is the length of the rel's update_colnos.
2499 get_translated_update_targetlist(root, resultRelation,
2500 &processed_tlist, &targetAttrs);
2501 forboth(lc, processed_tlist, lc2, targetAttrs)
2503 TargetEntry *tle = lfirst_node(TargetEntry, lc);
2504 AttrNumber attno = lfirst_int(lc2);
2506 /* update's new-value expressions shouldn't be resjunk */
2507 Assert(!tle->resjunk);
2509 if (attno <= InvalidAttrNumber) /* shouldn't happen */
2510 elog(ERROR, "system-column update is not supported");
2512 if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2513 return false;
2518 * Ok, rewrite subplan so as to modify the foreign table directly.
2520 initStringInfo(&sql);
2523 * Core code already has some lock on each rel being planned, so we can
2524 * use NoLock here.
2526 rel = table_open(rte->relid, NoLock);
2529 * Recall the qual clauses that must be evaluated remotely. (These are
2530 * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2531 * doesn't care.)
2533 remote_exprs = fpinfo->final_remote_exprs;
2536 * Extract the relevant RETURNING list if any.
2538 if (plan->returningLists)
2540 returningList = (List *) list_nth(plan->returningLists, subplan_index);
2543 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2544 * we fetch from the foreign server any Vars specified in RETURNING
2545 * that refer not only to the target relation but to non-target
2546 * relations. So we'll deparse them into the RETURNING clause of the
2547 * remote query; use a targetlist consisting of them instead, which
2548 * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
2549 * node below.
2551 if (fscan->scan.scanrelid == 0)
2552 returningList = build_remote_returning(resultRelation, rel,
2553 returningList);
2557 * Construct the SQL command string.
2559 switch (operation)
2561 case CMD_UPDATE:
2562 deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2563 foreignrel,
2564 processed_tlist,
2565 targetAttrs,
2566 remote_exprs, &params_list,
2567 returningList, &retrieved_attrs);
2568 break;
2569 case CMD_DELETE:
2570 deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2571 foreignrel,
2572 remote_exprs, &params_list,
2573 returningList, &retrieved_attrs);
2574 break;
2575 default:
2576 elog(ERROR, "unexpected operation: %d", (int) operation);
2577 break;
2581 * Update the operation and target relation info.
2583 fscan->operation = operation;
2584 fscan->resultRelation = resultRelation;
2587 * Update the fdw_exprs list that will be available to the executor.
2589 fscan->fdw_exprs = params_list;
2592 * Update the fdw_private list that will be available to the executor.
2593 * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2595 fscan->fdw_private = list_make4(makeString(sql.data),
2596 makeBoolean((retrieved_attrs != NIL)),
2597 retrieved_attrs,
2598 makeBoolean(plan->canSetTag));
2601 * Update the foreign-join-related fields.
2603 if (fscan->scan.scanrelid == 0)
2605 /* No need for the outer subplan. */
2606 fscan->scan.plan.lefttree = NULL;
2608 /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
2609 if (returningList)
2610 rebuild_fdw_scan_tlist(fscan, returningList);
2614 * Finally, unset the async-capable flag if it is set, as we currently
2615 * don't support asynchronous execution of direct modifications.
2617 if (fscan->scan.plan.async_capable)
2618 fscan->scan.plan.async_capable = false;
2620 table_close(rel, NoLock);
2621 return true;
2625 * postgresBeginDirectModify
2626 * Prepare a direct foreign table modification
2628 static void
2629 postgresBeginDirectModify(ForeignScanState *node, int eflags)
2631 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2632 EState *estate = node->ss.ps.state;
2633 PgFdwDirectModifyState *dmstate;
2634 Index rtindex;
2635 Oid userid;
2636 ForeignTable *table;
2637 UserMapping *user;
2638 int numParams;
2641 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2643 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2644 return;
2647 * We'll save private state in node->fdw_state.
2649 dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2650 node->fdw_state = (void *) dmstate;
2653 * Identify which user to do the remote access as. This should match what
2654 * ExecCheckPermissions() does.
2656 userid = OidIsValid(fsplan->checkAsUser) ? fsplan->checkAsUser : GetUserId();
2658 /* Get info about foreign table. */
2659 rtindex = node->resultRelInfo->ri_RangeTableIndex;
2660 if (fsplan->scan.scanrelid == 0)
2661 dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
2662 else
2663 dmstate->rel = node->ss.ss_currentRelation;
2664 table = GetForeignTable(RelationGetRelid(dmstate->rel));
2665 user = GetUserMapping(userid, table->serverid);
2668 * Get connection to the foreign server. Connection manager will
2669 * establish new connection if necessary.
2671 dmstate->conn = GetConnection(user, false, &dmstate->conn_state);
2673 /* Update the foreign-join-related fields. */
2674 if (fsplan->scan.scanrelid == 0)
2676 /* Save info about foreign table. */
2677 dmstate->resultRel = dmstate->rel;
2680 * Set dmstate->rel to NULL to teach get_returning_data() and
2681 * make_tuple_from_result_row() that columns fetched from the remote
2682 * server are described by fdw_scan_tlist of the foreign-scan plan
2683 * node, not the tuple descriptor for the target relation.
2685 dmstate->rel = NULL;
2688 /* Initialize state variable */
2689 dmstate->num_tuples = -1; /* -1 means not set yet */
2691 /* Get private info created by planner functions. */
2692 dmstate->query = strVal(list_nth(fsplan->fdw_private,
2693 FdwDirectModifyPrivateUpdateSql));
2694 dmstate->has_returning = boolVal(list_nth(fsplan->fdw_private,
2695 FdwDirectModifyPrivateHasReturning));
2696 dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2697 FdwDirectModifyPrivateRetrievedAttrs);
2698 dmstate->set_processed = boolVal(list_nth(fsplan->fdw_private,
2699 FdwDirectModifyPrivateSetProcessed));
2701 /* Create context for per-tuple temp workspace. */
2702 dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2703 "postgres_fdw temporary data",
2704 ALLOCSET_SMALL_SIZES);
2706 /* Prepare for input conversion of RETURNING results. */
2707 if (dmstate->has_returning)
2709 TupleDesc tupdesc;
2711 if (fsplan->scan.scanrelid == 0)
2712 tupdesc = get_tupdesc_for_join_scan_tuples(node);
2713 else
2714 tupdesc = RelationGetDescr(dmstate->rel);
2716 dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
2719 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2720 * initialize a filter to extract an updated/deleted tuple from a scan
2721 * tuple.
2723 if (fsplan->scan.scanrelid == 0)
2724 init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
2728 * Prepare for processing of parameters used in remote query, if any.
2730 numParams = list_length(fsplan->fdw_exprs);
2731 dmstate->numParams = numParams;
2732 if (numParams > 0)
2733 prepare_query_params((PlanState *) node,
2734 fsplan->fdw_exprs,
2735 numParams,
2736 &dmstate->param_flinfo,
2737 &dmstate->param_exprs,
2738 &dmstate->param_values);
2742 * postgresIterateDirectModify
2743 * Execute a direct foreign table modification
2745 static TupleTableSlot *
2746 postgresIterateDirectModify(ForeignScanState *node)
2748 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2749 EState *estate = node->ss.ps.state;
2750 ResultRelInfo *resultRelInfo = node->resultRelInfo;
2753 * If this is the first call after Begin, execute the statement.
2755 if (dmstate->num_tuples == -1)
2756 execute_dml_stmt(node);
2759 * If the local query doesn't specify RETURNING, just clear tuple slot.
2761 if (!resultRelInfo->ri_projectReturning)
2763 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2764 Instrumentation *instr = node->ss.ps.instrument;
2766 Assert(!dmstate->has_returning);
2768 /* Increment the command es_processed count if necessary. */
2769 if (dmstate->set_processed)
2770 estate->es_processed += dmstate->num_tuples;
2772 /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2773 if (instr)
2774 instr->tuplecount += dmstate->num_tuples;
2776 return ExecClearTuple(slot);
2780 * Get the next RETURNING tuple.
2782 return get_returning_data(node);
2786 * postgresEndDirectModify
2787 * Finish a direct foreign table modification
2789 static void
2790 postgresEndDirectModify(ForeignScanState *node)
2792 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2794 /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2795 if (dmstate == NULL)
2796 return;
2798 /* Release PGresult */
2799 PQclear(dmstate->result);
2801 /* Release remote connection */
2802 ReleaseConnection(dmstate->conn);
2803 dmstate->conn = NULL;
2805 /* MemoryContext will be deleted automatically. */
2809 * postgresExplainForeignScan
2810 * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2812 static void
2813 postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
2815 ForeignScan *plan = castNode(ForeignScan, node->ss.ps.plan);
2816 List *fdw_private = plan->fdw_private;
2819 * Identify foreign scans that are really joins or upper relations. The
2820 * input looks something like "(1) LEFT JOIN (2)", and we must replace the
2821 * digit string(s), which are RT indexes, with the correct relation names.
2822 * We do that here, not when the plan is created, because we can't know
2823 * what aliases ruleutils.c will assign at plan creation time.
2825 if (list_length(fdw_private) > FdwScanPrivateRelations)
2827 StringInfo relations;
2828 char *rawrelations;
2829 char *ptr;
2830 int minrti,
2831 rtoffset;
2833 rawrelations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2836 * A difficulty with using a string representation of RT indexes is
2837 * that setrefs.c won't update the string when flattening the
2838 * rangetable. To find out what rtoffset was applied, identify the
2839 * minimum RT index appearing in the string and compare it to the
2840 * minimum member of plan->fs_base_relids. (We expect all the relids
2841 * in the join will have been offset by the same amount; the Asserts
2842 * below should catch it if that ever changes.)
2844 minrti = INT_MAX;
2845 ptr = rawrelations;
2846 while (*ptr)
2848 if (isdigit((unsigned char) *ptr))
2850 int rti = strtol(ptr, &ptr, 10);
2852 if (rti < minrti)
2853 minrti = rti;
2855 else
2856 ptr++;
2858 rtoffset = bms_next_member(plan->fs_base_relids, -1) - minrti;
2860 /* Now we can translate the string */
2861 relations = makeStringInfo();
2862 ptr = rawrelations;
2863 while (*ptr)
2865 if (isdigit((unsigned char) *ptr))
2867 int rti = strtol(ptr, &ptr, 10);
2868 RangeTblEntry *rte;
2869 char *relname;
2870 char *refname;
2872 rti += rtoffset;
2873 Assert(bms_is_member(rti, plan->fs_base_relids));
2874 rte = rt_fetch(rti, es->rtable);
2875 Assert(rte->rtekind == RTE_RELATION);
2876 /* This logic should agree with explain.c's ExplainTargetRel */
2877 relname = get_rel_name(rte->relid);
2878 if (es->verbose)
2880 char *namespace;
2882 namespace = get_namespace_name_or_temp(get_rel_namespace(rte->relid));
2883 appendStringInfo(relations, "%s.%s",
2884 quote_identifier(namespace),
2885 quote_identifier(relname));
2887 else
2888 appendStringInfoString(relations,
2889 quote_identifier(relname));
2890 refname = (char *) list_nth(es->rtable_names, rti - 1);
2891 if (refname == NULL)
2892 refname = rte->eref->aliasname;
2893 if (strcmp(refname, relname) != 0)
2894 appendStringInfo(relations, " %s",
2895 quote_identifier(refname));
2897 else
2898 appendStringInfoChar(relations, *ptr++);
2900 ExplainPropertyText("Relations", relations->data, es);
2904 * Add remote query, when VERBOSE option is specified.
2906 if (es->verbose)
2908 char *sql;
2910 sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2911 ExplainPropertyText("Remote SQL", sql, es);
2916 * postgresExplainForeignModify
2917 * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2919 static void
2920 postgresExplainForeignModify(ModifyTableState *mtstate,
2921 ResultRelInfo *rinfo,
2922 List *fdw_private,
2923 int subplan_index,
2924 ExplainState *es)
2926 if (es->verbose)
2928 char *sql = strVal(list_nth(fdw_private,
2929 FdwModifyPrivateUpdateSql));
2931 ExplainPropertyText("Remote SQL", sql, es);
2934 * For INSERT we should always have batch size >= 1, but UPDATE and
2935 * DELETE don't support batching so don't show the property.
2937 if (rinfo->ri_BatchSize > 0)
2938 ExplainPropertyInteger("Batch Size", NULL, rinfo->ri_BatchSize, es);
2943 * postgresExplainDirectModify
2944 * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2945 * foreign table directly
2947 static void
2948 postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
2950 List *fdw_private;
2951 char *sql;
2953 if (es->verbose)
2955 fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2956 sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2957 ExplainPropertyText("Remote SQL", sql, es);
2962 * postgresExecForeignTruncate
2963 * Truncate one or more foreign tables
2965 static void
2966 postgresExecForeignTruncate(List *rels,
2967 DropBehavior behavior,
2968 bool restart_seqs)
2970 Oid serverid = InvalidOid;
2971 UserMapping *user = NULL;
2972 PGconn *conn = NULL;
2973 StringInfoData sql;
2974 ListCell *lc;
2975 bool server_truncatable = true;
2978 * By default, all postgres_fdw foreign tables are assumed truncatable.
2979 * This can be overridden by a per-server setting, which in turn can be
2980 * overridden by a per-table setting.
2982 foreach(lc, rels)
2984 ForeignServer *server = NULL;
2985 Relation rel = lfirst(lc);
2986 ForeignTable *table = GetForeignTable(RelationGetRelid(rel));
2987 ListCell *cell;
2988 bool truncatable;
2991 * First time through, determine whether the foreign server allows
2992 * truncates. Since all specified foreign tables are assumed to belong
2993 * to the same foreign server, this result can be used for other
2994 * foreign tables.
2996 if (!OidIsValid(serverid))
2998 serverid = table->serverid;
2999 server = GetForeignServer(serverid);
3001 foreach(cell, server->options)
3003 DefElem *defel = (DefElem *) lfirst(cell);
3005 if (strcmp(defel->defname, "truncatable") == 0)
3007 server_truncatable = defGetBoolean(defel);
3008 break;
3014 * Confirm that all specified foreign tables belong to the same
3015 * foreign server.
3017 Assert(table->serverid == serverid);
3019 /* Determine whether this foreign table allows truncations */
3020 truncatable = server_truncatable;
3021 foreach(cell, table->options)
3023 DefElem *defel = (DefElem *) lfirst(cell);
3025 if (strcmp(defel->defname, "truncatable") == 0)
3027 truncatable = defGetBoolean(defel);
3028 break;
3032 if (!truncatable)
3033 ereport(ERROR,
3034 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3035 errmsg("foreign table \"%s\" does not allow truncates",
3036 RelationGetRelationName(rel))));
3038 Assert(OidIsValid(serverid));
3041 * Get connection to the foreign server. Connection manager will
3042 * establish new connection if necessary.
3044 user = GetUserMapping(GetUserId(), serverid);
3045 conn = GetConnection(user, false, NULL);
3047 /* Construct the TRUNCATE command string */
3048 initStringInfo(&sql);
3049 deparseTruncateSql(&sql, rels, behavior, restart_seqs);
3051 /* Issue the TRUNCATE command to remote server */
3052 do_sql_command(conn, sql.data);
3054 pfree(sql.data);
3058 * estimate_path_cost_size
3059 * Get cost and size estimates for a foreign scan on given foreign relation
3060 * either a base relation or a join between foreign relations or an upper
3061 * relation containing foreign relations.
3063 * param_join_conds are the parameterization clauses with outer relations.
3064 * pathkeys specify the expected sort order if any for given path being costed.
3065 * fpextra specifies additional post-scan/join-processing steps such as the
3066 * final sort and the LIMIT restriction.
3068 * The function returns the cost and size estimates in p_rows, p_width,
3069 * p_startup_cost and p_total_cost variables.
3071 static void
3072 estimate_path_cost_size(PlannerInfo *root,
3073 RelOptInfo *foreignrel,
3074 List *param_join_conds,
3075 List *pathkeys,
3076 PgFdwPathExtraData *fpextra,
3077 double *p_rows, int *p_width,
3078 Cost *p_startup_cost, Cost *p_total_cost)
3080 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
3081 double rows;
3082 double retrieved_rows;
3083 int width;
3084 Cost startup_cost;
3085 Cost total_cost;
3087 /* Make sure the core code has set up the relation's reltarget */
3088 Assert(foreignrel->reltarget);
3091 * If the table or the server is configured to use remote estimates,
3092 * connect to the foreign server and execute EXPLAIN to estimate the
3093 * number of rows selected by the restriction+join clauses. Otherwise,
3094 * estimate rows using whatever statistics we have locally, in a way
3095 * similar to ordinary tables.
3097 if (fpinfo->use_remote_estimate)
3099 List *remote_param_join_conds;
3100 List *local_param_join_conds;
3101 StringInfoData sql;
3102 PGconn *conn;
3103 Selectivity local_sel;
3104 QualCost local_cost;
3105 List *fdw_scan_tlist = NIL;
3106 List *remote_conds;
3108 /* Required only to be passed to deparseSelectStmtForRel */
3109 List *retrieved_attrs;
3112 * param_join_conds might contain both clauses that are safe to send
3113 * across, and clauses that aren't.
3115 classifyConditions(root, foreignrel, param_join_conds,
3116 &remote_param_join_conds, &local_param_join_conds);
3118 /* Build the list of columns to be fetched from the foreign server. */
3119 if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
3120 fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
3121 else
3122 fdw_scan_tlist = NIL;
3125 * The complete list of remote conditions includes everything from
3126 * baserestrictinfo plus any extra join_conds relevant to this
3127 * particular path.
3129 remote_conds = list_concat(remote_param_join_conds,
3130 fpinfo->remote_conds);
3133 * Construct EXPLAIN query including the desired SELECT, FROM, and
3134 * WHERE clauses. Params and other-relation Vars are replaced by dummy
3135 * values, so don't request params_list.
3137 initStringInfo(&sql);
3138 appendStringInfoString(&sql, "EXPLAIN ");
3139 deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
3140 remote_conds, pathkeys,
3141 fpextra ? fpextra->has_final_sort : false,
3142 fpextra ? fpextra->has_limit : false,
3143 false, &retrieved_attrs, NULL);
3145 /* Get the remote estimate */
3146 conn = GetConnection(fpinfo->user, false, NULL);
3147 get_remote_estimate(sql.data, conn, &rows, &width,
3148 &startup_cost, &total_cost);
3149 ReleaseConnection(conn);
3151 retrieved_rows = rows;
3153 /* Factor in the selectivity of the locally-checked quals */
3154 local_sel = clauselist_selectivity(root,
3155 local_param_join_conds,
3156 foreignrel->relid,
3157 JOIN_INNER,
3158 NULL);
3159 local_sel *= fpinfo->local_conds_sel;
3161 rows = clamp_row_est(rows * local_sel);
3163 /* Add in the eval cost of the locally-checked quals */
3164 startup_cost += fpinfo->local_conds_cost.startup;
3165 total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3166 cost_qual_eval(&local_cost, local_param_join_conds, root);
3167 startup_cost += local_cost.startup;
3168 total_cost += local_cost.per_tuple * retrieved_rows;
3171 * Add in tlist eval cost for each output row. In case of an
3172 * aggregate, some of the tlist expressions such as grouping
3173 * expressions will be evaluated remotely, so adjust the costs.
3175 startup_cost += foreignrel->reltarget->cost.startup;
3176 total_cost += foreignrel->reltarget->cost.startup;
3177 total_cost += foreignrel->reltarget->cost.per_tuple * rows;
3178 if (IS_UPPER_REL(foreignrel))
3180 QualCost tlist_cost;
3182 cost_qual_eval(&tlist_cost, fdw_scan_tlist, root);
3183 startup_cost -= tlist_cost.startup;
3184 total_cost -= tlist_cost.startup;
3185 total_cost -= tlist_cost.per_tuple * rows;
3188 else
3190 Cost run_cost = 0;
3193 * We don't support join conditions in this mode (hence, no
3194 * parameterized paths can be made).
3196 Assert(param_join_conds == NIL);
3199 * We will come here again and again with different set of pathkeys or
3200 * additional post-scan/join-processing steps that caller wants to
3201 * cost. We don't need to calculate the cost/size estimates for the
3202 * underlying scan, join, or grouping each time. Instead, use those
3203 * estimates if we have cached them already.
3205 if (fpinfo->rel_startup_cost >= 0 && fpinfo->rel_total_cost >= 0)
3207 Assert(fpinfo->retrieved_rows >= 0);
3209 rows = fpinfo->rows;
3210 retrieved_rows = fpinfo->retrieved_rows;
3211 width = fpinfo->width;
3212 startup_cost = fpinfo->rel_startup_cost;
3213 run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
3216 * If we estimate the costs of a foreign scan or a foreign join
3217 * with additional post-scan/join-processing steps, the scan or
3218 * join costs obtained from the cache wouldn't yet contain the
3219 * eval costs for the final scan/join target, which would've been
3220 * updated by apply_scanjoin_target_to_paths(); add the eval costs
3221 * now.
3223 if (fpextra && !IS_UPPER_REL(foreignrel))
3225 /* Shouldn't get here unless we have LIMIT */
3226 Assert(fpextra->has_limit);
3227 Assert(foreignrel->reloptkind == RELOPT_BASEREL ||
3228 foreignrel->reloptkind == RELOPT_JOINREL);
3229 startup_cost += foreignrel->reltarget->cost.startup;
3230 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3233 else if (IS_JOIN_REL(foreignrel))
3235 PgFdwRelationInfo *fpinfo_i;
3236 PgFdwRelationInfo *fpinfo_o;
3237 QualCost join_cost;
3238 QualCost remote_conds_cost;
3239 double nrows;
3241 /* Use rows/width estimates made by the core code. */
3242 rows = foreignrel->rows;
3243 width = foreignrel->reltarget->width;
3245 /* For join we expect inner and outer relations set */
3246 Assert(fpinfo->innerrel && fpinfo->outerrel);
3248 fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
3249 fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
3251 /* Estimate of number of rows in cross product */
3252 nrows = fpinfo_i->rows * fpinfo_o->rows;
3255 * Back into an estimate of the number of retrieved rows. Just in
3256 * case this is nuts, clamp to at most nrows.
3258 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
3259 retrieved_rows = Min(retrieved_rows, nrows);
3262 * The cost of foreign join is estimated as cost of generating
3263 * rows for the joining relations + cost for applying quals on the
3264 * rows.
3268 * Calculate the cost of clauses pushed down to the foreign server
3270 cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
3271 /* Calculate the cost of applying join clauses */
3272 cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
3275 * Startup cost includes startup cost of joining relations and the
3276 * startup cost for join and other clauses. We do not include the
3277 * startup cost specific to join strategy (e.g. setting up hash
3278 * tables) since we do not know what strategy the foreign server
3279 * is going to use.
3281 startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
3282 startup_cost += join_cost.startup;
3283 startup_cost += remote_conds_cost.startup;
3284 startup_cost += fpinfo->local_conds_cost.startup;
3287 * Run time cost includes:
3289 * 1. Run time cost (total_cost - startup_cost) of relations being
3290 * joined
3292 * 2. Run time cost of applying join clauses on the cross product
3293 * of the joining relations.
3295 * 3. Run time cost of applying pushed down other clauses on the
3296 * result of join
3298 * 4. Run time cost of applying nonpushable other clauses locally
3299 * on the result fetched from the foreign server.
3301 run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
3302 run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
3303 run_cost += nrows * join_cost.per_tuple;
3304 nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
3305 run_cost += nrows * remote_conds_cost.per_tuple;
3306 run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3308 /* Add in tlist eval cost for each output row */
3309 startup_cost += foreignrel->reltarget->cost.startup;
3310 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3312 else if (IS_UPPER_REL(foreignrel))
3314 RelOptInfo *outerrel = fpinfo->outerrel;
3315 PgFdwRelationInfo *ofpinfo;
3316 AggClauseCosts aggcosts;
3317 double input_rows;
3318 int numGroupCols;
3319 double numGroups = 1;
3321 /* The upper relation should have its outer relation set */
3322 Assert(outerrel);
3323 /* and that outer relation should have its reltarget set */
3324 Assert(outerrel->reltarget);
3327 * This cost model is mixture of costing done for sorted and
3328 * hashed aggregates in cost_agg(). We are not sure which
3329 * strategy will be considered at remote side, thus for
3330 * simplicity, we put all startup related costs in startup_cost
3331 * and all finalization and run cost are added in total_cost.
3334 ofpinfo = (PgFdwRelationInfo *) outerrel->fdw_private;
3336 /* Get rows from input rel */
3337 input_rows = ofpinfo->rows;
3339 /* Collect statistics about aggregates for estimating costs. */
3340 MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
3341 if (root->parse->hasAggs)
3343 get_agg_clause_costs(root, AGGSPLIT_SIMPLE, &aggcosts);
3346 /* Get number of grouping columns and possible number of groups */
3347 numGroupCols = list_length(root->processed_groupClause);
3348 numGroups = estimate_num_groups(root,
3349 get_sortgrouplist_exprs(root->processed_groupClause,
3350 fpinfo->grouped_tlist),
3351 input_rows, NULL, NULL);
3354 * Get the retrieved_rows and rows estimates. If there are HAVING
3355 * quals, account for their selectivity.
3357 if (root->hasHavingQual)
3359 /* Factor in the selectivity of the remotely-checked quals */
3360 retrieved_rows =
3361 clamp_row_est(numGroups *
3362 clauselist_selectivity(root,
3363 fpinfo->remote_conds,
3365 JOIN_INNER,
3366 NULL));
3367 /* Factor in the selectivity of the locally-checked quals */
3368 rows = clamp_row_est(retrieved_rows * fpinfo->local_conds_sel);
3370 else
3372 rows = retrieved_rows = numGroups;
3375 /* Use width estimate made by the core code. */
3376 width = foreignrel->reltarget->width;
3378 /*-----
3379 * Startup cost includes:
3380 * 1. Startup cost for underneath input relation, adjusted for
3381 * tlist replacement by apply_scanjoin_target_to_paths()
3382 * 2. Cost of performing aggregation, per cost_agg()
3383 *-----
3385 startup_cost = ofpinfo->rel_startup_cost;
3386 startup_cost += outerrel->reltarget->cost.startup;
3387 startup_cost += aggcosts.transCost.startup;
3388 startup_cost += aggcosts.transCost.per_tuple * input_rows;
3389 startup_cost += aggcosts.finalCost.startup;
3390 startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
3392 /*-----
3393 * Run time cost includes:
3394 * 1. Run time cost of underneath input relation, adjusted for
3395 * tlist replacement by apply_scanjoin_target_to_paths()
3396 * 2. Run time cost of performing aggregation, per cost_agg()
3397 *-----
3399 run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
3400 run_cost += outerrel->reltarget->cost.per_tuple * input_rows;
3401 run_cost += aggcosts.finalCost.per_tuple * numGroups;
3402 run_cost += cpu_tuple_cost * numGroups;
3404 /* Account for the eval cost of HAVING quals, if any */
3405 if (root->hasHavingQual)
3407 QualCost remote_cost;
3409 /* Add in the eval cost of the remotely-checked quals */
3410 cost_qual_eval(&remote_cost, fpinfo->remote_conds, root);
3411 startup_cost += remote_cost.startup;
3412 run_cost += remote_cost.per_tuple * numGroups;
3413 /* Add in the eval cost of the locally-checked quals */
3414 startup_cost += fpinfo->local_conds_cost.startup;
3415 run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3418 /* Add in tlist eval cost for each output row */
3419 startup_cost += foreignrel->reltarget->cost.startup;
3420 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3422 else
3424 Cost cpu_per_tuple;
3426 /* Use rows/width estimates made by set_baserel_size_estimates. */
3427 rows = foreignrel->rows;
3428 width = foreignrel->reltarget->width;
3431 * Back into an estimate of the number of retrieved rows. Just in
3432 * case this is nuts, clamp to at most foreignrel->tuples.
3434 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
3435 retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
3438 * Cost as though this were a seqscan, which is pessimistic. We
3439 * effectively imagine the local_conds are being evaluated
3440 * remotely, too.
3442 startup_cost = 0;
3443 run_cost = 0;
3444 run_cost += seq_page_cost * foreignrel->pages;
3446 startup_cost += foreignrel->baserestrictcost.startup;
3447 cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
3448 run_cost += cpu_per_tuple * foreignrel->tuples;
3450 /* Add in tlist eval cost for each output row */
3451 startup_cost += foreignrel->reltarget->cost.startup;
3452 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3456 * Without remote estimates, we have no real way to estimate the cost
3457 * of generating sorted output. It could be free if the query plan
3458 * the remote side would have chosen generates properly-sorted output
3459 * anyway, but in most cases it will cost something. Estimate a value
3460 * high enough that we won't pick the sorted path when the ordering
3461 * isn't locally useful, but low enough that we'll err on the side of
3462 * pushing down the ORDER BY clause when it's useful to do so.
3464 if (pathkeys != NIL)
3466 if (IS_UPPER_REL(foreignrel))
3468 Assert(foreignrel->reloptkind == RELOPT_UPPER_REL &&
3469 fpinfo->stage == UPPERREL_GROUP_AGG);
3470 adjust_foreign_grouping_path_cost(root, pathkeys,
3471 retrieved_rows, width,
3472 fpextra->limit_tuples,
3473 &startup_cost, &run_cost);
3475 else
3477 startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3478 run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3482 total_cost = startup_cost + run_cost;
3484 /* Adjust the cost estimates if we have LIMIT */
3485 if (fpextra && fpextra->has_limit)
3487 adjust_limit_rows_costs(&rows, &startup_cost, &total_cost,
3488 fpextra->offset_est, fpextra->count_est);
3489 retrieved_rows = rows;
3494 * If this includes the final sort step, the given target, which will be
3495 * applied to the resulting path, might have different expressions from
3496 * the foreignrel's reltarget (see make_sort_input_target()); adjust tlist
3497 * eval costs.
3499 if (fpextra && fpextra->has_final_sort &&
3500 fpextra->target != foreignrel->reltarget)
3502 QualCost oldcost = foreignrel->reltarget->cost;
3503 QualCost newcost = fpextra->target->cost;
3505 startup_cost += newcost.startup - oldcost.startup;
3506 total_cost += newcost.startup - oldcost.startup;
3507 total_cost += (newcost.per_tuple - oldcost.per_tuple) * rows;
3511 * Cache the retrieved rows and cost estimates for scans, joins, or
3512 * groupings without any parameterization, pathkeys, or additional
3513 * post-scan/join-processing steps, before adding the costs for
3514 * transferring data from the foreign server. These estimates are useful
3515 * for costing remote joins involving this relation or costing other
3516 * remote operations on this relation such as remote sorts and remote
3517 * LIMIT restrictions, when the costs can not be obtained from the foreign
3518 * server. This function will be called at least once for every foreign
3519 * relation without any parameterization, pathkeys, or additional
3520 * post-scan/join-processing steps.
3522 if (pathkeys == NIL && param_join_conds == NIL && fpextra == NULL)
3524 fpinfo->retrieved_rows = retrieved_rows;
3525 fpinfo->rel_startup_cost = startup_cost;
3526 fpinfo->rel_total_cost = total_cost;
3530 * Add some additional cost factors to account for connection overhead
3531 * (fdw_startup_cost), transferring data across the network
3532 * (fdw_tuple_cost per retrieved row), and local manipulation of the data
3533 * (cpu_tuple_cost per retrieved row).
3535 startup_cost += fpinfo->fdw_startup_cost;
3536 total_cost += fpinfo->fdw_startup_cost;
3537 total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
3538 total_cost += cpu_tuple_cost * retrieved_rows;
3541 * If we have LIMIT, we should prefer performing the restriction remotely
3542 * rather than locally, as the former avoids extra row fetches from the
3543 * remote that the latter might cause. But since the core code doesn't
3544 * account for such fetches when estimating the costs of the local
3545 * restriction (see create_limit_path()), there would be no difference
3546 * between the costs of the local restriction and the costs of the remote
3547 * restriction estimated above if we don't use remote estimates (except
3548 * for the case where the foreignrel is a grouping relation, the given
3549 * pathkeys is not NIL, and the effects of a bounded sort for that rel is
3550 * accounted for in costing the remote restriction). Tweak the costs of
3551 * the remote restriction to ensure we'll prefer it if LIMIT is a useful
3552 * one.
3554 if (!fpinfo->use_remote_estimate &&
3555 fpextra && fpextra->has_limit &&
3556 fpextra->limit_tuples > 0 &&
3557 fpextra->limit_tuples < fpinfo->rows)
3559 Assert(fpinfo->rows > 0);
3560 total_cost -= (total_cost - startup_cost) * 0.05 *
3561 (fpinfo->rows - fpextra->limit_tuples) / fpinfo->rows;
3564 /* Return results. */
3565 *p_rows = rows;
3566 *p_width = width;
3567 *p_startup_cost = startup_cost;
3568 *p_total_cost = total_cost;
3572 * Estimate costs of executing a SQL statement remotely.
3573 * The given "sql" must be an EXPLAIN command.
3575 static void
3576 get_remote_estimate(const char *sql, PGconn *conn,
3577 double *rows, int *width,
3578 Cost *startup_cost, Cost *total_cost)
3580 PGresult *volatile res = NULL;
3582 /* PGresult must be released before leaving this function. */
3583 PG_TRY();
3585 char *line;
3586 char *p;
3587 int n;
3590 * Execute EXPLAIN remotely.
3592 res = pgfdw_exec_query(conn, sql, NULL);
3593 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3594 pgfdw_report_error(ERROR, res, conn, false, sql);
3597 * Extract cost numbers for topmost plan node. Note we search for a
3598 * left paren from the end of the line to avoid being confused by
3599 * other uses of parentheses.
3601 line = PQgetvalue(res, 0, 0);
3602 p = strrchr(line, '(');
3603 if (p == NULL)
3604 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3605 n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
3606 startup_cost, total_cost, rows, width);
3607 if (n != 4)
3608 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3610 PG_FINALLY();
3612 PQclear(res);
3614 PG_END_TRY();
3618 * Adjust the cost estimates of a foreign grouping path to include the cost of
3619 * generating properly-sorted output.
3621 static void
3622 adjust_foreign_grouping_path_cost(PlannerInfo *root,
3623 List *pathkeys,
3624 double retrieved_rows,
3625 double width,
3626 double limit_tuples,
3627 Cost *p_startup_cost,
3628 Cost *p_run_cost)
3631 * If the GROUP BY clause isn't sort-able, the plan chosen by the remote
3632 * side is unlikely to generate properly-sorted output, so it would need
3633 * an explicit sort; adjust the given costs with cost_sort(). Likewise,
3634 * if the GROUP BY clause is sort-able but isn't a superset of the given
3635 * pathkeys, adjust the costs with that function. Otherwise, adjust the
3636 * costs by applying the same heuristic as for the scan or join case.
3638 if (!grouping_is_sortable(root->processed_groupClause) ||
3639 !pathkeys_contained_in(pathkeys, root->group_pathkeys))
3641 Path sort_path; /* dummy for result of cost_sort */
3643 cost_sort(&sort_path,
3644 root,
3645 pathkeys,
3646 *p_startup_cost + *p_run_cost,
3647 retrieved_rows,
3648 width,
3649 0.0,
3650 work_mem,
3651 limit_tuples);
3653 *p_startup_cost = sort_path.startup_cost;
3654 *p_run_cost = sort_path.total_cost - sort_path.startup_cost;
3656 else
3659 * The default extra cost seems too large for foreign-grouping cases;
3660 * add 1/4th of that default.
3662 double sort_multiplier = 1.0 + (DEFAULT_FDW_SORT_MULTIPLIER
3663 - 1.0) * 0.25;
3665 *p_startup_cost *= sort_multiplier;
3666 *p_run_cost *= sort_multiplier;
3671 * Detect whether we want to process an EquivalenceClass member.
3673 * This is a callback for use by generate_implied_equalities_for_column.
3675 static bool
3676 ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
3677 EquivalenceClass *ec, EquivalenceMember *em,
3678 void *arg)
3680 ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg;
3681 Expr *expr = em->em_expr;
3684 * If we've identified what we're processing in the current scan, we only
3685 * want to match that expression.
3687 if (state->current != NULL)
3688 return equal(expr, state->current);
3691 * Otherwise, ignore anything we've already processed.
3693 if (list_member(state->already_used, expr))
3694 return false;
3696 /* This is the new target to process. */
3697 state->current = expr;
3698 return true;
3702 * Create cursor for node's query with current parameter values.
3704 static void
3705 create_cursor(ForeignScanState *node)
3707 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3708 ExprContext *econtext = node->ss.ps.ps_ExprContext;
3709 int numParams = fsstate->numParams;
3710 const char **values = fsstate->param_values;
3711 PGconn *conn = fsstate->conn;
3712 StringInfoData buf;
3713 PGresult *res;
3715 /* First, process a pending asynchronous request, if any. */
3716 if (fsstate->conn_state->pendingAreq)
3717 process_pending_request(fsstate->conn_state->pendingAreq);
3720 * Construct array of query parameter values in text format. We do the
3721 * conversions in the short-lived per-tuple context, so as not to cause a
3722 * memory leak over repeated scans.
3724 if (numParams > 0)
3726 MemoryContext oldcontext;
3728 oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
3730 process_query_params(econtext,
3731 fsstate->param_flinfo,
3732 fsstate->param_exprs,
3733 values);
3735 MemoryContextSwitchTo(oldcontext);
3738 /* Construct the DECLARE CURSOR command */
3739 initStringInfo(&buf);
3740 appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
3741 fsstate->cursor_number, fsstate->query);
3744 * Notice that we pass NULL for paramTypes, thus forcing the remote server
3745 * to infer types for all parameters. Since we explicitly cast every
3746 * parameter (see deparse.c), the "inference" is trivial and will produce
3747 * the desired result. This allows us to avoid assuming that the remote
3748 * server has the same OIDs we do for the parameters' types.
3750 if (!PQsendQueryParams(conn, buf.data, numParams,
3751 NULL, values, NULL, NULL, 0))
3752 pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
3755 * Get the result, and check for success.
3757 * We don't use a PG_TRY block here, so be careful not to throw error
3758 * without releasing the PGresult.
3760 res = pgfdw_get_result(conn, buf.data);
3761 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3762 pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
3763 PQclear(res);
3765 /* Mark the cursor as created, and show no tuples have been retrieved */
3766 fsstate->cursor_exists = true;
3767 fsstate->tuples = NULL;
3768 fsstate->num_tuples = 0;
3769 fsstate->next_tuple = 0;
3770 fsstate->fetch_ct_2 = 0;
3771 fsstate->eof_reached = false;
3773 /* Clean up */
3774 pfree(buf.data);
3778 * Fetch some more rows from the node's cursor.
3780 static void
3781 fetch_more_data(ForeignScanState *node)
3783 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3784 PGresult *volatile res = NULL;
3785 MemoryContext oldcontext;
3788 * We'll store the tuples in the batch_cxt. First, flush the previous
3789 * batch.
3791 fsstate->tuples = NULL;
3792 MemoryContextReset(fsstate->batch_cxt);
3793 oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3795 /* PGresult must be released before leaving this function. */
3796 PG_TRY();
3798 PGconn *conn = fsstate->conn;
3799 int numrows;
3800 int i;
3802 if (fsstate->async_capable)
3804 Assert(fsstate->conn_state->pendingAreq);
3807 * The query was already sent by an earlier call to
3808 * fetch_more_data_begin. So now we just fetch the result.
3810 res = pgfdw_get_result(conn, fsstate->query);
3811 /* On error, report the original query, not the FETCH. */
3812 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3813 pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3815 /* Reset per-connection state */
3816 fsstate->conn_state->pendingAreq = NULL;
3818 else
3820 char sql[64];
3822 /* This is a regular synchronous fetch. */
3823 snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3824 fsstate->fetch_size, fsstate->cursor_number);
3826 res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
3827 /* On error, report the original query, not the FETCH. */
3828 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3829 pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3832 /* Convert the data into HeapTuples */
3833 numrows = PQntuples(res);
3834 fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3835 fsstate->num_tuples = numrows;
3836 fsstate->next_tuple = 0;
3838 for (i = 0; i < numrows; i++)
3840 Assert(IsA(node->ss.ps.plan, ForeignScan));
3842 fsstate->tuples[i] =
3843 make_tuple_from_result_row(res, i,
3844 fsstate->rel,
3845 fsstate->attinmeta,
3846 fsstate->retrieved_attrs,
3847 node,
3848 fsstate->temp_cxt);
3851 /* Update fetch_ct_2 */
3852 if (fsstate->fetch_ct_2 < 2)
3853 fsstate->fetch_ct_2++;
3855 /* Must be EOF if we didn't get as many tuples as we asked for. */
3856 fsstate->eof_reached = (numrows < fsstate->fetch_size);
3858 PG_FINALLY();
3860 PQclear(res);
3862 PG_END_TRY();
3864 MemoryContextSwitchTo(oldcontext);
3868 * Force assorted GUC parameters to settings that ensure that we'll output
3869 * data values in a form that is unambiguous to the remote server.
3871 * This is rather expensive and annoying to do once per row, but there's
3872 * little choice if we want to be sure values are transmitted accurately;
3873 * we can't leave the settings in place between rows for fear of affecting
3874 * user-visible computations.
3876 * We use the equivalent of a function SET option to allow the settings to
3877 * persist only until the caller calls reset_transmission_modes(). If an
3878 * error is thrown in between, guc.c will take care of undoing the settings.
3880 * The return value is the nestlevel that must be passed to
3881 * reset_transmission_modes() to undo things.
3884 set_transmission_modes(void)
3886 int nestlevel = NewGUCNestLevel();
3889 * The values set here should match what pg_dump does. See also
3890 * configure_remote_session in connection.c.
3892 if (DateStyle != USE_ISO_DATES)
3893 (void) set_config_option("datestyle", "ISO",
3894 PGC_USERSET, PGC_S_SESSION,
3895 GUC_ACTION_SAVE, true, 0, false);
3896 if (IntervalStyle != INTSTYLE_POSTGRES)
3897 (void) set_config_option("intervalstyle", "postgres",
3898 PGC_USERSET, PGC_S_SESSION,
3899 GUC_ACTION_SAVE, true, 0, false);
3900 if (extra_float_digits < 3)
3901 (void) set_config_option("extra_float_digits", "3",
3902 PGC_USERSET, PGC_S_SESSION,
3903 GUC_ACTION_SAVE, true, 0, false);
3906 * In addition force restrictive search_path, in case there are any
3907 * regproc or similar constants to be printed.
3909 (void) set_config_option("search_path", "pg_catalog",
3910 PGC_USERSET, PGC_S_SESSION,
3911 GUC_ACTION_SAVE, true, 0, false);
3913 return nestlevel;
3917 * Undo the effects of set_transmission_modes().
3919 void
3920 reset_transmission_modes(int nestlevel)
3922 AtEOXact_GUC(true, nestlevel);
3926 * Utility routine to close a cursor.
3928 static void
3929 close_cursor(PGconn *conn, unsigned int cursor_number,
3930 PgFdwConnState *conn_state)
3932 char sql[64];
3933 PGresult *res;
3935 snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3938 * We don't use a PG_TRY block here, so be careful not to throw error
3939 * without releasing the PGresult.
3941 res = pgfdw_exec_query(conn, sql, conn_state);
3942 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3943 pgfdw_report_error(ERROR, res, conn, true, sql);
3944 PQclear(res);
3948 * create_foreign_modify
3949 * Construct an execution state of a foreign insert/update/delete
3950 * operation
3952 static PgFdwModifyState *
3953 create_foreign_modify(EState *estate,
3954 RangeTblEntry *rte,
3955 ResultRelInfo *resultRelInfo,
3956 CmdType operation,
3957 Plan *subplan,
3958 char *query,
3959 List *target_attrs,
3960 int values_end,
3961 bool has_returning,
3962 List *retrieved_attrs)
3964 PgFdwModifyState *fmstate;
3965 Relation rel = resultRelInfo->ri_RelationDesc;
3966 TupleDesc tupdesc = RelationGetDescr(rel);
3967 Oid userid;
3968 ForeignTable *table;
3969 UserMapping *user;
3970 AttrNumber n_params;
3971 Oid typefnoid;
3972 bool isvarlena;
3973 ListCell *lc;
3975 /* Begin constructing PgFdwModifyState. */
3976 fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
3977 fmstate->rel = rel;
3979 /* Identify which user to do the remote access as. */
3980 userid = ExecGetResultRelCheckAsUser(resultRelInfo, estate);
3982 /* Get info about foreign table. */
3983 table = GetForeignTable(RelationGetRelid(rel));
3984 user = GetUserMapping(userid, table->serverid);
3986 /* Open connection; report that we'll create a prepared statement. */
3987 fmstate->conn = GetConnection(user, true, &fmstate->conn_state);
3988 fmstate->p_name = NULL; /* prepared statement not made yet */
3990 /* Set up remote query information. */
3991 fmstate->query = query;
3992 if (operation == CMD_INSERT)
3994 fmstate->query = pstrdup(fmstate->query);
3995 fmstate->orig_query = pstrdup(fmstate->query);
3997 fmstate->target_attrs = target_attrs;
3998 fmstate->values_end = values_end;
3999 fmstate->has_returning = has_returning;
4000 fmstate->retrieved_attrs = retrieved_attrs;
4002 /* Create context for per-tuple temp workspace. */
4003 fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
4004 "postgres_fdw temporary data",
4005 ALLOCSET_SMALL_SIZES);
4007 /* Prepare for input conversion of RETURNING results. */
4008 if (fmstate->has_returning)
4009 fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
4011 /* Prepare for output conversion of parameters used in prepared stmt. */
4012 n_params = list_length(fmstate->target_attrs) + 1;
4013 fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
4014 fmstate->p_nums = 0;
4016 if (operation == CMD_UPDATE || operation == CMD_DELETE)
4018 Assert(subplan != NULL);
4020 /* Find the ctid resjunk column in the subplan's result */
4021 fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
4022 "ctid");
4023 if (!AttributeNumberIsValid(fmstate->ctidAttno))
4024 elog(ERROR, "could not find junk ctid column");
4026 /* First transmittable parameter will be ctid */
4027 getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
4028 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
4029 fmstate->p_nums++;
4032 if (operation == CMD_INSERT || operation == CMD_UPDATE)
4034 /* Set up for remaining transmittable parameters */
4035 foreach(lc, fmstate->target_attrs)
4037 int attnum = lfirst_int(lc);
4038 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
4040 Assert(!attr->attisdropped);
4042 /* Ignore generated columns; they are set to DEFAULT */
4043 if (attr->attgenerated)
4044 continue;
4045 getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
4046 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
4047 fmstate->p_nums++;
4051 Assert(fmstate->p_nums <= n_params);
4053 /* Set batch_size from foreign server/table options. */
4054 if (operation == CMD_INSERT)
4055 fmstate->batch_size = get_batch_size_option(rel);
4057 fmstate->num_slots = 1;
4059 /* Initialize auxiliary state */
4060 fmstate->aux_fmstate = NULL;
4062 return fmstate;
4066 * execute_foreign_modify
4067 * Perform foreign-table modification as required, and fetch RETURNING
4068 * result if any. (This is the shared guts of postgresExecForeignInsert,
4069 * postgresExecForeignBatchInsert, postgresExecForeignUpdate, and
4070 * postgresExecForeignDelete.)
4072 static TupleTableSlot **
4073 execute_foreign_modify(EState *estate,
4074 ResultRelInfo *resultRelInfo,
4075 CmdType operation,
4076 TupleTableSlot **slots,
4077 TupleTableSlot **planSlots,
4078 int *numSlots)
4080 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
4081 ItemPointer ctid = NULL;
4082 const char **p_values;
4083 PGresult *res;
4084 int n_rows;
4085 StringInfoData sql;
4087 /* The operation should be INSERT, UPDATE, or DELETE */
4088 Assert(operation == CMD_INSERT ||
4089 operation == CMD_UPDATE ||
4090 operation == CMD_DELETE);
4092 /* First, process a pending asynchronous request, if any. */
4093 if (fmstate->conn_state->pendingAreq)
4094 process_pending_request(fmstate->conn_state->pendingAreq);
4097 * If the existing query was deparsed and prepared for a different number
4098 * of rows, rebuild it for the proper number.
4100 if (operation == CMD_INSERT && fmstate->num_slots != *numSlots)
4102 /* Destroy the prepared statement created previously */
4103 if (fmstate->p_name)
4104 deallocate_query(fmstate);
4106 /* Build INSERT string with numSlots records in its VALUES clause. */
4107 initStringInfo(&sql);
4108 rebuildInsertSql(&sql, fmstate->rel,
4109 fmstate->orig_query, fmstate->target_attrs,
4110 fmstate->values_end, fmstate->p_nums,
4111 *numSlots - 1);
4112 pfree(fmstate->query);
4113 fmstate->query = sql.data;
4114 fmstate->num_slots = *numSlots;
4117 /* Set up the prepared statement on the remote server, if we didn't yet */
4118 if (!fmstate->p_name)
4119 prepare_foreign_modify(fmstate);
4122 * For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
4124 if (operation == CMD_UPDATE || operation == CMD_DELETE)
4126 Datum datum;
4127 bool isNull;
4129 datum = ExecGetJunkAttribute(planSlots[0],
4130 fmstate->ctidAttno,
4131 &isNull);
4132 /* shouldn't ever get a null result... */
4133 if (isNull)
4134 elog(ERROR, "ctid is NULL");
4135 ctid = (ItemPointer) DatumGetPointer(datum);
4138 /* Convert parameters needed by prepared statement to text form */
4139 p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots);
4142 * Execute the prepared statement.
4144 if (!PQsendQueryPrepared(fmstate->conn,
4145 fmstate->p_name,
4146 fmstate->p_nums * (*numSlots),
4147 p_values,
4148 NULL,
4149 NULL,
4151 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
4154 * Get the result, and check for success.
4156 * We don't use a PG_TRY block here, so be careful not to throw error
4157 * without releasing the PGresult.
4159 res = pgfdw_get_result(fmstate->conn, fmstate->query);
4160 if (PQresultStatus(res) !=
4161 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
4162 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
4164 /* Check number of rows affected, and fetch RETURNING tuple if any */
4165 if (fmstate->has_returning)
4167 Assert(*numSlots == 1);
4168 n_rows = PQntuples(res);
4169 if (n_rows > 0)
4170 store_returning_result(fmstate, slots[0], res);
4172 else
4173 n_rows = atoi(PQcmdTuples(res));
4175 /* And clean up */
4176 PQclear(res);
4178 MemoryContextReset(fmstate->temp_cxt);
4180 *numSlots = n_rows;
4183 * Return NULL if nothing was inserted/updated/deleted on the remote end
4185 return (n_rows > 0) ? slots : NULL;
4189 * prepare_foreign_modify
4190 * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
4192 static void
4193 prepare_foreign_modify(PgFdwModifyState *fmstate)
4195 char prep_name[NAMEDATALEN];
4196 char *p_name;
4197 PGresult *res;
4200 * The caller would already have processed a pending asynchronous request
4201 * if any, so no need to do it here.
4204 /* Construct name we'll use for the prepared statement. */
4205 snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
4206 GetPrepStmtNumber(fmstate->conn));
4207 p_name = pstrdup(prep_name);
4210 * We intentionally do not specify parameter types here, but leave the
4211 * remote server to derive them by default. This avoids possible problems
4212 * with the remote server using different type OIDs than we do. All of
4213 * the prepared statements we use in this module are simple enough that
4214 * the remote server will make the right choices.
4216 if (!PQsendPrepare(fmstate->conn,
4217 p_name,
4218 fmstate->query,
4220 NULL))
4221 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
4224 * Get the result, and check for success.
4226 * We don't use a PG_TRY block here, so be careful not to throw error
4227 * without releasing the PGresult.
4229 res = pgfdw_get_result(fmstate->conn, fmstate->query);
4230 if (PQresultStatus(res) != PGRES_COMMAND_OK)
4231 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
4232 PQclear(res);
4234 /* This action shows that the prepare has been done. */
4235 fmstate->p_name = p_name;
4239 * convert_prep_stmt_params
4240 * Create array of text strings representing parameter values
4242 * tupleid is ctid to send, or NULL if none
4243 * slot is slot to get remaining parameters from, or NULL if none
4245 * Data is constructed in temp_cxt; caller should reset that after use.
4247 static const char **
4248 convert_prep_stmt_params(PgFdwModifyState *fmstate,
4249 ItemPointer tupleid,
4250 TupleTableSlot **slots,
4251 int numSlots)
4253 const char **p_values;
4254 int i;
4255 int j;
4256 int pindex = 0;
4257 MemoryContext oldcontext;
4259 oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
4261 p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots);
4263 /* ctid is provided only for UPDATE/DELETE, which don't allow batching */
4264 Assert(!(tupleid != NULL && numSlots > 1));
4266 /* 1st parameter should be ctid, if it's in use */
4267 if (tupleid != NULL)
4269 Assert(numSlots == 1);
4270 /* don't need set_transmission_modes for TID output */
4271 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
4272 PointerGetDatum(tupleid));
4273 pindex++;
4276 /* get following parameters from slots */
4277 if (slots != NULL && fmstate->target_attrs != NIL)
4279 TupleDesc tupdesc = RelationGetDescr(fmstate->rel);
4280 int nestlevel;
4281 ListCell *lc;
4283 nestlevel = set_transmission_modes();
4285 for (i = 0; i < numSlots; i++)
4287 j = (tupleid != NULL) ? 1 : 0;
4288 foreach(lc, fmstate->target_attrs)
4290 int attnum = lfirst_int(lc);
4291 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
4292 Datum value;
4293 bool isnull;
4295 /* Ignore generated columns; they are set to DEFAULT */
4296 if (attr->attgenerated)
4297 continue;
4298 value = slot_getattr(slots[i], attnum, &isnull);
4299 if (isnull)
4300 p_values[pindex] = NULL;
4301 else
4302 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j],
4303 value);
4304 pindex++;
4305 j++;
4309 reset_transmission_modes(nestlevel);
4312 Assert(pindex == fmstate->p_nums * numSlots);
4314 MemoryContextSwitchTo(oldcontext);
4316 return p_values;
4320 * store_returning_result
4321 * Store the result of a RETURNING clause
4323 * On error, be sure to release the PGresult on the way out. Callers do not
4324 * have PG_TRY blocks to ensure this happens.
4326 static void
4327 store_returning_result(PgFdwModifyState *fmstate,
4328 TupleTableSlot *slot, PGresult *res)
4330 PG_TRY();
4332 HeapTuple newtup;
4334 newtup = make_tuple_from_result_row(res, 0,
4335 fmstate->rel,
4336 fmstate->attinmeta,
4337 fmstate->retrieved_attrs,
4338 NULL,
4339 fmstate->temp_cxt);
4342 * The returning slot will not necessarily be suitable to store
4343 * heaptuples directly, so allow for conversion.
4345 ExecForceStoreHeapTuple(newtup, slot, true);
4347 PG_CATCH();
4349 PQclear(res);
4350 PG_RE_THROW();
4352 PG_END_TRY();
4356 * finish_foreign_modify
4357 * Release resources for a foreign insert/update/delete operation
4359 static void
4360 finish_foreign_modify(PgFdwModifyState *fmstate)
4362 Assert(fmstate != NULL);
4364 /* If we created a prepared statement, destroy it */
4365 deallocate_query(fmstate);
4367 /* Release remote connection */
4368 ReleaseConnection(fmstate->conn);
4369 fmstate->conn = NULL;
4373 * deallocate_query
4374 * Deallocate a prepared statement for a foreign insert/update/delete
4375 * operation
4377 static void
4378 deallocate_query(PgFdwModifyState *fmstate)
4380 char sql[64];
4381 PGresult *res;
4383 /* do nothing if the query is not allocated */
4384 if (!fmstate->p_name)
4385 return;
4387 snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
4390 * We don't use a PG_TRY block here, so be careful not to throw error
4391 * without releasing the PGresult.
4393 res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state);
4394 if (PQresultStatus(res) != PGRES_COMMAND_OK)
4395 pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
4396 PQclear(res);
4397 pfree(fmstate->p_name);
4398 fmstate->p_name = NULL;
4402 * build_remote_returning
4403 * Build a RETURNING targetlist of a remote query for performing an
4404 * UPDATE/DELETE .. RETURNING on a join directly
4406 static List *
4407 build_remote_returning(Index rtindex, Relation rel, List *returningList)
4409 bool have_wholerow = false;
4410 List *tlist = NIL;
4411 List *vars;
4412 ListCell *lc;
4414 Assert(returningList);
4416 vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
4419 * If there's a whole-row reference to the target relation, then we'll
4420 * need all the columns of the relation.
4422 foreach(lc, vars)
4424 Var *var = (Var *) lfirst(lc);
4426 if (IsA(var, Var) &&
4427 var->varno == rtindex &&
4428 var->varattno == InvalidAttrNumber)
4430 have_wholerow = true;
4431 break;
4435 if (have_wholerow)
4437 TupleDesc tupdesc = RelationGetDescr(rel);
4438 int i;
4440 for (i = 1; i <= tupdesc->natts; i++)
4442 Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
4443 Var *var;
4445 /* Ignore dropped attributes. */
4446 if (attr->attisdropped)
4447 continue;
4449 var = makeVar(rtindex,
4451 attr->atttypid,
4452 attr->atttypmod,
4453 attr->attcollation,
4456 tlist = lappend(tlist,
4457 makeTargetEntry((Expr *) var,
4458 list_length(tlist) + 1,
4459 NULL,
4460 false));
4464 /* Now add any remaining columns to tlist. */
4465 foreach(lc, vars)
4467 Var *var = (Var *) lfirst(lc);
4470 * No need for whole-row references to the target relation. We don't
4471 * need system columns other than ctid and oid either, since those are
4472 * set locally.
4474 if (IsA(var, Var) &&
4475 var->varno == rtindex &&
4476 var->varattno <= InvalidAttrNumber &&
4477 var->varattno != SelfItemPointerAttributeNumber)
4478 continue; /* don't need it */
4480 if (tlist_member((Expr *) var, tlist))
4481 continue; /* already got it */
4483 tlist = lappend(tlist,
4484 makeTargetEntry((Expr *) var,
4485 list_length(tlist) + 1,
4486 NULL,
4487 false));
4490 list_free(vars);
4492 return tlist;
4496 * rebuild_fdw_scan_tlist
4497 * Build new fdw_scan_tlist of given foreign-scan plan node from given
4498 * tlist
4500 * There might be columns that the fdw_scan_tlist of the given foreign-scan
4501 * plan node contains that the given tlist doesn't. The fdw_scan_tlist would
4502 * have contained resjunk columns such as 'ctid' of the target relation and
4503 * 'wholerow' of non-target relations, but the tlist might not contain them,
4504 * for example. So, adjust the tlist so it contains all the columns specified
4505 * in the fdw_scan_tlist; else setrefs.c will get confused.
4507 static void
4508 rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist)
4510 List *new_tlist = tlist;
4511 List *old_tlist = fscan->fdw_scan_tlist;
4512 ListCell *lc;
4514 foreach(lc, old_tlist)
4516 TargetEntry *tle = (TargetEntry *) lfirst(lc);
4518 if (tlist_member(tle->expr, new_tlist))
4519 continue; /* already got it */
4521 new_tlist = lappend(new_tlist,
4522 makeTargetEntry(tle->expr,
4523 list_length(new_tlist) + 1,
4524 NULL,
4525 false));
4527 fscan->fdw_scan_tlist = new_tlist;
4531 * Execute a direct UPDATE/DELETE statement.
4533 static void
4534 execute_dml_stmt(ForeignScanState *node)
4536 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
4537 ExprContext *econtext = node->ss.ps.ps_ExprContext;
4538 int numParams = dmstate->numParams;
4539 const char **values = dmstate->param_values;
4541 /* First, process a pending asynchronous request, if any. */
4542 if (dmstate->conn_state->pendingAreq)
4543 process_pending_request(dmstate->conn_state->pendingAreq);
4546 * Construct array of query parameter values in text format.
4548 if (numParams > 0)
4549 process_query_params(econtext,
4550 dmstate->param_flinfo,
4551 dmstate->param_exprs,
4552 values);
4555 * Notice that we pass NULL for paramTypes, thus forcing the remote server
4556 * to infer types for all parameters. Since we explicitly cast every
4557 * parameter (see deparse.c), the "inference" is trivial and will produce
4558 * the desired result. This allows us to avoid assuming that the remote
4559 * server has the same OIDs we do for the parameters' types.
4561 if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
4562 NULL, values, NULL, NULL, 0))
4563 pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
4566 * Get the result, and check for success.
4568 * We don't use a PG_TRY block here, so be careful not to throw error
4569 * without releasing the PGresult.
4571 dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
4572 if (PQresultStatus(dmstate->result) !=
4573 (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
4574 pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
4575 dmstate->query);
4577 /* Get the number of rows affected. */
4578 if (dmstate->has_returning)
4579 dmstate->num_tuples = PQntuples(dmstate->result);
4580 else
4581 dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
4585 * Get the result of a RETURNING clause.
4587 static TupleTableSlot *
4588 get_returning_data(ForeignScanState *node)
4590 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
4591 EState *estate = node->ss.ps.state;
4592 ResultRelInfo *resultRelInfo = node->resultRelInfo;
4593 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
4594 TupleTableSlot *resultSlot;
4596 Assert(resultRelInfo->ri_projectReturning);
4598 /* If we didn't get any tuples, must be end of data. */
4599 if (dmstate->next_tuple >= dmstate->num_tuples)
4600 return ExecClearTuple(slot);
4602 /* Increment the command es_processed count if necessary. */
4603 if (dmstate->set_processed)
4604 estate->es_processed += 1;
4607 * Store a RETURNING tuple. If has_returning is false, just emit a dummy
4608 * tuple. (has_returning is false when the local query is of the form
4609 * "UPDATE/DELETE .. RETURNING 1" for example.)
4611 if (!dmstate->has_returning)
4613 ExecStoreAllNullTuple(slot);
4614 resultSlot = slot;
4616 else
4619 * On error, be sure to release the PGresult on the way out. Callers
4620 * do not have PG_TRY blocks to ensure this happens.
4622 PG_TRY();
4624 HeapTuple newtup;
4626 newtup = make_tuple_from_result_row(dmstate->result,
4627 dmstate->next_tuple,
4628 dmstate->rel,
4629 dmstate->attinmeta,
4630 dmstate->retrieved_attrs,
4631 node,
4632 dmstate->temp_cxt);
4633 ExecStoreHeapTuple(newtup, slot, false);
4635 PG_CATCH();
4637 PQclear(dmstate->result);
4638 PG_RE_THROW();
4640 PG_END_TRY();
4642 /* Get the updated/deleted tuple. */
4643 if (dmstate->rel)
4644 resultSlot = slot;
4645 else
4646 resultSlot = apply_returning_filter(dmstate, resultRelInfo, slot, estate);
4648 dmstate->next_tuple++;
4650 /* Make slot available for evaluation of the local query RETURNING list. */
4651 resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
4652 resultSlot;
4654 return slot;
4658 * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
4660 static void
4661 init_returning_filter(PgFdwDirectModifyState *dmstate,
4662 List *fdw_scan_tlist,
4663 Index rtindex)
4665 TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4666 ListCell *lc;
4667 int i;
4670 * Calculate the mapping between the fdw_scan_tlist's entries and the
4671 * result tuple's attributes.
4673 * The "map" is an array of indexes of the result tuple's attributes in
4674 * fdw_scan_tlist, i.e., one entry for every attribute of the result
4675 * tuple. We store zero for any attributes that don't have the
4676 * corresponding entries in that list, marking that a NULL is needed in
4677 * the result tuple.
4679 * Also get the indexes of the entries for ctid and oid if any.
4681 dmstate->attnoMap = (AttrNumber *)
4682 palloc0(resultTupType->natts * sizeof(AttrNumber));
4684 dmstate->ctidAttno = dmstate->oidAttno = 0;
4686 i = 1;
4687 dmstate->hasSystemCols = false;
4688 foreach(lc, fdw_scan_tlist)
4690 TargetEntry *tle = (TargetEntry *) lfirst(lc);
4691 Var *var = (Var *) tle->expr;
4693 Assert(IsA(var, Var));
4696 * If the Var is a column of the target relation to be retrieved from
4697 * the foreign server, get the index of the entry.
4699 if (var->varno == rtindex &&
4700 list_member_int(dmstate->retrieved_attrs, i))
4702 int attrno = var->varattno;
4704 if (attrno < 0)
4707 * We don't retrieve system columns other than ctid and oid.
4709 if (attrno == SelfItemPointerAttributeNumber)
4710 dmstate->ctidAttno = i;
4711 else
4712 Assert(false);
4713 dmstate->hasSystemCols = true;
4715 else
4718 * We don't retrieve whole-row references to the target
4719 * relation either.
4721 Assert(attrno > 0);
4723 dmstate->attnoMap[attrno - 1] = i;
4726 i++;
4731 * Extract and return an updated/deleted tuple from a scan tuple.
4733 static TupleTableSlot *
4734 apply_returning_filter(PgFdwDirectModifyState *dmstate,
4735 ResultRelInfo *resultRelInfo,
4736 TupleTableSlot *slot,
4737 EState *estate)
4739 TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4740 TupleTableSlot *resultSlot;
4741 Datum *values;
4742 bool *isnull;
4743 Datum *old_values;
4744 bool *old_isnull;
4745 int i;
4748 * Use the return tuple slot as a place to store the result tuple.
4750 resultSlot = ExecGetReturningSlot(estate, resultRelInfo);
4753 * Extract all the values of the scan tuple.
4755 slot_getallattrs(slot);
4756 old_values = slot->tts_values;
4757 old_isnull = slot->tts_isnull;
4760 * Prepare to build the result tuple.
4762 ExecClearTuple(resultSlot);
4763 values = resultSlot->tts_values;
4764 isnull = resultSlot->tts_isnull;
4767 * Transpose data into proper fields of the result tuple.
4769 for (i = 0; i < resultTupType->natts; i++)
4771 int j = dmstate->attnoMap[i];
4773 if (j == 0)
4775 values[i] = (Datum) 0;
4776 isnull[i] = true;
4778 else
4780 values[i] = old_values[j - 1];
4781 isnull[i] = old_isnull[j - 1];
4786 * Build the virtual tuple.
4788 ExecStoreVirtualTuple(resultSlot);
4791 * If we have any system columns to return, materialize a heap tuple in
4792 * the slot from column values set above and install system columns in
4793 * that tuple.
4795 if (dmstate->hasSystemCols)
4797 HeapTuple resultTup = ExecFetchSlotHeapTuple(resultSlot, true, NULL);
4799 /* ctid */
4800 if (dmstate->ctidAttno)
4802 ItemPointer ctid = NULL;
4804 ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
4805 resultTup->t_self = *ctid;
4809 * And remaining columns
4811 * Note: since we currently don't allow the target relation to appear
4812 * on the nullable side of an outer join, any system columns wouldn't
4813 * go to NULL.
4815 * Note: no need to care about tableoid here because it will be
4816 * initialized in ExecProcessReturning().
4818 HeapTupleHeaderSetXmin(resultTup->t_data, InvalidTransactionId);
4819 HeapTupleHeaderSetXmax(resultTup->t_data, InvalidTransactionId);
4820 HeapTupleHeaderSetCmin(resultTup->t_data, InvalidTransactionId);
4824 * And return the result tuple.
4826 return resultSlot;
4830 * Prepare for processing of parameters used in remote query.
4832 static void
4833 prepare_query_params(PlanState *node,
4834 List *fdw_exprs,
4835 int numParams,
4836 FmgrInfo **param_flinfo,
4837 List **param_exprs,
4838 const char ***param_values)
4840 int i;
4841 ListCell *lc;
4843 Assert(numParams > 0);
4845 /* Prepare for output conversion of parameters used in remote query. */
4846 *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
4848 i = 0;
4849 foreach(lc, fdw_exprs)
4851 Node *param_expr = (Node *) lfirst(lc);
4852 Oid typefnoid;
4853 bool isvarlena;
4855 getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
4856 fmgr_info(typefnoid, &(*param_flinfo)[i]);
4857 i++;
4861 * Prepare remote-parameter expressions for evaluation. (Note: in
4862 * practice, we expect that all these expressions will be just Params, so
4863 * we could possibly do something more efficient than using the full
4864 * expression-eval machinery for this. But probably there would be little
4865 * benefit, and it'd require postgres_fdw to know more than is desirable
4866 * about Param evaluation.)
4868 *param_exprs = ExecInitExprList(fdw_exprs, node);
4870 /* Allocate buffer for text form of query parameters. */
4871 *param_values = (const char **) palloc0(numParams * sizeof(char *));
4875 * Construct array of query parameter values in text format.
4877 static void
4878 process_query_params(ExprContext *econtext,
4879 FmgrInfo *param_flinfo,
4880 List *param_exprs,
4881 const char **param_values)
4883 int nestlevel;
4884 int i;
4885 ListCell *lc;
4887 nestlevel = set_transmission_modes();
4889 i = 0;
4890 foreach(lc, param_exprs)
4892 ExprState *expr_state = (ExprState *) lfirst(lc);
4893 Datum expr_value;
4894 bool isNull;
4896 /* Evaluate the parameter expression */
4897 expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
4900 * Get string representation of each parameter value by invoking
4901 * type-specific output function, unless the value is null.
4903 if (isNull)
4904 param_values[i] = NULL;
4905 else
4906 param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
4908 i++;
4911 reset_transmission_modes(nestlevel);
4915 * postgresAnalyzeForeignTable
4916 * Test whether analyzing this foreign table is supported
4918 static bool
4919 postgresAnalyzeForeignTable(Relation relation,
4920 AcquireSampleRowsFunc *func,
4921 BlockNumber *totalpages)
4923 ForeignTable *table;
4924 UserMapping *user;
4925 PGconn *conn;
4926 StringInfoData sql;
4927 PGresult *volatile res = NULL;
4929 /* Return the row-analysis function pointer */
4930 *func = postgresAcquireSampleRowsFunc;
4933 * Now we have to get the number of pages. It's annoying that the ANALYZE
4934 * API requires us to return that now, because it forces some duplication
4935 * of effort between this routine and postgresAcquireSampleRowsFunc. But
4936 * it's probably not worth redefining that API at this point.
4940 * Get the connection to use. We do the remote access as the table's
4941 * owner, even if the ANALYZE was started by some other user.
4943 table = GetForeignTable(RelationGetRelid(relation));
4944 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4945 conn = GetConnection(user, false, NULL);
4948 * Construct command to get page count for relation.
4950 initStringInfo(&sql);
4951 deparseAnalyzeSizeSql(&sql, relation);
4953 /* In what follows, do not risk leaking any PGresults. */
4954 PG_TRY();
4956 res = pgfdw_exec_query(conn, sql.data, NULL);
4957 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4958 pgfdw_report_error(ERROR, res, conn, false, sql.data);
4960 if (PQntuples(res) != 1 || PQnfields(res) != 1)
4961 elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
4962 *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
4964 PG_FINALLY();
4966 PQclear(res);
4968 PG_END_TRY();
4970 ReleaseConnection(conn);
4972 return true;
4976 * postgresGetAnalyzeInfoForForeignTable
4977 * Count tuples in foreign table (just get pg_class.reltuples).
4979 * can_tablesample determines if the remote relation supports acquiring the
4980 * sample using TABLESAMPLE.
4982 static double
4983 postgresGetAnalyzeInfoForForeignTable(Relation relation, bool *can_tablesample)
4985 ForeignTable *table;
4986 UserMapping *user;
4987 PGconn *conn;
4988 StringInfoData sql;
4989 PGresult *volatile res = NULL;
4990 volatile double reltuples = -1;
4991 volatile char relkind = 0;
4993 /* assume the remote relation does not support TABLESAMPLE */
4994 *can_tablesample = false;
4997 * Get the connection to use. We do the remote access as the table's
4998 * owner, even if the ANALYZE was started by some other user.
5000 table = GetForeignTable(RelationGetRelid(relation));
5001 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
5002 conn = GetConnection(user, false, NULL);
5005 * Construct command to get page count for relation.
5007 initStringInfo(&sql);
5008 deparseAnalyzeInfoSql(&sql, relation);
5010 /* In what follows, do not risk leaking any PGresults. */
5011 PG_TRY();
5013 res = pgfdw_exec_query(conn, sql.data, NULL);
5014 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5015 pgfdw_report_error(ERROR, res, conn, false, sql.data);
5017 if (PQntuples(res) != 1 || PQnfields(res) != 2)
5018 elog(ERROR, "unexpected result from deparseAnalyzeInfoSql query");
5019 reltuples = strtod(PQgetvalue(res, 0, 0), NULL);
5020 relkind = *(PQgetvalue(res, 0, 1));
5022 PG_FINALLY();
5024 if (res)
5025 PQclear(res);
5027 PG_END_TRY();
5029 ReleaseConnection(conn);
5031 /* TABLESAMPLE is supported only for regular tables and matviews */
5032 *can_tablesample = (relkind == RELKIND_RELATION ||
5033 relkind == RELKIND_MATVIEW ||
5034 relkind == RELKIND_PARTITIONED_TABLE);
5036 return reltuples;
5040 * Acquire a random sample of rows from foreign table managed by postgres_fdw.
5042 * Selected rows are returned in the caller-allocated array rows[],
5043 * which must have at least targrows entries.
5044 * The actual number of rows selected is returned as the function result.
5045 * We also count the total number of rows in the table and return it into
5046 * *totalrows. Note that *totaldeadrows is always set to 0.
5048 * Note that the returned list of rows is not always in order by physical
5049 * position in the table. Therefore, correlation estimates derived later
5050 * may be meaningless, but it's OK because we don't use the estimates
5051 * currently (the planner only pays attention to correlation for indexscans).
5053 static int
5054 postgresAcquireSampleRowsFunc(Relation relation, int elevel,
5055 HeapTuple *rows, int targrows,
5056 double *totalrows,
5057 double *totaldeadrows)
5059 PgFdwAnalyzeState astate;
5060 ForeignTable *table;
5061 ForeignServer *server;
5062 UserMapping *user;
5063 PGconn *conn;
5064 int server_version_num;
5065 PgFdwSamplingMethod method = ANALYZE_SAMPLE_AUTO; /* auto is default */
5066 double sample_frac = -1.0;
5067 double reltuples;
5068 unsigned int cursor_number;
5069 StringInfoData sql;
5070 PGresult *volatile res = NULL;
5071 ListCell *lc;
5073 /* Initialize workspace state */
5074 astate.rel = relation;
5075 astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
5077 astate.rows = rows;
5078 astate.targrows = targrows;
5079 astate.numrows = 0;
5080 astate.samplerows = 0;
5081 astate.rowstoskip = -1; /* -1 means not set yet */
5082 reservoir_init_selection_state(&astate.rstate, targrows);
5084 /* Remember ANALYZE context, and create a per-tuple temp context */
5085 astate.anl_cxt = CurrentMemoryContext;
5086 astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
5087 "postgres_fdw temporary data",
5088 ALLOCSET_SMALL_SIZES);
5091 * Get the connection to use. We do the remote access as the table's
5092 * owner, even if the ANALYZE was started by some other user.
5094 table = GetForeignTable(RelationGetRelid(relation));
5095 server = GetForeignServer(table->serverid);
5096 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
5097 conn = GetConnection(user, false, NULL);
5099 /* We'll need server version, so fetch it now. */
5100 server_version_num = PQserverVersion(conn);
5103 * What sampling method should we use?
5105 foreach(lc, server->options)
5107 DefElem *def = (DefElem *) lfirst(lc);
5109 if (strcmp(def->defname, "analyze_sampling") == 0)
5111 char *value = defGetString(def);
5113 if (strcmp(value, "off") == 0)
5114 method = ANALYZE_SAMPLE_OFF;
5115 else if (strcmp(value, "auto") == 0)
5116 method = ANALYZE_SAMPLE_AUTO;
5117 else if (strcmp(value, "random") == 0)
5118 method = ANALYZE_SAMPLE_RANDOM;
5119 else if (strcmp(value, "system") == 0)
5120 method = ANALYZE_SAMPLE_SYSTEM;
5121 else if (strcmp(value, "bernoulli") == 0)
5122 method = ANALYZE_SAMPLE_BERNOULLI;
5124 break;
5128 foreach(lc, table->options)
5130 DefElem *def = (DefElem *) lfirst(lc);
5132 if (strcmp(def->defname, "analyze_sampling") == 0)
5134 char *value = defGetString(def);
5136 if (strcmp(value, "off") == 0)
5137 method = ANALYZE_SAMPLE_OFF;
5138 else if (strcmp(value, "auto") == 0)
5139 method = ANALYZE_SAMPLE_AUTO;
5140 else if (strcmp(value, "random") == 0)
5141 method = ANALYZE_SAMPLE_RANDOM;
5142 else if (strcmp(value, "system") == 0)
5143 method = ANALYZE_SAMPLE_SYSTEM;
5144 else if (strcmp(value, "bernoulli") == 0)
5145 method = ANALYZE_SAMPLE_BERNOULLI;
5147 break;
5152 * Error-out if explicitly required one of the TABLESAMPLE methods, but
5153 * the server does not support it.
5155 if ((server_version_num < 95000) &&
5156 (method == ANALYZE_SAMPLE_SYSTEM ||
5157 method == ANALYZE_SAMPLE_BERNOULLI))
5158 ereport(ERROR,
5159 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5160 errmsg("remote server does not support TABLESAMPLE feature")));
5163 * If we've decided to do remote sampling, calculate the sampling rate. We
5164 * need to get the number of tuples from the remote server, but skip that
5165 * network round-trip if not needed.
5167 if (method != ANALYZE_SAMPLE_OFF)
5169 bool can_tablesample;
5171 reltuples = postgresGetAnalyzeInfoForForeignTable(relation,
5172 &can_tablesample);
5175 * Make sure we're not choosing TABLESAMPLE when the remote relation
5176 * does not support that. But only do this for "auto" - if the user
5177 * explicitly requested BERNOULLI/SYSTEM, it's better to fail.
5179 if (!can_tablesample && (method == ANALYZE_SAMPLE_AUTO))
5180 method = ANALYZE_SAMPLE_RANDOM;
5183 * Remote's reltuples could be 0 or -1 if the table has never been
5184 * vacuumed/analyzed. In that case, disable sampling after all.
5186 if ((reltuples <= 0) || (targrows >= reltuples))
5187 method = ANALYZE_SAMPLE_OFF;
5188 else
5191 * All supported sampling methods require sampling rate, not
5192 * target rows directly, so we calculate that using the remote
5193 * reltuples value. That's imperfect, because it might be off a
5194 * good deal, but that's not something we can (or should) address
5195 * here.
5197 * If reltuples is too low (i.e. when table grew), we'll end up
5198 * sampling more rows - but then we'll apply the local sampling,
5199 * so we get the expected sample size. This is the same outcome as
5200 * without remote sampling.
5202 * If reltuples is too high (e.g. after bulk DELETE), we will end
5203 * up sampling too few rows.
5205 * We can't really do much better here - we could try sampling a
5206 * bit more rows, but we don't know how off the reltuples value is
5207 * so how much is "a bit more"?
5209 * Furthermore, the targrows value for partitions is determined
5210 * based on table size (relpages), which can be off in different
5211 * ways too. Adjusting the sampling rate here might make the issue
5212 * worse.
5214 sample_frac = targrows / reltuples;
5217 * We should never get sampling rate outside the valid range
5218 * (between 0.0 and 1.0), because those cases should be covered by
5219 * the previous branch that sets ANALYZE_SAMPLE_OFF.
5221 Assert(sample_frac >= 0.0 && sample_frac <= 1.0);
5226 * For "auto" method, pick the one we believe is best. For servers with
5227 * TABLESAMPLE support we pick BERNOULLI, for old servers we fall-back to
5228 * random() to at least reduce network transfer.
5230 if (method == ANALYZE_SAMPLE_AUTO)
5232 if (server_version_num < 95000)
5233 method = ANALYZE_SAMPLE_RANDOM;
5234 else
5235 method = ANALYZE_SAMPLE_BERNOULLI;
5239 * Construct cursor that retrieves whole rows from remote.
5241 cursor_number = GetCursorNumber(conn);
5242 initStringInfo(&sql);
5243 appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
5245 deparseAnalyzeSql(&sql, relation, method, sample_frac, &astate.retrieved_attrs);
5247 /* In what follows, do not risk leaking any PGresults. */
5248 PG_TRY();
5250 char fetch_sql[64];
5251 int fetch_size;
5253 res = pgfdw_exec_query(conn, sql.data, NULL);
5254 if (PQresultStatus(res) != PGRES_COMMAND_OK)
5255 pgfdw_report_error(ERROR, res, conn, false, sql.data);
5256 PQclear(res);
5257 res = NULL;
5260 * Determine the fetch size. The default is arbitrary, but shouldn't
5261 * be enormous.
5263 fetch_size = 100;
5264 foreach(lc, server->options)
5266 DefElem *def = (DefElem *) lfirst(lc);
5268 if (strcmp(def->defname, "fetch_size") == 0)
5270 (void) parse_int(defGetString(def), &fetch_size, 0, NULL);
5271 break;
5274 foreach(lc, table->options)
5276 DefElem *def = (DefElem *) lfirst(lc);
5278 if (strcmp(def->defname, "fetch_size") == 0)
5280 (void) parse_int(defGetString(def), &fetch_size, 0, NULL);
5281 break;
5285 /* Construct command to fetch rows from remote. */
5286 snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
5287 fetch_size, cursor_number);
5289 /* Retrieve and process rows a batch at a time. */
5290 for (;;)
5292 int numrows;
5293 int i;
5295 /* Allow users to cancel long query */
5296 CHECK_FOR_INTERRUPTS();
5299 * XXX possible future improvement: if rowstoskip is large, we
5300 * could issue a MOVE rather than physically fetching the rows,
5301 * then just adjust rowstoskip and samplerows appropriately.
5304 /* Fetch some rows */
5305 res = pgfdw_exec_query(conn, fetch_sql, NULL);
5306 /* On error, report the original query, not the FETCH. */
5307 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5308 pgfdw_report_error(ERROR, res, conn, false, sql.data);
5310 /* Process whatever we got. */
5311 numrows = PQntuples(res);
5312 for (i = 0; i < numrows; i++)
5313 analyze_row_processor(res, i, &astate);
5315 PQclear(res);
5316 res = NULL;
5318 /* Must be EOF if we didn't get all the rows requested. */
5319 if (numrows < fetch_size)
5320 break;
5323 /* Close the cursor, just to be tidy. */
5324 close_cursor(conn, cursor_number, NULL);
5326 PG_CATCH();
5328 PQclear(res);
5329 PG_RE_THROW();
5331 PG_END_TRY();
5333 ReleaseConnection(conn);
5335 /* We assume that we have no dead tuple. */
5336 *totaldeadrows = 0.0;
5339 * Without sampling, we've retrieved all living tuples from foreign
5340 * server, so report that as totalrows. Otherwise use the reltuples
5341 * estimate we got from the remote side.
5343 if (method == ANALYZE_SAMPLE_OFF)
5344 *totalrows = astate.samplerows;
5345 else
5346 *totalrows = reltuples;
5349 * Emit some interesting relation info
5351 ereport(elevel,
5352 (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
5353 RelationGetRelationName(relation),
5354 *totalrows, astate.numrows)));
5356 return astate.numrows;
5360 * Collect sample rows from the result of query.
5361 * - Use all tuples in sample until target # of samples are collected.
5362 * - Subsequently, replace already-sampled tuples randomly.
5364 static void
5365 analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
5367 int targrows = astate->targrows;
5368 int pos; /* array index to store tuple in */
5369 MemoryContext oldcontext;
5371 /* Always increment sample row counter. */
5372 astate->samplerows += 1;
5375 * Determine the slot where this sample row should be stored. Set pos to
5376 * negative value to indicate the row should be skipped.
5378 if (astate->numrows < targrows)
5380 /* First targrows rows are always included into the sample */
5381 pos = astate->numrows++;
5383 else
5386 * Now we start replacing tuples in the sample until we reach the end
5387 * of the relation. Same algorithm as in acquire_sample_rows in
5388 * analyze.c; see Jeff Vitter's paper.
5390 if (astate->rowstoskip < 0)
5391 astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
5393 if (astate->rowstoskip <= 0)
5395 /* Choose a random reservoir element to replace. */
5396 pos = (int) (targrows * sampler_random_fract(&astate->rstate.randstate));
5397 Assert(pos >= 0 && pos < targrows);
5398 heap_freetuple(astate->rows[pos]);
5400 else
5402 /* Skip this tuple. */
5403 pos = -1;
5406 astate->rowstoskip -= 1;
5409 if (pos >= 0)
5412 * Create sample tuple from current result row, and store it in the
5413 * position determined above. The tuple has to be created in anl_cxt.
5415 oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
5417 astate->rows[pos] = make_tuple_from_result_row(res, row,
5418 astate->rel,
5419 astate->attinmeta,
5420 astate->retrieved_attrs,
5421 NULL,
5422 astate->temp_cxt);
5424 MemoryContextSwitchTo(oldcontext);
5429 * Import a foreign schema
5431 static List *
5432 postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
5434 List *commands = NIL;
5435 bool import_collate = true;
5436 bool import_default = false;
5437 bool import_generated = true;
5438 bool import_not_null = true;
5439 ForeignServer *server;
5440 UserMapping *mapping;
5441 PGconn *conn;
5442 StringInfoData buf;
5443 PGresult *volatile res = NULL;
5444 int numrows,
5446 ListCell *lc;
5448 /* Parse statement options */
5449 foreach(lc, stmt->options)
5451 DefElem *def = (DefElem *) lfirst(lc);
5453 if (strcmp(def->defname, "import_collate") == 0)
5454 import_collate = defGetBoolean(def);
5455 else if (strcmp(def->defname, "import_default") == 0)
5456 import_default = defGetBoolean(def);
5457 else if (strcmp(def->defname, "import_generated") == 0)
5458 import_generated = defGetBoolean(def);
5459 else if (strcmp(def->defname, "import_not_null") == 0)
5460 import_not_null = defGetBoolean(def);
5461 else
5462 ereport(ERROR,
5463 (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
5464 errmsg("invalid option \"%s\"", def->defname)));
5468 * Get connection to the foreign server. Connection manager will
5469 * establish new connection if necessary.
5471 server = GetForeignServer(serverOid);
5472 mapping = GetUserMapping(GetUserId(), server->serverid);
5473 conn = GetConnection(mapping, false, NULL);
5475 /* Don't attempt to import collation if remote server hasn't got it */
5476 if (PQserverVersion(conn) < 90100)
5477 import_collate = false;
5479 /* Create workspace for strings */
5480 initStringInfo(&buf);
5482 /* In what follows, do not risk leaking any PGresults. */
5483 PG_TRY();
5485 /* Check that the schema really exists */
5486 appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
5487 deparseStringLiteral(&buf, stmt->remote_schema);
5489 res = pgfdw_exec_query(conn, buf.data, NULL);
5490 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5491 pgfdw_report_error(ERROR, res, conn, false, buf.data);
5493 if (PQntuples(res) != 1)
5494 ereport(ERROR,
5495 (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
5496 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
5497 stmt->remote_schema, server->servername)));
5499 PQclear(res);
5500 res = NULL;
5501 resetStringInfo(&buf);
5504 * Fetch all table data from this schema, possibly restricted by
5505 * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
5506 * to EXCEPT/LIMIT TO here, because the core code will filter the
5507 * statements we return according to those lists anyway. But it
5508 * should save a few cycles to not process excluded tables in the
5509 * first place.)
5511 * Import table data for partitions only when they are explicitly
5512 * specified in LIMIT TO clause. Otherwise ignore them and only
5513 * include the definitions of the root partitioned tables to allow
5514 * access to the complete remote data set locally in the schema
5515 * imported.
5517 * Note: because we run the connection with search_path restricted to
5518 * pg_catalog, the format_type() and pg_get_expr() outputs will always
5519 * include a schema name for types/functions in other schemas, which
5520 * is what we want.
5522 appendStringInfoString(&buf,
5523 "SELECT relname, "
5524 " attname, "
5525 " format_type(atttypid, atttypmod), "
5526 " attnotnull, "
5527 " pg_get_expr(adbin, adrelid), ");
5529 /* Generated columns are supported since Postgres 12 */
5530 if (PQserverVersion(conn) >= 120000)
5531 appendStringInfoString(&buf,
5532 " attgenerated, ");
5533 else
5534 appendStringInfoString(&buf,
5535 " NULL, ");
5537 if (import_collate)
5538 appendStringInfoString(&buf,
5539 " collname, "
5540 " collnsp.nspname ");
5541 else
5542 appendStringInfoString(&buf,
5543 " NULL, NULL ");
5545 appendStringInfoString(&buf,
5546 "FROM pg_class c "
5547 " JOIN pg_namespace n ON "
5548 " relnamespace = n.oid "
5549 " LEFT JOIN pg_attribute a ON "
5550 " attrelid = c.oid AND attnum > 0 "
5551 " AND NOT attisdropped "
5552 " LEFT JOIN pg_attrdef ad ON "
5553 " adrelid = c.oid AND adnum = attnum ");
5555 if (import_collate)
5556 appendStringInfoString(&buf,
5557 " LEFT JOIN pg_collation coll ON "
5558 " coll.oid = attcollation "
5559 " LEFT JOIN pg_namespace collnsp ON "
5560 " collnsp.oid = collnamespace ");
5562 appendStringInfoString(&buf,
5563 "WHERE c.relkind IN ("
5564 CppAsString2(RELKIND_RELATION) ","
5565 CppAsString2(RELKIND_VIEW) ","
5566 CppAsString2(RELKIND_FOREIGN_TABLE) ","
5567 CppAsString2(RELKIND_MATVIEW) ","
5568 CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
5569 " AND n.nspname = ");
5570 deparseStringLiteral(&buf, stmt->remote_schema);
5572 /* Partitions are supported since Postgres 10 */
5573 if (PQserverVersion(conn) >= 100000 &&
5574 stmt->list_type != FDW_IMPORT_SCHEMA_LIMIT_TO)
5575 appendStringInfoString(&buf, " AND NOT c.relispartition ");
5577 /* Apply restrictions for LIMIT TO and EXCEPT */
5578 if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
5579 stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
5581 bool first_item = true;
5583 appendStringInfoString(&buf, " AND c.relname ");
5584 if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
5585 appendStringInfoString(&buf, "NOT ");
5586 appendStringInfoString(&buf, "IN (");
5588 /* Append list of table names within IN clause */
5589 foreach(lc, stmt->table_list)
5591 RangeVar *rv = (RangeVar *) lfirst(lc);
5593 if (first_item)
5594 first_item = false;
5595 else
5596 appendStringInfoString(&buf, ", ");
5597 deparseStringLiteral(&buf, rv->relname);
5599 appendStringInfoChar(&buf, ')');
5602 /* Append ORDER BY at the end of query to ensure output ordering */
5603 appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
5605 /* Fetch the data */
5606 res = pgfdw_exec_query(conn, buf.data, NULL);
5607 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5608 pgfdw_report_error(ERROR, res, conn, false, buf.data);
5610 /* Process results */
5611 numrows = PQntuples(res);
5612 /* note: incrementation of i happens in inner loop's while() test */
5613 for (i = 0; i < numrows;)
5615 char *tablename = PQgetvalue(res, i, 0);
5616 bool first_item = true;
5618 resetStringInfo(&buf);
5619 appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
5620 quote_identifier(tablename));
5622 /* Scan all rows for this table */
5625 char *attname;
5626 char *typename;
5627 char *attnotnull;
5628 char *attgenerated;
5629 char *attdefault;
5630 char *collname;
5631 char *collnamespace;
5633 /* If table has no columns, we'll see nulls here */
5634 if (PQgetisnull(res, i, 1))
5635 continue;
5637 attname = PQgetvalue(res, i, 1);
5638 typename = PQgetvalue(res, i, 2);
5639 attnotnull = PQgetvalue(res, i, 3);
5640 attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
5641 PQgetvalue(res, i, 4);
5642 attgenerated = PQgetisnull(res, i, 5) ? (char *) NULL :
5643 PQgetvalue(res, i, 5);
5644 collname = PQgetisnull(res, i, 6) ? (char *) NULL :
5645 PQgetvalue(res, i, 6);
5646 collnamespace = PQgetisnull(res, i, 7) ? (char *) NULL :
5647 PQgetvalue(res, i, 7);
5649 if (first_item)
5650 first_item = false;
5651 else
5652 appendStringInfoString(&buf, ",\n");
5654 /* Print column name and type */
5655 appendStringInfo(&buf, " %s %s",
5656 quote_identifier(attname),
5657 typename);
5660 * Add column_name option so that renaming the foreign table's
5661 * column doesn't break the association to the underlying
5662 * column.
5664 appendStringInfoString(&buf, " OPTIONS (column_name ");
5665 deparseStringLiteral(&buf, attname);
5666 appendStringInfoChar(&buf, ')');
5668 /* Add COLLATE if needed */
5669 if (import_collate && collname != NULL && collnamespace != NULL)
5670 appendStringInfo(&buf, " COLLATE %s.%s",
5671 quote_identifier(collnamespace),
5672 quote_identifier(collname));
5674 /* Add DEFAULT if needed */
5675 if (import_default && attdefault != NULL &&
5676 (!attgenerated || !attgenerated[0]))
5677 appendStringInfo(&buf, " DEFAULT %s", attdefault);
5679 /* Add GENERATED if needed */
5680 if (import_generated && attgenerated != NULL &&
5681 attgenerated[0] == ATTRIBUTE_GENERATED_STORED)
5683 Assert(attdefault != NULL);
5684 appendStringInfo(&buf,
5685 " GENERATED ALWAYS AS (%s) STORED",
5686 attdefault);
5689 /* Add NOT NULL if needed */
5690 if (import_not_null && attnotnull[0] == 't')
5691 appendStringInfoString(&buf, " NOT NULL");
5693 while (++i < numrows &&
5694 strcmp(PQgetvalue(res, i, 0), tablename) == 0);
5697 * Add server name and table-level options. We specify remote
5698 * schema and table name as options (the latter to ensure that
5699 * renaming the foreign table doesn't break the association).
5701 appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
5702 quote_identifier(server->servername));
5704 appendStringInfoString(&buf, "schema_name ");
5705 deparseStringLiteral(&buf, stmt->remote_schema);
5706 appendStringInfoString(&buf, ", table_name ");
5707 deparseStringLiteral(&buf, tablename);
5709 appendStringInfoString(&buf, ");");
5711 commands = lappend(commands, pstrdup(buf.data));
5714 PG_FINALLY();
5716 PQclear(res);
5718 PG_END_TRY();
5720 ReleaseConnection(conn);
5722 return commands;
5726 * Assess whether the join between inner and outer relations can be pushed down
5727 * to the foreign server. As a side effect, save information we obtain in this
5728 * function to PgFdwRelationInfo passed in.
5730 static bool
5731 foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype,
5732 RelOptInfo *outerrel, RelOptInfo *innerrel,
5733 JoinPathExtraData *extra)
5735 PgFdwRelationInfo *fpinfo;
5736 PgFdwRelationInfo *fpinfo_o;
5737 PgFdwRelationInfo *fpinfo_i;
5738 ListCell *lc;
5739 List *joinclauses;
5742 * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
5743 * Constructing queries representing SEMI and ANTI joins is hard, hence
5744 * not considered right now.
5746 if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
5747 jointype != JOIN_RIGHT && jointype != JOIN_FULL)
5748 return false;
5751 * If either of the joining relations is marked as unsafe to pushdown, the
5752 * join can not be pushed down.
5754 fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
5755 fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
5756 fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
5757 if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
5758 !fpinfo_i || !fpinfo_i->pushdown_safe)
5759 return false;
5762 * If joining relations have local conditions, those conditions are
5763 * required to be applied before joining the relations. Hence the join can
5764 * not be pushed down.
5766 if (fpinfo_o->local_conds || fpinfo_i->local_conds)
5767 return false;
5770 * Merge FDW options. We might be tempted to do this after we have deemed
5771 * the foreign join to be OK. But we must do this beforehand so that we
5772 * know which quals can be evaluated on the foreign server, which might
5773 * depend on shippable_extensions.
5775 fpinfo->server = fpinfo_o->server;
5776 merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i);
5779 * Separate restrict list into join quals and pushed-down (other) quals.
5781 * Join quals belonging to an outer join must all be shippable, else we
5782 * cannot execute the join remotely. Add such quals to 'joinclauses'.
5784 * Add other quals to fpinfo->remote_conds if they are shippable, else to
5785 * fpinfo->local_conds. In an inner join it's okay to execute conditions
5786 * either locally or remotely; the same is true for pushed-down conditions
5787 * at an outer join.
5789 * Note we might return failure after having already scribbled on
5790 * fpinfo->remote_conds and fpinfo->local_conds. That's okay because we
5791 * won't consult those lists again if we deem the join unshippable.
5793 joinclauses = NIL;
5794 foreach(lc, extra->restrictlist)
5796 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
5797 bool is_remote_clause = is_foreign_expr(root, joinrel,
5798 rinfo->clause);
5800 if (IS_OUTER_JOIN(jointype) &&
5801 !RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids))
5803 if (!is_remote_clause)
5804 return false;
5805 joinclauses = lappend(joinclauses, rinfo);
5807 else
5809 if (is_remote_clause)
5810 fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
5811 else
5812 fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
5817 * deparseExplicitTargetList() isn't smart enough to handle anything other
5818 * than a Var. In particular, if there's some PlaceHolderVar that would
5819 * need to be evaluated within this join tree (because there's an upper
5820 * reference to a quantity that may go to NULL as a result of an outer
5821 * join), then we can't try to push the join down because we'll fail when
5822 * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that
5823 * needs to be evaluated *at the top* of this join tree is OK, because we
5824 * can do that locally after fetching the results from the remote side.
5826 foreach(lc, root->placeholder_list)
5828 PlaceHolderInfo *phinfo = lfirst(lc);
5829 Relids relids;
5831 /* PlaceHolderInfo refers to parent relids, not child relids. */
5832 relids = IS_OTHER_REL(joinrel) ?
5833 joinrel->top_parent_relids : joinrel->relids;
5835 if (bms_is_subset(phinfo->ph_eval_at, relids) &&
5836 bms_nonempty_difference(relids, phinfo->ph_eval_at))
5837 return false;
5840 /* Save the join clauses, for later use. */
5841 fpinfo->joinclauses = joinclauses;
5843 fpinfo->outerrel = outerrel;
5844 fpinfo->innerrel = innerrel;
5845 fpinfo->jointype = jointype;
5848 * By default, both the input relations are not required to be deparsed as
5849 * subqueries, but there might be some relations covered by the input
5850 * relations that are required to be deparsed as subqueries, so save the
5851 * relids of those relations for later use by the deparser.
5853 fpinfo->make_outerrel_subquery = false;
5854 fpinfo->make_innerrel_subquery = false;
5855 Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
5856 Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
5857 fpinfo->lower_subquery_rels = bms_union(fpinfo_o->lower_subquery_rels,
5858 fpinfo_i->lower_subquery_rels);
5861 * Pull the other remote conditions from the joining relations into join
5862 * clauses or other remote clauses (remote_conds) of this relation
5863 * wherever possible. This avoids building subqueries at every join step.
5865 * For an inner join, clauses from both the relations are added to the
5866 * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
5867 * the outer side are added to remote_conds since those can be evaluated
5868 * after the join is evaluated. The clauses from inner side are added to
5869 * the joinclauses, since they need to be evaluated while constructing the
5870 * join.
5872 * For a FULL OUTER JOIN, the other clauses from either relation can not
5873 * be added to the joinclauses or remote_conds, since each relation acts
5874 * as an outer relation for the other.
5876 * The joining sides can not have local conditions, thus no need to test
5877 * shippability of the clauses being pulled up.
5879 switch (jointype)
5881 case JOIN_INNER:
5882 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5883 fpinfo_i->remote_conds);
5884 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5885 fpinfo_o->remote_conds);
5886 break;
5888 case JOIN_LEFT:
5889 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5890 fpinfo_i->remote_conds);
5891 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5892 fpinfo_o->remote_conds);
5893 break;
5895 case JOIN_RIGHT:
5896 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5897 fpinfo_o->remote_conds);
5898 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5899 fpinfo_i->remote_conds);
5900 break;
5902 case JOIN_FULL:
5905 * In this case, if any of the input relations has conditions, we
5906 * need to deparse that relation as a subquery so that the
5907 * conditions can be evaluated before the join. Remember it in
5908 * the fpinfo of this relation so that the deparser can take
5909 * appropriate action. Also, save the relids of base relations
5910 * covered by that relation for later use by the deparser.
5912 if (fpinfo_o->remote_conds)
5914 fpinfo->make_outerrel_subquery = true;
5915 fpinfo->lower_subquery_rels =
5916 bms_add_members(fpinfo->lower_subquery_rels,
5917 outerrel->relids);
5919 if (fpinfo_i->remote_conds)
5921 fpinfo->make_innerrel_subquery = true;
5922 fpinfo->lower_subquery_rels =
5923 bms_add_members(fpinfo->lower_subquery_rels,
5924 innerrel->relids);
5926 break;
5928 default:
5929 /* Should not happen, we have just checked this above */
5930 elog(ERROR, "unsupported join type %d", jointype);
5934 * For an inner join, all restrictions can be treated alike. Treating the
5935 * pushed down conditions as join conditions allows a top level full outer
5936 * join to be deparsed without requiring subqueries.
5938 if (jointype == JOIN_INNER)
5940 Assert(!fpinfo->joinclauses);
5941 fpinfo->joinclauses = fpinfo->remote_conds;
5942 fpinfo->remote_conds = NIL;
5945 /* Mark that this join can be pushed down safely */
5946 fpinfo->pushdown_safe = true;
5948 /* Get user mapping */
5949 if (fpinfo->use_remote_estimate)
5951 if (fpinfo_o->use_remote_estimate)
5952 fpinfo->user = fpinfo_o->user;
5953 else
5954 fpinfo->user = fpinfo_i->user;
5956 else
5957 fpinfo->user = NULL;
5960 * Set # of retrieved rows and cached relation costs to some negative
5961 * value, so that we can detect when they are set to some sensible values,
5962 * during one (usually the first) of the calls to estimate_path_cost_size.
5964 fpinfo->retrieved_rows = -1;
5965 fpinfo->rel_startup_cost = -1;
5966 fpinfo->rel_total_cost = -1;
5969 * Set the string describing this join relation to be used in EXPLAIN
5970 * output of corresponding ForeignScan. Note that the decoration we add
5971 * to the base relation names mustn't include any digits, or it'll confuse
5972 * postgresExplainForeignScan.
5974 fpinfo->relation_name = psprintf("(%s) %s JOIN (%s)",
5975 fpinfo_o->relation_name,
5976 get_jointype_name(fpinfo->jointype),
5977 fpinfo_i->relation_name);
5980 * Set the relation index. This is defined as the position of this
5981 * joinrel in the join_rel_list list plus the length of the rtable list.
5982 * Note that since this joinrel is at the end of the join_rel_list list
5983 * when we are called, we can get the position by list_length.
5985 Assert(fpinfo->relation_index == 0); /* shouldn't be set yet */
5986 fpinfo->relation_index =
5987 list_length(root->parse->rtable) + list_length(root->join_rel_list);
5989 return true;
5992 static void
5993 add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
5994 Path *epq_path)
5996 List *useful_pathkeys_list = NIL; /* List of all pathkeys */
5997 ListCell *lc;
5999 useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
6002 * Before creating sorted paths, arrange for the passed-in EPQ path, if
6003 * any, to return columns needed by the parent ForeignScan node so that
6004 * they will propagate up through Sort nodes injected below, if necessary.
6006 if (epq_path != NULL && useful_pathkeys_list != NIL)
6008 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
6009 PathTarget *target = copy_pathtarget(epq_path->pathtarget);
6011 /* Include columns required for evaluating PHVs in the tlist. */
6012 add_new_columns_to_pathtarget(target,
6013 pull_var_clause((Node *) target->exprs,
6014 PVC_RECURSE_PLACEHOLDERS));
6016 /* Include columns required for evaluating the local conditions. */
6017 foreach(lc, fpinfo->local_conds)
6019 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
6021 add_new_columns_to_pathtarget(target,
6022 pull_var_clause((Node *) rinfo->clause,
6023 PVC_RECURSE_PLACEHOLDERS));
6027 * If we have added any new columns, adjust the tlist of the EPQ path.
6029 * Note: the plan created using this path will only be used to execute
6030 * EPQ checks, where accuracy of the plan cost and width estimates
6031 * would not be important, so we do not do set_pathtarget_cost_width()
6032 * for the new pathtarget here. See also postgresGetForeignPlan().
6034 if (list_length(target->exprs) > list_length(epq_path->pathtarget->exprs))
6036 /* The EPQ path is a join path, so it is projection-capable. */
6037 Assert(is_projection_capable_path(epq_path));
6040 * Use create_projection_path() here, so as to avoid modifying it
6041 * in place.
6043 epq_path = (Path *) create_projection_path(root,
6044 rel,
6045 epq_path,
6046 target);
6050 /* Create one path for each set of pathkeys we found above. */
6051 foreach(lc, useful_pathkeys_list)
6053 double rows;
6054 int width;
6055 Cost startup_cost;
6056 Cost total_cost;
6057 List *useful_pathkeys = lfirst(lc);
6058 Path *sorted_epq_path;
6060 estimate_path_cost_size(root, rel, NIL, useful_pathkeys, NULL,
6061 &rows, &width, &startup_cost, &total_cost);
6064 * The EPQ path must be at least as well sorted as the path itself, in
6065 * case it gets used as input to a mergejoin.
6067 sorted_epq_path = epq_path;
6068 if (sorted_epq_path != NULL &&
6069 !pathkeys_contained_in(useful_pathkeys,
6070 sorted_epq_path->pathkeys))
6071 sorted_epq_path = (Path *)
6072 create_sort_path(root,
6073 rel,
6074 sorted_epq_path,
6075 useful_pathkeys,
6076 -1.0);
6078 if (IS_SIMPLE_REL(rel))
6079 add_path(rel, (Path *)
6080 create_foreignscan_path(root, rel,
6081 NULL,
6082 rows,
6083 startup_cost,
6084 total_cost,
6085 useful_pathkeys,
6086 rel->lateral_relids,
6087 sorted_epq_path,
6088 NIL));
6089 else
6090 add_path(rel, (Path *)
6091 create_foreign_join_path(root, rel,
6092 NULL,
6093 rows,
6094 startup_cost,
6095 total_cost,
6096 useful_pathkeys,
6097 rel->lateral_relids,
6098 sorted_epq_path,
6099 NIL));
6104 * Parse options from foreign server and apply them to fpinfo.
6106 * New options might also require tweaking merge_fdw_options().
6108 static void
6109 apply_server_options(PgFdwRelationInfo *fpinfo)
6111 ListCell *lc;
6113 foreach(lc, fpinfo->server->options)
6115 DefElem *def = (DefElem *) lfirst(lc);
6117 if (strcmp(def->defname, "use_remote_estimate") == 0)
6118 fpinfo->use_remote_estimate = defGetBoolean(def);
6119 else if (strcmp(def->defname, "fdw_startup_cost") == 0)
6120 (void) parse_real(defGetString(def), &fpinfo->fdw_startup_cost, 0,
6121 NULL);
6122 else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
6123 (void) parse_real(defGetString(def), &fpinfo->fdw_tuple_cost, 0,
6124 NULL);
6125 else if (strcmp(def->defname, "extensions") == 0)
6126 fpinfo->shippable_extensions =
6127 ExtractExtensionList(defGetString(def), false);
6128 else if (strcmp(def->defname, "fetch_size") == 0)
6129 (void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL);
6130 else if (strcmp(def->defname, "async_capable") == 0)
6131 fpinfo->async_capable = defGetBoolean(def);
6136 * Parse options from foreign table and apply them to fpinfo.
6138 * New options might also require tweaking merge_fdw_options().
6140 static void
6141 apply_table_options(PgFdwRelationInfo *fpinfo)
6143 ListCell *lc;
6145 foreach(lc, fpinfo->table->options)
6147 DefElem *def = (DefElem *) lfirst(lc);
6149 if (strcmp(def->defname, "use_remote_estimate") == 0)
6150 fpinfo->use_remote_estimate = defGetBoolean(def);
6151 else if (strcmp(def->defname, "fetch_size") == 0)
6152 (void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL);
6153 else if (strcmp(def->defname, "async_capable") == 0)
6154 fpinfo->async_capable = defGetBoolean(def);
6159 * Merge FDW options from input relations into a new set of options for a join
6160 * or an upper rel.
6162 * For a join relation, FDW-specific information about the inner and outer
6163 * relations is provided using fpinfo_i and fpinfo_o. For an upper relation,
6164 * fpinfo_o provides the information for the input relation; fpinfo_i is
6165 * expected to NULL.
6167 static void
6168 merge_fdw_options(PgFdwRelationInfo *fpinfo,
6169 const PgFdwRelationInfo *fpinfo_o,
6170 const PgFdwRelationInfo *fpinfo_i)
6172 /* We must always have fpinfo_o. */
6173 Assert(fpinfo_o);
6175 /* fpinfo_i may be NULL, but if present the servers must both match. */
6176 Assert(!fpinfo_i ||
6177 fpinfo_i->server->serverid == fpinfo_o->server->serverid);
6180 * Copy the server specific FDW options. (For a join, both relations come
6181 * from the same server, so the server options should have the same value
6182 * for both relations.)
6184 fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
6185 fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
6186 fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
6187 fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
6188 fpinfo->fetch_size = fpinfo_o->fetch_size;
6189 fpinfo->async_capable = fpinfo_o->async_capable;
6191 /* Merge the table level options from either side of the join. */
6192 if (fpinfo_i)
6195 * We'll prefer to use remote estimates for this join if any table
6196 * from either side of the join is using remote estimates. This is
6197 * most likely going to be preferred since they're already willing to
6198 * pay the price of a round trip to get the remote EXPLAIN. In any
6199 * case it's not entirely clear how we might otherwise handle this
6200 * best.
6202 fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
6203 fpinfo_i->use_remote_estimate;
6206 * Set fetch size to maximum of the joining sides, since we are
6207 * expecting the rows returned by the join to be proportional to the
6208 * relation sizes.
6210 fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
6213 * We'll prefer to consider this join async-capable if any table from
6214 * either side of the join is considered async-capable. This would be
6215 * reasonable because in that case the foreign server would have its
6216 * own resources to scan that table asynchronously, and the join could
6217 * also be computed asynchronously using the resources.
6219 fpinfo->async_capable = fpinfo_o->async_capable ||
6220 fpinfo_i->async_capable;
6225 * postgresGetForeignJoinPaths
6226 * Add possible ForeignPath to joinrel, if join is safe to push down.
6228 static void
6229 postgresGetForeignJoinPaths(PlannerInfo *root,
6230 RelOptInfo *joinrel,
6231 RelOptInfo *outerrel,
6232 RelOptInfo *innerrel,
6233 JoinType jointype,
6234 JoinPathExtraData *extra)
6236 PgFdwRelationInfo *fpinfo;
6237 ForeignPath *joinpath;
6238 double rows;
6239 int width;
6240 Cost startup_cost;
6241 Cost total_cost;
6242 Path *epq_path; /* Path to create plan to be executed when
6243 * EvalPlanQual gets triggered. */
6246 * Skip if this join combination has been considered already.
6248 if (joinrel->fdw_private)
6249 return;
6252 * This code does not work for joins with lateral references, since those
6253 * must have parameterized paths, which we don't generate yet.
6255 if (!bms_is_empty(joinrel->lateral_relids))
6256 return;
6259 * Create unfinished PgFdwRelationInfo entry which is used to indicate
6260 * that the join relation is already considered, so that we won't waste
6261 * time in judging safety of join pushdown and adding the same paths again
6262 * if found safe. Once we know that this join can be pushed down, we fill
6263 * the entry.
6265 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
6266 fpinfo->pushdown_safe = false;
6267 joinrel->fdw_private = fpinfo;
6268 /* attrs_used is only for base relations. */
6269 fpinfo->attrs_used = NULL;
6272 * If there is a possibility that EvalPlanQual will be executed, we need
6273 * to be able to reconstruct the row using scans of the base relations.
6274 * GetExistingLocalJoinPath will find a suitable path for this purpose in
6275 * the path list of the joinrel, if one exists. We must be careful to
6276 * call it before adding any ForeignPath, since the ForeignPath might
6277 * dominate the only suitable local path available. We also do it before
6278 * calling foreign_join_ok(), since that function updates fpinfo and marks
6279 * it as pushable if the join is found to be pushable.
6281 if (root->parse->commandType == CMD_DELETE ||
6282 root->parse->commandType == CMD_UPDATE ||
6283 root->rowMarks)
6285 epq_path = GetExistingLocalJoinPath(joinrel);
6286 if (!epq_path)
6288 elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
6289 return;
6292 else
6293 epq_path = NULL;
6295 if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
6297 /* Free path required for EPQ if we copied one; we don't need it now */
6298 if (epq_path)
6299 pfree(epq_path);
6300 return;
6304 * Compute the selectivity and cost of the local_conds, so we don't have
6305 * to do it over again for each path. The best we can do for these
6306 * conditions is to estimate selectivity on the basis of local statistics.
6307 * The local conditions are applied after the join has been computed on
6308 * the remote side like quals in WHERE clause, so pass jointype as
6309 * JOIN_INNER.
6311 fpinfo->local_conds_sel = clauselist_selectivity(root,
6312 fpinfo->local_conds,
6314 JOIN_INNER,
6315 NULL);
6316 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
6319 * If we are going to estimate costs locally, estimate the join clause
6320 * selectivity here while we have special join info.
6322 if (!fpinfo->use_remote_estimate)
6323 fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
6324 0, fpinfo->jointype,
6325 extra->sjinfo);
6327 /* Estimate costs for bare join relation */
6328 estimate_path_cost_size(root, joinrel, NIL, NIL, NULL,
6329 &rows, &width, &startup_cost, &total_cost);
6330 /* Now update this information in the joinrel */
6331 joinrel->rows = rows;
6332 joinrel->reltarget->width = width;
6333 fpinfo->rows = rows;
6334 fpinfo->width = width;
6335 fpinfo->startup_cost = startup_cost;
6336 fpinfo->total_cost = total_cost;
6339 * Create a new join path and add it to the joinrel which represents a
6340 * join between foreign tables.
6342 joinpath = create_foreign_join_path(root,
6343 joinrel,
6344 NULL, /* default pathtarget */
6345 rows,
6346 startup_cost,
6347 total_cost,
6348 NIL, /* no pathkeys */
6349 joinrel->lateral_relids,
6350 epq_path,
6351 NIL); /* no fdw_private */
6353 /* Add generated path into joinrel by add_path(). */
6354 add_path(joinrel, (Path *) joinpath);
6356 /* Consider pathkeys for the join relation */
6357 add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
6359 /* XXX Consider parameterized paths for the join relation */
6363 * Assess whether the aggregation, grouping and having operations can be pushed
6364 * down to the foreign server. As a side effect, save information we obtain in
6365 * this function to PgFdwRelationInfo of the input relation.
6367 static bool
6368 foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
6369 Node *havingQual)
6371 Query *query = root->parse;
6372 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
6373 PathTarget *grouping_target = grouped_rel->reltarget;
6374 PgFdwRelationInfo *ofpinfo;
6375 ListCell *lc;
6376 int i;
6377 List *tlist = NIL;
6379 /* We currently don't support pushing Grouping Sets. */
6380 if (query->groupingSets)
6381 return false;
6383 /* Get the fpinfo of the underlying scan relation. */
6384 ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
6387 * If underlying scan relation has any local conditions, those conditions
6388 * are required to be applied before performing aggregation. Hence the
6389 * aggregate cannot be pushed down.
6391 if (ofpinfo->local_conds)
6392 return false;
6395 * Examine grouping expressions, as well as other expressions we'd need to
6396 * compute, and check whether they are safe to push down to the foreign
6397 * server. All GROUP BY expressions will be part of the grouping target
6398 * and thus there is no need to search for them separately. Add grouping
6399 * expressions into target list which will be passed to foreign server.
6401 * A tricky fine point is that we must not put any expression into the
6402 * target list that is just a foreign param (that is, something that
6403 * deparse.c would conclude has to be sent to the foreign server). If we
6404 * do, the expression will also appear in the fdw_exprs list of the plan
6405 * node, and setrefs.c will get confused and decide that the fdw_exprs
6406 * entry is actually a reference to the fdw_scan_tlist entry, resulting in
6407 * a broken plan. Somewhat oddly, it's OK if the expression contains such
6408 * a node, as long as it's not at top level; then no match is possible.
6410 i = 0;
6411 foreach(lc, grouping_target->exprs)
6413 Expr *expr = (Expr *) lfirst(lc);
6414 Index sgref = get_pathtarget_sortgroupref(grouping_target, i);
6415 ListCell *l;
6418 * Check whether this expression is part of GROUP BY clause. Note we
6419 * check the whole GROUP BY clause not just processed_groupClause,
6420 * because we will ship all of it, cf. appendGroupByClause.
6422 if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
6424 TargetEntry *tle;
6427 * If any GROUP BY expression is not shippable, then we cannot
6428 * push down aggregation to the foreign server.
6430 if (!is_foreign_expr(root, grouped_rel, expr))
6431 return false;
6434 * If it would be a foreign param, we can't put it into the tlist,
6435 * so we have to fail.
6437 if (is_foreign_param(root, grouped_rel, expr))
6438 return false;
6441 * Pushable, so add to tlist. We need to create a TLE for this
6442 * expression and apply the sortgroupref to it. We cannot use
6443 * add_to_flat_tlist() here because that avoids making duplicate
6444 * entries in the tlist. If there are duplicate entries with
6445 * distinct sortgrouprefs, we have to duplicate that situation in
6446 * the output tlist.
6448 tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false);
6449 tle->ressortgroupref = sgref;
6450 tlist = lappend(tlist, tle);
6452 else
6455 * Non-grouping expression we need to compute. Can we ship it
6456 * as-is to the foreign server?
6458 if (is_foreign_expr(root, grouped_rel, expr) &&
6459 !is_foreign_param(root, grouped_rel, expr))
6461 /* Yes, so add to tlist as-is; OK to suppress duplicates */
6462 tlist = add_to_flat_tlist(tlist, list_make1(expr));
6464 else
6466 /* Not pushable as a whole; extract its Vars and aggregates */
6467 List *aggvars;
6469 aggvars = pull_var_clause((Node *) expr,
6470 PVC_INCLUDE_AGGREGATES);
6473 * If any aggregate expression is not shippable, then we
6474 * cannot push down aggregation to the foreign server. (We
6475 * don't have to check is_foreign_param, since that certainly
6476 * won't return true for any such expression.)
6478 if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
6479 return false;
6482 * Add aggregates, if any, into the targetlist. Plain Vars
6483 * outside an aggregate can be ignored, because they should be
6484 * either same as some GROUP BY column or part of some GROUP
6485 * BY expression. In either case, they are already part of
6486 * the targetlist and thus no need to add them again. In fact
6487 * including plain Vars in the tlist when they do not match a
6488 * GROUP BY column would cause the foreign server to complain
6489 * that the shipped query is invalid.
6491 foreach(l, aggvars)
6493 Expr *aggref = (Expr *) lfirst(l);
6495 if (IsA(aggref, Aggref))
6496 tlist = add_to_flat_tlist(tlist, list_make1(aggref));
6501 i++;
6505 * Classify the pushable and non-pushable HAVING clauses and save them in
6506 * remote_conds and local_conds of the grouped rel's fpinfo.
6508 if (havingQual)
6510 foreach(lc, (List *) havingQual)
6512 Expr *expr = (Expr *) lfirst(lc);
6513 RestrictInfo *rinfo;
6516 * Currently, the core code doesn't wrap havingQuals in
6517 * RestrictInfos, so we must make our own.
6519 Assert(!IsA(expr, RestrictInfo));
6520 rinfo = make_restrictinfo(root,
6521 expr,
6522 true,
6523 false,
6524 false,
6525 false,
6526 root->qual_security_level,
6527 grouped_rel->relids,
6528 NULL,
6529 NULL);
6530 if (is_foreign_expr(root, grouped_rel, expr))
6531 fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
6532 else
6533 fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
6538 * If there are any local conditions, pull Vars and aggregates from it and
6539 * check whether they are safe to pushdown or not.
6541 if (fpinfo->local_conds)
6543 List *aggvars = NIL;
6545 foreach(lc, fpinfo->local_conds)
6547 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
6549 aggvars = list_concat(aggvars,
6550 pull_var_clause((Node *) rinfo->clause,
6551 PVC_INCLUDE_AGGREGATES));
6554 foreach(lc, aggvars)
6556 Expr *expr = (Expr *) lfirst(lc);
6559 * If aggregates within local conditions are not safe to push
6560 * down, then we cannot push down the query. Vars are already
6561 * part of GROUP BY clause which are checked above, so no need to
6562 * access them again here. Again, we need not check
6563 * is_foreign_param for a foreign aggregate.
6565 if (IsA(expr, Aggref))
6567 if (!is_foreign_expr(root, grouped_rel, expr))
6568 return false;
6570 tlist = add_to_flat_tlist(tlist, list_make1(expr));
6575 /* Store generated targetlist */
6576 fpinfo->grouped_tlist = tlist;
6578 /* Safe to pushdown */
6579 fpinfo->pushdown_safe = true;
6582 * Set # of retrieved rows and cached relation costs to some negative
6583 * value, so that we can detect when they are set to some sensible values,
6584 * during one (usually the first) of the calls to estimate_path_cost_size.
6586 fpinfo->retrieved_rows = -1;
6587 fpinfo->rel_startup_cost = -1;
6588 fpinfo->rel_total_cost = -1;
6591 * Set the string describing this grouped relation to be used in EXPLAIN
6592 * output of corresponding ForeignScan. Note that the decoration we add
6593 * to the base relation name mustn't include any digits, or it'll confuse
6594 * postgresExplainForeignScan.
6596 fpinfo->relation_name = psprintf("Aggregate on (%s)",
6597 ofpinfo->relation_name);
6599 return true;
6603 * postgresGetForeignUpperPaths
6604 * Add paths for post-join operations like aggregation, grouping etc. if
6605 * corresponding operations are safe to push down.
6607 static void
6608 postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
6609 RelOptInfo *input_rel, RelOptInfo *output_rel,
6610 void *extra)
6612 PgFdwRelationInfo *fpinfo;
6615 * If input rel is not safe to pushdown, then simply return as we cannot
6616 * perform any post-join operations on the foreign server.
6618 if (!input_rel->fdw_private ||
6619 !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe)
6620 return;
6622 /* Ignore stages we don't support; and skip any duplicate calls. */
6623 if ((stage != UPPERREL_GROUP_AGG &&
6624 stage != UPPERREL_ORDERED &&
6625 stage != UPPERREL_FINAL) ||
6626 output_rel->fdw_private)
6627 return;
6629 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
6630 fpinfo->pushdown_safe = false;
6631 fpinfo->stage = stage;
6632 output_rel->fdw_private = fpinfo;
6634 switch (stage)
6636 case UPPERREL_GROUP_AGG:
6637 add_foreign_grouping_paths(root, input_rel, output_rel,
6638 (GroupPathExtraData *) extra);
6639 break;
6640 case UPPERREL_ORDERED:
6641 add_foreign_ordered_paths(root, input_rel, output_rel);
6642 break;
6643 case UPPERREL_FINAL:
6644 add_foreign_final_paths(root, input_rel, output_rel,
6645 (FinalPathExtraData *) extra);
6646 break;
6647 default:
6648 elog(ERROR, "unexpected upper relation: %d", (int) stage);
6649 break;
6654 * add_foreign_grouping_paths
6655 * Add foreign path for grouping and/or aggregation.
6657 * Given input_rel represents the underlying scan. The paths are added to the
6658 * given grouped_rel.
6660 static void
6661 add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
6662 RelOptInfo *grouped_rel,
6663 GroupPathExtraData *extra)
6665 Query *parse = root->parse;
6666 PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
6667 PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
6668 ForeignPath *grouppath;
6669 double rows;
6670 int width;
6671 Cost startup_cost;
6672 Cost total_cost;
6674 /* Nothing to be done, if there is no grouping or aggregation required. */
6675 if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
6676 !root->hasHavingQual)
6677 return;
6679 Assert(extra->patype == PARTITIONWISE_AGGREGATE_NONE ||
6680 extra->patype == PARTITIONWISE_AGGREGATE_FULL);
6682 /* save the input_rel as outerrel in fpinfo */
6683 fpinfo->outerrel = input_rel;
6686 * Copy foreign table, foreign server, user mapping, FDW options etc.
6687 * details from the input relation's fpinfo.
6689 fpinfo->table = ifpinfo->table;
6690 fpinfo->server = ifpinfo->server;
6691 fpinfo->user = ifpinfo->user;
6692 merge_fdw_options(fpinfo, ifpinfo, NULL);
6695 * Assess if it is safe to push down aggregation and grouping.
6697 * Use HAVING qual from extra. In case of child partition, it will have
6698 * translated Vars.
6700 if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual))
6701 return;
6704 * Compute the selectivity and cost of the local_conds, so we don't have
6705 * to do it over again for each path. (Currently we create just a single
6706 * path here, but in future it would be possible that we build more paths
6707 * such as pre-sorted paths as in postgresGetForeignPaths and
6708 * postgresGetForeignJoinPaths.) The best we can do for these conditions
6709 * is to estimate selectivity on the basis of local statistics.
6711 fpinfo->local_conds_sel = clauselist_selectivity(root,
6712 fpinfo->local_conds,
6714 JOIN_INNER,
6715 NULL);
6717 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
6719 /* Estimate the cost of push down */
6720 estimate_path_cost_size(root, grouped_rel, NIL, NIL, NULL,
6721 &rows, &width, &startup_cost, &total_cost);
6723 /* Now update this information in the fpinfo */
6724 fpinfo->rows = rows;
6725 fpinfo->width = width;
6726 fpinfo->startup_cost = startup_cost;
6727 fpinfo->total_cost = total_cost;
6729 /* Create and add foreign path to the grouping relation. */
6730 grouppath = create_foreign_upper_path(root,
6731 grouped_rel,
6732 grouped_rel->reltarget,
6733 rows,
6734 startup_cost,
6735 total_cost,
6736 NIL, /* no pathkeys */
6737 NULL,
6738 NIL); /* no fdw_private */
6740 /* Add generated path into grouped_rel by add_path(). */
6741 add_path(grouped_rel, (Path *) grouppath);
6745 * add_foreign_ordered_paths
6746 * Add foreign paths for performing the final sort remotely.
6748 * Given input_rel contains the source-data Paths. The paths are added to the
6749 * given ordered_rel.
6751 static void
6752 add_foreign_ordered_paths(PlannerInfo *root, RelOptInfo *input_rel,
6753 RelOptInfo *ordered_rel)
6755 Query *parse = root->parse;
6756 PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
6757 PgFdwRelationInfo *fpinfo = ordered_rel->fdw_private;
6758 PgFdwPathExtraData *fpextra;
6759 double rows;
6760 int width;
6761 Cost startup_cost;
6762 Cost total_cost;
6763 List *fdw_private;
6764 ForeignPath *ordered_path;
6765 ListCell *lc;
6767 /* Shouldn't get here unless the query has ORDER BY */
6768 Assert(parse->sortClause);
6770 /* We don't support cases where there are any SRFs in the targetlist */
6771 if (parse->hasTargetSRFs)
6772 return;
6774 /* Save the input_rel as outerrel in fpinfo */
6775 fpinfo->outerrel = input_rel;
6778 * Copy foreign table, foreign server, user mapping, FDW options etc.
6779 * details from the input relation's fpinfo.
6781 fpinfo->table = ifpinfo->table;
6782 fpinfo->server = ifpinfo->server;
6783 fpinfo->user = ifpinfo->user;
6784 merge_fdw_options(fpinfo, ifpinfo, NULL);
6787 * If the input_rel is a base or join relation, we would already have
6788 * considered pushing down the final sort to the remote server when
6789 * creating pre-sorted foreign paths for that relation, because the
6790 * query_pathkeys is set to the root->sort_pathkeys in that case (see
6791 * standard_qp_callback()).
6793 if (input_rel->reloptkind == RELOPT_BASEREL ||
6794 input_rel->reloptkind == RELOPT_JOINREL)
6796 Assert(root->query_pathkeys == root->sort_pathkeys);
6798 /* Safe to push down if the query_pathkeys is safe to push down */
6799 fpinfo->pushdown_safe = ifpinfo->qp_is_pushdown_safe;
6801 return;
6804 /* The input_rel should be a grouping relation */
6805 Assert(input_rel->reloptkind == RELOPT_UPPER_REL &&
6806 ifpinfo->stage == UPPERREL_GROUP_AGG);
6809 * We try to create a path below by extending a simple foreign path for
6810 * the underlying grouping relation to perform the final sort remotely,
6811 * which is stored into the fdw_private list of the resulting path.
6814 /* Assess if it is safe to push down the final sort */
6815 foreach(lc, root->sort_pathkeys)
6817 PathKey *pathkey = (PathKey *) lfirst(lc);
6818 EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
6821 * is_foreign_expr would detect volatile expressions as well, but
6822 * checking ec_has_volatile here saves some cycles.
6824 if (pathkey_ec->ec_has_volatile)
6825 return;
6828 * Can't push down the sort if pathkey's opfamily is not shippable.
6830 if (!is_shippable(pathkey->pk_opfamily, OperatorFamilyRelationId,
6831 fpinfo))
6832 return;
6835 * The EC must contain a shippable EM that is computed in input_rel's
6836 * reltarget, else we can't push down the sort.
6838 if (find_em_for_rel_target(root,
6839 pathkey_ec,
6840 input_rel) == NULL)
6841 return;
6844 /* Safe to push down */
6845 fpinfo->pushdown_safe = true;
6847 /* Construct PgFdwPathExtraData */
6848 fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData));
6849 fpextra->target = root->upper_targets[UPPERREL_ORDERED];
6850 fpextra->has_final_sort = true;
6852 /* Estimate the costs of performing the final sort remotely */
6853 estimate_path_cost_size(root, input_rel, NIL, root->sort_pathkeys, fpextra,
6854 &rows, &width, &startup_cost, &total_cost);
6857 * Build the fdw_private list that will be used by postgresGetForeignPlan.
6858 * Items in the list must match order in enum FdwPathPrivateIndex.
6860 fdw_private = list_make2(makeBoolean(true), makeBoolean(false));
6862 /* Create foreign ordering path */
6863 ordered_path = create_foreign_upper_path(root,
6864 input_rel,
6865 root->upper_targets[UPPERREL_ORDERED],
6866 rows,
6867 startup_cost,
6868 total_cost,
6869 root->sort_pathkeys,
6870 NULL, /* no extra plan */
6871 fdw_private);
6873 /* and add it to the ordered_rel */
6874 add_path(ordered_rel, (Path *) ordered_path);
6878 * add_foreign_final_paths
6879 * Add foreign paths for performing the final processing remotely.
6881 * Given input_rel contains the source-data Paths. The paths are added to the
6882 * given final_rel.
6884 static void
6885 add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel,
6886 RelOptInfo *final_rel,
6887 FinalPathExtraData *extra)
6889 Query *parse = root->parse;
6890 PgFdwRelationInfo *ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private;
6891 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) final_rel->fdw_private;
6892 bool has_final_sort = false;
6893 List *pathkeys = NIL;
6894 PgFdwPathExtraData *fpextra;
6895 bool save_use_remote_estimate = false;
6896 double rows;
6897 int width;
6898 Cost startup_cost;
6899 Cost total_cost;
6900 List *fdw_private;
6901 ForeignPath *final_path;
6904 * Currently, we only support this for SELECT commands
6906 if (parse->commandType != CMD_SELECT)
6907 return;
6910 * No work if there is no FOR UPDATE/SHARE clause and if there is no need
6911 * to add a LIMIT node
6913 if (!parse->rowMarks && !extra->limit_needed)
6914 return;
6916 /* We don't support cases where there are any SRFs in the targetlist */
6917 if (parse->hasTargetSRFs)
6918 return;
6920 /* Save the input_rel as outerrel in fpinfo */
6921 fpinfo->outerrel = input_rel;
6924 * Copy foreign table, foreign server, user mapping, FDW options etc.
6925 * details from the input relation's fpinfo.
6927 fpinfo->table = ifpinfo->table;
6928 fpinfo->server = ifpinfo->server;
6929 fpinfo->user = ifpinfo->user;
6930 merge_fdw_options(fpinfo, ifpinfo, NULL);
6933 * If there is no need to add a LIMIT node, there might be a ForeignPath
6934 * in the input_rel's pathlist that implements all behavior of the query.
6935 * Note: we would already have accounted for the query's FOR UPDATE/SHARE
6936 * (if any) before we get here.
6938 if (!extra->limit_needed)
6940 ListCell *lc;
6942 Assert(parse->rowMarks);
6945 * Grouping and aggregation are not supported with FOR UPDATE/SHARE,
6946 * so the input_rel should be a base, join, or ordered relation; and
6947 * if it's an ordered relation, its input relation should be a base or
6948 * join relation.
6950 Assert(input_rel->reloptkind == RELOPT_BASEREL ||
6951 input_rel->reloptkind == RELOPT_JOINREL ||
6952 (input_rel->reloptkind == RELOPT_UPPER_REL &&
6953 ifpinfo->stage == UPPERREL_ORDERED &&
6954 (ifpinfo->outerrel->reloptkind == RELOPT_BASEREL ||
6955 ifpinfo->outerrel->reloptkind == RELOPT_JOINREL)));
6957 foreach(lc, input_rel->pathlist)
6959 Path *path = (Path *) lfirst(lc);
6962 * apply_scanjoin_target_to_paths() uses create_projection_path()
6963 * to adjust each of its input paths if needed, whereas
6964 * create_ordered_paths() uses apply_projection_to_path() to do
6965 * that. So the former might have put a ProjectionPath on top of
6966 * the ForeignPath; look through ProjectionPath and see if the
6967 * path underneath it is ForeignPath.
6969 if (IsA(path, ForeignPath) ||
6970 (IsA(path, ProjectionPath) &&
6971 IsA(((ProjectionPath *) path)->subpath, ForeignPath)))
6974 * Create foreign final path; this gets rid of a
6975 * no-longer-needed outer plan (if any), which makes the
6976 * EXPLAIN output look cleaner
6978 final_path = create_foreign_upper_path(root,
6979 path->parent,
6980 path->pathtarget,
6981 path->rows,
6982 path->startup_cost,
6983 path->total_cost,
6984 path->pathkeys,
6985 NULL, /* no extra plan */
6986 NULL); /* no fdw_private */
6988 /* and add it to the final_rel */
6989 add_path(final_rel, (Path *) final_path);
6991 /* Safe to push down */
6992 fpinfo->pushdown_safe = true;
6994 return;
6999 * If we get here it means no ForeignPaths; since we would already
7000 * have considered pushing down all operations for the query to the
7001 * remote server, give up on it.
7003 return;
7006 Assert(extra->limit_needed);
7009 * If the input_rel is an ordered relation, replace the input_rel with its
7010 * input relation
7012 if (input_rel->reloptkind == RELOPT_UPPER_REL &&
7013 ifpinfo->stage == UPPERREL_ORDERED)
7015 input_rel = ifpinfo->outerrel;
7016 ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private;
7017 has_final_sort = true;
7018 pathkeys = root->sort_pathkeys;
7021 /* The input_rel should be a base, join, or grouping relation */
7022 Assert(input_rel->reloptkind == RELOPT_BASEREL ||
7023 input_rel->reloptkind == RELOPT_JOINREL ||
7024 (input_rel->reloptkind == RELOPT_UPPER_REL &&
7025 ifpinfo->stage == UPPERREL_GROUP_AGG));
7028 * We try to create a path below by extending a simple foreign path for
7029 * the underlying base, join, or grouping relation to perform the final
7030 * sort (if has_final_sort) and the LIMIT restriction remotely, which is
7031 * stored into the fdw_private list of the resulting path. (We
7032 * re-estimate the costs of sorting the underlying relation, if
7033 * has_final_sort.)
7037 * Assess if it is safe to push down the LIMIT and OFFSET to the remote
7038 * server
7042 * If the underlying relation has any local conditions, the LIMIT/OFFSET
7043 * cannot be pushed down.
7045 if (ifpinfo->local_conds)
7046 return;
7049 * Also, the LIMIT/OFFSET cannot be pushed down, if their expressions are
7050 * not safe to remote.
7052 if (!is_foreign_expr(root, input_rel, (Expr *) parse->limitOffset) ||
7053 !is_foreign_expr(root, input_rel, (Expr *) parse->limitCount))
7054 return;
7056 /* Safe to push down */
7057 fpinfo->pushdown_safe = true;
7059 /* Construct PgFdwPathExtraData */
7060 fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData));
7061 fpextra->target = root->upper_targets[UPPERREL_FINAL];
7062 fpextra->has_final_sort = has_final_sort;
7063 fpextra->has_limit = extra->limit_needed;
7064 fpextra->limit_tuples = extra->limit_tuples;
7065 fpextra->count_est = extra->count_est;
7066 fpextra->offset_est = extra->offset_est;
7069 * Estimate the costs of performing the final sort and the LIMIT
7070 * restriction remotely. If has_final_sort is false, we wouldn't need to
7071 * execute EXPLAIN anymore if use_remote_estimate, since the costs can be
7072 * roughly estimated using the costs we already have for the underlying
7073 * relation, in the same way as when use_remote_estimate is false. Since
7074 * it's pretty expensive to execute EXPLAIN, force use_remote_estimate to
7075 * false in that case.
7077 if (!fpextra->has_final_sort)
7079 save_use_remote_estimate = ifpinfo->use_remote_estimate;
7080 ifpinfo->use_remote_estimate = false;
7082 estimate_path_cost_size(root, input_rel, NIL, pathkeys, fpextra,
7083 &rows, &width, &startup_cost, &total_cost);
7084 if (!fpextra->has_final_sort)
7085 ifpinfo->use_remote_estimate = save_use_remote_estimate;
7088 * Build the fdw_private list that will be used by postgresGetForeignPlan.
7089 * Items in the list must match order in enum FdwPathPrivateIndex.
7091 fdw_private = list_make2(makeBoolean(has_final_sort),
7092 makeBoolean(extra->limit_needed));
7095 * Create foreign final path; this gets rid of a no-longer-needed outer
7096 * plan (if any), which makes the EXPLAIN output look cleaner
7098 final_path = create_foreign_upper_path(root,
7099 input_rel,
7100 root->upper_targets[UPPERREL_FINAL],
7101 rows,
7102 startup_cost,
7103 total_cost,
7104 pathkeys,
7105 NULL, /* no extra plan */
7106 fdw_private);
7108 /* and add it to the final_rel */
7109 add_path(final_rel, (Path *) final_path);
7113 * postgresIsForeignPathAsyncCapable
7114 * Check whether a given ForeignPath node is async-capable.
7116 static bool
7117 postgresIsForeignPathAsyncCapable(ForeignPath *path)
7119 RelOptInfo *rel = ((Path *) path)->parent;
7120 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
7122 return fpinfo->async_capable;
7126 * postgresForeignAsyncRequest
7127 * Asynchronously request next tuple from a foreign PostgreSQL table.
7129 static void
7130 postgresForeignAsyncRequest(AsyncRequest *areq)
7132 produce_tuple_asynchronously(areq, true);
7136 * postgresForeignAsyncConfigureWait
7137 * Configure a file descriptor event for which we wish to wait.
7139 static void
7140 postgresForeignAsyncConfigureWait(AsyncRequest *areq)
7142 ForeignScanState *node = (ForeignScanState *) areq->requestee;
7143 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
7144 AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
7145 AppendState *requestor = (AppendState *) areq->requestor;
7146 WaitEventSet *set = requestor->as_eventset;
7148 /* This should not be called unless callback_pending */
7149 Assert(areq->callback_pending);
7152 * If process_pending_request() has been invoked on the given request
7153 * before we get here, we might have some tuples already; in which case
7154 * complete the request
7156 if (fsstate->next_tuple < fsstate->num_tuples)
7158 complete_pending_request(areq);
7159 if (areq->request_complete)
7160 return;
7161 Assert(areq->callback_pending);
7164 /* We must have run out of tuples */
7165 Assert(fsstate->next_tuple >= fsstate->num_tuples);
7167 /* The core code would have registered postmaster death event */
7168 Assert(GetNumRegisteredWaitEvents(set) >= 1);
7170 /* Begin an asynchronous data fetch if not already done */
7171 if (!pendingAreq)
7172 fetch_more_data_begin(areq);
7173 else if (pendingAreq->requestor != areq->requestor)
7176 * This is the case when the in-process request was made by another
7177 * Append. Note that it might be useless to process the request,
7178 * because the query might not need tuples from that Append anymore.
7179 * If there are any child subplans of the same parent that are ready
7180 * for new requests, skip the given request. Likewise, if there are
7181 * any configured events other than the postmaster death event, skip
7182 * it. Otherwise, process the in-process request, then begin a fetch
7183 * to configure the event below, because we might otherwise end up
7184 * with no configured events other than the postmaster death event.
7186 if (!bms_is_empty(requestor->as_needrequest))
7187 return;
7188 if (GetNumRegisteredWaitEvents(set) > 1)
7189 return;
7190 process_pending_request(pendingAreq);
7191 fetch_more_data_begin(areq);
7193 else if (pendingAreq->requestee != areq->requestee)
7196 * This is the case when the in-process request was made by the same
7197 * parent but for a different child. Since we configure only the
7198 * event for the request made for that child, skip the given request.
7200 return;
7202 else
7203 Assert(pendingAreq == areq);
7205 AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn),
7206 NULL, areq);
7210 * postgresForeignAsyncNotify
7211 * Fetch some more tuples from a file descriptor that becomes ready,
7212 * requesting next tuple.
7214 static void
7215 postgresForeignAsyncNotify(AsyncRequest *areq)
7217 ForeignScanState *node = (ForeignScanState *) areq->requestee;
7218 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
7220 /* The core code would have initialized the callback_pending flag */
7221 Assert(!areq->callback_pending);
7224 * If process_pending_request() has been invoked on the given request
7225 * before we get here, we might have some tuples already; in which case
7226 * produce the next tuple
7228 if (fsstate->next_tuple < fsstate->num_tuples)
7230 produce_tuple_asynchronously(areq, true);
7231 return;
7234 /* We must have run out of tuples */
7235 Assert(fsstate->next_tuple >= fsstate->num_tuples);
7237 /* The request should be currently in-process */
7238 Assert(fsstate->conn_state->pendingAreq == areq);
7240 /* On error, report the original query, not the FETCH. */
7241 if (!PQconsumeInput(fsstate->conn))
7242 pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
7244 fetch_more_data(node);
7246 produce_tuple_asynchronously(areq, true);
7250 * Asynchronously produce next tuple from a foreign PostgreSQL table.
7252 static void
7253 produce_tuple_asynchronously(AsyncRequest *areq, bool fetch)
7255 ForeignScanState *node = (ForeignScanState *) areq->requestee;
7256 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
7257 AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
7258 TupleTableSlot *result;
7260 /* This should not be called if the request is currently in-process */
7261 Assert(areq != pendingAreq);
7263 /* Fetch some more tuples, if we've run out */
7264 if (fsstate->next_tuple >= fsstate->num_tuples)
7266 /* No point in another fetch if we already detected EOF, though */
7267 if (!fsstate->eof_reached)
7269 /* Mark the request as pending for a callback */
7270 ExecAsyncRequestPending(areq);
7271 /* Begin another fetch if requested and if no pending request */
7272 if (fetch && !pendingAreq)
7273 fetch_more_data_begin(areq);
7275 else
7277 /* There's nothing more to do; just return a NULL pointer */
7278 result = NULL;
7279 /* Mark the request as complete */
7280 ExecAsyncRequestDone(areq, result);
7282 return;
7285 /* Get a tuple from the ForeignScan node */
7286 result = areq->requestee->ExecProcNodeReal(areq->requestee);
7287 if (!TupIsNull(result))
7289 /* Mark the request as complete */
7290 ExecAsyncRequestDone(areq, result);
7291 return;
7294 /* We must have run out of tuples */
7295 Assert(fsstate->next_tuple >= fsstate->num_tuples);
7297 /* Fetch some more tuples, if we've not detected EOF yet */
7298 if (!fsstate->eof_reached)
7300 /* Mark the request as pending for a callback */
7301 ExecAsyncRequestPending(areq);
7302 /* Begin another fetch if requested and if no pending request */
7303 if (fetch && !pendingAreq)
7304 fetch_more_data_begin(areq);
7306 else
7308 /* There's nothing more to do; just return a NULL pointer */
7309 result = NULL;
7310 /* Mark the request as complete */
7311 ExecAsyncRequestDone(areq, result);
7316 * Begin an asynchronous data fetch.
7318 * Note: this function assumes there is no currently-in-progress asynchronous
7319 * data fetch.
7321 * Note: fetch_more_data must be called to fetch the result.
7323 static void
7324 fetch_more_data_begin(AsyncRequest *areq)
7326 ForeignScanState *node = (ForeignScanState *) areq->requestee;
7327 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
7328 char sql[64];
7330 Assert(!fsstate->conn_state->pendingAreq);
7332 /* Create the cursor synchronously. */
7333 if (!fsstate->cursor_exists)
7334 create_cursor(node);
7336 /* We will send this query, but not wait for the response. */
7337 snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
7338 fsstate->fetch_size, fsstate->cursor_number);
7340 if (!PQsendQuery(fsstate->conn, sql))
7341 pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
7343 /* Remember that the request is in process */
7344 fsstate->conn_state->pendingAreq = areq;
7348 * Process a pending asynchronous request.
7350 void
7351 process_pending_request(AsyncRequest *areq)
7353 ForeignScanState *node = (ForeignScanState *) areq->requestee;
7354 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
7356 /* The request would have been pending for a callback */
7357 Assert(areq->callback_pending);
7359 /* The request should be currently in-process */
7360 Assert(fsstate->conn_state->pendingAreq == areq);
7362 fetch_more_data(node);
7365 * If we didn't get any tuples, must be end of data; complete the request
7366 * now. Otherwise, we postpone completing the request until we are called
7367 * from postgresForeignAsyncConfigureWait()/postgresForeignAsyncNotify().
7369 if (fsstate->next_tuple >= fsstate->num_tuples)
7371 /* Unlike AsyncNotify, we unset callback_pending ourselves */
7372 areq->callback_pending = false;
7373 /* Mark the request as complete */
7374 ExecAsyncRequestDone(areq, NULL);
7375 /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
7376 ExecAsyncResponse(areq);
7381 * Complete a pending asynchronous request.
7383 static void
7384 complete_pending_request(AsyncRequest *areq)
7386 /* The request would have been pending for a callback */
7387 Assert(areq->callback_pending);
7389 /* Unlike AsyncNotify, we unset callback_pending ourselves */
7390 areq->callback_pending = false;
7392 /* We begin a fetch afterwards if necessary; don't fetch */
7393 produce_tuple_asynchronously(areq, false);
7395 /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
7396 ExecAsyncResponse(areq);
7398 /* Also, we do instrumentation ourselves, if required */
7399 if (areq->requestee->instrument)
7400 InstrUpdateTupleCount(areq->requestee->instrument,
7401 TupIsNull(areq->result) ? 0.0 : 1.0);
7405 * Create a tuple from the specified row of the PGresult.
7407 * rel is the local representation of the foreign table, attinmeta is
7408 * conversion data for the rel's tupdesc, and retrieved_attrs is an
7409 * integer list of the table column numbers present in the PGresult.
7410 * fsstate is the ForeignScan plan node's execution state.
7411 * temp_context is a working context that can be reset after each tuple.
7413 * Note: either rel or fsstate, but not both, can be NULL. rel is NULL
7414 * if we're processing a remote join, while fsstate is NULL in a non-query
7415 * context such as ANALYZE, or if we're processing a non-scan query node.
7417 static HeapTuple
7418 make_tuple_from_result_row(PGresult *res,
7419 int row,
7420 Relation rel,
7421 AttInMetadata *attinmeta,
7422 List *retrieved_attrs,
7423 ForeignScanState *fsstate,
7424 MemoryContext temp_context)
7426 HeapTuple tuple;
7427 TupleDesc tupdesc;
7428 Datum *values;
7429 bool *nulls;
7430 ItemPointer ctid = NULL;
7431 ConversionLocation errpos;
7432 ErrorContextCallback errcallback;
7433 MemoryContext oldcontext;
7434 ListCell *lc;
7435 int j;
7437 Assert(row < PQntuples(res));
7440 * Do the following work in a temp context that we reset after each tuple.
7441 * This cleans up not only the data we have direct access to, but any
7442 * cruft the I/O functions might leak.
7444 oldcontext = MemoryContextSwitchTo(temp_context);
7447 * Get the tuple descriptor for the row. Use the rel's tupdesc if rel is
7448 * provided, otherwise look to the scan node's ScanTupleSlot.
7450 if (rel)
7451 tupdesc = RelationGetDescr(rel);
7452 else
7454 Assert(fsstate);
7455 tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
7458 values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
7459 nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
7460 /* Initialize to nulls for any columns not present in result */
7461 memset(nulls, true, tupdesc->natts * sizeof(bool));
7464 * Set up and install callback to report where conversion error occurs.
7466 errpos.cur_attno = 0;
7467 errpos.rel = rel;
7468 errpos.fsstate = fsstate;
7469 errcallback.callback = conversion_error_callback;
7470 errcallback.arg = (void *) &errpos;
7471 errcallback.previous = error_context_stack;
7472 error_context_stack = &errcallback;
7475 * i indexes columns in the relation, j indexes columns in the PGresult.
7477 j = 0;
7478 foreach(lc, retrieved_attrs)
7480 int i = lfirst_int(lc);
7481 char *valstr;
7483 /* fetch next column's textual value */
7484 if (PQgetisnull(res, row, j))
7485 valstr = NULL;
7486 else
7487 valstr = PQgetvalue(res, row, j);
7490 * convert value to internal representation
7492 * Note: we ignore system columns other than ctid and oid in result
7494 errpos.cur_attno = i;
7495 if (i > 0)
7497 /* ordinary column */
7498 Assert(i <= tupdesc->natts);
7499 nulls[i - 1] = (valstr == NULL);
7500 /* Apply the input function even to nulls, to support domains */
7501 values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
7502 valstr,
7503 attinmeta->attioparams[i - 1],
7504 attinmeta->atttypmods[i - 1]);
7506 else if (i == SelfItemPointerAttributeNumber)
7508 /* ctid */
7509 if (valstr != NULL)
7511 Datum datum;
7513 datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
7514 ctid = (ItemPointer) DatumGetPointer(datum);
7517 errpos.cur_attno = 0;
7519 j++;
7522 /* Uninstall error context callback. */
7523 error_context_stack = errcallback.previous;
7526 * Check we got the expected number of columns. Note: j == 0 and
7527 * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
7529 if (j > 0 && j != PQnfields(res))
7530 elog(ERROR, "remote query result does not match the foreign table");
7533 * Build the result tuple in caller's memory context.
7535 MemoryContextSwitchTo(oldcontext);
7537 tuple = heap_form_tuple(tupdesc, values, nulls);
7540 * If we have a CTID to return, install it in both t_self and t_ctid.
7541 * t_self is the normal place, but if the tuple is converted to a
7542 * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
7543 * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
7545 if (ctid)
7546 tuple->t_self = tuple->t_data->t_ctid = *ctid;
7549 * Stomp on the xmin, xmax, and cmin fields from the tuple created by
7550 * heap_form_tuple. heap_form_tuple actually creates the tuple with
7551 * DatumTupleFields, not HeapTupleFields, but the executor expects
7552 * HeapTupleFields and will happily extract system columns on that
7553 * assumption. If we don't do this then, for example, the tuple length
7554 * ends up in the xmin field, which isn't what we want.
7556 HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
7557 HeapTupleHeaderSetXmin(tuple->t_data, InvalidTransactionId);
7558 HeapTupleHeaderSetCmin(tuple->t_data, InvalidTransactionId);
7560 /* Clean up */
7561 MemoryContextReset(temp_context);
7563 return tuple;
7567 * Callback function which is called when error occurs during column value
7568 * conversion. Print names of column and relation.
7570 * Note that this function mustn't do any catalog lookups, since we are in
7571 * an already-failed transaction. Fortunately, we can get the needed info
7572 * from the relation or the query's rangetable instead.
7574 static void
7575 conversion_error_callback(void *arg)
7577 ConversionLocation *errpos = (ConversionLocation *) arg;
7578 Relation rel = errpos->rel;
7579 ForeignScanState *fsstate = errpos->fsstate;
7580 const char *attname = NULL;
7581 const char *relname = NULL;
7582 bool is_wholerow = false;
7585 * If we're in a scan node, always use aliases from the rangetable, for
7586 * consistency between the simple-relation and remote-join cases. Look at
7587 * the relation's tupdesc only if we're not in a scan node.
7589 if (fsstate)
7591 /* ForeignScan case */
7592 ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan);
7593 int varno = 0;
7594 AttrNumber colno = 0;
7596 if (fsplan->scan.scanrelid > 0)
7598 /* error occurred in a scan against a foreign table */
7599 varno = fsplan->scan.scanrelid;
7600 colno = errpos->cur_attno;
7602 else
7604 /* error occurred in a scan against a foreign join */
7605 TargetEntry *tle;
7607 tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
7608 errpos->cur_attno - 1);
7611 * Target list can have Vars and expressions. For Vars, we can
7612 * get some information, however for expressions we can't. Thus
7613 * for expressions, just show generic context message.
7615 if (IsA(tle->expr, Var))
7617 Var *var = (Var *) tle->expr;
7619 varno = var->varno;
7620 colno = var->varattno;
7624 if (varno > 0)
7626 EState *estate = fsstate->ss.ps.state;
7627 RangeTblEntry *rte = exec_rt_fetch(varno, estate);
7629 relname = rte->eref->aliasname;
7631 if (colno == 0)
7632 is_wholerow = true;
7633 else if (colno > 0 && colno <= list_length(rte->eref->colnames))
7634 attname = strVal(list_nth(rte->eref->colnames, colno - 1));
7635 else if (colno == SelfItemPointerAttributeNumber)
7636 attname = "ctid";
7639 else if (rel)
7641 /* Non-ForeignScan case (we should always have a rel here) */
7642 TupleDesc tupdesc = RelationGetDescr(rel);
7644 relname = RelationGetRelationName(rel);
7645 if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
7647 Form_pg_attribute attr = TupleDescAttr(tupdesc,
7648 errpos->cur_attno - 1);
7650 attname = NameStr(attr->attname);
7652 else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
7653 attname = "ctid";
7656 if (relname && is_wholerow)
7657 errcontext("whole-row reference to foreign table \"%s\"", relname);
7658 else if (relname && attname)
7659 errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
7660 else
7661 errcontext("processing expression at position %d in select list",
7662 errpos->cur_attno);
7666 * Given an EquivalenceClass and a foreign relation, find an EC member
7667 * that can be used to sort the relation remotely according to a pathkey
7668 * using this EC.
7670 * If there is more than one suitable candidate, return an arbitrary
7671 * one of them. If there is none, return NULL.
7673 * This checks that the EC member expression uses only Vars from the given
7674 * rel and is shippable. Caller must separately verify that the pathkey's
7675 * ordering operator is shippable.
7677 EquivalenceMember *
7678 find_em_for_rel(PlannerInfo *root, EquivalenceClass *ec, RelOptInfo *rel)
7680 ListCell *lc;
7682 foreach(lc, ec->ec_members)
7684 EquivalenceMember *em = (EquivalenceMember *) lfirst(lc);
7687 * Note we require !bms_is_empty, else we'd accept constant
7688 * expressions which are not suitable for the purpose.
7690 if (bms_is_subset(em->em_relids, rel->relids) &&
7691 !bms_is_empty(em->em_relids) &&
7692 is_foreign_expr(root, rel, em->em_expr))
7693 return em;
7696 return NULL;
7700 * Find an EquivalenceClass member that is to be computed as a sort column
7701 * in the given rel's reltarget, and is shippable.
7703 * If there is more than one suitable candidate, return an arbitrary
7704 * one of them. If there is none, return NULL.
7706 * This checks that the EC member expression uses only Vars from the given
7707 * rel and is shippable. Caller must separately verify that the pathkey's
7708 * ordering operator is shippable.
7710 EquivalenceMember *
7711 find_em_for_rel_target(PlannerInfo *root, EquivalenceClass *ec,
7712 RelOptInfo *rel)
7714 PathTarget *target = rel->reltarget;
7715 ListCell *lc1;
7716 int i;
7718 i = 0;
7719 foreach(lc1, target->exprs)
7721 Expr *expr = (Expr *) lfirst(lc1);
7722 Index sgref = get_pathtarget_sortgroupref(target, i);
7723 ListCell *lc2;
7725 /* Ignore non-sort expressions */
7726 if (sgref == 0 ||
7727 get_sortgroupref_clause_noerr(sgref,
7728 root->parse->sortClause) == NULL)
7730 i++;
7731 continue;
7734 /* We ignore binary-compatible relabeling on both ends */
7735 while (expr && IsA(expr, RelabelType))
7736 expr = ((RelabelType *) expr)->arg;
7738 /* Locate an EquivalenceClass member matching this expr, if any */
7739 foreach(lc2, ec->ec_members)
7741 EquivalenceMember *em = (EquivalenceMember *) lfirst(lc2);
7742 Expr *em_expr;
7744 /* Don't match constants */
7745 if (em->em_is_const)
7746 continue;
7748 /* Ignore child members */
7749 if (em->em_is_child)
7750 continue;
7752 /* Match if same expression (after stripping relabel) */
7753 em_expr = em->em_expr;
7754 while (em_expr && IsA(em_expr, RelabelType))
7755 em_expr = ((RelabelType *) em_expr)->arg;
7757 if (!equal(em_expr, expr))
7758 continue;
7760 /* Check that expression (including relabels!) is shippable */
7761 if (is_foreign_expr(root, rel, em->em_expr))
7762 return em;
7765 i++;
7768 return NULL;
7772 * Determine batch size for a given foreign table. The option specified for
7773 * a table has precedence.
7775 static int
7776 get_batch_size_option(Relation rel)
7778 Oid foreigntableid = RelationGetRelid(rel);
7779 ForeignTable *table;
7780 ForeignServer *server;
7781 List *options;
7782 ListCell *lc;
7784 /* we use 1 by default, which means "no batching" */
7785 int batch_size = 1;
7788 * Load options for table and server. We append server options after table
7789 * options, because table options take precedence.
7791 table = GetForeignTable(foreigntableid);
7792 server = GetForeignServer(table->serverid);
7794 options = NIL;
7795 options = list_concat(options, table->options);
7796 options = list_concat(options, server->options);
7798 /* See if either table or server specifies batch_size. */
7799 foreach(lc, options)
7801 DefElem *def = (DefElem *) lfirst(lc);
7803 if (strcmp(def->defname, "batch_size") == 0)
7805 (void) parse_int(defGetString(def), &batch_size, 0, NULL);
7806 break;
7810 return batch_size;