LCOV - code coverage report
Current view: top level - src/backend/executor - nodeGather.c
Test: PostgreSQL
Date: 2017-09-29 15:12:54

                Hit    Total   Coverage
Lines:          133      136     97.8 %
Functions:        8        8    100.0 %

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * nodeGather.c
       4             :  *    Support routines for scanning a plan via multiple workers.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * A Gather executor launches parallel workers to run multiple copies of a
      10             :  * plan.  It can also run the plan itself, if the workers are not available
      11             :  * or have not started up yet.  It then merges the results it produces
      12             :  * itself with the results from the workers into a single output stream.
      13             :  * Therefore, it will normally be used with a plan where running multiple
      14             :  * copies of the same plan does not produce duplicate output, such as a
      15             :  * parallel-aware SeqScan.
      16             :  *
      17             :  * Alternatively, a Gather node can be configured to use just one worker
      18             :  * and the single-copy flag can be set.  In this case, the Gather node will
      19             :  * run the plan in one worker and will not execute the plan itself;
      20             :  * it simply returns whatever tuples the worker returns.
      21             :  * If a worker cannot be obtained, then it will run the plan itself and
      22             :  * return the results.  Therefore, a plan used with a single-copy Gather
      23             :  * node need not be parallel-aware.
      24             :  *
      25             :  * IDENTIFICATION
      26             :  *    src/backend/executor/nodeGather.c
      27             :  *
      28             :  *-------------------------------------------------------------------------
      29             :  */
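
The two modes described in the header comment reduce to a single question:
does the leader run the plan itself?  As a minimal standalone sketch (an
editor's illustration, not part of the covered source; every name in it is
invented), the decision that ExecGather later stores in need_to_scan_locally
can be written as a pure function:

    /* Sketch of the leader-participation rule; invented names throughout. */
    #include <stdbool.h>
    #include <stdio.h>

    static bool
    leader_must_scan(bool single_copy, int nworkers_launched)
    {
        /*
         * With single_copy set, the leader runs the plan only as a
         * fallback when no worker could be obtained; otherwise it always
         * participates and its rows are merged with the workers' rows.
         */
        return nworkers_launched == 0 || !single_copy;
    }

    int
    main(void)
    {
        printf("%d\n", leader_must_scan(true, 1));  /* 0: worker alone */
        printf("%d\n", leader_must_scan(true, 0));  /* 1: fallback to leader */
        printf("%d\n", leader_must_scan(false, 2)); /* 1: leader joins in */
        return 0;
    }
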
      30             : 
      31             : #include "postgres.h"
      32             : 
      33             : #include "access/relscan.h"
      34             : #include "access/xact.h"
      35             : #include "executor/execdebug.h"
      36             : #include "executor/execParallel.h"
      37             : #include "executor/nodeGather.h"
      38             : #include "executor/nodeSubplan.h"
      39             : #include "executor/tqueue.h"
      40             : #include "miscadmin.h"
      41             : #include "pgstat.h"
      42             : #include "utils/memutils.h"
      43             : #include "utils/rel.h"
      44             : 
      45             : 
      46             : static TupleTableSlot *ExecGather(PlanState *pstate);
      47             : static TupleTableSlot *gather_getnext(GatherState *gatherstate);
      48             : static HeapTuple gather_readnext(GatherState *gatherstate);
      49             : static void ExecShutdownGatherWorkers(GatherState *node);
      50             : 
      51             : 
      52             : /* ----------------------------------------------------------------
      53             :  *      ExecInitGather
      54             :  * ----------------------------------------------------------------
      55             :  */
      56             : GatherState *
      57          26 : ExecInitGather(Gather *node, EState *estate, int eflags)
      58             : {
      59             :     GatherState *gatherstate;
      60             :     Plan       *outerNode;
      61             :     bool        hasoid;
      62             :     TupleDesc   tupDesc;
      63             : 
      64             :     /* Gather node doesn't have innerPlan node. */
      65          26 :     Assert(innerPlan(node) == NULL);
      66             : 
      67             :     /*
      68             :      * create state structure
      69             :      */
      70          26 :     gatherstate = makeNode(GatherState);
      71          26 :     gatherstate->ps.plan = (Plan *) node;
      72          26 :     gatherstate->ps.state = estate;
      73          26 :     gatherstate->ps.ExecProcNode = ExecGather;
      74             : 
      75          26 :     gatherstate->initialized = false;
      76          26 :     gatherstate->need_to_scan_locally = !node->single_copy;
      77          26 :     gatherstate->tuples_needed = -1;
      78             : 
      79             :     /*
      80             :      * Miscellaneous initialization
      81             :      *
      82             :      * create expression context for node
      83             :      */
      84          26 :     ExecAssignExprContext(estate, &gatherstate->ps);
      85             : 
      86             :     /*
      87             :      * Gather doesn't support checking a qual (it's always more efficient to
      88             :      * do it in the child node).
      89             :      */
      90          26 :     Assert(!node->plan.qual);
      91             : 
      92             :     /*
      93             :      * tuple table initialization
      94             :      */
      95          26 :     gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate);
      96          26 :     ExecInitResultTupleSlot(estate, &gatherstate->ps);
      97             : 
      98             :     /*
      99             :      * now initialize outer plan
     100             :      */
     101          26 :     outerNode = outerPlan(node);
     102          26 :     outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
     103             : 
     104             :     /*
     105             :      * Initialize result tuple type and projection info.
     106             :      */
     107          26 :     ExecAssignResultTypeFromTL(&gatherstate->ps);
     108          26 :     ExecAssignProjectionInfo(&gatherstate->ps, NULL);
     109             : 
     110             :     /*
     111             :      * Initialize funnel slot to same tuple descriptor as outer plan.
     112             :      */
     113          26 :     if (!ExecContextForcesOids(&gatherstate->ps, &hasoid))
     114          26 :         hasoid = false;
     115          26 :     tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
     116          26 :     ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);
     117             : 
     118          26 :     return gatherstate;
     119             : }
     120             : 
     121             : /* ----------------------------------------------------------------
     122             :  *      ExecGather(node)
     123             :  *
     124             :  *      Scans the relation via multiple workers and returns
     125             :  *      the next qualifying tuple.
     126             :  * ----------------------------------------------------------------
     127             :  */
     128             : static TupleTableSlot *
     129      108089 : ExecGather(PlanState *pstate)
     130             : {
     131      108089 :     GatherState *node = castNode(GatherState, pstate);
     132      108089 :     TupleTableSlot *fslot = node->funnel_slot;
     133             :     TupleTableSlot *slot;
     134             :     ExprContext *econtext;
     135             : 
     136      108089 :     CHECK_FOR_INTERRUPTS();
     137             : 
     138             :     /*
     139             :      * Initialize the parallel context and workers on first execution. We do
     140             :      * this on first execution rather than during node initialization, as it
     141             :      * needs to allocate a large dynamic segment, so it is better to do it
     142             :      * only if it is really needed.
     143             :      */
     144      108089 :     if (!node->initialized)
     145             :     {
     146          26 :         EState     *estate = node->ps.state;
     147          26 :         Gather     *gather = (Gather *) node->ps.plan;
     148             : 
     149             :         /*
     150             :          * Sometimes we might have to run without parallelism; but if parallel
     151             :          * mode is active then we can try to fire up some workers.
     152             :          */
     153          26 :         if (gather->num_workers > 0 && IsInParallelMode())
     154             :         {
     155             :             ParallelContext *pcxt;
     156             : 
     157             :             /* Initialize, or re-initialize, shared state needed by workers. */
     158          26 :             if (!node->pei)
     159          13 :                 node->pei = ExecInitParallelPlan(node->ps.lefttree,
     160             :                                                  estate,
     161             :                                                  gather->num_workers,
     162             :                                                  node->tuples_needed);
     163             :             else
     164          13 :                 ExecParallelReinitialize(node->ps.lefttree,
     165          13 :                                          node->pei);
     166             : 
     167             :             /*
     168             :              * Register backend workers. We might not get as many as we
     169             :              * requested, or indeed any at all.
     170             :              */
     171          26 :             pcxt = node->pei->pcxt;
     172          26 :             LaunchParallelWorkers(pcxt);
     173             :             /* We save # workers launched for the benefit of EXPLAIN */
     174          26 :             node->nworkers_launched = pcxt->nworkers_launched;
     175             : 
     176             :             /* Set up tuple queue readers to read the results. */
     177          26 :             if (pcxt->nworkers_launched > 0)
     178             :             {
     179          26 :                 ExecParallelCreateReaders(node->pei,
     180             :                                           fslot->tts_tupleDescriptor);
     181             :                 /* Make a working array showing the active readers */
     182          26 :                 node->nreaders = pcxt->nworkers_launched;
     183          26 :                 node->reader = (TupleQueueReader **)
     184          26 :                     palloc(node->nreaders * sizeof(TupleQueueReader *));
     185          26 :                 memcpy(node->reader, node->pei->reader,
     186          26 :                        node->nreaders * sizeof(TupleQueueReader *));
     187             :             }
     188             :             else
     189             :             {
     190             :                 /* No workers?  Then never mind. */
     191           0 :                 node->nreaders = 0;
     192           0 :                 node->reader = NULL;
     193             :             }
     194          26 :             node->nextreader = 0;
     195             :         }
     196             : 
     197             :         /* Run plan locally if no workers or not single-copy. */
     198          52 :         node->need_to_scan_locally = (node->nreaders == 0)
     199          26 :             || !gather->single_copy;
     200          26 :         node->initialized = true;
     201             :     }
     202             : 
     203             :     /*
     204             :      * Reset per-tuple memory context to free any expression evaluation
     205             :      * storage allocated in the previous tuple cycle.  This will also clear
     206             :      * any previous tuple returned by a TupleQueueReader; to make sure we
     207             :      * don't leave a dangling pointer around, clear the working slot first.
     208             :      */
     209      108089 :     ExecClearTuple(fslot);
     210      108089 :     econtext = node->ps.ps_ExprContext;
     211      108089 :     ResetExprContext(econtext);
     212             : 
     213             :     /*
     214             :      * Get next tuple, either from one of our workers, or by running the plan
     215             :      * ourselves.
     216             :      */
     217      108089 :     slot = gather_getnext(node);
     218      108088 :     if (TupIsNull(slot))
     219          25 :         return NULL;
     220             : 
     221             :     /*
     222             :      * Form the result tuple using ExecProject(), and return it.
     223             :      */
     224      108063 :     econtext->ecxt_outertuple = slot;
     225      108063 :     return ExecProject(node->ps.ps_ProjInfo);
     226             : }
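
The first-execution setup above is a general lazy-initialization pattern:
defer the expensive work (here, allocating a large dynamic shared memory
segment and launching workers) until the node is actually pulled from, so a
node that never executes pays nothing.  A minimal sketch of the pattern,
with the parallel machinery stubbed out by a plain allocation (invented
names, not PostgreSQL API):

    /* Sketch: initialize on first fetch, as ExecGather does. */
    #include <stdbool.h>
    #include <stdio.h>
    #include <stdlib.h>

    typedef struct LazyNode
    {
        bool initialized;
        int *big_state;     /* stands in for the parallel context / DSM */
    } LazyNode;

    static int
    lazy_next(LazyNode *node)
    {
        if (!node->initialized)
        {
            /* Deferred to the first fetch; never runs if never fetched. */
            node->big_state = calloc(1024 * 1024, sizeof(int));
            if (node->big_state == NULL)
                abort();
            node->initialized = true;
        }
        return node->big_state[0]++;
    }

    int
    main(void)
    {
        LazyNode node = {false, NULL};

        for (int i = 0; i < 3; i++)
            printf("%d\n", lazy_next(&node));   /* setup runs once only */
        free(node.big_state);
        return 0;
    }
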
     227             : 
     228             : /* ----------------------------------------------------------------
     229             :  *      ExecEndGather
     230             :  *
     231             :  *      frees any storage allocated through C routines.
     232             :  * ----------------------------------------------------------------
     233             :  */
     234             : void
     235          25 : ExecEndGather(GatherState *node)
     236             : {
     237          25 :     ExecEndNode(outerPlanState(node));  /* let children clean up first */
     238          25 :     ExecShutdownGather(node);
     239          25 :     ExecFreeExprContext(&node->ps);
     240          25 :     ExecClearTuple(node->ps.ps_ResultTupleSlot);
     241          25 : }
     242             : 
     243             : /*
     244             :  * Read the next tuple.  We might fetch a tuple from one of the tuple queues
     245             :  * using gather_readnext, or if no tuple queue contains a tuple and the
     246             :  * single_copy flag is not set, we might generate one locally instead.
     247             :  */
     248             : static TupleTableSlot *
     249      108089 : gather_getnext(GatherState *gatherstate)
     250             : {
     251      108089 :     PlanState  *outerPlan = outerPlanState(gatherstate);
     252             :     TupleTableSlot *outerTupleSlot;
     253      108089 :     TupleTableSlot *fslot = gatherstate->funnel_slot;
     254      108089 :     MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
     255             :     HeapTuple   tup;
     256             : 
     257      216227 :     while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
     258             :     {
     259      108113 :         CHECK_FOR_INTERRUPTS();
     260             : 
     261      108113 :         if (gatherstate->nreaders > 0)
     262             :         {
     263             :             MemoryContext oldContext;
     264             : 
     265             :             /* Run TupleQueueReaders in per-tuple context */
     266      108113 :             oldContext = MemoryContextSwitchTo(tupleContext);
     267      108113 :             tup = gather_readnext(gatherstate);
     268      108112 :             MemoryContextSwitchTo(oldContext);
     269             : 
     270      108112 :             if (HeapTupleIsValid(tup))
     271             :             {
     272          50 :                 ExecStoreTuple(tup, /* tuple to store */
     273             :                                fslot,   /* slot in which to store the tuple */
     274             :                                InvalidBuffer,   /* buffer associated with this
     275             :                                                  * tuple */
     276             :                                false);  /* slot should not pfree tuple */
     277          50 :                 return fslot;
     278             :             }
     279             :         }
     280             : 
     281      108062 :         if (gatherstate->need_to_scan_locally)
     282             :         {
     283      108037 :             outerTupleSlot = ExecProcNode(outerPlan);
     284             : 
     285      108037 :             if (!TupIsNull(outerTupleSlot))
     286      108013 :                 return outerTupleSlot;
     287             : 
     288          24 :             gatherstate->need_to_scan_locally = false;
     289             :         }
     290             :     }
     291             : 
     292          25 :     return ExecClearTuple(fslot);
     293             : }
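
gather_getnext runs the tuple-queue readers in the per-tuple expression
context, which ExecGather resets once per cycle after first clearing the
funnel slot so that no pointer into the old memory survives the reset.  The
same discipline, reduced to a toy bump allocator (a sketch only; nothing
below is PostgreSQL API):

    /* Sketch: reset-per-cycle arena in place of the per-tuple context. */
    #include <stdio.h>

    typedef struct Arena
    {
        char   buf[4096];
        size_t used;
    } Arena;

    static void *
    arena_alloc(Arena *a, size_t n)
    {
        char *p = a->buf + a->used;

        if (a->used + n > sizeof(a->buf))
            return NULL;        /* toy allocator: no overflow handling */
        a->used += n;
        return p;
    }

    static void
    arena_reset(Arena *a)
    {
        a->used = 0;            /* frees the whole cycle's output at once */
    }

    int
    main(void)
    {
        Arena arena = {{0}, 0};

        for (int i = 0; i < 3; i++)
        {
            char *tuple;

            /*
             * Anything still pointing into the arena (the funnel slot's
             * previous tuple, in Gather's case) must be forgotten before
             * the reset, or it dangles -- hence ExecClearTuple before
             * ResetExprContext in ExecGather.
             */
            arena_reset(&arena);

            tuple = arena_alloc(&arena, 32);
            snprintf(tuple, 32, "tuple %d", i);
            printf("%s\n", tuple);
        }
        return 0;
    }
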
     294             : 
     295             : /*
     296             :  * Attempt to read a tuple from one of our parallel workers.
     297             :  */
     298             : static HeapTuple
     299      108113 : gather_readnext(GatherState *gatherstate)
     300             : {
     301      108113 :     int         nvisited = 0;
     302             : 
     303             :     for (;;)
     304             :     {
     305             :         TupleQueueReader *reader;
     306             :         HeapTuple   tup;
     307             :         bool        readerdone;
     308             : 
     309             :         /* Check for async events, particularly messages from workers. */
     310      433883 :         CHECK_FOR_INTERRUPTS();
     311             : 
     312             :         /* Attempt to read a tuple, but don't block if none is available. */
     313      433882 :         Assert(gatherstate->nextreader < gatherstate->nreaders);
     314      433882 :         reader = gatherstate->reader[gatherstate->nextreader];
     315      433882 :         tup = TupleQueueReaderNext(reader, true, &readerdone);
     316             : 
     317             :         /*
     318             :          * If this reader is done, remove it from our working array of active
     319             :          * readers.  If all readers are done, we're outta here.
     320             :          */
     321      433882 :         if (readerdone)
     322             :         {
     323          94 :             Assert(!tup);
     324          94 :             --gatherstate->nreaders;
     325          94 :             if (gatherstate->nreaders == 0)
     326      108137 :                 return NULL;
     327         207 :             memmove(&gatherstate->reader[gatherstate->nextreader],
     328         138 :                     &gatherstate->reader[gatherstate->nextreader + 1],
     329             :                     sizeof(TupleQueueReader *)
     330          69 :                     * (gatherstate->nreaders - gatherstate->nextreader));
     331          69 :             if (gatherstate->nextreader >= gatherstate->nreaders)
     332          69 :                 gatherstate->nextreader = 0;
     333          69 :             continue;
     334             :         }
     335             : 
     336             :         /* If we got a tuple, return it. */
     337      433788 :         if (tup)
     338          50 :             return tup;
     339             : 
     340             :         /*
     341             :          * Advance nextreader pointer in round-robin fashion.  Note that we
     342             :          * only reach this code if we weren't able to get a tuple from the
     343             :          * current worker.  We used to advance the nextreader pointer after
     344             :          * every tuple, but it turns out to be much more efficient to keep
     345             :          * reading from the same queue until that would require blocking.
     346             :          */
     347      433738 :         gatherstate->nextreader++;
     348      433738 :         if (gatherstate->nextreader >= gatherstate->nreaders)
     349      108679 :             gatherstate->nextreader = 0;
     350             : 
     351             :         /* Have we visited every (surviving) TupleQueueReader? */
     352      433738 :         nvisited++;
     353      433738 :         if (nvisited >= gatherstate->nreaders)
     354             :         {
     355             :             /*
     356             :              * If (still) running plan locally, return NULL so caller can
     357             :              * generate another tuple from the local copy of the plan.
     358             :              */
     359      108703 :             if (gatherstate->need_to_scan_locally)
     360      108037 :                 return NULL;
     361             : 
     362             :             /* Nothing to do except wait for developments. */
     363         666 :             WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_EXECUTE_GATHER);
     364         666 :             ResetLatch(MyLatch);
     365         666 :             nvisited = 0;
     366             :         }
     367      325770 :     }
     368             : }
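
Two details of the scheduling above are worth isolating: the loop keeps
draining one queue until that queue has nothing more to offer, and a reader
that reports itself done is squeezed out of the working array with memmove.
A self-contained simulation of just that policy, with toy in-memory queues
standing in for shared-memory tuple queues and the latch wait omitted
(invented names throughout):

    /* Sketch: round-robin draining with removal of finished readers. */
    #include <stdio.h>
    #include <string.h>

    #define NQUEUES 3

    typedef struct ToyQueue
    {
        const int *items;
        int        nitems;
        int        pos;
    } ToyQueue;

    /* Returns 1 and sets *value on success; 0 with *done = 1 at end. */
    static int
    queue_next(ToyQueue *q, int *value, int *done)
    {
        if (q->pos >= q->nitems)
        {
            *done = 1;
            return 0;
        }
        *done = 0;
        *value = q->items[q->pos++];
        return 1;
    }

    int
    main(void)
    {
        static const int a[] = {1, 2}, b[] = {10}, c[] = {100, 200, 300};
        ToyQueue  queues[NQUEUES] = {{a, 2, 0}, {b, 1, 0}, {c, 3, 0}};
        ToyQueue *reader[NQUEUES] = {&queues[0], &queues[1], &queues[2]};
        int       nreaders = NQUEUES;
        int       nextreader = 0;

        while (nreaders > 0)
        {
            int value, done;

            if (queue_next(reader[nextreader], &value, &done))
            {
                /* Keep reading this queue until it has nothing for us. */
                printf("%d\n", value);
                continue;
            }

            if (done)
            {
                /* Squeeze the finished reader out of the working array. */
                memmove(&reader[nextreader], &reader[nextreader + 1],
                        sizeof(ToyQueue *) * (--nreaders - nextreader));
                if (nextreader >= nreaders)
                    nextreader = 0;
                continue;
            }

            /* Empty but not done: advance; the real code may block here. */
            if (++nextreader >= nreaders)
                nextreader = 0;
        }
        return 0;
    }
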
     369             : 
     370             : /* ----------------------------------------------------------------
     371             :  *      ExecShutdownGatherWorkers
     372             :  *
     373             :  *      Stop all the parallel workers.
     374             :  * ----------------------------------------------------------------
     375             :  */
     376             : static void
     377          53 : ExecShutdownGatherWorkers(GatherState *node)
     378             : {
     379          53 :     if (node->pei != NULL)
     380          25 :         ExecParallelFinish(node->pei);
     381             : 
     382             :     /* Flush local copy of reader array */
     383          53 :     if (node->reader)
     384          25 :         pfree(node->reader);
     385          53 :     node->reader = NULL;
     386          53 : }
     387             : 
     388             : /* ----------------------------------------------------------------
     389             :  *      ExecShutdownGather
     390             :  *
     391             :  *      Destroy the setup for parallel workers, including the parallel context.
     392             :  * ----------------------------------------------------------------
     393             :  */
     394             : void
     395          37 : ExecShutdownGather(GatherState *node)
     396             : {
     397          37 :     ExecShutdownGatherWorkers(node);
     398             : 
     399             :     /* Now destroy the parallel context. */
     400          37 :     if (node->pei != NULL)
     401             :     {
     402          12 :         ExecParallelCleanup(node->pei);
     403          12 :         node->pei = NULL;
     404             :     }
     405          37 : }
     406             : 
     407             : /* ----------------------------------------------------------------
     408             :  *                      Join Support
     409             :  * ----------------------------------------------------------------
     410             :  */
     411             : 
     412             : /* ----------------------------------------------------------------
     413             :  *      ExecReScanGather
     414             :  *
     415             :  *      Prepare to re-scan the result of a Gather.
     416             :  * ----------------------------------------------------------------
     417             :  */
     418             : void
     419          16 : ExecReScanGather(GatherState *node)
     420             : {
     421          16 :     Gather     *gather = (Gather *) node->ps.plan;
     422          16 :     PlanState  *outerPlan = outerPlanState(node);
     423             : 
     424             :     /* Make sure any existing workers are gracefully shut down */
     425          16 :     ExecShutdownGatherWorkers(node);
     426             : 
     427             :     /* Mark node so that shared state will be rebuilt at next call */
     428          16 :     node->initialized = false;
     429             : 
     430             :     /*
     431             :      * Set child node's chgParam to tell it that the next scan might deliver a
     432             :      * different set of rows within the leader process.  (The overall rowset
     433             :      * shouldn't change, but the leader process's subset might; hence nodes
     434             :      * between here and the parallel table scan node mustn't optimize on the
     435             :      * assumption of an unchanging rowset.)
     436             :      */
     437          16 :     if (gather->rescan_param >= 0)
     438          16 :         outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
     439             :                                              gather->rescan_param);
     440             : 
     441             :     /*
     442             :      * If chgParam of subnode is not null then plan will be re-scanned by
     443             :      * first ExecProcNode.  Note: because this does nothing if we have a
     444             :      * rescan_param, it's currently guaranteed that parallel-aware child nodes
     445             :      * will not see a ReScan call until after they get a ReInitializeDSM call.
     446             :      * That ordering might not be something to rely on, though.  A good rule
     447             :      * of thumb is that ReInitializeDSM should reset only shared state, ReScan
     448             :      * should reset only local state, and anything that depends on both of
     449             :      * those steps being finished must wait until the first ExecProcNode call.
     450             :      */
     451          16 :     if (outerPlan->chgParam == NULL)
     452           0 :         ExecReScan(outerPlan);
     453          16 : }
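
The chgParam convention used just above is deliberately lazy:
ExecReScanGather does not re-scan the child immediately, it only records
that a parameter changed, and the child rebuilds its local state on its
next fetch.  A minimal sketch of that protocol, with a plain bitmask
standing in for the Bitmapset (invented names, not PostgreSQL code):

    /* Sketch: lazy rescan signalled through a changed-parameter mark. */
    #include <stdio.h>

    typedef struct ToyChild
    {
        unsigned chgParam;     /* bit i set => parameter i has changed */
        int      param_value;  /* the parameter the output depends on */
        int      scan_pos;
    } ToyChild;

    static int
    child_next(ToyChild *child)
    {
        if (child->chgParam != 0)
        {
            /* First fetch after a rescan: reset state, clear the mark. */
            child->scan_pos = 0;
            child->chgParam = 0;
        }
        return child->param_value * 10 + child->scan_pos++;
    }

    static void
    parent_rescan(ToyChild *child, int param_id, int new_value)
    {
        /*
         * Analogue of bms_add_member(outerPlan->chgParam, rescan_param):
         * the parent only records the change; the child acts on it at
         * its next fetch, so no work happens for a node never fetched.
         */
        child->chgParam |= 1u << param_id;
        child->param_value = new_value;
    }

    int
    main(void)
    {
        ToyChild child = {1u << 0, 7, 0};   /* dirty: first fetch resets */
        int      i;

        for (i = 0; i < 2; i++)
            printf("%d\n", child_next(&child));     /* prints 70, 71 */
        parent_rescan(&child, 0, 8);
        for (i = 0; i < 2; i++)
            printf("%d\n", child_next(&child));     /* prints 80, 81 */
        return 0;
    }
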

Generated by: LCOV version 1.11