LCOV - code coverage report
Current view: top level - src/backend/executor - execParallel.c (source / functions)
Test:     PostgreSQL
Date:     2017-09-29 15:12:54
Coverage: Lines: 335 of 369 hit (90.8 %)    Functions: 16 of 16 hit (100.0 %)
Legend:   Lines: hit / not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * execParallel.c
       4             :  *    Support routines for parallel execution.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * This file contains routines that are intended to support setting up,
      10             :  * using, and tearing down a ParallelContext from within the PostgreSQL
      11             :  * executor.  The ParallelContext machinery will handle starting the
      12             :  * workers and ensuring that their state generally matches that of the
      13             :  * leader; see src/backend/access/transam/README.parallel for details.
      14             :  * However, we must save and restore relevant executor state, such as
      15             :  * any ParamListInfo associated with the query, buffer usage info, and
      16             :  * the actual plan to be passed down to the worker.
      17             :  *
      18             :  * IDENTIFICATION
      19             :  *    src/backend/executor/execParallel.c
      20             :  *
      21             :  *-------------------------------------------------------------------------
      22             :  */
      23             : 
      24             : #include "postgres.h"
      25             : 
      26             : #include "executor/execParallel.h"
      27             : #include "executor/executor.h"
      28             : #include "executor/nodeBitmapHeapscan.h"
      29             : #include "executor/nodeCustom.h"
      30             : #include "executor/nodeForeignscan.h"
      31             : #include "executor/nodeIndexscan.h"
      32             : #include "executor/nodeIndexonlyscan.h"
      33             : #include "executor/nodeSeqscan.h"
      34             : #include "executor/nodeSort.h"
      35             : #include "executor/tqueue.h"
      36             : #include "nodes/nodeFuncs.h"
      37             : #include "optimizer/planmain.h"
      38             : #include "optimizer/planner.h"
      39             : #include "storage/spin.h"
      40             : #include "tcop/tcopprot.h"
      41             : #include "utils/dsa.h"
      42             : #include "utils/memutils.h"
      43             : #include "utils/snapmgr.h"
      44             : #include "pgstat.h"
      45             : 
      46             : /*
      47             :  * Magic numbers for parallel executor communication.  We use constants
      48             :  * greater than any 32-bit integer here so that values < 2^32 can be used
      49             :  * by individual parallel nodes to store their own state.
      50             :  */
      51             : #define PARALLEL_KEY_EXECUTOR_FIXED     UINT64CONST(0xE000000000000001)
      52             : #define PARALLEL_KEY_PLANNEDSTMT        UINT64CONST(0xE000000000000002)
      53             : #define PARALLEL_KEY_PARAMS             UINT64CONST(0xE000000000000003)
      54             : #define PARALLEL_KEY_BUFFER_USAGE       UINT64CONST(0xE000000000000004)
      55             : #define PARALLEL_KEY_TUPLE_QUEUE        UINT64CONST(0xE000000000000005)
      56             : #define PARALLEL_KEY_INSTRUMENTATION    UINT64CONST(0xE000000000000006)
      57             : #define PARALLEL_KEY_DSA                UINT64CONST(0xE000000000000007)
       58             : #define PARALLEL_KEY_QUERY_TEXT         UINT64CONST(0xE000000000000008)
      59             : 
      60             : #define PARALLEL_TUPLE_QUEUE_SIZE       65536
      61             : 
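/*
 * Illustrative sketch, not part of execParallel.c: hypothetical state structs
 * for a parallel-aware "ExampleScan" node, used by the sketches further down.
 * Because the executor's own toc keys above are all >= 2^32, such a node can
 * key its private shared state by its plan_node_id, a small integer that
 * cannot collide with them.
 */
typedef struct ExampleScanShared
{
    slock_t     mutex;          /* protects next_item */
    uint32      next_item;      /* next work item to hand out to a worker */
} ExampleScanShared;

typedef struct ExampleScanState
{
    ScanState   ss;             /* per-node executor state, as for real scans */
    Size        shared_len;     /* space estimated for ExampleScanShared */
    ExampleScanShared *shared;  /* points into the DSM segment, or NULL */
} ExampleScanState;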
      62             : /*
      63             :  * Fixed-size random stuff that we need to pass to parallel workers.
      64             :  */
      65             : typedef struct FixedParallelExecutorState
      66             : {
      67             :     int64       tuples_needed;  /* tuple bound, see ExecSetTupleBound */
      68             : } FixedParallelExecutorState;
      69             : 
      70             : /*
      71             :  * DSM structure for accumulating per-PlanState instrumentation.
      72             :  *
      73             :  * instrument_options: Same meaning here as in instrument.c.
      74             :  *
      75             :  * instrument_offset: Offset, relative to the start of this structure,
      76             :  * of the first Instrumentation object.  This will depend on the length of
      77             :  * the plan_node_id array.
      78             :  *
      79             :  * num_workers: Number of workers.
      80             :  *
      81             :  * num_plan_nodes: Number of plan nodes.
      82             :  *
       83             :  * plan_node_id: Array of plan node IDs for which we are gathering
       84             :  * instrumentation from parallel workers.  Its length is num_plan_nodes.
      85             :  */
      86             : struct SharedExecutorInstrumentation
      87             : {
      88             :     int         instrument_options;
      89             :     int         instrument_offset;
      90             :     int         num_workers;
      91             :     int         num_plan_nodes;
      92             :     int         plan_node_id[FLEXIBLE_ARRAY_MEMBER];
      93             :     /* array of num_plan_nodes * num_workers Instrumentation objects follows */
      94             : };
      95             : #define GetInstrumentationArray(sei) \
      96             :     (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
      97             :      (Instrumentation *) (((char *) sei) + sei->instrument_offset))
      98             : 
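/*
 * Illustrative sketch, not part of execParallel.c: given the layout described
 * above, the entry for the plan node stored at index plan_node_index and a
 * particular worker can be addressed like this.  The helper name is
 * hypothetical.
 */
static inline Instrumentation *
ExampleGetInstrumentEntry(SharedExecutorInstrumentation *sei,
                          int plan_node_index, int worker_number)
{
    Instrumentation *array = GetInstrumentationArray(sei);

    return &array[plan_node_index * sei->num_workers + worker_number];
}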
      99             : /* Context object for ExecParallelEstimate. */
     100             : typedef struct ExecParallelEstimateContext
     101             : {
     102             :     ParallelContext *pcxt;
     103             :     int         nnodes;
     104             : } ExecParallelEstimateContext;
     105             : 
     106             : /* Context object for ExecParallelInitializeDSM. */
     107             : typedef struct ExecParallelInitializeDSMContext
     108             : {
     109             :     ParallelContext *pcxt;
     110             :     SharedExecutorInstrumentation *instrumentation;
     111             :     int         nnodes;
     112             : } ExecParallelInitializeDSMContext;
     113             : 
     114             : /* Helper functions that run in the parallel leader. */
     115             : static char *ExecSerializePlan(Plan *plan, EState *estate);
     116             : static bool ExecParallelEstimate(PlanState *node,
     117             :                      ExecParallelEstimateContext *e);
     118             : static bool ExecParallelInitializeDSM(PlanState *node,
     119             :                           ExecParallelInitializeDSMContext *d);
     120             : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
     121             :                              bool reinitialize);
     122             : static bool ExecParallelReInitializeDSM(PlanState *planstate,
     123             :                             ParallelContext *pcxt);
     124             : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
     125             :                                     SharedExecutorInstrumentation *instrumentation);
     126             : 
     127             : /* Helper function that runs in the parallel worker. */
     128             : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
     129             : 
     130             : /*
     131             :  * Create a serialized representation of the plan to be sent to each worker.
     132             :  */
     133             : static char *
     134          17 : ExecSerializePlan(Plan *plan, EState *estate)
     135             : {
     136             :     PlannedStmt *pstmt;
     137             :     ListCell   *lc;
     138             : 
     139             :     /* We can't scribble on the original plan, so make a copy. */
     140          17 :     plan = copyObject(plan);
     141             : 
     142             :     /*
     143             :      * The worker will start its own copy of the executor, and that copy will
     144             :      * insert a junk filter if the toplevel node has any resjunk entries. We
     145             :      * don't want that to happen, because while resjunk columns shouldn't be
     146             :      * sent back to the user, here the tuples are coming back to another
     147             :      * backend which may very well need them.  So mutate the target list
     148             :      * accordingly.  This is sort of a hack; there might be better ways to do
     149             :      * this...
     150             :      */
     151          51 :     foreach(lc, plan->targetlist)
     152             :     {
     153          34 :         TargetEntry *tle = lfirst_node(TargetEntry, lc);
     154             : 
     155          34 :         tle->resjunk = false;
     156             :     }
     157             : 
     158             :     /*
     159             :      * Create a dummy PlannedStmt.  Most of the fields don't need to be valid
     160             :      * for our purposes, but the worker will need at least a minimal
     161             :      * PlannedStmt to start the executor.
     162             :      */
     163          17 :     pstmt = makeNode(PlannedStmt);
     164          17 :     pstmt->commandType = CMD_SELECT;
     165          17 :     pstmt->queryId = 0;
     166          17 :     pstmt->hasReturning = false;
     167          17 :     pstmt->hasModifyingCTE = false;
     168          17 :     pstmt->canSetTag = true;
     169          17 :     pstmt->transientPlan = false;
     170          17 :     pstmt->dependsOnRole = false;
     171          17 :     pstmt->parallelModeNeeded = false;
     172          17 :     pstmt->planTree = plan;
     173          17 :     pstmt->rtable = estate->es_range_table;
     174          17 :     pstmt->resultRelations = NIL;
     175          17 :     pstmt->nonleafResultRelations = NIL;
     176             : 
     177             :     /*
     178             :      * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
     179             :      * for unsafe ones (so that the list indexes of the safe ones are
     180             :      * preserved).  This positively ensures that the worker won't try to run,
     181             :      * or even do ExecInitNode on, an unsafe subplan.  That's important to
     182             :      * protect, eg, non-parallel-aware FDWs from getting into trouble.
     183             :      */
     184          17 :     pstmt->subplans = NIL;
     185          18 :     foreach(lc, estate->es_plannedstmt->subplans)
     186             :     {
     187           1 :         Plan       *subplan = (Plan *) lfirst(lc);
     188             : 
     189           1 :         if (subplan && !subplan->parallel_safe)
     190           0 :             subplan = NULL;
     191           1 :         pstmt->subplans = lappend(pstmt->subplans, subplan);
     192             :     }
     193             : 
     194          17 :     pstmt->rewindPlanIDs = NULL;
     195          17 :     pstmt->rowMarks = NIL;
     196          17 :     pstmt->relationOids = NIL;
     197          17 :     pstmt->invalItems = NIL; /* workers can't replan anyway... */
     198          17 :     pstmt->nParamExec = estate->es_plannedstmt->nParamExec;
     199          17 :     pstmt->utilityStmt = NULL;
     200          17 :     pstmt->stmt_location = -1;
     201          17 :     pstmt->stmt_len = -1;
     202             : 
     203             :     /* Return serialized copy of our dummy PlannedStmt. */
     204          17 :     return nodeToString(pstmt);
     205             : }
     206             : 
     207             : /*
     208             :  * Parallel-aware plan nodes (and occasionally others) may need some state
     209             :  * which is shared across all parallel workers.  Before we size the DSM, give
     210             :  * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
     211             :  * &pcxt->estimator.
     212             :  *
      213             :  * While we're at it, count the number of PlanState nodes in the tree, so
      214             :  * we know how large the SharedExecutorInstrumentation structure must be.
     215             :  */
     216             : static bool
     217          46 : ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
     218             : {
     219          46 :     if (planstate == NULL)
     220           0 :         return false;
     221             : 
     222             :     /* Count this node. */
     223          46 :     e->nnodes++;
     224             : 
     225          46 :     switch (nodeTag(planstate))
     226             :     {
     227             :         case T_SeqScanState:
     228          14 :             if (planstate->plan->parallel_aware)
     229          13 :                 ExecSeqScanEstimate((SeqScanState *) planstate,
     230             :                                     e->pcxt);
     231          14 :             break;
     232             :         case T_IndexScanState:
     233           3 :             if (planstate->plan->parallel_aware)
     234           2 :                 ExecIndexScanEstimate((IndexScanState *) planstate,
     235             :                                       e->pcxt);
     236           3 :             break;
     237             :         case T_IndexOnlyScanState:
     238           4 :             if (planstate->plan->parallel_aware)
     239           3 :                 ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
     240             :                                           e->pcxt);
     241           4 :             break;
     242             :         case T_ForeignScanState:
     243           0 :             if (planstate->plan->parallel_aware)
     244           0 :                 ExecForeignScanEstimate((ForeignScanState *) planstate,
     245             :                                         e->pcxt);
     246           0 :             break;
     247             :         case T_CustomScanState:
     248           0 :             if (planstate->plan->parallel_aware)
     249           0 :                 ExecCustomScanEstimate((CustomScanState *) planstate,
     250             :                                        e->pcxt);
     251           0 :             break;
     252             :         case T_BitmapHeapScanState:
     253           2 :             if (planstate->plan->parallel_aware)
     254           2 :                 ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
     255             :                                        e->pcxt);
     256           2 :             break;
     257             :         case T_SortState:
     258             :             /* even when not parallel-aware */
     259           5 :             ExecSortEstimate((SortState *) planstate, e->pcxt);
     260           5 :             break;
     261             : 
     262             :         default:
     263          18 :             break;
     264             :     }
     265             : 
     266          46 :     return planstate_tree_walker(planstate, ExecParallelEstimate, e);
     267             : }
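/*
 * Illustrative sketch, not part of execParallel.c: the typical shape of the
 * per-node Estimate callback that ExecParallelEstimate dispatches to above.
 * It only reserves space and a toc key; nothing is allocated yet.  Names are
 * the hypothetical ones introduced earlier.
 */
static void
ExampleScanEstimate(ExampleScanState *node, ParallelContext *pcxt)
{
    node->shared_len = sizeof(ExampleScanShared);
    shm_toc_estimate_chunk(&pcxt->estimator, node->shared_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}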
     268             : 
     269             : /*
     270             :  * Initialize the dynamic shared memory segment that will be used to control
     271             :  * parallel execution.
     272             :  */
     273             : static bool
     274          46 : ExecParallelInitializeDSM(PlanState *planstate,
     275             :                           ExecParallelInitializeDSMContext *d)
     276             : {
     277          46 :     if (planstate == NULL)
     278           0 :         return false;
     279             : 
     280             :     /* If instrumentation is enabled, initialize slot for this node. */
     281          46 :     if (d->instrumentation != NULL)
     282           2 :         d->instrumentation->plan_node_id[d->nnodes] =
     283           1 :             planstate->plan->plan_node_id;
     284             : 
     285             :     /* Count this node. */
     286          46 :     d->nnodes++;
     287             : 
     288             :     /*
     289             :      * Call initializers for DSM-using plan nodes.
     290             :      *
     291             :      * Most plan nodes won't do anything here, but plan nodes that allocated
     292             :      * DSM may need to initialize shared state in the DSM before parallel
     293             :      * workers are launched.  They can allocate the space they previously
     294             :      * estimated using shm_toc_allocate, and add the keys they previously
     295             :      * estimated using shm_toc_insert, in each case targeting pcxt->toc.
     296             :      */
     297          46 :     switch (nodeTag(planstate))
     298             :     {
     299             :         case T_SeqScanState:
     300          14 :             if (planstate->plan->parallel_aware)
     301          13 :                 ExecSeqScanInitializeDSM((SeqScanState *) planstate,
     302             :                                          d->pcxt);
     303          14 :             break;
     304             :         case T_IndexScanState:
     305           3 :             if (planstate->plan->parallel_aware)
     306           2 :                 ExecIndexScanInitializeDSM((IndexScanState *) planstate,
     307             :                                            d->pcxt);
     308           3 :             break;
     309             :         case T_IndexOnlyScanState:
     310           4 :             if (planstate->plan->parallel_aware)
     311           3 :                 ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
     312             :                                                d->pcxt);
     313           4 :             break;
     314             :         case T_ForeignScanState:
     315           0 :             if (planstate->plan->parallel_aware)
     316           0 :                 ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
     317             :                                              d->pcxt);
     318           0 :             break;
     319             :         case T_CustomScanState:
     320           0 :             if (planstate->plan->parallel_aware)
     321           0 :                 ExecCustomScanInitializeDSM((CustomScanState *) planstate,
     322             :                                             d->pcxt);
     323           0 :             break;
     324             :         case T_BitmapHeapScanState:
     325           2 :             if (planstate->plan->parallel_aware)
     326           2 :                 ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
     327             :                                             d->pcxt);
     328           2 :             break;
     329             :         case T_SortState:
     330             :             /* even when not parallel-aware */
     331           5 :             ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
     332           5 :             break;
     333             : 
     334             :         default:
     335          18 :             break;
     336             :     }
     337             : 
     338          46 :     return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
     339             : }
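/*
 * Illustrative sketch, not part of execParallel.c: the matching InitializeDSM
 * callback.  It allocates the space estimated earlier, initializes it, and
 * publishes it in the toc under the node's plan_node_id so that workers can
 * find it.  Names are hypothetical, as before.
 */
static void
ExampleScanInitializeDSM(ExampleScanState *node, ParallelContext *pcxt)
{
    ExampleScanShared *shared;

    shared = shm_toc_allocate(pcxt->toc, node->shared_len);
    SpinLockInit(&shared->mutex);
    shared->next_item = 0;
    shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, shared);
    node->shared = shared;
}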
     340             : 
     341             : /*
      342             :  * Set up the response queues that the parallel workers will use to return
      343             :  * tuples to the leader backend.
     344             :  */
     345             : static shm_mq_handle **
     346          32 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
     347             : {
     348             :     shm_mq_handle **responseq;
     349             :     char       *tqueuespace;
     350             :     int         i;
     351             : 
     352             :     /* Skip this if no workers. */
     353          32 :     if (pcxt->nworkers == 0)
     354           0 :         return NULL;
     355             : 
     356             :     /* Allocate memory for shared memory queue handles. */
     357          32 :     responseq = (shm_mq_handle **)
     358          32 :         palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
     359             : 
     360             :     /*
     361             :      * If not reinitializing, allocate space from the DSM for the queues;
     362             :      * otherwise, find the already allocated space.
     363             :      */
     364          32 :     if (!reinitialize)
     365          17 :         tqueuespace =
     366          17 :             shm_toc_allocate(pcxt->toc,
     367             :                              mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
     368          17 :                                       pcxt->nworkers));
     369             :     else
     370          15 :         tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
     371             : 
     372             :     /* Create the queues, and become the receiver for each. */
     373         151 :     for (i = 0; i < pcxt->nworkers; ++i)
     374             :     {
     375             :         shm_mq     *mq;
     376             : 
     377         119 :         mq = shm_mq_create(tqueuespace +
     378         119 :                            ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
     379             :                            (Size) PARALLEL_TUPLE_QUEUE_SIZE);
     380             : 
     381         119 :         shm_mq_set_receiver(mq, MyProc);
     382         119 :         responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
     383             :     }
     384             : 
     385             :     /* Add array of queues to shm_toc, so others can find it. */
     386          32 :     if (!reinitialize)
     387          17 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
     388             : 
     389             :     /* Return array of handles. */
     390          32 :     return responseq;
     391             : }
     392             : 
     393             : /*
     394             :  * Sets up the required infrastructure for backend workers to perform
     395             :  * execution and return results to the main backend.
     396             :  */
     397             : ParallelExecutorInfo *
     398          17 : ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
     399             :                      int64 tuples_needed)
     400             : {
     401             :     ParallelExecutorInfo *pei;
     402             :     ParallelContext *pcxt;
     403             :     ExecParallelEstimateContext e;
     404             :     ExecParallelInitializeDSMContext d;
     405             :     FixedParallelExecutorState *fpes;
     406             :     char       *pstmt_data;
     407             :     char       *pstmt_space;
     408             :     char       *param_space;
     409             :     BufferUsage *bufusage_space;
     410          17 :     SharedExecutorInstrumentation *instrumentation = NULL;
     411             :     int         pstmt_len;
     412             :     int         param_len;
     413          17 :     int         instrumentation_len = 0;
     414          17 :     int         instrument_offset = 0;
     415          17 :     Size        dsa_minsize = dsa_minimum_size();
     416             :     char       *query_string;
     417             :     int         query_len;
     418             : 
     419             :     /* Allocate object for return value. */
     420          17 :     pei = palloc0(sizeof(ParallelExecutorInfo));
     421          17 :     pei->finished = false;
     422          17 :     pei->planstate = planstate;
     423             : 
     424             :     /* Fix up and serialize plan to be sent to workers. */
     425          17 :     pstmt_data = ExecSerializePlan(planstate->plan, estate);
     426             : 
     427             :     /* Create a parallel context. */
     428          17 :     pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
     429          17 :     pei->pcxt = pcxt;
     430             : 
     431             :     /*
     432             :      * Before telling the parallel context to create a dynamic shared memory
     433             :      * segment, we need to figure out how big it should be.  Estimate space
     434             :      * for the various things we need to store.
     435             :      */
     436             : 
     437             :     /* Estimate space for fixed-size state. */
     438          17 :     shm_toc_estimate_chunk(&pcxt->estimator,
     439             :                            sizeof(FixedParallelExecutorState));
     440          17 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     441             : 
     442             :     /* Estimate space for query text. */
     443          17 :     query_len = strlen(estate->es_sourceText);
     444          17 :     shm_toc_estimate_chunk(&pcxt->estimator, query_len);
     445          17 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     446             : 
     447             :     /* Estimate space for serialized PlannedStmt. */
     448          17 :     pstmt_len = strlen(pstmt_data) + 1;
     449          17 :     shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
     450          17 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     451             : 
     452             :     /* Estimate space for serialized ParamListInfo. */
     453          17 :     param_len = EstimateParamListSpace(estate->es_param_list_info);
     454          17 :     shm_toc_estimate_chunk(&pcxt->estimator, param_len);
     455          17 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     456             : 
     457             :     /*
     458             :      * Estimate space for BufferUsage.
     459             :      *
     460             :      * If EXPLAIN is not in use and there are no extensions loaded that care,
     461             :      * we could skip this.  But we have no way of knowing whether anyone's
     462             :      * looking at pgBufferUsage, so do it unconditionally.
     463             :      */
     464          17 :     shm_toc_estimate_chunk(&pcxt->estimator,
     465             :                            mul_size(sizeof(BufferUsage), pcxt->nworkers));
     466          17 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     467             : 
     468             :     /* Estimate space for tuple queues. */
     469          17 :     shm_toc_estimate_chunk(&pcxt->estimator,
     470             :                            mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
     471          17 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     472             : 
     473             :     /*
     474             :      * Give parallel-aware nodes a chance to add to the estimates, and get a
     475             :      * count of how many PlanState nodes there are.
     476             :      */
     477          17 :     e.pcxt = pcxt;
     478          17 :     e.nnodes = 0;
     479          17 :     ExecParallelEstimate(planstate, &e);
     480             : 
     481             :     /* Estimate space for instrumentation, if required. */
     482          17 :     if (estate->es_instrument)
     483             :     {
     484           1 :         instrumentation_len =
     485           1 :             offsetof(SharedExecutorInstrumentation, plan_node_id) +
     486           1 :             sizeof(int) * e.nnodes;
     487           1 :         instrumentation_len = MAXALIGN(instrumentation_len);
     488           1 :         instrument_offset = instrumentation_len;
     489           1 :         instrumentation_len +=
     490           2 :             mul_size(sizeof(Instrumentation),
     491           1 :                      mul_size(e.nnodes, nworkers));
     492           1 :         shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
     493           1 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     494             :     }
     495             : 
     496             :     /* Estimate space for DSA area. */
     497          17 :     shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
     498          17 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     499             : 
     500             :     /* Everyone's had a chance to ask for space, so now create the DSM. */
     501          17 :     InitializeParallelDSM(pcxt);
     502             : 
     503             :     /*
     504             :      * OK, now we have a dynamic shared memory segment, and it should be big
     505             :      * enough to store all of the data we estimated we would want to put into
     506             :      * it, plus whatever general stuff (not specifically executor-related) the
     507             :      * ParallelContext itself needs to store there.  None of the space we
     508             :      * asked for has been allocated or initialized yet, though, so do that.
     509             :      */
     510             : 
     511             :     /* Store fixed-size state. */
     512          17 :     fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
     513          17 :     fpes->tuples_needed = tuples_needed;
     514          17 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
     515             : 
     516             :     /* Store query string */
     517          17 :     query_string = shm_toc_allocate(pcxt->toc, query_len);
     518          17 :     memcpy(query_string, estate->es_sourceText, query_len);
     519          17 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
     520             : 
     521             :     /* Store serialized PlannedStmt. */
     522          17 :     pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
     523          17 :     memcpy(pstmt_space, pstmt_data, pstmt_len);
     524          17 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
     525             : 
     526             :     /* Store serialized ParamListInfo. */
     527          17 :     param_space = shm_toc_allocate(pcxt->toc, param_len);
     528          17 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMS, param_space);
     529          17 :     SerializeParamList(estate->es_param_list_info, &param_space);
     530             : 
     531             :     /* Allocate space for each worker's BufferUsage; no need to initialize. */
     532          17 :     bufusage_space = shm_toc_allocate(pcxt->toc,
     533          17 :                                       mul_size(sizeof(BufferUsage), pcxt->nworkers));
     534          17 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
     535          17 :     pei->buffer_usage = bufusage_space;
     536             : 
     537             :     /* Set up the tuple queues that the workers will write into. */
     538          17 :     pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
     539             : 
     540             :     /* We don't need the TupleQueueReaders yet, though. */
     541          17 :     pei->reader = NULL;
     542             : 
     543             :     /*
     544             :      * If instrumentation options were supplied, allocate space for the data.
     545             :      * It only gets partially initialized here; the rest happens during
     546             :      * ExecParallelInitializeDSM.
     547             :      */
     548          17 :     if (estate->es_instrument)
     549             :     {
     550             :         Instrumentation *instrument;
     551             :         int         i;
     552             : 
     553           1 :         instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
     554           1 :         instrumentation->instrument_options = estate->es_instrument;
     555           1 :         instrumentation->instrument_offset = instrument_offset;
     556           1 :         instrumentation->num_workers = nworkers;
     557           1 :         instrumentation->num_plan_nodes = e.nnodes;
     558           1 :         instrument = GetInstrumentationArray(instrumentation);
     559           5 :         for (i = 0; i < nworkers * e.nnodes; ++i)
     560           4 :             InstrInit(&instrument[i], estate->es_instrument);
     561           1 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
     562             :                        instrumentation);
     563           1 :         pei->instrumentation = instrumentation;
     564             :     }
     565             : 
     566             :     /*
     567             :      * Create a DSA area that can be used by the leader and all workers.
     568             :      * (However, if we failed to create a DSM and are using private memory
     569             :      * instead, then skip this.)
     570             :      */
     571          17 :     if (pcxt->seg != NULL)
     572             :     {
     573             :         char       *area_space;
     574             : 
     575          17 :         area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
     576          17 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
     577          17 :         pei->area = dsa_create_in_place(area_space, dsa_minsize,
     578             :                                         LWTRANCHE_PARALLEL_QUERY_DSA,
     579             :                                         pcxt->seg);
     580             :     }
     581             : 
     582             :     /*
     583             :      * Make the area available to executor nodes running in the leader.  See
     584             :      * also ParallelQueryMain which makes it available to workers.
     585             :      */
     586          17 :     estate->es_query_dsa = pei->area;
     587             : 
     588             :     /*
     589             :      * Give parallel-aware nodes a chance to initialize their shared data.
      590             :      * This also fills in the plan_node_id[] entries of *instrumentation,
      591             :      * if instrumentation is being collected.
     592             :      */
     593          17 :     d.pcxt = pcxt;
     594          17 :     d.instrumentation = instrumentation;
     595          17 :     d.nnodes = 0;
     596          17 :     ExecParallelInitializeDSM(planstate, &d);
     597             : 
     598             :     /*
     599             :      * Make sure that the world hasn't shifted under our feet.  This could
     600             :      * probably just be an Assert(), but let's be conservative for now.
     601             :      */
     602          17 :     if (e.nnodes != d.nnodes)
     603           0 :         elog(ERROR, "inconsistent count of PlanState nodes");
     604             : 
     605             :     /* OK, we're ready to rock and roll. */
     606          17 :     return pei;
     607             : }
     608             : 
     609             : /*
     610             :  * Set up tuple queue readers to read the results of a parallel subplan.
     611             :  * All the workers are expected to return tuples matching tupDesc.
     612             :  *
     613             :  * This is separate from ExecInitParallelPlan() because we can launch the
     614             :  * worker processes and let them start doing something before we do this.
     615             :  */
     616             : void
     617          31 : ExecParallelCreateReaders(ParallelExecutorInfo *pei,
     618             :                           TupleDesc tupDesc)
     619             : {
     620          31 :     int         nworkers = pei->pcxt->nworkers_launched;
     621             :     int         i;
     622             : 
     623          31 :     Assert(pei->reader == NULL);
     624             : 
     625          31 :     if (nworkers > 0)
     626             :     {
     627          31 :         pei->reader = (TupleQueueReader **)
     628          31 :             palloc(nworkers * sizeof(TupleQueueReader *));
     629             : 
     630         146 :         for (i = 0; i < nworkers; i++)
     631             :         {
     632         115 :             shm_mq_set_handle(pei->tqueue[i],
     633         115 :                               pei->pcxt->worker[i].bgwhandle);
     634         115 :             pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i],
     635             :                                                     tupDesc);
     636             :         }
     637             :     }
     638          31 : }
     639             : 
     640             : /*
     641             :  * Re-initialize the parallel executor shared memory state before launching
     642             :  * a fresh batch of workers.
     643             :  */
     644             : void
     645          15 : ExecParallelReinitialize(PlanState *planstate,
     646             :                          ParallelExecutorInfo *pei)
     647             : {
     648             :     /* Old workers must already be shut down */
     649          15 :     Assert(pei->finished);
     650             : 
     651          15 :     ReinitializeParallelDSM(pei->pcxt);
     652          15 :     pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
     653          15 :     pei->reader = NULL;
     654          15 :     pei->finished = false;
     655             : 
     656             :     /* Traverse plan tree and let each child node reset associated state. */
     657          15 :     ExecParallelReInitializeDSM(planstate, pei->pcxt);
     658          15 : }
     659             : 
     660             : /*
     661             :  * Traverse plan tree to reinitialize per-node dynamic shared memory state
     662             :  */
     663             : static bool
     664          32 : ExecParallelReInitializeDSM(PlanState *planstate,
     665             :                             ParallelContext *pcxt)
     666             : {
     667          32 :     if (planstate == NULL)
     668           0 :         return false;
     669             : 
     670             :     /*
     671             :      * Call reinitializers for DSM-using plan nodes.
     672             :      */
     673          32 :     switch (nodeTag(planstate))
     674             :     {
     675             :         case T_SeqScanState:
     676           2 :             if (planstate->plan->parallel_aware)
     677           2 :                 ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
     678             :                                            pcxt);
     679           2 :             break;
     680             :         case T_IndexScanState:
     681           2 :             if (planstate->plan->parallel_aware)
     682           2 :                 ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
     683             :                                              pcxt);
     684           2 :             break;
     685             :         case T_IndexOnlyScanState:
     686           2 :             if (planstate->plan->parallel_aware)
     687           2 :                 ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
     688             :                                                  pcxt);
     689           2 :             break;
     690             :         case T_ForeignScanState:
     691           0 :             if (planstate->plan->parallel_aware)
     692           0 :                 ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
     693             :                                                pcxt);
     694           0 :             break;
     695             :         case T_CustomScanState:
     696           0 :             if (planstate->plan->parallel_aware)
     697           0 :                 ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
     698             :                                               pcxt);
     699           0 :             break;
     700             :         case T_BitmapHeapScanState:
     701           9 :             if (planstate->plan->parallel_aware)
     702           9 :                 ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
     703             :                                               pcxt);
     704           9 :             break;
     705             :         case T_SortState:
     706             :             /* even when not parallel-aware */
     707           2 :             ExecSortReInitializeDSM((SortState *) planstate, pcxt);
     708           2 :             break;
     709             : 
     710             :         default:
     711          15 :             break;
     712             :     }
     713             : 
     714          32 :     return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
     715             : }
     716             : 
     717             : /*
     718             :  * Copy instrumentation information about this node and its descendants from
     719             :  * dynamic shared memory.
     720             :  */
     721             : static bool
     722           1 : ExecParallelRetrieveInstrumentation(PlanState *planstate,
     723             :                                     SharedExecutorInstrumentation *instrumentation)
     724             : {
     725             :     Instrumentation *instrument;
     726             :     int         i;
     727             :     int         n;
     728             :     int         ibytes;
     729           1 :     int         plan_node_id = planstate->plan->plan_node_id;
     730             :     MemoryContext oldcontext;
     731             : 
     732             :     /* Find the instrumentation for this node. */
     733           1 :     for (i = 0; i < instrumentation->num_plan_nodes; ++i)
     734           1 :         if (instrumentation->plan_node_id[i] == plan_node_id)
     735           1 :             break;
     736           1 :     if (i >= instrumentation->num_plan_nodes)
     737           0 :         elog(ERROR, "plan node %d not found", plan_node_id);
     738             : 
     739             :     /* Accumulate the statistics from all workers. */
     740           1 :     instrument = GetInstrumentationArray(instrumentation);
     741           1 :     instrument += i * instrumentation->num_workers;
     742           5 :     for (n = 0; n < instrumentation->num_workers; ++n)
     743           4 :         InstrAggNode(planstate->instrument, &instrument[n]);
     744             : 
     745             :     /*
     746             :      * Also store the per-worker detail.
     747             :      *
     748             :      * Worker instrumentation should be allocated in the same context as the
     749             :      * regular instrumentation information, which is the per-query context.
     750             :      * Switch into per-query memory context.
     751             :      */
     752           1 :     oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
     753           1 :     ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
     754           1 :     planstate->worker_instrument =
     755           1 :         palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
     756           1 :     MemoryContextSwitchTo(oldcontext);
     757             : 
     758           1 :     planstate->worker_instrument->num_workers = instrumentation->num_workers;
     759           1 :     memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
     760             : 
     761             :     /*
     762             :      * Perform any node-type-specific work that needs to be done.  Currently,
     763             :      * only Sort nodes need to do anything here.
     764             :      */
     765           1 :     if (IsA(planstate, SortState))
     766           0 :         ExecSortRetrieveInstrumentation((SortState *) planstate);
     767             : 
     768           1 :     return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
     769             :                                  instrumentation);
     770             : }
     771             : 
     772             : /*
     773             :  * Finish parallel execution.  We wait for parallel workers to finish, and
     774             :  * accumulate their buffer usage and instrumentation.
     775             :  */
     776             : void
     777          31 : ExecParallelFinish(ParallelExecutorInfo *pei)
     778             : {
     779          31 :     int         nworkers = pei->pcxt->nworkers_launched;
     780             :     int         i;
     781             : 
     782             :     /* Make this be a no-op if called twice in a row. */
     783          31 :     if (pei->finished)
     784          31 :         return;
     785             : 
     786             :     /*
     787             :      * Detach from tuple queues ASAP, so that any still-active workers will
     788             :      * notice that no further results are wanted.
     789             :      */
     790          31 :     if (pei->tqueue != NULL)
     791             :     {
     792         145 :         for (i = 0; i < nworkers; i++)
     793         114 :             shm_mq_detach(pei->tqueue[i]);
     794          31 :         pfree(pei->tqueue);
     795          31 :         pei->tqueue = NULL;
     796             :     }
     797             : 
     798             :     /*
     799             :      * While we're waiting for the workers to finish, let's get rid of the
     800             :      * tuple queue readers.  (Any other local cleanup could be done here too.)
     801             :      */
     802          31 :     if (pei->reader != NULL)
     803             :     {
     804         144 :         for (i = 0; i < nworkers; i++)
     805         114 :             DestroyTupleQueueReader(pei->reader[i]);
     806          30 :         pfree(pei->reader);
     807          30 :         pei->reader = NULL;
     808             :     }
     809             : 
     810             :     /* Now wait for the workers to finish. */
     811          31 :     WaitForParallelWorkersToFinish(pei->pcxt);
     812             : 
     813             :     /*
     814             :      * Next, accumulate buffer usage.  (This must wait for the workers to
     815             :      * finish, or we might get incomplete data.)
     816             :      */
     817         145 :     for (i = 0; i < nworkers; i++)
     818         114 :         InstrAccumParallelQuery(&pei->buffer_usage[i]);
     819             : 
     820             :     /* Finally, accumulate instrumentation, if any. */
     821          31 :     if (pei->instrumentation)
     822           1 :         ExecParallelRetrieveInstrumentation(pei->planstate,
     823             :                                             pei->instrumentation);
     824             : 
     825          31 :     pei->finished = true;
     826             : }
     827             : 
     828             : /*
     829             :  * Clean up whatever ParallelExecutorInfo resources still exist after
     830             :  * ExecParallelFinish.  We separate these routines because someone might
     831             :  * want to examine the contents of the DSM after ExecParallelFinish and
     832             :  * before calling this routine.
     833             :  */
     834             : void
     835          16 : ExecParallelCleanup(ParallelExecutorInfo *pei)
     836             : {
     837          16 :     if (pei->area != NULL)
     838             :     {
     839          16 :         dsa_detach(pei->area);
     840          16 :         pei->area = NULL;
     841             :     }
     842          16 :     if (pei->pcxt != NULL)
     843             :     {
     844          16 :         DestroyParallelContext(pei->pcxt);
     845          16 :         pei->pcxt = NULL;
     846             :     }
     847          16 :     pfree(pei);
     848          16 : }
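/*
 * Illustrative sketch, not part of execParallel.c: how a Gather-like leader
 * node might drive the routines above over the life of one parallel query.
 * The function and its parameters are hypothetical placeholders for the
 * caller's own state; tuple reading is elided.
 */
static void
ExampleRunParallelPlan(PlanState *subplan, EState *estate, int nworkers,
                       int64 tuples_needed, TupleDesc tupDesc)
{
    ParallelExecutorInfo *pei;

    pei = ExecInitParallelPlan(subplan, estate, nworkers, tuples_needed);
    LaunchParallelWorkers(pei->pcxt);
    if (pei->pcxt->nworkers_launched > 0)
        ExecParallelCreateReaders(pei, tupDesc);

    /* ... the caller would read tuples from pei->reader[] here ... */

    ExecParallelFinish(pei);    /* wait for workers; collect buffer usage */

    /*
     * To rescan, ExecParallelReinitialize(subplan, pei) could be called here
     * and the workers relaunched; otherwise, tear everything down.
     */
    ExecParallelCleanup(pei);
}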
     849             : 
     850             : /*
     851             :  * Create a DestReceiver to write tuples we produce to the shm_mq designated
     852             :  * for that purpose.
     853             :  */
     854             : static DestReceiver *
     855         115 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
     856             : {
     857             :     char       *mqspace;
     858             :     shm_mq     *mq;
     859             : 
     860         115 :     mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
     861         115 :     mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
     862         115 :     mq = (shm_mq *) mqspace;
     863         115 :     shm_mq_set_sender(mq, MyProc);
     864         115 :     return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
     865             : }
     866             : 
     867             : /*
     868             :  * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
     869             :  */
     870             : static QueryDesc *
     871         115 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
     872             :                          int instrument_options)
     873             : {
     874             :     char       *pstmtspace;
     875             :     char       *paramspace;
     876             :     PlannedStmt *pstmt;
     877             :     ParamListInfo paramLI;
     878             :     char       *queryString;
     879             : 
     880             :     /* Get the query string from shared memory */
     881         115 :     queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
     882             : 
     883             :     /* Reconstruct leader-supplied PlannedStmt. */
     884         115 :     pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
     885         115 :     pstmt = (PlannedStmt *) stringToNode(pstmtspace);
     886             : 
     887             :     /* Reconstruct ParamListInfo. */
     888         115 :     paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMS, false);
     889         115 :     paramLI = RestoreParamList(&paramspace);
     890             : 
     891             :     /*
     892             :      * Create a QueryDesc for the query.
     893             :      *
      894             :      * The query text, plan, and parameters all come from the leader via
      895             :      * shared memory (see the lookups above), so the worker executes the
      896             :      * same query, with the same parameter values, that the leader is
      897             :      * running.
     898             :      */
     899         115 :     return CreateQueryDesc(pstmt,
     900             :                            queryString,
     901             :                            GetActiveSnapshot(), InvalidSnapshot,
     902             :                            receiver, paramLI, NULL, instrument_options);
     903             : }
     904             : 
     905             : /*
     906             :  * Copy instrumentation information from this node and its descendants into
     907             :  * dynamic shared memory, so that the parallel leader can retrieve it.
     908             :  */
     909             : static bool
     910           4 : ExecParallelReportInstrumentation(PlanState *planstate,
     911             :                                   SharedExecutorInstrumentation *instrumentation)
     912             : {
     913             :     int         i;
     914           4 :     int         plan_node_id = planstate->plan->plan_node_id;
     915             :     Instrumentation *instrument;
     916             : 
     917           4 :     InstrEndLoop(planstate->instrument);
     918             : 
     919             :     /*
      920             :      * If we kept the plan_node_id values in sorted order, we could use
      921             :      * binary search here.  This might matter someday if
     922             :      * we're pushing down sufficiently large plan trees.  For now, do it the
     923             :      * slow, dumb way.
     924             :      */
     925           4 :     for (i = 0; i < instrumentation->num_plan_nodes; ++i)
     926           4 :         if (instrumentation->plan_node_id[i] == plan_node_id)
     927           4 :             break;
     928           4 :     if (i >= instrumentation->num_plan_nodes)
     929           0 :         elog(ERROR, "plan node %d not found", plan_node_id);
     930             : 
     931             :     /*
     932             :      * Add our statistics to the per-node, per-worker totals.  It's possible
     933             :      * that this could happen more than once if we relaunched workers.
     934             :      */
     935           4 :     instrument = GetInstrumentationArray(instrumentation);
     936           4 :     instrument += i * instrumentation->num_workers;
     937           4 :     Assert(IsParallelWorker());
     938           4 :     Assert(ParallelWorkerNumber < instrumentation->num_workers);
     939           4 :     InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
     940             : 
     941           4 :     return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
     942             :                                  instrumentation);
     943             : }
     944             : 
     945             : /*
     946             :  * Initialize the PlanState and its descendants with the information
     947             :  * retrieved from shared memory.  This has to be done once the PlanState
      948             :  * is allocated and initialized by the executor; that is, after ExecutorStart().
     949             :  */
     950             : static bool
     951         265 : ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
     952             : {
     953         265 :     if (planstate == NULL)
     954           0 :         return false;
     955             : 
     956         265 :     switch (nodeTag(planstate))
     957             :     {
     958             :         case T_SeqScanState:
     959          42 :             if (planstate->plan->parallel_aware)
     960          38 :                 ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
     961          42 :             break;
     962             :         case T_IndexScanState:
     963          17 :             if (planstate->plan->parallel_aware)
     964          16 :                 ExecIndexScanInitializeWorker((IndexScanState *) planstate, toc);
     965          17 :             break;
     966             :         case T_IndexOnlyScanState:
     967          24 :             if (planstate->plan->parallel_aware)
     968          20 :                 ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate, toc);
     969          24 :             break;
     970             :         case T_ForeignScanState:
     971           0 :             if (planstate->plan->parallel_aware)
     972           0 :                 ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
     973             :                                                 toc);
     974           0 :             break;
     975             :         case T_CustomScanState:
     976           0 :             if (planstate->plan->parallel_aware)
     977           0 :                 ExecCustomScanInitializeWorker((CustomScanState *) planstate,
     978             :                                                toc);
     979           0 :             break;
     980             :         case T_BitmapHeapScanState:
     981          44 :             if (planstate->plan->parallel_aware)
     982          44 :                 ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate, toc);
     983          44 :             break;
     984             :         case T_SortState:
     985             :             /* even when not parallel-aware */
     986          21 :             ExecSortInitializeWorker((SortState *) planstate, toc);
     987          21 :             break;
     988             : 
     989             :         default:
     990         117 :             break;
     991             :     }
     992             : 
     993         265 :     return planstate_tree_walker(planstate, ExecParallelInitializeWorker, toc);
     994             : }
     995             : 
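Each case in the switch above delegates to a node-specific worker hook.  As a
hedged illustration of the typical shape of such a hook (purely hypothetical:
FooScanState, ParallelFooState, and the function name are invented; only
shm_toc_lookup and the use of plan_node_id as the per-node TOC key mirror the
surrounding code):

    /* Minimal stand-ins for a node's executor state and its shared state. */
    typedef struct ParallelFooState
    {
        int         next_item;      /* e.g. a shared scan position */
    } ParallelFooState;

    typedef struct FooScanState
    {
        ScanState   ss;             /* standard scan-node header */
        ParallelFooState *pstate;   /* points into the DSM segment */
    } FooScanState;

    static void
    sketch_foo_scan_initialize_worker(FooScanState *node, shm_toc *toc)
    {
        /* the leader published this node's shared state under its plan_node_id */
        node->pstate = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false);
    }
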
     996             : /*
     997             :  * Main entrypoint for parallel query worker processes.
     998             :  *
     999             :  * We reach this function from ParallelWorkerMain, so the setup necessary to
    1000             :  * create a sensible parallel environment has already been done;
    1001             :  * ParallelWorkerMain worries about stuff like the transaction state, combo
    1002             :  * CID mappings, and GUC values, so we don't need to deal with any of that
    1003             :  * here.
    1004             :  *
    1005             :  * Our job is to deal with concerns specific to the executor.  The parallel
    1006             :  * group leader will have stored a serialized PlannedStmt, and it's our job
    1007             :  * to execute that plan and write the resulting tuples to the appropriate
    1008             :  * tuple queue.  Various bits of supporting information that we need in order
    1009             :  * to do this are also stored in the dsm_segment and can be accessed through
    1010             :  * the shm_toc.
    1011             :  */
    1012             : void
    1013         115 : ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
    1014             : {
    1015             :     FixedParallelExecutorState *fpes;
    1016             :     BufferUsage *buffer_usage;
    1017             :     DestReceiver *receiver;
    1018             :     QueryDesc  *queryDesc;
    1019             :     SharedExecutorInstrumentation *instrumentation;
    1020         115 :     int         instrument_options = 0;
    1021             :     void       *area_space;
    1022             :     dsa_area   *area;
    1023             : 
    1024             :     /* Get fixed-size state. */
    1025         115 :     fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
    1026             : 
    1027             :     /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
    1028         115 :     receiver = ExecParallelGetReceiver(seg, toc);
    1029         115 :     instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
    1030         115 :     if (instrumentation != NULL)
    1031           4 :         instrument_options = instrumentation->instrument_options;
    1032         115 :     queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
    1033             : 
     1034             :     /* Set debug_query_string for this individual worker */
    1035         115 :     debug_query_string = queryDesc->sourceText;
    1036             : 
    1037             :     /* Report workers' query for monitoring purposes */
    1038         115 :     pgstat_report_activity(STATE_RUNNING, debug_query_string);
    1039             : 
    1040             :     /* Prepare to track buffer usage during query execution. */
    1041         115 :     InstrStartParallelQuery();
    1042             : 
    1043             :     /* Attach to the dynamic shared memory area. */
    1044         115 :     area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
    1045         115 :     area = dsa_attach_in_place(area_space, seg);
    1046             : 
    1047             :     /* Start up the executor */
    1048         115 :     ExecutorStart(queryDesc, 0);
    1049             : 
    1050             :     /* Special executor initialization steps for parallel workers */
    1051         115 :     queryDesc->planstate->state->es_query_dsa = area;
    1052         115 :     ExecParallelInitializeWorker(queryDesc->planstate, toc);
    1053             : 
    1054             :     /* Pass down any tuple bound */
    1055         115 :     ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
    1056             : 
    1057             :     /*
    1058             :      * Run the plan.  If we specified a tuple bound, be careful not to demand
    1059             :      * more tuples than that.
    1060             :      */
    1061         115 :     ExecutorRun(queryDesc,
    1062             :                 ForwardScanDirection,
    1063         115 :                 fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
    1064             :                 true);
    1065             : 
    1066             :     /* Shut down the executor */
    1067         114 :     ExecutorFinish(queryDesc);
    1068             : 
    1069             :     /* Report buffer usage during parallel execution. */
    1070         114 :     buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
    1071         114 :     InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
    1072             : 
    1073             :     /* Report instrumentation data if any instrumentation options are set. */
    1074         114 :     if (instrumentation != NULL)
    1075           4 :         ExecParallelReportInstrumentation(queryDesc->planstate,
    1076             :                                           instrumentation);
    1077             : 
    1078             :     /* Must do this after capturing instrumentation. */
    1079         114 :     ExecutorEnd(queryDesc);
    1080             : 
    1081             :     /* Cleanup. */
    1082         114 :     dsa_detach(area);
    1083         114 :     FreeQueryDesc(queryDesc);
    1084         114 :     (*receiver->rDestroy) (receiver);
    1085         114 : }
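
One detail worth spelling out from the ExecutorRun call above: the executor's
count argument uses 0 to mean "run the plan to completion", while
fpes->tuples_needed uses a negative value for "no limit", hence the ternary
that maps negative values to 0.  A tiny sketch of that mapping (function name
invented):

    static uint64
    sketch_tuple_count_for_executor_run(int64 tuples_needed)
    {
        /* negative = no limit from the leader; 0 = no limit for ExecutorRun */
        return tuples_needed < 0 ? 0 : (uint64) tuples_needed;
    }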

Generated by: LCOV version 1.11