Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeRecursiveunion.c
4 : * routines to handle RecursiveUnion nodes.
5 : *
6 : * To implement UNION (without ALL), we need a hashtable that stores tuples
7 : * already seen. The hash key is computed from the grouping columns.
8 : *
9 : *
10 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
11 : * Portions Copyright (c) 1994, Regents of the University of California
12 : *
13 : *
14 : * IDENTIFICATION
15 : * src/backend/executor/nodeRecursiveunion.c
16 : *
17 : *-------------------------------------------------------------------------
18 : */
19 : #include "postgres.h"
20 :
21 : #include "executor/execdebug.h"
22 : #include "executor/nodeRecursiveunion.h"
23 : #include "miscadmin.h"
24 : #include "utils/memutils.h"
25 :
26 :
27 :
28 : /*
29 : * Initialize the hash table to empty.
30 : */
31 : static void
32 4 : build_hash_table(RecursiveUnionState *rustate)
33 : {
34 4 : RecursiveUnion *node = (RecursiveUnion *) rustate->ps.plan;
35 :
36 4 : Assert(node->numCols > 0);
37 4 : Assert(node->numGroups > 0);
38 :
39 4 : rustate->hashtable = BuildTupleHashTable(node->numCols,
40 : node->dupColIdx,
41 : rustate->eqfunctions,
42 : rustate->hashfunctions,
43 : node->numGroups,
44 : 0,
45 : rustate->tableContext,
46 : rustate->tempContext,
47 : false);
48 4 : }
49 :
50 :
51 : /* ----------------------------------------------------------------
52 : * ExecRecursiveUnion(node)
53 : *
54 : * Scans the recursive query sequentially and returns the next
55 : * qualifying tuple.
56 : *
57 : * 1. evaluate non recursive term and assign the result to RT
58 : *
59 : * 2. execute recursive terms
60 : *
61 : * 2.1 WT := RT
62 : * 2.2 while WT is not empty repeat 2.3 to 2.6. if WT is empty returns RT
63 : * 2.3 replace the name of recursive term with WT
64 : * 2.4 evaluate the recursive term and store into WT
65 : * 2.5 append WT to RT
66 : * 2.6 go back to 2.2
67 : * ----------------------------------------------------------------
68 : */
69 : static TupleTableSlot *
70 1229 : ExecRecursiveUnion(PlanState *pstate)
71 : {
72 1229 : RecursiveUnionState *node = castNode(RecursiveUnionState, pstate);
73 1229 : PlanState *outerPlan = outerPlanState(node);
74 1229 : PlanState *innerPlan = innerPlanState(node);
75 1229 : RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan;
76 : TupleTableSlot *slot;
77 : bool isnew;
78 :
79 1229 : CHECK_FOR_INTERRUPTS();
80 :
81 : /* 1. Evaluate non-recursive term */
82 1229 : if (!node->recursing)
83 : {
84 : for (;;)
85 : {
86 127 : slot = ExecProcNode(outerPlan);
87 127 : if (TupIsNull(slot))
88 : break;
89 87 : if (plan->numCols > 0)
90 : {
91 : /* Find or build hashtable entry for this tuple's group */
92 4 : LookupTupleHashEntry(node->hashtable, slot, &isnew);
93 : /* Must reset temp context after each hashtable lookup */
94 4 : MemoryContextReset(node->tempContext);
95 : /* Ignore tuple if already seen */
96 4 : if (!isnew)
97 0 : continue;
98 : }
99 : /* Each non-duplicate tuple goes to the working table ... */
100 87 : tuplestore_puttupleslot(node->working_table, slot);
101 : /* ... and to the caller */
102 87 : return slot;
103 0 : }
104 40 : node->recursing = true;
105 : }
106 :
107 : /* 2. Execute recursive term */
108 : for (;;)
109 : {
110 2035 : slot = ExecProcNode(innerPlan);
111 2035 : if (TupIsNull(slot))
112 : {
113 : /* Done if there's nothing in the intermediate table */
114 925 : if (node->intermediate_empty)
115 35 : break;
116 :
117 : /* done with old working table ... */
118 890 : tuplestore_end(node->working_table);
119 :
120 : /* intermediate table becomes working table */
121 890 : node->working_table = node->intermediate_table;
122 :
123 : /* create new empty intermediate table */
124 890 : node->intermediate_table = tuplestore_begin_heap(false, false,
125 : work_mem);
126 890 : node->intermediate_empty = true;
127 :
128 : /* reset the recursive term */
129 890 : innerPlan->chgParam = bms_add_member(innerPlan->chgParam,
130 : plan->wtParam);
131 :
132 : /* and continue fetching from recursive term */
133 890 : continue;
134 : }
135 :
136 1110 : if (plan->numCols > 0)
137 : {
138 : /* Find or build hashtable entry for this tuple's group */
139 20 : LookupTupleHashEntry(node->hashtable, slot, &isnew);
140 : /* Must reset temp context after each hashtable lookup */
141 20 : MemoryContextReset(node->tempContext);
142 : /* Ignore tuple if already seen */
143 20 : if (!isnew)
144 3 : continue;
145 : }
146 :
147 : /* Else, tuple is good; stash it in intermediate table ... */
148 1107 : node->intermediate_empty = false;
149 1107 : tuplestore_puttupleslot(node->intermediate_table, slot);
150 : /* ... and return it */
151 1107 : return slot;
152 893 : }
153 :
154 35 : return NULL;
155 : }
156 :
157 : /* ----------------------------------------------------------------
158 : * ExecInitRecursiveUnionScan
159 : * ----------------------------------------------------------------
160 : */
161 : RecursiveUnionState *
162 40 : ExecInitRecursiveUnion(RecursiveUnion *node, EState *estate, int eflags)
163 : {
164 : RecursiveUnionState *rustate;
165 : ParamExecData *prmdata;
166 :
167 : /* check for unsupported flags */
168 40 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
169 :
170 : /*
171 : * create state structure
172 : */
173 40 : rustate = makeNode(RecursiveUnionState);
174 40 : rustate->ps.plan = (Plan *) node;
175 40 : rustate->ps.state = estate;
176 40 : rustate->ps.ExecProcNode = ExecRecursiveUnion;
177 :
178 40 : rustate->eqfunctions = NULL;
179 40 : rustate->hashfunctions = NULL;
180 40 : rustate->hashtable = NULL;
181 40 : rustate->tempContext = NULL;
182 40 : rustate->tableContext = NULL;
183 :
184 : /* initialize processing state */
185 40 : rustate->recursing = false;
186 40 : rustate->intermediate_empty = true;
187 40 : rustate->working_table = tuplestore_begin_heap(false, false, work_mem);
188 40 : rustate->intermediate_table = tuplestore_begin_heap(false, false, work_mem);
189 :
190 : /*
191 : * If hashing, we need a per-tuple memory context for comparisons, and a
192 : * longer-lived context to store the hash table. The table can't just be
193 : * kept in the per-query context because we want to be able to throw it
194 : * away when rescanning.
195 : */
196 40 : if (node->numCols > 0)
197 : {
198 4 : rustate->tempContext =
199 4 : AllocSetContextCreate(CurrentMemoryContext,
200 : "RecursiveUnion",
201 : ALLOCSET_DEFAULT_SIZES);
202 4 : rustate->tableContext =
203 4 : AllocSetContextCreate(CurrentMemoryContext,
204 : "RecursiveUnion hash table",
205 : ALLOCSET_DEFAULT_SIZES);
206 : }
207 :
208 : /*
209 : * Make the state structure available to descendant WorkTableScan nodes
210 : * via the Param slot reserved for it.
211 : */
212 40 : prmdata = &(estate->es_param_exec_vals[node->wtParam]);
213 40 : Assert(prmdata->execPlan == NULL);
214 40 : prmdata->value = PointerGetDatum(rustate);
215 40 : prmdata->isnull = false;
216 :
217 : /*
218 : * Miscellaneous initialization
219 : *
220 : * RecursiveUnion plans don't have expression contexts because they never
221 : * call ExecQual or ExecProject.
222 : */
223 40 : Assert(node->plan.qual == NIL);
224 :
225 : /*
226 : * RecursiveUnion nodes still have Result slots, which hold pointers to
227 : * tuples, so we have to initialize them.
228 : */
229 40 : ExecInitResultTupleSlot(estate, &rustate->ps);
230 :
231 : /*
232 : * Initialize result tuple type and projection info. (Note: we have to
233 : * set up the result type before initializing child nodes, because
234 : * nodeWorktablescan.c expects it to be valid.)
235 : */
236 40 : ExecAssignResultTypeFromTL(&rustate->ps);
237 40 : rustate->ps.ps_ProjInfo = NULL;
238 :
239 : /*
240 : * initialize child nodes
241 : */
242 40 : outerPlanState(rustate) = ExecInitNode(outerPlan(node), estate, eflags);
243 40 : innerPlanState(rustate) = ExecInitNode(innerPlan(node), estate, eflags);
244 :
245 : /*
246 : * If hashing, precompute fmgr lookup data for inner loop, and create the
247 : * hash table.
248 : */
249 40 : if (node->numCols > 0)
250 : {
251 4 : execTuplesHashPrepare(node->numCols,
252 : node->dupOperators,
253 : &rustate->eqfunctions,
254 : &rustate->hashfunctions);
255 4 : build_hash_table(rustate);
256 : }
257 :
258 40 : return rustate;
259 : }
260 :
261 : /* ----------------------------------------------------------------
262 : * ExecEndRecursiveUnionScan
263 : *
264 : * frees any storage allocated through C routines.
265 : * ----------------------------------------------------------------
266 : */
267 : void
268 40 : ExecEndRecursiveUnion(RecursiveUnionState *node)
269 : {
270 : /* Release tuplestores */
271 40 : tuplestore_end(node->working_table);
272 40 : tuplestore_end(node->intermediate_table);
273 :
274 : /* free subsidiary stuff including hashtable */
275 40 : if (node->tempContext)
276 4 : MemoryContextDelete(node->tempContext);
277 40 : if (node->tableContext)
278 4 : MemoryContextDelete(node->tableContext);
279 :
280 : /*
281 : * clean out the upper tuple table
282 : */
283 40 : ExecClearTuple(node->ps.ps_ResultTupleSlot);
284 :
285 : /*
286 : * close down subplans
287 : */
288 40 : ExecEndNode(outerPlanState(node));
289 40 : ExecEndNode(innerPlanState(node));
290 40 : }
291 :
292 : /* ----------------------------------------------------------------
293 : * ExecReScanRecursiveUnion
294 : *
295 : * Rescans the relation.
296 : * ----------------------------------------------------------------
297 : */
298 : void
299 0 : ExecReScanRecursiveUnion(RecursiveUnionState *node)
300 : {
301 0 : PlanState *outerPlan = outerPlanState(node);
302 0 : PlanState *innerPlan = innerPlanState(node);
303 0 : RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan;
304 :
305 : /*
306 : * Set recursive term's chgParam to tell it that we'll modify the working
307 : * table and therefore it has to rescan.
308 : */
309 0 : innerPlan->chgParam = bms_add_member(innerPlan->chgParam, plan->wtParam);
310 :
311 : /*
312 : * if chgParam of subnode is not null then plan will be re-scanned by
313 : * first ExecProcNode. Because of above, we only have to do this to the
314 : * non-recursive term.
315 : */
316 0 : if (outerPlan->chgParam == NULL)
317 0 : ExecReScan(outerPlan);
318 :
319 : /* Release any hashtable storage */
320 0 : if (node->tableContext)
321 0 : MemoryContextResetAndDeleteChildren(node->tableContext);
322 :
323 : /* And rebuild empty hashtable if needed */
324 0 : if (plan->numCols > 0)
325 0 : build_hash_table(node);
326 :
327 : /* reset processing state */
328 0 : node->recursing = false;
329 0 : node->intermediate_empty = true;
330 0 : tuplestore_clear(node->working_table);
331 0 : tuplestore_clear(node->intermediate_table);
332 0 : }
|