Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeGatherMerge.c
4 : * Scan a plan in multiple workers, and do order-preserving merge.
5 : *
6 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/executor/nodeGatherMerge.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/relscan.h"
18 : #include "access/xact.h"
19 : #include "executor/execdebug.h"
20 : #include "executor/execParallel.h"
21 : #include "executor/nodeGatherMerge.h"
22 : #include "executor/nodeSubplan.h"
23 : #include "executor/tqueue.h"
24 : #include "lib/binaryheap.h"
25 : #include "miscadmin.h"
26 : #include "utils/memutils.h"
27 : #include "utils/rel.h"
28 :
29 : /*
30 : * When we read tuples from workers, it's a good idea to read several at once
31 : * for efficiency when possible: this minimizes context-switching overhead.
32 : * But reading too many at a time wastes memory without improving performance.
33 : * We'll read up to MAX_TUPLE_STORE tuples (in addition to the first one).
34 : */
35 : #define MAX_TUPLE_STORE 10
36 :
37 : /*
38 : * Pending-tuple array for each worker. This holds additional tuples that
39 : * we were able to fetch from the worker, but can't process yet. In addition,
40 : * this struct holds the "done" flag indicating the worker is known to have
41 : * no more tuples. (We do not use this struct for the leader; we don't keep
42 : * any pending tuples for the leader, and the need_to_scan_locally flag serves
43 : * as its "done" indicator.)
44 : */
typedef struct GMReaderTupleBuffer
{
	HeapTuple  *tuple;			/* array of length MAX_TUPLE_STORE */
	int			nTuples;		/* number of tuples currently stored */
	int			readCounter;	/* index of next tuple to extract */
	bool		done;			/* true if reader is known exhausted (no
								 * further tuples will ever arrive) */
} GMReaderTupleBuffer;
52 :
53 : static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
54 : static int32 heap_compare_slots(Datum a, Datum b, void *arg);
55 : static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
56 : static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
57 : bool nowait, bool *done);
58 : static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
59 : static void gather_merge_setup(GatherMergeState *gm_state);
60 : static void gather_merge_init(GatherMergeState *gm_state);
61 : static void gather_merge_clear_tuples(GatherMergeState *gm_state);
62 : static bool gather_merge_readnext(GatherMergeState *gm_state, int reader,
63 : bool nowait);
64 : static void load_tuple_array(GatherMergeState *gm_state, int reader);
65 :
66 : /* ----------------------------------------------------------------
 *		ExecInitGatherMerge
68 : * ----------------------------------------------------------------
69 : */
70 : GatherMergeState *
71 8 : ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
72 : {
73 : GatherMergeState *gm_state;
74 : Plan *outerNode;
75 : bool hasoid;
76 : TupleDesc tupDesc;
77 :
78 : /* Gather merge node doesn't have innerPlan node. */
79 8 : Assert(innerPlan(node) == NULL);
80 :
81 : /*
82 : * create state structure
83 : */
84 8 : gm_state = makeNode(GatherMergeState);
85 8 : gm_state->ps.plan = (Plan *) node;
86 8 : gm_state->ps.state = estate;
87 8 : gm_state->ps.ExecProcNode = ExecGatherMerge;
88 :
89 8 : gm_state->initialized = false;
90 8 : gm_state->gm_initialized = false;
91 8 : gm_state->tuples_needed = -1;
92 :
93 : /*
94 : * Miscellaneous initialization
95 : *
96 : * create expression context for node
97 : */
98 8 : ExecAssignExprContext(estate, &gm_state->ps);
99 :
100 : /*
101 : * GatherMerge doesn't support checking a qual (it's always more efficient
102 : * to do it in the child node).
103 : */
104 8 : Assert(!node->plan.qual);
105 :
106 : /*
107 : * tuple table initialization
108 : */
109 8 : ExecInitResultTupleSlot(estate, &gm_state->ps);
110 :
111 : /*
112 : * now initialize outer plan
113 : */
114 8 : outerNode = outerPlan(node);
115 8 : outerPlanState(gm_state) = ExecInitNode(outerNode, estate, eflags);
116 :
117 : /*
118 : * Initialize result tuple type and projection info.
119 : */
120 8 : ExecAssignResultTypeFromTL(&gm_state->ps);
121 8 : ExecAssignProjectionInfo(&gm_state->ps, NULL);
122 :
123 : /*
124 : * initialize sort-key information
125 : */
126 8 : if (node->numCols)
127 : {
128 : int i;
129 :
130 8 : gm_state->gm_nkeys = node->numCols;
131 8 : gm_state->gm_sortkeys =
132 8 : palloc0(sizeof(SortSupportData) * node->numCols);
133 :
134 16 : for (i = 0; i < node->numCols; i++)
135 : {
136 8 : SortSupport sortKey = gm_state->gm_sortkeys + i;
137 :
138 8 : sortKey->ssup_cxt = CurrentMemoryContext;
139 8 : sortKey->ssup_collation = node->collations[i];
140 8 : sortKey->ssup_nulls_first = node->nullsFirst[i];
141 8 : sortKey->ssup_attno = node->sortColIdx[i];
142 :
143 : /*
144 : * We don't perform abbreviated key conversion here, for the same
145 : * reasons that it isn't used in MergeAppend
146 : */
147 8 : sortKey->abbreviate = false;
148 :
149 8 : PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
150 : }
151 : }
152 :
153 : /*
154 : * Store the tuple descriptor into gather merge state, so we can use it
155 : * while initializing the gather merge slots.
156 : */
157 8 : if (!ExecContextForcesOids(&gm_state->ps, &hasoid))
158 8 : hasoid = false;
159 8 : tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
160 8 : gm_state->tupDesc = tupDesc;
161 :
162 : /* Now allocate the workspace for gather merge */
163 8 : gather_merge_setup(gm_state);
164 :
165 8 : return gm_state;
166 : }
167 :
168 : /* ----------------------------------------------------------------
169 : * ExecGatherMerge(node)
170 : *
171 : * Scans the relation via multiple workers and returns
172 : * the next qualifying tuple.
173 : * ----------------------------------------------------------------
174 : */
static TupleTableSlot *
ExecGatherMerge(PlanState *pstate)
{
	GatherMergeState *node = castNode(GatherMergeState, pstate);
	TupleTableSlot *slot;
	ExprContext *econtext;

	CHECK_FOR_INTERRUPTS();

	/*
	 * As with Gather, we don't launch workers until this node is actually
	 * executed.
	 */
	if (!node->initialized)
	{
		EState	   *estate = node->ps.state;
		GatherMerge *gm = castNode(GatherMerge, node->ps.plan);

		/*
		 * Sometimes we might have to run without parallelism; but if parallel
		 * mode is active then we can try to fire up some workers.
		 */
		if (gm->num_workers > 0 && IsInParallelMode())
		{
			ParallelContext *pcxt;

			/*
			 * Initialize, or re-initialize, shared state needed by workers.
			 * First execution builds the parallel plan state from scratch;
			 * on a rescan we only need to reinitialize the existing one.
			 */
			if (!node->pei)
				node->pei = ExecInitParallelPlan(node->ps.lefttree,
												 estate,
												 gm->num_workers,
												 node->tuples_needed);
			else
				ExecParallelReinitialize(node->ps.lefttree,
										 node->pei);

			/* Try to launch workers. */
			pcxt = node->pei->pcxt;
			LaunchParallelWorkers(pcxt);
			/* We save # workers launched for the benefit of EXPLAIN */
			node->nworkers_launched = pcxt->nworkers_launched;

			/* Set up tuple queue readers to read the results. */
			if (pcxt->nworkers_launched > 0)
			{
				ExecParallelCreateReaders(node->pei, node->tupDesc);
				/*
				 * Make a working copy of the reader array; the copy (not
				 * pei->reader) is what the merge code indexes, and it is
				 * freed independently at shutdown.
				 */
				node->nreaders = pcxt->nworkers_launched;
				node->reader = (TupleQueueReader **)
					palloc(node->nreaders * sizeof(TupleQueueReader *));
				memcpy(node->reader, node->pei->reader,
					   node->nreaders * sizeof(TupleQueueReader *));
			}
			else
			{
				/* No workers?	Then never mind. */
				node->nreaders = 0;
				node->reader = NULL;
			}
		}

		/*
		 * Always allow the leader to participate; need_to_scan_locally also
		 * doubles as the leader's "not done yet" flag.
		 */
		node->need_to_scan_locally = true;
		node->initialized = true;
	}

	/*
	 * Reset per-tuple memory context to free any expression evaluation
	 * storage allocated in the previous tuple cycle.
	 */
	econtext = node->ps.ps_ExprContext;
	ResetExprContext(econtext);

	/*
	 * Get next tuple, either from one of our workers, or by running the plan
	 * ourselves.
	 */
	slot = gather_merge_getnext(node);
	if (TupIsNull(slot))
		return NULL;

	/*
	 * Form the result tuple using ExecProject(), and return it.
	 */
	econtext->ecxt_outertuple = slot;
	return ExecProject(node->ps.ps_ProjInfo);
}
262 :
263 : /* ----------------------------------------------------------------
264 : * ExecEndGatherMerge
265 : *
266 : * frees any storage allocated through C routines.
267 : * ----------------------------------------------------------------
268 : */
void
ExecEndGatherMerge(GatherMergeState *node)
{
	ExecEndNode(outerPlanState(node));	/* let children clean up first */
	ExecShutdownGatherMerge(node);	/* then stop workers and destroy the
									 * parallel context */
	ExecFreeExprContext(&node->ps);
	ExecClearTuple(node->ps.ps_ResultTupleSlot);
}
277 :
278 : /* ----------------------------------------------------------------
279 : * ExecShutdownGatherMerge
280 : *
281 : * Destroy the setup for parallel workers including parallel context.
282 : * ----------------------------------------------------------------
283 : */
284 : void
285 12 : ExecShutdownGatherMerge(GatherMergeState *node)
286 : {
287 12 : ExecShutdownGatherMergeWorkers(node);
288 :
289 : /* Now destroy the parallel context. */
290 12 : if (node->pei != NULL)
291 : {
292 4 : ExecParallelCleanup(node->pei);
293 4 : node->pei = NULL;
294 : }
295 12 : }
296 :
297 : /* ----------------------------------------------------------------
298 : * ExecShutdownGatherMergeWorkers
299 : *
300 : * Stop all the parallel workers.
301 : * ----------------------------------------------------------------
302 : */
303 : static void
304 15 : ExecShutdownGatherMergeWorkers(GatherMergeState *node)
305 : {
306 15 : if (node->pei != NULL)
307 6 : ExecParallelFinish(node->pei);
308 :
309 : /* Flush local copy of reader array */
310 15 : if (node->reader)
311 5 : pfree(node->reader);
312 15 : node->reader = NULL;
313 15 : }
314 :
315 : /* ----------------------------------------------------------------
316 : * ExecReScanGatherMerge
317 : *
318 : * Prepare to re-scan the result of a GatherMerge.
319 : * ----------------------------------------------------------------
320 : */
void
ExecReScanGatherMerge(GatherMergeState *node)
{
	GatherMerge *gm = (GatherMerge *) node->ps.plan;
	PlanState  *outerPlan = outerPlanState(node);

	/* Make sure any existing workers are gracefully shut down */
	ExecShutdownGatherMergeWorkers(node);

	/* Free any unused tuples, so we don't leak memory across rescans */
	gather_merge_clear_tuples(node);

	/* Mark node so that shared state will be rebuilt at next call */
	node->initialized = false;
	node->gm_initialized = false;

	/*
	 * Set child node's chgParam to tell it that the next scan might deliver a
	 * different set of rows within the leader process.  (The overall rowset
	 * shouldn't change, but the leader process's subset might; hence nodes
	 * between here and the parallel table scan node mustn't optimize on the
	 * assumption of an unchanging rowset.)
	 */
	if (gm->rescan_param >= 0)
		outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
											 gm->rescan_param);

	/*
	 * If chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.  Note: because this does nothing if we have a
	 * rescan_param, it's currently guaranteed that parallel-aware child nodes
	 * will not see a ReScan call until after they get a ReInitializeDSM call.
	 * That ordering might not be something to rely on, though.  A good rule
	 * of thumb is that ReInitializeDSM should reset only shared state, ReScan
	 * should reset only local state, and anything that depends on both of
	 * those steps being finished must wait until the first ExecProcNode call.
	 */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);
}
361 :
362 : /*
363 : * Set up the data structures that we'll need for Gather Merge.
364 : *
365 : * We allocate these once on the basis of gm->num_workers, which is an
366 : * upper bound for the number of workers we'll actually have. During
367 : * a rescan, we reset the structures to empty. This approach simplifies
368 : * not leaking memory across rescans.
369 : *
370 : * In the gm_slots[] array, index 0 is for the leader, and indexes 1 to n
371 : * are for workers. The values placed into gm_heap correspond to indexes
372 : * in gm_slots[]. The gm_tuple_buffers[] array, however, is indexed from
373 : * 0 to n-1; it has no entry for the leader.
374 : */
375 : static void
376 8 : gather_merge_setup(GatherMergeState *gm_state)
377 : {
378 8 : GatherMerge *gm = castNode(GatherMerge, gm_state->ps.plan);
379 8 : int nreaders = gm->num_workers;
380 : int i;
381 :
382 : /*
383 : * Allocate gm_slots for the number of workers + one more slot for leader.
384 : * Slot 0 is always for the leader. Leader always calls ExecProcNode() to
385 : * read the tuple, and then stores it directly into its gm_slots entry.
386 : * For other slots, code below will call ExecInitExtraTupleSlot() to
387 : * create a slot for the worker's results. Note that during any single
388 : * scan, we might have fewer than num_workers available workers, in which
389 : * case the extra array entries go unused.
390 : */
391 8 : gm_state->gm_slots = (TupleTableSlot **)
392 8 : palloc0((nreaders + 1) * sizeof(TupleTableSlot *));
393 :
394 : /* Allocate the tuple slot and tuple array for each worker */
395 8 : gm_state->gm_tuple_buffers = (GMReaderTupleBuffer *)
396 8 : palloc0(nreaders * sizeof(GMReaderTupleBuffer));
397 :
398 40 : for (i = 0; i < nreaders; i++)
399 : {
400 : /* Allocate the tuple array with length MAX_TUPLE_STORE */
401 64 : gm_state->gm_tuple_buffers[i].tuple =
402 32 : (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);
403 :
404 : /* Initialize tuple slot for worker */
405 32 : gm_state->gm_slots[i + 1] = ExecInitExtraTupleSlot(gm_state->ps.state);
406 32 : ExecSetSlotDescriptor(gm_state->gm_slots[i + 1],
407 : gm_state->tupDesc);
408 : }
409 :
410 : /* Allocate the resources for the merge */
411 8 : gm_state->gm_heap = binaryheap_allocate(nreaders + 1,
412 : heap_compare_slots,
413 : gm_state);
414 8 : }
415 :
416 : /*
417 : * Initialize the Gather Merge.
418 : *
419 : * Reset data structures to ensure they're empty. Then pull at least one
420 : * tuple from leader + each worker (or set its "done" indicator), and set up
421 : * the heap.
422 : */
static void
gather_merge_init(GatherMergeState *gm_state)
{
	int			nreaders = gm_state->nreaders;
	bool		nowait = true;
	int			i;

	/* Assert that gather_merge_setup made enough space */
	Assert(nreaders <= castNode(GatherMerge, gm_state->ps.plan)->num_workers);

	/* Reset leader's tuple slot to empty */
	gm_state->gm_slots[0] = NULL;

	/* Reset the tuple slot and tuple array for each worker */
	for (i = 0; i < nreaders; i++)
	{
		/* Reset tuple array to empty */
		gm_state->gm_tuple_buffers[i].nTuples = 0;
		gm_state->gm_tuple_buffers[i].readCounter = 0;
		/* Reset done flag to not-done */
		gm_state->gm_tuple_buffers[i].done = false;
		/* Ensure output slot is empty */
		ExecClearTuple(gm_state->gm_slots[i + 1]);
	}

	/* Reset binary heap to empty */
	binaryheap_reset(gm_state->gm_heap);

	/*
	 * First, try to read a tuple from each worker (including leader) in
	 * nowait mode.  After this, if not all workers were able to produce a
	 * tuple (or a "done" indication), then re-read from remaining workers,
	 * this time using wait mode.  Add all live readers (those producing at
	 * least one tuple) to the heap.
	 *
	 * Note that each participant is added to the heap at most once: on the
	 * second and later passes, sources that already have a tuple in their
	 * slot take the load_tuple_array branch instead.
	 */
reread:
	for (i = 0; i <= nreaders; i++)
	{
		CHECK_FOR_INTERRUPTS();

		/* skip this source if already known done */
		if ((i == 0) ? gm_state->need_to_scan_locally :
			!gm_state->gm_tuple_buffers[i - 1].done)
		{
			if (TupIsNull(gm_state->gm_slots[i]))
			{
				/* Don't have a tuple yet, try to get one */
				if (gather_merge_readnext(gm_state, i, nowait))
					binaryheap_add_unordered(gm_state->gm_heap,
											 Int32GetDatum(i));
			}
			else
			{
				/*
				 * We already got at least one tuple from this worker, but
				 * might as well see if it has any more ready by now.
				 */
				load_tuple_array(gm_state, i);
			}
		}
	}

	/* need not recheck leader, since nowait doesn't matter for it */
	for (i = 1; i <= nreaders; i++)
	{
		if (!gm_state->gm_tuple_buffers[i - 1].done &&
			TupIsNull(gm_state->gm_slots[i]))
		{
			/* This worker still owes us a tuple; retry, blocking this time */
			nowait = false;
			goto reread;
		}
	}

	/* Now heapify the heap. */
	binaryheap_build(gm_state->gm_heap);

	gm_state->gm_initialized = true;
}
501 :
502 : /*
503 : * Clear out the tuple table slot, and any unused pending tuples,
504 : * for each gather merge input.
505 : */
506 : static void
507 7 : gather_merge_clear_tuples(GatherMergeState *gm_state)
508 : {
509 : int i;
510 :
511 31 : for (i = 0; i < gm_state->nreaders; i++)
512 : {
513 24 : GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];
514 :
515 48 : while (tuple_buffer->readCounter < tuple_buffer->nTuples)
516 0 : heap_freetuple(tuple_buffer->tuple[tuple_buffer->readCounter++]);
517 :
518 24 : ExecClearTuple(gm_state->gm_slots[i + 1]);
519 : }
520 7 : }
521 :
522 : /*
523 : * Read the next tuple for gather merge.
524 : *
525 : * Fetch the sorted tuple out of the heap.
526 : */
527 : static TupleTableSlot *
528 45 : gather_merge_getnext(GatherMergeState *gm_state)
529 : {
530 : int i;
531 :
532 45 : if (!gm_state->gm_initialized)
533 : {
534 : /*
535 : * First time through: pull the first tuple from each participant, and
536 : * set up the heap.
537 : */
538 6 : gather_merge_init(gm_state);
539 : }
540 : else
541 : {
542 : /*
543 : * Otherwise, pull the next tuple from whichever participant we
544 : * returned from last time, and reinsert that participant's index into
545 : * the heap, because it might now compare differently against the
546 : * other elements of the heap.
547 : */
548 39 : i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
549 :
550 39 : if (gather_merge_readnext(gm_state, i, false))
551 35 : binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i));
552 : else
553 : {
554 : /* reader exhausted, remove it from heap */
555 4 : (void) binaryheap_remove_first(gm_state->gm_heap);
556 : }
557 : }
558 :
559 45 : if (binaryheap_empty(gm_state->gm_heap))
560 : {
561 : /* All the queues are exhausted, and so is the heap */
562 4 : gather_merge_clear_tuples(gm_state);
563 4 : return NULL;
564 : }
565 : else
566 : {
567 : /* Return next tuple from whichever participant has the leading one */
568 41 : i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
569 41 : return gm_state->gm_slots[i];
570 : }
571 : }
572 :
573 : /*
574 : * Read tuple(s) for given reader in nowait mode, and load into its tuple
575 : * array, until we have MAX_TUPLE_STORE of them or would have to block.
576 : */
577 : static void
578 5 : load_tuple_array(GatherMergeState *gm_state, int reader)
579 : {
580 : GMReaderTupleBuffer *tuple_buffer;
581 : int i;
582 :
583 : /* Don't do anything if this is the leader. */
584 5 : if (reader == 0)
585 10 : return;
586 :
587 0 : tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];
588 :
589 : /* If there's nothing in the array, reset the counters to zero. */
590 0 : if (tuple_buffer->nTuples == tuple_buffer->readCounter)
591 0 : tuple_buffer->nTuples = tuple_buffer->readCounter = 0;
592 :
593 : /* Try to fill additional slots in the array. */
594 0 : for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
595 : {
596 : HeapTuple tuple;
597 :
598 0 : tuple = gm_readnext_tuple(gm_state,
599 : reader,
600 : true,
601 : &tuple_buffer->done);
602 0 : if (!HeapTupleIsValid(tuple))
603 0 : break;
604 0 : tuple_buffer->tuple[i] = heap_copytuple(tuple);
605 0 : tuple_buffer->nTuples++;
606 : }
607 : }
608 :
609 : /*
610 : * Store the next tuple for a given reader into the appropriate slot.
611 : *
612 : * Returns true if successful, false if not (either reader is exhausted,
613 : * or we didn't want to wait for a tuple). Sets done flag if reader
614 : * is found to be exhausted.
615 : */
static bool
gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
{
	GMReaderTupleBuffer *tuple_buffer;
	HeapTuple	tup;

	/*
	 * If we're being asked to generate a tuple from the leader, then we just
	 * call ExecProcNode as normal to produce one.
	 */
	if (reader == 0)
	{
		if (gm_state->need_to_scan_locally)
		{
			PlanState  *outerPlan = outerPlanState(gm_state);
			TupleTableSlot *outerTupleSlot;

			outerTupleSlot = ExecProcNode(outerPlan);

			if (!TupIsNull(outerTupleSlot))
			{
				gm_state->gm_slots[0] = outerTupleSlot;
				return true;
			}
			/* need_to_scan_locally serves as "done" flag for leader */
			gm_state->need_to_scan_locally = false;
		}
		return false;
	}

	/* Otherwise, check the state of the relevant tuple buffer. */
	tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];

	if (tuple_buffer->nTuples > tuple_buffer->readCounter)
	{
		/* Return any tuple previously read that is still buffered. */
		tup = tuple_buffer->tuple[tuple_buffer->readCounter++];
	}
	else if (tuple_buffer->done)
	{
		/* Reader is known to be exhausted. */
		return false;
	}
	else
	{
		/* Read and buffer next tuple. */
		tup = gm_readnext_tuple(gm_state,
								reader,
								nowait,
								&tuple_buffer->done);
		if (!HeapTupleIsValid(tup))
			return false;

		/*
		 * gm_readnext_tuple read the tuple in the per-tuple context; copy it
		 * out so it survives the next expression-context reset.
		 */
		tup = heap_copytuple(tup);

		/*
		 * Attempt to read more tuples in nowait mode and store them in the
		 * pending-tuple array for the reader.
		 */
		load_tuple_array(gm_state, reader);
	}

	Assert(HeapTupleIsValid(tup));

	/* Build the TupleTableSlot for the given tuple */
	ExecStoreTuple(tup,			/* tuple to store */
				   gm_state->gm_slots[reader],	/* slot in which to store the
												 * tuple */
				   InvalidBuffer,	/* no buffer associated with tuple */
				   true);		/* pfree tuple when done with it */

	return true;
}
688 :
689 : /*
690 : * Attempt to read a tuple from given worker.
691 : */
692 : static HeapTuple
693 40 : gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
694 : bool *done)
695 : {
696 : TupleQueueReader *reader;
697 : HeapTuple tup;
698 : MemoryContext oldContext;
699 : MemoryContext tupleContext;
700 :
701 : /* Check for async events, particularly messages from workers. */
702 40 : CHECK_FOR_INTERRUPTS();
703 :
704 : /* Attempt to read a tuple. */
705 40 : reader = gm_state->reader[nreader - 1];
706 :
707 : /* Run TupleQueueReaders in per-tuple context */
708 40 : tupleContext = gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory;
709 40 : oldContext = MemoryContextSwitchTo(tupleContext);
710 40 : tup = TupleQueueReaderNext(reader, nowait, done);
711 40 : MemoryContextSwitchTo(oldContext);
712 :
713 40 : return tup;
714 : }
715 :
716 : /*
717 : * We have one slot for each item in the heap array. We use SlotNumber
718 : * to store slot indexes. This doesn't actually provide any formal
719 : * type-safety, but it makes the code more self-documenting.
720 : */
721 : typedef int32 SlotNumber;
722 :
723 : /*
724 : * Compare the tuples in the two given slots.
725 : */
726 : static int32
727 0 : heap_compare_slots(Datum a, Datum b, void *arg)
728 : {
729 0 : GatherMergeState *node = (GatherMergeState *) arg;
730 0 : SlotNumber slot1 = DatumGetInt32(a);
731 0 : SlotNumber slot2 = DatumGetInt32(b);
732 :
733 0 : TupleTableSlot *s1 = node->gm_slots[slot1];
734 0 : TupleTableSlot *s2 = node->gm_slots[slot2];
735 : int nkey;
736 :
737 0 : Assert(!TupIsNull(s1));
738 0 : Assert(!TupIsNull(s2));
739 :
740 0 : for (nkey = 0; nkey < node->gm_nkeys; nkey++)
741 : {
742 0 : SortSupport sortKey = node->gm_sortkeys + nkey;
743 0 : AttrNumber attno = sortKey->ssup_attno;
744 : Datum datum1,
745 : datum2;
746 : bool isNull1,
747 : isNull2;
748 : int compare;
749 :
750 0 : datum1 = slot_getattr(s1, attno, &isNull1);
751 0 : datum2 = slot_getattr(s2, attno, &isNull2);
752 :
753 0 : compare = ApplySortComparator(datum1, isNull1,
754 : datum2, isNull2,
755 : sortKey);
756 0 : if (compare != 0)
757 0 : return -compare;
758 : }
759 0 : return 0;
760 : }
|