Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * shm_mq.c
4 : * single-reader, single-writer shared memory message queue
5 : *
6 : * Both the sender and the receiver must have a PGPROC; their respective
7 : * process latches are used for synchronization. Only the sender may send,
8 : * and only the receiver may receive. This is intended to allow a user
9 : * backend to communicate with worker backends that it has registered.
10 : *
11 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
12 : * Portions Copyright (c) 1994, Regents of the University of California
13 : *
14 : * src/backend/storage/ipc/shm_mq.c
15 : *
16 : *-------------------------------------------------------------------------
17 : */
18 :
19 : #include "postgres.h"
20 :
21 : #include "miscadmin.h"
22 : #include "pgstat.h"
23 : #include "postmaster/bgworker.h"
24 : #include "storage/procsignal.h"
25 : #include "storage/shm_mq.h"
26 : #include "storage/spin.h"
27 :
28 : /*
29 : * This structure represents the actual queue, stored in shared memory.
30 : *
31 : * Some notes on synchronization:
32 : *
33 : * mq_receiver and mq_bytes_read can only be changed by the receiver; and
34 : * mq_sender and mq_bytes_written can only be changed by the sender. However,
35 : * because most of these fields are 8 bytes and we don't assume that 8 byte
36 : * reads and writes are atomic, the spinlock must be taken whenever the field
37 : * is updated, and whenever it is read by a process other than the one allowed
38 : * to modify it. But the process that is allowed to modify it is also allowed
39 : * to read it without the lock. On architectures where 8-byte writes are
40 : * atomic, we could replace these spinlocks with memory barriers, but
41 : * testing found no performance benefit, so it seems best to keep things
42 : * simple for now.
43 : *
44 : * mq_detached can be set by either the sender or the receiver, so the mutex
45 : * must be held to read or write it. Memory barriers could be used here as
46 : * well, if needed.
47 : *
48 : * mq_ring_size and mq_ring_offset never change after initialization, and
49 : * can therefore be read without the lock.
50 : *
51 : * Importantly, mq_ring can be safely read and written without a lock. Were
52 : * this not the case, we'd have to hold the spinlock for much longer
53 : * intervals, and performance might suffer. Fortunately, that's not
54 : * necessary. At any given time, the difference between mq_bytes_read and
55 : * mq_bytes_written defines the number of bytes within mq_ring that contain
56 : * unread data, and mq_bytes_read defines the position where those bytes
57 : * begin. The sender can increase the number of unread bytes at any time,
58 : * but only the receiver can give license to overwrite those bytes, by
59 : * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read
60 : * the unread bytes it knows to be present without the lock. Conversely,
61 : * the sender can write to the unused portion of the ring buffer without
62 : * the lock, because nobody else can be reading or writing those bytes. The
63 : * receiver could be making more bytes unused by incrementing mq_bytes_read,
64 : * but that's OK. Note that it would be unsafe for the receiver to read any
65 : * data it's already marked as read, or to write any data; and it would be
66 : * unsafe for the sender to reread any data after incrementing
67 : * mq_bytes_written, but fortunately there's no need for any of that.
68 : */
69 : struct shm_mq
70 : {
71 : slock_t mq_mutex; /* protects fields below per rules in header comment */
72 : PGPROC *mq_receiver; /* receiving process, or NULL until it attaches; changed only by receiver */
73 : PGPROC *mq_sender; /* sending process, or NULL until it attaches; changed only by sender */
74 : uint64 mq_bytes_read; /* total bytes consumed; advanced only by receiver */
75 : uint64 mq_bytes_written; /* total bytes produced; advanced only by sender */
76 : Size mq_ring_size; /* usable bytes in mq_ring; fixed after shm_mq_create */
77 : bool mq_detached; /* true once either side detaches; read/write under mutex */
78 : uint8 mq_ring_offset; /* padding between header end and MAXALIGN'd ring start */
79 : char mq_ring[FLEXIBLE_ARRAY_MEMBER]; /* the ring buffer itself; lock-free, see above */
80 : };
81 :
82 : /*
83 : * This structure is a backend-private handle for access to a queue.
84 : *
85 : * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
86 : * an optional pointer to the dynamic shared memory segment that contains it.
87 : * (If mqh_segment is provided, we register an on_dsm_detach callback to
88 : * make sure we detach from the queue before detaching from DSM.)
89 : *
90 : * If this queue is intended to connect the current process with a background
91 : * worker that started it, the user can pass a pointer to the worker handle
92 : * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this
93 : * is to allow us to begin sending to or receiving from that queue before the
94 : * process we'll be communicating with has even been started. If it fails
95 : * to start, the handle will allow us to notice that and fail cleanly, rather
96 : * than waiting forever; see shm_mq_wait_internal. This is mostly useful in
97 : * simple cases - e.g. where there are just 2 processes communicating; in
98 : * more complex scenarios, every process may not have a BackgroundWorkerHandle
99 : * available, or may need to watch for the failure of more than one other
100 : * process at a time.
101 : *
102 : * When a message exists as a contiguous chunk of bytes in the queue - that is,
103 : * it is smaller than the size of the ring buffer and does not wrap around
104 : * the end - we return the message to the caller as a pointer into the buffer.
105 : * For messages that are larger or happen to wrap, we reassemble the message
106 : * locally by copying the chunks into a backend-local buffer. mqh_buffer is
107 : * the buffer, and mqh_buflen is the number of bytes allocated for it.
108 : *
109 : * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
110 : * are used to track the state of non-blocking operations. When the caller
111 : * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
112 : * are expected to retry the call at a later time with the same argument;
113 : * we need to retain enough state to pick up where we left off.
114 : * mqh_length_word_complete tracks whether we are done sending or receiving
115 : * (whichever we're doing) the entire length word. mqh_partial_bytes tracks
116 : * the number of bytes read or written for either the length word or the
117 : * message itself, and mqh_expected_bytes - which is used only for reads -
118 : * tracks the expected total size of the payload.
119 : *
120 : * mqh_counterparty_attached tracks whether we know the counterparty to have
121 : * attached to the queue at some previous point. This lets us avoid some
122 : * mutex acquisitions.
123 : *
124 : * mqh_context is the memory context in effect at the time we attached to
125 : * the shm_mq. The shm_mq_handle itself is allocated in this context, and
126 : * we make sure any other allocations we do happen in this context as well,
127 : * to avoid nasty surprises.
128 : */
129 : struct shm_mq_handle
130 : {
131 : shm_mq *mqh_queue; /* the shared-memory queue we are attached to */
132 : dsm_segment *mqh_segment; /* DSM segment containing the queue, if any */
133 : BackgroundWorkerHandle *mqh_handle; /* counterparty worker handle, if known */
134 : char *mqh_buffer; /* backend-local reassembly buffer, lazily allocated */
135 : Size mqh_buflen; /* allocated size of mqh_buffer */
136 : Size mqh_consume_pending; /* bytes returned zero-copy but not yet acknowledged via mq_bytes_read */
137 : Size mqh_partial_bytes; /* progress within length word or payload (resume state) */
138 : Size mqh_expected_bytes; /* total expected payload size; used only for reads */
139 : bool mqh_length_word_complete; /* length word fully sent/received? */
140 : bool mqh_counterparty_attached; /* other side known to have attached? (avoids mutex trips) */
141 : MemoryContext mqh_context; /* context holding this handle and mqh_buffer */
142 : };
143 :
/* Local (static) routines; see each definition below for its contract. */
144 : static void shm_mq_detach_internal(shm_mq *mq);
145 : static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
146 : const void *data, bool nowait, Size *bytes_written);
147 : static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
148 : bool nowait, Size *nbytesp, void **datap);
149 : static bool shm_mq_counterparty_gone(volatile shm_mq *mq,
150 : BackgroundWorkerHandle *handle);
151 : static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
152 : BackgroundWorkerHandle *handle);
153 : static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
154 : static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n);
155 : static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
156 : static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n);
157 : static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
158 : static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
159 :
160 : /* Minimum queue size is enough for header and at least one chunk of data. */
161 : const Size shm_mq_minimum_size =
162 : MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
163 :
164 : #define MQH_INITIAL_BUFSIZE 8192 /* initial size of backend-local reassembly buffer */
165 :
166 : /*
167 : * Initialize a new shared message queue.
168 : */
169 : shm_mq *
170 238 : shm_mq_create(void *address, Size size)
171 : {
172 238 : shm_mq *mq = address;
173 238 : Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
174 :
175 : /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
176 238 : size = MAXALIGN_DOWN(size);
177 :
178 : /* Queue size must be large enough to hold some data. */
179 238 : Assert(size > data_offset);
180 :
181 : /* Initialize queue header. */
182 238 : SpinLockInit(&mq->mq_mutex);
183 238 : mq->mq_receiver = NULL;
184 238 : mq->mq_sender = NULL;
185 238 : mq->mq_bytes_read = 0;
186 238 : mq->mq_bytes_written = 0;
187 238 : mq->mq_ring_size = size - data_offset;
188 238 : mq->mq_detached = false;
189 238 : mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
190 :
191 238 : return mq;
192 : }
193 :
194 : /*
195 : * Set the identity of the process that will receive from a shared message
196 : * queue.
197 : */
198 : void
199 238 : shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
200 : {
201 238 : volatile shm_mq *vmq = mq;
202 : PGPROC *sender;
203 :
204 238 : SpinLockAcquire(&mq->mq_mutex);
205 238 : Assert(vmq->mq_receiver == NULL); /* may be set only once */
206 238 : vmq->mq_receiver = proc;
207 238 : sender = vmq->mq_sender; /* snapshot counterparty while holding the mutex */
208 238 : SpinLockRelease(&mq->mq_mutex);
209 :
210 238 : if (sender != NULL) /* wake sender only after releasing the spinlock */
211 0 : SetLatch(&sender->procLatch);
212 238 : }
213 :
214 : /*
215 : * Set the identity of the process that will send to a shared message queue.
216 : */
217 : void
218 230 : shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
219 : {
220 230 : volatile shm_mq *vmq = mq;
221 : PGPROC *receiver;
222 :
223 230 : SpinLockAcquire(&mq->mq_mutex);
224 230 : Assert(vmq->mq_sender == NULL); /* may be set only once */
225 230 : vmq->mq_sender = proc;
226 230 : receiver = vmq->mq_receiver; /* snapshot counterparty while holding the mutex */
227 230 : SpinLockRelease(&mq->mq_mutex);
228 :
229 230 : if (receiver != NULL) /* wake receiver only after releasing the spinlock */
230 230 : SetLatch(&receiver->procLatch);
231 230 : }
232 :
233 : /*
234 : * Get the configured receiver.
235 : */
236 : PGPROC *
237 0 : shm_mq_get_receiver(shm_mq *mq)
238 : {
239 0 : volatile shm_mq *vmq = mq;
240 : PGPROC *receiver;
241 :
242 0 : SpinLockAcquire(&mq->mq_mutex);
243 0 : receiver = vmq->mq_receiver;
244 0 : SpinLockRelease(&mq->mq_mutex);
245 :
246 0 : return receiver;
247 : }
248 :
249 : /*
250 : * Get the configured sender.
251 : */
252 : PGPROC *
253 434264 : shm_mq_get_sender(shm_mq *mq)
254 : {
255 434264 : volatile shm_mq *vmq = mq;
256 : PGPROC *sender;
257 :
258 434264 : SpinLockAcquire(&mq->mq_mutex);
259 434264 : sender = vmq->mq_sender;
260 434264 : SpinLockRelease(&mq->mq_mutex);
261 :
262 434264 : return sender;
263 : }
264 :
265 : /*
266 : * Attach to a shared message queue so we can send or receive messages.
267 : *
268 : * The memory context in effect at the time this function is called should
269 : * be one which will last for at least as long as the message queue itself.
270 : * We'll allocate the handle in that context, and future allocations that
271 : * are needed to buffer incoming data will happen in that context as well.
272 : *
273 : * If seg != NULL, the queue will be automatically detached when that dynamic
274 : * shared memory segment is detached.
275 : *
276 : * If handle != NULL, the queue can be read or written even before the
277 : * other process has attached. We'll wait for it to do so if needed. The
278 : * handle must be for a background worker initialized with bgw_notify_pid
279 : * equal to our PID.
280 : *
281 : * shm_mq_detach() should be called when done. This will free the
282 : * shm_mq_handle and mark the queue itself as detached, so that our
283 : * counterpart won't get stuck waiting for us to fill or drain the queue
284 : * after we've already lost interest.
285 : */
286 : shm_mq_handle *
287 468 : shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
288 : {
289 468 : shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
290 :
291 468 : Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
292 468 : mqh->mqh_queue = mq;
293 468 : mqh->mqh_segment = seg;
294 468 : mqh->mqh_handle = handle;
295 468 : mqh->mqh_buffer = NULL;
296 468 : mqh->mqh_buflen = 0;
297 468 : mqh->mqh_consume_pending = 0;
298 468 : mqh->mqh_partial_bytes = 0;
299 468 : mqh->mqh_expected_bytes = 0;
300 468 : mqh->mqh_length_word_complete = false;
301 468 : mqh->mqh_counterparty_attached = false;
302 468 : mqh->mqh_context = CurrentMemoryContext;
303 :
304 468 : if (seg != NULL)
305 468 : on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
306 :
307 468 : return mqh;
308 : }
309 :
310 : /*
311 : * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
312 : * been passed to shm_mq_attach.
313 : */
314 : void
315 230 : shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
316 : {
317 230 : Assert(mqh->mqh_handle == NULL);
318 230 : mqh->mqh_handle = handle;
319 230 : }
320 :
321 : /*
322 : * Write a message into a shared message queue.
323 : */
324 : shm_mq_result
325 65 : shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
326 : {
327 : shm_mq_iovec iov;
328 :
329 65 : iov.data = data;
330 65 : iov.len = nbytes;
331 :
332 65 : return shm_mq_sendv(mqh, &iov, 1, nowait);
333 : }
334 :
335 : /*
336 : * Write a message into a shared message queue, gathered from multiple
337 : * addresses.
338 : *
339 : * When nowait = false, we'll wait on our process latch when the ring buffer
340 : * fills up, and then continue writing once the receiver has drained some data.
341 : * The process latch is reset after each wait.
342 : *
343 : * When nowait = true, we do not manipulate the state of the process latch;
344 : * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In
345 : * this case, the caller should call this function again, with the same
346 : * arguments, each time the process latch is set. (Once begun, the sending
347 : * of a message cannot be aborted except by detaching from the queue; changing
348 : * the length or payload will corrupt the queue.)
349 : */
350 : shm_mq_result
351 295 : shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
352 : {
353 : shm_mq_result res;
354 295 : shm_mq *mq = mqh->mqh_queue;
355 295 : Size nbytes = 0;
356 : Size bytes_written;
357 : int i;
358 295 : int which_iov = 0; /* index of the iovec currently being sent */
359 : Size offset; /* how far into iov[which_iov] we've gotten */
360 :
361 295 : Assert(mq->mq_sender == MyProc);
362 :
363 : /* Compute total size of write. */
364 820 : for (i = 0; i < iovcnt; ++i)
365 525 : nbytes += iov[i].len;
366 :
367 : /* Try to write, or finish writing, the length word into the buffer. */
368 885 : while (!mqh->mqh_length_word_complete)
369 : {
370 295 : Assert(mqh->mqh_partial_bytes < sizeof(Size));
371 590 : res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
372 295 : ((char *) &nbytes) + mqh->mqh_partial_bytes,
373 : nowait, &bytes_written);
374 :
375 295 : if (res == SHM_MQ_DETACHED)
376 : {
377 : /* Reset state in case caller tries to send another message. */
378 0 : mqh->mqh_partial_bytes = 0;
379 0 : mqh->mqh_length_word_complete = false;
380 0 : return res;
381 : }
382 295 : mqh->mqh_partial_bytes += bytes_written;
383 :
384 295 : if (mqh->mqh_partial_bytes >= sizeof(Size))
385 : {
386 295 : Assert(mqh->mqh_partial_bytes == sizeof(Size));
387 :
388 295 : mqh->mqh_partial_bytes = 0; /* now counts payload bytes, not length-word bytes */
389 295 : mqh->mqh_length_word_complete = true;
390 : }
391 :
392 295 : if (res != SHM_MQ_SUCCESS)
393 0 : return res; /* SHM_MQ_WOULD_BLOCK: caller retries with same arguments */
394 :
395 : /* Length word can't be split unless bigger than required alignment. */
396 295 : Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
397 : }
398 :
399 : /* Write the actual data bytes into the buffer. */
400 295 : Assert(mqh->mqh_partial_bytes <= nbytes);
401 295 : offset = mqh->mqh_partial_bytes; /* resume point within the payload */
402 : do
403 : {
404 : Size chunksize;
405 :
406 : /* Figure out which bytes need to be sent next. */
407 411 : if (offset >= iov[which_iov].len)
408 : {
409 0 : offset -= iov[which_iov].len;
410 0 : ++which_iov;
411 0 : if (which_iov >= iovcnt)
412 0 : break;
413 0 : continue;
414 : }
415 :
416 : /*
417 : * We want to avoid copying the data if at all possible, but every
418 : * chunk of bytes we write into the queue has to be MAXALIGN'd, except
419 : * the last. Thus, if a chunk other than the last one ends on a
420 : * non-MAXALIGN'd boundary, we have to combine the tail end of its
421 : * data with data from one or more following chunks until we either
422 : * reach the last chunk or accumulate a number of bytes which is
423 : * MAXALIGN'd.
424 : */
425 641 : if (which_iov + 1 < iovcnt &&
426 230 : offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
427 : {
428 : char tmpbuf[MAXIMUM_ALIGNOF];
429 230 : int j = 0; /* number of bytes gathered into tmpbuf */
430 :
431 : for (;;) /* gather bytes across iovec boundaries */
432 : {
433 922 : if (offset < iov[which_iov].len)
434 : {
435 578 : tmpbuf[j] = iov[which_iov].data[offset];
436 578 : j++;
437 578 : offset++;
438 578 : if (j == MAXIMUM_ALIGNOF)
439 116 : break;
440 : }
441 : else
442 : {
443 344 : offset -= iov[which_iov].len;
444 344 : which_iov++;
445 344 : if (which_iov >= iovcnt)
446 114 : break;
447 : }
448 692 : }
449 :
450 230 : res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
451 :
452 230 : if (res == SHM_MQ_DETACHED)
453 : {
454 : /* Reset state in case caller tries to send another message. */
455 0 : mqh->mqh_partial_bytes = 0;
456 0 : mqh->mqh_length_word_complete = false;
457 0 : return res;
458 : }
459 :
460 230 : mqh->mqh_partial_bytes += bytes_written;
461 230 : if (res != SHM_MQ_SUCCESS)
462 0 : return res;
463 230 : continue;
464 : }
465 :
466 : /*
467 : * If this is the last chunk, we can write all the data, even if it
468 : * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
469 : * MAXALIGN_DOWN the write size.
470 : */
471 181 : chunksize = iov[which_iov].len - offset;
472 181 : if (which_iov + 1 < iovcnt)
473 0 : chunksize = MAXALIGN_DOWN(chunksize); /* leftover tail is gathered next iteration */
474 181 : res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
475 : nowait, &bytes_written);
476 :
477 181 : if (res == SHM_MQ_DETACHED)
478 : {
479 : /* Reset state in case caller tries to send another message. */
480 0 : mqh->mqh_length_word_complete = false;
481 0 : mqh->mqh_partial_bytes = 0;
482 0 : return res;
483 : }
484 :
485 181 : mqh->mqh_partial_bytes += bytes_written;
486 181 : offset += bytes_written;
487 181 : if (res != SHM_MQ_SUCCESS)
488 0 : return res;
489 411 : } while (mqh->mqh_partial_bytes < nbytes);
490 :
491 : /* Reset for next message. */
492 295 : mqh->mqh_partial_bytes = 0;
493 295 : mqh->mqh_length_word_complete = false;
494 :
495 : /* Notify receiver of the newly-written data, and return. */
496 295 : return shm_mq_notify_receiver(mq);
497 : }
498 :
499 : /*
500 : * Receive a message from a shared message queue.
501 : *
502 : * We set *nbytes to the message length and *data to point to the message
503 : * payload. If the entire message exists in the queue as a single,
504 : * contiguous chunk, *data will point directly into shared memory; otherwise,
505 : * it will point to a temporary buffer. This mostly avoids data copying in
506 : * the hoped-for case where messages are short compared to the buffer size,
507 : * while still allowing longer messages. In either case, the return value
508 : * remains valid until the next receive operation is performed on the queue.
509 : *
510 : * When nowait = false, we'll wait on our process latch when the ring buffer
511 : * is empty and we have not yet received a full message. The sender will
512 : * set our process latch after more data has been written, and we'll resume
513 : * processing. Each call will therefore return a complete message
514 : * (unless the sender detaches the queue).
515 : *
516 : * When nowait = true, we do not manipulate the state of the process latch;
517 : * instead, whenever the buffer is empty and we need to read from it, we
518 : * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this
519 : * function again after the process latch has been set.
520 : */
521 : shm_mq_result
522 434691 : shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
523 : {
524 434691 : shm_mq *mq = mqh->mqh_queue;
525 : shm_mq_result res;
526 434691 : Size rb = 0; /* bytes currently available at rawdata */
527 : Size nbytes; /* total payload length of the message */
528 : void *rawdata; /* pointer into the ring returned by receive_bytes */
529 :
530 434691 : Assert(mq->mq_receiver == MyProc);
531 :
532 : /* We can't receive data until the sender has attached. */
533 434691 : if (!mqh->mqh_counterparty_attached)
534 : {
535 434269 : if (nowait)
536 : {
537 : int counterparty_gone;
538 :
539 : /*
540 : * We shouldn't return at this point at all unless the sender
541 : * hasn't attached yet. However, the correct return value depends
542 : * on whether the sender is still attached. If we first test
543 : * whether the sender has ever attached and then test whether the
544 : * sender has detached, there's a race condition: a sender that
545 : * attaches and detaches very quickly might fool us into thinking
546 : * the sender never attached at all. So, test whether our
547 : * counterparty is definitively gone first, and only afterwards
548 : * check whether the sender ever attached in the first place.
549 : */
550 434249 : counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
551 434249 : if (shm_mq_get_sender(mq) == NULL)
552 : {
553 434039 : if (counterparty_gone)
554 0 : return SHM_MQ_DETACHED;
555 : else
556 434039 : return SHM_MQ_WOULD_BLOCK;
557 : }
558 : }
559 20 : else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
560 15 : && shm_mq_get_sender(mq) == NULL)
561 : {
562 0 : mq->mq_detached = true; /* sender died before attaching; give up */
563 0 : return SHM_MQ_DETACHED;
564 : }
565 230 : mqh->mqh_counterparty_attached = true; /* remember, to skip these checks later */
566 : }
567 :
568 : /* Consume any zero-copy data from previous receive operation. */
569 652 : if (mqh->mqh_consume_pending > 0)
570 : {
571 180 : shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
572 180 : mqh->mqh_consume_pending = 0;
573 : }
574 :
575 : /* Try to read, or finish reading, the length word from the buffer. */
576 1304 : while (!mqh->mqh_length_word_complete)
577 : {
578 : /* Try to receive the message length word. */
579 652 : Assert(mqh->mqh_partial_bytes < sizeof(Size));
580 652 : res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes,
581 : nowait, &rb, &rawdata);
582 652 : if (res != SHM_MQ_SUCCESS)
583 357 : return res;
584 :
585 : /*
586 : * Hopefully, we'll receive the entire message length word at once.
587 : * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
588 : * multiple reads.
589 : */
590 295 : if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
591 0 : {
592 : Size needed;
593 :
594 295 : nbytes = *(Size *) rawdata; /* whole length word is contiguous here */
595 :
596 : /* If we've already got the whole message, we're done. */
597 295 : needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
598 295 : if (rb >= needed)
599 : {
600 : /*
601 : * Technically, we could consume the message length
602 : * information at this point, but the extra write to shared
603 : * memory wouldn't be free and in most cases we would reap no
604 : * benefit.
605 : */
606 295 : mqh->mqh_consume_pending = needed; /* acknowledged lazily, on next receive */
607 295 : *nbytesp = nbytes;
608 295 : *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
609 295 : return SHM_MQ_SUCCESS;
610 : }
611 :
612 : /*
613 : * We don't have the whole message, but we at least have the whole
614 : * length word.
615 : */
616 0 : mqh->mqh_expected_bytes = nbytes;
617 0 : mqh->mqh_length_word_complete = true;
618 0 : shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size)));
619 0 : rb -= MAXALIGN(sizeof(Size)); /* remaining bytes are payload */
620 : }
621 : else
622 : {
623 : Size lengthbytes;
624 :
625 : /* Can't be split unless bigger than required alignment. */
626 0 : Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
627 :
628 : /* Message word is split; need buffer to reassemble. */
629 : if (mqh->mqh_buffer == NULL)
630 : {
631 : mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
632 : MQH_INITIAL_BUFSIZE);
633 : mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
634 : }
635 : Assert(mqh->mqh_buflen >= sizeof(Size));
636 :
637 : /* Copy and consume partial length word. */
638 : if (mqh->mqh_partial_bytes + rb > sizeof(Size))
639 : lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
640 : else
641 : lengthbytes = rb;
642 : memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
643 : lengthbytes);
644 : mqh->mqh_partial_bytes += lengthbytes;
645 : shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes));
646 : rb -= lengthbytes;
647 :
648 : /* If we now have the whole word, we're ready to read payload. */
649 : if (mqh->mqh_partial_bytes >= sizeof(Size))
650 : {
651 : Assert(mqh->mqh_partial_bytes == sizeof(Size));
652 : mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
653 : mqh->mqh_length_word_complete = true;
654 : mqh->mqh_partial_bytes = 0;
655 : }
656 : }
657 : }
658 0 : nbytes = mqh->mqh_expected_bytes;
659 :
660 0 : if (mqh->mqh_partial_bytes == 0)
661 : {
662 : /*
663 : * Try to obtain the whole message in a single chunk. If this works,
664 : * we need not copy the data and can return a pointer directly into
665 : * shared memory.
666 : */
667 0 : res = shm_mq_receive_bytes(mq, nbytes, nowait, &rb, &rawdata);
668 0 : if (res != SHM_MQ_SUCCESS)
669 0 : return res;
670 0 : if (rb >= nbytes)
671 : {
672 0 : mqh->mqh_length_word_complete = false; /* reset for next message */
673 0 : mqh->mqh_consume_pending = MAXALIGN(nbytes); /* acknowledged lazily, on next receive */
674 0 : *nbytesp = nbytes;
675 0 : *datap = rawdata;
676 0 : return SHM_MQ_SUCCESS;
677 : }
678 :
679 : /*
680 : * The message has wrapped the buffer. We'll need to copy it in order
681 : * to return it to the client in one chunk. First, make sure we have
682 : * a large enough buffer available.
683 : */
684 0 : if (mqh->mqh_buflen < nbytes)
685 : {
686 0 : Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
687 :
688 0 : while (newbuflen < nbytes)
689 0 : newbuflen *= 2; /* grow geometrically until it fits */
690 :
691 0 : if (mqh->mqh_buffer != NULL)
692 : {
693 0 : pfree(mqh->mqh_buffer);
694 0 : mqh->mqh_buffer = NULL;
695 0 : mqh->mqh_buflen = 0;
696 : }
697 0 : mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
698 0 : mqh->mqh_buflen = newbuflen;
699 : }
700 : }
701 :
702 : /* Loop until we've copied the entire message. */
703 : for (;;)
704 : {
705 : Size still_needed;
706 :
707 : /* Copy as much as we can. */
708 0 : Assert(mqh->mqh_partial_bytes + rb <= nbytes);
709 0 : memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
710 0 : mqh->mqh_partial_bytes += rb;
711 :
712 : /*
713 : * Update count of bytes read, with alignment padding. Note that this
714 : * will never actually insert any padding except at the end of a
715 : * message, because the buffer size is a multiple of MAXIMUM_ALIGNOF,
716 : * and each read and write is as well.
717 : */
718 0 : Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
719 0 : shm_mq_inc_bytes_read(mq, MAXALIGN(rb));
720 :
721 : /* If we got all the data, exit the loop. */
722 0 : if (mqh->mqh_partial_bytes >= nbytes)
723 0 : break;
724 :
725 : /* Wait for some more data. */
726 0 : still_needed = nbytes - mqh->mqh_partial_bytes;
727 0 : res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
728 0 : if (res != SHM_MQ_SUCCESS)
729 0 : return res;
730 0 : if (rb > still_needed)
731 0 : rb = still_needed; /* don't consume bytes beyond this message here */
732 0 : }
733 :
734 : /* Return the complete message, and reset for next message. */
735 0 : *nbytesp = nbytes;
736 0 : *datap = mqh->mqh_buffer;
737 0 : mqh->mqh_length_word_complete = false;
738 0 : mqh->mqh_partial_bytes = 0;
739 0 : return SHM_MQ_SUCCESS;
740 : }
741 :
742 : /*
743 : * Wait for the other process that's supposed to use this queue to attach
744 : * to it.
745 : *
746 : * The return value is SHM_MQ_DETACHED if the worker has already detached or
747 : * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
748 : * Note that we will only be able to detect that the worker has died before
749 : * attaching if a background worker handle was passed to shm_mq_attach().
750 : */
751 : shm_mq_result
752 0 : shm_mq_wait_for_attach(shm_mq_handle *mqh)
753 : {
754 0 : shm_mq *mq = mqh->mqh_queue;
755 : PGPROC **victim;
756 :
757 0 : if (shm_mq_get_receiver(mq) == MyProc)
758 0 : victim = &mq->mq_sender;
759 : else
760 : {
761 0 : Assert(shm_mq_get_sender(mq) == MyProc);
762 0 : victim = &mq->mq_receiver;
763 : }
764 :
765 0 : if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
766 0 : return SHM_MQ_SUCCESS;
767 : else
768 0 : return SHM_MQ_DETACHED;
769 : }
770 :
771 : /*
772 : * Detach from a shared message queue, and destroy the shm_mq_handle.
773 : */
774 : void
775 347 : shm_mq_detach(shm_mq_handle *mqh)
776 : {
777 : /* Notify counterparty that we're outta here. */
778 347 : shm_mq_detach_internal(mqh->mqh_queue);
779 :
780 : /* Cancel on_dsm_detach callback, if any. */
781 347 : if (mqh->mqh_segment)
782 347 : cancel_on_dsm_detach(mqh->mqh_segment,
783 : shm_mq_detach_callback,
784 347 : PointerGetDatum(mqh->mqh_queue));
785 :
786 : /* Release local memory associated with handle. */
787 347 : if (mqh->mqh_buffer != NULL)
788 0 : pfree(mqh->mqh_buffer);
789 347 : pfree(mqh);
790 347 : }
791 :
792 : /*
793 : * Notify counterparty that we're detaching from shared message queue.
794 : *
795 : * The purpose of this function is to make sure that the process
796 : * with which we're communicating doesn't block forever waiting for us to
797 : * fill or drain the queue once we've lost interest. When the sender
798 : * detaches, the receiver can read any messages remaining in the queue;
799 : * further reads will return SHM_MQ_DETACHED. If the receiver detaches,
800 : * further attempts to send messages will likewise return SHM_MQ_DETACHED.
801 : *
802 : * This is separated out from shm_mq_detach() because if the on_dsm_detach
803 : * callback fires, we only want to do this much. We do not try to touch
804 : * the local shm_mq_handle, as it may have been pfree'd already.
805 : */
806 : static void
807 468 : shm_mq_detach_internal(shm_mq *mq)
808 : {
809 468 : volatile shm_mq *vmq = mq;
810 : PGPROC *victim; /* the counterparty process to wake, if any */
811 :
812 468 : SpinLockAcquire(&mq->mq_mutex);
813 468 : if (vmq->mq_sender == MyProc)
814 230 : victim = vmq->mq_receiver;
815 : else
816 : {
817 238 : Assert(vmq->mq_receiver == MyProc);
818 238 : victim = vmq->mq_sender;
819 : }
820 468 : vmq->mq_detached = true; /* mq_detached must be set under the mutex */
821 468 : SpinLockRelease(&mq->mq_mutex);
822 :
823 468 : if (victim != NULL) /* wake counterparty only after releasing the spinlock */
824 460 : SetLatch(&victim->procLatch);
825 : }
826 :
827 : /*
828 : * Get the shm_mq from handle.
829 : */
830 : shm_mq *
831 0 : shm_mq_get_queue(shm_mq_handle *mqh)
832 : {
833 0 : return mqh->mqh_queue; /* simple accessor; no locking required */
834 : }
835 :
836 : /*
837 : * Write bytes into a shared message queue.
838 : */
839 : static shm_mq_result
840 706 : shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
841 : bool nowait, Size *bytes_written)
842 : {
843 706 : shm_mq *mq = mqh->mqh_queue;
844 706 : Size sent = 0;
845 : uint64 used;
846 706 : Size ringsize = mq->mq_ring_size;
847 : Size available;
848 :
849 2118 : while (sent < nbytes)
850 : {
851 : bool detached;
852 : uint64 rb;
853 :
854 : /* Compute number of ring buffer bytes used and available. */
855 706 : rb = shm_mq_get_bytes_read(mq, &detached);
856 706 : Assert(mq->mq_bytes_written >= rb);
857 706 : used = mq->mq_bytes_written - rb;
858 706 : Assert(used <= ringsize);
859 706 : available = Min(ringsize - used, nbytes - sent);
860 :
861 : /* Bail out if the queue has been detached. */
862 706 : if (detached)
863 : {
864 0 : *bytes_written = sent;
865 0 : return SHM_MQ_DETACHED;
866 : }
867 :
868 706 : if (available == 0 && !mqh->mqh_counterparty_attached)
869 : {
870 : /*
871 : * The queue is full, so if the receiver isn't yet known to be
872 : * attached, we must wait for that to happen.
873 : */
874 0 : if (nowait)
875 : {
876 0 : if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
877 : {
878 0 : *bytes_written = sent;
879 0 : return SHM_MQ_DETACHED;
880 : }
881 0 : if (shm_mq_get_receiver(mq) == NULL)
882 : {
883 0 : *bytes_written = sent;
884 0 : return SHM_MQ_WOULD_BLOCK;
885 : }
886 : }
887 0 : else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
888 : mqh->mqh_handle))
889 : {
890 0 : mq->mq_detached = true;
891 0 : *bytes_written = sent;
892 0 : return SHM_MQ_DETACHED;
893 : }
894 0 : mqh->mqh_counterparty_attached = true;
895 :
896 : /*
897 : * The receiver may have read some data after attaching, so we
898 : * must not wait without rechecking the queue state.
899 : */
900 : }
901 706 : else if (available == 0)
902 : {
903 : shm_mq_result res;
904 :
905 : /* Let the receiver know that we need them to read some data. */
906 0 : res = shm_mq_notify_receiver(mq);
907 0 : if (res != SHM_MQ_SUCCESS)
908 : {
909 0 : *bytes_written = sent;
910 0 : return res;
911 : }
912 :
913 : /* Skip manipulation of our latch if nowait = true. */
914 0 : if (nowait)
915 : {
916 0 : *bytes_written = sent;
917 0 : return SHM_MQ_WOULD_BLOCK;
918 : }
919 :
920 : /*
921 : * Wait for our latch to be set. It might already be set for some
922 : * unrelated reason, but that'll just result in one extra trip
923 : * through the loop. It's worth it to avoid resetting the latch
924 : * at top of loop, because setting an already-set latch is much
925 : * cheaper than setting one that has been reset.
926 : */
927 0 : WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
928 :
929 : /* Reset the latch so we don't spin. */
930 0 : ResetLatch(MyLatch);
931 :
932 : /* An interrupt may have occurred while we were waiting. */
933 0 : CHECK_FOR_INTERRUPTS();
934 : }
935 : else
936 : {
937 706 : Size offset = mq->mq_bytes_written % (uint64) ringsize;
938 706 : Size sendnow = Min(available, ringsize - offset);
939 :
940 : /* Write as much data as we can via a single memcpy(). */
941 706 : memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
942 706 : (char *) data + sent, sendnow);
943 706 : sent += sendnow;
944 :
945 : /*
946 : * Update count of bytes written, with alignment padding. Note
947 : * that this will never actually insert any padding except at the
948 : * end of a run of bytes, because the buffer size is a multiple of
949 : * MAXIMUM_ALIGNOF, and each read is as well.
950 : */
951 706 : Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
952 706 : shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
953 :
954 : /*
955 : * For efficiency, we don't set the reader's latch here. We'll do
956 : * that only when the buffer fills up or after writing an entire
957 : * message.
958 : */
959 : }
960 : }
961 :
962 706 : *bytes_written = sent;
963 706 : return SHM_MQ_SUCCESS;
964 : }
965 :
966 : /*
967 : * Wait until at least *nbytesp bytes are available to be read from the
968 : * shared message queue, or until the buffer wraps around. If the queue is
969 : * detached, returns SHM_MQ_DETACHED. If nowait is specified and a wait
970 : * would be required, returns SHM_MQ_WOULD_BLOCK. Otherwise, *datap is set
971 : * to the location at which data bytes can be read, *nbytesp is set to the
972 : * number of bytes which can be read at that address, and the return value
973 : * is SHM_MQ_SUCCESS.
974 : */
975 : static shm_mq_result
976 652 : shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
977 : Size *nbytesp, void **datap)
978 : {
979 652 : Size ringsize = mq->mq_ring_size;
980 : uint64 used;
981 : uint64 written;
982 :
983 : for (;;)
984 : {
985 : Size offset;
986 : bool detached;
987 :
988 : /* Get bytes written, so we can compute what's available to read. */
989 657 : written = shm_mq_get_bytes_written(mq, &detached);
990 657 : used = written - mq->mq_bytes_read;
991 657 : Assert(used <= ringsize);
992 657 : offset = mq->mq_bytes_read % (uint64) ringsize;
993 :
994 : /* If we have enough data or buffer has wrapped, we're done. */
995 657 : if (used >= bytes_needed || offset + used >= ringsize)
996 : {
997 295 : *nbytesp = Min(used, ringsize - offset);
998 295 : *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
999 947 : return SHM_MQ_SUCCESS;
1000 : }
1001 :
1002 : /*
1003 : * Fall out before waiting if the queue has been detached.
1004 : *
1005 : * Note that we don't check for this until *after* considering whether
1006 : * the data already available is enough, since the receiver can finish
1007 : * receiving a message stored in the buffer even after the sender has
1008 : * detached.
1009 : */
1010 362 : if (detached)
1011 114 : return SHM_MQ_DETACHED;
1012 :
1013 : /* Skip manipulation of our latch if nowait = true. */
1014 248 : if (nowait)
1015 243 : return SHM_MQ_WOULD_BLOCK;
1016 :
1017 : /*
1018 : * Wait for our latch to be set. It might already be set for some
1019 : * unrelated reason, but that'll just result in one extra trip through
1020 : * the loop. It's worth it to avoid resetting the latch at top of
1021 : * loop, because setting an already-set latch is much cheaper than
1022 : * setting one that has been reset.
1023 : */
1024 5 : WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_RECEIVE);
1025 :
1026 : /* Reset the latch so we don't spin. */
1027 5 : ResetLatch(MyLatch);
1028 :
1029 : /* An interrupt may have occurred while we were waiting. */
1030 5 : CHECK_FOR_INTERRUPTS();
1031 5 : }
1032 : }
1033 :
1034 : /*
1035 : * Test whether a counterparty who may not even be alive yet is definitely gone.
1036 : */
1037 : static bool
1038 434249 : shm_mq_counterparty_gone(volatile shm_mq *mq, BackgroundWorkerHandle *handle)
1039 : {
1040 : bool detached;
1041 : pid_t pid;
1042 :
1043 : /* Acquire the lock just long enough to check the pointer. */
1044 434249 : SpinLockAcquire(&mq->mq_mutex);
1045 434249 : detached = mq->mq_detached;
1046 434249 : SpinLockRelease(&mq->mq_mutex);
1047 :
1048 : /* If the queue has been detached, counterparty is definitely gone. */
1049 434249 : if (detached)
1050 2 : return true;
1051 :
1052 : /* If there's a handle, check worker status. */
1053 434247 : if (handle != NULL)
1054 : {
1055 : BgwHandleStatus status;
1056 :
1057 : /* Check for unexpected worker death. */
1058 434247 : status = GetBackgroundWorkerPid(handle, &pid);
1059 434247 : if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1060 : {
1061 : /* Mark it detached, just to make it official. */
1062 0 : SpinLockAcquire(&mq->mq_mutex);
1063 0 : mq->mq_detached = true;
1064 0 : SpinLockRelease(&mq->mq_mutex);
1065 0 : return true;
1066 : }
1067 : }
1068 :
1069 : /* Counterparty is not definitively gone. */
1070 434247 : return false;
1071 : }
1072 :
1073 : /*
1074 : * This is used when a process is waiting for its counterpart to attach to the
1075 : * queue. We exit when the other process attaches as expected, or, if
1076 : * handle != NULL, when the referenced background process or the postmaster
1077 : * dies. Note that if handle == NULL, and the process fails to attach, we'll
1078 : * potentially get stuck here forever waiting for a process that may never
1079 : * start. We do check for interrupts, though.
1080 : *
1081 : * ptr is a pointer to the memory address that we're expecting to become
1082 : * non-NULL when our counterpart attaches to the queue.
1083 : */
1084 : static bool
1085 20 : shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
1086 : BackgroundWorkerHandle *handle)
1087 : {
1088 20 : bool result = false;
1089 :
1090 : for (;;)
1091 : {
1092 : BgwHandleStatus status;
1093 : pid_t pid;
1094 : bool detached;
1095 :
1096 : /* Acquire the lock just long enough to check the pointer. */
1097 150 : SpinLockAcquire(&mq->mq_mutex);
1098 150 : detached = mq->mq_detached;
1099 150 : result = (*ptr != NULL);
1100 150 : SpinLockRelease(&mq->mq_mutex);
1101 :
1102 : /* Fail if detached; else succeed if initialized. */
1103 150 : if (detached)
1104 : {
1105 15 : result = false;
1106 35 : break;
1107 : }
1108 135 : if (result)
1109 5 : break;
1110 :
1111 130 : if (handle != NULL)
1112 : {
1113 : /* Check for unexpected worker death. */
1114 130 : status = GetBackgroundWorkerPid(handle, &pid);
1115 130 : if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1116 : {
1117 0 : result = false;
1118 0 : break;
1119 : }
1120 : }
1121 :
1122 : /* Wait to be signalled. */
1123 130 : WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_INTERNAL);
1124 :
1125 : /* Reset the latch so we don't spin. */
1126 130 : ResetLatch(MyLatch);
1127 :
1128 : /* An interrupt may have occurred while we were waiting. */
1129 130 : CHECK_FOR_INTERRUPTS();
1130 130 : }
1131 :
1132 20 : return result;
1133 : }
1134 :
1135 : /*
1136 : * Get the number of bytes read. The receiver need not use this to access
1137 : * the count of bytes read, but the sender must.
1138 : */
1139 : static uint64
1140 706 : shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached)
1141 : {
1142 : uint64 v;
1143 :
1144 706 : SpinLockAcquire(&mq->mq_mutex);
1145 706 : v = mq->mq_bytes_read;
1146 706 : *detached = mq->mq_detached;
1147 706 : SpinLockRelease(&mq->mq_mutex);
1148 :
1149 706 : return v;
1150 : }
1151 :
1152 : /*
1153 : * Increment the number of bytes read.
1154 : */
1155 : static void
1156 180 : shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n)
1157 : {
1158 : PGPROC *sender;
1159 :
1160 180 : SpinLockAcquire(&mq->mq_mutex);
1161 180 : mq->mq_bytes_read += n;
1162 180 : sender = mq->mq_sender;
1163 180 : SpinLockRelease(&mq->mq_mutex);
1164 :
1165 : /* We shouldn't have any bytes to read without a sender. */
1166 180 : Assert(sender != NULL);
1167 180 : SetLatch(&sender->procLatch);
1168 180 : }
1169 :
1170 : /*
1171 : * Get the number of bytes written. The sender need not use this to access
1172 : * the count of bytes written, but the receiver must.
1173 : */
1174 : static uint64
1175 657 : shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached)
1176 : {
1177 : uint64 v;
1178 :
1179 657 : SpinLockAcquire(&mq->mq_mutex);
1180 657 : v = mq->mq_bytes_written;
1181 657 : *detached = mq->mq_detached;
1182 657 : SpinLockRelease(&mq->mq_mutex);
1183 :
1184 657 : return v;
1185 : }
1186 :
1187 : /*
1188 : * Increment the number of bytes written.
1189 : */
1190 : static void
1191 706 : shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n)
1192 : {
1193 706 : SpinLockAcquire(&mq->mq_mutex);
1194 706 : mq->mq_bytes_written += n;
1195 706 : SpinLockRelease(&mq->mq_mutex);
1196 706 : }
1197 :
1198 : /*
1199 : * Set receiver's latch, unless queue is detached.
1200 : */
1201 : static shm_mq_result
1202 295 : shm_mq_notify_receiver(volatile shm_mq *mq)
1203 : {
1204 : PGPROC *receiver;
1205 : bool detached;
1206 :
1207 295 : SpinLockAcquire(&mq->mq_mutex);
1208 295 : detached = mq->mq_detached;
1209 295 : receiver = mq->mq_receiver;
1210 295 : SpinLockRelease(&mq->mq_mutex);
1211 :
1212 295 : if (detached)
1213 0 : return SHM_MQ_DETACHED;
1214 295 : if (receiver)
1215 295 : SetLatch(&receiver->procLatch);
1216 295 : return SHM_MQ_SUCCESS;
1217 : }
1218 :
1219 : /* Shim for on_dsm_callback. */
1220 : static void
1221 121 : shm_mq_detach_callback(dsm_segment *seg, Datum arg)
1222 : {
1223 121 : shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
1224 :
1225 121 : shm_mq_detach_internal(mq);
1226 121 : }
|