LCOV - code coverage report
Current view: top level - src/backend/access/transam - parallel.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 324 357 90.8 %
Date: 2017-09-29 15:12:54 Functions: 16 16 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * parallel.c
       4             :  *    Infrastructure for launching parallel workers
       5             :  *
       6             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/access/transam/parallel.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/parallel.h"
      18             : #include "access/xact.h"
      19             : #include "access/xlog.h"
      20             : #include "catalog/namespace.h"
      21             : #include "commands/async.h"
      22             : #include "executor/execParallel.h"
      23             : #include "libpq/libpq.h"
      24             : #include "libpq/pqformat.h"
      25             : #include "libpq/pqmq.h"
      26             : #include "miscadmin.h"
      27             : #include "optimizer/planmain.h"
      28             : #include "pgstat.h"
      29             : #include "storage/ipc.h"
      30             : #include "storage/sinval.h"
      31             : #include "storage/spin.h"
      32             : #include "tcop/tcopprot.h"
      33             : #include "utils/combocid.h"
      34             : #include "utils/guc.h"
      35             : #include "utils/inval.h"
      36             : #include "utils/memutils.h"
      37             : #include "utils/resowner.h"
      38             : #include "utils/snapmgr.h"
      39             : 
      40             : 
      41             : /*
      42             :  * We don't want to waste a lot of memory on an error queue which, most of
      43             :  * the time, will process only a handful of small messages.  However, it is
      44             :  * desirable to make it large enough that a typical ErrorResponse can be sent
      45             :  * without blocking.  That way, a worker that errors out can write the whole
      46             :  * message into the queue and terminate without waiting for the user backend.
      47             :  */
      48             : #define PARALLEL_ERROR_QUEUE_SIZE           16384
      49             : 
      50             : /* Magic number for parallel context TOC. */
      51             : #define PARALLEL_MAGIC                      0x50477c7c
      52             : 
      53             : /*
      54             :  * Magic numbers for parallel state sharing.  Higher-level code should use
      55             :  * smaller values, leaving these very large ones for use by this module.
      56             :  */
      57             : #define PARALLEL_KEY_FIXED                  UINT64CONST(0xFFFFFFFFFFFF0001)
      58             : #define PARALLEL_KEY_ERROR_QUEUE            UINT64CONST(0xFFFFFFFFFFFF0002)
      59             : #define PARALLEL_KEY_LIBRARY                UINT64CONST(0xFFFFFFFFFFFF0003)
      60             : #define PARALLEL_KEY_GUC                    UINT64CONST(0xFFFFFFFFFFFF0004)
      61             : #define PARALLEL_KEY_COMBO_CID              UINT64CONST(0xFFFFFFFFFFFF0005)
      62             : #define PARALLEL_KEY_TRANSACTION_SNAPSHOT   UINT64CONST(0xFFFFFFFFFFFF0006)
      63             : #define PARALLEL_KEY_ACTIVE_SNAPSHOT        UINT64CONST(0xFFFFFFFFFFFF0007)
      64             : #define PARALLEL_KEY_TRANSACTION_STATE      UINT64CONST(0xFFFFFFFFFFFF0008)
      65             : #define PARALLEL_KEY_ENTRYPOINT             UINT64CONST(0xFFFFFFFFFFFF0009)
      66             : 
      67             : /* Fixed-size parallel state. */
      68             : typedef struct FixedParallelState
      69             : {
      70             :     /* Fixed-size state that workers must restore. */
      71             :     Oid         database_id;
      72             :     Oid         authenticated_user_id;
      73             :     Oid         current_user_id;
      74             :     Oid         temp_namespace_id;
      75             :     Oid         temp_toast_namespace_id;
      76             :     int         sec_context;
      77             :     PGPROC     *parallel_master_pgproc;
      78             :     pid_t       parallel_master_pid;
      79             :     BackendId   parallel_master_backend_id;
      80             : 
      81             :     /* Mutex protects remaining fields. */
      82             :     slock_t     mutex;
      83             : 
      84             :     /* Maximum XactLastRecEnd of any worker. */
      85             :     XLogRecPtr  last_xlog_end;
      86             : } FixedParallelState;
      87             : 
      88             : /*
      89             :  * Our parallel worker number.  We initialize this to -1, meaning that we are
      90             :  * not a parallel worker.  In parallel workers, it will be set to a value >= 0
      91             :  * and < the number of workers before any user code is invoked; each parallel
      92             :  * worker will get a different parallel worker number.
      93             :  */
      94             : int         ParallelWorkerNumber = -1;
      95             : 
      96             : /* Is there a parallel message pending which we need to receive? */
      97             : volatile bool ParallelMessagePending = false;
      98             : 
      99             : /* Are we initializing a parallel worker? */
     100             : bool        InitializingParallelWorker = false;
     101             : 
     102             : /* Pointer to our fixed parallel state. */
     103             : static FixedParallelState *MyFixedParallelState;
     104             : 
     105             : /* List of active parallel contexts. */
     106             : static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
     107             : 
     108             : /*
     109             :  * List of internal parallel worker entry points.  We need this for
     110             :  * reasons explained in LookupParallelWorkerFunction(), below.
     111             :  */
     112             : static const struct
     113             : {
     114             :     const char *fn_name;
     115             :     parallel_worker_main_type fn_addr;
     116             : }           InternalParallelWorkers[] =
     117             : 
     118             : {
     119             :     {
     120             :         "ParallelQueryMain", ParallelQueryMain
     121             :     }
     122             : };
     123             : 
     124             : /* Private functions. */
     125             : static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
     126             : static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
     127             : static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
     128             : 
     129             : 
     130             : /*
     131             :  * Establish a new parallel context.  This should be done after entering
     132             :  * parallel mode, and (unless there is an error) the context should be
     133             :  * destroyed before exiting the current subtransaction.
     134             :  */
     135             : ParallelContext *
     136          17 : CreateParallelContext(const char *library_name, const char *function_name,
     137             :                       int nworkers)
     138             : {
     139             :     MemoryContext oldcontext;
     140             :     ParallelContext *pcxt;
     141             : 
     142             :     /* It is unsafe to create a parallel context if not in parallel mode. */
     143          17 :     Assert(IsInParallelMode());
     144             : 
     145             :     /* Number of workers should be non-negative. */
     146          17 :     Assert(nworkers >= 0);
     147             : 
     148             :     /*
     149             :      * If dynamic shared memory is not available, we won't be able to use
     150             :      * background workers.
     151             :      */
     152          17 :     if (dynamic_shared_memory_type == DSM_IMPL_NONE)
     153           0 :         nworkers = 0;
     154             : 
     155             :     /*
     156             :      * If we are running under serializable isolation, we can't use parallel
     157             :      * workers, at least not until somebody enhances that mechanism to be
     158             :      * parallel-aware.
     159             :      */
     160          17 :     if (IsolationIsSerializable())
     161           0 :         nworkers = 0;
     162             : 
     163             :     /* We might be running in a short-lived memory context. */
     164          17 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     165             : 
     166             :     /* Initialize a new ParallelContext. */
     167          17 :     pcxt = palloc0(sizeof(ParallelContext));
     168          17 :     pcxt->subid = GetCurrentSubTransactionId();
     169          17 :     pcxt->nworkers = nworkers;
     170          17 :     pcxt->library_name = pstrdup(library_name);
     171          17 :     pcxt->function_name = pstrdup(function_name);
     172          17 :     pcxt->error_context_stack = error_context_stack;
     173          17 :     shm_toc_initialize_estimator(&pcxt->estimator);
     174          17 :     dlist_push_head(&pcxt_list, &pcxt->node);
     175             : 
     176             :     /* Restore previous memory context. */
     177          17 :     MemoryContextSwitchTo(oldcontext);
     178             : 
     179          17 :     return pcxt;
     180             : }
     181             : 
     182             : /*
     183             :  * Establish the dynamic shared memory segment for a parallel context and
     184             :  * copy state and other bookkeeping information that will be needed by
     185             :  * parallel workers into it.
     186             :  */
     187             : void
     188          17 : InitializeParallelDSM(ParallelContext *pcxt)
     189             : {
     190             :     MemoryContext oldcontext;
     191          17 :     Size        library_len = 0;
     192          17 :     Size        guc_len = 0;
     193          17 :     Size        combocidlen = 0;
     194          17 :     Size        tsnaplen = 0;
     195          17 :     Size        asnaplen = 0;
     196          17 :     Size        tstatelen = 0;
     197          17 :     Size        segsize = 0;
     198             :     int         i;
     199             :     FixedParallelState *fps;
     200          17 :     Snapshot    transaction_snapshot = GetTransactionSnapshot();
     201          17 :     Snapshot    active_snapshot = GetActiveSnapshot();
     202             : 
     203             :     /* We might be running in a very short-lived memory context. */
     204          17 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     205             : 
     206             :     /* Allow space to store the fixed-size parallel state. */
     207          17 :     shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
     208          17 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     209             : 
     210             :     /*
     211             :      * Normally, the user will have requested at least one worker process, but
     212             :      * if by chance they have not, we can skip a bunch of things here.
     213             :      */
     214          17 :     if (pcxt->nworkers > 0)
     215             :     {
     216             :         /* Estimate space for various kinds of state sharing. */
     217          17 :         library_len = EstimateLibraryStateSpace();
     218          17 :         shm_toc_estimate_chunk(&pcxt->estimator, library_len);
     219          17 :         guc_len = EstimateGUCStateSpace();
     220          17 :         shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
     221          17 :         combocidlen = EstimateComboCIDStateSpace();
     222          17 :         shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
     223          17 :         tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
     224          17 :         shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
     225          17 :         asnaplen = EstimateSnapshotSpace(active_snapshot);
     226          17 :         shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
     227          17 :         tstatelen = EstimateTransactionStateSpace();
     228          17 :         shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
     229             :         /* If you add more chunks here, you probably need to add keys. */
     230          17 :         shm_toc_estimate_keys(&pcxt->estimator, 6);
     231             : 
     232             :         /* Estimate space need for error queues. */
     233             :         StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
     234             :                          PARALLEL_ERROR_QUEUE_SIZE,
     235             :                          "parallel error queue size not buffer-aligned");
     236          17 :         shm_toc_estimate_chunk(&pcxt->estimator,
     237             :                                mul_size(PARALLEL_ERROR_QUEUE_SIZE,
     238             :                                         pcxt->nworkers));
     239          17 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     240             : 
     241             :         /* Estimate how much we'll need for the entrypoint info. */
     242          17 :         shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
     243             :                                strlen(pcxt->function_name) + 2);
     244          17 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     245             :     }
     246             : 
     247             :     /*
     248             :      * Create DSM and initialize with new table of contents.  But if the user
     249             :      * didn't request any workers, then don't bother creating a dynamic shared
     250             :      * memory segment; instead, just use backend-private memory.
     251             :      *
     252             :      * Also, if we can't create a dynamic shared memory segment because the
     253             :      * maximum number of segments have already been created, then fall back to
     254             :      * backend-private memory, and plan not to use any workers.  We hope this
     255             :      * won't happen very often, but it's better to abandon the use of
     256             :      * parallelism than to fail outright.
     257             :      */
     258          17 :     segsize = shm_toc_estimate(&pcxt->estimator);
     259          17 :     if (pcxt->nworkers > 0)
     260          17 :         pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
     261          17 :     if (pcxt->seg != NULL)
     262          17 :         pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
     263             :                                    dsm_segment_address(pcxt->seg),
     264             :                                    segsize);
     265             :     else
     266             :     {
     267           0 :         pcxt->nworkers = 0;
     268           0 :         pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
     269           0 :         pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
     270             :                                    segsize);
     271             :     }
     272             : 
     273             :     /* Initialize fixed-size state in shared memory. */
     274          17 :     fps = (FixedParallelState *)
     275          17 :         shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
     276          17 :     fps->database_id = MyDatabaseId;
     277          17 :     fps->authenticated_user_id = GetAuthenticatedUserId();
     278          17 :     GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
     279          17 :     GetTempNamespaceState(&fps->temp_namespace_id,
     280             :                           &fps->temp_toast_namespace_id);
     281          17 :     fps->parallel_master_pgproc = MyProc;
     282          17 :     fps->parallel_master_pid = MyProcPid;
     283          17 :     fps->parallel_master_backend_id = MyBackendId;
     284          17 :     SpinLockInit(&fps->mutex);
     285          17 :     fps->last_xlog_end = 0;
     286          17 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
     287             : 
     288             :     /* We can skip the rest of this if we're not budgeting for any workers. */
     289          17 :     if (pcxt->nworkers > 0)
     290             :     {
     291             :         char       *libraryspace;
     292             :         char       *gucspace;
     293             :         char       *combocidspace;
     294             :         char       *tsnapspace;
     295             :         char       *asnapspace;
     296             :         char       *tstatespace;
     297             :         char       *error_queue_space;
     298             :         char       *entrypointstate;
     299             :         Size        lnamelen;
     300             : 
     301             :         /* Serialize shared libraries we have loaded. */
     302          17 :         libraryspace = shm_toc_allocate(pcxt->toc, library_len);
     303          17 :         SerializeLibraryState(library_len, libraryspace);
     304          17 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);
     305             : 
     306             :         /* Serialize GUC settings. */
     307          17 :         gucspace = shm_toc_allocate(pcxt->toc, guc_len);
     308          17 :         SerializeGUCState(guc_len, gucspace);
     309          17 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);
     310             : 
     311             :         /* Serialize combo CID state. */
     312          17 :         combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
     313          17 :         SerializeComboCIDState(combocidlen, combocidspace);
     314          17 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
     315             : 
     316             :         /* Serialize transaction snapshot and active snapshot. */
     317          17 :         tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
     318          17 :         SerializeSnapshot(transaction_snapshot, tsnapspace);
     319          17 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
     320             :                        tsnapspace);
     321          17 :         asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
     322          17 :         SerializeSnapshot(active_snapshot, asnapspace);
     323          17 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
     324             : 
     325             :         /* Serialize transaction state. */
     326          17 :         tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
     327          17 :         SerializeTransactionState(tstatelen, tstatespace);
     328          17 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
     329             : 
     330             :         /* Allocate space for worker information. */
     331          17 :         pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
     332             : 
     333             :         /*
     334             :          * Establish error queues in dynamic shared memory.
     335             :          *
     336             :          * These queues should be used only for transmitting ErrorResponse,
     337             :          * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
     338             :          * should be transmitted via separate (possibly larger?) queues.
     339             :          */
     340          17 :         error_queue_space =
     341          17 :             shm_toc_allocate(pcxt->toc,
     342             :                              mul_size(PARALLEL_ERROR_QUEUE_SIZE,
     343          17 :                                       pcxt->nworkers));
     344          76 :         for (i = 0; i < pcxt->nworkers; ++i)
     345             :         {
     346             :             char       *start;
     347             :             shm_mq     *mq;
     348             : 
     349          59 :             start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
     350          59 :             mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
     351          59 :             shm_mq_set_receiver(mq, MyProc);
     352          59 :             pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
     353             :         }
     354          17 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
     355             : 
     356             :         /*
     357             :          * Serialize entrypoint information.  It's unsafe to pass function
     358             :          * pointers across processes, as the function pointer may be different
     359             :          * in each process in EXEC_BACKEND builds, so we always pass library
     360             :          * and function name.  (We use library name "postgres" for functions
     361             :          * in the core backend.)
     362             :          */
     363          17 :         lnamelen = strlen(pcxt->library_name);
     364          17 :         entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
     365          17 :                                            strlen(pcxt->function_name) + 2);
     366          17 :         strcpy(entrypointstate, pcxt->library_name);
     367          17 :         strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
     368          17 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
     369             :     }
     370             : 
     371             :     /* Restore previous memory context. */
     372          17 :     MemoryContextSwitchTo(oldcontext);
     373          17 : }
     374             : 
     375             : /*
     376             :  * Reinitialize the dynamic shared memory segment for a parallel context such
     377             :  * that we could launch workers for it again.
     378             :  */
     379             : void
     380          15 : ReinitializeParallelDSM(ParallelContext *pcxt)
     381             : {
     382             :     FixedParallelState *fps;
     383             :     char       *error_queue_space;
     384             :     int         i;
     385             : 
     386             :     /* Wait for any old workers to exit. */
     387          15 :     if (pcxt->nworkers_launched > 0)
     388             :     {
     389          15 :         WaitForParallelWorkersToFinish(pcxt);
     390          15 :         WaitForParallelWorkersToExit(pcxt);
     391          15 :         pcxt->nworkers_launched = 0;
     392             :     }
     393             : 
     394             :     /* Reset a few bits of fixed parallel state to a clean state. */
     395          15 :     fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
     396          15 :     fps->last_xlog_end = 0;
     397             : 
     398             :     /* Recreate error queues. */
     399          15 :     error_queue_space =
     400          15 :         shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
     401          75 :     for (i = 0; i < pcxt->nworkers; ++i)
     402             :     {
     403             :         char       *start;
     404             :         shm_mq     *mq;
     405             : 
     406          60 :         start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
     407          60 :         mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
     408          60 :         shm_mq_set_receiver(mq, MyProc);
     409          60 :         pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
     410             :     }
     411          15 : }
     412             : 
     413             : /*
     414             :  * Launch parallel workers.
     415             :  */
     416             : void
     417          32 : LaunchParallelWorkers(ParallelContext *pcxt)
     418             : {
     419             :     MemoryContext oldcontext;
     420             :     BackgroundWorker worker;
     421             :     int         i;
     422          32 :     bool        any_registrations_failed = false;
     423             : 
     424             :     /* Skip this if we have no workers. */
     425          32 :     if (pcxt->nworkers == 0)
     426          32 :         return;
     427             : 
     428             :     /* We need to be a lock group leader. */
     429          32 :     BecomeLockGroupLeader();
     430             : 
     431             :     /* If we do have workers, we'd better have a DSM segment. */
     432          32 :     Assert(pcxt->seg != NULL);
     433             : 
     434             :     /* We might be running in a short-lived memory context. */
     435          32 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     436             : 
     437             :     /* Configure a worker. */
     438          32 :     memset(&worker, 0, sizeof(worker));
     439          32 :     snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
     440             :              MyProcPid);
     441          32 :     worker.bgw_flags =
     442             :         BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
     443             :         | BGWORKER_CLASS_PARALLEL;
     444          32 :     worker.bgw_start_time = BgWorkerStart_ConsistentState;
     445          32 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     446          32 :     sprintf(worker.bgw_library_name, "postgres");
     447          32 :     sprintf(worker.bgw_function_name, "ParallelWorkerMain");
     448          32 :     worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
     449          32 :     worker.bgw_notify_pid = MyProcPid;
     450             : 
     451             :     /*
     452             :      * Start workers.
     453             :      *
     454             :      * The caller must be able to tolerate ending up with fewer workers than
     455             :      * expected, so there is no need to throw an error here if registration
     456             :      * fails.  It wouldn't help much anyway, because registering the worker in
     457             :      * no way guarantees that it will start up and initialize successfully.
     458             :      */
     459         151 :     for (i = 0; i < pcxt->nworkers; ++i)
     460             :     {
     461         119 :         memcpy(worker.bgw_extra, &i, sizeof(int));
     462         235 :         if (!any_registrations_failed &&
     463         116 :             RegisterDynamicBackgroundWorker(&worker,
     464         116 :                                             &pcxt->worker[i].bgwhandle))
     465             :         {
     466         115 :             shm_mq_set_handle(pcxt->worker[i].error_mqh,
     467         115 :                               pcxt->worker[i].bgwhandle);
     468         115 :             pcxt->nworkers_launched++;
     469             :         }
     470             :         else
     471             :         {
     472             :             /*
     473             :              * If we weren't able to register the worker, then we've bumped up
     474             :              * against the max_worker_processes limit, and future
     475             :              * registrations will probably fail too, so arrange to skip them.
     476             :              * But we still have to execute this code for the remaining slots
     477             :              * to make sure that we forget about the error queues we budgeted
     478             :              * for those workers.  Otherwise, we'll wait for them to start,
     479             :              * but they never will.
     480             :              */
     481           4 :             any_registrations_failed = true;
     482           4 :             pcxt->worker[i].bgwhandle = NULL;
     483           4 :             shm_mq_detach(pcxt->worker[i].error_mqh);
     484           4 :             pcxt->worker[i].error_mqh = NULL;
     485             :         }
     486             :     }
     487             : 
     488             :     /* Restore previous memory context. */
     489          32 :     MemoryContextSwitchTo(oldcontext);
     490             : }
     491             : 
     492             : /*
     493             :  * Wait for all workers to finish computing.
     494             :  *
     495             :  * Even if the parallel operation seems to have completed successfully, it's
     496             :  * important to call this function afterwards.  We must not miss any errors
     497             :  * the workers may have thrown during the parallel operation, or any that they
     498             :  * may yet throw while shutting down.
     499             :  *
     500             :  * Also, we want to update our notion of XactLastRecEnd based on worker
     501             :  * feedback.
     502             :  */
     503             : void
     504          68 : WaitForParallelWorkersToFinish(ParallelContext *pcxt)
     505             : {
     506             :     for (;;)
     507             :     {
     508          68 :         bool        anyone_alive = false;
     509             :         int         i;
     510             : 
     511             :         /*
     512             :          * This will process any parallel messages that are pending, which may
     513             :          * change the outcome of the loop that follows.  It may also throw an
     514             :          * error propagated from a worker.
     515             :          */
     516          68 :         CHECK_FOR_INTERRUPTS();
     517             : 
     518         242 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
     519             :         {
     520         196 :             if (pcxt->worker[i].error_mqh != NULL)
     521             :             {
     522          22 :                 anyone_alive = true;
     523          22 :                 break;
     524             :             }
     525             :         }
     526             : 
     527          68 :         if (!anyone_alive)
     528          46 :             break;
     529             : 
     530          22 :         WaitLatch(MyLatch, WL_LATCH_SET, -1,
     531             :                   WAIT_EVENT_PARALLEL_FINISH);
     532          22 :         ResetLatch(MyLatch);
     533          22 :     }
     534             : 
     535          46 :     if (pcxt->toc != NULL)
     536             :     {
     537             :         FixedParallelState *fps;
     538             : 
     539          46 :         fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
     540          46 :         if (fps->last_xlog_end > XactLastRecEnd)
     541           0 :             XactLastRecEnd = fps->last_xlog_end;
     542             :     }
     543          46 : }
     544             : 
     545             : /*
     546             :  * Wait for all workers to exit.
     547             :  *
     548             :  * This function ensures that workers have been completely shutdown.  The
     549             :  * difference between WaitForParallelWorkersToFinish and this function is
     550             :  * that former just ensures that last message sent by worker backend is
     551             :  * received by master backend whereas this ensures the complete shutdown.
     552             :  */
     553             : static void
     554          32 : WaitForParallelWorkersToExit(ParallelContext *pcxt)
     555             : {
     556             :     int         i;
     557             : 
     558             :     /* Wait until the workers actually die. */
     559         147 :     for (i = 0; i < pcxt->nworkers_launched; ++i)
     560             :     {
     561             :         BgwHandleStatus status;
     562             : 
     563         115 :         if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
     564           0 :             continue;
     565             : 
     566         115 :         status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
     567             : 
     568             :         /*
     569             :          * If the postmaster kicked the bucket, we have no chance of cleaning
     570             :          * up safely -- we won't be able to tell when our workers are actually
     571             :          * dead.  This doesn't necessitate a PANIC since they will all abort
     572             :          * eventually, but we can't safely continue this session.
     573             :          */
     574         115 :         if (status == BGWH_POSTMASTER_DIED)
     575           0 :             ereport(FATAL,
     576             :                     (errcode(ERRCODE_ADMIN_SHUTDOWN),
     577             :                      errmsg("postmaster exited during a parallel transaction")));
     578             : 
     579             :         /* Release memory. */
     580         115 :         pfree(pcxt->worker[i].bgwhandle);
     581         115 :         pcxt->worker[i].bgwhandle = NULL;
     582             :     }
     583          32 : }
     584             : 
     585             : /*
     586             :  * Destroy a parallel context.
     587             :  *
     588             :  * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
     589             :  * first, before calling this function.  When this function is invoked, any
     590             :  * remaining workers are forcibly killed; the dynamic shared memory segment
     591             :  * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
     592             :  */
     593             : void
     594          17 : DestroyParallelContext(ParallelContext *pcxt)
     595             : {
     596             :     int         i;
     597             : 
     598             :     /*
     599             :      * Be careful about order of operations here!  We remove the parallel
     600             :      * context from the list before we do anything else; otherwise, if an
     601             :      * error occurs during a subsequent step, we might try to nuke it again
     602             :      * from AtEOXact_Parallel or AtEOSubXact_Parallel.
     603             :      */
     604          17 :     dlist_delete(&pcxt->node);
     605             : 
     606             :     /* Kill each worker in turn, and forget their error queues. */
     607          17 :     if (pcxt->worker != NULL)
     608             :     {
     609          72 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
     610             :         {
     611          55 :             if (pcxt->worker[i].error_mqh != NULL)
     612             :             {
     613           1 :                 TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
     614             : 
     615           1 :                 shm_mq_detach(pcxt->worker[i].error_mqh);
     616           1 :                 pcxt->worker[i].error_mqh = NULL;
     617             :             }
     618             :         }
     619             :     }
     620             : 
     621             :     /*
     622             :      * If we have allocated a shared memory segment, detach it.  This will
     623             :      * implicitly detach the error queues, and any other shared memory queues,
     624             :      * stored there.
     625             :      */
     626          17 :     if (pcxt->seg != NULL)
     627             :     {
     628          17 :         dsm_detach(pcxt->seg);
     629          17 :         pcxt->seg = NULL;
     630             :     }
     631             : 
     632             :     /*
     633             :      * If this parallel context is actually in backend-private memory rather
     634             :      * than shared memory, free that memory instead.
     635             :      */
     636          17 :     if (pcxt->private_memory != NULL)
     637             :     {
     638           0 :         pfree(pcxt->private_memory);
     639           0 :         pcxt->private_memory = NULL;
     640             :     }
     641             : 
     642             :     /*
     643             :      * We can't finish transaction commit or abort until all of the workers
     644             :      * have exited.  This means, in particular, that we can't respond to
     645             :      * interrupts at this stage.
     646             :      */
     647          17 :     HOLD_INTERRUPTS();
     648          17 :     WaitForParallelWorkersToExit(pcxt);
     649          17 :     RESUME_INTERRUPTS();
     650             : 
     651             :     /* Free the worker array itself. */
     652          17 :     if (pcxt->worker != NULL)
     653             :     {
     654          17 :         pfree(pcxt->worker);
     655          17 :         pcxt->worker = NULL;
     656             :     }
     657             : 
     658             :     /* Free memory. */
     659          17 :     pfree(pcxt->library_name);
     660          17 :     pfree(pcxt->function_name);
     661          17 :     pfree(pcxt);
     662          17 : }
     663             : 
     664             : /*
     665             :  * Are there any parallel contexts currently active?
     666             :  */
     667             : bool
     668         130 : ParallelContextActive(void)
     669             : {
     670         130 :     return !dlist_is_empty(&pcxt_list);
     671             : }
     672             : 
     673             : /*
     674             :  * Handle receipt of an interrupt indicating a parallel worker message.
     675             :  *
     676             :  * Note: this is called within a signal handler!  All we can do is set
     677             :  * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
     678             :  * HandleParallelMessages().
     679             :  */
     680             : void
     681         230 : HandleParallelMessageInterrupt(void)
     682             : {
     683         230 :     InterruptPending = true;
     684         230 :     ParallelMessagePending = true;
     685         230 :     SetLatch(MyLatch);
     686         230 : }
     687             : 
     688             : /*
     689             :  * Handle any queued protocol messages received from parallel workers.
     690             :  */
     691             : void
     692         230 : HandleParallelMessages(void)
     693             : {
     694             :     dlist_iter  iter;
     695             :     MemoryContext oldcontext;
     696             : 
     697             :     static MemoryContext hpm_context = NULL;
     698             : 
     699             :     /*
     700             :      * This is invoked from ProcessInterrupts(), and since some of the
     701             :      * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
     702             :      * for recursive calls if more signals are received while this runs.  It's
     703             :      * unclear that recursive entry would be safe, and it doesn't seem useful
     704             :      * even if it is safe, so let's block interrupts until done.
     705             :      */
     706         230 :     HOLD_INTERRUPTS();
     707             : 
     708             :     /*
     709             :      * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
     710             :      * don't want to risk leaking data into long-lived contexts, so let's do
     711             :      * our work here in a private context that we can reset on each use.
     712             :      */
     713         230 :     if (hpm_context == NULL)    /* first time through? */
     714           1 :         hpm_context = AllocSetContextCreate(TopMemoryContext,
     715             :                                             "HandleParallelMessages",
     716             :                                             ALLOCSET_DEFAULT_SIZES);
     717             :     else
     718         229 :         MemoryContextReset(hpm_context);
     719             : 
     720         230 :     oldcontext = MemoryContextSwitchTo(hpm_context);
     721             : 
     722             :     /* OK to process messages.  Reset the flag saying there are more to do. */
     723         230 :     ParallelMessagePending = false;
     724             : 
     725         459 :     dlist_foreach(iter, &pcxt_list)
     726             :     {
     727             :         ParallelContext *pcxt;
     728             :         int         i;
     729             : 
     730         230 :         pcxt = dlist_container(ParallelContext, node, iter.cur);
     731         230 :         if (pcxt->worker == NULL)
     732           0 :             continue;
     733             : 
     734        1131 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
     735             :         {
     736             :             /*
     737             :              * Read as many messages as we can from each worker, but stop when
     738             :              * either (1) the worker's error queue goes away, which can happen
     739             :              * if we receive a Terminate message from the worker; or (2) no
     740             :              * more messages can be read from the worker without blocking.
     741             :              */
     742        2033 :             while (pcxt->worker[i].error_mqh != NULL)
     743             :             {
     744             :                 shm_mq_result res;
     745             :                 Size        nbytes;
     746             :                 void       *data;
     747             : 
     748         683 :                 res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
     749             :                                      &data, true);
     750         683 :                 if (res == SHM_MQ_WOULD_BLOCK)
     751         453 :                     break;
     752         230 :                 else if (res == SHM_MQ_SUCCESS)
     753             :                 {
     754             :                     StringInfoData msg;
     755             : 
     756         230 :                     initStringInfo(&msg);
     757         230 :                     appendBinaryStringInfo(&msg, data, nbytes);
     758         230 :                     HandleParallelMessage(pcxt, i, &msg);
     759         229 :                     pfree(msg.data);
     760             :                 }
     761             :                 else
     762           0 :                     ereport(ERROR,
     763             :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     764             :                              errmsg("lost connection to parallel worker")));
     765             :             }
     766             :         }
     767             :     }
     768             : 
     769         229 :     MemoryContextSwitchTo(oldcontext);
     770             : 
     771             :     /* Might as well clear the context on our way out */
     772         229 :     MemoryContextReset(hpm_context);
     773             : 
     774         229 :     RESUME_INTERRUPTS();
     775         229 : }
     776             : 
     777             : /*
     778             :  * Handle a single protocol message received from a single parallel worker.
     779             :  */
     780             : static void
     781         230 : HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
     782             : {
     783             :     char        msgtype;
     784             : 
     785         230 :     msgtype = pq_getmsgbyte(msg);
     786             : 
     787         230 :     switch (msgtype)
     788             :     {
     789             :         case 'K':               /* BackendKeyData */
     790             :             {
     791         115 :                 int32       pid = pq_getmsgint(msg, 4);
     792             : 
     793         115 :                 (void) pq_getmsgint(msg, 4);    /* discard cancel key */
     794         115 :                 (void) pq_getmsgend(msg);
     795         115 :                 pcxt->worker[i].pid = pid;
     796         115 :                 break;
     797             :             }
     798             : 
     799             :         case 'E':               /* ErrorResponse */
     800             :         case 'N':               /* NoticeResponse */
     801             :             {
     802             :                 ErrorData   edata;
     803             :                 ErrorContextCallback *save_error_context_stack;
     804             : 
     805             :                 /* Parse ErrorResponse or NoticeResponse. */
     806           1 :                 pq_parse_errornotice(msg, &edata);
     807             : 
     808             :                 /* Death of a worker isn't enough justification for suicide. */
     809           1 :                 edata.elevel = Min(edata.elevel, ERROR);
     810             : 
     811             :                 /*
     812             :                  * If desired, add a context line to show that this is a
     813             :                  * message propagated from a parallel worker.  Otherwise, it
     814             :                  * can sometimes be confusing to understand what actually
     815             :                  * happened.  (We don't do this in FORCE_PARALLEL_REGRESS mode
     816             :                  * because it causes test-result instability depending on
     817             :                  * whether a parallel worker is actually used or not.)
     818             :                  */
     819           1 :                 if (force_parallel_mode != FORCE_PARALLEL_REGRESS)
     820             :                 {
     821           1 :                     if (edata.context)
     822           0 :                         edata.context = psprintf("%s\n%s", edata.context,
     823             :                                                  _("parallel worker"));
     824             :                     else
     825           1 :                         edata.context = pstrdup(_("parallel worker"));
     826             :                 }
     827             : 
     828             :                 /*
     829             :                  * Context beyond that should use the error context callbacks
     830             :                  * that were in effect when the ParallelContext was created,
     831             :                  * not the current ones.
     832             :                  */
     833           1 :                 save_error_context_stack = error_context_stack;
     834           1 :                 error_context_stack = pcxt->error_context_stack;
     835             : 
     836             :                 /* Rethrow error or print notice. */
     837           1 :                 ThrowErrorData(&edata);
     838             : 
     839             :                 /* Not an error, so restore previous context stack. */
     840           0 :                 error_context_stack = save_error_context_stack;
     841             : 
     842           0 :                 break;
     843             :             }
     844             : 
     845             :         case 'A':               /* NotifyResponse */
     846             :             {
     847             :                 /* Propagate NotifyResponse. */
     848             :                 int32       pid;
     849             :                 const char *channel;
     850             :                 const char *payload;
     851             : 
     852           0 :                 pid = pq_getmsgint(msg, 4);
     853           0 :                 channel = pq_getmsgrawstring(msg);
     854           0 :                 payload = pq_getmsgrawstring(msg);
     855           0 :                 pq_endmessage(msg);
     856             : 
     857           0 :                 NotifyMyFrontEnd(channel, payload, pid);
     858             : 
     859           0 :                 break;
     860             :             }
     861             : 
     862             :         case 'X':               /* Terminate, indicating clean exit */
     863             :             {
     864         114 :                 shm_mq_detach(pcxt->worker[i].error_mqh);
     865         114 :                 pcxt->worker[i].error_mqh = NULL;
     866         114 :                 break;
     867             :             }
     868             : 
     869             :         default:
     870             :             {
     871           0 :                 elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
     872             :                      msgtype, msg->len);
     873             :             }
     874             :     }
     875         229 : }
     876             : 
     877             : /*
     878             :  * End-of-subtransaction cleanup for parallel contexts.
     879             :  *
     880             :  * Currently, it's forbidden to enter or leave a subtransaction while
     881             :  * parallel mode is in effect, so we could just blow away everything.  But
     882             :  * we may want to relax that restriction in the future, so this code
     883             :  * contemplates that there may be multiple subtransaction IDs in pcxt_list.
     884             :  */
     885             : void
     886           1 : AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
     887             : {
     888           3 :     while (!dlist_is_empty(&pcxt_list))
     889             :     {
     890             :         ParallelContext *pcxt;
     891             : 
     892           1 :         pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
     893           1 :         if (pcxt->subid != mySubId)
     894           0 :             break;
     895           1 :         if (isCommit)
     896           0 :             elog(WARNING, "leaked parallel context");
     897           1 :         DestroyParallelContext(pcxt);
     898             :     }
     899           1 : }
     900             : 
     901             : /*
     902             :  * End-of-transaction cleanup for parallel contexts.
     903             :  */
     904             : void
     905         115 : AtEOXact_Parallel(bool isCommit)
     906             : {
     907         230 :     while (!dlist_is_empty(&pcxt_list))
     908             :     {
     909             :         ParallelContext *pcxt;
     910             : 
     911           0 :         pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
     912           0 :         if (isCommit)
     913           0 :             elog(WARNING, "leaked parallel context");
     914           0 :         DestroyParallelContext(pcxt);
     915             :     }
     916         115 : }
     917             : 
     918             : /*
     919             :  * Main entrypoint for parallel workers.
     920             :  */
     921             : void
     922         115 : ParallelWorkerMain(Datum main_arg)
     923             : {
     924             :     dsm_segment *seg;
     925             :     shm_toc    *toc;
     926             :     FixedParallelState *fps;
     927             :     char       *error_queue_space;
     928             :     shm_mq     *mq;
     929             :     shm_mq_handle *mqh;
     930             :     char       *libraryspace;
     931             :     char       *entrypointstate;
     932             :     char       *library_name;
     933             :     char       *function_name;
     934             :     parallel_worker_main_type entrypt;
     935             :     char       *gucspace;
     936             :     char       *combocidspace;
     937             :     char       *tsnapspace;
     938             :     char       *asnapspace;
     939             :     char       *tstatespace;
     940             :     StringInfoData msgbuf;
     941             : 
     942             :     /* Set flag to indicate that we're initializing a parallel worker. */
     943         115 :     InitializingParallelWorker = true;
     944             : 
     945             :     /* Establish signal handlers. */
     946         115 :     pqsignal(SIGTERM, die);
     947         115 :     BackgroundWorkerUnblockSignals();
     948             : 
     949             :     /* Determine and set our parallel worker number. */
     950         115 :     Assert(ParallelWorkerNumber == -1);
     951         115 :     memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
     952             : 
     953             :     /* Set up a memory context and resource owner. */
     954         115 :     Assert(CurrentResourceOwner == NULL);
     955         115 :     CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel");
     956         115 :     CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
     957             :                                                  "Parallel worker",
     958             :                                                  ALLOCSET_DEFAULT_SIZES);
     959             : 
     960             :     /*
     961             :      * Now that we have a resource owner, we can attach to the dynamic shared
     962             :      * memory segment and read the table of contents.
     963             :      */
     964         115 :     seg = dsm_attach(DatumGetUInt32(main_arg));
     965         115 :     if (seg == NULL)
     966           0 :         ereport(ERROR,
     967             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     968             :                  errmsg("could not map dynamic shared memory segment")));
     969         115 :     toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
     970         115 :     if (toc == NULL)
     971           0 :         ereport(ERROR,
     972             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     973             :                  errmsg("invalid magic number in dynamic shared memory segment")));
     974             : 
     975             :     /* Look up fixed parallel state. */
     976         115 :     fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
     977         115 :     MyFixedParallelState = fps;
     978             : 
     979             :     /*
     980             :      * Now that we have a worker number, we can find and attach to the error
     981             :      * queue provided for us.  That's good, because until we do that, any
     982             :      * errors that happen here will not be reported back to the process that
     983             :      * requested that this worker be launched.
     984             :      */
     985         115 :     error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
     986         115 :     mq = (shm_mq *) (error_queue_space +
     987         115 :                      ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
     988         115 :     shm_mq_set_sender(mq, MyProc);
     989         115 :     mqh = shm_mq_attach(mq, seg, NULL);
     990         115 :     pq_redirect_to_shm_mq(seg, mqh);
     991         115 :     pq_set_parallel_master(fps->parallel_master_pid,
     992             :                            fps->parallel_master_backend_id);
     993             : 
     994             :     /*
     995             :      * Send a BackendKeyData message to the process that initiated parallelism
     996             :      * so that it has access to our PID before it receives any other messages
     997             :      * from us.  Our cancel key is sent, too, since that's the way the
     998             :      * protocol message is defined, but it won't actually be used for anything
     999             :      * in this case.
    1000             :      */
    1001         115 :     pq_beginmessage(&msgbuf, 'K');
    1002         115 :     pq_sendint(&msgbuf, (int32) MyProcPid, sizeof(int32));
    1003         115 :     pq_sendint(&msgbuf, (int32) MyCancelKey, sizeof(int32));
    1004         115 :     pq_endmessage(&msgbuf);
    1005             : 
    1006             :     /*
    1007             :      * Hooray! Primary initialization is complete.  Now, we need to set up our
    1008             :      * backend-local state to match the original backend.
    1009             :      */
    1010             : 
    1011             :     /*
    1012             :      * Join locking group.  We must do this before anything that could try to
    1013             :      * acquire a heavyweight lock, because any heavyweight locks acquired to
    1014             :      * this point could block either directly against the parallel group
    1015             :      * leader or against some process which in turn waits for a lock that
    1016             :      * conflicts with the parallel group leader, causing an undetected
    1017             :      * deadlock.  (If we can't join the lock group, the leader has gone away,
    1018             :      * so just exit quietly.)
    1019             :      */
    1020         115 :     if (!BecomeLockGroupMember(fps->parallel_master_pgproc,
    1021             :                                fps->parallel_master_pid))
    1022         114 :         return;
    1023             : 
    1024             :     /*
    1025             :      * Load libraries that were loaded by original backend.  We want to do
    1026             :      * this before restoring GUCs, because the libraries might define custom
    1027             :      * variables.
    1028             :      */
    1029         115 :     libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
    1030         115 :     RestoreLibraryState(libraryspace);
    1031             : 
    1032             :     /*
    1033             :      * Identify the entry point to be called.  In theory this could result in
    1034             :      * loading an additional library, though most likely the entry point is in
    1035             :      * the core backend or in a library we just loaded.
    1036             :      */
    1037         115 :     entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
    1038         115 :     library_name = entrypointstate;
    1039         115 :     function_name = entrypointstate + strlen(library_name) + 1;
    1040             : 
    1041         115 :     entrypt = LookupParallelWorkerFunction(library_name, function_name);
    1042             : 
    1043             :     /* Restore database connection. */
    1044         115 :     BackgroundWorkerInitializeConnectionByOid(fps->database_id,
    1045             :                                               fps->authenticated_user_id);
    1046             : 
    1047             :     /*
    1048             :      * Set the client encoding to the database encoding, since that is what
    1049             :      * the leader will expect.
    1050             :      */
    1051         115 :     SetClientEncoding(GetDatabaseEncoding());
    1052             : 
    1053             :     /* Restore GUC values from launching backend. */
    1054         115 :     gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
    1055         115 :     StartTransactionCommand();
    1056         115 :     RestoreGUCState(gucspace);
    1057         115 :     CommitTransactionCommand();
    1058             : 
    1059             :     /* Crank up a transaction state appropriate to a parallel worker. */
    1060         115 :     tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
    1061         115 :     StartParallelWorkerTransaction(tstatespace);
    1062             : 
    1063             :     /* Restore combo CID state. */
    1064         115 :     combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
    1065         115 :     RestoreComboCIDState(combocidspace);
    1066             : 
    1067             :     /* Restore transaction snapshot. */
    1068         115 :     tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, false);
    1069         115 :     RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
    1070         115 :                                fps->parallel_master_pgproc);
    1071             : 
    1072             :     /* Restore active snapshot. */
    1073         115 :     asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
    1074         115 :     PushActiveSnapshot(RestoreSnapshot(asnapspace));
    1075             : 
    1076             :     /*
    1077             :      * We've changed which tuples we can see, and must therefore invalidate
    1078             :      * system caches.
    1079             :      */
    1080         115 :     InvalidateSystemCaches();
    1081             : 
    1082             :     /* Restore user ID and security context. */
    1083         115 :     SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
    1084             : 
    1085             :     /* Restore temp-namespace state to ensure search path matches leader's. */
    1086         115 :     SetTempNamespaceState(fps->temp_namespace_id,
    1087             :                           fps->temp_toast_namespace_id);
    1088             : 
    1089             :     /* Set ParallelMasterBackendId so we know how to address temp relations. */
    1090         115 :     ParallelMasterBackendId = fps->parallel_master_backend_id;
    1091             : 
    1092             :     /*
    1093             :      * We've initialized all of our state now; nothing should change
    1094             :      * hereafter.
    1095             :      */
    1096         115 :     InitializingParallelWorker = false;
    1097         115 :     EnterParallelMode();
    1098             : 
    1099             :     /*
    1100             :      * Time to do the real work: invoke the caller-supplied code.
    1101             :      */
    1102         115 :     entrypt(seg, toc);
    1103             : 
    1104             :     /* Must exit parallel mode to pop active snapshot. */
    1105         114 :     ExitParallelMode();
    1106             : 
    1107             :     /* Must pop active snapshot so resowner.c doesn't complain. */
    1108         114 :     PopActiveSnapshot();
    1109             : 
    1110             :     /* Shut down the parallel-worker transaction. */
    1111         114 :     EndParallelWorkerTransaction();
    1112             : 
    1113             :     /* Report success. */
    1114         114 :     pq_putmessage('X', NULL, 0);
    1115             : }
    1116             : 
    1117             : /*
    1118             :  * Update shared memory with the ending location of the last WAL record we
    1119             :  * wrote, if it's greater than the value already stored there.
    1120             :  */
    1121             : void
    1122         114 : ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
    1123             : {
    1124         114 :     FixedParallelState *fps = MyFixedParallelState;
    1125             : 
    1126         114 :     Assert(fps != NULL);
    1127         114 :     SpinLockAcquire(&fps->mutex);
    1128         114 :     if (fps->last_xlog_end < last_xlog_end)
    1129           0 :         fps->last_xlog_end = last_xlog_end;
    1130         114 :     SpinLockRelease(&fps->mutex);
    1131         114 : }
    1132             : 
    1133             : /*
    1134             :  * Look up (and possibly load) a parallel worker entry point function.
    1135             :  *
    1136             :  * For functions contained in the core code, we use library name "postgres"
    1137             :  * and consult the InternalParallelWorkers array.  External functions are
    1138             :  * looked up, and loaded if necessary, using load_external_function().
    1139             :  *
    1140             :  * The point of this is to pass function names as strings across process
    1141             :  * boundaries.  We can't pass actual function addresses because of the
    1142             :  * possibility that the function has been loaded at a different address
    1143             :  * in a different process.  This is obviously a hazard for functions in
    1144             :  * loadable libraries, but it can happen even for functions in the core code
    1145             :  * on platforms using EXEC_BACKEND (e.g., Windows).
    1146             :  *
    1147             :  * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
    1148             :  * in favor of applying load_external_function() for core functions too;
    1149             :  * but that raises portability issues that are not worth addressing now.
    1150             :  */
    1151             : static parallel_worker_main_type
    1152         115 : LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
    1153             : {
    1154             :     /*
    1155             :      * If the function is to be loaded from postgres itself, search the
    1156             :      * InternalParallelWorkers array.
    1157             :      */
    1158         115 :     if (strcmp(libraryname, "postgres") == 0)
    1159             :     {
    1160             :         int         i;
    1161             : 
    1162         115 :         for (i = 0; i < lengthof(InternalParallelWorkers); i++)
    1163             :         {
    1164         115 :             if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
    1165         115 :                 return InternalParallelWorkers[i].fn_addr;
    1166             :         }
    1167             : 
    1168             :         /* We can only reach this by programming error. */
    1169           0 :         elog(ERROR, "internal function \"%s\" not found", funcname);
    1170             :     }
    1171             : 
    1172             :     /* Otherwise load from external library. */
    1173           0 :     return (parallel_worker_main_type)
    1174             :         load_external_function(libraryname, funcname, true, NULL);
    1175             : }

Generated by: LCOV version 1.11