Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeGather.c
4 : * Support routines for scanning a plan via multiple workers.
5 : *
6 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * A Gather executor launches parallel workers to run multiple copies of a
10 : * plan. It can also run the plan itself, if the workers are not available
11 : * or have not started up yet. It then merges all of the results it produces
12 : * and the results from the workers into a single output stream. Therefore,
13 : * it will normally be used with a plan where running multiple copies of the
14 : * same plan does not produce duplicate output, such as parallel-aware
15 : * SeqScan.
16 : *
17 : * Alternatively, a Gather node can be configured to use just one worker
18 : * and the single-copy flag can be set. In this case, the Gather node will
19 : * run the plan in one worker and will not execute the plan itself; it
20 : * simply returns whatever tuples the worker returned.
21 : * If a worker cannot be obtained, then it will run the plan itself and
22 : * return the results. Therefore, a plan used with a single-copy Gather
23 : * node need not be parallel-aware.
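: *
: * For illustration, a typical plan shape handled by this node is
: *
: *     Gather
: *       Workers Planned: 2
: *       ->  Parallel Seq Scan on some_table
: *
: * where each launched worker (and, unless single-copy is set, the leader
: * too) runs the Parallel Seq Scan while Gather merges the tuple streams.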
24 : *
25 : * IDENTIFICATION
26 : * src/backend/executor/nodeGather.c
27 : *
28 : *-------------------------------------------------------------------------
29 : */
30 :
31 : #include "postgres.h"
32 :
33 : #include "access/relscan.h"
34 : #include "access/xact.h"
35 : #include "executor/execdebug.h"
36 : #include "executor/execParallel.h"
37 : #include "executor/nodeGather.h"
38 : #include "executor/nodeSubplan.h"
39 : #include "executor/tqueue.h"
40 : #include "miscadmin.h"
41 : #include "pgstat.h"
42 : #include "utils/memutils.h"
43 : #include "utils/rel.h"
44 :
45 :
46 : static TupleTableSlot *ExecGather(PlanState *pstate);
47 : static TupleTableSlot *gather_getnext(GatherState *gatherstate);
48 : static HeapTuple gather_readnext(GatherState *gatherstate);
49 : static void ExecShutdownGatherWorkers(GatherState *node);
50 :
51 :
52 : /* ----------------------------------------------------------------
53 : * ExecInitGather
54 : * ----------------------------------------------------------------
55 : */
56 : GatherState *
57 26 : ExecInitGather(Gather *node, EState *estate, int eflags)
58 : {
59 : GatherState *gatherstate;
60 : Plan *outerNode;
61 : bool hasoid;
62 : TupleDesc tupDesc;
63 :
64 : /* Gather node doesn't have innerPlan node. */
65 26 : Assert(innerPlan(node) == NULL);
66 :
67 : /*
68 : * create state structure
69 : */
70 26 : gatherstate = makeNode(GatherState);
71 26 : gatherstate->ps.plan = (Plan *) node;
72 26 : gatherstate->ps.state = estate;
73 26 : gatherstate->ps.ExecProcNode = ExecGather;
74 :
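: /*
: * tuples_needed is -1 ("read all tuples") unless a node above us installs
: * a smaller bound before the first call; whatever value it holds is passed
: * to the workers when the parallel plan is set up in ExecGather.
: * need_to_scan_locally starts out true except in single-copy mode and is
: * recomputed once we know how many workers actually launched.
: */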
75 26 : gatherstate->initialized = false;
76 26 : gatherstate->need_to_scan_locally = !node->single_copy;
77 26 : gatherstate->tuples_needed = -1;
78 :
79 : /*
80 : * Miscellaneous initialization
81 : *
82 : * create expression context for node
83 : */
84 26 : ExecAssignExprContext(estate, &gatherstate->ps);
85 :
86 : /*
87 : * Gather doesn't support checking a qual (it's always more efficient to
88 : * do it in the child node).
89 : */
90 26 : Assert(!node->plan.qual);
91 :
92 : /*
93 : * tuple table initialization
94 : */
95 26 : gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate);
96 26 : ExecInitResultTupleSlot(estate, &gatherstate->ps);
97 :
98 : /*
99 : * now initialize outer plan
100 : */
101 26 : outerNode = outerPlan(node);
102 26 : outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
103 :
104 : /*
105 : * Initialize result tuple type and projection info.
106 : */
107 26 : ExecAssignResultTypeFromTL(&gatherstate->ps);
108 26 : ExecAssignProjectionInfo(&gatherstate->ps, NULL);
109 :
110 : /*
111 : * Initialize funnel slot to same tuple descriptor as outer plan.
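: * Tuples read from the workers' queues land in this slot, so it has to
: * match what the outer plan produces, not this node's projected result
: * type.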
112 : */
113 26 : if (!ExecContextForcesOids(&gatherstate->ps, &hasoid))
114 26 : hasoid = false;
115 26 : tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
116 26 : ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);
117 :
118 26 : return gatherstate;
119 : }
120 :
121 : /* ----------------------------------------------------------------
122 : * ExecGather(node)
123 : *
124 : * Scans the relation via multiple workers and returns
125 : * the next qualifying tuple.
126 : * ----------------------------------------------------------------
127 : */
128 : static TupleTableSlot *
129 108089 : ExecGather(PlanState *pstate)
130 : {
131 108089 : GatherState *node = castNode(GatherState, pstate);
132 108089 : TupleTableSlot *fslot = node->funnel_slot;
133 : TupleTableSlot *slot;
134 : ExprContext *econtext;
135 :
136 108089 : CHECK_FOR_INTERRUPTS();
137 :
138 : /*
139 : * Initialize the parallel context and workers on first execution. We do
140 : * this on first execution rather than during node initialization, as it
141 : * needs to allocate a large dynamic shared memory segment, so it is
142 : * better to do it only if it is really needed.
143 : */
144 108089 : if (!node->initialized)
145 : {
146 26 : EState *estate = node->ps.state;
147 26 : Gather *gather = (Gather *) node->ps.plan;
148 :
149 : /*
150 : * Sometimes we might have to run without parallelism; but if parallel
151 : * mode is active then we can try to fire up some workers.
152 : */
153 26 : if (gather->num_workers > 0 && IsInParallelMode())
154 : {
155 : ParallelContext *pcxt;
156 :
157 : /* Initialize, or re-initialize, shared state needed by workers. */
158 26 : if (!node->pei)
159 13 : node->pei = ExecInitParallelPlan(node->ps.lefttree,
160 : estate,
161 : gather->num_workers,
162 : node->tuples_needed);
163 : else
164 13 : ExecParallelReinitialize(node->ps.lefttree,
165 13 : node->pei);
166 :
167 : /*
168 : * Register backend workers. We might not get as many as we
169 : * requested, or indeed any at all.
170 : */
171 26 : pcxt = node->pei->pcxt;
172 26 : LaunchParallelWorkers(pcxt);
173 : /* We save # workers launched for the benefit of EXPLAIN */
174 26 : node->nworkers_launched = pcxt->nworkers_launched;
175 :
176 : /* Set up tuple queue readers to read the results. */
177 26 : if (pcxt->nworkers_launched > 0)
178 : {
179 26 : ExecParallelCreateReaders(node->pei,
180 : fslot->tts_tupleDescriptor);
181 : /* Make a working array showing the active readers */
182 26 : node->nreaders = pcxt->nworkers_launched;
183 26 : node->reader = (TupleQueueReader **)
184 26 : palloc(node->nreaders * sizeof(TupleQueueReader *));
185 26 : memcpy(node->reader, node->pei->reader,
186 26 : node->nreaders * sizeof(TupleQueueReader *));
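: /*
: * Note that gather_readnext() trims this private copy as individual
: * queues are exhausted, while pei->reader keeps the complete list so
: * that parallel cleanup can still release every reader.
: */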
187 : }
188 : else
189 : {
190 : /* No workers? Then never mind. */
191 0 : node->nreaders = 0;
192 0 : node->reader = NULL;
193 : }
194 26 : node->nextreader = 0;
195 : }
196 :
197 : /* Run plan locally if no workers or not single-copy. */
198 52 : node->need_to_scan_locally = (node->nreaders == 0)
199 26 : || !gather->single_copy;
200 26 : node->initialized = true;
201 : }
202 :
203 : /*
204 : * Reset per-tuple memory context to free any expression evaluation
205 : * storage allocated in the previous tuple cycle. This will also clear
206 : * any previous tuple returned by a TupleQueueReader; to make sure we
207 : * don't leave a dangling pointer around, clear the working slot first.
208 : */
209 108089 : ExecClearTuple(fslot);
210 108089 : econtext = node->ps.ps_ExprContext;
211 108089 : ResetExprContext(econtext);
212 :
213 : /*
214 : * Get next tuple, either from one of our workers, or by running the plan
215 : * ourselves.
216 : */
217 108089 : slot = gather_getnext(node);
218 108088 : if (TupIsNull(slot))
219 25 : return NULL;
220 :
221 : /*
222 : * Form the result tuple using ExecProject(), and return it.
223 : */
224 108063 : econtext->ecxt_outertuple = slot;
225 108063 : return ExecProject(node->ps.ps_ProjInfo);
226 : }
227 :
228 : /* ----------------------------------------------------------------
229 : * ExecEndGather
230 : *
231 : * frees any storage allocated through C routines.
232 : * ----------------------------------------------------------------
233 : */
234 : void
235 25 : ExecEndGather(GatherState *node)
236 : {
237 25 : ExecEndNode(outerPlanState(node)); /* let children clean up first */
238 25 : ExecShutdownGather(node);
239 25 : ExecFreeExprContext(&node->ps);
240 25 : ExecClearTuple(node->ps.ps_ResultTupleSlot);
241 25 : }
242 :
243 : /*
244 : * Read the next tuple. We might fetch a tuple from one of the tuple queues
245 : * using gather_readnext, or if no tuple queue contains a tuple and the
246 : * single_copy flag is not set, we might generate one locally instead.
247 : */
248 : static TupleTableSlot *
249 108089 : gather_getnext(GatherState *gatherstate)
250 : {
251 108089 : PlanState *outerPlan = outerPlanState(gatherstate);
252 : TupleTableSlot *outerTupleSlot;
253 108089 : TupleTableSlot *fslot = gatherstate->funnel_slot;
254 108089 : MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
255 : HeapTuple tup;
256 :
257 216227 : while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
258 : {
259 108113 : CHECK_FOR_INTERRUPTS();
260 :
261 108113 : if (gatherstate->nreaders > 0)
262 : {
263 : MemoryContext oldContext;
264 :
265 : /* Run TupleQueueReaders in per-tuple context */
266 108113 : oldContext = MemoryContextSwitchTo(tupleContext);
267 108113 : tup = gather_readnext(gatherstate);
268 108112 : MemoryContextSwitchTo(oldContext);
269 :
270 108112 : if (HeapTupleIsValid(tup))
271 : {
272 50 : ExecStoreTuple(tup, /* tuple to store */
273 : fslot, /* slot in which to store the tuple */
274 : InvalidBuffer, /* buffer associated with this
275 : * tuple */
276 : false); /* slot should not pfree tuple */
277 50 : return fslot;
278 : }
279 : }
280 :
281 108062 : if (gatherstate->need_to_scan_locally)
282 : {
283 108037 : outerTupleSlot = ExecProcNode(outerPlan);
284 :
285 108037 : if (!TupIsNull(outerTupleSlot))
286 108013 : return outerTupleSlot;
287 :
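: /*
: * The local copy of the plan is exhausted; from here on we only wait
: * for tuples from the workers' queues.
: */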
288 24 : gatherstate->need_to_scan_locally = false;
289 : }
290 : }
291 :
292 25 : return ExecClearTuple(fslot);
293 : }
294 :
295 : /*
296 : * Attempt to read a tuple from one of our parallel workers.
297 : */
298 : static HeapTuple
299 108113 : gather_readnext(GatherState *gatherstate)
300 : {
301 108113 : int nvisited = 0;
302 :
303 : for (;;)
304 : {
305 : TupleQueueReader *reader;
306 : HeapTuple tup;
307 : bool readerdone;
308 :
309 : /* Check for async events, particularly messages from workers. */
310 433955 : CHECK_FOR_INTERRUPTS();
311 :
312 : /* Attempt to read a tuple, but don't block if none is available. */
313 433954 : Assert(gatherstate->nextreader < gatherstate->nreaders);
314 433954 : reader = gatherstate->reader[gatherstate->nextreader];
315 433954 : tup = TupleQueueReaderNext(reader, true, &readerdone);
316 :
317 : /*
318 : * If this reader is done, remove it from our working array of active
319 : * readers. If all readers are done, we're outta here.
320 : */
321 433954 : if (readerdone)
322 : {
323 94 : Assert(!tup);
324 94 : --gatherstate->nreaders;
325 94 : if (gatherstate->nreaders == 0)
326 108137 : return NULL;
327 207 : memmove(&gatherstate->reader[gatherstate->nextreader],
328 138 : &gatherstate->reader[gatherstate->nextreader + 1],
329 : sizeof(TupleQueueReader *)
330 69 : * (gatherstate->nreaders - gatherstate->nextreader));
331 69 : if (gatherstate->nextreader >= gatherstate->nreaders)
332 69 : gatherstate->nextreader = 0;
333 69 : continue;
334 : }
335 :
336 : /* If we got a tuple, return it. */
337 433860 : if (tup)
338 50 : return tup;
339 :
340 : /*
341 : * Advance nextreader pointer in round-robin fashion. Note that we
342 : * only reach this code if we weren't able to get a tuple from the
343 : * current worker. We used to advance the nextreader pointer after
344 : * every tuple, but it turns out to be much more efficient to keep
345 : * reading from the same queue until that would require blocking.
346 : */
347 433810 : gatherstate->nextreader++;
348 433810 : if (gatherstate->nextreader >= gatherstate->nreaders)
349 108705 : gatherstate->nextreader = 0;
350 :
351 : /* Have we visited every (surviving) TupleQueueReader? */
352 433810 : nvisited++;
353 433810 : if (nvisited >= gatherstate->nreaders)
354 : {
355 : /*
356 : * If (still) running plan locally, return NULL so caller can
357 : * generate another tuple from the local copy of the plan.
358 : */
359 108729 : if (gatherstate->need_to_scan_locally)
360 108037 : return NULL;
361 :
362 : /* Nothing to do except wait for developments. */
363 692 : WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_EXECUTE_GATHER);
364 692 : ResetLatch(MyLatch);
365 692 : nvisited = 0;
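: /*
: * A worker sets our latch whenever it writes to or detaches from its
: * tuple queue, so after waking and resetting the latch we start a
: * fresh polling pass over the surviving readers.
: */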
366 : }
367 325842 : }
368 : }
369 :
370 : /* ----------------------------------------------------------------
371 : * ExecShutdownGatherWorkers
372 : *
373 : * Stop all the parallel workers.
374 : * ----------------------------------------------------------------
375 : */
376 : static void
377 53 : ExecShutdownGatherWorkers(GatherState *node)
378 : {
379 53 : if (node->pei != NULL)
380 25 : ExecParallelFinish(node->pei);
381 :
382 : /* Flush local copy of reader array */
383 53 : if (node->reader)
384 25 : pfree(node->reader);
385 53 : node->reader = NULL;
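: /*
: * Note that node->pei is intentionally left alone here: ExecReScanGather
: * calls this routine and then reuses the same parallel context, so only
: * ExecShutdownGather destroys it.
: */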
386 53 : }
387 :
388 : /* ----------------------------------------------------------------
389 : * ExecShutdownGather
390 : *
391 : * Destroy the setup for parallel workers including parallel context.
392 : * ----------------------------------------------------------------
393 : */
394 : void
395 37 : ExecShutdownGather(GatherState *node)
396 : {
397 37 : ExecShutdownGatherWorkers(node);
398 :
399 : /* Now destroy the parallel context. */
400 37 : if (node->pei != NULL)
401 : {
402 12 : ExecParallelCleanup(node->pei);
403 12 : node->pei = NULL;
404 : }
405 37 : }
406 :
407 : /* ----------------------------------------------------------------
408 : * Join Support
409 : * ----------------------------------------------------------------
410 : */
411 :
412 : /* ----------------------------------------------------------------
413 : * ExecReScanGather
414 : *
415 : * Prepare to re-scan the result of a Gather.
416 : * ----------------------------------------------------------------
417 : */
418 : void
419 16 : ExecReScanGather(GatherState *node)
420 : {
421 16 : Gather *gather = (Gather *) node->ps.plan;
422 16 : PlanState *outerPlan = outerPlanState(node);
423 :
424 : /* Make sure any existing workers are gracefully shut down */
425 16 : ExecShutdownGatherWorkers(node);
426 :
427 : /* Mark node so that shared state will be rebuilt at next call */
428 16 : node->initialized = false;
429 :
430 : /*
431 : * Set child node's chgParam to tell it that the next scan might deliver a
432 : * different set of rows within the leader process. (The overall rowset
433 : * shouldn't change, but the leader process's subset might; hence nodes
434 : * between here and the parallel table scan node mustn't optimize on the
435 : * assumption of an unchanging rowset.)
436 : */
437 16 : if (gather->rescan_param >= 0)
438 16 : outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
439 : gather->rescan_param);
440 :
441 : /*
442 : * If chgParam of subnode is not null then plan will be re-scanned by
443 : * first ExecProcNode. Note: because this does nothing if we have a
444 : * rescan_param, it's currently guaranteed that parallel-aware child nodes
445 : * will not see a ReScan call until after they get a ReInitializeDSM call.
446 : * That ordering might not be something to rely on, though. A good rule
447 : * of thumb is that ReInitializeDSM should reset only shared state, ReScan
448 : * should reset only local state, and anything that depends on both of
449 : * those steps being finished must wait until the first ExecProcNode call.
450 : */
451 16 : if (outerPlan->chgParam == NULL)
452 0 : ExecReScan(outerPlan);
453 16 : }