LCOV - code coverage report
Current view: top level - src/backend/replication - walreceiverfuncs.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 32 128 25.0 %
Date: 2017-09-29 15:12:54 Functions: 4 9 44.4 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * walreceiverfuncs.c
       4             :  *
       5             :  * This file contains functions used by the startup process to communicate
       6             :  * with the walreceiver process. Functions implementing walreceiver itself
       7             :  * are in walreceiver.c.
       8             :  *
       9             :  * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
      10             :  *
      11             :  *
      12             :  * IDENTIFICATION
      13             :  *    src/backend/replication/walreceiverfuncs.c
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : #include "postgres.h"
      18             : 
      19             : #include <sys/stat.h>
      20             : #include <sys/time.h>
      21             : #include <time.h>
      22             : #include <unistd.h>
      23             : #include <signal.h>
      24             : 
      25             : #include "access/xlog_internal.h"
      26             : #include "postmaster/startup.h"
      27             : #include "replication/walreceiver.h"
      28             : #include "storage/pmsignal.h"
      29             : #include "storage/shmem.h"
      30             : #include "utils/timestamp.h"
      31             : 
      32             : WalRcvData *WalRcv = NULL;  /* assigned by WalRcvShmemInit() */
      33             : 
      34             : /*
      35             :  * How long to wait for walreceiver to start up after requesting
      36             :  * postmaster to launch it. In seconds.
                        *
                        * WalRcvRunning() and WalRcvStreaming() compare this against the time
                        * elapsed since walrcv->startTime while the state is WALRCV_STARTING.
      37             :  */
      38             : #define WALRCV_STARTUP_TIMEOUT 10
      39             : 
      40             : /*
                        * Report shared memory space needed by WalRcvShmemInit.
                        *
                        * Returns the size of a single WalRcvData struct, accumulated through
                        * add_size() starting from zero.
                        */
      41             : Size
      42          15 : WalRcvShmemSize(void)
      43             : {
      44          15 :     Size        size = 0;
      45             : 
      46          15 :     size = add_size(size, sizeof(WalRcvData));
      47             : 
      48          15 :     return size;
      49             : }
      50             : 
      51             : /*
                        * Allocate and initialize walreceiver-related shared memory.
                        *
                        * Obtains the "Wal Receiver Ctl" struct from ShmemInitStruct() and
                        * stores its address in the global WalRcv.  The struct contents are
                        * only initialized when ShmemInitStruct() reports, through "found",
                        * that the struct did not already exist.
                        */
      52             : void
      53           5 : WalRcvShmemInit(void)
      54             : {
      55             :     bool        found;
      56             : 
      57           5 :     WalRcv = (WalRcvData *)
      58           5 :         ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
      59             : 
      60           5 :     if (!found)
      61             :     {
      62             :         /* First time through, so initialize */
                           /* Zero everything, then set the fields that need explicit values. */
      63           5 :         MemSet(WalRcv, 0, WalRcvShmemSize());
      64           5 :         WalRcv->walRcvState = WALRCV_STOPPED;
      65           5 :         SpinLockInit(&WalRcv->mutex);
      66           5 :         WalRcv->latch = NULL;
      67             :     }
      68           5 : }
      69             : 
      70             : /*
                        * Is walreceiver running (or starting up)?
                        *
                        * Returns true for every state other than WALRCV_STOPPED.
                        *
                        * Side effect: if the state is WALRCV_STARTING and more than
                        * WALRCV_STARTUP_TIMEOUT seconds have passed since walrcv->startTime,
                        * the shared state is reset to WALRCV_STOPPED and false is returned.
                        */
      71             : bool
      72           3 : WalRcvRunning(void)
      73             : {
      74           3 :     WalRcvData *walrcv = WalRcv;
      75             :     WalRcvState state;
      76             :     pg_time_t   startTime;
      77             : 
                           /* Take a snapshot of state and startTime while holding the spinlock. */
      78           3 :     SpinLockAcquire(&walrcv->mutex);
      79             : 
      80           3 :     state = walrcv->walRcvState;
      81           3 :     startTime = walrcv->startTime;
      82             : 
      83           3 :     SpinLockRelease(&walrcv->mutex);
      84             : 
      85             :     /*
      86             :      * If it has taken too long for walreceiver to start up, give up. Setting
      87             :      * the state to STOPPED ensures that if walreceiver later does start up
      88             :      * after all, it will see that it's not supposed to be running and die
      89             :      * without doing anything.
      90             :      */
      91           3 :     if (state == WALRCV_STARTING)
      92             :     {
      93           0 :         pg_time_t   now = (pg_time_t) time(NULL);
      94             : 
      95           0 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
      96             :         {
      97           0 :             SpinLockAcquire(&walrcv->mutex);
      98             : 
                                   /*
                                    * Re-check under the lock: the snapshot above was taken before
                                    * the lock was released, so the shared state may have moved on.
                                    * Only a state that is still STARTING is forced to STOPPED;
                                    * otherwise the local snapshot is left as it was.
                                    */
      99           0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     100           0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     101             : 
     102           0 :             SpinLockRelease(&walrcv->mutex);
     103             :         }
     104             :     }
     105             : 
     106           3 :     if (state != WALRCV_STOPPED)
     107           0 :         return true;
     108             :     else
     109           3 :         return false;
     110             : }
     111             : 
     112             : /*
     113             :  * Is walreceiver running and streaming (or at least attempting to connect,
     114             :  * or starting up)?
                        *
                        * Returns true when the state is WALRCV_STREAMING, WALRCV_STARTING or
                        * WALRCV_RESTARTING, and false for any other state.
                        *
                        * Side effect: if the state is WALRCV_STARTING and more than
                        * WALRCV_STARTUP_TIMEOUT seconds have passed since walrcv->startTime,
                        * the shared state is reset to WALRCV_STOPPED and false is returned.
     115             :  */
     116             : bool
     117           0 : WalRcvStreaming(void)
     118             : {
     119           0 :     WalRcvData *walrcv = WalRcv;
     120             :     WalRcvState state;
     121             :     pg_time_t   startTime;
     122             : 
                           /* Take a snapshot of state and startTime while holding the spinlock. */
     123           0 :     SpinLockAcquire(&walrcv->mutex);
     124             : 
     125           0 :     state = walrcv->walRcvState;
     126           0 :     startTime = walrcv->startTime;
     127             : 
     128           0 :     SpinLockRelease(&walrcv->mutex);
     129             : 
     130             :     /*
     131             :      * If it has taken too long for walreceiver to start up, give up. Setting
     132             :      * the state to STOPPED ensures that if walreceiver later does start up
     133             :      * after all, it will see that it's not supposed to be running and die
     134             :      * without doing anything.
     135             :      */
     136           0 :     if (state == WALRCV_STARTING)
     137             :     {
     138           0 :         pg_time_t   now = (pg_time_t) time(NULL);
     139             : 
     140           0 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
     141             :         {
     142           0 :             SpinLockAcquire(&walrcv->mutex);
     143             : 
                                   /*
                                    * Re-check under the lock: only a state that is still STARTING
                                    * is forced to STOPPED; otherwise the local snapshot is kept.
                                    */
     144           0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     145           0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     146             : 
     147           0 :             SpinLockRelease(&walrcv->mutex);
     148             :         }
     149             :     }
     150             : 
     151           0 :     if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
     152             :         state == WALRCV_RESTARTING)
     153           0 :         return true;
     154             :     else
     155           0 :         return false;
     156             : }
     157             : 
     158             : /*
     159             :  * Stop walreceiver (if running) and wait for it to die.
     160             :  * Executed by the Startup process.
                        *
                        * Works in three steps: update the shared state under the spinlock,
                        * send SIGTERM if a pid was picked up, then poll WalRcvRunning() every
                        * 100ms until it returns false.
     161             :  */
     162             : void
     163           3 : ShutdownWalRcv(void)
     164             : {
     165           3 :     WalRcvData *walrcv = WalRcv;
     166           3 :     pid_t       walrcvpid = 0;
     167             : 
     168             :     /*
     169             :      * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
     170             :      * mode once it's finished, and will also request postmaster to not
     171             :      * restart itself.
     172             :      */
     173           3 :     SpinLockAcquire(&walrcv->mutex);
                           /* NOTE(review): no default case; presumably every WalRcvState value is listed -- confirm. */
     174           3 :     switch (walrcv->walRcvState)
     175             :     {
     176             :         case WALRCV_STOPPED:
                               /* Already stopped: no state change, and no pid to signal. */
     177           3 :             break;
     178             :         case WALRCV_STARTING:
                               /* No pid is read in this case, so nothing is signalled below. */
     179           0 :             walrcv->walRcvState = WALRCV_STOPPED;
     180           0 :             break;
     181             : 
     182             :         case WALRCV_STREAMING:
     183             :         case WALRCV_WAITING:
     184             :         case WALRCV_RESTARTING:
     185           0 :             walrcv->walRcvState = WALRCV_STOPPING;
     186             :             /* fall through */
     187             :         case WALRCV_STOPPING:
                               /* Remember the pid so it can be signalled after the lock is released. */
     188           0 :             walrcvpid = walrcv->pid;
     189           0 :             break;
     190             :     }
     191           3 :     SpinLockRelease(&walrcv->mutex);
     192             : 
     193             :     /*
     194             :      * Signal walreceiver process if it was still running.
                            *
                            * NOTE(review): the return value of kill() is not checked.
     195             :      */
     196           3 :     if (walrcvpid != 0)
     197           0 :         kill(walrcvpid, SIGTERM);
     198             : 
     199             :     /*
     200             :      * Wait for walreceiver to acknowledge its death by setting state to
     201             :      * WALRCV_STOPPED.
     202             :      */
     203           6 :     while (WalRcvRunning())
     204             :     {
     205             :         /*
     206             :          * This possibly-long loop needs to handle interrupts of startup
     207             :          * process.
     208             :          */
     209           0 :         HandleStartupProcInterrupts();
     210             : 
     211           0 :         pg_usleep(100000);      /* 100ms */
     212             :     }
     213           3 : }
     214             : 
     215             : /*
     216             :  * Request postmaster to start walreceiver.
     217             :  *
     218             :  * recptr indicates the position where streaming should begin, conninfo
     219             :  * is a libpq connection string to use, and slotname is, optionally, the name
     220             :  * of a replication slot to acquire.
                        *
                        * tli is stored as receiveStartTLI.  A NULL conninfo or slotname is
                        * recorded as an empty string.
                        *
                        * If the current state is WALRCV_STOPPED, the state becomes
                        * WALRCV_STARTING and the postmaster is signalled.  Otherwise the state
                        * becomes WALRCV_RESTARTING and walrcv->latch is set, if it is non-NULL.
     221             :  */
     222             : void
     223           0 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
     224             :                      const char *slotname)
     225             : {
     226           0 :     WalRcvData *walrcv = WalRcv;
     227           0 :     bool        launch = false;
     228           0 :     pg_time_t   now = (pg_time_t) time(NULL);
     229             : 
     230             :     /*
     231             :      * We always start at the beginning of the segment. That prevents a broken
     232             :      * segment (i.e., with no records in the first half of a segment) from
     233             :      * being created by XLOG streaming, which might cause trouble later on if
     234             :      * the segment is e.g. archived.
     235             :      */
     236           0 :     if (recptr % XLogSegSize != 0)
     237           0 :         recptr -= recptr % XLogSegSize;
     238             : 
     239           0 :     SpinLockAcquire(&walrcv->mutex);
     240             : 
     241             :     /* It better be stopped if we try to restart it */
     242           0 :     Assert(walrcv->walRcvState == WALRCV_STOPPED ||
     243             :            walrcv->walRcvState == WALRCV_WAITING);
     244             : 
                           /* Copies are bounded by MAXCONNINFO and NAMEDATALEN respectively. */
     245           0 :     if (conninfo != NULL)
     246           0 :         strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
     247             :     else
     248           0 :         walrcv->conninfo[0] = '\0';
     249             : 
     250           0 :     if (slotname != NULL)
     251           0 :         strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
     252             :     else
     253           0 :         walrcv->slotname[0] = '\0';
     254             : 
     255           0 :     if (walrcv->walRcvState == WALRCV_STOPPED)
     256             :     {
     257           0 :         launch = true;
     258           0 :         walrcv->walRcvState = WALRCV_STARTING;
     259             :     }
     260             :     else
     261           0 :         walrcv->walRcvState = WALRCV_RESTARTING;
                           /* startTime is what the WALRCV_STARTUP_TIMEOUT checks in this file measure from. */
     262           0 :     walrcv->startTime = now;
     263             : 
     264             :     /*
     265             :      * If this is the first startup of walreceiver (on this timeline),
     266             :      * initialize receivedUpto and latestChunkStart to the starting point.
     267             :      */
     268           0 :     if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
     269             :     {
     270           0 :         walrcv->receivedUpto = recptr;
     271           0 :         walrcv->receivedTLI = tli;
     272           0 :         walrcv->latestChunkStart = recptr;
     273             :     }
     274           0 :     walrcv->receiveStart = recptr;
     275           0 :     walrcv->receiveStartTLI = tli;
     276             : 
     277           0 :     SpinLockRelease(&walrcv->mutex);
     278             : 
                           /*
                            * NOTE(review): walrcv->latch is read twice below, after the spinlock
                            * has been released.  If another process can reset the field to NULL
                            * between the test and the call, SetLatch() would be handed NULL.
                            * Copying the pointer into a local variable while the lock is still
                            * held, and using that local here, would close the window -- confirm
                            * against the code that sets and clears walrcv->latch.
                            */
     279           0 :     if (launch)
     280           0 :         SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
     281           0 :     else if (walrcv->latch)
     282           0 :         SetLatch(walrcv->latch);
     283           0 : }
     284             : 
     285             : /*
     286             :  * Returns the last+1 byte position that walreceiver has written.
     287             :  *
     288             :  * Optionally, returns the previous chunk start, that is the first byte
     289             :  * written in the most recent walreceiver flush cycle.  Callers not
     290             :  * interested in that value may pass NULL for latestChunkStart. Same for
     291             :  * receiveTLI.
                        *
                        * The return value is walrcv->receivedUpto; *receiveTLI, when requested,
                        * is walrcv->receivedTLI.
     292             :  */
     293             : XLogRecPtr
     294           0 : GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
     295             : {
     296           0 :     WalRcvData *walrcv = WalRcv;
     297             :     XLogRecPtr  recptr;
     298             : 
                           /* All requested fields are read inside a single critical section. */
     299           0 :     SpinLockAcquire(&walrcv->mutex);
     300           0 :     recptr = walrcv->receivedUpto;
     301           0 :     if (latestChunkStart)
     302           0 :         *latestChunkStart = walrcv->latestChunkStart;
     303           0 :     if (receiveTLI)
     304           0 :         *receiveTLI = walrcv->receivedTLI;
     305           0 :     SpinLockRelease(&walrcv->mutex);
     306             : 
     307           0 :     return recptr;
     308             : }
     309             : 
     310             : /*
     311             :  * Returns the replication apply delay in ms or -1
     312             :  * if the apply delay info is not available
                        *
                        * Returns 0 when walrcv->receivedUpto equals the pointer returned by
                        * GetXLogReplayRecPtr().  Returns -1 when
                        * GetCurrentChunkReplayStartTime() returns 0.  Otherwise the result is
                        * the difference between that start time and GetCurrentTimestamp().
     313             :  */
     314             : int
     315           0 : GetReplicationApplyDelay(void)
     316             : {
     317           0 :     WalRcvData *walrcv = WalRcv;
     318             :     XLogRecPtr  receivePtr;
     319             :     XLogRecPtr  replayPtr;
     320             : 
     321             :     long        secs;
     322             :     int         usecs;
     323             : 
     324             :     TimestampTz chunkReplayStartTime;
     325             : 
     326           0 :     SpinLockAcquire(&walrcv->mutex);
     327           0 :     receivePtr = walrcv->receivedUpto;
     328           0 :     SpinLockRelease(&walrcv->mutex);
     329             : 
     330           0 :     replayPtr = GetXLogReplayRecPtr(NULL);
     331             : 
     332           0 :     if (receivePtr == replayPtr)
     333           0 :         return 0;
     334             : 
     335           0 :     chunkReplayStartTime = GetCurrentChunkReplayStartTime();
     336             : 
     337           0 :     if (chunkReplayStartTime == 0)
     338           0 :         return -1;
     339             : 
     340           0 :     TimestampDifference(chunkReplayStartTime,
     341             :                         GetCurrentTimestamp(),
     342             :                         &secs, &usecs);
     343             : 
                           /*
                            * NOTE(review): secs is narrowed to int before the multiplication, so
                            * with a 32-bit int a difference above roughly INT_MAX / 1000 seconds
                            * would overflow -- confirm whether such delays can occur.
                            */
     344           0 :     return (((int) secs * 1000) + (usecs / 1000));
     345             : }
     346             : 
     347             : /*
     348             :  * Returns the network latency in ms, note that this includes any
     349             :  * difference in clock settings between the servers, as well as timezone.
                        *
                        * Computed as the difference between walrcv->lastMsgSendTime and
                        * walrcv->lastMsgReceiptTime, both read under the spinlock.
     350             :  */
     351             : int
     352           0 : GetReplicationTransferLatency(void)
     353             : {
     354           0 :     WalRcvData *walrcv = WalRcv;
     355             : 
     356             :     TimestampTz lastMsgSendTime;
     357             :     TimestampTz lastMsgReceiptTime;
     358             : 
     359           0 :     long        secs = 0;
     360           0 :     int         usecs = 0;
     361             :     int         ms;
     362             : 
     363           0 :     SpinLockAcquire(&walrcv->mutex);
     364           0 :     lastMsgSendTime = walrcv->lastMsgSendTime;
     365           0 :     lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
     366           0 :     SpinLockRelease(&walrcv->mutex);
     367             : 
     368           0 :     TimestampDifference(lastMsgSendTime,
     369             :                         lastMsgReceiptTime,
     370             :                         &secs, &usecs);
     371             : 
                           /*
                            * NOTE(review): secs is narrowed to int before the multiplication, so
                            * a very large difference could overflow a 32-bit int -- confirm.
                            */
     372           0 :     ms = ((int) secs * 1000) + (usecs / 1000);
     373             : 
     374           0 :     return ms;
     375             : }

Generated by: LCOV version 1.11