Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeMergeAppend.c
4 : * routines to handle MergeAppend nodes.
5 : *
6 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeMergeAppend.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : /* INTERFACE ROUTINES
16 : * ExecInitMergeAppend - initialize the MergeAppend node
17 : * ExecMergeAppend - retrieve the next tuple from the node
18 : * ExecEndMergeAppend - shut down the MergeAppend node
19 : * ExecReScanMergeAppend - rescan the MergeAppend node
20 : *
21 : * NOTES
22 : * A MergeAppend node contains a list of one or more subplans.
23 : * These are each expected to deliver tuples that are sorted according
24 : * to a common sort key. The MergeAppend node merges these streams
25 : * to produce output sorted the same way.
26 : *
27 : * MergeAppend nodes don't make use of their left and right
28 : * subtrees, rather they maintain a list of subplans so
29 : * a typical MergeAppend node looks like this in the plan tree:
30 : *
31 : *                  ...
32 : *                  /
33 : *          MergeAppend---+------+------+--- nil
34 : *          /    \        |      |      |
35 : *        nil    nil     ...    ...    ...
36 : *                             subplans
37 : */
38 :
39 : #include "postgres.h"
40 :
41 : #include "executor/execdebug.h"
42 : #include "executor/nodeMergeAppend.h"
43 : #include "lib/binaryheap.h"
44 : #include "miscadmin.h"
45 :
/*
 * We have one slot for each item in the heap array.  We use SlotNumber
 * to store slot indexes.  This doesn't actually provide any formal
 * type-safety, but it makes the code more self-documenting.
 *
 * Slot numbers are what gets stored in the binary heap (wrapped with
 * Int32GetDatum); they index the node's ms_slots[] and mergeplans[] arrays.
 */
typedef int32 SlotNumber;

/* ExecProcNode callback installed on the node: returns the next merged tuple */
static TupleTableSlot *ExecMergeAppend(PlanState *pstate);

/*
 * Heap comparator over two SlotNumber Datums; "arg" is the owning
 * MergeAppendState, passed through from binaryheap_allocate().
 */
static int	heap_compare_slots(Datum a, Datum b, void *arg);
55 :
56 :
57 : /* ----------------------------------------------------------------
58 : * ExecInitMergeAppend
59 : *
60 : * Begin all of the subscans of the MergeAppend node.
61 : * ----------------------------------------------------------------
62 : */
63 : MergeAppendState *
64 34 : ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
65 : {
66 34 : MergeAppendState *mergestate = makeNode(MergeAppendState);
67 : PlanState **mergeplanstates;
68 : int nplans;
69 : int i;
70 : ListCell *lc;
71 :
72 : /* check for unsupported flags */
73 34 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
74 :
75 : /*
76 : * Lock the non-leaf tables in the partition tree controlled by this node.
77 : * It's a no-op for non-partitioned parent tables.
78 : */
79 34 : ExecLockNonLeafAppendTables(node->partitioned_rels, estate);
80 :
81 : /*
82 : * Set up empty vector of subplan states
83 : */
84 34 : nplans = list_length(node->mergeplans);
85 :
86 34 : mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
87 :
88 : /*
89 : * create new MergeAppendState for our node
90 : */
91 34 : mergestate->ps.plan = (Plan *) node;
92 34 : mergestate->ps.state = estate;
93 34 : mergestate->ps.ExecProcNode = ExecMergeAppend;
94 34 : mergestate->mergeplans = mergeplanstates;
95 34 : mergestate->ms_nplans = nplans;
96 :
97 34 : mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
98 34 : mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
99 : mergestate);
100 :
101 : /*
102 : * Miscellaneous initialization
103 : *
104 : * MergeAppend plans don't have expression contexts because they never
105 : * call ExecQual or ExecProject.
106 : */
107 :
108 : /*
109 : * MergeAppend nodes do have Result slots, which hold pointers to tuples,
110 : * so we have to initialize them.
111 : */
112 34 : ExecInitResultTupleSlot(estate, &mergestate->ps);
113 :
114 : /*
115 : * call ExecInitNode on each of the plans to be executed and save the
116 : * results into the array "mergeplans".
117 : */
118 34 : i = 0;
119 134 : foreach(lc, node->mergeplans)
120 : {
121 100 : Plan *initNode = (Plan *) lfirst(lc);
122 :
123 100 : mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
124 100 : i++;
125 : }
126 :
127 : /*
128 : * initialize output tuple type
129 : */
130 34 : ExecAssignResultTypeFromTL(&mergestate->ps);
131 34 : mergestate->ps.ps_ProjInfo = NULL;
132 :
133 : /*
134 : * initialize sort-key information
135 : */
136 34 : mergestate->ms_nkeys = node->numCols;
137 34 : mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
138 :
139 72 : for (i = 0; i < node->numCols; i++)
140 : {
141 38 : SortSupport sortKey = mergestate->ms_sortkeys + i;
142 :
143 38 : sortKey->ssup_cxt = CurrentMemoryContext;
144 38 : sortKey->ssup_collation = node->collations[i];
145 38 : sortKey->ssup_nulls_first = node->nullsFirst[i];
146 38 : sortKey->ssup_attno = node->sortColIdx[i];
147 :
148 : /*
149 : * It isn't feasible to perform abbreviated key conversion, since
150 : * tuples are pulled into mergestate's binary heap as needed. It
151 : * would likely be counter-productive to convert tuples into an
152 : * abbreviated representation as they're pulled up, so opt out of that
153 : * additional optimization entirely.
154 : */
155 38 : sortKey->abbreviate = false;
156 :
157 38 : PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
158 : }
159 :
160 : /*
161 : * initialize to show we have not run the subplans yet
162 : */
163 34 : mergestate->ms_initialized = false;
164 :
165 34 : return mergestate;
166 : }
167 :
168 : /* ----------------------------------------------------------------
169 : * ExecMergeAppend
170 : *
171 : * Handles iteration over multiple subplans.
172 : * ----------------------------------------------------------------
173 : */
174 : static TupleTableSlot *
175 66 : ExecMergeAppend(PlanState *pstate)
176 : {
177 66 : MergeAppendState *node = castNode(MergeAppendState, pstate);
178 : TupleTableSlot *result;
179 : SlotNumber i;
180 :
181 66 : CHECK_FOR_INTERRUPTS();
182 :
183 66 : if (!node->ms_initialized)
184 : {
185 : /*
186 : * First time through: pull the first tuple from each subplan, and set
187 : * up the heap.
188 : */
189 58 : for (i = 0; i < node->ms_nplans; i++)
190 : {
191 44 : node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
192 44 : if (!TupIsNull(node->ms_slots[i]))
193 40 : binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
194 : }
195 14 : binaryheap_build(node->ms_heap);
196 14 : node->ms_initialized = true;
197 : }
198 : else
199 : {
200 : /*
201 : * Otherwise, pull the next tuple from whichever subplan we returned
202 : * from last time, and reinsert the subplan index into the heap,
203 : * because it might now compare differently against the existing
204 : * elements of the heap. (We could perhaps simplify the logic a bit
205 : * by doing this before returning from the prior call, but it's better
206 : * to not pull tuples until necessary.)
207 : */
208 52 : i = DatumGetInt32(binaryheap_first(node->ms_heap));
209 52 : node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
210 52 : if (!TupIsNull(node->ms_slots[i]))
211 43 : binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
212 : else
213 9 : (void) binaryheap_remove_first(node->ms_heap);
214 : }
215 :
216 66 : if (binaryheap_empty(node->ms_heap))
217 : {
218 : /* All the subplans are exhausted, and so is the heap */
219 3 : result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
220 : }
221 : else
222 : {
223 63 : i = DatumGetInt32(binaryheap_first(node->ms_heap));
224 63 : result = node->ms_slots[i];
225 : }
226 :
227 66 : return result;
228 : }
229 :
230 : /*
231 : * Compare the tuples in the two given slots.
232 : */
233 : static int32
234 92 : heap_compare_slots(Datum a, Datum b, void *arg)
235 : {
236 92 : MergeAppendState *node = (MergeAppendState *) arg;
237 92 : SlotNumber slot1 = DatumGetInt32(a);
238 92 : SlotNumber slot2 = DatumGetInt32(b);
239 :
240 92 : TupleTableSlot *s1 = node->ms_slots[slot1];
241 92 : TupleTableSlot *s2 = node->ms_slots[slot2];
242 : int nkey;
243 :
244 92 : Assert(!TupIsNull(s1));
245 92 : Assert(!TupIsNull(s2));
246 :
247 97 : for (nkey = 0; nkey < node->ms_nkeys; nkey++)
248 : {
249 92 : SortSupport sortKey = node->ms_sortkeys + nkey;
250 92 : AttrNumber attno = sortKey->ssup_attno;
251 : Datum datum1,
252 : datum2;
253 : bool isNull1,
254 : isNull2;
255 : int compare;
256 :
257 92 : datum1 = slot_getattr(s1, attno, &isNull1);
258 92 : datum2 = slot_getattr(s2, attno, &isNull2);
259 :
260 92 : compare = ApplySortComparator(datum1, isNull1,
261 : datum2, isNull2,
262 : sortKey);
263 92 : if (compare != 0)
264 87 : return -compare;
265 : }
266 5 : return 0;
267 : }
268 :
269 : /* ----------------------------------------------------------------
270 : * ExecEndMergeAppend
271 : *
272 : * Shuts down the subscans of the MergeAppend node.
273 : *
274 : * Returns nothing of interest.
275 : * ----------------------------------------------------------------
276 : */
277 : void
278 34 : ExecEndMergeAppend(MergeAppendState *node)
279 : {
280 : PlanState **mergeplans;
281 : int nplans;
282 : int i;
283 :
284 : /*
285 : * get information from the node
286 : */
287 34 : mergeplans = node->mergeplans;
288 34 : nplans = node->ms_nplans;
289 :
290 : /*
291 : * shut down each of the subscans
292 : */
293 134 : for (i = 0; i < nplans; i++)
294 100 : ExecEndNode(mergeplans[i]);
295 34 : }
296 :
297 : void
298 3 : ExecReScanMergeAppend(MergeAppendState *node)
299 : {
300 : int i;
301 :
302 9 : for (i = 0; i < node->ms_nplans; i++)
303 : {
304 6 : PlanState *subnode = node->mergeplans[i];
305 :
306 : /*
307 : * ExecReScan doesn't know about my subplans, so I have to do
308 : * changed-parameter signaling myself.
309 : */
310 6 : if (node->ps.chgParam != NULL)
311 6 : UpdateChangedParamSet(subnode, node->ps.chgParam);
312 :
313 : /*
314 : * If chgParam of subnode is not null then plan will be re-scanned by
315 : * first ExecProcNode.
316 : */
317 6 : if (subnode->chgParam == NULL)
318 0 : ExecReScan(subnode);
319 : }
320 3 : binaryheap_reset(node->ms_heap);
321 3 : node->ms_initialized = false;
322 3 : }
|