Update copyright for 2022
[pgsql.git] / src / backend / executor / nodeMergeAppend.c
blob418f89dea8c7e08d688d9be31096acc6108b491b
1 /*-------------------------------------------------------------------------
3 * nodeMergeAppend.c
4 * routines to handle MergeAppend nodes.
6 * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * IDENTIFICATION
11 * src/backend/executor/nodeMergeAppend.c
13 *-------------------------------------------------------------------------
15 /* INTERFACE ROUTINES
16 * ExecInitMergeAppend - initialize the MergeAppend node
17 * ExecMergeAppend - retrieve the next tuple from the node
18 * ExecEndMergeAppend - shut down the MergeAppend node
19 * ExecReScanMergeAppend - rescan the MergeAppend node
21 * NOTES
22 * A MergeAppend node contains a list of one or more subplans.
23 * These are each expected to deliver tuples that are sorted according
24 * to a common sort key. The MergeAppend node merges these streams
25 * to produce output sorted the same way.
27 * MergeAppend nodes don't make use of their left and right
28 * subtrees, rather they maintain a list of subplans so
29 * a typical MergeAppend node looks like this in the plan tree:
31 * ...
32 * /
33 * MergeAppend---+------+------+--- nil
34 * / \ | | |
35 * nil nil ... ... ...
36 * subplans
39 #include "postgres.h"
41 #include "executor/execdebug.h"
42 #include "executor/execPartition.h"
43 #include "executor/nodeMergeAppend.h"
44 #include "lib/binaryheap.h"
45 #include "miscadmin.h"
48 * We have one slot for each item in the heap array. We use SlotNumber
49 * to store slot indexes. This doesn't actually provide any formal
50 * type-safety, but it makes the code more self-documenting.
52 typedef int32 SlotNumber;
54 static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
55 static int heap_compare_slots(Datum a, Datum b, void *arg);
58 /* ----------------------------------------------------------------
59 * ExecInitMergeAppend
61 * Begin all of the subscans of the MergeAppend node.
62 * ----------------------------------------------------------------
64 MergeAppendState *
65 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
67 MergeAppendState *mergestate = makeNode(MergeAppendState);
68 PlanState **mergeplanstates;
69 Bitmapset *validsubplans;
70 int nplans;
71 int i,
74 /* check for unsupported flags */
75 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
78 * create new MergeAppendState for our node
80 mergestate->ps.plan = (Plan *) node;
81 mergestate->ps.state = estate;
82 mergestate->ps.ExecProcNode = ExecMergeAppend;
84 /* If run-time partition pruning is enabled, then set that up now */
85 if (node->part_prune_info != NULL)
87 PartitionPruneState *prunestate;
89 /* We may need an expression context to evaluate partition exprs */
90 ExecAssignExprContext(estate, &mergestate->ps);
92 prunestate = ExecCreatePartitionPruneState(&mergestate->ps,
93 node->part_prune_info);
94 mergestate->ms_prune_state = prunestate;
96 /* Perform an initial partition prune, if required. */
97 if (prunestate->do_initial_prune)
99 /* Determine which subplans survive initial pruning */
100 validsubplans = ExecFindInitialMatchingSubPlans(prunestate,
101 list_length(node->mergeplans));
103 nplans = bms_num_members(validsubplans);
105 else
107 /* We'll need to initialize all subplans */
108 nplans = list_length(node->mergeplans);
109 Assert(nplans > 0);
110 validsubplans = bms_add_range(NULL, 0, nplans - 1);
114 * When no run-time pruning is required and there's at least one
115 * subplan, we can fill as_valid_subplans immediately, preventing
116 * later calls to ExecFindMatchingSubPlans.
118 if (!prunestate->do_exec_prune && nplans > 0)
119 mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
121 else
123 nplans = list_length(node->mergeplans);
126 * When run-time partition pruning is not enabled we can just mark all
127 * subplans as valid; they must also all be initialized.
129 Assert(nplans > 0);
130 mergestate->ms_valid_subplans = validsubplans =
131 bms_add_range(NULL, 0, nplans - 1);
132 mergestate->ms_prune_state = NULL;
135 mergeplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *));
136 mergestate->mergeplans = mergeplanstates;
137 mergestate->ms_nplans = nplans;
139 mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
140 mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
141 mergestate);
144 * Miscellaneous initialization
146 * MergeAppend nodes do have Result slots, which hold pointers to tuples,
147 * so we have to initialize them. FIXME
149 ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual);
151 /* node returns slots from each of its subnodes, therefore not fixed */
152 mergestate->ps.resultopsset = true;
153 mergestate->ps.resultopsfixed = false;
156 * call ExecInitNode on each of the valid plans to be executed and save
157 * the results into the mergeplanstates array.
159 j = 0;
160 i = -1;
161 while ((i = bms_next_member(validsubplans, i)) >= 0)
163 Plan *initNode = (Plan *) list_nth(node->mergeplans, i);
165 mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
168 mergestate->ps.ps_ProjInfo = NULL;
171 * initialize sort-key information
173 mergestate->ms_nkeys = node->numCols;
174 mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
176 for (i = 0; i < node->numCols; i++)
178 SortSupport sortKey = mergestate->ms_sortkeys + i;
180 sortKey->ssup_cxt = CurrentMemoryContext;
181 sortKey->ssup_collation = node->collations[i];
182 sortKey->ssup_nulls_first = node->nullsFirst[i];
183 sortKey->ssup_attno = node->sortColIdx[i];
186 * It isn't feasible to perform abbreviated key conversion, since
187 * tuples are pulled into mergestate's binary heap as needed. It
188 * would likely be counter-productive to convert tuples into an
189 * abbreviated representation as they're pulled up, so opt out of that
190 * additional optimization entirely.
192 sortKey->abbreviate = false;
194 PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
198 * initialize to show we have not run the subplans yet
200 mergestate->ms_initialized = false;
202 return mergestate;
205 /* ----------------------------------------------------------------
206 * ExecMergeAppend
208 * Handles iteration over multiple subplans.
209 * ----------------------------------------------------------------
211 static TupleTableSlot *
212 ExecMergeAppend(PlanState *pstate)
214 MergeAppendState *node = castNode(MergeAppendState, pstate);
215 TupleTableSlot *result;
216 SlotNumber i;
218 CHECK_FOR_INTERRUPTS();
220 if (!node->ms_initialized)
222 /* Nothing to do if all subplans were pruned */
223 if (node->ms_nplans == 0)
224 return ExecClearTuple(node->ps.ps_ResultTupleSlot);
227 * If we've yet to determine the valid subplans then do so now. If
228 * run-time pruning is disabled then the valid subplans will always be
229 * set to all subplans.
231 if (node->ms_valid_subplans == NULL)
232 node->ms_valid_subplans =
233 ExecFindMatchingSubPlans(node->ms_prune_state);
236 * First time through: pull the first tuple from each valid subplan,
237 * and set up the heap.
239 i = -1;
240 while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0)
242 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
243 if (!TupIsNull(node->ms_slots[i]))
244 binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
246 binaryheap_build(node->ms_heap);
247 node->ms_initialized = true;
249 else
252 * Otherwise, pull the next tuple from whichever subplan we returned
253 * from last time, and reinsert the subplan index into the heap,
254 * because it might now compare differently against the existing
255 * elements of the heap. (We could perhaps simplify the logic a bit
256 * by doing this before returning from the prior call, but it's better
257 * to not pull tuples until necessary.)
259 i = DatumGetInt32(binaryheap_first(node->ms_heap));
260 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
261 if (!TupIsNull(node->ms_slots[i]))
262 binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
263 else
264 (void) binaryheap_remove_first(node->ms_heap);
267 if (binaryheap_empty(node->ms_heap))
269 /* All the subplans are exhausted, and so is the heap */
270 result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
272 else
274 i = DatumGetInt32(binaryheap_first(node->ms_heap));
275 result = node->ms_slots[i];
278 return result;
282 * Compare the tuples in the two given slots.
284 static int32
285 heap_compare_slots(Datum a, Datum b, void *arg)
287 MergeAppendState *node = (MergeAppendState *) arg;
288 SlotNumber slot1 = DatumGetInt32(a);
289 SlotNumber slot2 = DatumGetInt32(b);
291 TupleTableSlot *s1 = node->ms_slots[slot1];
292 TupleTableSlot *s2 = node->ms_slots[slot2];
293 int nkey;
295 Assert(!TupIsNull(s1));
296 Assert(!TupIsNull(s2));
298 for (nkey = 0; nkey < node->ms_nkeys; nkey++)
300 SortSupport sortKey = node->ms_sortkeys + nkey;
301 AttrNumber attno = sortKey->ssup_attno;
302 Datum datum1,
303 datum2;
304 bool isNull1,
305 isNull2;
306 int compare;
308 datum1 = slot_getattr(s1, attno, &isNull1);
309 datum2 = slot_getattr(s2, attno, &isNull2);
311 compare = ApplySortComparator(datum1, isNull1,
312 datum2, isNull2,
313 sortKey);
314 if (compare != 0)
316 INVERT_COMPARE_RESULT(compare);
317 return compare;
320 return 0;
323 /* ----------------------------------------------------------------
324 * ExecEndMergeAppend
326 * Shuts down the subscans of the MergeAppend node.
328 * Returns nothing of interest.
329 * ----------------------------------------------------------------
331 void
332 ExecEndMergeAppend(MergeAppendState *node)
334 PlanState **mergeplans;
335 int nplans;
336 int i;
339 * get information from the node
341 mergeplans = node->mergeplans;
342 nplans = node->ms_nplans;
345 * shut down each of the subscans
347 for (i = 0; i < nplans; i++)
348 ExecEndNode(mergeplans[i]);
351 void
352 ExecReScanMergeAppend(MergeAppendState *node)
354 int i;
357 * If any PARAM_EXEC Params used in pruning expressions have changed, then
358 * we'd better unset the valid subplans so that they are reselected for
359 * the new parameter values.
361 if (node->ms_prune_state &&
362 bms_overlap(node->ps.chgParam,
363 node->ms_prune_state->execparamids))
365 bms_free(node->ms_valid_subplans);
366 node->ms_valid_subplans = NULL;
369 for (i = 0; i < node->ms_nplans; i++)
371 PlanState *subnode = node->mergeplans[i];
374 * ExecReScan doesn't know about my subplans, so I have to do
375 * changed-parameter signaling myself.
377 if (node->ps.chgParam != NULL)
378 UpdateChangedParamSet(subnode, node->ps.chgParam);
381 * If chgParam of subnode is not null then plan will be re-scanned by
382 * first ExecProcNode.
384 if (subnode->chgParam == NULL)
385 ExecReScan(subnode);
387 binaryheap_reset(node->ms_heap);
388 node->ms_initialized = false;