/*-------------------------------------------------------------------------
 *
 * nodeMergeAppend.c
 *	  routines to handle MergeAppend nodes.
 *
 * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeMergeAppend.c
 *
 *-------------------------------------------------------------------------
 */
/* INTERFACE ROUTINES
 *		ExecInitMergeAppend		- initialize the MergeAppend node
 *		ExecMergeAppend			- retrieve the next tuple from the node
 *		ExecEndMergeAppend		- shut down the MergeAppend node
 *		ExecReScanMergeAppend	- rescan the MergeAppend node
 *
 *	 NOTES
 *		A MergeAppend node contains a list of one or more subplans.
 *		These are each expected to deliver tuples that are sorted according
 *		to a common sort key.  The MergeAppend node merges these streams
 *		to produce output sorted the same way.
 *
 *		MergeAppend nodes don't make use of their left and right
 *		subtrees, rather they maintain a list of subplans so
 *		a typical MergeAppend node looks like this in the plan tree:
 *
 *				   ...
 *				   /
 *				MergeAppend---+------+------+--- nil
 *				/	\		  |		 |		|
 *			  nil	nil		 ...    ...    ...
 *								 subplans
 */
41 #include "executor/execdebug.h"
42 #include "executor/execPartition.h"
43 #include "executor/nodeMergeAppend.h"
44 #include "lib/binaryheap.h"
45 #include "miscadmin.h"
48 * We have one slot for each item in the heap array. We use SlotNumber
49 * to store slot indexes. This doesn't actually provide any formal
50 * type-safety, but it makes the code more self-documenting.
52 typedef int32 SlotNumber
;
54 static TupleTableSlot
*ExecMergeAppend(PlanState
*pstate
);
55 static int heap_compare_slots(Datum a
, Datum b
, void *arg
);
58 /* ----------------------------------------------------------------
61 * Begin all of the subscans of the MergeAppend node.
62 * ----------------------------------------------------------------
65 ExecInitMergeAppend(MergeAppend
*node
, EState
*estate
, int eflags
)
67 MergeAppendState
*mergestate
= makeNode(MergeAppendState
);
68 PlanState
**mergeplanstates
;
69 Bitmapset
*validsubplans
;
74 /* check for unsupported flags */
75 Assert(!(eflags
& (EXEC_FLAG_BACKWARD
| EXEC_FLAG_MARK
)));
78 * create new MergeAppendState for our node
80 mergestate
->ps
.plan
= (Plan
*) node
;
81 mergestate
->ps
.state
= estate
;
82 mergestate
->ps
.ExecProcNode
= ExecMergeAppend
;
84 /* If run-time partition pruning is enabled, then set that up now */
85 if (node
->part_prune_info
!= NULL
)
87 PartitionPruneState
*prunestate
;
89 /* We may need an expression context to evaluate partition exprs */
90 ExecAssignExprContext(estate
, &mergestate
->ps
);
92 prunestate
= ExecCreatePartitionPruneState(&mergestate
->ps
,
93 node
->part_prune_info
);
94 mergestate
->ms_prune_state
= prunestate
;
96 /* Perform an initial partition prune, if required. */
97 if (prunestate
->do_initial_prune
)
99 /* Determine which subplans survive initial pruning */
100 validsubplans
= ExecFindInitialMatchingSubPlans(prunestate
,
101 list_length(node
->mergeplans
));
103 nplans
= bms_num_members(validsubplans
);
107 /* We'll need to initialize all subplans */
108 nplans
= list_length(node
->mergeplans
);
110 validsubplans
= bms_add_range(NULL
, 0, nplans
- 1);
114 * When no run-time pruning is required and there's at least one
115 * subplan, we can fill as_valid_subplans immediately, preventing
116 * later calls to ExecFindMatchingSubPlans.
118 if (!prunestate
->do_exec_prune
&& nplans
> 0)
119 mergestate
->ms_valid_subplans
= bms_add_range(NULL
, 0, nplans
- 1);
123 nplans
= list_length(node
->mergeplans
);
126 * When run-time partition pruning is not enabled we can just mark all
127 * subplans as valid; they must also all be initialized.
130 mergestate
->ms_valid_subplans
= validsubplans
=
131 bms_add_range(NULL
, 0, nplans
- 1);
132 mergestate
->ms_prune_state
= NULL
;
135 mergeplanstates
= (PlanState
**) palloc(nplans
* sizeof(PlanState
*));
136 mergestate
->mergeplans
= mergeplanstates
;
137 mergestate
->ms_nplans
= nplans
;
139 mergestate
->ms_slots
= (TupleTableSlot
**) palloc0(sizeof(TupleTableSlot
*) * nplans
);
140 mergestate
->ms_heap
= binaryheap_allocate(nplans
, heap_compare_slots
,
144 * Miscellaneous initialization
146 * MergeAppend nodes do have Result slots, which hold pointers to tuples,
147 * so we have to initialize them. FIXME
149 ExecInitResultTupleSlotTL(&mergestate
->ps
, &TTSOpsVirtual
);
151 /* node returns slots from each of its subnodes, therefore not fixed */
152 mergestate
->ps
.resultopsset
= true;
153 mergestate
->ps
.resultopsfixed
= false;
156 * call ExecInitNode on each of the valid plans to be executed and save
157 * the results into the mergeplanstates array.
161 while ((i
= bms_next_member(validsubplans
, i
)) >= 0)
163 Plan
*initNode
= (Plan
*) list_nth(node
->mergeplans
, i
);
165 mergeplanstates
[j
++] = ExecInitNode(initNode
, estate
, eflags
);
168 mergestate
->ps
.ps_ProjInfo
= NULL
;
171 * initialize sort-key information
173 mergestate
->ms_nkeys
= node
->numCols
;
174 mergestate
->ms_sortkeys
= palloc0(sizeof(SortSupportData
) * node
->numCols
);
176 for (i
= 0; i
< node
->numCols
; i
++)
178 SortSupport sortKey
= mergestate
->ms_sortkeys
+ i
;
180 sortKey
->ssup_cxt
= CurrentMemoryContext
;
181 sortKey
->ssup_collation
= node
->collations
[i
];
182 sortKey
->ssup_nulls_first
= node
->nullsFirst
[i
];
183 sortKey
->ssup_attno
= node
->sortColIdx
[i
];
186 * It isn't feasible to perform abbreviated key conversion, since
187 * tuples are pulled into mergestate's binary heap as needed. It
188 * would likely be counter-productive to convert tuples into an
189 * abbreviated representation as they're pulled up, so opt out of that
190 * additional optimization entirely.
192 sortKey
->abbreviate
= false;
194 PrepareSortSupportFromOrderingOp(node
->sortOperators
[i
], sortKey
);
198 * initialize to show we have not run the subplans yet
200 mergestate
->ms_initialized
= false;
205 /* ----------------------------------------------------------------
208 * Handles iteration over multiple subplans.
209 * ----------------------------------------------------------------
211 static TupleTableSlot
*
212 ExecMergeAppend(PlanState
*pstate
)
214 MergeAppendState
*node
= castNode(MergeAppendState
, pstate
);
215 TupleTableSlot
*result
;
218 CHECK_FOR_INTERRUPTS();
220 if (!node
->ms_initialized
)
222 /* Nothing to do if all subplans were pruned */
223 if (node
->ms_nplans
== 0)
224 return ExecClearTuple(node
->ps
.ps_ResultTupleSlot
);
227 * If we've yet to determine the valid subplans then do so now. If
228 * run-time pruning is disabled then the valid subplans will always be
229 * set to all subplans.
231 if (node
->ms_valid_subplans
== NULL
)
232 node
->ms_valid_subplans
=
233 ExecFindMatchingSubPlans(node
->ms_prune_state
);
236 * First time through: pull the first tuple from each valid subplan,
237 * and set up the heap.
240 while ((i
= bms_next_member(node
->ms_valid_subplans
, i
)) >= 0)
242 node
->ms_slots
[i
] = ExecProcNode(node
->mergeplans
[i
]);
243 if (!TupIsNull(node
->ms_slots
[i
]))
244 binaryheap_add_unordered(node
->ms_heap
, Int32GetDatum(i
));
246 binaryheap_build(node
->ms_heap
);
247 node
->ms_initialized
= true;
252 * Otherwise, pull the next tuple from whichever subplan we returned
253 * from last time, and reinsert the subplan index into the heap,
254 * because it might now compare differently against the existing
255 * elements of the heap. (We could perhaps simplify the logic a bit
256 * by doing this before returning from the prior call, but it's better
257 * to not pull tuples until necessary.)
259 i
= DatumGetInt32(binaryheap_first(node
->ms_heap
));
260 node
->ms_slots
[i
] = ExecProcNode(node
->mergeplans
[i
]);
261 if (!TupIsNull(node
->ms_slots
[i
]))
262 binaryheap_replace_first(node
->ms_heap
, Int32GetDatum(i
));
264 (void) binaryheap_remove_first(node
->ms_heap
);
267 if (binaryheap_empty(node
->ms_heap
))
269 /* All the subplans are exhausted, and so is the heap */
270 result
= ExecClearTuple(node
->ps
.ps_ResultTupleSlot
);
274 i
= DatumGetInt32(binaryheap_first(node
->ms_heap
));
275 result
= node
->ms_slots
[i
];
282 * Compare the tuples in the two given slots.
285 heap_compare_slots(Datum a
, Datum b
, void *arg
)
287 MergeAppendState
*node
= (MergeAppendState
*) arg
;
288 SlotNumber slot1
= DatumGetInt32(a
);
289 SlotNumber slot2
= DatumGetInt32(b
);
291 TupleTableSlot
*s1
= node
->ms_slots
[slot1
];
292 TupleTableSlot
*s2
= node
->ms_slots
[slot2
];
295 Assert(!TupIsNull(s1
));
296 Assert(!TupIsNull(s2
));
298 for (nkey
= 0; nkey
< node
->ms_nkeys
; nkey
++)
300 SortSupport sortKey
= node
->ms_sortkeys
+ nkey
;
301 AttrNumber attno
= sortKey
->ssup_attno
;
308 datum1
= slot_getattr(s1
, attno
, &isNull1
);
309 datum2
= slot_getattr(s2
, attno
, &isNull2
);
311 compare
= ApplySortComparator(datum1
, isNull1
,
316 INVERT_COMPARE_RESULT(compare
);
323 /* ----------------------------------------------------------------
326 * Shuts down the subscans of the MergeAppend node.
328 * Returns nothing of interest.
329 * ----------------------------------------------------------------
332 ExecEndMergeAppend(MergeAppendState
*node
)
334 PlanState
**mergeplans
;
339 * get information from the node
341 mergeplans
= node
->mergeplans
;
342 nplans
= node
->ms_nplans
;
345 * shut down each of the subscans
347 for (i
= 0; i
< nplans
; i
++)
348 ExecEndNode(mergeplans
[i
]);
352 ExecReScanMergeAppend(MergeAppendState
*node
)
357 * If any PARAM_EXEC Params used in pruning expressions have changed, then
358 * we'd better unset the valid subplans so that they are reselected for
359 * the new parameter values.
361 if (node
->ms_prune_state
&&
362 bms_overlap(node
->ps
.chgParam
,
363 node
->ms_prune_state
->execparamids
))
365 bms_free(node
->ms_valid_subplans
);
366 node
->ms_valid_subplans
= NULL
;
369 for (i
= 0; i
< node
->ms_nplans
; i
++)
371 PlanState
*subnode
= node
->mergeplans
[i
];
374 * ExecReScan doesn't know about my subplans, so I have to do
375 * changed-parameter signaling myself.
377 if (node
->ps
.chgParam
!= NULL
)
378 UpdateChangedParamSet(subnode
, node
->ps
.chgParam
);
381 * If chgParam of subnode is not null then plan will be re-scanned by
382 * first ExecProcNode.
384 if (subnode
->chgParam
== NULL
)
387 binaryheap_reset(node
->ms_heap
);
388 node
->ms_initialized
= false;