LCOV - code coverage report
Current view: top level - src/backend/storage/ipc - shm_mq.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 272 389 69.9 %
Date: 2017-09-29 13:40:31 Functions: 21 24 87.5 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * shm_mq.c
       4             :  *    single-reader, single-writer shared memory message queue
       5             :  *
       6             :  * Both the sender and the receiver must have a PGPROC; their respective
       7             :  * process latches are used for synchronization.  Only the sender may send,
       8             :  * and only the receiver may receive.  This is intended to allow a user
       9             :  * backend to communicate with worker backends that it has registered.
      10             :  *
      11             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
      12             :  * Portions Copyright (c) 1994, Regents of the University of California
      13             :  *
      14             :  * src/include/storage/shm_mq.h
      15             :  *
      16             :  *-------------------------------------------------------------------------
      17             :  */
      18             : 
      19             : #include "postgres.h"
      20             : 
      21             : #include "miscadmin.h"
      22             : #include "pgstat.h"
      23             : #include "postmaster/bgworker.h"
      24             : #include "storage/procsignal.h"
      25             : #include "storage/shm_mq.h"
      26             : #include "storage/spin.h"
      27             : 
      28             : /*
      29             :  * This structure represents the actual queue, stored in shared memory.
      30             :  *
      31             :  * Some notes on synchronization:
      32             :  *
      33             :  * mq_receiver and mq_bytes_read can only be changed by the receiver; and
      34             :  * mq_sender and mq_bytes_written can only be changed by the sender.  However,
      35             :  * because most of these fields are 8 bytes and we don't assume that 8 byte
      36             :  * reads and writes are atomic, the spinlock must be taken whenever the field
      37             :  * is updated, and whenever it is read by a process other than the one allowed
      38             :  * to modify it. But the process that is allowed to modify it is also allowed
      39             :  * to read it without the lock.  On architectures where 8-byte writes are
      40             :  * atomic, we could replace these spinlocks with memory barriers, but
      41             :  * testing found no performance benefit, so it seems best to keep things
      42             :  * simple for now.
      43             :  *
      44             :  * mq_detached can be set by either the sender or the receiver, so the mutex
      45             :  * must be held to read or write it.  Memory barriers could be used here as
      46             :  * well, if needed.
      47             :  *
      48             :  * mq_ring_size and mq_ring_offset never change after initialization, and
      49             :  * can therefore be read without the lock.
      50             :  *
      51             :  * Importantly, mq_ring can be safely read and written without a lock.  Were
      52             :  * this not the case, we'd have to hold the spinlock for much longer
      53             :  * intervals, and performance might suffer.  Fortunately, that's not
      54             :  * necessary.  At any given time, the difference between mq_bytes_read and
      55             :  * mq_bytes_written defines the number of bytes within mq_ring that contain
      56             :  * unread data, and mq_bytes_read defines the position where those bytes
      57             :  * begin.  The sender can increase the number of unread bytes at any time,
      58             :  * but only the receiver can give license to overwrite those bytes, by
      59             :  * incrementing mq_bytes_read.  Therefore, it's safe for the receiver to read
      60             :  * the unread bytes it knows to be present without the lock.  Conversely,
      61             :  * the sender can write to the unused portion of the ring buffer without
      62             :  * the lock, because nobody else can be reading or writing those bytes.  The
      63             :  * receiver could be making more bytes unused by incrementing mq_bytes_read,
      64             :  * but that's OK.  Note that it would be unsafe for the receiver to read any
      65             :  * data it's already marked as read, or to write any data; and it would be
      66             :  * unsafe for the sender to reread any data after incrementing
      67             :  * mq_bytes_written, but fortunately there's no need for any of that.
      68             :  */
      69             : struct shm_mq
      70             : {
      71             :     slock_t     mq_mutex;
      72             :     PGPROC     *mq_receiver;
      73             :     PGPROC     *mq_sender;
      74             :     uint64      mq_bytes_read;
      75             :     uint64      mq_bytes_written;
      76             :     Size        mq_ring_size;
      77             :     bool        mq_detached;
      78             :     uint8       mq_ring_offset;
      79             :     char        mq_ring[FLEXIBLE_ARRAY_MEMBER];
      80             : };
      81             : 
      82             : /*
      83             :  * This structure is a backend-private handle for access to a queue.
      84             :  *
      85             :  * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
      86             :  * an optional pointer to the dynamic shared memory segment that contains it.
      87             :  * (If mqh_segment is provided, we register an on_dsm_detach callback to
      88             :  * make sure we detach from the queue before detaching from DSM.)
      89             :  *
      90             :  * If this queue is intended to connect the current process with a background
      91             :  * worker that started it, the user can pass a pointer to the worker handle
      92             :  * to shm_mq_attach(), and we'll store it in mqh_handle.  The point of this
      93             :  * is to allow us to begin sending to or receiving from that queue before the
      94             :  * process we'll be communicating with has even been started.  If it fails
      95             :  * to start, the handle will allow us to notice that and fail cleanly, rather
      96             :  * than waiting forever; see shm_mq_wait_internal.  This is mostly useful in
      97             :  * simple cases - e.g. where there are just 2 processes communicating; in
      98             :  * more complex scenarios, every process may not have a BackgroundWorkerHandle
      99             :  * available, or may need to watch for the failure of more than one other
     100             :  * process at a time.
     101             :  *
     102             :  * When a message exists as a contiguous chunk of bytes in the queue - that is,
     103             :  * it is smaller than the size of the ring buffer and does not wrap around
     104             :  * the end - we return the message to the caller as a pointer into the buffer.
     105             :  * For messages that are larger or happen to wrap, we reassemble the message
     106             :  * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
     107             :  * the buffer, and mqh_buflen is the number of bytes allocated for it.
     108             :  *
     109             :  * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
     110             :  * are used to track the state of non-blocking operations.  When the caller
     111             :  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
     112             :  * are expected to retry the call at a later time with the same argument;
     113             :  * we need to retain enough state to pick up where we left off.
     114             :  * mqh_length_word_complete tracks whether we are done sending or receiving
     115             :  * (whichever we're doing) the entire length word.  mqh_partial_bytes tracks
     116             :  * the number of bytes read or written for either the length word or the
     117             :  * message itself, and mqh_expected_bytes - which is used only for reads -
     118             :  * tracks the expected total size of the payload.
     119             :  *
     120             :  * mqh_counterparty_attached tracks whether we know the counterparty to have
     121             :  * attached to the queue at some previous point.  This lets us avoid some
     122             :  * mutex acquisitions.
     123             :  *
     124             :  * mqh_context is the memory context in effect at the time we attached to
     125             :  * the shm_mq.  The shm_mq_handle itself is allocated in this context, and
     126             :  * we make sure any other allocations we do happen in this context as well,
     127             :  * to avoid nasty surprises.
     128             :  */
     129             : struct shm_mq_handle
     130             : {
     131             :     shm_mq     *mqh_queue;
     132             :     dsm_segment *mqh_segment;
     133             :     BackgroundWorkerHandle *mqh_handle;
     134             :     char       *mqh_buffer;
     135             :     Size        mqh_buflen;
     136             :     Size        mqh_consume_pending;
     137             :     Size        mqh_partial_bytes;
     138             :     Size        mqh_expected_bytes;
     139             :     bool        mqh_length_word_complete;
     140             :     bool        mqh_counterparty_attached;
     141             :     MemoryContext mqh_context;
     142             : };
     143             : 
     144             : static void shm_mq_detach_internal(shm_mq *mq);
     145             : static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
     146             :                   const void *data, bool nowait, Size *bytes_written);
     147             : static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
     148             :                      bool nowait, Size *nbytesp, void **datap);
     149             : static bool shm_mq_counterparty_gone(volatile shm_mq *mq,
     150             :                          BackgroundWorkerHandle *handle);
     151             : static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
     152             :                      BackgroundWorkerHandle *handle);
     153             : static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
     154             : static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n);
     155             : static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
     156             : static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n);
     157             : static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
     158             : static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
     159             : 
     160             : /* Minimum queue size is enough for header and at least one chunk of data. */
     161             : const Size  shm_mq_minimum_size =
     162             : MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
     163             : 
     164             : #define MQH_INITIAL_BUFSIZE             8192
     165             : 
     166             : /*
     167             :  * Initialize a new shared message queue.
     168             :  */
     169             : shm_mq *
     170         238 : shm_mq_create(void *address, Size size)
     171             : {
     172         238 :     shm_mq     *mq = address;
     173         238 :     Size        data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
     174             : 
     175             :     /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
     176         238 :     size = MAXALIGN_DOWN(size);
     177             : 
     178             :     /* Queue size must be large enough to hold some data. */
     179         238 :     Assert(size > data_offset);
     180             : 
     181             :     /* Initialize queue header. */
     182         238 :     SpinLockInit(&mq->mq_mutex);
     183         238 :     mq->mq_receiver = NULL;
     184         238 :     mq->mq_sender = NULL;
     185         238 :     mq->mq_bytes_read = 0;
     186         238 :     mq->mq_bytes_written = 0;
     187         238 :     mq->mq_ring_size = size - data_offset;
     188         238 :     mq->mq_detached = false;
     189         238 :     mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
     190             : 
     191         238 :     return mq;
     192             : }
     193             : 
     194             : /*
     195             :  * Set the identity of the process that will receive from a shared message
     196             :  * queue.
     197             :  */
     198             : void
     199         238 : shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
     200             : {
     201         238 :     volatile shm_mq *vmq = mq;
     202             :     PGPROC     *sender;
     203             : 
     204         238 :     SpinLockAcquire(&mq->mq_mutex);
     205         238 :     Assert(vmq->mq_receiver == NULL);
     206         238 :     vmq->mq_receiver = proc;
     207         238 :     sender = vmq->mq_sender;
     208         238 :     SpinLockRelease(&mq->mq_mutex);
     209             : 
     210         238 :     if (sender != NULL)
     211           0 :         SetLatch(&sender->procLatch);
     212         238 : }
     213             : 
     214             : /*
     215             :  * Set the identity of the process that will send to a shared message queue.
     216             :  */
     217             : void
     218         230 : shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
     219             : {
     220         230 :     volatile shm_mq *vmq = mq;
     221             :     PGPROC     *receiver;
     222             : 
     223         230 :     SpinLockAcquire(&mq->mq_mutex);
     224         230 :     Assert(vmq->mq_sender == NULL);
     225         230 :     vmq->mq_sender = proc;
     226         230 :     receiver = vmq->mq_receiver;
     227         230 :     SpinLockRelease(&mq->mq_mutex);
     228             : 
     229         230 :     if (receiver != NULL)
     230         230 :         SetLatch(&receiver->procLatch);
     231         230 : }
     232             : 
     233             : /*
     234             :  * Get the configured receiver.
     235             :  */
     236             : PGPROC *
     237           0 : shm_mq_get_receiver(shm_mq *mq)
     238             : {
     239           0 :     volatile shm_mq *vmq = mq;
     240             :     PGPROC     *receiver;
     241             : 
     242           0 :     SpinLockAcquire(&mq->mq_mutex);
     243           0 :     receiver = vmq->mq_receiver;
     244           0 :     SpinLockRelease(&mq->mq_mutex);
     245             : 
     246           0 :     return receiver;
     247             : }
     248             : 
     249             : /*
     250             :  * Get the configured sender.
     251             :  */
     252             : PGPROC *
     253      434264 : shm_mq_get_sender(shm_mq *mq)
     254             : {
     255      434264 :     volatile shm_mq *vmq = mq;
     256             :     PGPROC     *sender;
     257             : 
     258      434264 :     SpinLockAcquire(&mq->mq_mutex);
     259      434264 :     sender = vmq->mq_sender;
     260      434264 :     SpinLockRelease(&mq->mq_mutex);
     261             : 
     262      434264 :     return sender;
     263             : }
     264             : 
     265             : /*
     266             :  * Attach to a shared message queue so we can send or receive messages.
     267             :  *
     268             :  * The memory context in effect at the time this function is called should
     269             :  * be one which will last for at least as long as the message queue itself.
     270             :  * We'll allocate the handle in that context, and future allocations that
     271             :  * are needed to buffer incoming data will happen in that context as well.
     272             :  *
     273             :  * If seg != NULL, the queue will be automatically detached when that dynamic
     274             :  * shared memory segment is detached.
     275             :  *
     276             :  * If handle != NULL, the queue can be read or written even before the
     277             :  * other process has attached.  We'll wait for it to do so if needed.  The
     278             :  * handle must be for a background worker initialized with bgw_notify_pid
     279             :  * equal to our PID.
     280             :  *
     281             :  * shm_mq_detach() should be called when done.  This will free the
     282             :  * shm_mq_handle and mark the queue itself as detached, so that our
     283             :  * counterpart won't get stuck waiting for us to fill or drain the queue
     284             :  * after we've already lost interest.
     285             :  */
     286             : shm_mq_handle *
     287         468 : shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
     288             : {
     289         468 :     shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
     290             : 
     291         468 :     Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
     292         468 :     mqh->mqh_queue = mq;
     293         468 :     mqh->mqh_segment = seg;
     294         468 :     mqh->mqh_handle = handle;
     295         468 :     mqh->mqh_buffer = NULL;
     296         468 :     mqh->mqh_buflen = 0;
     297         468 :     mqh->mqh_consume_pending = 0;
     298         468 :     mqh->mqh_partial_bytes = 0;
     299         468 :     mqh->mqh_expected_bytes = 0;
     300         468 :     mqh->mqh_length_word_complete = false;
     301         468 :     mqh->mqh_counterparty_attached = false;
     302         468 :     mqh->mqh_context = CurrentMemoryContext;
     303             : 
     304         468 :     if (seg != NULL)
     305         468 :         on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
     306             : 
     307         468 :     return mqh;
     308             : }
     309             : 
     310             : /*
     311             :  * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
     312             :  * been passed to shm_mq_attach.
     313             :  */
     314             : void
     315         230 : shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
     316             : {
     317         230 :     Assert(mqh->mqh_handle == NULL);
     318         230 :     mqh->mqh_handle = handle;
     319         230 : }
     320             : 
     321             : /*
     322             :  * Write a message into a shared message queue.
     323             :  */
     324             : shm_mq_result
     325          65 : shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
     326             : {
     327             :     shm_mq_iovec iov;
     328             : 
     329          65 :     iov.data = data;
     330          65 :     iov.len = nbytes;
     331             : 
     332          65 :     return shm_mq_sendv(mqh, &iov, 1, nowait);
     333             : }
     334             : 
     335             : /*
     336             :  * Write a message into a shared message queue, gathered from multiple
     337             :  * addresses.
     338             :  *
     339             :  * When nowait = false, we'll wait on our process latch when the ring buffer
     340             :  * fills up, and then continue writing once the receiver has drained some data.
     341             :  * The process latch is reset after each wait.
     342             :  *
     343             :  * When nowait = true, we do not manipulate the state of the process latch;
     344             :  * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK.  In
     345             :  * this case, the caller should call this function again, with the same
     346             :  * arguments, each time the process latch is set.  (Once begun, the sending
     347             :  * of a message cannot be aborted except by detaching from the queue; changing
     348             :  * the length or payload will corrupt the queue.)
     349             :  */
     350             : shm_mq_result
     351         295 : shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
     352             : {
     353             :     shm_mq_result res;
     354         295 :     shm_mq     *mq = mqh->mqh_queue;
     355         295 :     Size        nbytes = 0;
     356             :     Size        bytes_written;
     357             :     int         i;
     358         295 :     int         which_iov = 0;
     359             :     Size        offset;
     360             : 
     361         295 :     Assert(mq->mq_sender == MyProc);
     362             : 
     363             :     /* Compute total size of write. */
     364         820 :     for (i = 0; i < iovcnt; ++i)
     365         525 :         nbytes += iov[i].len;
     366             : 
     367             :     /* Try to write, or finish writing, the length word into the buffer. */
     368         885 :     while (!mqh->mqh_length_word_complete)
     369             :     {
     370         295 :         Assert(mqh->mqh_partial_bytes < sizeof(Size));
     371         590 :         res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
     372         295 :                                 ((char *) &nbytes) + mqh->mqh_partial_bytes,
     373             :                                 nowait, &bytes_written);
     374             : 
     375         295 :         if (res == SHM_MQ_DETACHED)
     376             :         {
     377             :             /* Reset state in case caller tries to send another message. */
     378           0 :             mqh->mqh_partial_bytes = 0;
     379           0 :             mqh->mqh_length_word_complete = false;
     380           0 :             return res;
     381             :         }
     382         295 :         mqh->mqh_partial_bytes += bytes_written;
     383             : 
     384         295 :         if (mqh->mqh_partial_bytes >= sizeof(Size))
     385             :         {
     386         295 :             Assert(mqh->mqh_partial_bytes == sizeof(Size));
     387             : 
     388         295 :             mqh->mqh_partial_bytes = 0;
     389         295 :             mqh->mqh_length_word_complete = true;
     390             :         }
     391             : 
     392         295 :         if (res != SHM_MQ_SUCCESS)
     393           0 :             return res;
     394             : 
     395             :         /* Length word can't be split unless bigger than required alignment. */
     396         295 :         Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
     397             :     }
     398             : 
     399             :     /* Write the actual data bytes into the buffer. */
     400         295 :     Assert(mqh->mqh_partial_bytes <= nbytes);
     401         295 :     offset = mqh->mqh_partial_bytes;
     402             :     do
     403             :     {
     404             :         Size        chunksize;
     405             : 
     406             :         /* Figure out which bytes need to be sent next. */
     407         411 :         if (offset >= iov[which_iov].len)
     408             :         {
     409           0 :             offset -= iov[which_iov].len;
     410           0 :             ++which_iov;
     411           0 :             if (which_iov >= iovcnt)
     412           0 :                 break;
     413           0 :             continue;
     414             :         }
     415             : 
     416             :         /*
     417             :          * We want to avoid copying the data if at all possible, but every
     418             :          * chunk of bytes we write into the queue has to be MAXALIGN'd, except
     419             :          * the last.  Thus, if a chunk other than the last one ends on a
     420             :          * non-MAXALIGN'd boundary, we have to combine the tail end of its
     421             :          * data with data from one or more following chunks until we either
     422             :          * reach the last chunk or accumulate a number of bytes which is
     423             :          * MAXALIGN'd.
     424             :          */
     425         641 :         if (which_iov + 1 < iovcnt &&
     426         230 :             offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
     427             :         {
     428             :             char        tmpbuf[MAXIMUM_ALIGNOF];
     429         230 :             int         j = 0;
     430             : 
     431             :             for (;;)
     432             :             {
     433         922 :                 if (offset < iov[which_iov].len)
     434             :                 {
     435         578 :                     tmpbuf[j] = iov[which_iov].data[offset];
     436         578 :                     j++;
     437         578 :                     offset++;
     438         578 :                     if (j == MAXIMUM_ALIGNOF)
     439         116 :                         break;
     440             :                 }
     441             :                 else
     442             :                 {
     443         344 :                     offset -= iov[which_iov].len;
     444         344 :                     which_iov++;
     445         344 :                     if (which_iov >= iovcnt)
     446         114 :                         break;
     447             :                 }
     448         692 :             }
     449             : 
     450         230 :             res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
     451             : 
     452         230 :             if (res == SHM_MQ_DETACHED)
     453             :             {
     454             :                 /* Reset state in case caller tries to send another message. */
     455           0 :                 mqh->mqh_partial_bytes = 0;
     456           0 :                 mqh->mqh_length_word_complete = false;
     457           0 :                 return res;
     458             :             }
     459             : 
     460         230 :             mqh->mqh_partial_bytes += bytes_written;
     461         230 :             if (res != SHM_MQ_SUCCESS)
     462           0 :                 return res;
     463         230 :             continue;
     464             :         }
     465             : 
     466             :         /*
     467             :          * If this is the last chunk, we can write all the data, even if it
     468             :          * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
     469             :          * MAXALIGN_DOWN the write size.
     470             :          */
     471         181 :         chunksize = iov[which_iov].len - offset;
     472         181 :         if (which_iov + 1 < iovcnt)
     473           0 :             chunksize = MAXALIGN_DOWN(chunksize);
     474         181 :         res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
     475             :                                 nowait, &bytes_written);
     476             : 
     477         181 :         if (res == SHM_MQ_DETACHED)
     478             :         {
     479             :             /* Reset state in case caller tries to send another message. */
     480           0 :             mqh->mqh_length_word_complete = false;
     481           0 :             mqh->mqh_partial_bytes = 0;
     482           0 :             return res;
     483             :         }
     484             : 
     485         181 :         mqh->mqh_partial_bytes += bytes_written;
     486         181 :         offset += bytes_written;
     487         181 :         if (res != SHM_MQ_SUCCESS)
     488           0 :             return res;
     489         411 :     } while (mqh->mqh_partial_bytes < nbytes);
     490             : 
     491             :     /* Reset for next message. */
     492         295 :     mqh->mqh_partial_bytes = 0;
     493         295 :     mqh->mqh_length_word_complete = false;
     494             : 
     495             :     /* Notify receiver of the newly-written data, and return. */
     496         295 :     return shm_mq_notify_receiver(mq);
     497             : }
     498             : 
     499             : /*
     500             :  * Receive a message from a shared message queue.
     501             :  *
     502             :  * We set *nbytes to the message length and *data to point to the message
     503             :  * payload.  If the entire message exists in the queue as a single,
     504             :  * contiguous chunk, *data will point directly into shared memory; otherwise,
     505             :  * it will point to a temporary buffer.  This mostly avoids data copying in
     506             :  * the hoped-for case where messages are short compared to the buffer size,
     507             :  * while still allowing longer messages.  In either case, the return value
     508             :  * remains valid until the next receive operation is performed on the queue.
     509             :  *
     510             :  * When nowait = false, we'll wait on our process latch when the ring buffer
     511             :  * is empty and we have not yet received a full message.  The sender will
     512             :  * set our process latch after more data has been written, and we'll resume
     513             :  * processing.  Each call will therefore return a complete message
     514             :  * (unless the sender detaches the queue).
     515             :  *
     516             :  * When nowait = true, we do not manipulate the state of the process latch;
     517             :  * instead, whenever the buffer is empty and we need to read from it, we
     518             :  * return SHM_MQ_WOULD_BLOCK.  In this case, the caller should call this
     519             :  * function again after the process latch has been set.
     520             :  */
     521             : shm_mq_result
     522      434691 : shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
     523             : {
     524      434691 :     shm_mq     *mq = mqh->mqh_queue;
     525             :     shm_mq_result res;
     526      434691 :     Size        rb = 0;
     527             :     Size        nbytes;
     528             :     void       *rawdata;
     529             : 
     530      434691 :     Assert(mq->mq_receiver == MyProc);
     531             : 
     532             :     /* We can't receive data until the sender has attached. */
     533      434691 :     if (!mqh->mqh_counterparty_attached)
     534             :     {
     535      434269 :         if (nowait)
     536             :         {
     537             :             int         counterparty_gone;
     538             : 
     539             :             /*
     540             :              * We shouldn't return at this point at all unless the sender
     541             :              * hasn't attached yet.  However, the correct return value depends
     542             :              * on whether the sender is still attached.  If we first test
     543             :              * whether the sender has ever attached and then test whether the
     544             :              * sender has detached, there's a race condition: a sender that
     545             :              * attaches and detaches very quickly might fool us into thinking
     546             :              * the sender never attached at all.  So, test whether our
     547             :              * counterparty is definitively gone first, and only afterwards
     548             :              * check whether the sender ever attached in the first place.
     549             :              */
     550      434249 :             counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
     551      434249 :             if (shm_mq_get_sender(mq) == NULL)
     552             :             {
     553      434039 :                 if (counterparty_gone)
     554           0 :                     return SHM_MQ_DETACHED;
     555             :                 else
     556      434039 :                     return SHM_MQ_WOULD_BLOCK;
     557             :             }
     558             :         }
     559          20 :         else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
     560          15 :                  && shm_mq_get_sender(mq) == NULL)
     561             :         {
     562           0 :             mq->mq_detached = true;
     563           0 :             return SHM_MQ_DETACHED;
     564             :         }
     565         230 :         mqh->mqh_counterparty_attached = true;
     566             :     }
     567             : 
     568             :     /* Consume any zero-copy data from previous receive operation. */
     569         652 :     if (mqh->mqh_consume_pending > 0)
     570             :     {
     571         180 :         shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
     572         180 :         mqh->mqh_consume_pending = 0;
     573             :     }
     574             : 
     575             :     /* Try to read, or finish reading, the length word from the buffer. */
     576        1304 :     while (!mqh->mqh_length_word_complete)
     577             :     {
     578             :         /* Try to receive the message length word. */
     579         652 :         Assert(mqh->mqh_partial_bytes < sizeof(Size));
     580         652 :         res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes,
     581             :                                    nowait, &rb, &rawdata);
     582         652 :         if (res != SHM_MQ_SUCCESS)
     583         357 :             return res;
     584             : 
     585             :         /*
     586             :          * Hopefully, we'll receive the entire message length word at once.
     587             :          * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
     588             :          * multiple reads.
     589             :          */
     590         295 :         if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
     591           0 :         {
     592             :             Size        needed;
     593             : 
     594         295 :             nbytes = *(Size *) rawdata;
     595             : 
     596             :             /* If we've already got the whole message, we're done. */
     597         295 :             needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
     598         295 :             if (rb >= needed)
     599             :             {
     600             :                 /*
     601             :                  * Technically, we could consume the message length
     602             :                  * information at this point, but the extra write to shared
     603             :                  * memory wouldn't be free and in most cases we would reap no
     604             :                  * benefit.
     605             :                  */
     606         295 :                 mqh->mqh_consume_pending = needed;
     607         295 :                 *nbytesp = nbytes;
     608         295 :                 *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
     609         295 :                 return SHM_MQ_SUCCESS;
     610             :             }
     611             : 
     612             :             /*
     613             :              * We don't have the whole message, but we at least have the whole
     614             :              * length word.
     615             :              */
     616           0 :             mqh->mqh_expected_bytes = nbytes;
     617           0 :             mqh->mqh_length_word_complete = true;
     618           0 :             shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size)));
     619           0 :             rb -= MAXALIGN(sizeof(Size));
     620             :         }
     621             :         else
     622             :         {
     623             :             Size        lengthbytes;
     624             : 
     625             :             /* Can't be split unless bigger than required alignment. */
     626           0 :             Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
     627             : 
     628             :             /* Message word is split; need buffer to reassemble. */
     629             :             if (mqh->mqh_buffer == NULL)
     630             :             {
     631             :                 mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
     632             :                                                      MQH_INITIAL_BUFSIZE);
     633             :                 mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
     634             :             }
     635             :             Assert(mqh->mqh_buflen >= sizeof(Size));
     636             : 
     637             :             /* Copy and consume partial length word. */
     638             :             if (mqh->mqh_partial_bytes + rb > sizeof(Size))
     639             :                 lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
     640             :             else
     641             :                 lengthbytes = rb;
     642             :             memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
     643             :                    lengthbytes);
     644             :             mqh->mqh_partial_bytes += lengthbytes;
     645             :             shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes));
     646             :             rb -= lengthbytes;
     647             : 
     648             :             /* If we now have the whole word, we're ready to read payload. */
     649             :             if (mqh->mqh_partial_bytes >= sizeof(Size))
     650             :             {
     651             :                 Assert(mqh->mqh_partial_bytes == sizeof(Size));
     652             :                 mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
     653             :                 mqh->mqh_length_word_complete = true;
     654             :                 mqh->mqh_partial_bytes = 0;
     655             :             }
     656             :         }
     657             :     }
     658           0 :     nbytes = mqh->mqh_expected_bytes;
     659             : 
     660           0 :     if (mqh->mqh_partial_bytes == 0)
     661             :     {
     662             :         /*
     663             :          * Try to obtain the whole message in a single chunk.  If this works,
     664             :          * we need not copy the data and can return a pointer directly into
     665             :          * shared memory.
     666             :          */
     667           0 :         res = shm_mq_receive_bytes(mq, nbytes, nowait, &rb, &rawdata);
     668           0 :         if (res != SHM_MQ_SUCCESS)
     669           0 :             return res;
     670           0 :         if (rb >= nbytes)
     671             :         {
     672           0 :             mqh->mqh_length_word_complete = false;
     673           0 :             mqh->mqh_consume_pending = MAXALIGN(nbytes);
     674           0 :             *nbytesp = nbytes;
     675           0 :             *datap = rawdata;
     676           0 :             return SHM_MQ_SUCCESS;
     677             :         }
     678             : 
     679             :         /*
     680             :          * The message has wrapped the buffer.  We'll need to copy it in order
     681             :          * to return it to the client in one chunk.  First, make sure we have
     682             :          * a large enough buffer available.
     683             :          */
     684           0 :         if (mqh->mqh_buflen < nbytes)
     685             :         {
     686           0 :             Size        newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
     687             : 
     688           0 :             while (newbuflen < nbytes)
     689           0 :                 newbuflen *= 2;
     690             : 
     691           0 :             if (mqh->mqh_buffer != NULL)
     692             :             {
     693           0 :                 pfree(mqh->mqh_buffer);
     694           0 :                 mqh->mqh_buffer = NULL;
     695           0 :                 mqh->mqh_buflen = 0;
     696             :             }
     697           0 :             mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
     698           0 :             mqh->mqh_buflen = newbuflen;
     699             :         }
     700             :     }
     701             : 
     702             :     /* Loop until we've copied the entire message. */
     703             :     for (;;)
     704             :     {
     705             :         Size        still_needed;
     706             : 
     707             :         /* Copy as much as we can. */
     708           0 :         Assert(mqh->mqh_partial_bytes + rb <= nbytes);
     709           0 :         memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
     710           0 :         mqh->mqh_partial_bytes += rb;
     711             : 
     712             :         /*
     713             :          * Update count of bytes read, with alignment padding.  Note that this
     714             :          * will never actually insert any padding except at the end of a
     715             :          * message, because the buffer size is a multiple of MAXIMUM_ALIGNOF,
     716             :          * and each read and write is as well.
     717             :          */
     718           0 :         Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
     719           0 :         shm_mq_inc_bytes_read(mq, MAXALIGN(rb));
     720             : 
     721             :         /* If we got all the data, exit the loop. */
     722           0 :         if (mqh->mqh_partial_bytes >= nbytes)
     723           0 :             break;
     724             : 
     725             :         /* Wait for some more data. */
     726           0 :         still_needed = nbytes - mqh->mqh_partial_bytes;
     727           0 :         res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
     728           0 :         if (res != SHM_MQ_SUCCESS)
     729           0 :             return res;
     730           0 :         if (rb > still_needed)
     731           0 :             rb = still_needed;
     732           0 :     }
     733             : 
     734             :     /* Return the complete message, and reset for next message. */
     735           0 :     *nbytesp = nbytes;
     736           0 :     *datap = mqh->mqh_buffer;
     737           0 :     mqh->mqh_length_word_complete = false;
     738           0 :     mqh->mqh_partial_bytes = 0;
     739           0 :     return SHM_MQ_SUCCESS;
     740             : }
     741             : 
     742             : /*
     743             :  * Wait for the other process that's supposed to use this queue to attach
     744             :  * to it.
     745             :  *
     746             :  * The return value is SHM_MQ_DETACHED if the worker has already detached or
     747             :  * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
     748             :  * Note that we will only be able to detect that the worker has died before
     749             :  * attaching if a background worker handle was passed to shm_mq_attach().
     750             :  */
     751             : shm_mq_result
     752           0 : shm_mq_wait_for_attach(shm_mq_handle *mqh)
     753             : {
     754           0 :     shm_mq     *mq = mqh->mqh_queue;
     755             :     PGPROC    **victim;
     756             : 
     757           0 :     if (shm_mq_get_receiver(mq) == MyProc)
     758           0 :         victim = &mq->mq_sender;
     759             :     else
     760             :     {
     761           0 :         Assert(shm_mq_get_sender(mq) == MyProc);
     762           0 :         victim = &mq->mq_receiver;
     763             :     }
     764             : 
     765           0 :     if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
     766           0 :         return SHM_MQ_SUCCESS;
     767             :     else
     768           0 :         return SHM_MQ_DETACHED;
     769             : }
     770             : 
     771             : /*
     772             :  * Detach from a shared message queue, and destroy the shm_mq_handle.
     773             :  */
     774             : void
     775         347 : shm_mq_detach(shm_mq_handle *mqh)
     776             : {
     777             :     /* Notify counterparty that we're outta here. */
     778         347 :     shm_mq_detach_internal(mqh->mqh_queue);
     779             : 
     780             :     /* Cancel on_dsm_detach callback, if any. */
     781         347 :     if (mqh->mqh_segment)
     782         347 :         cancel_on_dsm_detach(mqh->mqh_segment,
     783             :                              shm_mq_detach_callback,
     784         347 :                              PointerGetDatum(mqh->mqh_queue));
     785             : 
     786             :     /* Release local memory associated with handle. */
     787         347 :     if (mqh->mqh_buffer != NULL)
     788           0 :         pfree(mqh->mqh_buffer);
     789         347 :     pfree(mqh);
     790         347 : }
     791             : 
     792             : /*
     793             :  * Notify counterparty that we're detaching from shared message queue.
     794             :  *
     795             :  * The purpose of this function is to make sure that the process
     796             :  * with which we're communicating doesn't block forever waiting for us to
     797             :  * fill or drain the queue once we've lost interest.  When the sender
     798             :  * detaches, the receiver can read any messages remaining in the queue;
     799             :  * further reads will return SHM_MQ_DETACHED.  If the receiver detaches,
     800             :  * further attempts to send messages will likewise return SHM_MQ_DETACHED.
     801             :  *
     802             :  * This is separated out from shm_mq_detach() because if the on_dsm_detach
     803             :  * callback fires, we only want to do this much.  We do not try to touch
     804             :  * the local shm_mq_handle, as it may have been pfree'd already.
     805             :  */
     806             : static void
     807         468 : shm_mq_detach_internal(shm_mq *mq)
     808             : {
     809         468 :     volatile shm_mq *vmq = mq;
     810             :     PGPROC     *victim;
     811             : 
     812         468 :     SpinLockAcquire(&mq->mq_mutex);
     813         468 :     if (vmq->mq_sender == MyProc)
     814         230 :         victim = vmq->mq_receiver;
     815             :     else
     816             :     {
     817         238 :         Assert(vmq->mq_receiver == MyProc);
     818         238 :         victim = vmq->mq_sender;
     819             :     }
     820         468 :     vmq->mq_detached = true;
     821         468 :     SpinLockRelease(&mq->mq_mutex);
     822             : 
     823         468 :     if (victim != NULL)
     824         460 :         SetLatch(&victim->procLatch);
     825         468 : }
     826             : 
     827             : /*
     828             :  * Get the shm_mq from handle.
     829             :  */
     830             : shm_mq *
     831           0 : shm_mq_get_queue(shm_mq_handle *mqh)
     832             : {
     833           0 :     return mqh->mqh_queue;
     834             : }
     835             : 
     836             : /*
     837             :  * Write bytes into a shared message queue.
     838             :  */
     839             : static shm_mq_result
     840         706 : shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
     841             :                   bool nowait, Size *bytes_written)
     842             : {
     843         706 :     shm_mq     *mq = mqh->mqh_queue;
     844         706 :     Size        sent = 0;
     845             :     uint64      used;
     846         706 :     Size        ringsize = mq->mq_ring_size;
     847             :     Size        available;
     848             : 
     849        2118 :     while (sent < nbytes)
     850             :     {
     851             :         bool        detached;
     852             :         uint64      rb;
     853             : 
     854             :         /* Compute number of ring buffer bytes used and available. */
     855         706 :         rb = shm_mq_get_bytes_read(mq, &detached);
     856         706 :         Assert(mq->mq_bytes_written >= rb);
     857         706 :         used = mq->mq_bytes_written - rb;
     858         706 :         Assert(used <= ringsize);
     859         706 :         available = Min(ringsize - used, nbytes - sent);
     860             : 
     861             :         /* Bail out if the queue has been detached. */
     862         706 :         if (detached)
     863             :         {
     864           0 :             *bytes_written = sent;
     865           0 :             return SHM_MQ_DETACHED;
     866             :         }
     867             : 
     868         706 :         if (available == 0 && !mqh->mqh_counterparty_attached)
     869             :         {
     870             :             /*
     871             :              * The queue is full, so if the receiver isn't yet known to be
     872             :              * attached, we must wait for that to happen.
     873             :              */
     874           0 :             if (nowait)
     875             :             {
     876           0 :                 if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
     877             :                 {
     878           0 :                     *bytes_written = sent;
     879           0 :                     return SHM_MQ_DETACHED;
     880             :                 }
     881           0 :                 if (shm_mq_get_receiver(mq) == NULL)
     882             :                 {
     883           0 :                     *bytes_written = sent;
     884           0 :                     return SHM_MQ_WOULD_BLOCK;
     885             :                 }
     886             :             }
     887           0 :             else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
     888             :                                            mqh->mqh_handle))
     889             :             {
     890           0 :                 mq->mq_detached = true;
     891           0 :                 *bytes_written = sent;
     892           0 :                 return SHM_MQ_DETACHED;
     893             :             }
     894           0 :             mqh->mqh_counterparty_attached = true;
     895             : 
     896             :             /*
     897             :              * The receiver may have read some data after attaching, so we
     898             :              * must not wait without rechecking the queue state.
     899             :              */
     900             :         }
     901         706 :         else if (available == 0)
     902             :         {
     903             :             shm_mq_result res;
     904             : 
     905             :             /* Let the receiver know that we need them to read some data. */
     906           0 :             res = shm_mq_notify_receiver(mq);
     907           0 :             if (res != SHM_MQ_SUCCESS)
     908             :             {
     909           0 :                 *bytes_written = sent;
     910           0 :                 return res;
     911             :             }
     912             : 
     913             :             /* Skip manipulation of our latch if nowait = true. */
     914           0 :             if (nowait)
     915             :             {
     916           0 :                 *bytes_written = sent;
     917           0 :                 return SHM_MQ_WOULD_BLOCK;
     918             :             }
     919             : 
     920             :             /*
     921             :              * Wait for our latch to be set.  It might already be set for some
     922             :              * unrelated reason, but that'll just result in one extra trip
     923             :              * through the loop.  It's worth it to avoid resetting the latch
     924             :              * at top of loop, because setting an already-set latch is much
     925             :              * cheaper than setting one that has been reset.
     926             :              */
     927           0 :             WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
     928             : 
     929             :             /* Reset the latch so we don't spin. */
     930           0 :             ResetLatch(MyLatch);
     931             : 
     932             :             /* An interrupt may have occurred while we were waiting. */
     933           0 :             CHECK_FOR_INTERRUPTS();
     934             :         }
     935             :         else
     936             :         {
     937         706 :             Size        offset = mq->mq_bytes_written % (uint64) ringsize;
     938         706 :             Size        sendnow = Min(available, ringsize - offset);
     939             : 
     940             :             /* Write as much data as we can via a single memcpy(). */
     941         706 :             memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
     942         706 :                    (char *) data + sent, sendnow);
     943         706 :             sent += sendnow;
     944             : 
     945             :             /*
     946             :              * Update count of bytes written, with alignment padding.  Note
     947             :              * that this will never actually insert any padding except at the
     948             :              * end of a run of bytes, because the buffer size is a multiple of
     949             :              * MAXIMUM_ALIGNOF, and each read is as well.
     950             :              */
     951         706 :             Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
     952         706 :             shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
     953             : 
     954             :             /*
     955             :              * For efficiency, we don't set the reader's latch here.  We'll do
     956             :              * that only when the buffer fills up or after writing an entire
     957             :              * message.
     958             :              */
     959             :         }
     960             :     }
     961             : 
     962         706 :     *bytes_written = sent;
     963         706 :     return SHM_MQ_SUCCESS;
     964             : }
     965             : 
     966             : /*
     967             :  * Wait until at least *nbytesp bytes are available to be read from the
     968             :  * shared message queue, or until the buffer wraps around.  If the queue is
     969             :  * detached, returns SHM_MQ_DETACHED.  If nowait is specified and a wait
     970             :  * would be required, returns SHM_MQ_WOULD_BLOCK.  Otherwise, *datap is set
     971             :  * to the location at which data bytes can be read, *nbytesp is set to the
     972             :  * number of bytes which can be read at that address, and the return value
     973             :  * is SHM_MQ_SUCCESS.
     974             :  */
     975             : static shm_mq_result
     976         652 : shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
     977             :                      Size *nbytesp, void **datap)
     978             : {
     979         652 :     Size        ringsize = mq->mq_ring_size;
     980             :     uint64      used;
     981             :     uint64      written;
     982             : 
     983             :     for (;;)
     984             :     {
     985             :         Size        offset;
     986             :         bool        detached;
     987             : 
     988             :         /* Get bytes written, so we can compute what's available to read. */
     989         657 :         written = shm_mq_get_bytes_written(mq, &detached);
     990         657 :         used = written - mq->mq_bytes_read;
     991         657 :         Assert(used <= ringsize);
     992         657 :         offset = mq->mq_bytes_read % (uint64) ringsize;
     993             : 
     994             :         /* If we have enough data or buffer has wrapped, we're done. */
     995         657 :         if (used >= bytes_needed || offset + used >= ringsize)
     996             :         {
     997         295 :             *nbytesp = Min(used, ringsize - offset);
     998         295 :             *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
     999         947 :             return SHM_MQ_SUCCESS;
    1000             :         }
    1001             : 
    1002             :         /*
    1003             :          * Fall out before waiting if the queue has been detached.
    1004             :          *
    1005             :          * Note that we don't check for this until *after* considering whether
    1006             :          * the data already available is enough, since the receiver can finish
    1007             :          * receiving a message stored in the buffer even after the sender has
    1008             :          * detached.
    1009             :          */
    1010         362 :         if (detached)
    1011         114 :             return SHM_MQ_DETACHED;
    1012             : 
    1013             :         /* Skip manipulation of our latch if nowait = true. */
    1014         248 :         if (nowait)
    1015         243 :             return SHM_MQ_WOULD_BLOCK;
    1016             : 
    1017             :         /*
    1018             :          * Wait for our latch to be set.  It might already be set for some
    1019             :          * unrelated reason, but that'll just result in one extra trip through
    1020             :          * the loop.  It's worth it to avoid resetting the latch at top of
    1021             :          * loop, because setting an already-set latch is much cheaper than
    1022             :          * setting one that has been reset.
    1023             :          */
    1024           5 :         WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_RECEIVE);
    1025             : 
    1026             :         /* Reset the latch so we don't spin. */
    1027           5 :         ResetLatch(MyLatch);
    1028             : 
    1029             :         /* An interrupt may have occurred while we were waiting. */
    1030           5 :         CHECK_FOR_INTERRUPTS();
    1031           5 :     }
    1032             : }
    1033             : 
    1034             : /*
    1035             :  * Test whether a counterparty who may not even be alive yet is definitely gone.
    1036             :  */
    1037             : static bool
    1038      434249 : shm_mq_counterparty_gone(volatile shm_mq *mq, BackgroundWorkerHandle *handle)
    1039             : {
    1040             :     bool        detached;
    1041             :     pid_t       pid;
    1042             : 
    1043             :     /* Acquire the lock just long enough to check the pointer. */
    1044      434249 :     SpinLockAcquire(&mq->mq_mutex);
    1045      434249 :     detached = mq->mq_detached;
    1046      434249 :     SpinLockRelease(&mq->mq_mutex);
    1047             : 
    1048             :     /* If the queue has been detached, counterparty is definitely gone. */
    1049      434249 :     if (detached)
    1050           2 :         return true;
    1051             : 
    1052             :     /* If there's a handle, check worker status. */
    1053      434247 :     if (handle != NULL)
    1054             :     {
    1055             :         BgwHandleStatus status;
    1056             : 
    1057             :         /* Check for unexpected worker death. */
    1058      434247 :         status = GetBackgroundWorkerPid(handle, &pid);
    1059      434247 :         if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
    1060             :         {
    1061             :             /* Mark it detached, just to make it official. */
    1062           0 :             SpinLockAcquire(&mq->mq_mutex);
    1063           0 :             mq->mq_detached = true;
    1064           0 :             SpinLockRelease(&mq->mq_mutex);
    1065           0 :             return true;
    1066             :         }
    1067             :     }
    1068             : 
    1069             :     /* Counterparty is not definitively gone. */
    1070      434247 :     return false;
    1071             : }
    1072             : 
    1073             : /*
    1074             :  * This is used when a process is waiting for its counterpart to attach to the
    1075             :  * queue.  We exit when the other process attaches as expected, or, if
    1076             :  * handle != NULL, when the referenced background process or the postmaster
    1077             :  * dies.  Note that if handle == NULL, and the process fails to attach, we'll
    1078             :  * potentially get stuck here forever waiting for a process that may never
    1079             :  * start.  We do check for interrupts, though.
    1080             :  *
    1081             :  * ptr is a pointer to the memory address that we're expecting to become
    1082             :  * non-NULL when our counterpart attaches to the queue.
    1083             :  */
    1084             : static bool
    1085          20 : shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
    1086             :                      BackgroundWorkerHandle *handle)
    1087             : {
    1088          20 :     bool        result = false;
    1089             : 
    1090             :     for (;;)
    1091             :     {
    1092             :         BgwHandleStatus status;
    1093             :         pid_t       pid;
    1094             :         bool        detached;
    1095             : 
    1096             :         /* Acquire the lock just long enough to check the pointer. */
    1097         150 :         SpinLockAcquire(&mq->mq_mutex);
    1098         150 :         detached = mq->mq_detached;
    1099         150 :         result = (*ptr != NULL);
    1100         150 :         SpinLockRelease(&mq->mq_mutex);
    1101             : 
    1102             :         /* Fail if detached; else succeed if initialized. */
    1103         150 :         if (detached)
    1104             :         {
    1105          15 :             result = false;
    1106          35 :             break;
    1107             :         }
    1108         135 :         if (result)
    1109           5 :             break;
    1110             : 
    1111         130 :         if (handle != NULL)
    1112             :         {
    1113             :             /* Check for unexpected worker death. */
    1114         130 :             status = GetBackgroundWorkerPid(handle, &pid);
    1115         130 :             if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
    1116             :             {
    1117           0 :                 result = false;
    1118           0 :                 break;
    1119             :             }
    1120             :         }
    1121             : 
    1122             :         /* Wait to be signalled. */
    1123         130 :         WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_INTERNAL);
    1124             : 
    1125             :         /* Reset the latch so we don't spin. */
    1126         130 :         ResetLatch(MyLatch);
    1127             : 
    1128             :         /* An interrupt may have occurred while we were waiting. */
    1129         130 :         CHECK_FOR_INTERRUPTS();
    1130         130 :     }
    1131             : 
    1132          20 :     return result;
    1133             : }
    1134             : 
    1135             : /*
    1136             :  * Get the number of bytes read.  The receiver need not use this to access
    1137             :  * the count of bytes read, but the sender must.
    1138             :  */
    1139             : static uint64
    1140         706 : shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached)
    1141             : {
    1142             :     uint64      v;
    1143             : 
    1144         706 :     SpinLockAcquire(&mq->mq_mutex);
    1145         706 :     v = mq->mq_bytes_read;
    1146         706 :     *detached = mq->mq_detached;
    1147         706 :     SpinLockRelease(&mq->mq_mutex);
    1148             : 
    1149         706 :     return v;
    1150             : }
    1151             : 
    1152             : /*
    1153             :  * Increment the number of bytes read.
    1154             :  */
    1155             : static void
    1156         180 : shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n)
    1157             : {
    1158             :     PGPROC     *sender;
    1159             : 
    1160         180 :     SpinLockAcquire(&mq->mq_mutex);
    1161         180 :     mq->mq_bytes_read += n;
    1162         180 :     sender = mq->mq_sender;
    1163         180 :     SpinLockRelease(&mq->mq_mutex);
    1164             : 
    1165             :     /* We shouldn't have any bytes to read without a sender. */
    1166         180 :     Assert(sender != NULL);
    1167         180 :     SetLatch(&sender->procLatch);
    1168         180 : }
    1169             : 
    1170             : /*
    1171             :  * Get the number of bytes written.  The sender need not use this to access
    1172             :  * the count of bytes written, but the receiver must.
    1173             :  */
    1174             : static uint64
    1175         657 : shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached)
    1176             : {
    1177             :     uint64      v;
    1178             : 
    1179         657 :     SpinLockAcquire(&mq->mq_mutex);
    1180         657 :     v = mq->mq_bytes_written;
    1181         657 :     *detached = mq->mq_detached;
    1182         657 :     SpinLockRelease(&mq->mq_mutex);
    1183             : 
    1184         657 :     return v;
    1185             : }
    1186             : 
    1187             : /*
    1188             :  * Increment the number of bytes written.
    1189             :  */
    1190             : static void
    1191         706 : shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n)
    1192             : {
    1193         706 :     SpinLockAcquire(&mq->mq_mutex);
    1194         706 :     mq->mq_bytes_written += n;
    1195         706 :     SpinLockRelease(&mq->mq_mutex);
    1196         706 : }
    1197             : 
    1198             : /*
    1199             :  * Set receiver's latch, unless queue is detached.
    1200             :  */
    1201             : static shm_mq_result
    1202         295 : shm_mq_notify_receiver(volatile shm_mq *mq)
    1203             : {
    1204             :     PGPROC     *receiver;
    1205             :     bool        detached;
    1206             : 
    1207         295 :     SpinLockAcquire(&mq->mq_mutex);
    1208         295 :     detached = mq->mq_detached;
    1209         295 :     receiver = mq->mq_receiver;
    1210         295 :     SpinLockRelease(&mq->mq_mutex);
    1211             : 
    1212         295 :     if (detached)
    1213           0 :         return SHM_MQ_DETACHED;
    1214         295 :     if (receiver)
    1215         295 :         SetLatch(&receiver->procLatch);
    1216         295 :     return SHM_MQ_SUCCESS;
    1217             : }
    1218             : 
    1219             : /* Shim for on_dsm_callback. */
    1220             : static void
    1221         121 : shm_mq_detach_callback(dsm_segment *seg, Datum arg)
    1222             : {
    1223         121 :     shm_mq     *mq = (shm_mq *) DatumGetPointer(arg);
    1224             : 
    1225         121 :     shm_mq_detach_internal(mq);
    1226         121 : }

Generated by: LCOV version 1.11