LCOV - code coverage report
Current view: top level - src/backend/replication - slot.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 82 506 16.2 %
Date: 2017-09-29 15:12:54 Functions: 8 25 32.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * slot.c
       4             :  *     Replication slot management.
       5             :  *
       6             :  *
       7             :  * Copyright (c) 2012-2017, PostgreSQL Global Development Group
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/replication/slot.c
      12             :  *
      13             :  * NOTES
      14             :  *
      15             :  * Replication slots are used to keep state about replication streams
      16             :  * originating from this cluster.  Their primary purpose is to prevent the
      17             :  * premature removal of WAL or of old tuple versions in a manner that would
      18             :  * interfere with replication; they are also useful for monitoring purposes.
      19             :  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
      20             :  * on standbys (to support cascading setups).  The requirement that slots be
      21             :  * usable on standbys precludes storing them in the system catalogs.
      22             :  *
      23             :  * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
      24             :  * directory. Inside that directory the state file will contain the slot's
      25             :  * own data. Additional data can be stored alongside that file if required.
      26             :  * While the server is running, the state data is also cached in memory for
      27             :  * efficiency.
      28             :  *
      29             :  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
      30             :  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
      31             :  * to iterate over the slots, and in exclusive mode to change the in_use flag
      32             :  * of a slot.  The remaining data in each slot is protected by its mutex.
      33             :  *
      34             :  *-------------------------------------------------------------------------
      35             :  */
      36             : 
      37             : #include "postgres.h"
      38             : 
      39             : #include <unistd.h>
      40             : #include <sys/stat.h>
      41             : 
      42             : #include "access/transam.h"
      43             : #include "access/xlog_internal.h"
      44             : #include "common/string.h"
      45             : #include "miscadmin.h"
      46             : #include "pgstat.h"
      47             : #include "replication/slot.h"
      48             : #include "storage/fd.h"
      49             : #include "storage/proc.h"
      50             : #include "storage/procarray.h"
      51             : #include "utils/builtins.h"
      52             : 
      53             : /*
      54             :  * Replication slot on-disk data structure.
      55             :  */
      56             : typedef struct ReplicationSlotOnDisk
      57             : {
      58             :     /* first part of this struct needs to be version independent */
      59             : 
      60             :     /* data not covered by checksum */
      61             :     uint32      magic;
      62             :     pg_crc32c   checksum;
      63             : 
      64             :     /* data covered by checksum */
      65             :     uint32      version;
      66             :     uint32      length;
      67             : 
      68             :     /*
      69             :      * The actual data in the slot that follows can differ based on the above
      70             :      * 'version'.
      71             :      */
      72             : 
      73             :     ReplicationSlotPersistentData slotdata;
      74             : } ReplicationSlotOnDisk;
      75             : 
      76             : /* size of version independent data */
      77             : #define ReplicationSlotOnDiskConstantSize \
      78             :     offsetof(ReplicationSlotOnDisk, slotdata)
      79             : /* size of the part of the slot not covered by the checksum */
      80             : #define SnapBuildOnDiskNotChecksummedSize \
      81             :     offsetof(ReplicationSlotOnDisk, version)
      82             : /* size of the part covered by the checksum */
      83             : #define SnapBuildOnDiskChecksummedSize \
      84             :     sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize
      85             : /* size of the slot data that is version dependent */
      86             : #define ReplicationSlotOnDiskV2Size \
      87             :     sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
      88             : 
      89             : #define SLOT_MAGIC      0x1051CA1   /* format identifier */
      90             : #define SLOT_VERSION    2       /* version for new files */
      91             : 
      92             : /* Control array for replication slot management */
      93             : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
      94             : 
      95             : /* My backend's replication slot in the shared memory array */
      96             : ReplicationSlot *MyReplicationSlot = NULL;
      97             : 
      98             : /* GUCs */
      99             : int         max_replication_slots = 0;  /* the maximum number of replication
     100             :                                          * slots */
     101             : 
     102             : static void ReplicationSlotDropAcquired(void);
     103             : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
     104             : 
     105             : /* internal persistency functions */
     106             : static void RestoreSlotFromDisk(const char *name);
     107             : static void CreateSlotOnDisk(ReplicationSlot *slot);
     108             : static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
     109             : 
     110             : /*
     111             :  * Report shared-memory space needed by ReplicationSlotShmemInit.
     112             :  */
     113             : Size
     114          15 : ReplicationSlotsShmemSize(void)
     115             : {
     116          15 :     Size        size = 0;
     117             : 
     118          15 :     if (max_replication_slots == 0)
     119           0 :         return size;
     120             : 
     121          15 :     size = offsetof(ReplicationSlotCtlData, replication_slots);
     122          15 :     size = add_size(size,
     123             :                     mul_size(max_replication_slots, sizeof(ReplicationSlot)));
     124             : 
     125          15 :     return size;
     126             : }
     127             : 
     128             : /*
      129             :  * Allocate and initialize replication slot shared memory.
     130             :  */
     131             : void
     132           5 : ReplicationSlotsShmemInit(void)
     133             : {
     134             :     bool        found;
     135             : 
     136           5 :     if (max_replication_slots == 0)
     137           5 :         return;
     138             : 
     139           5 :     ReplicationSlotCtl = (ReplicationSlotCtlData *)
     140           5 :         ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
     141             :                         &found);
     142             : 
     143           5 :     LWLockRegisterTranche(LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS,
     144             :                           "replication_slot_io");
     145             : 
     146           5 :     if (!found)
     147             :     {
     148             :         int         i;
     149             : 
     150             :         /* First time through, so initialize */
     151           5 :         MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
     152             : 
     153          55 :         for (i = 0; i < max_replication_slots; i++)
     154             :         {
     155          50 :             ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
     156             : 
     157             :             /* everything else is zeroed by the memset above */
     158          50 :             SpinLockInit(&slot->mutex);
     159          50 :             LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
     160          50 :             ConditionVariableInit(&slot->active_cv);
     161             :         }
     162             :     }
     163             : }
     164             : 
     165             : /*
     166             :  * Check whether the passed slot name is valid and report errors at elevel.
     167             :  *
      168             :  * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1}, which should allow
     169             :  * the name to be used as a directory name on every supported OS.
     170             :  *
      171             :  * If elevel < ERROR, returns whether the slot name is valid.
     172             :  */
     173             : bool
     174           0 : ReplicationSlotValidateName(const char *name, int elevel)
     175             : {
     176             :     const char *cp;
     177             : 
     178           0 :     if (strlen(name) == 0)
     179             :     {
     180           0 :         ereport(elevel,
     181             :                 (errcode(ERRCODE_INVALID_NAME),
     182             :                  errmsg("replication slot name \"%s\" is too short",
     183             :                         name)));
     184           0 :         return false;
     185             :     }
     186             : 
     187           0 :     if (strlen(name) >= NAMEDATALEN)
     188             :     {
     189           0 :         ereport(elevel,
     190             :                 (errcode(ERRCODE_NAME_TOO_LONG),
     191             :                  errmsg("replication slot name \"%s\" is too long",
     192             :                         name)));
     193           0 :         return false;
     194             :     }
     195             : 
     196           0 :     for (cp = name; *cp; cp++)
     197             :     {
     198           0 :         if (!((*cp >= 'a' && *cp <= 'z')
     199           0 :               || (*cp >= '0' && *cp <= '9')
     200           0 :               || (*cp == '_')))
     201             :         {
     202           0 :             ereport(elevel,
     203             :                     (errcode(ERRCODE_INVALID_NAME),
     204             :                      errmsg("replication slot name \"%s\" contains invalid character",
     205             :                             name),
     206             :                      errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
     207           0 :             return false;
     208             :         }
     209             :     }
     210           0 :     return true;
     211             : }
     212             : 
     213             : /*
     214             :  * Create a new replication slot and mark it as used by this backend.
     215             :  *
     216             :  * name: Name of the slot
     217             :  * db_specific: logical decoding is db specific; if the slot is going to
     218             :  *     be used for that pass true, otherwise false.
     219             :  */
     220             : void
     221           0 : ReplicationSlotCreate(const char *name, bool db_specific,
     222             :                       ReplicationSlotPersistency persistency)
     223             : {
     224           0 :     ReplicationSlot *slot = NULL;
     225             :     int         i;
     226             : 
     227           0 :     Assert(MyReplicationSlot == NULL);
     228             : 
     229           0 :     ReplicationSlotValidateName(name, ERROR);
     230             : 
     231             :     /*
     232             :      * If some other backend ran this code concurrently with us, we'd likely
     233             :      * both allocate the same slot, and that would be bad.  We'd also be at
     234             :      * risk of missing a name collision.  Also, we don't want to try to create
     235             :      * a new slot while somebody's busy cleaning up an old one, because we
     236             :      * might both be monkeying with the same directory.
     237             :      */
     238           0 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
     239             : 
     240             :     /*
     241             :      * Check for name collision, and identify an allocatable slot.  We need to
     242             :      * hold ReplicationSlotControlLock in shared mode for this, so that nobody
     243             :      * else can change the in_use flags while we're looking at them.
     244             :      */
     245           0 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     246           0 :     for (i = 0; i < max_replication_slots; i++)
     247             :     {
     248           0 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     249             : 
     250           0 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     251           0 :             ereport(ERROR,
     252             :                     (errcode(ERRCODE_DUPLICATE_OBJECT),
     253             :                      errmsg("replication slot \"%s\" already exists", name)));
     254           0 :         if (!s->in_use && slot == NULL)
     255           0 :             slot = s;
     256             :     }
     257           0 :     LWLockRelease(ReplicationSlotControlLock);
     258             : 
     259             :     /* If all slots are in use, we're out of luck. */
     260           0 :     if (slot == NULL)
     261           0 :         ereport(ERROR,
     262             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     263             :                  errmsg("all replication slots are in use"),
     264             :                  errhint("Free one or increase max_replication_slots.")));
     265             : 
     266             :     /*
     267             :      * Since this slot is not in use, nobody should be looking at any part of
     268             :      * it other than the in_use field unless they're trying to allocate it.
     269             :      * And since we hold ReplicationSlotAllocationLock, nobody except us can
     270             :      * be doing that.  So it's safe to initialize the slot.
     271             :      */
     272           0 :     Assert(!slot->in_use);
     273           0 :     Assert(slot->active_pid == 0);
     274             : 
     275             :     /* first initialize persistent data */
     276           0 :     memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
     277           0 :     StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
     278           0 :     slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
     279           0 :     slot->data.persistency = persistency;
     280             : 
     281             :     /* and then data only present in shared memory */
     282           0 :     slot->just_dirtied = false;
     283           0 :     slot->dirty = false;
     284           0 :     slot->effective_xmin = InvalidTransactionId;
     285           0 :     slot->effective_catalog_xmin = InvalidTransactionId;
     286           0 :     slot->candidate_catalog_xmin = InvalidTransactionId;
     287           0 :     slot->candidate_xmin_lsn = InvalidXLogRecPtr;
     288           0 :     slot->candidate_restart_valid = InvalidXLogRecPtr;
     289           0 :     slot->candidate_restart_lsn = InvalidXLogRecPtr;
     290             : 
     291             :     /*
     292             :      * Create the slot on disk.  We haven't actually marked the slot allocated
     293             :      * yet, so no special cleanup is required if this errors out.
     294             :      */
     295           0 :     CreateSlotOnDisk(slot);
     296             : 
     297             :     /*
     298             :      * We need to briefly prevent any other backend from iterating over the
     299             :      * slots while we flip the in_use flag. We also need to set the active
     300             :      * flag while holding the ControlLock as otherwise a concurrent
     301             :      * SlotAcquire() could acquire the slot as well.
     302             :      */
     303           0 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     304             : 
     305           0 :     slot->in_use = true;
     306             : 
     307             :     /* We can now mark the slot active, and that makes it our slot. */
     308           0 :     SpinLockAcquire(&slot->mutex);
     309           0 :     Assert(slot->active_pid == 0);
     310           0 :     slot->active_pid = MyProcPid;
     311           0 :     SpinLockRelease(&slot->mutex);
     312           0 :     MyReplicationSlot = slot;
     313             : 
     314           0 :     LWLockRelease(ReplicationSlotControlLock);
     315             : 
     316             :     /*
     317             :      * Now that the slot has been marked as in_use and active, it's safe to
     318             :      * let somebody else try to allocate a slot.
     319             :      */
     320           0 :     LWLockRelease(ReplicationSlotAllocationLock);
     321             : 
     322             :     /* Let everybody know we've modified this slot */
     323           0 :     ConditionVariableBroadcast(&slot->active_cv);
     324           0 : }
     325             : 
     326             : /*
     327             :  * Find a previously created slot and mark it as used by this backend.
     328             :  */
     329             : void
     330           0 : ReplicationSlotAcquire(const char *name, bool nowait)
     331             : {
     332             :     ReplicationSlot *slot;
     333             :     int         active_pid;
     334             :     int         i;
     335             : 
     336             : retry:
     337           0 :     Assert(MyReplicationSlot == NULL);
     338             : 
     339             :     /*
     340             :      * Search for the named slot and mark it active if we find it.  If the
     341             :      * slot is already active, we exit the loop with active_pid set to the PID
     342             :      * of the backend that owns it.
     343             :      */
     344           0 :     active_pid = 0;
     345           0 :     slot = NULL;
     346           0 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     347           0 :     for (i = 0; i < max_replication_slots; i++)
     348             :     {
     349           0 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     350             : 
     351           0 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     352             :         {
     353             :             /*
     354             :              * This is the slot we want.  We don't know yet if it's active, so
     355             :              * get ready to sleep on it in case it is.  (We may end up not
     356             :              * sleeping, but we don't want to do this while holding the
     357             :              * spinlock.)
     358             :              */
     359           0 :             ConditionVariablePrepareToSleep(&s->active_cv);
     360             : 
     361           0 :             SpinLockAcquire(&s->mutex);
     362             : 
     363           0 :             active_pid = s->active_pid;
     364           0 :             if (active_pid == 0)
     365           0 :                 active_pid = s->active_pid = MyProcPid;
     366             : 
     367           0 :             SpinLockRelease(&s->mutex);
     368           0 :             slot = s;
     369             : 
     370           0 :             break;
     371             :         }
     372             :     }
     373           0 :     LWLockRelease(ReplicationSlotControlLock);
     374             : 
     375             :     /* If we did not find the slot, error out. */
     376           0 :     if (slot == NULL)
     377           0 :         ereport(ERROR,
     378             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     379             :                  errmsg("replication slot \"%s\" does not exist", name)));
     380             : 
     381             :     /*
     382             :      * If we found the slot but it's already active in another backend, we
     383             :      * either error out or retry after a short wait, as caller specified.
     384             :      */
     385           0 :     if (active_pid != MyProcPid)
     386             :     {
     387           0 :         if (nowait)
     388           0 :             ereport(ERROR,
     389             :                     (errcode(ERRCODE_OBJECT_IN_USE),
     390             :                      errmsg("replication slot \"%s\" is active for PID %d",
     391             :                             name, active_pid)));
     392             : 
     393             :         /* Wait here until we get signaled, and then restart */
     394           0 :         ConditionVariableSleep(&slot->active_cv,
     395             :                                WAIT_EVENT_REPLICATION_SLOT_DROP);
     396           0 :         ConditionVariableCancelSleep();
     397           0 :         goto retry;
     398             :     }
     399             :     else
     400           0 :         ConditionVariableCancelSleep(); /* no sleep needed after all */
     401             : 
     402             :     /* Let everybody know we've modified this slot */
     403           0 :     ConditionVariableBroadcast(&slot->active_cv);
     404             : 
     405             :     /* We made this slot active, so it's ours now. */
     406           0 :     MyReplicationSlot = slot;
     407           0 : }
     408             : 
     409             : /*
      410             :  * Release the replication slot that this backend considers itself to own.
     411             :  *
     412             :  * This or another backend can re-acquire the slot later.
     413             :  * Resources this slot requires will be preserved.
     414             :  */
     415             : void
     416           0 : ReplicationSlotRelease(void)
     417             : {
     418           0 :     ReplicationSlot *slot = MyReplicationSlot;
     419             : 
     420           0 :     Assert(slot != NULL && slot->active_pid != 0);
     421             : 
     422           0 :     if (slot->data.persistency == RS_EPHEMERAL)
     423             :     {
     424             :         /*
     425             :          * Delete the slot. There is no !PANIC case where this is allowed to
      426             :          * fail; all that may happen is an incomplete cleanup of the on-disk
     427             :          * data.
     428             :          */
     429           0 :         ReplicationSlotDropAcquired();
     430             :     }
     431             : 
     432             :     /*
     433             :      * If slot needed to temporarily restrain both data and catalog xmin to
     434             :      * create the catalog snapshot, remove that temporary constraint.
     435             :      * Snapshots can only be exported while the initial snapshot is still
     436             :      * acquired.
     437             :      */
     438           0 :     if (!TransactionIdIsValid(slot->data.xmin) &&
     439           0 :         TransactionIdIsValid(slot->effective_xmin))
     440             :     {
     441           0 :         SpinLockAcquire(&slot->mutex);
     442           0 :         slot->effective_xmin = InvalidTransactionId;
     443           0 :         SpinLockRelease(&slot->mutex);
     444           0 :         ReplicationSlotsComputeRequiredXmin(false);
     445             :     }
     446             : 
     447           0 :     if (slot->data.persistency == RS_PERSISTENT)
     448             :     {
     449             :         /*
     450             :          * Mark persistent slot inactive.  We're not freeing it, just
     451             :          * disconnecting, but wake up others that may be waiting for it.
     452             :          */
     453           0 :         SpinLockAcquire(&slot->mutex);
     454           0 :         slot->active_pid = 0;
     455           0 :         SpinLockRelease(&slot->mutex);
     456           0 :         ConditionVariableBroadcast(&slot->active_cv);
     457             :     }
     458             : 
     459           0 :     MyReplicationSlot = NULL;
     460             : 
     461             :     /* might not have been set when we've been a plain slot */
     462           0 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     463           0 :     MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
     464           0 :     LWLockRelease(ProcArrayLock);
     465           0 : }
     466             : 
     467             : /*
      468             :  * Clean up all temporary slots created in the current session.
     469             :  */
     470             : void
     471        3587 : ReplicationSlotCleanup(void)
     472             : {
     473             :     int         i;
     474             : 
     475        3587 :     Assert(MyReplicationSlot == NULL);
     476             : 
     477             : restart:
     478        3587 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     479       39457 :     for (i = 0; i < max_replication_slots; i++)
     480             :     {
     481       35870 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     482             : 
     483       35870 :         if (!s->in_use)
     484       35870 :             continue;
     485             : 
     486           0 :         SpinLockAcquire(&s->mutex);
     487           0 :         if (s->active_pid == MyProcPid)
     488             :         {
     489           0 :             Assert(s->data.persistency == RS_TEMPORARY);
     490           0 :             SpinLockRelease(&s->mutex);
     491           0 :             LWLockRelease(ReplicationSlotControlLock);  /* avoid deadlock */
     492             : 
     493           0 :             ReplicationSlotDropPtr(s);
     494             : 
     495           0 :             ConditionVariableBroadcast(&s->active_cv);
     496           0 :             goto restart;
     497             :         }
     498             :         else
     499           0 :             SpinLockRelease(&s->mutex);
     500             :     }
     501             : 
     502        3587 :     LWLockRelease(ReplicationSlotControlLock);
     503        3587 : }
     504             : 
     505             : /*
     506             :  * Permanently drop replication slot identified by the passed in name.
     507             :  */
     508             : void
     509           0 : ReplicationSlotDrop(const char *name, bool nowait)
     510             : {
     511           0 :     Assert(MyReplicationSlot == NULL);
     512             : 
     513           0 :     ReplicationSlotAcquire(name, nowait);
     514             : 
     515           0 :     ReplicationSlotDropAcquired();
     516           0 : }
     517             : 
     518             : /*
     519             :  * Permanently drop the currently acquired replication slot.
     520             :  */
     521             : static void
     522           0 : ReplicationSlotDropAcquired(void)
     523             : {
     524           0 :     ReplicationSlot *slot = MyReplicationSlot;
     525             : 
     526           0 :     Assert(MyReplicationSlot != NULL);
     527             : 
     528             :     /* slot isn't acquired anymore */
     529           0 :     MyReplicationSlot = NULL;
     530             : 
     531           0 :     ReplicationSlotDropPtr(slot);
     532           0 : }
     533             : 
     534             : /*
     535             :  * Permanently drop the replication slot which will be released by the point
     536             :  * this function returns.
     537             :  */
     538             : static void
     539           0 : ReplicationSlotDropPtr(ReplicationSlot *slot)
     540             : {
     541             :     char        path[MAXPGPATH];
     542             :     char        tmppath[MAXPGPATH];
     543             : 
     544             :     /*
     545             :      * If some other backend ran this code concurrently with us, we might try
     546             :      * to delete a slot with a certain name while someone else was trying to
     547             :      * create a slot with the same name.
     548             :      */
     549           0 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
     550             : 
     551             :     /* Generate pathnames. */
     552           0 :     sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
     553           0 :     sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
     554             : 
     555             :     /*
     556             :      * Rename the slot directory on disk, so that we'll no longer recognize
     557             :      * this as a valid slot.  Note that if this fails, we've got to mark the
     558             :      * slot inactive before bailing out.  If we're dropping an ephemeral or a
      559             :      * temporary slot, we had better never fail hard, as the caller won't expect
     560             :      * the slot to survive and this might get called during error handling.
     561             :      */
     562           0 :     if (rename(path, tmppath) == 0)
     563             :     {
     564             :         /*
     565             :          * We need to fsync() the directory we just renamed and its parent to
     566             :          * make sure that our changes are on disk in a crash-safe fashion.  If
     567             :          * fsync() fails, we can't be sure whether the changes are on disk or
     568             :          * not.  For now, we handle that by panicking;
     569             :          * StartupReplicationSlots() will try to straighten it out after
     570             :          * restart.
     571             :          */
     572           0 :         START_CRIT_SECTION();
     573           0 :         fsync_fname(tmppath, true);
     574           0 :         fsync_fname("pg_replslot", true);
     575           0 :         END_CRIT_SECTION();
     576             :     }
     577             :     else
     578             :     {
     579           0 :         bool        fail_softly = slot->data.persistency != RS_PERSISTENT;
     580             : 
     581           0 :         SpinLockAcquire(&slot->mutex);
     582           0 :         slot->active_pid = 0;
     583           0 :         SpinLockRelease(&slot->mutex);
     584             : 
     585             :         /* wake up anyone waiting on this slot */
     586           0 :         ConditionVariableBroadcast(&slot->active_cv);
     587             : 
     588           0 :         ereport(fail_softly ? WARNING : ERROR,
     589             :                 (errcode_for_file_access(),
     590             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
     591             :                         path, tmppath)));
     592             :     }
     593             : 
     594             :     /*
     595             :      * The slot is definitely gone.  Lock out concurrent scans of the array
     596             :      * long enough to kill it.  It's OK to clear the active PID here without
     597             :      * grabbing the mutex because nobody else can be scanning the array here,
     598             :      * and nobody can be attached to this slot and thus access it without
     599             :      * scanning the array.
     600             :      *
     601             :      * Also wake up processes waiting for it.
     602             :      */
     603           0 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     604           0 :     slot->active_pid = 0;
     605           0 :     slot->in_use = false;
     606           0 :     LWLockRelease(ReplicationSlotControlLock);
     607           0 :     ConditionVariableBroadcast(&slot->active_cv);
     608             : 
     609             :     /*
     610             :      * Slot is dead and doesn't prevent resource removal anymore, recompute
     611             :      * limits.
     612             :      */
     613           0 :     ReplicationSlotsComputeRequiredXmin(false);
     614           0 :     ReplicationSlotsComputeRequiredLSN();
     615             : 
     616             :     /*
     617             :      * If removing the directory fails, the worst thing that will happen is
     618             :      * that the user won't be able to create a new slot with the same name
     619             :      * until the next server restart.  We warn about it, but that's all.
     620             :      */
     621           0 :     if (!rmtree(tmppath, true))
     622           0 :         ereport(WARNING,
     623             :                 (errcode_for_file_access(),
     624             :                  errmsg("could not remove directory \"%s\"", tmppath)));
     625             : 
     626             :     /*
     627             :      * We release this at the very end, so that nobody starts trying to create
     628             :      * a slot while we're still cleaning up the detritus of the old one.
     629             :      */
     630           0 :     LWLockRelease(ReplicationSlotAllocationLock);
     631           0 : }
     632             : 
     633             : /*
     634             :  * Serialize the currently acquired slot's state from memory to disk, thereby
     635             :  * guaranteeing the current state will survive a crash.
     636             :  */
     637             : void
     638           0 : ReplicationSlotSave(void)
     639             : {
     640             :     char        path[MAXPGPATH];
     641             : 
     642           0 :     Assert(MyReplicationSlot != NULL);
     643             : 
     644           0 :     sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
     645           0 :     SaveSlotToPath(MyReplicationSlot, path, ERROR);
     646           0 : }
     647             : 
     648             : /*
     649             :  * Signal that it would be useful if the currently acquired slot would be
     650             :  * flushed out to disk.
     651             :  *
     652             :  * Note that the actual flush to disk can be delayed for a long time, if
     653             :  * required for correctness explicitly do a ReplicationSlotSave().
     654             :  */
     655             : void
     656           0 : ReplicationSlotMarkDirty(void)
     657             : {
     658           0 :     ReplicationSlot *slot = MyReplicationSlot;
     659             : 
     660           0 :     Assert(MyReplicationSlot != NULL);
     661             : 
     662           0 :     SpinLockAcquire(&slot->mutex);
     663           0 :     MyReplicationSlot->just_dirtied = true;
     664           0 :     MyReplicationSlot->dirty = true;
     665           0 :     SpinLockRelease(&slot->mutex);
     666           0 : }
     667             : 
     668             : /*
     669             :  * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
     670             :  * guaranteeing it will be there after an eventual crash.
     671             :  */
     672             : void
     673           0 : ReplicationSlotPersist(void)
     674             : {
     675           0 :     ReplicationSlot *slot = MyReplicationSlot;
     676             : 
     677           0 :     Assert(slot != NULL);
     678           0 :     Assert(slot->data.persistency != RS_PERSISTENT);
     679             : 
     680           0 :     SpinLockAcquire(&slot->mutex);
     681           0 :     slot->data.persistency = RS_PERSISTENT;
     682           0 :     SpinLockRelease(&slot->mutex);
     683             : 
     684           0 :     ReplicationSlotMarkDirty();
     685           0 :     ReplicationSlotSave();
     686           0 : }
     687             : 
     688             : /*
     689             :  * Compute the oldest xmin across all slots and store it in the ProcArray.
     690             :  *
     691             :  * If already_locked is true, ProcArrayLock has already been acquired
     692             :  * exclusively.
     693             :  */
     694             : void
     695           3 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
     696             : {
     697             :     int         i;
     698           3 :     TransactionId agg_xmin = InvalidTransactionId;
     699           3 :     TransactionId agg_catalog_xmin = InvalidTransactionId;
     700             : 
     701           3 :     Assert(ReplicationSlotCtl != NULL);
     702             : 
     703           3 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     704             : 
     705          33 :     for (i = 0; i < max_replication_slots; i++)
     706             :     {
     707          30 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     708             :         TransactionId effective_xmin;
     709             :         TransactionId effective_catalog_xmin;
     710             : 
     711          30 :         if (!s->in_use)
     712          30 :             continue;
     713             : 
     714           0 :         SpinLockAcquire(&s->mutex);
     715           0 :         effective_xmin = s->effective_xmin;
     716           0 :         effective_catalog_xmin = s->effective_catalog_xmin;
     717           0 :         SpinLockRelease(&s->mutex);
     718             : 
     719             :         /* check the data xmin */
     720           0 :         if (TransactionIdIsValid(effective_xmin) &&
     721           0 :             (!TransactionIdIsValid(agg_xmin) ||
     722           0 :              TransactionIdPrecedes(effective_xmin, agg_xmin)))
     723           0 :             agg_xmin = effective_xmin;
     724             : 
     725             :         /* check the catalog xmin */
     726           0 :         if (TransactionIdIsValid(effective_catalog_xmin) &&
     727           0 :             (!TransactionIdIsValid(agg_catalog_xmin) ||
     728           0 :              TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
     729           0 :             agg_catalog_xmin = effective_catalog_xmin;
     730             :     }
     731             : 
     732           3 :     LWLockRelease(ReplicationSlotControlLock);
     733             : 
     734           3 :     ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
     735           3 : }
     736             : 
     737             : /*
     738             :  * Compute the oldest restart LSN across all slots and inform xlog module.
     739             :  */
     740             : void
     741           3 : ReplicationSlotsComputeRequiredLSN(void)
     742             : {
     743             :     int         i;
     744           3 :     XLogRecPtr  min_required = InvalidXLogRecPtr;
     745             : 
     746           3 :     Assert(ReplicationSlotCtl != NULL);
     747             : 
     748           3 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     749          33 :     for (i = 0; i < max_replication_slots; i++)
     750             :     {
     751          30 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     752             :         XLogRecPtr  restart_lsn;
     753             : 
     754          30 :         if (!s->in_use)
     755          30 :             continue;
     756             : 
     757           0 :         SpinLockAcquire(&s->mutex);
     758           0 :         restart_lsn = s->data.restart_lsn;
     759           0 :         SpinLockRelease(&s->mutex);
     760             : 
     761           0 :         if (restart_lsn != InvalidXLogRecPtr &&
     762           0 :             (min_required == InvalidXLogRecPtr ||
     763             :              restart_lsn < min_required))
     764           0 :             min_required = restart_lsn;
     765             :     }
     766           3 :     LWLockRelease(ReplicationSlotControlLock);
     767             : 
     768           3 :     XLogSetReplicationSlotMinimumLSN(min_required);
     769           3 : }
     770             : 
     771             : /*
     772             :  * Compute the oldest WAL LSN required by *logical* decoding slots..
     773             :  *
     774             :  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
     775             :  * slots exist.
     776             :  *
     777             :  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
     778             :  * ignores physical replication slots.
     779             :  *
     780             :  * The results aren't required frequently, so we don't maintain a precomputed
     781             :  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
     782             :  */
     783             : XLogRecPtr
     784          22 : ReplicationSlotsComputeLogicalRestartLSN(void)
     785             : {
     786          22 :     XLogRecPtr  result = InvalidXLogRecPtr;
     787             :     int         i;
     788             : 
     789          22 :     if (max_replication_slots <= 0)
     790           0 :         return InvalidXLogRecPtr;
     791             : 
     792          22 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     793             : 
     794         242 :     for (i = 0; i < max_replication_slots; i++)
     795             :     {
     796             :         ReplicationSlot *s;
     797             :         XLogRecPtr  restart_lsn;
     798             : 
     799         220 :         s = &ReplicationSlotCtl->replication_slots[i];
     800             : 
     801             :         /* cannot change while ReplicationSlotCtlLock is held */
     802         220 :         if (!s->in_use)
     803         220 :             continue;
     804             : 
     805             :         /* we're only interested in logical slots */
     806           0 :         if (!SlotIsLogical(s))
     807           0 :             continue;
     808             : 
     809             :         /* read once, it's ok if it increases while we're checking */
     810           0 :         SpinLockAcquire(&s->mutex);
     811           0 :         restart_lsn = s->data.restart_lsn;
     812           0 :         SpinLockRelease(&s->mutex);
     813             : 
     814           0 :         if (result == InvalidXLogRecPtr ||
     815             :             restart_lsn < result)
     816           0 :             result = restart_lsn;
     817             :     }
     818             : 
     819          22 :     LWLockRelease(ReplicationSlotControlLock);
     820             : 
     821          22 :     return result;
     822             : }
     823             : 
     824             : /*
     825             :  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
     826             :  * passed database oid.
     827             :  *
     828             :  * Returns true if there are any slots referencing the database. *nslots will
     829             :  * be set to the absolute number of slots in the database, *nactive to ones
     830             :  * currently active.
     831             :  */
     832             : bool
     833           0 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
     834             : {
     835             :     int         i;
     836             : 
     837           0 :     *nslots = *nactive = 0;
     838             : 
     839           0 :     if (max_replication_slots <= 0)
     840           0 :         return false;
     841             : 
     842           0 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     843           0 :     for (i = 0; i < max_replication_slots; i++)
     844             :     {
     845             :         ReplicationSlot *s;
     846             : 
     847           0 :         s = &ReplicationSlotCtl->replication_slots[i];
     848             : 
     849             :         /* cannot change while ReplicationSlotCtlLock is held */
     850           0 :         if (!s->in_use)
     851           0 :             continue;
     852             : 
     853             :         /* only logical slots are database specific, skip */
     854           0 :         if (!SlotIsLogical(s))
     855           0 :             continue;
     856             : 
     857             :         /* not our database, skip */
     858           0 :         if (s->data.database != dboid)
     859           0 :             continue;
     860             : 
     861             :         /* count slots with spinlock held */
     862           0 :         SpinLockAcquire(&s->mutex);
     863           0 :         (*nslots)++;
     864           0 :         if (s->active_pid != 0)
     865           0 :             (*nactive)++;
     866           0 :         SpinLockRelease(&s->mutex);
     867             :     }
     868           0 :     LWLockRelease(ReplicationSlotControlLock);
     869             : 
     870           0 :     if (*nslots > 0)
     871           0 :         return true;
     872           0 :     return false;
     873             : }
     874             : 
     875             : /*
     876             :  * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
     877             :  * passed database oid. The caller should hold an exclusive lock on the
     878             :  * pg_database oid for the database to prevent creation of new slots on the db
     879             :  * or replay from existing slots.
     880             :  *
     881             :  * Another session that concurrently acquires an existing slot on the target DB
     882             :  * (most likely to drop it) may cause this function to ERROR. If that happens
     883             :  * it may have dropped some but not all slots.
     884             :  *
     885             :  * This routine isn't as efficient as it could be - but we don't drop
     886             :  * databases often, especially databases with lots of slots.
     887             :  */
     888             : void
     889           0 : ReplicationSlotsDropDBSlots(Oid dboid)
     890             : {
     891             :     int         i;
     892             : 
     893           0 :     if (max_replication_slots <= 0)
     894           0 :         return;
     895             : 
     896             : restart:
     897           0 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     898           0 :     for (i = 0; i < max_replication_slots; i++)
     899             :     {
     900             :         ReplicationSlot *s;
     901             :         char       *slotname;
     902             :         int         active_pid;
     903             : 
     904           0 :         s = &ReplicationSlotCtl->replication_slots[i];
     905             : 
     906             :         /* cannot change while ReplicationSlotCtlLock is held */
     907           0 :         if (!s->in_use)
     908           0 :             continue;
     909             : 
     910             :         /* only logical slots are database specific, skip */
     911           0 :         if (!SlotIsLogical(s))
     912           0 :             continue;
     913             : 
     914             :         /* not our database, skip */
     915           0 :         if (s->data.database != dboid)
     916           0 :             continue;
     917             : 
     918             :         /* acquire slot, so ReplicationSlotDropAcquired can be reused  */
     919           0 :         SpinLockAcquire(&s->mutex);
     920             :         /* can't change while ReplicationSlotControlLock is held */
     921           0 :         slotname = NameStr(s->data.name);
     922           0 :         active_pid = s->active_pid;
     923           0 :         if (active_pid == 0)
     924             :         {
     925           0 :             MyReplicationSlot = s;
     926           0 :             s->active_pid = MyProcPid;
     927             :         }
     928           0 :         SpinLockRelease(&s->mutex);
     929             : 
     930             :         /*
     931             :          * Even though we hold an exclusive lock on the database object a
     932             :          * logical slot for that DB can still be active, e.g. if it's
     933             :          * concurrently being dropped by a backend connected to another DB.
     934             :          *
     935             :          * That's fairly unlikely in practice, so we'll just bail out.
     936             :          */
     937           0 :         if (active_pid)
     938           0 :             ereport(ERROR,
     939             :                     (errcode(ERRCODE_OBJECT_IN_USE),
     940             :                      errmsg("replication slot \"%s\" is active for PID %d",
     941             :                             slotname, active_pid)));
     942             : 
     943             :         /*
     944             :          * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
     945             :          * holding ReplicationSlotControlLock over filesystem operations,
     946             :          * release ReplicationSlotControlLock and use
     947             :          * ReplicationSlotDropAcquired.
     948             :          *
     949             :          * As that means the set of slots could change, restart scan from the
     950             :          * beginning each time we release the lock.
     951             :          */
     952           0 :         LWLockRelease(ReplicationSlotControlLock);
     953           0 :         ReplicationSlotDropAcquired();
     954           0 :         goto restart;
     955             :     }
     956           0 :     LWLockRelease(ReplicationSlotControlLock);
     957             : }
     958             : 
     959             : 
     960             : /*
     961             :  * Check whether the server's configuration supports using replication
     962             :  * slots.
     963             :  */
     964             : void
     965           0 : CheckSlotRequirements(void)
     966             : {
     967           0 :     if (max_replication_slots == 0)
     968           0 :         ereport(ERROR,
     969             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     970             :                  (errmsg("replication slots can only be used if max_replication_slots > 0"))));
     971             : 
     972           0 :     if (wal_level < WAL_LEVEL_REPLICA)
     973           0 :         ereport(ERROR,
     974             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     975             :                  errmsg("replication slots can only be used if wal_level >= replica")));
     976           0 : }
     977             : 
     978             : /*
     979             :  * Reserve WAL for the currently active slot.
     980             :  *
     981             :  * Compute and set restart_lsn in a manner that's appropriate for the type of
     982             :  * the slot and concurrency safe.
     983             :  */
     984             : void
     985           0 : ReplicationSlotReserveWal(void)
     986             : {
     987           0 :     ReplicationSlot *slot = MyReplicationSlot;
     988             : 
     989           0 :     Assert(slot != NULL);
     990           0 :     Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
     991             : 
     992             :     /*
     993             :      * The replication slot mechanism is used to prevent removal of required
     994             :      * WAL. As there is no interlock between this routine and checkpoints, WAL
     995             :      * segments could concurrently be removed when a now stale return value of
     996             :      * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
     997             :      * this happens we'll just retry.
     998             :      */
     999             :     while (true)
    1000             :     {
    1001             :         XLogSegNo   segno;
    1002             : 
    1003             :         /*
    1004             :          * For logical slots log a standby snapshot and start logical decoding
    1005             :          * at exactly that position. That allows the slot to start up more
    1006             :          * quickly.
    1007             :          *
    1008             :          * That's not needed (or indeed helpful) for physical slots as they'll
    1009             :          * start replay at the last logged checkpoint anyway. Instead return
    1010             :          * the location of the last redo LSN. While that slightly increases
    1011             :          * the chance that we have to retry, it's where a base backup has to
    1012             :          * start replay at.
    1013             :          */
    1014           0 :         if (!RecoveryInProgress() && SlotIsLogical(slot))
    1015           0 :         {
    1016             :             XLogRecPtr  flushptr;
    1017             : 
    1018             :             /* start at current insert position */
    1019           0 :             slot->data.restart_lsn = GetXLogInsertRecPtr();
    1020             : 
    1021             :             /* make sure we have enough information to start */
    1022           0 :             flushptr = LogStandbySnapshot();
    1023             : 
    1024             :             /* and make sure it's fsynced to disk */
    1025           0 :             XLogFlush(flushptr);
    1026             :         }
    1027             :         else
    1028             :         {
    1029           0 :             slot->data.restart_lsn = GetRedoRecPtr();
    1030             :         }
    1031             : 
    1032             :         /* prevent WAL removal as fast as possible */
    1033           0 :         ReplicationSlotsComputeRequiredLSN();
    1034             : 
    1035             :         /*
    1036             :          * If all required WAL is still there, great, otherwise retry. The
    1037             :          * slot should prevent further removal of WAL, unless there's a
    1038             :          * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
    1039             :          * the new restart_lsn above, so normally we should never need to loop
    1040             :          * more than twice.
    1041             :          */
    1042           0 :         XLByteToSeg(slot->data.restart_lsn, segno);
    1043           0 :         if (XLogGetLastRemovedSegno() < segno)
    1044           0 :             break;
    1045           0 :     }
    1046           0 : }
    1047             : 
    1048             : /*
    1049             :  * Flush all replication slots to disk.
    1050             :  *
    1051             :  * This needn't actually be part of a checkpoint, but it's a convenient
    1052             :  * location.
    1053             :  */
    1054             : void
    1055          11 : CheckPointReplicationSlots(void)
    1056             : {
    1057             :     int         i;
    1058             : 
    1059          11 :     elog(DEBUG1, "performing replication slot checkpoint");
    1060             : 
    1061             :     /*
    1062             :      * Prevent any slot from being created/dropped while we're active. As we
    1063             :      * explicitly do *not* want to block iterating over replication_slots or
    1064             :      * acquiring a slot we cannot take the control lock - but that's OK,
    1065             :      * because holding ReplicationSlotAllocationLock is strictly stronger, and
    1066             :      * enough to guarantee that nobody can change the in_use bits on us.
    1067             :      */
    1068          11 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
    1069             : 
    1070         121 :     for (i = 0; i < max_replication_slots; i++)
    1071             :     {
    1072         110 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1073             :         char        path[MAXPGPATH];
    1074             : 
    1075         110 :         if (!s->in_use)
    1076         110 :             continue;
    1077             : 
    1078             :         /* save the slot to disk, locking is handled in SaveSlotToPath() */
    1079           0 :         sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
    1080           0 :         SaveSlotToPath(s, path, LOG);
    1081             :     }
    1082          11 :     LWLockRelease(ReplicationSlotAllocationLock);
    1083          11 : }
    1084             : 
    1085             : /*
    1086             :  * Load all replication slots from disk into memory at server startup. This
    1087             :  * needs to be run before we start crash recovery.
    1088             :  */
    1089             : void
    1090           3 : StartupReplicationSlots(void)
    1091             : {
    1092             :     DIR        *replication_dir;
    1093             :     struct dirent *replication_de;
    1094             : 
    1095           3 :     elog(DEBUG1, "starting up replication slots");
    1096             : 
    1097             :     /* restore all slots by iterating over all on-disk entries */
    1098           3 :     replication_dir = AllocateDir("pg_replslot");
    1099          12 :     while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
    1100             :     {
    1101             :         struct stat statbuf;
    1102             :         char        path[MAXPGPATH + 12];
    1103             : 
    1104           9 :         if (strcmp(replication_de->d_name, ".") == 0 ||
    1105           3 :             strcmp(replication_de->d_name, "..") == 0)
    1106          12 :             continue;
    1107             : 
    1108           0 :         snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
    1109             : 
    1110             :         /* we're only creating directories here, skip if it's not our's */
    1111           0 :         if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
    1112           0 :             continue;
    1113             : 
    1114             :         /* we crashed while a slot was being setup or deleted, clean up */
    1115           0 :         if (pg_str_endswith(replication_de->d_name, ".tmp"))
    1116             :         {
    1117           0 :             if (!rmtree(path, true))
    1118             :             {
    1119           0 :                 ereport(WARNING,
    1120             :                         (errcode_for_file_access(),
    1121             :                          errmsg("could not remove directory \"%s\"", path)));
    1122           0 :                 continue;
    1123             :             }
    1124           0 :             fsync_fname("pg_replslot", true);
    1125           0 :             continue;
    1126             :         }
    1127             : 
    1128             :         /* looks like a slot in a normal state, restore */
    1129           0 :         RestoreSlotFromDisk(replication_de->d_name);
    1130             :     }
    1131           3 :     FreeDir(replication_dir);
    1132             : 
    1133             :     /* currently no slots exist, we're done. */
    1134           3 :     if (max_replication_slots <= 0)
    1135           3 :         return;
    1136             : 
    1137             :     /* Now that we have recovered all the data, compute replication xmin */
    1138           3 :     ReplicationSlotsComputeRequiredXmin(false);
    1139           3 :     ReplicationSlotsComputeRequiredLSN();
    1140             : }
    1141             : 
    1142             : /* ----
    1143             :  * Manipulation of on-disk state of replication slots
    1144             :  *
    1145             :  * NB: none of the routines below should take any notice whether a slot is the
    1146             :  * current one or not, that's all handled a layer above.
    1147             :  * ----
    1148             :  */
    1149             : static void
    1150           0 : CreateSlotOnDisk(ReplicationSlot *slot)
    1151             : {
    1152             :     char        tmppath[MAXPGPATH];
    1153             :     char        path[MAXPGPATH];
    1154             :     struct stat st;
    1155             : 
    1156             :     /*
    1157             :      * No need to take out the io_in_progress_lock, nobody else can see this
    1158             :      * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
    1159             :      * takes out the lock, if we'd take the lock here, we'd deadlock.
    1160             :      */
    1161             : 
    1162           0 :     sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
    1163           0 :     sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
    1164             : 
    1165             :     /*
    1166             :      * It's just barely possible that some previous effort to create or drop a
    1167             :      * slot with this name left a temp directory lying around. If that seems
    1168             :      * to be the case, try to remove it.  If the rmtree() fails, we'll error
    1169             :      * out at the mkdir() below, so we don't bother checking success.
    1170             :      */
    1171           0 :     if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
    1172           0 :         rmtree(tmppath, true);
    1173             : 
    1174             :     /* Create and fsync the temporary slot directory. */
    1175           0 :     if (mkdir(tmppath, S_IRWXU) < 0)
    1176           0 :         ereport(ERROR,
    1177             :                 (errcode_for_file_access(),
    1178             :                  errmsg("could not create directory \"%s\": %m",
    1179             :                         tmppath)));
    1180           0 :     fsync_fname(tmppath, true);
    1181             : 
    1182             :     /* Write the actual state file. */
    1183           0 :     slot->dirty = true;          /* signal that we really need to write */
    1184           0 :     SaveSlotToPath(slot, tmppath, ERROR);
    1185             : 
    1186             :     /* Rename the directory into place. */
    1187           0 :     if (rename(tmppath, path) != 0)
    1188           0 :         ereport(ERROR,
    1189             :                 (errcode_for_file_access(),
    1190             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1191             :                         tmppath, path)));
    1192             : 
    1193             :     /*
    1194             :      * If we'd now fail - really unlikely - we wouldn't know whether this slot
    1195             :      * would persist after an OS crash or not - so, force a restart. The
    1196             :      * restart would try to fsync this again till it works.
    1197             :      */
    1198           0 :     START_CRIT_SECTION();
    1199             : 
    1200           0 :     fsync_fname(path, true);
    1201           0 :     fsync_fname("pg_replslot", true);
    1202             : 
    1203           0 :     END_CRIT_SECTION();
    1204           0 : }
    1205             : 
    1206             : /*
    1207             :  * Shared functionality between saving and creating a replication slot.
    1208             :  */
    1209             : static void
    1210           0 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
    1211             : {
    1212             :     char        tmppath[MAXPGPATH];
    1213             :     char        path[MAXPGPATH];
    1214             :     int         fd;
    1215             :     ReplicationSlotOnDisk cp;
    1216             :     bool        was_dirty;
    1217             : 
    1218             :     /* first check whether there's something to write out */
    1219           0 :     SpinLockAcquire(&slot->mutex);
    1220           0 :     was_dirty = slot->dirty;
    1221           0 :     slot->just_dirtied = false;
    1222           0 :     SpinLockRelease(&slot->mutex);
    1223             : 
    1224             :     /* and don't do anything if there's nothing to write */
    1225           0 :     if (!was_dirty)
    1226           0 :         return;
    1227             : 
    1228           0 :     LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
    1229             : 
    1230             :     /* silence valgrind :( */
    1231           0 :     memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
    1232             : 
    1233           0 :     sprintf(tmppath, "%s/state.tmp", dir);
    1234           0 :     sprintf(path, "%s/state", dir);
    1235             : 
    1236           0 :     fd = OpenTransientFile(tmppath,
    1237             :                            O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
    1238             :                            S_IRUSR | S_IWUSR);
    1239           0 :     if (fd < 0)
    1240             :     {
    1241           0 :         ereport(elevel,
    1242             :                 (errcode_for_file_access(),
    1243             :                  errmsg("could not create file \"%s\": %m",
    1244             :                         tmppath)));
    1245           0 :         return;
    1246             :     }
    1247             : 
    1248           0 :     cp.magic = SLOT_MAGIC;
    1249           0 :     INIT_CRC32C(cp.checksum);
    1250           0 :     cp.version = SLOT_VERSION;
    1251           0 :     cp.length = ReplicationSlotOnDiskV2Size;
    1252             : 
    1253           0 :     SpinLockAcquire(&slot->mutex);
    1254             : 
    1255           0 :     memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
    1256             : 
    1257           0 :     SpinLockRelease(&slot->mutex);
    1258             : 
    1259           0 :     COMP_CRC32C(cp.checksum,
    1260             :                 (char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
    1261             :                 SnapBuildOnDiskChecksummedSize);
    1262           0 :     FIN_CRC32C(cp.checksum);
    1263             : 
    1264           0 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
    1265           0 :     if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
    1266             :     {
    1267           0 :         int         save_errno = errno;
    1268             : 
    1269           0 :         pgstat_report_wait_end();
    1270           0 :         CloseTransientFile(fd);
    1271           0 :         errno = save_errno;
    1272           0 :         ereport(elevel,
    1273             :                 (errcode_for_file_access(),
    1274             :                  errmsg("could not write to file \"%s\": %m",
    1275             :                         tmppath)));
    1276           0 :         return;
    1277             :     }
    1278           0 :     pgstat_report_wait_end();
    1279             : 
    1280             :     /* fsync the temporary file */
    1281           0 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
    1282           0 :     if (pg_fsync(fd) != 0)
    1283             :     {
    1284           0 :         int         save_errno = errno;
    1285             : 
    1286           0 :         pgstat_report_wait_end();
    1287           0 :         CloseTransientFile(fd);
    1288           0 :         errno = save_errno;
    1289           0 :         ereport(elevel,
    1290             :                 (errcode_for_file_access(),
    1291             :                  errmsg("could not fsync file \"%s\": %m",
    1292             :                         tmppath)));
    1293           0 :         return;
    1294             :     }
    1295           0 :     pgstat_report_wait_end();
    1296             : 
    1297           0 :     CloseTransientFile(fd);
    1298             : 
    1299             :     /* rename to permanent file, fsync file and directory */
    1300           0 :     if (rename(tmppath, path) != 0)
    1301             :     {
    1302           0 :         ereport(elevel,
    1303             :                 (errcode_for_file_access(),
    1304             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1305             :                         tmppath, path)));
    1306           0 :         return;
    1307             :     }
    1308             : 
    1309             :     /* Check CreateSlot() for the reasoning of using a crit. section. */
    1310           0 :     START_CRIT_SECTION();
    1311             : 
    1312           0 :     fsync_fname(path, false);
    1313           0 :     fsync_fname(dir, true);
    1314           0 :     fsync_fname("pg_replslot", true);
    1315             : 
    1316           0 :     END_CRIT_SECTION();
    1317             : 
    1318             :     /*
    1319             :      * Successfully wrote, unset dirty bit, unless somebody dirtied again
    1320             :      * already.
    1321             :      */
    1322           0 :     SpinLockAcquire(&slot->mutex);
    1323           0 :     if (!slot->just_dirtied)
    1324           0 :         slot->dirty = false;
    1325           0 :     SpinLockRelease(&slot->mutex);
    1326             : 
    1327           0 :     LWLockRelease(&slot->io_in_progress_lock);
    1328             : }
    1329             : 
    1330             : /*
    1331             :  * Load a single slot from disk into memory.
    1332             :  */
    1333             : static void
    1334           0 : RestoreSlotFromDisk(const char *name)
    1335             : {
    1336             :     ReplicationSlotOnDisk cp;
    1337             :     int         i;
    1338             :     char        path[MAXPGPATH + 22];
    1339             :     int         fd;
    1340           0 :     bool        restored = false;
    1341             :     int         readBytes;
    1342             :     pg_crc32c   checksum;
    1343             : 
    1344             :     /* no need to lock here, no concurrent access allowed yet */
    1345             : 
    1346             :     /* delete temp file if it exists */
    1347           0 :     sprintf(path, "pg_replslot/%s/state.tmp", name);
    1348           0 :     if (unlink(path) < 0 && errno != ENOENT)
    1349           0 :         ereport(PANIC,
    1350             :                 (errcode_for_file_access(),
    1351             :                  errmsg("could not remove file \"%s\": %m", path)));
    1352             : 
    1353           0 :     sprintf(path, "pg_replslot/%s/state", name);
    1354             : 
    1355           0 :     elog(DEBUG1, "restoring replication slot from \"%s\"", path);
    1356             : 
    1357           0 :     fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
    1358             : 
    1359             :     /*
    1360             :      * We do not need to handle this as we are rename()ing the directory into
    1361             :      * place only after we fsync()ed the state file.
    1362             :      */
    1363           0 :     if (fd < 0)
    1364           0 :         ereport(PANIC,
    1365             :                 (errcode_for_file_access(),
    1366             :                  errmsg("could not open file \"%s\": %m", path)));
    1367             : 
    1368             :     /*
    1369             :      * Sync state file before we're reading from it. We might have crashed
    1370             :      * while it wasn't synced yet and we shouldn't continue on that basis.
    1371             :      */
    1372           0 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
    1373           0 :     if (pg_fsync(fd) != 0)
    1374             :     {
    1375           0 :         CloseTransientFile(fd);
    1376           0 :         ereport(PANIC,
    1377             :                 (errcode_for_file_access(),
    1378             :                  errmsg("could not fsync file \"%s\": %m",
    1379             :                         path)));
    1380             :     }
    1381           0 :     pgstat_report_wait_end();
    1382             : 
    1383             :     /* Also sync the parent directory */
    1384           0 :     START_CRIT_SECTION();
    1385           0 :     fsync_fname(path, true);
    1386           0 :     END_CRIT_SECTION();
    1387             : 
    1388             :     /* read part of statefile that's guaranteed to be version independent */
    1389           0 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    1390           0 :     readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
    1391           0 :     pgstat_report_wait_end();
    1392           0 :     if (readBytes != ReplicationSlotOnDiskConstantSize)
    1393             :     {
    1394           0 :         int         saved_errno = errno;
    1395             : 
    1396           0 :         CloseTransientFile(fd);
    1397           0 :         errno = saved_errno;
    1398           0 :         ereport(PANIC,
    1399             :                 (errcode_for_file_access(),
    1400             :                  errmsg("could not read file \"%s\", read %d of %u: %m",
    1401             :                         path, readBytes,
    1402             :                         (uint32) ReplicationSlotOnDiskConstantSize)));
    1403             :     }
    1404             : 
    1405             :     /* verify magic */
    1406           0 :     if (cp.magic != SLOT_MAGIC)
    1407           0 :         ereport(PANIC,
    1408             :                 (errcode_for_file_access(),
    1409             :                  errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
    1410             :                         path, cp.magic, SLOT_MAGIC)));
    1411             : 
    1412             :     /* verify version */
    1413           0 :     if (cp.version != SLOT_VERSION)
    1414           0 :         ereport(PANIC,
    1415             :                 (errcode_for_file_access(),
    1416             :                  errmsg("replication slot file \"%s\" has unsupported version %u",
    1417             :                         path, cp.version)));
    1418             : 
    1419             :     /* boundary check on length */
    1420           0 :     if (cp.length != ReplicationSlotOnDiskV2Size)
    1421           0 :         ereport(PANIC,
    1422             :                 (errcode_for_file_access(),
    1423             :                  errmsg("replication slot file \"%s\" has corrupted length %u",
    1424             :                         path, cp.length)));
    1425             : 
    1426             :     /* Now that we know the size, read the entire file */
    1427           0 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    1428           0 :     readBytes = read(fd,
    1429             :                      (char *) &cp + ReplicationSlotOnDiskConstantSize,
    1430             :                      cp.length);
    1431           0 :     pgstat_report_wait_end();
    1432           0 :     if (readBytes != cp.length)
    1433             :     {
    1434           0 :         int         saved_errno = errno;
    1435             : 
    1436           0 :         CloseTransientFile(fd);
    1437           0 :         errno = saved_errno;
    1438           0 :         ereport(PANIC,
    1439             :                 (errcode_for_file_access(),
    1440             :                  errmsg("could not read file \"%s\", read %d of %u: %m",
    1441             :                         path, readBytes, cp.length)));
    1442             :     }
    1443             : 
    1444           0 :     CloseTransientFile(fd);
    1445             : 
    1446             :     /* now verify the CRC */
    1447           0 :     INIT_CRC32C(checksum);
    1448           0 :     COMP_CRC32C(checksum,
    1449             :                 (char *) &cp + SnapBuildOnDiskNotChecksummedSize,
    1450             :                 SnapBuildOnDiskChecksummedSize);
    1451           0 :     FIN_CRC32C(checksum);
    1452             : 
    1453           0 :     if (!EQ_CRC32C(checksum, cp.checksum))
    1454           0 :         ereport(PANIC,
    1455             :                 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
    1456             :                         path, checksum, cp.checksum)));
    1457             : 
    1458             :     /*
    1459             :      * If we crashed with an ephemeral slot active, don't restore but delete
    1460             :      * it.
    1461             :      */
    1462           0 :     if (cp.slotdata.persistency != RS_PERSISTENT)
    1463             :     {
    1464           0 :         sprintf(path, "pg_replslot/%s", name);
    1465             : 
    1466           0 :         if (!rmtree(path, true))
    1467             :         {
    1468           0 :             ereport(WARNING,
    1469             :                     (errcode_for_file_access(),
    1470             :                      errmsg("could not remove directory \"%s\"", path)));
    1471             :         }
    1472           0 :         fsync_fname("pg_replslot", true);
    1473           0 :         return;
    1474             :     }
    1475             : 
    1476             :     /* nothing can be active yet, don't lock anything */
    1477           0 :     for (i = 0; i < max_replication_slots; i++)
    1478             :     {
    1479             :         ReplicationSlot *slot;
    1480             : 
    1481           0 :         slot = &ReplicationSlotCtl->replication_slots[i];
    1482             : 
    1483           0 :         if (slot->in_use)
    1484           0 :             continue;
    1485             : 
    1486             :         /* restore the entire set of persistent data */
    1487           0 :         memcpy(&slot->data, &cp.slotdata,
    1488             :                sizeof(ReplicationSlotPersistentData));
    1489             : 
    1490             :         /* initialize in memory state */
    1491           0 :         slot->effective_xmin = cp.slotdata.xmin;
    1492           0 :         slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
    1493             : 
    1494           0 :         slot->candidate_catalog_xmin = InvalidTransactionId;
    1495           0 :         slot->candidate_xmin_lsn = InvalidXLogRecPtr;
    1496           0 :         slot->candidate_restart_lsn = InvalidXLogRecPtr;
    1497           0 :         slot->candidate_restart_valid = InvalidXLogRecPtr;
    1498             : 
    1499           0 :         slot->in_use = true;
    1500           0 :         slot->active_pid = 0;
    1501             : 
    1502           0 :         restored = true;
    1503           0 :         break;
    1504             :     }
    1505             : 
    1506           0 :     if (!restored)
    1507           0 :         ereport(PANIC,
    1508             :                 (errmsg("too many replication slots active before shutdown"),
    1509             :                  errhint("Increase max_replication_slots and try again.")));
    1510             : }

Generated by: LCOV version 1.11