LCOV - code coverage report
Current view:  top level - src/backend/commands - async.c (source / functions)
Test:          PostgreSQL
Date:          2017-09-29 15:12:54
                               Hit      Total    Coverage
               Lines:          343        509      67.4 %
               Functions:       36         42      85.7 %
Legend:        Lines: hit | not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * async.c
       4             :  *    Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
       5             :  *
       6             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/commands/async.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : /*-------------------------------------------------------------------------
      16             :  * Async Notification Model as of 9.0:
      17             :  *
       18             :  * 1. Multiple backends on the same machine.  Multiple backends listening on
      19             :  *    several channels. (Channels are also called "conditions" in other
      20             :  *    parts of the code.)
      21             :  *
      22             :  * 2. There is one central queue in disk-based storage (directory pg_notify/),
      23             :  *    with actively-used pages mapped into shared memory by the slru.c module.
      24             :  *    All notification messages are placed in the queue and later read out
      25             :  *    by listening backends.
      26             :  *
      27             :  *    There is no central knowledge of which backend listens on which channel;
      28             :  *    every backend has its own list of interesting channels.
      29             :  *
      30             :  *    Although there is only one queue, notifications are treated as being
      31             :  *    database-local; this is done by including the sender's database OID
      32             :  *    in each notification message.  Listening backends ignore messages
      33             :  *    that don't match their database OID.  This is important because it
      34             :  *    ensures senders and receivers have the same database encoding and won't
      35             :  *    misinterpret non-ASCII text in the channel name or payload string.
      36             :  *
      37             :  *    Since notifications are not expected to survive database crashes,
      38             :  *    we can simply clean out the pg_notify data at any reboot, and there
      39             :  *    is no need for WAL support or fsync'ing.
      40             :  *
      41             :  * 3. Every backend that is listening on at least one channel registers by
      42             :  *    entering its PID into the array in AsyncQueueControl. It then scans all
      43             :  *    incoming notifications in the central queue and first compares the
      44             :  *    database OID of the notification with its own database OID and then
      45             :  *    compares the notified channel with the list of channels that it listens
       46             :  *    to.  If there is a match, it delivers the notification event to its
      47             :  *    frontend.  Non-matching events are simply skipped.
      48             :  *
      49             :  * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
      50             :  *    a backend-local list which will not be processed until transaction end.
      51             :  *
      52             :  *    Duplicate notifications from the same transaction are sent out as one
      53             :  *    notification only. This is done to save work when for example a trigger
      54             :  *    on a 2 million row table fires a notification for each row that has been
      55             :  *    changed. If the application needs to receive every single notification
      56             :  *    that has been sent, it can easily add some unique string into the extra
      57             :  *    payload parameter.
      58             :  *
      59             :  *    When the transaction is ready to commit, PreCommit_Notify() adds the
      60             :  *    pending notifications to the head of the queue. The head pointer of the
      61             :  *    queue always points to the next free position and a position is just a
      62             :  *    page number and the offset in that page. This is done before marking the
      63             :  *    transaction as committed in clog. If we run into problems writing the
      64             :  *    notifications, we can still call elog(ERROR, ...) and the transaction
      65             :  *    will roll back.
      66             :  *
      67             :  *    Once we have put all of the notifications into the queue, we return to
      68             :  *    CommitTransaction() which will then do the actual transaction commit.
      69             :  *
      70             :  *    After commit we are called another time (AtCommit_Notify()). Here we
      71             :  *    make the actual updates to the effective listen state (listenChannels).
      72             :  *
      73             :  *    Finally, after we are out of the transaction altogether, we check if
      74             :  *    we need to signal listening backends.  In SignalBackends() we scan the
      75             :  *    list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
      76             :  *    to every listening backend (we don't know which backend is listening on
      77             :  *    which channel so we must signal them all). We can exclude backends that
      78             :  *    are already up to date, though.  We don't bother with a self-signal
      79             :  *    either, but just process the queue directly.
      80             :  *
      81             :  * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
      82             :  *    sets the process's latch, which triggers the event to be processed
      83             :  *    immediately if this backend is idle (i.e., it is waiting for a frontend
       84             :  *    command and is not within a transaction block; cf.
      85             :  *    ProcessClientReadInterrupt()).  Otherwise the handler may only set a
      86             :  *    flag, which will cause the processing to occur just before we next go
      87             :  *    idle.
      88             :  *
      89             :  *    Inbound-notify processing consists of reading all of the notifications
      90             :  *    that have arrived since scanning last time. We read every notification
      91             :  *    until we reach either a notification from an uncommitted transaction or
      92             :  *    the head pointer's position. Then we check if we were the laziest
       93             :  *    backend: if our pointer is at the same position as the global tail
       94             :  *    pointer, then we move the global tail pointer ahead to where the
      95             :  *    second-laziest backend is (in general, we take the MIN of the current
      96             :  *    head position and all active backends' new tail pointers). Whenever we
      97             :  *    move the global tail pointer we also truncate now-unused pages (i.e.,
      98             :  *    delete files in pg_notify/ that are no longer used).
      99             :  *
     100             :  * An application that listens on the same channel it notifies will get
     101             :  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
     102             :  * by comparing be_pid in the NOTIFY message to the application's own backend's
     103             :  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
     104             :  * frontend during startup.)  The above design guarantees that notifies from
     105             :  * other backends will never be missed by ignoring self-notifies.
     106             :  *
     107             :  * The amount of shared memory used for notify management (NUM_ASYNC_BUFFERS)
     108             :  * can be varied without affecting anything but performance.  The maximum
     109             :  * amount of notification data that can be queued at one time is determined
     110             :  * by slru.c's wraparound limit; see QUEUE_MAX_PAGE below.
     111             :  *-------------------------------------------------------------------------
     112             :  */
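
/*
 * Illustration only, not part of async.c: a minimal libpq client that LISTENs
 * on one channel and ignores its own notifications by comparing be_pid with
 * PQbackendPID(), as described in the model above.  The connection string and
 * channel name are placeholders, and error handling is reduced to a minimum.
 */
#include <stdio.h>
#include <sys/select.h>
#include <libpq-fe.h>

int
main(void)
{
    PGconn     *conn = PQconnectdb("dbname=postgres");  /* assumed conninfo */
    PGnotify   *note;

    if (PQstatus(conn) != CONNECTION_OK)
    {
        fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
        return 1;
    }

    PQclear(PQexec(conn, "LISTEN my_channel"));         /* assumed channel */

    for (;;)
    {
        int         sock = PQsocket(conn);
        fd_set      input_mask;

        /* sleep until something arrives on the connection's socket */
        FD_ZERO(&input_mask);
        FD_SET(sock, &input_mask);
        if (select(sock + 1, &input_mask, NULL, NULL, NULL) < 0)
            break;

        if (!PQconsumeInput(conn))
            break;
        while ((note = PQnotifies(conn)) != NULL)
        {
            if (note->be_pid != PQbackendPID(conn))     /* skip self-notifies */
                printf("channel \"%s\": %s\n", note->relname, note->extra);
            PQfreemem(note);
        }
    }

    PQfinish(conn);
    return 0;
}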
     113             : 
     114             : #include "postgres.h"
     115             : 
     116             : #include <limits.h>
     117             : #include <unistd.h>
     118             : #include <signal.h>
     119             : 
     120             : #include "access/parallel.h"
     121             : #include "access/slru.h"
     122             : #include "access/transam.h"
     123             : #include "access/xact.h"
     124             : #include "catalog/pg_database.h"
     125             : #include "commands/async.h"
     126             : #include "funcapi.h"
     127             : #include "libpq/libpq.h"
     128             : #include "libpq/pqformat.h"
     129             : #include "miscadmin.h"
     130             : #include "storage/ipc.h"
     131             : #include "storage/lmgr.h"
     132             : #include "storage/proc.h"
     133             : #include "storage/procarray.h"
     134             : #include "storage/procsignal.h"
     135             : #include "storage/sinval.h"
     136             : #include "tcop/tcopprot.h"
     137             : #include "utils/builtins.h"
     138             : #include "utils/memutils.h"
     139             : #include "utils/ps_status.h"
     140             : #include "utils/timestamp.h"
     141             : 
     142             : 
     143             : /*
     144             :  * Maximum size of a NOTIFY payload, including terminating NULL.  This
     145             :  * must be kept small enough so that a notification message fits on one
     146             :  * SLRU page.  The magic fudge factor here is noncritical as long as it's
     147             :  * more than AsyncQueueEntryEmptySize --- we make it significantly bigger
     148             :  * than that, so changes in that data structure won't affect user-visible
     149             :  * restrictions.
     150             :  */
     151             : #define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
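
/*
 * Worked example (illustration only): with the default BLCKSZ of 8192 and
 * NAMEDATALEN of 64, NOTIFY_PAYLOAD_MAX_LENGTH = 8192 - 64 - 128 = 8000, so
 * the longest payload string Async_Notify() accepts is 7999 bytes plus the
 * terminating NUL.
 */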
     152             : 
     153             : /*
     154             :  * Struct representing an entry in the global notify queue
     155             :  *
     156             :  * This struct declaration has the maximal length, but in a real queue entry
     157             :  * the data area is only big enough for the actual channel and payload strings
     158             :  * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
     159             :  * entry size, if both channel and payload strings are empty (but note it
     160             :  * doesn't include alignment padding).
     161             :  *
     162             :  * The "length" field should always be rounded up to the next QUEUEALIGN
     163             :  * multiple so that all fields are properly aligned.
     164             :  */
     165             : typedef struct AsyncQueueEntry
     166             : {
     167             :     int         length;         /* total allocated length of entry */
     168             :     Oid         dboid;          /* sender's database OID */
     169             :     TransactionId xid;          /* sender's XID */
     170             :     int32       srcPid;         /* sender's PID */
     171             :     char        data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
     172             : } AsyncQueueEntry;
     173             : 
     174             : /* Currently, no field of AsyncQueueEntry requires more than int alignment */
     175             : #define QUEUEALIGN(len)     INTALIGN(len)
     176             : 
     177             : #define AsyncQueueEntryEmptySize    (offsetof(AsyncQueueEntry, data) + 2)
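
/*
 * Illustration only: how these definitions determine the on-queue size of one
 * notification.  This mirrors the computation performed later in this file
 * when a Notification is copied into an AsyncQueueEntry; treat it as a
 * sketch, not as the authoritative code path.
 */
static inline int
example_queue_entry_size(const char *channel, const char *payload)
{
    /* both terminating NULs are already counted in AsyncQueueEntryEmptySize */
    int         len = (int) (AsyncQueueEntryEmptySize +
                             strlen(channel) + strlen(payload));

    /* round up so that the entry following this one stays properly aligned */
    return QUEUEALIGN(len);
}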
     178             : 
     179             : /*
     180             :  * Struct describing a queue position, and assorted macros for working with it
     181             :  */
     182             : typedef struct QueuePosition
     183             : {
     184             :     int         page;           /* SLRU page number */
     185             :     int         offset;         /* byte offset within page */
     186             : } QueuePosition;
     187             : 
     188             : #define QUEUE_POS_PAGE(x)       ((x).page)
     189             : #define QUEUE_POS_OFFSET(x)     ((x).offset)
     190             : 
     191             : #define SET_QUEUE_POS(x,y,z) \
     192             :     do { \
     193             :         (x).page = (y); \
     194             :         (x).offset = (z); \
     195             :     } while (0)
     196             : 
     197             : #define QUEUE_POS_EQUAL(x,y) \
     198             :      ((x).page == (y).page && (x).offset == (y).offset)
     199             : 
     200             : /* choose logically smaller QueuePosition */
     201             : #define QUEUE_POS_MIN(x,y) \
     202             :     (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
     203             :      (x).page != (y).page ? (y) : \
     204             :      (x).offset < (y).offset ? (x) : (y))
     205             : 
     206             : /* choose logically larger QueuePosition */
     207             : #define QUEUE_POS_MAX(x,y) \
     208             :     (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
     209             :      (x).page != (y).page ? (x) : \
     210             :      (x).offset > (y).offset ? (x) : (y))
     211             : 
     212             : /*
     213             :  * Struct describing a listening backend's status
     214             :  */
     215             : typedef struct QueueBackendStatus
     216             : {
     217             :     int32       pid;            /* either a PID or InvalidPid */
     218             :     Oid         dboid;          /* backend's database OID, or InvalidOid */
     219             :     QueuePosition pos;          /* backend has read queue up to here */
     220             : } QueueBackendStatus;
     221             : 
     222             : /*
     223             :  * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
     224             :  *
     225             :  * The AsyncQueueControl structure is protected by the AsyncQueueLock.
     226             :  *
     227             :  * When holding the lock in SHARED mode, backends may only inspect their own
     228             :  * entries as well as the head and tail pointers. Consequently we can allow a
     229             :  * backend to update its own record while holding only SHARED lock (since no
     230             :  * other backend will inspect it).
     231             :  *
     232             :  * When holding the lock in EXCLUSIVE mode, backends can inspect the entries
     233             :  * of other backends and also change the head and tail pointers.
     234             :  *
     235             :  * AsyncCtlLock is used as the control lock for the pg_notify SLRU buffers.
     236             :  * In order to avoid deadlocks, whenever we need both locks, we always first
     237             :  * get AsyncQueueLock and then AsyncCtlLock.
     238             :  *
     239             :  * Each backend uses the backend[] array entry with index equal to its
     240             :  * BackendId (which can range from 1 to MaxBackends).  We rely on this to make
     241             :  * SendProcSignal fast.
     242             :  */
     243             : typedef struct AsyncQueueControl
     244             : {
     245             :     QueuePosition head;         /* head points to the next free location */
     246             :     QueuePosition tail;         /* the global tail is equivalent to the pos of
     247             :                                  * the "slowest" backend */
     248             :     TimestampTz lastQueueFillWarn;  /* time of last queue-full msg */
     249             :     QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
     250             :     /* backend[0] is not used; used entries are from [1] to [MaxBackends] */
     251             : } AsyncQueueControl;
     252             : 
     253             : static AsyncQueueControl *asyncQueueControl;
     254             : 
     255             : #define QUEUE_HEAD                  (asyncQueueControl->head)
     256             : #define QUEUE_TAIL                  (asyncQueueControl->tail)
     257             : #define QUEUE_BACKEND_PID(i)        (asyncQueueControl->backend[i].pid)
     258             : #define QUEUE_BACKEND_DBOID(i)      (asyncQueueControl->backend[i].dboid)
     259             : #define QUEUE_BACKEND_POS(i)        (asyncQueueControl->backend[i].pos)
     260             : 
     261             : /*
     262             :  * The SLRU buffer area through which we access the notification queue
     263             :  */
     264             : static SlruCtlData AsyncCtlData;
     265             : 
     266             : #define AsyncCtl                    (&AsyncCtlData)
     267             : #define QUEUE_PAGESIZE              BLCKSZ
     268             : #define QUEUE_FULL_WARN_INTERVAL    5000    /* warn at most once every 5s */
     269             : 
     270             : /*
     271             :  * slru.c currently assumes that all filenames are four characters of hex
     272             :  * digits. That means that we can use segments 0000 through FFFF.
     273             :  * Each segment contains SLRU_PAGES_PER_SEGMENT pages which gives us
     274             :  * the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1.
     275             :  *
     276             :  * It's of course possible to enhance slru.c, but this gives us so much
     277             :  * space already that it doesn't seem worth the trouble.
     278             :  *
     279             :  * The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
     280             :  * pages, because more than that would confuse slru.c into thinking there
     281             :  * was a wraparound condition.  With the default BLCKSZ this means there
     282             :  * can be up to 8GB of queued-and-not-read data.
     283             :  *
     284             :  * Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
     285             :  * SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour.
     286             :  */
     287             : #define QUEUE_MAX_PAGE          (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
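
/*
 * Worked example (illustration only): with SLRU_PAGES_PER_SEGMENT = 32 and
 * the default BLCKSZ of 8192, QUEUE_MAX_PAGE = 32 * 0x10000 - 1 = 2097151.
 * Half of that 2097152-page range may be in use at once, so the queue can
 * hold up to 1048576 * 8192 bytes = 8 GiB of queued-and-not-read data.
 */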
     288             : 
     289             : /*
     290             :  * listenChannels identifies the channels we are actually listening to
     291             :  * (ie, have committed a LISTEN on).  It is a simple list of channel names,
     292             :  * allocated in TopMemoryContext.
     293             :  */
     294             : static List *listenChannels = NIL;  /* list of C strings */
     295             : 
     296             : /*
     297             :  * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
     298             :  * all actions requested in the current transaction.  As explained above,
     299             :  * we don't actually change listenChannels until we reach transaction commit.
     300             :  *
     301             :  * The list is kept in CurTransactionContext.  In subtransactions, each
     302             :  * subtransaction has its own list in its own CurTransactionContext, but
     303             :  * successful subtransactions attach their lists to their parent's list.
     304             :  * Failed subtransactions simply discard their lists.
     305             :  */
     306             : typedef enum
     307             : {
     308             :     LISTEN_LISTEN,
     309             :     LISTEN_UNLISTEN,
     310             :     LISTEN_UNLISTEN_ALL
     311             : } ListenActionKind;
     312             : 
     313             : typedef struct
     314             : {
     315             :     ListenActionKind action;
     316             :     char        channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
     317             : } ListenAction;
     318             : 
     319             : static List *pendingActions = NIL;  /* list of ListenAction */
     320             : 
     321             : static List *upperPendingActions = NIL; /* list of upper-xact lists */
     322             : 
     323             : /*
     324             :  * State for outbound notifies consists of a list of all channels+payloads
     325             :  * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
     326             :  * until and unless the transaction commits.  pendingNotifies is NIL if no
     327             :  * NOTIFYs have been done in the current transaction.
     328             :  *
     329             :  * The list is kept in CurTransactionContext.  In subtransactions, each
     330             :  * subtransaction has its own list in its own CurTransactionContext, but
     331             :  * successful subtransactions attach their lists to their parent's list.
     332             :  * Failed subtransactions simply discard their lists.
     333             :  *
     334             :  * Note: the action and notify lists do not interact within a transaction.
     335             :  * In particular, if a transaction does NOTIFY and then LISTEN on the same
     336             :  * condition name, it will get a self-notify at commit.  This is a bit odd
     337             :  * but is consistent with our historical behavior.
     338             :  */
     339             : typedef struct Notification
     340             : {
     341             :     char       *channel;        /* channel name */
     342             :     char       *payload;        /* payload string (can be empty) */
     343             : } Notification;
     344             : 
     345             : static List *pendingNotifies = NIL; /* list of Notifications */
     346             : 
     347             : static List *upperPendingNotifies = NIL;    /* list of upper-xact lists */
     348             : 
     349             : /*
     350             :  * Inbound notifications are initially processed by HandleNotifyInterrupt(),
     351             :  * called from inside a signal handler. That just sets the
     352             :  * notifyInterruptPending flag and sets the process
     353             :  * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
     354             :  * actually deal with the interrupt.
     355             :  */
     356             : volatile sig_atomic_t notifyInterruptPending = false;
     357             : 
     358             : /* True if we've registered an on_shmem_exit cleanup */
     359             : static bool unlistenExitRegistered = false;
     360             : 
     361             : /* True if we're currently registered as a listener in asyncQueueControl */
     362             : static bool amRegisteredListener = false;
     363             : 
     364             : /* has this backend sent notifications in the current transaction? */
     365             : static bool backendHasSentNotifications = false;
     366             : 
     367             : /* GUC parameter */
     368             : bool        Trace_notify = false;
     369             : 
     370             : /* local function prototypes */
     371             : static bool asyncQueuePagePrecedes(int p, int q);
     372             : static void queue_listen(ListenActionKind action, const char *channel);
     373             : static void Async_UnlistenOnExit(int code, Datum arg);
     374             : static void Exec_ListenPreCommit(void);
     375             : static void Exec_ListenCommit(const char *channel);
     376             : static void Exec_UnlistenCommit(const char *channel);
     377             : static void Exec_UnlistenAllCommit(void);
     378             : static bool IsListeningOn(const char *channel);
     379             : static void asyncQueueUnregister(void);
     380             : static bool asyncQueueIsFull(void);
     381             : static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
     382             : static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
     383             : static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
     384             : static double asyncQueueUsage(void);
     385             : static void asyncQueueFillWarning(void);
     386             : static bool SignalBackends(void);
     387             : static void asyncQueueReadAllNotifications(void);
     388             : static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
     389             :                              QueuePosition stop,
     390             :                              char *page_buffer);
     391             : static void asyncQueueAdvanceTail(void);
     392             : static void ProcessIncomingNotify(void);
     393             : static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
     394             : static void ClearPendingActionsAndNotifies(void);
     395             : 
     396             : /*
     397             :  * We will work on the page range of 0..QUEUE_MAX_PAGE.
     398             :  */
     399             : static bool
     400          10 : asyncQueuePagePrecedes(int p, int q)
     401             : {
     402             :     int         diff;
     403             : 
     404             :     /*
     405             :      * We have to compare modulo (QUEUE_MAX_PAGE+1)/2.  Both inputs should be
     406             :      * in the range 0..QUEUE_MAX_PAGE.
     407             :      */
     408          10 :     Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
     409          10 :     Assert(q >= 0 && q <= QUEUE_MAX_PAGE);
     410             : 
     411          10 :     diff = p - q;
     412          10 :     if (diff >= ((QUEUE_MAX_PAGE + 1) / 2))
     413           0 :         diff -= QUEUE_MAX_PAGE + 1;
     414          10 :     else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
     415           0 :         diff += QUEUE_MAX_PAGE + 1;
     416          10 :     return diff < 0;
     417             : }
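
/*
 * Standalone illustration, not part of async.c: the same modular comparison
 * with QUEUE_MAX_PAGE hard-coded to its default value of 2097151.  Because
 * page numbers are compared modulo QUEUE_MAX_PAGE + 1, a high-numbered page
 * written just before wraparound still precedes a low-numbered page written
 * just after it.
 */
#include <stdio.h>

#define MAX_PAGE 2097151            /* SLRU_PAGES_PER_SEGMENT * 0x10000 - 1 */

static int
page_precedes(int p, int q)
{
    int         diff = p - q;

    if (diff >= ((MAX_PAGE + 1) / 2))
        diff -= MAX_PAGE + 1;
    else if (diff < -((MAX_PAGE + 1) / 2))
        diff += MAX_PAGE + 1;
    return diff < 0;
}

int
main(void)
{
    printf("%d\n", page_precedes(5, 10));       /* 1: ordinary case       */
    printf("%d\n", page_precedes(2097000, 5));  /* 1: precedes after wrap */
    printf("%d\n", page_precedes(5, 2097000));  /* 0: the converse        */
    return 0;
}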
     418             : 
     419             : /*
     420             :  * Report space needed for our shared memory area
     421             :  */
     422             : Size
     423           5 : AsyncShmemSize(void)
     424             : {
     425             :     Size        size;
     426             : 
     427             :     /* This had better match AsyncShmemInit */
     428           5 :     size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
     429           5 :     size = add_size(size, offsetof(AsyncQueueControl, backend));
     430             : 
     431           5 :     size = add_size(size, SimpleLruShmemSize(NUM_ASYNC_BUFFERS, 0));
     432             : 
     433           5 :     return size;
     434             : }
     435             : 
     436             : /*
     437             :  * Initialize our shared memory area
     438             :  */
     439             : void
     440           5 : AsyncShmemInit(void)
     441             : {
     442             :     bool        found;
     443             :     int         slotno;
     444             :     Size        size;
     445             : 
     446             :     /*
     447             :      * Create or attach to the AsyncQueueControl structure.
     448             :      *
     449             :      * The used entries in the backend[] array run from 1 to MaxBackends; the
     450             :      * zero'th entry is unused but must be allocated.
     451             :      */
     452           5 :     size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
     453           5 :     size = add_size(size, offsetof(AsyncQueueControl, backend));
     454             : 
     455           5 :     asyncQueueControl = (AsyncQueueControl *)
     456           5 :         ShmemInitStruct("Async Queue Control", size, &found);
     457             : 
     458           5 :     if (!found)
     459             :     {
     460             :         /* First time through, so initialize it */
     461             :         int         i;
     462             : 
     463           5 :         SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
     464           5 :         SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
     465           5 :         asyncQueueControl->lastQueueFillWarn = 0;
     466             :         /* zero'th entry won't be used, but let's initialize it anyway */
     467         570 :         for (i = 0; i <= MaxBackends; i++)
     468             :         {
     469         565 :             QUEUE_BACKEND_PID(i) = InvalidPid;
     470         565 :             QUEUE_BACKEND_DBOID(i) = InvalidOid;
     471         565 :             SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
     472             :         }
     473             :     }
     474             : 
     475             :     /*
     476             :      * Set up SLRU management of the pg_notify data.
     477             :      */
     478           5 :     AsyncCtl->PagePrecedes = asyncQueuePagePrecedes;
     479           5 :     SimpleLruInit(AsyncCtl, "async", NUM_ASYNC_BUFFERS, 0,
     480           5 :                   AsyncCtlLock, "pg_notify", LWTRANCHE_ASYNC_BUFFERS);
     481             :     /* Override default assumption that writes should be fsync'd */
     482           5 :     AsyncCtl->do_fsync = false;
     483             : 
     484           5 :     if (!found)
     485             :     {
     486             :         /*
     487             :          * During start or reboot, clean out the pg_notify directory.
     488             :          */
     489           5 :         (void) SlruScanDirectory(AsyncCtl, SlruScanDirCbDeleteAll, NULL);
     490             : 
     491             :         /* Now initialize page zero to empty */
     492           5 :         LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
     493           5 :         slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
     494             :         /* This write is just to verify that pg_notify/ is writable */
     495           5 :         SimpleLruWritePage(AsyncCtl, slotno);
     496           5 :         LWLockRelease(AsyncCtlLock);
     497             :     }
     498           5 : }
     499             : 
     500             : 
     501             : /*
     502             :  * pg_notify -
     503             :  *    SQL function to send a notification event
     504             :  */
     505             : Datum
     506           6 : pg_notify(PG_FUNCTION_ARGS)
     507             : {
     508             :     const char *channel;
     509             :     const char *payload;
     510             : 
     511           6 :     if (PG_ARGISNULL(0))
     512           1 :         channel = "";
     513             :     else
     514           5 :         channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
     515             : 
     516           6 :     if (PG_ARGISNULL(1))
     517           1 :         payload = "";
     518             :     else
     519           5 :         payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
     520             : 
     521             :     /* For NOTIFY as a statement, this is checked in ProcessUtility */
     522           6 :     PreventCommandDuringRecovery("NOTIFY");
     523             : 
     524           6 :     Async_Notify(channel, payload);
     525             : 
     526           3 :     PG_RETURN_VOID();
     527             : }
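
/*
 * Illustration only, not part of async.c: NOTIFY is a utility statement and
 * cannot take bind parameters, so a client that needs a non-constant channel
 * or payload can call pg_notify() instead.  A libpq sketch; "conn" and the
 * parameter values are placeholders.
 */
#include <libpq-fe.h>               /* for the sketch only */

static void
example_send_notify(PGconn *conn)
{
    const char *params[2] = {"my_channel", "hello"};
    PGresult   *res;

    res = PQexecParams(conn,
                       "SELECT pg_notify($1, $2)",
                       2,           /* number of parameters */
                       NULL,        /* let the server infer parameter types */
                       params,
                       NULL, NULL,  /* text-format parameters */
                       0);          /* request a text-format result */
    PQclear(res);
}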
     528             : 
     529             : 
     530             : /*
     531             :  * Async_Notify
     532             :  *
     533             :  *      This is executed by the SQL notify command.
     534             :  *
     535             :  *      Adds the message to the list of pending notifies.
     536             :  *      Actual notification happens during transaction commit.
     537             :  *      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     538             :  */
     539             : void
     540           7 : Async_Notify(const char *channel, const char *payload)
     541             : {
     542             :     Notification *n;
     543             :     MemoryContext oldcontext;
     544             : 
     545           7 :     if (IsParallelWorker())
     546           0 :         elog(ERROR, "cannot send notifications from a parallel worker");
     547             : 
     548           7 :     if (Trace_notify)
     549           0 :         elog(DEBUG1, "Async_Notify(%s)", channel);
     550             : 
     551             :     /* a channel name must be specified */
     552           7 :     if (!channel || !strlen(channel))
     553           2 :         ereport(ERROR,
     554             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     555             :                  errmsg("channel name cannot be empty")));
     556             : 
     557           5 :     if (strlen(channel) >= NAMEDATALEN)
     558           1 :         ereport(ERROR,
     559             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     560             :                  errmsg("channel name too long")));
     561             : 
     562           4 :     if (payload)
     563             :     {
     564           3 :         if (strlen(payload) >= NOTIFY_PAYLOAD_MAX_LENGTH)
     565           0 :             ereport(ERROR,
     566             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     567             :                      errmsg("payload string too long")));
     568             :     }
     569             : 
     570             :     /* no point in making duplicate entries in the list ... */
     571           4 :     if (AsyncExistsPendingNotify(channel, payload))
     572           4 :         return;
     573             : 
     574             :     /*
     575             :      * The notification list needs to live until end of transaction, so store
     576             :      * it in the transaction context.
     577             :      */
     578           4 :     oldcontext = MemoryContextSwitchTo(CurTransactionContext);
     579             : 
     580           4 :     n = (Notification *) palloc(sizeof(Notification));
     581           4 :     n->channel = pstrdup(channel);
     582           4 :     if (payload)
     583           3 :         n->payload = pstrdup(payload);
     584             :     else
     585           1 :         n->payload = "";
     586             : 
     587             :     /*
     588             :      * We want to preserve the order so we need to append every notification.
     589             :      * See comments at AsyncExistsPendingNotify().
     590             :      */
     591           4 :     pendingNotifies = lappend(pendingNotifies, n);
     592             : 
     593           4 :     MemoryContextSwitchTo(oldcontext);
     594             : }
     595             : 
     596             : /*
     597             :  * queue_listen
     598             :  *      Common code for listen, unlisten, unlisten all commands.
     599             :  *
     600             :  *      Adds the request to the list of pending actions.
     601             :  *      Actual update of the listenChannels list happens during transaction
     602             :  *      commit.
     603             :  */
     604             : static void
     605           5 : queue_listen(ListenActionKind action, const char *channel)
     606             : {
     607             :     MemoryContext oldcontext;
     608             :     ListenAction *actrec;
     609             : 
     610             :     /*
     611             :      * Unlike Async_Notify, we don't try to collapse out duplicates. It would
     612             :      * be too complicated to ensure we get the right interactions of
     613             :      * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
     614             :      * would be any performance benefit anyway in sane applications.
     615             :      */
     616           5 :     oldcontext = MemoryContextSwitchTo(CurTransactionContext);
     617             : 
     618             :     /* space for terminating null is included in sizeof(ListenAction) */
     619           5 :     actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
     620           5 :                                      strlen(channel) + 1);
     621           5 :     actrec->action = action;
     622           5 :     strcpy(actrec->channel, channel);
     623             : 
     624           5 :     pendingActions = lappend(pendingActions, actrec);
     625             : 
     626           5 :     MemoryContextSwitchTo(oldcontext);
     627           5 : }
     628             : 
     629             : /*
     630             :  * Async_Listen
     631             :  *
     632             :  *      This is executed by the SQL listen command.
     633             :  */
     634             : void
     635           2 : Async_Listen(const char *channel)
     636             : {
     637           2 :     if (Trace_notify)
     638           0 :         elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
     639             : 
     640           2 :     queue_listen(LISTEN_LISTEN, channel);
     641           2 : }
     642             : 
     643             : /*
     644             :  * Async_Unlisten
     645             :  *
     646             :  *      This is executed by the SQL unlisten command.
     647             :  */
     648             : void
     649           1 : Async_Unlisten(const char *channel)
     650             : {
     651           1 :     if (Trace_notify)
     652           0 :         elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
     653             : 
     654             :     /* If we couldn't possibly be listening, no need to queue anything */
     655           1 :     if (pendingActions == NIL && !unlistenExitRegistered)
     656           1 :         return;
     657             : 
     658           1 :     queue_listen(LISTEN_UNLISTEN, channel);
     659             : }
     660             : 
     661             : /*
     662             :  * Async_UnlistenAll
     663             :  *
     664             :  *      This is invoked by UNLISTEN * command, and also at backend exit.
     665             :  */
     666             : void
     667           2 : Async_UnlistenAll(void)
     668             : {
     669           2 :     if (Trace_notify)
     670           0 :         elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
     671             : 
     672             :     /* If we couldn't possibly be listening, no need to queue anything */
     673           2 :     if (pendingActions == NIL && !unlistenExitRegistered)
     674           2 :         return;
     675             : 
     676           2 :     queue_listen(LISTEN_UNLISTEN_ALL, "");
     677             : }
     678             : 
     679             : /*
     680             :  * SQL function: return a set of the channel names this backend is actively
     681             :  * listening to.
     682             :  *
     683             :  * Note: this coding relies on the fact that the listenChannels list cannot
     684             :  * change within a transaction.
     685             :  */
     686             : Datum
     687           3 : pg_listening_channels(PG_FUNCTION_ARGS)
     688             : {
     689             :     FuncCallContext *funcctx;
     690             :     ListCell  **lcp;
     691             : 
     692             :     /* stuff done only on the first call of the function */
     693           3 :     if (SRF_IS_FIRSTCALL())
     694             :     {
     695             :         MemoryContext oldcontext;
     696             : 
     697             :         /* create a function context for cross-call persistence */
     698           2 :         funcctx = SRF_FIRSTCALL_INIT();
     699             : 
     700             :         /* switch to memory context appropriate for multiple function calls */
     701           2 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
     702             : 
     703             :         /* allocate memory for user context */
     704           2 :         lcp = (ListCell **) palloc(sizeof(ListCell *));
     705           2 :         *lcp = list_head(listenChannels);
     706           2 :         funcctx->user_fctx = (void *) lcp;
     707             : 
     708           2 :         MemoryContextSwitchTo(oldcontext);
     709             :     }
     710             : 
     711             :     /* stuff done on every call of the function */
     712           3 :     funcctx = SRF_PERCALL_SETUP();
     713           3 :     lcp = (ListCell **) funcctx->user_fctx;
     714             : 
     715           6 :     while (*lcp != NULL)
     716             :     {
     717           1 :         char       *channel = (char *) lfirst(*lcp);
     718             : 
     719           1 :         *lcp = lnext(*lcp);
     720           1 :         SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
     721             :     }
     722             : 
     723           2 :     SRF_RETURN_DONE(funcctx);
     724             : }
     725             : 
     726             : /*
     727             :  * Async_UnlistenOnExit
     728             :  *
     729             :  * This is executed at backend exit if we have done any LISTENs in this
     730             :  * backend.  It might not be necessary anymore, if the user UNLISTENed
     731             :  * everything, but we don't try to detect that case.
     732             :  */
     733             : static void
     734           2 : Async_UnlistenOnExit(int code, Datum arg)
     735             : {
     736           2 :     Exec_UnlistenAllCommit();
     737           2 :     asyncQueueUnregister();
     738           2 : }
     739             : 
     740             : /*
     741             :  * AtPrepare_Notify
     742             :  *
     743             :  *      This is called at the prepare phase of a two-phase
     744             :  *      transaction.  Save the state for possible commit later.
     745             :  */
     746             : void
     747           6 : AtPrepare_Notify(void)
     748             : {
     749             :     /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
     750           6 :     if (pendingActions || pendingNotifies)
     751           0 :         ereport(ERROR,
     752             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     753             :                  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
     754           6 : }
     755             : 
     756             : /*
     757             :  * PreCommit_Notify
     758             :  *
     759             :  *      This is called at transaction commit, before actually committing to
     760             :  *      clog.
     761             :  *
     762             :  *      If there are pending LISTEN actions, make sure we are listed in the
     763             :  *      shared-memory listener array.  This must happen before commit to
     764             :  *      ensure we don't miss any notifies from transactions that commit
     765             :  *      just after ours.
     766             :  *
     767             :  *      If there are outbound notify requests in the pendingNotifies list,
     768             :  *      add them to the global queue.  We do that before commit so that
     769             :  *      we can still throw error if we run out of queue space.
     770             :  */
     771             : void
     772       22906 : PreCommit_Notify(void)
     773             : {
     774             :     ListCell   *p;
     775             : 
     776       22906 :     if (pendingActions == NIL && pendingNotifies == NIL)
     777       45803 :         return;                 /* no relevant statements in this xact */
     778             : 
     779           9 :     if (Trace_notify)
     780           0 :         elog(DEBUG1, "PreCommit_Notify");
     781             : 
     782             :     /* Preflight for any pending listen/unlisten actions */
     783          14 :     foreach(p, pendingActions)
     784             :     {
     785           5 :         ListenAction *actrec = (ListenAction *) lfirst(p);
     786             : 
     787           5 :         switch (actrec->action)
     788             :         {
     789             :             case LISTEN_LISTEN:
     790           2 :                 Exec_ListenPreCommit();
     791           2 :                 break;
     792             :             case LISTEN_UNLISTEN:
     793             :                 /* there is no Exec_UnlistenPreCommit() */
     794           1 :                 break;
     795             :             case LISTEN_UNLISTEN_ALL:
     796             :                 /* there is no Exec_UnlistenAllPreCommit() */
     797           2 :                 break;
     798             :         }
     799             :     }
     800             : 
     801             :     /* Queue any pending notifies */
     802           9 :     if (pendingNotifies)
     803             :     {
     804             :         ListCell   *nextNotify;
     805             : 
     806             :         /*
     807             :          * Make sure that we have an XID assigned to the current transaction.
     808             :          * GetCurrentTransactionId is cheap if we already have an XID, but not
     809             :          * so cheap if we don't, and we'd prefer not to do that work while
     810             :          * holding AsyncQueueLock.
     811             :          */
     812           4 :         (void) GetCurrentTransactionId();
     813             : 
     814             :         /*
     815             :          * Serialize writers by acquiring a special lock that we hold till
     816             :          * after commit.  This ensures that queue entries appear in commit
     817             :          * order, and in particular that there are never uncommitted queue
     818             :          * entries ahead of committed ones, so an uncommitted transaction
     819             :          * can't block delivery of deliverable notifications.
     820             :          *
     821             :          * We use a heavyweight lock so that it'll automatically be released
     822             :          * after either commit or abort.  This also allows deadlocks to be
     823             :          * detected, though really a deadlock shouldn't be possible here.
     824             :          *
     825             :          * The lock is on "database 0", which is pretty ugly but it doesn't
     826             :          * seem worth inventing a special locktag category just for this.
     827             :          * (Historical note: before PG 9.0, a similar lock on "database 0" was
     828             :          * used by the flatfiles mechanism.)
     829             :          */
     830           4 :         LockSharedObject(DatabaseRelationId, InvalidOid, 0,
     831             :                          AccessExclusiveLock);
     832             : 
     833             :         /* Now push the notifications into the queue */
     834           4 :         backendHasSentNotifications = true;
     835             : 
     836           4 :         nextNotify = list_head(pendingNotifies);
     837          12 :         while (nextNotify != NULL)
     838             :         {
     839             :             /*
     840             :              * Add the pending notifications to the queue.  We acquire and
     841             :              * release AsyncQueueLock once per page, which might be overkill
     842             :              * but it does allow readers to get in while we're doing this.
     843             :              *
     844             :              * A full queue is very uncommon and should really not happen,
     845             :              * given that we have so much space available in the SLRU pages.
     846             :              * Nevertheless we need to deal with this possibility. Note that
     847             :              * when we get here we are in the process of committing our
     848             :              * transaction, but we have not yet committed to clog, so at this
     849             :              * point in time we can still roll the transaction back.
     850             :              */
     851           4 :             LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
     852           4 :             asyncQueueFillWarning();
     853           4 :             if (asyncQueueIsFull())
     854           0 :                 ereport(ERROR,
     855             :                         (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     856             :                          errmsg("too many notifications in the NOTIFY queue")));
     857           4 :             nextNotify = asyncQueueAddEntries(nextNotify);
     858           4 :             LWLockRelease(AsyncQueueLock);
     859             :         }
     860             :     }
     861             : }
     862             : 
     863             : /*
     864             :  * AtCommit_Notify
     865             :  *
     866             :  *      This is called at transaction commit, after committing to clog.
     867             :  *
     868             :  *      Update listenChannels and clear transaction-local state.
     869             :  */
     870             : void
     871       22906 : AtCommit_Notify(void)
     872             : {
     873             :     ListCell   *p;
     874             : 
     875             :     /*
     876             :      * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
     877             :      * return as soon as possible
     878             :      */
     879       22906 :     if (!pendingActions && !pendingNotifies)
     880       45803 :         return;
     881             : 
     882           9 :     if (Trace_notify)
     883           0 :         elog(DEBUG1, "AtCommit_Notify");
     884             : 
     885             :     /* Perform any pending listen/unlisten actions */
     886          14 :     foreach(p, pendingActions)
     887             :     {
     888           5 :         ListenAction *actrec = (ListenAction *) lfirst(p);
     889             : 
     890           5 :         switch (actrec->action)
     891             :         {
     892             :             case LISTEN_LISTEN:
     893           2 :                 Exec_ListenCommit(actrec->channel);
     894           2 :                 break;
     895             :             case LISTEN_UNLISTEN:
     896           1 :                 Exec_UnlistenCommit(actrec->channel);
     897           1 :                 break;
     898             :             case LISTEN_UNLISTEN_ALL:
     899           2 :                 Exec_UnlistenAllCommit();
     900           2 :                 break;
     901             :         }
     902             :     }
     903             : 
     904             :     /* If no longer listening to anything, get out of listener array */
     905           9 :     if (amRegisteredListener && listenChannels == NIL)
     906           2 :         asyncQueueUnregister();
     907             : 
     908             :     /* And clean up */
     909           9 :     ClearPendingActionsAndNotifies();
     910             : }
     911             : 
     912             : /*
     913             :  * Exec_ListenPreCommit --- subroutine for PreCommit_Notify
     914             :  *
     915             :  * This function must make sure we are ready to catch any incoming messages.
     916             :  */
     917             : static void
     918           2 : Exec_ListenPreCommit(void)
     919             : {
     920             :     QueuePosition head;
     921             :     QueuePosition max;
     922             :     int         i;
     923             : 
     924             :     /*
     925             :      * Nothing to do if we are already listening to something, nor if we
     926             :      * already ran this routine in this transaction.
     927             :      */
     928           2 :     if (amRegisteredListener)
     929           2 :         return;
     930             : 
     931           2 :     if (Trace_notify)
     932           0 :         elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
     933             : 
     934             :     /*
     935             :      * Before registering, make sure we will unlisten before dying. (Note:
     936             :      * this action does not get undone if we abort later.)
     937             :      */
     938           2 :     if (!unlistenExitRegistered)
     939             :     {
     940           2 :         before_shmem_exit(Async_UnlistenOnExit, 0);
     941           2 :         unlistenExitRegistered = true;
     942             :     }
     943             : 
     944             :     /*
     945             :      * This is our first LISTEN, so establish our pointer.
     946             :      *
     947             :      * We set our pointer to the global tail pointer and then move it forward
     948             :      * over already-committed notifications.  This ensures we cannot miss any
     949             :      * not-yet-committed notifications.  We might get a few more but that
     950             :      * doesn't hurt.
     951             :      *
     952             :      * In some scenarios there might be a lot of committed notifications that
     953             :      * have not yet been pruned away (because some backend is being lazy about
     954             :      * reading them).  To reduce our startup time, we can look at other
     955             :      * backends and adopt the maximum "pos" pointer of any backend that's in
     956             :      * our database; any notifications it's already advanced over are surely
     957             :      * committed and need not be re-examined by us.  (We must consider only
     958             :      * backends connected to our DB, because others will not have bothered to
     959             :      * check committed-ness of notifications in our DB.)  But we only bother
     960             :      * with that if there's more than a page worth of notifications
     961             :      * outstanding, otherwise scanning all the other backends isn't worth it.
     962             :      *
     963             :      * We need exclusive lock here so we can look at other backends' entries.
     964             :      */
     965           2 :     LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
     966           2 :     head = QUEUE_HEAD;
     967           2 :     max = QUEUE_TAIL;
     968           2 :     if (QUEUE_POS_PAGE(max) != QUEUE_POS_PAGE(head))
     969             :     {
     970           0 :         for (i = 1; i <= MaxBackends; i++)
     971             :         {
     972           0 :             if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
     973           0 :                 max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
     974             :         }
     975             :     }
     976           2 :     QUEUE_BACKEND_POS(MyBackendId) = max;
     977           2 :     QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
     978           2 :     QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
     979           2 :     LWLockRelease(AsyncQueueLock);
     980             : 
     981             :     /* Now we are listed in the global array, so remember we're listening */
     982           2 :     amRegisteredListener = true;
     983             : 
     984             :     /*
     985             :      * Try to move our pointer forward as far as possible. This will skip over
     986             :      * already-committed notifications. Still, we could get notifications that
     987             :      * have already committed before we started to LISTEN.
     988             :      *
     989             :      * Note that we are not yet listening on anything, so we won't deliver any
     990             :      * notification to the frontend.
     991             :      *
     992             :      * This will also advance the global tail pointer if possible.
     993             :      */
     994           2 :     if (!QUEUE_POS_EQUAL(max, head))
     995           0 :         asyncQueueReadAllNotifications();
     996             : }
     997             : 
     998             : /*
     999             :  * Exec_ListenCommit --- subroutine for AtCommit_Notify
    1000             :  *
    1001             :  * Add the channel to the list of channels we are listening on.
    1002             :  */
    1003             : static void
    1004           2 : Exec_ListenCommit(const char *channel)
    1005             : {
    1006             :     MemoryContext oldcontext;
    1007             : 
    1008             :     /* Do nothing if we are already listening on this channel */
    1009           2 :     if (IsListeningOn(channel))
    1010           2 :         return;
    1011             : 
    1012             :     /*
    1013             :      * Add the new channel name to listenChannels.
    1014             :      *
    1015             :      * XXX It is theoretically possible to get an out-of-memory failure here,
    1016             :      * which would be bad because we already committed.  For the moment it
    1017             :      * doesn't seem worth trying to guard against that, but maybe improve this
    1018             :      * later.
    1019             :      */
    1020           2 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
    1021           2 :     listenChannels = lappend(listenChannels, pstrdup(channel));
    1022           2 :     MemoryContextSwitchTo(oldcontext);
    1023             : }
    1024             : 
    1025             : /*
    1026             :  * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
    1027             :  *
    1028             :  * Remove the specified channel name from listenChannels.
    1029             :  */
    1030             : static void
    1031           1 : Exec_UnlistenCommit(const char *channel)
    1032             : {
    1033             :     ListCell   *q;
    1034             :     ListCell   *prev;
    1035             : 
    1036           1 :     if (Trace_notify)
    1037           0 :         elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
    1038             : 
    1039           1 :     prev = NULL;
    1040           1 :     foreach(q, listenChannels)
    1041             :     {
    1042           1 :         char       *lchan = (char *) lfirst(q);
    1043             : 
    1044           1 :         if (strcmp(lchan, channel) == 0)
    1045             :         {
    1046           1 :             listenChannels = list_delete_cell(listenChannels, q, prev);
    1047           1 :             pfree(lchan);
    1048           1 :             break;
    1049             :         }
    1050           0 :         prev = q;
    1051             :     }
    1052             : 
    1053             :     /*
     1054             :      * We do not complain about unlistening on a channel we were not
     1055             :      * listening on; should we?
    1056             :      */
    1057           1 : }
    1058             : 
    1059             : /*
    1060             :  * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify
    1061             :  *
    1062             :  *      Unlisten on all channels for this backend.
    1063             :  */
    1064             : static void
    1065           4 : Exec_UnlistenAllCommit(void)
    1066             : {
    1067           4 :     if (Trace_notify)
    1068           0 :         elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
    1069             : 
    1070           4 :     list_free_deep(listenChannels);
    1071           4 :     listenChannels = NIL;
    1072           4 : }
    1073             : 
    1074             : /*
    1075             :  * ProcessCompletedNotifies --- send out signals and self-notifies
    1076             :  *
    1077             :  * This is called from postgres.c just before going idle at the completion
    1078             :  * of a transaction.  If we issued any notifications in the just-completed
    1079             :  * transaction, send signals to other backends to process them, and also
    1080             :  * process the queue ourselves to send messages to our own frontend.
    1081             :  *
    1082             :  * The reason that this is not done in AtCommit_Notify is that there is
    1083             :  * a nonzero chance of errors here (for example, encoding conversion errors
    1084             :  * while trying to format messages to our frontend).  An error during
    1085             :  * AtCommit_Notify would be a PANIC condition.  The timing is also arranged
    1086             :  * to ensure that a transaction's self-notifies are delivered to the frontend
    1087             :  * before it gets the terminating ReadyForQuery message.
    1088             :  *
    1089             :  * Note that we send signals and process the queue even if the transaction
    1090             :  * eventually aborted.  This is because we need to clean out whatever got
    1091             :  * added to the queue.
    1092             :  *
    1093             :  * NOTE: we are outside of any transaction here.
    1094             :  */
    1095             : void
    1096       24949 : ProcessCompletedNotifies(void)
    1097             : {
    1098             :     MemoryContext caller_context;
    1099             :     bool        signalled;
    1100             : 
    1101             :     /* Nothing to do if we didn't send any notifications */
    1102       24949 :     if (!backendHasSentNotifications)
    1103       49894 :         return;
    1104             : 
    1105             :     /*
    1106             :      * We reset the flag immediately; otherwise, if any sort of error occurs
    1107             :      * below, we'd be locked up in an infinite loop, because control will come
    1108             :      * right back here after error cleanup.
    1109             :      */
    1110           4 :     backendHasSentNotifications = false;
    1111             : 
    1112             :     /*
    1113             :      * We must preserve the caller's memory context (probably MessageContext)
    1114             :      * across the transaction we do here.
    1115             :      */
    1116           4 :     caller_context = CurrentMemoryContext;
    1117             : 
    1118           4 :     if (Trace_notify)
    1119           0 :         elog(DEBUG1, "ProcessCompletedNotifies");
    1120             : 
    1121             :     /*
    1122             :      * We must run asyncQueueReadAllNotifications inside a transaction, else
    1123             :      * bad things happen if it gets an error.
    1124             :      */
    1125           4 :     StartTransactionCommand();
    1126             : 
    1127             :     /* Send signals to other backends */
    1128           4 :     signalled = SignalBackends();
    1129             : 
    1130           4 :     if (listenChannels != NIL)
    1131             :     {
    1132             :         /* Read the queue ourselves, and send relevant stuff to the frontend */
    1133           0 :         asyncQueueReadAllNotifications();
    1134             :     }
    1135           4 :     else if (!signalled)
    1136             :     {
    1137             :         /*
    1138             :          * If we found no other listening backends, and we aren't listening
    1139             :          * ourselves, then we must execute asyncQueueAdvanceTail to flush the
    1140             :          * queue, because ain't nobody else gonna do it.  This prevents queue
    1141             :          * overflow when we're sending useless notifies to nobody. (A new
    1142             :          * listener could have joined since we looked, but if so this is
    1143             :          * harmless.)
    1144             :          */
    1145           4 :         asyncQueueAdvanceTail();
    1146             :     }
    1147             : 
    1148           4 :     CommitTransactionCommand();
    1149             : 
    1150           4 :     MemoryContextSwitchTo(caller_context);
    1151             : 
    1152             :     /* We don't need pq_flush() here since postgres.c will do one shortly */
    1153             : }
    1154             : 
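For context, the messages that NotifyMyFrontEnd() sends from here arrive at a libpq client as PGnotify structs. A minimal listener sketch follows; the connection string and channel name are placeholders, and error handling and select()/poll() waiting are omitted for brevity:

#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
    PGconn     *conn = PQconnectdb("dbname=postgres"); /* placeholder DSN */
    PGnotify   *note;

    if (PQstatus(conn) != CONNECTION_OK)
    {
        fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
        return 1;
    }

    /* Register interest; the backend runs Exec_ListenCommit at commit. */
    PQclear(PQexec(conn, "LISTEN my_channel"));

    for (;;)
    {
        /* A real client would wait on PQsocket(conn) before polling. */
        if (!PQconsumeInput(conn))
            break;
        while ((note = PQnotifies(conn)) != NULL)
        {
            printf("pid %d, channel \"%s\", payload \"%s\"\n",
                   note->be_pid, note->relname, note->extra);
            PQfreemem(note);
        }
    }

    PQfinish(conn);
    return 0;
}
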
    1155             : /*
    1156             :  * Test whether we are actively listening on the given channel name.
    1157             :  *
    1158             :  * Note: this function is executed for every notification found in the queue.
     1159             :  * Perhaps it is worth further optimization, e.g., converting the list to a
     1160             :  * sorted array so we can binary-search it.  In practice the list is likely
     1161             :  * to be fairly short, though.
    1162             :  */
    1163             : static bool
    1164           2 : IsListeningOn(const char *channel)
    1165             : {
    1166             :     ListCell   *p;
    1167             : 
    1168           2 :     foreach(p, listenChannels)
    1169             :     {
    1170           0 :         char       *lchan = (char *) lfirst(p);
    1171             : 
    1172           0 :         if (strcmp(lchan, channel) == 0)
    1173           0 :             return true;
    1174             :     }
    1175           2 :     return false;
    1176             : }
    1177             : 
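A sketch of the sorted-array optimization the comment above contemplates. The array, counter, and IsListeningOnSorted() are hypothetical: they would have to be maintained by the LISTEN/UNLISTEN paths, which async.c does not currently do.

#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

static const char **sortedChannels;    /* sorted channel names (hypothetical) */
static size_t numSortedChannels;

static int
channel_cmp(const void *a, const void *b)
{
    return strcmp(*(const char *const *) a, *(const char *const *) b);
}

/* Binary-search variant of IsListeningOn (sketch only). */
static bool
IsListeningOnSorted(const char *channel)
{
    return bsearch(&channel, sortedChannels, numSortedChannels,
                   sizeof(const char *), channel_cmp) != NULL;
}
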
    1178             : /*
    1179             :  * Remove our entry from the listeners array when we are no longer listening
    1180             :  * on any channel.  NB: must not fail if we're already not listening.
    1181             :  */
    1182             : static void
    1183           4 : asyncQueueUnregister(void)
    1184             : {
    1185             :     bool        advanceTail;
    1186             : 
    1187           4 :     Assert(listenChannels == NIL);  /* else caller error */
    1188             : 
    1189           4 :     if (!amRegisteredListener)  /* nothing to do */
    1190           6 :         return;
    1191             : 
    1192           2 :     LWLockAcquire(AsyncQueueLock, LW_SHARED);
    1193             :     /* check if entry is valid and oldest ... */
    1194           6 :     advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) &&
    1195           4 :         QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
    1196             :     /* ... then mark it invalid */
    1197           2 :     QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
    1198           2 :     QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
    1199           2 :     LWLockRelease(AsyncQueueLock);
    1200             : 
    1201             :     /* mark ourselves as no longer listed in the global array */
    1202           2 :     amRegisteredListener = false;
    1203             : 
    1204             :     /* If we were the laziest backend, try to advance the tail pointer */
    1205           2 :     if (advanceTail)
    1206           2 :         asyncQueueAdvanceTail();
    1207             : }
    1208             : 
    1209             : /*
    1210             :  * Test whether there is room to insert more notification messages.
    1211             :  *
    1212             :  * Caller must hold at least shared AsyncQueueLock.
    1213             :  */
    1214             : static bool
    1215           4 : asyncQueueIsFull(void)
    1216             : {
    1217             :     int         nexthead;
    1218             :     int         boundary;
    1219             : 
    1220             :     /*
    1221             :      * The queue is full if creating a new head page would create a page that
    1222             :      * logically precedes the current global tail pointer, ie, the head
    1223             :      * pointer would wrap around compared to the tail.  We cannot create such
    1224             :      * a head page for fear of confusing slru.c.  For safety we round the tail
    1225             :      * pointer back to a segment boundary (compare the truncation logic in
    1226             :      * asyncQueueAdvanceTail).
    1227             :      *
    1228             :      * Note that this test is *not* dependent on how much space there is on
    1229             :      * the current head page.  This is necessary because asyncQueueAddEntries
    1230             :      * might try to create the next head page in any case.
    1231             :      */
    1232           4 :     nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
    1233           4 :     if (nexthead > QUEUE_MAX_PAGE)
    1234           0 :         nexthead = 0;           /* wrap around */
    1235           4 :     boundary = QUEUE_POS_PAGE(QUEUE_TAIL);
    1236           4 :     boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
    1237           4 :     return asyncQueuePagePrecedes(nexthead, boundary);
    1238             : }
    1239             : 
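The same test restated as a self-contained sketch. The constants are stand-ins for QUEUE_MAX_PAGE and SLRU_PAGES_PER_SEGMENT, and page_precedes() is written in the spirit of asyncQueuePagePrecedes, which is defined earlier in async.c and not shown here.

#include <stdbool.h>

#define MAX_PAGE          0xFFFF   /* stand-in for QUEUE_MAX_PAGE */
#define PAGES_PER_SEGMENT 32       /* stand-in for SLRU_PAGES_PER_SEGMENT */

/* "p logically precedes q" on a page-number space that wraps at MAX_PAGE. */
static bool
page_precedes(int p, int q)
{
    int diff = p - q;

    if (diff >= (MAX_PAGE + 1) / 2)
        diff -= MAX_PAGE + 1;
    else if (diff < -((MAX_PAGE + 1) / 2))
        diff += MAX_PAGE + 1;
    return diff < 0;
}

/* Full when creating the next head page would wrap onto the (rounded) tail. */
static bool
queue_is_full(int headPage, int tailPage)
{
    int nexthead = headPage + 1;
    int boundary = tailPage - tailPage % PAGES_PER_SEGMENT;

    if (nexthead > MAX_PAGE)
        nexthead = 0;              /* wrap around */
    return page_precedes(nexthead, boundary);
}
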
    1240             : /*
    1241             :  * Advance the QueuePosition to the next entry, assuming that the current
    1242             :  * entry is of length entryLength.  If we jump to a new page the function
    1243             :  * returns true, else false.
    1244             :  */
    1245             : static bool
    1246           4 : asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
    1247             : {
    1248           4 :     int         pageno = QUEUE_POS_PAGE(*position);
    1249           4 :     int         offset = QUEUE_POS_OFFSET(*position);
    1250           4 :     bool        pageJump = false;
    1251             : 
    1252             :     /*
    1253             :      * Move to the next writing position: First jump over what we have just
    1254             :      * written or read.
    1255             :      */
    1256           4 :     offset += entryLength;
    1257           4 :     Assert(offset <= QUEUE_PAGESIZE);
    1258             : 
    1259             :     /*
    1260             :      * In a second step check if another entry can possibly be written to the
    1261             :      * page. If so, stay here, we have reached the next position. If not, then
    1262             :      * we need to move on to the next page.
    1263             :      */
    1264           4 :     if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
    1265             :     {
    1266           0 :         pageno++;
    1267           0 :         if (pageno > QUEUE_MAX_PAGE)
    1268           0 :             pageno = 0;         /* wrap around */
    1269           0 :         offset = 0;
    1270           0 :         pageJump = true;
    1271             :     }
    1272             : 
    1273           4 :     SET_QUEUE_POS(*position, pageno, offset);
    1274           4 :     return pageJump;
    1275             : }
    1276             : 
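The advance logic above, restated as a standalone sketch over a (page, offset) pair. PAGE_SIZE, MAX_PAGE, and MIN_ENTRY_SIZE are assumed stand-ins for QUEUE_PAGESIZE, QUEUE_MAX_PAGE, and QUEUEALIGN(AsyncQueueEntryEmptySize); advance_position() is hypothetical.

#include <stdbool.h>

#define PAGE_SIZE      8192     /* stand-in for QUEUE_PAGESIZE */
#define MAX_PAGE       0xFFFF   /* stand-in for QUEUE_MAX_PAGE */
#define MIN_ENTRY_SIZE 32       /* stand-in for QUEUEALIGN(AsyncQueueEntryEmptySize) */

/* Advance (pageno, offset) past one entry; return true if we moved to a new page. */
static bool
advance_position(int *pageno, int *offset, int entryLength)
{
    *offset += entryLength;             /* step over what was just read or written */
    if (*offset + MIN_ENTRY_SIZE > PAGE_SIZE)
    {
        /* No room for even an empty entry here, so start the next page. */
        if (++*pageno > MAX_PAGE)
            *pageno = 0;                /* wrap around */
        *offset = 0;
        return true;
    }
    return false;
}
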
    1277             : /*
    1278             :  * Fill the AsyncQueueEntry at *qe with an outbound notification message.
    1279             :  */
    1280             : static void
    1281           4 : asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
    1282             : {
    1283           4 :     size_t      channellen = strlen(n->channel);
    1284           4 :     size_t      payloadlen = strlen(n->payload);
    1285             :     int         entryLength;
    1286             : 
    1287           4 :     Assert(channellen < NAMEDATALEN);
    1288           4 :     Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
    1289             : 
    1290             :     /* The terminators are already included in AsyncQueueEntryEmptySize */
    1291           4 :     entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
    1292           4 :     entryLength = QUEUEALIGN(entryLength);
    1293           4 :     qe->length = entryLength;
    1294           4 :     qe->dboid = MyDatabaseId;
    1295           4 :     qe->xid = GetCurrentTransactionId();
    1296           4 :     qe->srcPid = MyProcPid;
    1297           4 :     memcpy(qe->data, n->channel, channellen + 1);
    1298           4 :     memcpy(qe->data + channellen + 1, n->payload, payloadlen + 1);
    1299           4 : }
    1300             : 
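The data layout produced above, shown as a standalone packing helper: the channel name and payload are stored back to back, each with its terminating NUL. The header fields and QUEUEALIGN padding are omitted here, and pack_notification() is a hypothetical name.

#include <string.h>

/* Pack "channel\0payload\0" into buf; return the number of data bytes used. */
static size_t
pack_notification(char *buf, const char *channel, const char *payload)
{
    size_t channellen = strlen(channel);
    size_t payloadlen = strlen(payload);

    memcpy(buf, channel, channellen + 1);                  /* includes '\0' */
    memcpy(buf + channellen + 1, payload, payloadlen + 1); /* includes '\0' */
    return channellen + 1 + payloadlen + 1;
}
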
    1301             : /*
    1302             :  * Add pending notifications to the queue.
    1303             :  *
     1304             :  * We go page by page here, i.e., we stop once we have to go to a new page, but
     1305             :  * we will be called again and will then fill that next page.  If an entry does
     1306             :  * not fit on the current page, we write a dummy entry with InvalidOid as the
     1307             :  * database OID in order to fill out the page.  So every page is always used up
     1308             :  * to the last byte, which simplifies reading the page later.
    1309             :  *
    1310             :  * We are passed the list cell containing the next notification to write
    1311             :  * and return the first still-unwritten cell back.  Eventually we will return
    1312             :  * NULL indicating all is done.
    1313             :  *
     1314             :  * We already hold AsyncQueueLock (acquired by our caller) and grab
     1315             :  * AsyncCtlLock locally in this function.
    1316             :  */
    1317             : static ListCell *
    1318           4 : asyncQueueAddEntries(ListCell *nextNotify)
    1319             : {
    1320             :     AsyncQueueEntry qe;
    1321             :     QueuePosition queue_head;
    1322             :     int         pageno;
    1323             :     int         offset;
    1324             :     int         slotno;
    1325             : 
    1326             :     /* We hold both AsyncQueueLock and AsyncCtlLock during this operation */
    1327           4 :     LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
    1328             : 
    1329             :     /*
    1330             :      * We work with a local copy of QUEUE_HEAD, which we write back to shared
    1331             :      * memory upon exiting.  The reason for this is that if we have to advance
    1332             :      * to a new page, SimpleLruZeroPage might fail (out of disk space, for
    1333             :      * instance), and we must not advance QUEUE_HEAD if it does.  (Otherwise,
    1334             :      * subsequent insertions would try to put entries into a page that slru.c
    1335             :      * thinks doesn't exist yet.)  So, use a local position variable.  Note
    1336             :      * that if we do fail, any already-inserted queue entries are forgotten;
    1337             :      * this is okay, since they'd be useless anyway after our transaction
    1338             :      * rolls back.
    1339             :      */
    1340           4 :     queue_head = QUEUE_HEAD;
    1341             : 
    1342             :     /* Fetch the current page */
    1343           4 :     pageno = QUEUE_POS_PAGE(queue_head);
    1344           4 :     slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId);
    1345             :     /* Note we mark the page dirty before writing in it */
    1346           4 :     AsyncCtl->shared->page_dirty[slotno] = true;
    1347             : 
    1348          12 :     while (nextNotify != NULL)
    1349             :     {
    1350           4 :         Notification *n = (Notification *) lfirst(nextNotify);
    1351             : 
    1352             :         /* Construct a valid queue entry in local variable qe */
    1353           4 :         asyncQueueNotificationToEntry(n, &qe);
    1354             : 
    1355           4 :         offset = QUEUE_POS_OFFSET(queue_head);
    1356             : 
    1357             :         /* Check whether the entry really fits on the current page */
    1358           4 :         if (offset + qe.length <= QUEUE_PAGESIZE)
    1359             :         {
    1360             :             /* OK, so advance nextNotify past this item */
    1361           4 :             nextNotify = lnext(nextNotify);
    1362             :         }
    1363             :         else
    1364             :         {
    1365             :             /*
     1366             :              * Write a dummy entry to fill up the page.  Readers only check
     1367             :              * dboid, and since InvalidOid won't match any reader's database
     1368             :              * OID, they will ignore this entry and move on.
    1369             :              */
    1370           0 :             qe.length = QUEUE_PAGESIZE - offset;
    1371           0 :             qe.dboid = InvalidOid;
    1372           0 :             qe.data[0] = '\0';  /* empty channel */
    1373           0 :             qe.data[1] = '\0';  /* empty payload */
    1374             :         }
    1375             : 
    1376             :         /* Now copy qe into the shared buffer page */
    1377           4 :         memcpy(AsyncCtl->shared->page_buffer[slotno] + offset,
    1378             :                &qe,
    1379           4 :                qe.length);
    1380             : 
    1381             :         /* Advance queue_head appropriately, and detect if page is full */
    1382           4 :         if (asyncQueueAdvance(&(queue_head), qe.length))
    1383             :         {
    1384             :             /*
    1385             :              * Page is full, so we're done here, but first fill the next page
    1386             :              * with zeroes.  The reason to do this is to ensure that slru.c's
    1387             :              * idea of the head page is always the same as ours, which avoids
    1388             :              * boundary problems in SimpleLruTruncate.  The test in
    1389             :              * asyncQueueIsFull() ensured that there is room to create this
    1390             :              * page without overrunning the queue.
    1391             :              */
    1392           0 :             slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head));
    1393             :             /* And exit the loop */
    1394           0 :             break;
    1395             :         }
    1396             :     }
    1397             : 
    1398             :     /* Success, so update the global QUEUE_HEAD */
    1399           4 :     QUEUE_HEAD = queue_head;
    1400             : 
    1401           4 :     LWLockRelease(AsyncCtlLock);
    1402             : 
    1403           4 :     return nextNotify;
    1404             : }
    1405             : 
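The per-entry decision made in the loop above, as a standalone sketch: either the real entry fits at the current offset, or the remainder of the page is consumed by a filler entry that readers skip because its database OID is InvalidOid. PAGE_SIZE is an assumed stand-in for QUEUE_PAGESIZE and entry_write_length() is hypothetical.

#include <stdbool.h>

#define PAGE_SIZE 8192   /* stand-in for QUEUE_PAGESIZE */

/* Return the length to write at 'offset'; set *isFiller if it is a dummy entry. */
static int
entry_write_length(int offset, int realLength, bool *isFiller)
{
    if (offset + realLength <= PAGE_SIZE)
    {
        *isFiller = false;             /* the real entry fits on this page */
        return realLength;
    }
    *isFiller = true;                  /* pad out the page; readers ignore it */
    return PAGE_SIZE - offset;
}
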
    1406             : /*
    1407             :  * SQL function to return the fraction of the notification queue currently
    1408             :  * occupied.
    1409             :  */
    1410             : Datum
    1411           1 : pg_notification_queue_usage(PG_FUNCTION_ARGS)
    1412             : {
    1413             :     double      usage;
    1414             : 
    1415           1 :     LWLockAcquire(AsyncQueueLock, LW_SHARED);
    1416           1 :     usage = asyncQueueUsage();
    1417           1 :     LWLockRelease(AsyncQueueLock);
    1418             : 
    1419           1 :     PG_RETURN_FLOAT8(usage);
    1420             : }
    1421             : 
    1422             : /*
    1423             :  * Return the fraction of the queue that is currently occupied.
    1424             :  *
    1425             :  * The caller must hold AsyncQueueLock in (at least) shared mode.
    1426             :  */
    1427             : static double
    1428           5 : asyncQueueUsage(void)
    1429             : {
    1430           5 :     int         headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
    1431           5 :     int         tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
    1432             :     int         occupied;
    1433             : 
    1434           5 :     occupied = headPage - tailPage;
    1435             : 
    1436           5 :     if (occupied == 0)
    1437           5 :         return (double) 0;      /* fast exit for common case */
    1438             : 
    1439           0 :     if (occupied < 0)
    1440             :     {
    1441             :         /* head has wrapped around, tail not yet */
    1442           0 :         occupied += QUEUE_MAX_PAGE + 1;
    1443             :     }
    1444             : 
    1445           0 :     return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
    1446             : }
    1447             : 
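A worked restatement of the arithmetic above, with an assumed page-number space standing in for QUEUE_MAX_PAGE and a hypothetical queue_usage() helper. For example, headPage = 10 and tailPage = 0xFFF0 wrap around to 26 occupied pages out of the 32768 that count as "full".

#define MAX_PAGE 0xFFFF   /* stand-in for QUEUE_MAX_PAGE */

/* Fraction of the queue in use, given the head and tail page numbers. */
static double
queue_usage(int headPage, int tailPage)
{
    int occupied = headPage - tailPage;

    if (occupied == 0)
        return 0.0;                    /* fast exit for the common (empty) case */
    if (occupied < 0)
        occupied += MAX_PAGE + 1;      /* head has wrapped around, tail not yet */

    /* Only half the page-number space may be in use (see asyncQueueIsFull). */
    return (double) occupied / (double) ((MAX_PAGE + 1) / 2);
}
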
    1448             : /*
    1449             :  * Check whether the queue is at least half full, and emit a warning if so.
    1450             :  *
    1451             :  * This is unlikely given the size of the queue, but possible.
    1452             :  * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
    1453             :  *
    1454             :  * Caller must hold exclusive AsyncQueueLock.
    1455             :  */
    1456             : static void
    1457           4 : asyncQueueFillWarning(void)
    1458             : {
    1459             :     double      fillDegree;
    1460             :     TimestampTz t;
    1461             : 
    1462           4 :     fillDegree = asyncQueueUsage();
    1463           4 :     if (fillDegree < 0.5)
    1464           8 :         return;
    1465             : 
    1466           0 :     t = GetCurrentTimestamp();
    1467             : 
    1468           0 :     if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
    1469             :                                    t, QUEUE_FULL_WARN_INTERVAL))
    1470             :     {
    1471           0 :         QueuePosition min = QUEUE_HEAD;
    1472           0 :         int32       minPid = InvalidPid;
    1473             :         int         i;
    1474             : 
    1475           0 :         for (i = 1; i <= MaxBackends; i++)
    1476             :         {
    1477           0 :             if (QUEUE_BACKEND_PID(i) != InvalidPid)
    1478             :             {
    1479           0 :                 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
    1480           0 :                 if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
    1481           0 :                     minPid = QUEUE_BACKEND_PID(i);
    1482             :             }
    1483             :         }
    1484             : 
    1485           0 :         ereport(WARNING,
    1486             :                 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
    1487             :                  (minPid != InvalidPid ?
    1488             :                   errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
    1489             :                   : 0),
    1490             :                  (minPid != InvalidPid ?
    1491             :                   errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
    1492             :                   : 0)));
    1493             : 
    1494           0 :         asyncQueueControl->lastQueueFillWarn = t;
    1495             :     }
    1496             : }
    1497             : 
    1498             : /*
    1499             :  * Send signals to all listening backends (except our own).
    1500             :  *
    1501             :  * Returns true if we sent at least one signal.
    1502             :  *
     1503             :  * Since we need an exclusive lock anyway, we also check the positions of the
     1504             :  * other backends, and we don't signal a backend that is already up to date.
     1505             :  * This can happen if concurrent notifying transactions have already sent a
     1506             :  * signal and the signaled backend has read those notifications and ours in
     1507             :  * the same pass.
     1508             :  *
     1509             :  * Since we know the BackendId and the PID, the signaling is quite cheap.
    1510             :  */
    1511             : static bool
    1512           4 : SignalBackends(void)
    1513             : {
    1514           4 :     bool        signalled = false;
    1515             :     int32      *pids;
    1516             :     BackendId  *ids;
    1517             :     int         count;
    1518             :     int         i;
    1519             :     int32       pid;
    1520             : 
    1521             :     /*
    1522             :      * Identify all backends that are listening and not already up-to-date. We
    1523             :      * don't want to send signals while holding the AsyncQueueLock, so we just
    1524             :      * build a list of target PIDs.
    1525             :      *
    1526             :      * XXX in principle these pallocs could fail, which would be bad. Maybe
    1527             :      * preallocate the arrays?  But in practice this is only run in trivial
    1528             :      * transactions, so there should surely be space available.
    1529             :      */
    1530           4 :     pids = (int32 *) palloc(MaxBackends * sizeof(int32));
    1531           4 :     ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
    1532           4 :     count = 0;
    1533             : 
    1534           4 :     LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
    1535         452 :     for (i = 1; i <= MaxBackends; i++)
    1536             :     {
    1537         448 :         pid = QUEUE_BACKEND_PID(i);
    1538         448 :         if (pid != InvalidPid && pid != MyProcPid)
    1539             :         {
    1540           0 :             QueuePosition pos = QUEUE_BACKEND_POS(i);
    1541             : 
    1542           0 :             if (!QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
    1543             :             {
    1544           0 :                 pids[count] = pid;
    1545           0 :                 ids[count] = i;
    1546           0 :                 count++;
    1547             :             }
    1548             :         }
    1549             :     }
    1550           4 :     LWLockRelease(AsyncQueueLock);
    1551             : 
    1552             :     /* Now send signals */
    1553           4 :     for (i = 0; i < count; i++)
    1554             :     {
    1555           0 :         pid = pids[i];
    1556             : 
    1557             :         /*
    1558             :          * Note: assuming things aren't broken, a signal failure here could
    1559             :          * only occur if the target backend exited since we released
    1560             :          * AsyncQueueLock; which is unlikely but certainly possible. So we
    1561             :          * just log a low-level debug message if it happens.
    1562             :          */
    1563           0 :         if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
    1564           0 :             elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
    1565             :         else
    1566           0 :             signalled = true;
    1567             :     }
    1568             : 
    1569           4 :     pfree(pids);
    1570           4 :     pfree(ids);
    1571             : 
    1572           4 :     return signalled;
    1573             : }
    1574             : 
    1575             : /*
    1576             :  * AtAbort_Notify
    1577             :  *
    1578             :  *  This is called at transaction abort.
    1579             :  *
    1580             :  *  Gets rid of pending actions and outbound notifies that we would have
    1581             :  *  executed if the transaction got committed.
    1582             :  */
    1583             : void
    1584        3306 : AtAbort_Notify(void)
    1585             : {
    1586             :     /*
    1587             :      * If we LISTEN but then roll back the transaction after PreCommit_Notify,
    1588             :      * we have registered as a listener but have not made any entry in
    1589             :      * listenChannels.  In that case, deregister again.
    1590             :      */
    1591        3306 :     if (amRegisteredListener && listenChannels == NIL)
    1592           0 :         asyncQueueUnregister();
    1593             : 
    1594             :     /* And clean up */
    1595        3306 :     ClearPendingActionsAndNotifies();
    1596        3306 : }
    1597             : 
    1598             : /*
    1599             :  * AtSubStart_Notify() --- Take care of subtransaction start.
    1600             :  *
    1601             :  * Push empty state for the new subtransaction.
    1602             :  */
    1603             : void
    1604         372 : AtSubStart_Notify(void)
    1605             : {
    1606             :     MemoryContext old_cxt;
    1607             : 
    1608             :     /* Keep the list-of-lists in TopTransactionContext for simplicity */
    1609         372 :     old_cxt = MemoryContextSwitchTo(TopTransactionContext);
    1610             : 
    1611         372 :     upperPendingActions = lcons(pendingActions, upperPendingActions);
    1612             : 
    1613         372 :     Assert(list_length(upperPendingActions) ==
    1614             :            GetCurrentTransactionNestLevel() - 1);
    1615             : 
    1616         372 :     pendingActions = NIL;
    1617             : 
    1618         372 :     upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies);
    1619             : 
    1620         372 :     Assert(list_length(upperPendingNotifies) ==
    1621             :            GetCurrentTransactionNestLevel() - 1);
    1622             : 
    1623         372 :     pendingNotifies = NIL;
    1624             : 
    1625         372 :     MemoryContextSwitchTo(old_cxt);
    1626         372 : }
    1627             : 
    1628             : /*
    1629             :  * AtSubCommit_Notify() --- Take care of subtransaction commit.
    1630             :  *
    1631             :  * Reassign all items in the pending lists to the parent transaction.
    1632             :  */
    1633             : void
    1634          49 : AtSubCommit_Notify(void)
    1635             : {
    1636             :     List       *parentPendingActions;
    1637             :     List       *parentPendingNotifies;
    1638             : 
    1639          49 :     parentPendingActions = linitial_node(List, upperPendingActions);
    1640          49 :     upperPendingActions = list_delete_first(upperPendingActions);
    1641             : 
    1642          49 :     Assert(list_length(upperPendingActions) ==
    1643             :            GetCurrentTransactionNestLevel() - 2);
    1644             : 
    1645             :     /*
    1646             :      * Mustn't try to eliminate duplicates here --- see queue_listen()
    1647             :      */
    1648          49 :     pendingActions = list_concat(parentPendingActions, pendingActions);
    1649             : 
    1650          49 :     parentPendingNotifies = linitial_node(List, upperPendingNotifies);
    1651          49 :     upperPendingNotifies = list_delete_first(upperPendingNotifies);
    1652             : 
    1653          49 :     Assert(list_length(upperPendingNotifies) ==
    1654             :            GetCurrentTransactionNestLevel() - 2);
    1655             : 
    1656             :     /*
    1657             :      * We could try to eliminate duplicates here, but it seems not worthwhile.
    1658             :      */
    1659          49 :     pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
    1660          49 : }
    1661             : 
    1662             : /*
    1663             :  * AtSubAbort_Notify() --- Take care of subtransaction abort.
    1664             :  */
    1665             : void
    1666         323 : AtSubAbort_Notify(void)
    1667             : {
    1668         323 :     int         my_level = GetCurrentTransactionNestLevel();
    1669             : 
    1670             :     /*
    1671             :      * All we have to do is pop the stack --- the actions/notifies made in
    1672             :      * this subxact are no longer interesting, and the space will be freed
    1673             :      * when CurTransactionContext is recycled.
    1674             :      *
    1675             :      * This routine could be called more than once at a given nesting level if
    1676             :      * there is trouble during subxact abort.  Avoid dumping core by using
    1677             :      * GetCurrentTransactionNestLevel as the indicator of how far we need to
    1678             :      * prune the list.
    1679             :      */
    1680         969 :     while (list_length(upperPendingActions) > my_level - 2)
    1681             :     {
    1682         323 :         pendingActions = linitial_node(List, upperPendingActions);
    1683         323 :         upperPendingActions = list_delete_first(upperPendingActions);
    1684             :     }
    1685             : 
    1686         969 :     while (list_length(upperPendingNotifies) > my_level - 2)
    1687             :     {
    1688         323 :         pendingNotifies = linitial_node(List, upperPendingNotifies);
    1689         323 :         upperPendingNotifies = list_delete_first(upperPendingNotifies);
    1690             :     }
    1691         323 : }
    1692             : 
    1693             : /*
    1694             :  * HandleNotifyInterrupt
    1695             :  *
    1696             :  *      Signal handler portion of interrupt handling. Let the backend know
    1697             :  *      that there's a pending notify interrupt. If we're currently reading
    1698             :  *      from the client, this will interrupt the read and
    1699             :  *      ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
    1700             :  */
    1701             : void
    1702           0 : HandleNotifyInterrupt(void)
    1703             : {
    1704             :     /*
     1705             :      * Note: this is called by a SIGNAL HANDLER.  You must be very wary about
     1706             :      * what you do here.
    1707             :      */
    1708             : 
    1709             :     /* signal that work needs to be done */
    1710           0 :     notifyInterruptPending = true;
    1711             : 
    1712             :     /* make sure the event is processed in due course */
    1713           0 :     SetLatch(MyLatch);
    1714           0 : }
    1715             : 
    1716             : /*
    1717             :  * ProcessNotifyInterrupt
    1718             :  *
     1719             :  *      This is called just after waiting for a frontend command.  If an
     1720             :  *      interrupt arrives (via HandleNotifyInterrupt()) while reading, the
    1721             :  *      read will be interrupted via the process's latch, and this routine
    1722             :  *      will get called.  If we are truly idle (ie, *not* inside a transaction
    1723             :  *      block), process the incoming notifies.
    1724             :  */
    1725             : void
    1726           0 : ProcessNotifyInterrupt(void)
    1727             : {
    1728           0 :     if (IsTransactionOrTransactionBlock())
    1729           0 :         return;                 /* not really idle */
    1730             : 
    1731           0 :     while (notifyInterruptPending)
    1732           0 :         ProcessIncomingNotify();
    1733             : }
    1734             : 
    1735             : 
    1736             : /*
    1737             :  * Read all pending notifications from the queue, and deliver appropriate
    1738             :  * ones to my frontend.  Stop when we reach queue head or an uncommitted
    1739             :  * notification.
    1740             :  */
    1741             : static void
    1742           0 : asyncQueueReadAllNotifications(void)
    1743             : {
    1744             :     volatile QueuePosition pos;
    1745             :     QueuePosition oldpos;
    1746             :     QueuePosition head;
    1747             :     bool        advanceTail;
    1748             : 
    1749             :     /* page_buffer must be adequately aligned, so use a union */
    1750             :     union
    1751             :     {
    1752             :         char        buf[QUEUE_PAGESIZE];
    1753             :         AsyncQueueEntry align;
    1754             :     }           page_buffer;
    1755             : 
    1756             :     /* Fetch current state */
    1757           0 :     LWLockAcquire(AsyncQueueLock, LW_SHARED);
    1758             :     /* Assert checks that we have a valid state entry */
    1759           0 :     Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
    1760           0 :     pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
    1761           0 :     head = QUEUE_HEAD;
    1762           0 :     LWLockRelease(AsyncQueueLock);
    1763             : 
    1764           0 :     if (QUEUE_POS_EQUAL(pos, head))
    1765             :     {
    1766             :         /* Nothing to do, we have read all notifications already. */
    1767           0 :         return;
    1768             :     }
    1769             : 
    1770             :     /*----------
    1771             :      * Note that we deliver everything that we see in the queue and that
    1772             :      * matches our _current_ listening state.
     1773             :      * In particular, we do not take different commit times into account.
    1774             :      * Consider the following example:
    1775             :      *
    1776             :      * Backend 1:                    Backend 2:
    1777             :      *
    1778             :      * transaction starts
    1779             :      * NOTIFY foo;
    1780             :      * commit starts
    1781             :      *                               transaction starts
    1782             :      *                               LISTEN foo;
    1783             :      *                               commit starts
    1784             :      * commit to clog
    1785             :      *                               commit to clog
    1786             :      *
    1787             :      * It could happen that backend 2 sees the notification from backend 1 in
    1788             :      * the queue.  Even though the notifying transaction committed before
    1789             :      * the listening transaction, we still deliver the notification.
    1790             :      *
     1791             :      * The idea is that an additional notification does not do any harm; we
    1792             :      * just need to make sure that we do not miss a notification.
    1793             :      *
    1794             :      * It is possible that we fail while trying to send a message to our
    1795             :      * frontend (for example, because of encoding conversion failure).
    1796             :      * If that happens it is critical that we not try to send the same
    1797             :      * message over and over again.  Therefore, we place a PG_TRY block
    1798             :      * here that will forcibly advance our backend position before we lose
    1799             :      * control to an error.  (We could alternatively retake AsyncQueueLock
    1800             :      * and move the position before handling each individual message, but
    1801             :      * that seems like too much lock traffic.)
    1802             :      *----------
    1803             :      */
    1804           0 :     PG_TRY();
    1805             :     {
    1806             :         bool        reachedStop;
    1807             : 
    1808             :         do
    1809             :         {
    1810           0 :             int         curpage = QUEUE_POS_PAGE(pos);
    1811           0 :             int         curoffset = QUEUE_POS_OFFSET(pos);
    1812             :             int         slotno;
    1813             :             int         copysize;
    1814             : 
    1815             :             /*
    1816             :              * We copy the data from SLRU into a local buffer, so as to avoid
    1817             :              * holding the AsyncCtlLock while we are examining the entries and
    1818             :              * possibly transmitting them to our frontend.  Copy only the part
    1819             :              * of the page we will actually inspect.
    1820             :              */
    1821           0 :             slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, curpage,
    1822             :                                                 InvalidTransactionId);
    1823           0 :             if (curpage == QUEUE_POS_PAGE(head))
    1824             :             {
    1825             :                 /* we only want to read as far as head */
    1826           0 :                 copysize = QUEUE_POS_OFFSET(head) - curoffset;
    1827           0 :                 if (copysize < 0)
    1828           0 :                     copysize = 0;   /* just for safety */
    1829             :             }
    1830             :             else
    1831             :             {
    1832             :                 /* fetch all the rest of the page */
    1833           0 :                 copysize = QUEUE_PAGESIZE - curoffset;
    1834             :             }
    1835           0 :             memcpy(page_buffer.buf + curoffset,
    1836           0 :                    AsyncCtl->shared->page_buffer[slotno] + curoffset,
    1837             :                    copysize);
    1838             :             /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
    1839           0 :             LWLockRelease(AsyncCtlLock);
    1840             : 
    1841             :             /*
    1842             :              * Process messages up to the stop position, end of page, or an
    1843             :              * uncommitted message.
    1844             :              *
    1845             :              * Our stop position is what we found to be the head's position
    1846             :              * when we entered this function. It might have changed already.
    1847             :              * But if it has, we will receive (or have already received and
    1848             :              * queued) another signal and come here again.
    1849             :              *
    1850             :              * We are not holding AsyncQueueLock here! The queue can only
    1851             :              * extend beyond the head pointer (see above) and we leave our
    1852             :              * backend's pointer where it is so nobody will truncate or
     1853             :              * rewrite pages under us.  In particular, we don't want to hold
     1854             :              * a lock while sending the notifications to the frontend.
    1855             :              */
    1856           0 :             reachedStop = asyncQueueProcessPageEntries(&pos, head,
    1857             :                                                        page_buffer.buf);
    1858           0 :         } while (!reachedStop);
    1859             :     }
    1860           0 :     PG_CATCH();
    1861             :     {
    1862             :         /* Update shared state */
    1863           0 :         LWLockAcquire(AsyncQueueLock, LW_SHARED);
    1864           0 :         QUEUE_BACKEND_POS(MyBackendId) = pos;
    1865           0 :         advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
    1866           0 :         LWLockRelease(AsyncQueueLock);
    1867             : 
    1868             :         /* If we were the laziest backend, try to advance the tail pointer */
    1869           0 :         if (advanceTail)
    1870           0 :             asyncQueueAdvanceTail();
    1871             : 
    1872           0 :         PG_RE_THROW();
    1873             :     }
    1874           0 :     PG_END_TRY();
    1875             : 
    1876             :     /* Update shared state */
    1877           0 :     LWLockAcquire(AsyncQueueLock, LW_SHARED);
    1878           0 :     QUEUE_BACKEND_POS(MyBackendId) = pos;
    1879           0 :     advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
    1880           0 :     LWLockRelease(AsyncQueueLock);
    1881             : 
    1882             :     /* If we were the laziest backend, try to advance the tail pointer */
    1883           0 :     if (advanceTail)
    1884           0 :         asyncQueueAdvanceTail();
    1885             : }
    1886             : 
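How much of the current page gets copied into the local buffer above, as a standalone sketch: on the head page we read only up to the head offset, otherwise the rest of the page. PAGE_SIZE is an assumed stand-in for QUEUE_PAGESIZE and bytes_to_copy() is hypothetical.

#define PAGE_SIZE 8192   /* stand-in for QUEUE_PAGESIZE */

/* Bytes to copy from the current page, never reading past the queue head. */
static int
bytes_to_copy(int curpage, int curoffset, int headpage, int headoffset)
{
    if (curpage == headpage)
    {
        int copysize = headoffset - curoffset;  /* read only as far as head */

        return (copysize < 0) ? 0 : copysize;   /* clamp, just for safety */
    }
    return PAGE_SIZE - curoffset;               /* fetch the rest of the page */
}
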
    1887             : /*
    1888             :  * Fetch notifications from the shared queue, beginning at position current,
    1889             :  * and deliver relevant ones to my frontend.
    1890             :  *
    1891             :  * The current page must have been fetched into page_buffer from shared
    1892             :  * memory.  (We could access the page right in shared memory, but that
    1893             :  * would imply holding the AsyncCtlLock throughout this routine.)
    1894             :  *
    1895             :  * We stop if we reach the "stop" position, or reach a notification from an
    1896             :  * uncommitted transaction, or reach the end of the page.
    1897             :  *
    1898             :  * The function returns true once we have reached the stop position or an
    1899             :  * uncommitted notification, and false if we have finished with the page.
    1900             :  * In other words: once it returns true there is no need to look further.
    1901             :  * The QueuePosition *current is advanced past all processed messages.
    1902             :  */
    1903             : static bool
    1904           0 : asyncQueueProcessPageEntries(volatile QueuePosition *current,
    1905             :                              QueuePosition stop,
    1906             :                              char *page_buffer)
    1907             : {
    1908           0 :     bool        reachedStop = false;
    1909             :     bool        reachedEndOfPage;
    1910             :     AsyncQueueEntry *qe;
    1911             : 
    1912             :     do
    1913             :     {
    1914           0 :         QueuePosition thisentry = *current;
    1915             : 
    1916           0 :         if (QUEUE_POS_EQUAL(thisentry, stop))
    1917           0 :             break;
    1918             : 
    1919           0 :         qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
    1920             : 
    1921             :         /*
    1922             :          * Advance *current over this message, possibly to the next page. As
    1923             :          * noted in the comments for asyncQueueReadAllNotifications, we must
    1924             :          * do this before possibly failing while processing the message.
    1925             :          */
    1926           0 :         reachedEndOfPage = asyncQueueAdvance(current, qe->length);
    1927             : 
    1928             :         /* Ignore messages destined for other databases */
    1929           0 :         if (qe->dboid == MyDatabaseId)
    1930             :         {
    1931           0 :             if (TransactionIdIsInProgress(qe->xid))
    1932             :             {
    1933             :                 /*
    1934             :                  * The source transaction is still in progress, so we can't
    1935             :                  * process this message yet.  Break out of the loop, but first
    1936             :                  * back up *current so we will reprocess the message next
    1937             :                  * time.  (Note: it is unlikely but not impossible for
    1938             :                  * TransactionIdDidCommit to fail, so we can't really avoid
    1939             :                  * this advance-then-back-up behavior when dealing with an
    1940             :                  * uncommitted message.)
    1941             :                  *
    1942             :                  * Note that we must test TransactionIdIsInProgress before we
    1943             :                  * test TransactionIdDidCommit, else we might return a message
    1944             :                  * from a transaction that is not yet visible to snapshots;
    1945             :                  * compare the comments at the head of tqual.c.
    1946             :                  */
    1947           0 :                 *current = thisentry;
    1948           0 :                 reachedStop = true;
    1949           0 :                 break;
    1950             :             }
    1951           0 :             else if (TransactionIdDidCommit(qe->xid))
    1952             :             {
    1953             :                 /* qe->data is the null-terminated channel name */
    1954           0 :                 char       *channel = qe->data;
    1955             : 
    1956           0 :                 if (IsListeningOn(channel))
    1957             :                 {
    1958             :                     /* payload follows channel name */
    1959           0 :                     char       *payload = qe->data + strlen(channel) + 1;
    1960             : 
    1961           0 :                     NotifyMyFrontEnd(channel, payload, qe->srcPid);
    1962             :                 }
    1963             :             }
    1964             :             else
    1965             :             {
    1966             :                 /*
    1967             :                  * The source transaction aborted or crashed, so we just
    1968             :                  * ignore its notifications.
    1969             :                  */
    1970             :             }
    1971             :         }
    1972             : 
    1973             :         /* Loop back if we're not at end of page */
    1974           0 :     } while (!reachedEndOfPage);
    1975             : 
    1976           0 :     if (QUEUE_POS_EQUAL(*current, stop))
    1977           0 :         reachedStop = true;
    1978             : 
    1979           0 :     return reachedStop;
    1980             : }
    1981             : 
    1982             : /*
    1983             :  * Advance the shared queue tail variable to the minimum of all the
    1984             :  * per-backend tail pointers.  Truncate pg_notify space if possible.
    1985             :  */
    1986             : static void
    1987           6 : asyncQueueAdvanceTail(void)
    1988             : {
    1989             :     QueuePosition min;
    1990             :     int         i;
    1991             :     int         oldtailpage;
    1992             :     int         newtailpage;
    1993             :     int         boundary;
    1994             : 
    1995           6 :     LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
    1996           6 :     min = QUEUE_HEAD;
    1997         678 :     for (i = 1; i <= MaxBackends; i++)
    1998             :     {
    1999         672 :         if (QUEUE_BACKEND_PID(i) != InvalidPid)
    2000           0 :             min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
    2001             :     }
    2002           6 :     oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
    2003           6 :     QUEUE_TAIL = min;
    2004           6 :     LWLockRelease(AsyncQueueLock);
    2005             : 
    2006             :     /*
    2007             :      * We can truncate something if the global tail advanced across an SLRU
    2008             :      * segment boundary.
    2009             :      *
    2010             :      * XXX it might be better to truncate only once every several segments, to
    2011             :      * reduce the number of directory scans.
    2012             :      */
    2013           6 :     newtailpage = QUEUE_POS_PAGE(min);
    2014           6 :     boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
    2015           6 :     if (asyncQueuePagePrecedes(oldtailpage, boundary))
    2016             :     {
    2017             :         /*
    2018             :          * SimpleLruTruncate() will ask for AsyncCtlLock but will also release
    2019             :          * the lock again.
    2020             :          */
    2021           0 :         SimpleLruTruncate(AsyncCtl, newtailpage);
    2022             :     }
    2023           6 : }
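The truncation test above rounds the new tail page down to the first page of its SLRU segment and truncates only if the old tail page lies before that boundary. A standalone sketch of the arithmetic follows, assuming 32 pages per segment (intended to match SLRU_PAGES_PER_SEGMENT) and using a plain "<" comparison in place of the wraparound-aware asyncQueuePagePrecedes():

    #include <stdio.h>

    #define PAGES_PER_SEGMENT 32    /* assumed to match SLRU_PAGES_PER_SEGMENT */

    int
    main(void)
    {
        int         oldtailpage = 37;   /* example old tail page */
        int         newtailpage = 70;   /* example new (advanced) tail page */
        int         boundary;

        /* round the new tail page down to the first page of its segment */
        boundary = newtailpage - (newtailpage % PAGES_PER_SEGMENT);    /* 70 -> 64 */

        /* plain "<" stands in for the wraparound-aware page comparison */
        if (oldtailpage < boundary)
            printf("tail crossed a segment boundary: truncate up to page %d\n",
                   newtailpage);
        else
            printf("no segment boundary crossed yet\n");
        return 0;
    }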
    2024             : 
    2025             : /*
    2026             :  * ProcessIncomingNotify
    2027             :  *
    2028             :  *      Deal with arriving NOTIFYs from other backends as soon as it's safe to
    2029             :  *      do so. This used to be called from the PROCSIG_NOTIFY_INTERRUPT
    2030             :  *      signal handler, but isn't anymore.
    2031             :  *
    2032             :  *      Scan the queue for arriving notifications and report them to my front
    2033             :  *      end.
    2034             :  *
    2035             :  *      NOTE: since we are outside any transaction, we must create our own.
    2036             :  */
    2037             : static void
    2038           0 : ProcessIncomingNotify(void)
    2039             : {
    2040             :     /* We *must* reset the flag */
    2041           0 :     notifyInterruptPending = false;
    2042             : 
    2043             :     /* Do nothing else if we aren't actively listening */
    2044           0 :     if (listenChannels == NIL)
    2045           0 :         return;
    2046             : 
    2047           0 :     if (Trace_notify)
    2048           0 :         elog(DEBUG1, "ProcessIncomingNotify");
    2049             : 
    2050           0 :     set_ps_display("notify interrupt", false);
    2051             : 
    2052             :     /*
    2053             :      * We must run asyncQueueReadAllNotifications inside a transaction,
    2054             :      * else there is no transaction abort path to clean up if it hits an error.
    2055             :      */
    2056           0 :     StartTransactionCommand();
    2057             : 
    2058           0 :     asyncQueueReadAllNotifications();
    2059             : 
    2060           0 :     CommitTransactionCommand();
    2061             : 
    2062             :     /*
    2063             :      * Must flush the notify messages to ensure frontend gets them promptly.
    2064             :      */
    2065           0 :     pq_flush();
    2066             : 
    2067           0 :     set_ps_display("idle", false);
    2068             : 
    2069           0 :     if (Trace_notify)
    2070           0 :         elog(DEBUG1, "ProcessIncomingNotify: done");
    2071             : }
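ProcessIncomingNotify is the deferred half of an interrupt: the signal handler only sets notifyInterruptPending, and the work runs later, once it is safe to start a transaction and write to the client. Here is a minimal standalone sketch of that set-a-flag-and-process-later pattern; the names and the use of SIGUSR1 are illustrative, not the backend's actual signaling machinery.

    #include <signal.h>
    #include <stdio.h>

    static volatile sig_atomic_t notify_pending = 0;

    /* the handler only records that work is pending; it does no real work */
    static void
    handle_notify_signal(int signo)
    {
        (void) signo;
        notify_pending = 1;
    }

    int
    main(void)
    {
        signal(SIGUSR1, handle_notify_signal);
        raise(SIGUSR1);             /* simulate an incoming interrupt */

        /* later, at a safe point in the main loop: */
        if (notify_pending)
        {
            notify_pending = 0;     /* reset the flag first, as the code above does */
            printf("processing deferred notification work\n");
        }
        return 0;
    }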
    2072             : 
    2073             : /*
    2074             :  * Send NOTIFY message to my front end.
    2075             :  */
    2076             : void
    2077           0 : NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
    2078             : {
    2079           0 :     if (whereToSendOutput == DestRemote)
    2080             :     {
    2081             :         StringInfoData buf;
    2082             : 
    2083           0 :         pq_beginmessage(&buf, 'A');
    2084           0 :         pq_sendint(&buf, srcPid, sizeof(int32));
    2085           0 :         pq_sendstring(&buf, channel);
    2086           0 :         if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
    2087           0 :             pq_sendstring(&buf, payload);
    2088           0 :         pq_endmessage(&buf);
    2089             : 
    2090             :         /*
    2091             :          * NOTE: we do not do pq_flush() here.  For a self-notify, it will
    2092             :          * happen at the end of the transaction, and for incoming notifies
    2093             :          * ProcessIncomingNotify will do it after finding all the notifies.
    2094             :          */
    2095             :     }
    2096             :     else
    2097           0 :         elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
    2098           0 : }
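On the wire, the message built above is the NotificationResponse ('A') message: the sender's PID, the channel name, and (for protocol 3.0 clients) the payload. The following is a hedged client-side sketch using libpq, which decodes that message into a PGnotify struct; the empty connection string and the channel name are placeholders, and the program must be linked against libpq (e.g. -lpq).

    #include <stdio.h>
    #include <sys/select.h>
    #include <libpq-fe.h>

    int
    main(void)
    {
        PGconn     *conn = PQconnectdb("");     /* connection parameters from the environment */
        PGresult   *res;

        if (PQstatus(conn) != CONNECTION_OK)
        {
            fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
            PQfinish(conn);
            return 1;
        }

        res = PQexec(conn, "LISTEN my_channel");    /* placeholder channel name */
        PQclear(res);

        for (;;)
        {
            PGnotify   *notify;
            int         sock = PQsocket(conn);
            fd_set      input_mask;

            /* wait for data from the server, then let libpq parse it */
            FD_ZERO(&input_mask);
            FD_SET(sock, &input_mask);
            if (select(sock + 1, &input_mask, NULL, NULL, NULL) < 0)
                break;
            if (!PQconsumeInput(conn))
                break;

            /* each queued 'A' message comes back as one PGnotify struct */
            while ((notify = PQnotifies(conn)) != NULL)
            {
                printf("NOTIFY \"%s\" payload \"%s\" from PID %d\n",
                       notify->relname, notify->extra, notify->be_pid);
                PQfreemem(notify);
            }
        }

        PQfinish(conn);
        return 0;
    }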
    2099             : 
    2100             : /* Does pendingNotifies include the given channel/payload? */
    2101             : static bool
    2102           4 : AsyncExistsPendingNotify(const char *channel, const char *payload)
    2103             : {
    2104             :     ListCell   *p;
    2105             :     Notification *n;
    2106             : 
    2107           4 :     if (pendingNotifies == NIL)
    2108           4 :         return false;
    2109             : 
    2110           0 :     if (payload == NULL)
    2111           0 :         payload = "";
    2112             : 
    2113             :     /*----------
    2114             :      * We need to append new elements to the end of the list to preserve
    2115             :      * their order.  On the other hand, we'd like to check the list
    2116             :      * backwards to make duplicate-elimination a tad faster when the same
    2117             :      * notification is signaled many times in a row.  As a compromise we
    2118             :      * check the tail element first, which we can access directly.  If it
    2119             :      * doesn't match, we check the whole list.
    2120             :      *
    2121             :      * As we are not checking our parents' lists, we can still get duplicates
    2122             :      * in combination with subtransactions, like in:
    2123             :      *
    2124             :      * begin;
    2125             :      * notify foo '1';
    2126             :      * savepoint foo;
    2127             :      * notify foo '1';
    2128             :      * commit;
    2129             :      *----------
    2130             :      */
    2131           0 :     n = (Notification *) llast(pendingNotifies);
    2132           0 :     if (strcmp(n->channel, channel) == 0 &&
    2133           0 :         strcmp(n->payload, payload) == 0)
    2134           0 :         return true;
    2135             : 
    2136           0 :     foreach(p, pendingNotifies)
    2137             :     {
    2138           0 :         n = (Notification *) lfirst(p);
    2139             : 
    2140           0 :         if (strcmp(n->channel, channel) == 0 &&
    2141           0 :             strcmp(n->payload, payload) == 0)
    2142           0 :             return true;
    2143             :     }
    2144             : 
    2145           0 :     return false;
    2146             : }
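The compromise described in the comment above, check the newest element first and fall back to a full scan only if it doesn't match, can be illustrated outside the backend. The sketch below uses a fixed-size array in place of the backend's List, and all names are illustrative.

    #include <stdio.h>
    #include <string.h>

    #define MAX_PENDING 16

    static const char *pending[MAX_PENDING];
    static int  npending = 0;

    static int
    exists_pending(const char *msg)
    {
        int         i;

        if (npending == 0)
            return 0;

        /* fast path: the same message repeated in a row hits the tail check */
        if (strcmp(pending[npending - 1], msg) == 0)
            return 1;

        /* slow path: scan the whole list */
        for (i = 0; i < npending; i++)
            if (strcmp(pending[i], msg) == 0)
                return 1;
        return 0;
    }

    int
    main(void)
    {
        const char *msgs[] = {"foo:1", "foo:1", "bar:2", "foo:1"};
        int         i;

        for (i = 0; i < 4; i++)
            if (!exists_pending(msgs[i]) && npending < MAX_PENDING)
                pending[npending++] = msgs[i];

        printf("kept %d distinct notification(s)\n", npending);    /* prints 2 */
        return 0;
    }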
    2147             : 
    2148             : /* Clear the pendingActions and pendingNotifies lists. */
    2149             : static void
    2150        3315 : ClearPendingActionsAndNotifies(void)
    2151             : {
    2152             :     /*
    2153             :      * We used to have to explicitly deallocate the list members and nodes,
    2154             :      * because they were malloc'd.  Now, since we know they are palloc'd in
    2155             :      * CurTransactionContext, we need not do that --- they'll go away
    2156             :      * automatically at transaction exit.  We need only reset the list head
    2157             :      * pointers.
    2158             :      */
    2159        3315 :     pendingActions = NIL;
    2160        3315 :     pendingNotifies = NIL;
    2161        3315 : }

Generated by: LCOV version 1.11