1 : /*-------------------------------------------------------------------------
2 : *
3 : * slot.c
4 : * Replication slot management.
5 : *
6 : *
7 : * Copyright (c) 2012-2017, PostgreSQL Global Development Group
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/replication/slot.c
12 : *
13 : * NOTES
14 : *
15 : * Replication slots are used to keep state about replication streams
16 : * originating from this cluster. Their primary purpose is to prevent the
17 : * premature removal of WAL or of old tuple versions in a manner that would
18 : * interfere with replication; they are also useful for monitoring purposes.
19 : * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 : * on standbys (to support cascading setups). The requirement that slots be
21 : * usable on standbys precludes storing them in the system catalogs.
22 : *
23 : * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24 : * directory. Inside that directory the state file will contain the slot's
25 : * own data. Additional data can be stored alongside that file if required.
26 : * While the server is running, the state data is also cached in memory for
27 : * efficiency.
28 : *
29 : * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 : * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 : * to iterate over the slots, and in exclusive mode to change the in_use flag
32 : * of a slot. The remaining data in each slot is protected by its mutex.
33 : *
34 : *-------------------------------------------------------------------------
35 : */
36 :
37 : #include "postgres.h"
38 :
39 : #include <unistd.h>
40 : #include <sys/stat.h>
41 :
42 : #include "access/transam.h"
43 : #include "access/xlog_internal.h"
44 : #include "common/string.h"
45 : #include "miscadmin.h"
46 : #include "pgstat.h"
47 : #include "replication/slot.h"
48 : #include "storage/fd.h"
49 : #include "storage/proc.h"
50 : #include "storage/procarray.h"
51 : #include "utils/builtins.h"
52 :
53 : /*
54 : * Replication slot on-disk data structure.
55 : */
56 : typedef struct ReplicationSlotOnDisk
57 : {
58 : /* first part of this struct needs to be version independent */
59 :
60 : /* data not covered by checksum */
61 : uint32 magic;
62 : pg_crc32c checksum;
63 :
64 : /* data covered by checksum */
65 : uint32 version;
66 : uint32 length;
67 :
68 : /*
69 : * The actual data in the slot that follows can differ based on the above
70 : * 'version'.
71 : */
72 :
73 : ReplicationSlotPersistentData slotdata;
74 : } ReplicationSlotOnDisk;
75 :
76 : /* size of version independent data */
77 : #define ReplicationSlotOnDiskConstantSize \
78 : offsetof(ReplicationSlotOnDisk, slotdata)
79 : /* size of the part of the slot not covered by the checksum */
80 : #define ReplicationSlotOnDiskNotChecksummedSize \
81 : offsetof(ReplicationSlotOnDisk, version)
82 : /* size of the part covered by the checksum */
83 : #define ReplicationSlotOnDiskChecksummedSize \
84 : sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
85 : /* size of the slot data that is version dependent */
86 : #define ReplicationSlotOnDiskV2Size \
87 : sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
88 :
89 : #define SLOT_MAGIC 0x1051CA1 /* format identifier */
90 : #define SLOT_VERSION 2 /* version for new files */
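
/*
 * Editor's sketch (not part of the original file; the function name is
 * invented): how the version-independent header of a "state" file could be
 * validated, mirroring the checks RestoreSlotFromDisk() performs below.
 * Uses stdio, which postgres.h already pulls in.
 */
#ifdef SLOT_HEADER_EXAMPLE
static bool
slot_header_looks_valid(FILE *fp)
{
	ReplicationSlotOnDisk cp;

	/* read only the version-independent part, as the server does */
	if (fread(&cp, 1, ReplicationSlotOnDiskConstantSize, fp) !=
		ReplicationSlotOnDiskConstantSize)
		return false;

	/* the same three sanity checks as RestoreSlotFromDisk() */
	return cp.magic == SLOT_MAGIC &&
		cp.version == SLOT_VERSION &&
		cp.length == ReplicationSlotOnDiskV2Size;
}
#endif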
91 :
92 : /* Control array for replication slot management */
93 : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
94 :
95 : /* My backend's replication slot in the shared memory array */
96 : ReplicationSlot *MyReplicationSlot = NULL;
97 :
98 : /* GUCs */
99 : int max_replication_slots = 0; /* the maximum number of replication
100 : * slots */
101 :
102 : static void ReplicationSlotDropAcquired(void);
103 : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
104 :
105 : /* internal persistency functions */
106 : static void RestoreSlotFromDisk(const char *name);
107 : static void CreateSlotOnDisk(ReplicationSlot *slot);
108 : static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
109 :
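
/*
 * Editor's sketch of the locking protocol described in the NOTES above,
 * shown as a hypothetical helper (count_in_use_slots is an assumption, not
 * part of this file): scan the array while holding
 * ReplicationSlotControlLock in shared mode; the in_use flags cannot change
 * while that lock is held.
 */
#ifdef SLOT_LOCKING_EXAMPLE
static int
count_in_use_slots(void)
{
	int		i;
	int		n = 0;

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		/* in_use cannot change while we hold the control lock */
		if (s->in_use)
			n++;
	}
	LWLockRelease(ReplicationSlotControlLock);

	return n;
}
#endif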
110 : /*
111 : * Report shared-memory space needed by ReplicationSlotsShmemInit.
112 : */
113 : Size
114 15 : ReplicationSlotsShmemSize(void)
115 : {
116 15 : Size size = 0;
117 :
118 15 : if (max_replication_slots == 0)
119 0 : return size;
120 :
121 15 : size = offsetof(ReplicationSlotCtlData, replication_slots);
122 15 : size = add_size(size,
123 : mul_size(max_replication_slots, sizeof(ReplicationSlot)));
124 :
125 15 : return size;
126 : }
127 :
128 : /*
129 : * Allocate and initialize replication-slot-related shared memory.
130 : */
131 : void
132 5 : ReplicationSlotsShmemInit(void)
133 : {
134 : bool found;
135 :
136 5 : if (max_replication_slots == 0)
137 5 : return;
138 :
139 5 : ReplicationSlotCtl = (ReplicationSlotCtlData *)
140 5 : ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
141 : &found);
142 :
143 5 : LWLockRegisterTranche(LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS,
144 : "replication_slot_io");
145 :
146 5 : if (!found)
147 : {
148 : int i;
149 :
150 : /* First time through, so initialize */
151 5 : MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
152 :
153 55 : for (i = 0; i < max_replication_slots; i++)
154 : {
155 50 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
156 :
157 : /* everything else is zeroed by the memset above */
158 50 : SpinLockInit(&slot->mutex);
159 50 : LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
160 50 : ConditionVariableInit(&slot->active_cv);
161 : }
162 : }
163 : }
164 :
165 : /*
166 : * Check whether the passed slot name is valid and report errors at elevel.
167 : *
168 : * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1}, which should allow
169 : * the name to be used as a directory name on every supported OS.
170 : *
171 : * Returns whether the name is valid. If elevel < ERROR, an invalid name makes this return false rather than error out.
172 : */
173 : bool
174 0 : ReplicationSlotValidateName(const char *name, int elevel)
175 : {
176 : const char *cp;
177 :
178 0 : if (strlen(name) == 0)
179 : {
180 0 : ereport(elevel,
181 : (errcode(ERRCODE_INVALID_NAME),
182 : errmsg("replication slot name \"%s\" is too short",
183 : name)));
184 0 : return false;
185 : }
186 :
187 0 : if (strlen(name) >= NAMEDATALEN)
188 : {
189 0 : ereport(elevel,
190 : (errcode(ERRCODE_NAME_TOO_LONG),
191 : errmsg("replication slot name \"%s\" is too long",
192 : name)));
193 0 : return false;
194 : }
195 :
196 0 : for (cp = name; *cp; cp++)
197 : {
198 0 : if (!((*cp >= 'a' && *cp <= 'z')
199 0 : || (*cp >= '0' && *cp <= '9')
200 0 : || (*cp == '_')))
201 : {
202 0 : ereport(elevel,
203 : (errcode(ERRCODE_INVALID_NAME),
204 : errmsg("replication slot name \"%s\" contains invalid character",
205 : name),
206 : errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
207 0 : return false;
208 : }
209 : }
210 0 : return true;
211 : }
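
/*
 * Editor's note: the elevel argument makes this usable both as a soft check
 * and as an assertion. A hedged usage sketch (helper name is hypothetical):
 */
#ifdef SLOT_NAME_EXAMPLE
static bool
slot_name_ok(const char *name)
{
	/*
	 * With elevel = WARNING an invalid name is merely reported and false is
	 * returned; with elevel = ERROR the call never returns on failure.
	 */
	return ReplicationSlotValidateName(name, WARNING);
}
#endif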
212 :
213 : /*
214 : * Create a new replication slot and mark it as used by this backend.
215 : *
216 : * name: Name of the slot
217 : * db_specific: logical decoding is database specific; pass true if the
218 : * slot is going to be used for that, otherwise false.
219 : */
220 : void
221 0 : ReplicationSlotCreate(const char *name, bool db_specific,
222 : ReplicationSlotPersistency persistency)
223 : {
224 0 : ReplicationSlot *slot = NULL;
225 : int i;
226 :
227 0 : Assert(MyReplicationSlot == NULL);
228 :
229 0 : ReplicationSlotValidateName(name, ERROR);
230 :
231 : /*
232 : * If some other backend ran this code concurrently with us, we'd likely
233 : * both allocate the same slot, and that would be bad. We'd also be at
234 : * risk of missing a name collision. Also, we don't want to try to create
235 : * a new slot while somebody's busy cleaning up an old one, because we
236 : * might both be monkeying with the same directory.
237 : */
238 0 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
239 :
240 : /*
241 : * Check for name collision, and identify an allocatable slot. We need to
242 : * hold ReplicationSlotControlLock in shared mode for this, so that nobody
243 : * else can change the in_use flags while we're looking at them.
244 : */
245 0 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
246 0 : for (i = 0; i < max_replication_slots; i++)
247 : {
248 0 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
249 :
250 0 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
251 0 : ereport(ERROR,
252 : (errcode(ERRCODE_DUPLICATE_OBJECT),
253 : errmsg("replication slot \"%s\" already exists", name)));
254 0 : if (!s->in_use && slot == NULL)
255 0 : slot = s;
256 : }
257 0 : LWLockRelease(ReplicationSlotControlLock);
258 :
259 : /* If all slots are in use, we're out of luck. */
260 0 : if (slot == NULL)
261 0 : ereport(ERROR,
262 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
263 : errmsg("all replication slots are in use"),
264 : errhint("Free one or increase max_replication_slots.")));
265 :
266 : /*
267 : * Since this slot is not in use, nobody should be looking at any part of
268 : * it other than the in_use field unless they're trying to allocate it.
269 : * And since we hold ReplicationSlotAllocationLock, nobody except us can
270 : * be doing that. So it's safe to initialize the slot.
271 : */
272 0 : Assert(!slot->in_use);
273 0 : Assert(slot->active_pid == 0);
274 :
275 : /* first initialize persistent data */
276 0 : memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
277 0 : StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
278 0 : slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
279 0 : slot->data.persistency = persistency;
280 :
281 : /* and then data only present in shared memory */
282 0 : slot->just_dirtied = false;
283 0 : slot->dirty = false;
284 0 : slot->effective_xmin = InvalidTransactionId;
285 0 : slot->effective_catalog_xmin = InvalidTransactionId;
286 0 : slot->candidate_catalog_xmin = InvalidTransactionId;
287 0 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
288 0 : slot->candidate_restart_valid = InvalidXLogRecPtr;
289 0 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
290 :
291 : /*
292 : * Create the slot on disk. We haven't actually marked the slot allocated
293 : * yet, so no special cleanup is required if this errors out.
294 : */
295 0 : CreateSlotOnDisk(slot);
296 :
297 : /*
298 : * We need to briefly prevent any other backend from iterating over the
299 : * slots while we flip the in_use flag. We also need to set the active
300 : * flag while holding the ControlLock as otherwise a concurrent
301 : * SlotAcquire() could acquire the slot as well.
302 : */
303 0 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
304 :
305 0 : slot->in_use = true;
306 :
307 : /* We can now mark the slot active, and that makes it our slot. */
308 0 : SpinLockAcquire(&slot->mutex);
309 0 : Assert(slot->active_pid == 0);
310 0 : slot->active_pid = MyProcPid;
311 0 : SpinLockRelease(&slot->mutex);
312 0 : MyReplicationSlot = slot;
313 :
314 0 : LWLockRelease(ReplicationSlotControlLock);
315 :
316 : /*
317 : * Now that the slot has been marked as in_use and active, it's safe to
318 : * let somebody else try to allocate a slot.
319 : */
320 0 : LWLockRelease(ReplicationSlotAllocationLock);
321 :
322 : /* Let everybody know we've modified this slot */
323 0 : ConditionVariableBroadcast(&slot->active_cv);
324 0 : }
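
/*
 * Editor's sketch of the intended lifecycle (ordering inferred from this
 * file; the helper itself is hypothetical): create the slot RS_EPHEMERAL so
 * that an error during setup drops it automatically on release, and flip it
 * to RS_PERSISTENT only once it is fully initialized.
 */
#ifdef SLOT_LIFECYCLE_EXAMPLE
static void
create_persistent_logical_slot(const char *name)
{
	/* ephemeral first: if we error out below, release will drop the slot */
	ReplicationSlotCreate(name, true /* db_specific */ , RS_EPHEMERAL);

	/* ... initialization that may fail would run here ... */

	/* guarantee the slot survives releases and crashes */
	ReplicationSlotPersist();

	/* disconnect from the slot; it stays allocated for later use */
	ReplicationSlotRelease();
}
#endif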
325 :
326 : /*
327 : * Find a previously created slot and mark it as used by this backend.
328 : */
329 : void
330 0 : ReplicationSlotAcquire(const char *name, bool nowait)
331 : {
332 : ReplicationSlot *slot;
333 : int active_pid;
334 : int i;
335 :
336 : retry:
337 0 : Assert(MyReplicationSlot == NULL);
338 :
339 : /*
340 : * Search for the named slot and mark it active if we find it. If the
341 : * slot is already active, we exit the loop with active_pid set to the PID
342 : * of the backend that owns it.
343 : */
344 0 : active_pid = 0;
345 0 : slot = NULL;
346 0 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
347 0 : for (i = 0; i < max_replication_slots; i++)
348 : {
349 0 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
350 :
351 0 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
352 : {
353 : /*
354 : * This is the slot we want. We don't know yet if it's active, so
355 : * get ready to sleep on it in case it is. (We may end up not
356 : * sleeping, but we don't want to do this while holding the
357 : * spinlock.)
358 : */
359 0 : ConditionVariablePrepareToSleep(&s->active_cv);
360 :
361 0 : SpinLockAcquire(&s->mutex);
362 :
363 0 : active_pid = s->active_pid;
364 0 : if (active_pid == 0)
365 0 : active_pid = s->active_pid = MyProcPid;
366 :
367 0 : SpinLockRelease(&s->mutex);
368 0 : slot = s;
369 :
370 0 : break;
371 : }
372 : }
373 0 : LWLockRelease(ReplicationSlotControlLock);
374 :
375 : /* If we did not find the slot, error out. */
376 0 : if (slot == NULL)
377 0 : ereport(ERROR,
378 : (errcode(ERRCODE_UNDEFINED_OBJECT),
379 : errmsg("replication slot \"%s\" does not exist", name)));
380 :
381 : /*
382 : * If we found the slot but it's already active in another backend, we
383 : * either error out or retry after a short wait, as caller specified.
384 : */
385 0 : if (active_pid != MyProcPid)
386 : {
387 0 : if (nowait)
388 0 : ereport(ERROR,
389 : (errcode(ERRCODE_OBJECT_IN_USE),
390 : errmsg("replication slot \"%s\" is active for PID %d",
391 : name, active_pid)));
392 :
393 : /* Wait here until we get signaled, and then restart */
394 0 : ConditionVariableSleep(&slot->active_cv,
395 : WAIT_EVENT_REPLICATION_SLOT_DROP);
396 0 : ConditionVariableCancelSleep();
397 0 : goto retry;
398 : }
399 : else
400 0 : ConditionVariableCancelSleep(); /* no sleep needed after all */
401 :
402 : /* Let everybody know we've modified this slot */
403 0 : ConditionVariableBroadcast(&slot->active_cv);
404 :
405 : /* We made this slot active, so it's ours now. */
406 0 : MyReplicationSlot = slot;
407 0 : }
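
/*
 * Editor's sketch (hypothetical helper): with nowait = true a busy slot
 * raises ERROR immediately; with nowait = false the call sleeps on the
 * slot's condition variable until the current owner releases it.
 */
#ifdef SLOT_ACQUIRE_EXAMPLE
static void
use_existing_slot(const char *name)
{
	ReplicationSlotAcquire(name, false);	/* block until available */

	/* ... the slot is now MyReplicationSlot, owned by this backend ... */

	ReplicationSlotRelease();
}
#endif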
408 :
409 : /*
410 : * Release the replication slot that this backend considers to own.
411 : *
412 : * This or another backend can re-acquire the slot later.
413 : * Resources this slot requires will be preserved.
414 : */
415 : void
416 0 : ReplicationSlotRelease(void)
417 : {
418 0 : ReplicationSlot *slot = MyReplicationSlot;
419 :
420 0 : Assert(slot != NULL && slot->active_pid != 0);
421 :
422 0 : if (slot->data.persistency == RS_EPHEMERAL)
423 : {
424 : /*
425 : * Delete the slot. There is no !PANIC case where this is allowed to
426 : * fail, all that may happen is an incomplete cleanup of the on-disk
427 : * data.
428 : */
429 0 : ReplicationSlotDropAcquired();
430 : }
431 :
432 : /*
433 : * If the slot needed to temporarily restrain both data and catalog xmin to
434 : * create the catalog snapshot, remove that temporary constraint.
435 : * Snapshots can only be exported while the initial snapshot is still
436 : * acquired.
437 : */
438 0 : if (!TransactionIdIsValid(slot->data.xmin) &&
439 0 : TransactionIdIsValid(slot->effective_xmin))
440 : {
441 0 : SpinLockAcquire(&slot->mutex);
442 0 : slot->effective_xmin = InvalidTransactionId;
443 0 : SpinLockRelease(&slot->mutex);
444 0 : ReplicationSlotsComputeRequiredXmin(false);
445 : }
446 :
447 0 : if (slot->data.persistency == RS_PERSISTENT)
448 : {
449 : /*
450 : * Mark persistent slot inactive. We're not freeing it, just
451 : * disconnecting, but wake up others that may be waiting for it.
452 : */
453 0 : SpinLockAcquire(&slot->mutex);
454 0 : slot->active_pid = 0;
455 0 : SpinLockRelease(&slot->mutex);
456 0 : ConditionVariableBroadcast(&slot->active_cv);
457 : }
458 :
459 0 : MyReplicationSlot = NULL;
460 :
461 : /* might not have been set if we've been using a physical (plain) slot */
462 0 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
463 0 : MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
464 0 : LWLockRelease(ProcArrayLock);
465 0 : }
466 :
467 : /*
468 : * Cleanup all temporary slots created in current session.
469 : */
470 : void
471 3558 : ReplicationSlotCleanup(void)
472 : {
473 : int i;
474 :
475 3558 : Assert(MyReplicationSlot == NULL);
476 :
477 : restart:
478 3558 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
479 39138 : for (i = 0; i < max_replication_slots; i++)
480 : {
481 35580 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
482 :
483 35580 : if (!s->in_use)
484 35580 : continue;
485 :
486 0 : SpinLockAcquire(&s->mutex);
487 0 : if (s->active_pid == MyProcPid)
488 : {
489 0 : Assert(s->data.persistency == RS_TEMPORARY);
490 0 : SpinLockRelease(&s->mutex);
491 0 : LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
492 :
493 0 : ReplicationSlotDropPtr(s);
494 :
495 0 : ConditionVariableBroadcast(&s->active_cv);
496 0 : goto restart;
497 : }
498 : else
499 0 : SpinLockRelease(&s->mutex);
500 : }
501 :
502 3558 : LWLockRelease(ReplicationSlotControlLock);
503 3558 : }
504 :
505 : /*
506 : * Permanently drop replication slot identified by the passed in name.
507 : */
508 : void
509 0 : ReplicationSlotDrop(const char *name, bool nowait)
510 : {
511 0 : Assert(MyReplicationSlot == NULL);
512 :
513 0 : ReplicationSlotAcquire(name, nowait);
514 :
515 0 : ReplicationSlotDropAcquired();
516 0 : }
517 :
518 : /*
519 : * Permanently drop the currently acquired replication slot.
520 : */
521 : static void
522 0 : ReplicationSlotDropAcquired(void)
523 : {
524 0 : ReplicationSlot *slot = MyReplicationSlot;
525 :
526 0 : Assert(MyReplicationSlot != NULL);
527 :
528 : /* slot isn't acquired anymore */
529 0 : MyReplicationSlot = NULL;
530 :
531 0 : ReplicationSlotDropPtr(slot);
532 0 : }
533 :
534 : /*
535 : * Permanently drop the replication slot which will be released by the point
536 : * this function returns.
537 : */
538 : static void
539 0 : ReplicationSlotDropPtr(ReplicationSlot *slot)
540 : {
541 : char path[MAXPGPATH];
542 : char tmppath[MAXPGPATH];
543 :
544 : /*
545 : * If some other backend ran this code concurrently with us, we might try
546 : * to delete a slot with a certain name while someone else was trying to
547 : * create a slot with the same name.
548 : */
549 0 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
550 :
551 : /* Generate pathnames. */
552 0 : sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
553 0 : sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
554 :
555 : /*
556 : * Rename the slot directory on disk, so that we'll no longer recognize
557 : * this as a valid slot. Note that if this fails, we've got to mark the
558 : * slot inactive before bailing out. If we're dropping an ephemeral or a
559 : * temporary slot, we had better not fail hard, as the caller won't expect
560 : * the slot to survive and this might get called during error handling.
561 : */
562 0 : if (rename(path, tmppath) == 0)
563 : {
564 : /*
565 : * We need to fsync() the directory we just renamed and its parent to
566 : * make sure that our changes are on disk in a crash-safe fashion. If
567 : * fsync() fails, we can't be sure whether the changes are on disk or
568 : * not. For now, we handle that by panicking;
569 : * StartupReplicationSlots() will try to straighten it out after
570 : * restart.
571 : */
572 0 : START_CRIT_SECTION();
573 0 : fsync_fname(tmppath, true);
574 0 : fsync_fname("pg_replslot", true);
575 0 : END_CRIT_SECTION();
576 : }
577 : else
578 : {
579 0 : bool fail_softly = slot->data.persistency != RS_PERSISTENT;
580 :
581 0 : SpinLockAcquire(&slot->mutex);
582 0 : slot->active_pid = 0;
583 0 : SpinLockRelease(&slot->mutex);
584 :
585 : /* wake up anyone waiting on this slot */
586 0 : ConditionVariableBroadcast(&slot->active_cv);
587 :
588 0 : ereport(fail_softly ? WARNING : ERROR,
589 : (errcode_for_file_access(),
590 : errmsg("could not rename file \"%s\" to \"%s\": %m",
591 : path, tmppath)));
592 : }
593 :
594 : /*
595 : * The slot is definitely gone. Lock out concurrent scans of the array
596 : * long enough to kill it. It's OK to clear the active PID here without
597 : * grabbing the mutex because nobody else can be scanning the array here,
598 : * and nobody can be attached to this slot and thus access it without
599 : * scanning the array.
600 : *
601 : * Also wake up processes waiting for it.
602 : */
603 0 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
604 0 : slot->active_pid = 0;
605 0 : slot->in_use = false;
606 0 : LWLockRelease(ReplicationSlotControlLock);
607 0 : ConditionVariableBroadcast(&slot->active_cv);
608 :
609 : /*
610 : * Slot is dead and doesn't prevent resource removal anymore, recompute
611 : * limits.
612 : */
613 0 : ReplicationSlotsComputeRequiredXmin(false);
614 0 : ReplicationSlotsComputeRequiredLSN();
615 :
616 : /*
617 : * If removing the directory fails, the worst thing that will happen is
618 : * that the user won't be able to create a new slot with the same name
619 : * until the next server restart. We warn about it, but that's all.
620 : */
621 0 : if (!rmtree(tmppath, true))
622 0 : ereport(WARNING,
623 : (errcode_for_file_access(),
624 : errmsg("could not remove directory \"%s\"", tmppath)));
625 :
626 : /*
627 : * We release this at the very end, so that nobody starts trying to create
628 : * a slot while we're still cleaning up the detritus of the old one.
629 : */
630 0 : LWLockRelease(ReplicationSlotAllocationLock);
631 0 : }
632 :
633 : /*
634 : * Serialize the currently acquired slot's state from memory to disk, thereby
635 : * guaranteeing the current state will survive a crash.
636 : */
637 : void
638 0 : ReplicationSlotSave(void)
639 : {
640 : char path[MAXPGPATH];
641 :
642 0 : Assert(MyReplicationSlot != NULL);
643 :
644 0 : sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
645 0 : SaveSlotToPath(MyReplicationSlot, path, ERROR);
646 0 : }
647 :
648 : /*
649 : * Signal that it would be useful if the currently acquired slot were
650 : * flushed out to disk.
651 : *
652 : * Note that the actual flush to disk can be delayed for a long time; if
653 : * it is required for correctness, explicitly do a ReplicationSlotSave().
654 : */
655 : void
656 0 : ReplicationSlotMarkDirty(void)
657 : {
658 0 : ReplicationSlot *slot = MyReplicationSlot;
659 :
660 0 : Assert(MyReplicationSlot != NULL);
661 :
662 0 : SpinLockAcquire(&slot->mutex);
663 0 : MyReplicationSlot->just_dirtied = true;
664 0 : MyReplicationSlot->dirty = true;
665 0 : SpinLockRelease(&slot->mutex);
666 0 : }
667 :
668 : /*
669 : * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
670 : * guaranteeing it will be there after an eventual crash.
671 : */
672 : void
673 0 : ReplicationSlotPersist(void)
674 : {
675 0 : ReplicationSlot *slot = MyReplicationSlot;
676 :
677 0 : Assert(slot != NULL);
678 0 : Assert(slot->data.persistency != RS_PERSISTENT);
679 :
680 0 : SpinLockAcquire(&slot->mutex);
681 0 : slot->data.persistency = RS_PERSISTENT;
682 0 : SpinLockRelease(&slot->mutex);
683 :
684 0 : ReplicationSlotMarkDirty();
685 0 : ReplicationSlotSave();
686 0 : }
687 :
688 : /*
689 : * Compute the oldest xmin across all slots and store it in the ProcArray.
690 : *
691 : * If already_locked is true, ProcArrayLock has already been acquired
692 : * exclusively.
693 : */
694 : void
695 3 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
696 : {
697 : int i;
698 3 : TransactionId agg_xmin = InvalidTransactionId;
699 3 : TransactionId agg_catalog_xmin = InvalidTransactionId;
700 :
701 3 : Assert(ReplicationSlotCtl != NULL);
702 :
703 3 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
704 :
705 33 : for (i = 0; i < max_replication_slots; i++)
706 : {
707 30 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
708 : TransactionId effective_xmin;
709 : TransactionId effective_catalog_xmin;
710 :
711 30 : if (!s->in_use)
712 30 : continue;
713 :
714 0 : SpinLockAcquire(&s->mutex);
715 0 : effective_xmin = s->effective_xmin;
716 0 : effective_catalog_xmin = s->effective_catalog_xmin;
717 0 : SpinLockRelease(&s->mutex);
718 :
719 : /* check the data xmin */
720 0 : if (TransactionIdIsValid(effective_xmin) &&
721 0 : (!TransactionIdIsValid(agg_xmin) ||
722 0 : TransactionIdPrecedes(effective_xmin, agg_xmin)))
723 0 : agg_xmin = effective_xmin;
724 :
725 : /* check the catalog xmin */
726 0 : if (TransactionIdIsValid(effective_catalog_xmin) &&
727 0 : (!TransactionIdIsValid(agg_catalog_xmin) ||
728 0 : TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
729 0 : agg_catalog_xmin = effective_catalog_xmin;
730 : }
731 :
732 3 : LWLockRelease(ReplicationSlotControlLock);
733 :
734 3 : ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
735 3 : }
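
/*
 * Editor's worked example: with three in-use slots whose effective xmins
 * are 612, 598 and InvalidTransactionId, agg_xmin ends up as 598, i.e. the
 * oldest valid value (TransactionIdPrecedes() keeps this wraparound-safe).
 */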
736 :
737 : /*
738 : * Compute the oldest restart LSN across all slots and inform xlog module.
739 : */
740 : void
741 3 : ReplicationSlotsComputeRequiredLSN(void)
742 : {
743 : int i;
744 3 : XLogRecPtr min_required = InvalidXLogRecPtr;
745 :
746 3 : Assert(ReplicationSlotCtl != NULL);
747 :
748 3 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
749 33 : for (i = 0; i < max_replication_slots; i++)
750 : {
751 30 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
752 : XLogRecPtr restart_lsn;
753 :
754 30 : if (!s->in_use)
755 30 : continue;
756 :
757 0 : SpinLockAcquire(&s->mutex);
758 0 : restart_lsn = s->data.restart_lsn;
759 0 : SpinLockRelease(&s->mutex);
760 :
761 0 : if (restart_lsn != InvalidXLogRecPtr &&
762 0 : (min_required == InvalidXLogRecPtr ||
763 : restart_lsn < min_required))
764 0 : min_required = restart_lsn;
765 : }
766 3 : LWLockRelease(ReplicationSlotControlLock);
767 :
768 3 : XLogSetReplicationSlotMinimumLSN(min_required);
769 3 : }
770 :
771 : /*
772 : * Compute the oldest WAL LSN required by *logical* decoding slots.
773 : *
774 : * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
775 : * slots exist.
776 : *
777 : * NB: this returns a value >= the minimum restart LSN computed by
778 : * ReplicationSlotsComputeRequiredLSN(), since it ignores physical slots.
779 : *
780 : * The results aren't required frequently, so we don't maintain a precomputed
781 : * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
782 : */
783 : XLogRecPtr
784 22 : ReplicationSlotsComputeLogicalRestartLSN(void)
785 : {
786 22 : XLogRecPtr result = InvalidXLogRecPtr;
787 : int i;
788 :
789 22 : if (max_replication_slots <= 0)
790 0 : return InvalidXLogRecPtr;
791 :
792 22 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
793 :
794 242 : for (i = 0; i < max_replication_slots; i++)
795 : {
796 : ReplicationSlot *s;
797 : XLogRecPtr restart_lsn;
798 :
799 220 : s = &ReplicationSlotCtl->replication_slots[i];
800 :
801 : /* cannot change while ReplicationSlotControlLock is held */
802 220 : if (!s->in_use)
803 220 : continue;
804 :
805 : /* we're only interested in logical slots */
806 0 : if (!SlotIsLogical(s))
807 0 : continue;
808 :
809 : /* read once, it's ok if it increases while we're checking */
810 0 : SpinLockAcquire(&s->mutex);
811 0 : restart_lsn = s->data.restart_lsn;
812 0 : SpinLockRelease(&s->mutex);
813 :
814 0 : if (result == InvalidXLogRecPtr ||
815 : restart_lsn < result)
816 0 : result = restart_lsn;
817 : }
818 :
819 22 : LWLockRelease(ReplicationSlotControlLock);
820 :
821 22 : return result;
822 : }
823 :
824 : /*
825 : * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
826 : * passed database oid.
827 : *
828 : * Returns true if there are any slots referencing the database. *nslots will
829 : * be set to the total number of slots in the database, *nactive to the
830 : * number currently active.
831 : */
832 : bool
833 0 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
834 : {
835 : int i;
836 :
837 0 : *nslots = *nactive = 0;
838 :
839 0 : if (max_replication_slots <= 0)
840 0 : return false;
841 :
842 0 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
843 0 : for (i = 0; i < max_replication_slots; i++)
844 : {
845 : ReplicationSlot *s;
846 :
847 0 : s = &ReplicationSlotCtl->replication_slots[i];
848 :
849 : /* cannot change while ReplicationSlotControlLock is held */
850 0 : if (!s->in_use)
851 0 : continue;
852 :
853 : /* only logical slots are database specific, skip */
854 0 : if (!SlotIsLogical(s))
855 0 : continue;
856 :
857 : /* not our database, skip */
858 0 : if (s->data.database != dboid)
859 0 : continue;
860 :
861 : /* count slots with spinlock held */
862 0 : SpinLockAcquire(&s->mutex);
863 0 : (*nslots)++;
864 0 : if (s->active_pid != 0)
865 0 : (*nactive)++;
866 0 : SpinLockRelease(&s->mutex);
867 : }
868 0 : LWLockRelease(ReplicationSlotControlLock);
869 :
870 0 : if (*nslots > 0)
871 0 : return true;
872 0 : return false;
873 : }
874 :
875 : /*
876 : * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
877 : * passed database oid. The caller should hold an exclusive lock on the
878 : * pg_database oid for the database to prevent creation of new slots on the db
879 : * or replay from existing slots.
880 : *
881 : * Another session that concurrently acquires an existing slot on the target DB
882 : * (most likely to drop it) may cause this function to ERROR. If that happens
883 : * it may have dropped some but not all slots.
884 : *
885 : * This routine isn't as efficient as it could be - but we don't drop
886 : * databases often, especially databases with lots of slots.
887 : */
888 : void
889 0 : ReplicationSlotsDropDBSlots(Oid dboid)
890 : {
891 : int i;
892 :
893 0 : if (max_replication_slots <= 0)
894 0 : return;
895 :
896 : restart:
897 0 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
898 0 : for (i = 0; i < max_replication_slots; i++)
899 : {
900 : ReplicationSlot *s;
901 : char *slotname;
902 : int active_pid;
903 :
904 0 : s = &ReplicationSlotCtl->replication_slots[i];
905 :
906 : /* cannot change while ReplicationSlotControlLock is held */
907 0 : if (!s->in_use)
908 0 : continue;
909 :
910 : /* only logical slots are database specific, skip */
911 0 : if (!SlotIsLogical(s))
912 0 : continue;
913 :
914 : /* not our database, skip */
915 0 : if (s->data.database != dboid)
916 0 : continue;
917 :
918 : /* acquire slot, so ReplicationSlotDropAcquired can be reused */
919 0 : SpinLockAcquire(&s->mutex);
920 : /* can't change while ReplicationSlotControlLock is held */
921 0 : slotname = NameStr(s->data.name);
922 0 : active_pid = s->active_pid;
923 0 : if (active_pid == 0)
924 : {
925 0 : MyReplicationSlot = s;
926 0 : s->active_pid = MyProcPid;
927 : }
928 0 : SpinLockRelease(&s->mutex);
929 :
930 : /*
931 : * Even though we hold an exclusive lock on the database object a
932 : * logical slot for that DB can still be active, e.g. if it's
933 : * concurrently being dropped by a backend connected to another DB.
934 : *
935 : * That's fairly unlikely in practice, so we'll just bail out.
936 : */
937 0 : if (active_pid)
938 0 : ereport(ERROR,
939 : (errcode(ERRCODE_OBJECT_IN_USE),
940 : errmsg("replication slot \"%s\" is active for PID %d",
941 : slotname, active_pid)));
942 :
943 : /*
944 : * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
945 : * holding ReplicationSlotControlLock over filesystem operations,
946 : * release ReplicationSlotControlLock and use
947 : * ReplicationSlotDropAcquired.
948 : *
949 : * As that means the set of slots could change, restart scan from the
950 : * beginning each time we release the lock.
951 : */
952 0 : LWLockRelease(ReplicationSlotControlLock);
953 0 : ReplicationSlotDropAcquired();
954 0 : goto restart;
955 : }
956 0 : LWLockRelease(ReplicationSlotControlLock);
957 : }
958 :
959 :
960 : /*
961 : * Check whether the server's configuration supports using replication
962 : * slots.
963 : */
964 : void
965 0 : CheckSlotRequirements(void)
966 : {
967 0 : if (max_replication_slots == 0)
968 0 : ereport(ERROR,
969 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
970 : (errmsg("replication slots can only be used if max_replication_slots > 0"))));
971 :
972 0 : if (wal_level < WAL_LEVEL_REPLICA)
973 0 : ereport(ERROR,
974 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
975 : errmsg("replication slots can only be used if wal_level >= replica")));
976 0 : }
977 :
978 : /*
979 : * Reserve WAL for the currently active slot.
980 : *
981 : * Compute and set restart_lsn in a manner that's appropriate for the type of
982 : * the slot and concurrency safe.
983 : */
984 : void
985 0 : ReplicationSlotReserveWal(void)
986 : {
987 0 : ReplicationSlot *slot = MyReplicationSlot;
988 :
989 0 : Assert(slot != NULL);
990 0 : Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
991 :
992 : /*
993 : * The replication slot mechanism is used to prevent removal of required
994 : * WAL. As there is no interlock between this routine and checkpoints, WAL
995 : * segments could concurrently be removed when a now stale return value of
996 : * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
997 : * this happens we'll just retry.
998 : */
999 : while (true)
1000 : {
1001 : XLogSegNo segno;
1002 :
1003 : /*
1004 : * For logical slots log a standby snapshot and start logical decoding
1005 : * at exactly that position. That allows the slot to start up more
1006 : * quickly.
1007 : *
1008 : * That's not needed (or indeed helpful) for physical slots as they'll
1009 : * start replay at the last logged checkpoint anyway. Instead return
1010 : * the location of the last redo LSN. While that slightly increases
1011 : * the chance that we have to retry, it's where a base backup has to
1012 : * start replay at.
1013 : */
1014 0 : if (!RecoveryInProgress() && SlotIsLogical(slot))
1015 0 : {
1016 : XLogRecPtr flushptr;
1017 :
1018 : /* start at current insert position */
1019 0 : slot->data.restart_lsn = GetXLogInsertRecPtr();
1020 :
1021 : /* make sure we have enough information to start */
1022 0 : flushptr = LogStandbySnapshot();
1023 :
1024 : /* and make sure it's fsynced to disk */
1025 0 : XLogFlush(flushptr);
1026 : }
1027 : else
1028 : {
1029 0 : slot->data.restart_lsn = GetRedoRecPtr();
1030 : }
1031 :
1032 : /* prevent WAL removal as fast as possible */
1033 0 : ReplicationSlotsComputeRequiredLSN();
1034 :
1035 : /*
1036 : * If all required WAL is still there, great, otherwise retry. The
1037 : * slot should prevent further removal of WAL, unless there's a
1038 : * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
1039 : * the new restart_lsn above, so normally we should never need to loop
1040 : * more than twice.
1041 : */
1042 0 : XLByteToSeg(slot->data.restart_lsn, segno);
1043 0 : if (XLogGetLastRemovedSegno() < segno)
1044 0 : break;
1045 0 : }
1046 0 : }
1047 :
1048 : /*
1049 : * Flush all replication slots to disk.
1050 : *
1051 : * This needn't actually be part of a checkpoint, but it's a convenient
1052 : * location.
1053 : */
1054 : void
1055 11 : CheckPointReplicationSlots(void)
1056 : {
1057 : int i;
1058 :
1059 11 : elog(DEBUG1, "performing replication slot checkpoint");
1060 :
1061 : /*
1062 : * Prevent any slot from being created/dropped while we're active. As we
1063 : * explicitly do *not* want to block iterating over replication_slots or
1064 : * acquiring a slot we cannot take the control lock - but that's OK,
1065 : * because holding ReplicationSlotAllocationLock is strictly stronger, and
1066 : * enough to guarantee that nobody can change the in_use bits on us.
1067 : */
1068 11 : LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
1069 :
1070 121 : for (i = 0; i < max_replication_slots; i++)
1071 : {
1072 110 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1073 : char path[MAXPGPATH];
1074 :
1075 110 : if (!s->in_use)
1076 110 : continue;
1077 :
1078 : /* save the slot to disk, locking is handled in SaveSlotToPath() */
1079 0 : sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
1080 0 : SaveSlotToPath(s, path, LOG);
1081 : }
1082 11 : LWLockRelease(ReplicationSlotAllocationLock);
1083 11 : }
1084 :
1085 : /*
1086 : * Load all replication slots from disk into memory at server startup. This
1087 : * needs to be run before we start crash recovery.
1088 : */
1089 : void
1090 3 : StartupReplicationSlots(void)
1091 : {
1092 : DIR *replication_dir;
1093 : struct dirent *replication_de;
1094 :
1095 3 : elog(DEBUG1, "starting up replication slots");
1096 :
1097 : /* restore all slots by iterating over all on-disk entries */
1098 3 : replication_dir = AllocateDir("pg_replslot");
1099 12 : while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
1100 : {
1101 : struct stat statbuf;
1102 : char path[MAXPGPATH + 12];
1103 :
1104 9 : if (strcmp(replication_de->d_name, ".") == 0 ||
1105 3 : strcmp(replication_de->d_name, "..") == 0)
1106 12 : continue;
1107 :
1108 0 : snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
1109 :
1110 : /* we only ever create directories here, so skip anything that isn't one */
1111 0 : if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
1112 0 : continue;
1113 :
1114 : /* we crashed while a slot was being setup or deleted, clean up */
1115 0 : if (pg_str_endswith(replication_de->d_name, ".tmp"))
1116 : {
1117 0 : if (!rmtree(path, true))
1118 : {
1119 0 : ereport(WARNING,
1120 : (errcode_for_file_access(),
1121 : errmsg("could not remove directory \"%s\"", path)));
1122 0 : continue;
1123 : }
1124 0 : fsync_fname("pg_replslot", true);
1125 0 : continue;
1126 : }
1127 :
1128 : /* looks like a slot in a normal state, restore */
1129 0 : RestoreSlotFromDisk(replication_de->d_name);
1130 : }
1131 3 : FreeDir(replication_dir);
1132 :
1133 : /* if replication slots are disabled, we're done */
1134 3 : if (max_replication_slots <= 0)
1135 3 : return;
1136 :
1137 : /* Now that we have recovered all the data, compute replication xmin */
1138 3 : ReplicationSlotsComputeRequiredXmin(false);
1139 3 : ReplicationSlotsComputeRequiredLSN();
1140 : }
1141 :
1142 : /* ----
1143 : * Manipulation of on-disk state of replication slots
1144 : *
1145 : * NB: none of the routines below should take any notice whether a slot is the
1146 : * current one or not, that's all handled a layer above.
1147 : * ----
1148 : */
1149 : static void
1150 0 : CreateSlotOnDisk(ReplicationSlot *slot)
1151 : {
1152 : char tmppath[MAXPGPATH];
1153 : char path[MAXPGPATH];
1154 : struct stat st;
1155 :
1156 : /*
1157 : * No need to take out the io_in_progress_lock: nobody else can see this
1158 : * slot yet, so nobody else will write. We're reusing SaveSlotToPath, which
1159 : * takes out the lock; if we took the lock here, we'd deadlock.
1160 : */
1161 :
1162 0 : sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
1163 0 : sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
1164 :
1165 : /*
1166 : * It's just barely possible that some previous effort to create or drop a
1167 : * slot with this name left a temp directory lying around. If that seems
1168 : * to be the case, try to remove it. If the rmtree() fails, we'll error
1169 : * out at the mkdir() below, so we don't bother checking success.
1170 : */
1171 0 : if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
1172 0 : rmtree(tmppath, true);
1173 :
1174 : /* Create and fsync the temporary slot directory. */
1175 0 : if (mkdir(tmppath, S_IRWXU) < 0)
1176 0 : ereport(ERROR,
1177 : (errcode_for_file_access(),
1178 : errmsg("could not create directory \"%s\": %m",
1179 : tmppath)));
1180 0 : fsync_fname(tmppath, true);
1181 :
1182 : /* Write the actual state file. */
1183 0 : slot->dirty = true; /* signal that we really need to write */
1184 0 : SaveSlotToPath(slot, tmppath, ERROR);
1185 :
1186 : /* Rename the directory into place. */
1187 0 : if (rename(tmppath, path) != 0)
1188 0 : ereport(ERROR,
1189 : (errcode_for_file_access(),
1190 : errmsg("could not rename file \"%s\" to \"%s\": %m",
1191 : tmppath, path)));
1192 :
1193 : /*
1194 : * If we failed now (really unlikely) we wouldn't know whether this slot
1195 : * would persist after an OS crash or not, so force a restart. After the
1196 : * restart, recovery will try to fsync this again until it works.
1197 : */
1198 0 : START_CRIT_SECTION();
1199 :
1200 0 : fsync_fname(path, true);
1201 0 : fsync_fname("pg_replslot", true);
1202 :
1203 0 : END_CRIT_SECTION();
1204 0 : }
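
/*
 * Editor's note: creation is the mirror image of ReplicationSlotDropPtr().
 * Both pass through a "<slot>.tmp" directory, which is why any *.tmp entry
 * found in pg_replslot by StartupReplicationSlots() can safely be treated
 * as debris from an interrupted create or drop and removed.
 */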
1205 :
1206 : /*
1207 : * Shared functionality between saving and creating a replication slot.
1208 : */
1209 : static void
1210 0 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
1211 : {
1212 : char tmppath[MAXPGPATH];
1213 : char path[MAXPGPATH];
1214 : int fd;
1215 : ReplicationSlotOnDisk cp;
1216 : bool was_dirty;
1217 :
1218 : /* first check whether there's something to write out */
1219 0 : SpinLockAcquire(&slot->mutex);
1220 0 : was_dirty = slot->dirty;
1221 0 : slot->just_dirtied = false;
1222 0 : SpinLockRelease(&slot->mutex);
1223 :
1224 : /* and don't do anything if there's nothing to write */
1225 0 : if (!was_dirty)
1226 0 : return;
1227 :
1228 0 : LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
1229 :
1230 : /* silence valgrind :( */
1231 0 : memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
1232 :
1233 0 : sprintf(tmppath, "%s/state.tmp", dir);
1234 0 : sprintf(path, "%s/state", dir);
1235 :
1236 0 : fd = OpenTransientFile(tmppath,
1237 : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
1238 : S_IRUSR | S_IWUSR);
1239 0 : if (fd < 0)
1240 : {
1241 0 : ereport(elevel,
1242 : (errcode_for_file_access(),
1243 : errmsg("could not create file \"%s\": %m",
1244 : tmppath)));
1245 0 : return;
1246 : }
1247 :
1248 0 : cp.magic = SLOT_MAGIC;
1249 0 : INIT_CRC32C(cp.checksum);
1250 0 : cp.version = SLOT_VERSION;
1251 0 : cp.length = ReplicationSlotOnDiskV2Size;
1252 :
1253 0 : SpinLockAcquire(&slot->mutex);
1254 :
1255 0 : memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
1256 :
1257 0 : SpinLockRelease(&slot->mutex);
1258 :
1259 0 : COMP_CRC32C(cp.checksum,
1260 : (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
1261 : ReplicationSlotOnDiskChecksummedSize);
1262 0 : FIN_CRC32C(cp.checksum);
1263 :
1264 0 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
1265 0 : if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
1266 : {
1267 0 : int save_errno = errno;
1268 :
1269 0 : pgstat_report_wait_end();
1270 0 : CloseTransientFile(fd);
1271 0 : errno = save_errno;
1272 0 : ereport(elevel,
1273 : (errcode_for_file_access(),
1274 : errmsg("could not write to file \"%s\": %m",
1275 : tmppath)));
1276 0 : return;
1277 : }
1278 0 : pgstat_report_wait_end();
1279 :
1280 : /* fsync the temporary file */
1281 0 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
1282 0 : if (pg_fsync(fd) != 0)
1283 : {
1284 0 : int save_errno = errno;
1285 :
1286 0 : pgstat_report_wait_end();
1287 0 : CloseTransientFile(fd);
1288 0 : errno = save_errno;
1289 0 : ereport(elevel,
1290 : (errcode_for_file_access(),
1291 : errmsg("could not fsync file \"%s\": %m",
1292 : tmppath)));
1293 0 : return;
1294 : }
1295 0 : pgstat_report_wait_end();
1296 :
1297 0 : CloseTransientFile(fd);
1298 :
1299 : /* rename to permanent file, fsync file and directory */
1300 0 : if (rename(tmppath, path) != 0)
1301 : {
1302 0 : ereport(elevel,
1303 : (errcode_for_file_access(),
1304 : errmsg("could not rename file \"%s\" to \"%s\": %m",
1305 : tmppath, path)));
1306 0 : return;
1307 : }
1308 :
1309 : /* Check CreateSlotOnDisk() for the reasoning behind using a critical section. */
1310 0 : START_CRIT_SECTION();
1311 :
1312 0 : fsync_fname(path, false);
1313 0 : fsync_fname(dir, true);
1314 0 : fsync_fname("pg_replslot", true);
1315 :
1316 0 : END_CRIT_SECTION();
1317 :
1318 : /*
1319 : * Successfully wrote, unset dirty bit, unless somebody dirtied again
1320 : * already.
1321 : */
1322 0 : SpinLockAcquire(&slot->mutex);
1323 0 : if (!slot->just_dirtied)
1324 0 : slot->dirty = false;
1325 0 : SpinLockRelease(&slot->mutex);
1326 :
1327 0 : LWLockRelease(&slot->io_in_progress_lock);
1328 : }
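
/*
 * Editor's note: SaveSlotToPath() is the standard durable-update recipe:
 * write state.tmp, fsync it, rename() it over state, then fsync the file,
 * the slot directory and pg_replslot. RestoreSlotFromDisk() below relies on
 * this when it assumes any "state" file it finds was written completely.
 */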
1329 :
1330 : /*
1331 : * Load a single slot from disk into memory.
1332 : */
1333 : static void
1334 0 : RestoreSlotFromDisk(const char *name)
1335 : {
1336 : ReplicationSlotOnDisk cp;
1337 : int i;
1338 : char path[MAXPGPATH + 22];
1339 : int fd;
1340 0 : bool restored = false;
1341 : int readBytes;
1342 : pg_crc32c checksum;
1343 :
1344 : /* no need to lock here, no concurrent access allowed yet */
1345 :
1346 : /* delete temp file if it exists */
1347 0 : sprintf(path, "pg_replslot/%s/state.tmp", name);
1348 0 : if (unlink(path) < 0 && errno != ENOENT)
1349 0 : ereport(PANIC,
1350 : (errcode_for_file_access(),
1351 : errmsg("could not remove file \"%s\": %m", path)));
1352 :
1353 0 : sprintf(path, "pg_replslot/%s/state", name);
1354 :
1355 0 : elog(DEBUG1, "restoring replication slot from \"%s\"", path);
1356 :
1357 0 : fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
1358 :
1359 : /*
1360 : * A missing state file is unexpected here, since we rename() the directory
1361 : * into place only after the state file has been fsync()ed.
1362 : */
1363 0 : if (fd < 0)
1364 0 : ereport(PANIC,
1365 : (errcode_for_file_access(),
1366 : errmsg("could not open file \"%s\": %m", path)));
1367 :
1368 : /*
1369 : * Sync state file before we're reading from it. We might have crashed
1370 : * while it wasn't synced yet and we shouldn't continue on that basis.
1371 : */
1372 0 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
1373 0 : if (pg_fsync(fd) != 0)
1374 : {
1375 0 : CloseTransientFile(fd);
1376 0 : ereport(PANIC,
1377 : (errcode_for_file_access(),
1378 : errmsg("could not fsync file \"%s\": %m",
1379 : path)));
1380 : }
1381 0 : pgstat_report_wait_end();
1382 :
1383 : /* Also sync the parent directory */
1384 0 : START_CRIT_SECTION();
1385 0 : fsync_fname(path, true);
1386 0 : END_CRIT_SECTION();
1387 :
1388 : /* read part of statefile that's guaranteed to be version independent */
1389 0 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
1390 0 : readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
1391 0 : pgstat_report_wait_end();
1392 0 : if (readBytes != ReplicationSlotOnDiskConstantSize)
1393 : {
1394 0 : int saved_errno = errno;
1395 :
1396 0 : CloseTransientFile(fd);
1397 0 : errno = saved_errno;
1398 0 : ereport(PANIC,
1399 : (errcode_for_file_access(),
1400 : errmsg("could not read file \"%s\", read %d of %u: %m",
1401 : path, readBytes,
1402 : (uint32) ReplicationSlotOnDiskConstantSize)));
1403 : }
1404 :
1405 : /* verify magic */
1406 0 : if (cp.magic != SLOT_MAGIC)
1407 0 : ereport(PANIC,
1408 : (errcode_for_file_access(),
1409 : errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
1410 : path, cp.magic, SLOT_MAGIC)));
1411 :
1412 : /* verify version */
1413 0 : if (cp.version != SLOT_VERSION)
1414 0 : ereport(PANIC,
1415 : (errcode_for_file_access(),
1416 : errmsg("replication slot file \"%s\" has unsupported version %u",
1417 : path, cp.version)));
1418 :
1419 : /* boundary check on length */
1420 0 : if (cp.length != ReplicationSlotOnDiskV2Size)
1421 0 : ereport(PANIC,
1422 : (errcode_for_file_access(),
1423 : errmsg("replication slot file \"%s\" has corrupted length %u",
1424 : path, cp.length)));
1425 :
1426 : /* Now that we know the size, read the entire file */
1427 0 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
1428 0 : readBytes = read(fd,
1429 : (char *) &cp + ReplicationSlotOnDiskConstantSize,
1430 : cp.length);
1431 0 : pgstat_report_wait_end();
1432 0 : if (readBytes != cp.length)
1433 : {
1434 0 : int saved_errno = errno;
1435 :
1436 0 : CloseTransientFile(fd);
1437 0 : errno = saved_errno;
1438 0 : ereport(PANIC,
1439 : (errcode_for_file_access(),
1440 : errmsg("could not read file \"%s\", read %d of %u: %m",
1441 : path, readBytes, cp.length)));
1442 : }
1443 :
1444 0 : CloseTransientFile(fd);
1445 :
1446 : /* now verify the CRC */
1447 0 : INIT_CRC32C(checksum);
1448 0 : COMP_CRC32C(checksum,
1449 : (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
1450 : ReplicationSlotOnDiskChecksummedSize);
1451 0 : FIN_CRC32C(checksum);
1452 :
1453 0 : if (!EQ_CRC32C(checksum, cp.checksum))
1454 0 : ereport(PANIC,
1455 : (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
1456 : path, checksum, cp.checksum)));
1457 :
1458 : /*
1459 : * If we crashed with an ephemeral or temporary slot active, don't
1460 : * restore it; delete it instead.
1461 : */
1462 0 : if (cp.slotdata.persistency != RS_PERSISTENT)
1463 : {
1464 0 : sprintf(path, "pg_replslot/%s", name);
1465 :
1466 0 : if (!rmtree(path, true))
1467 : {
1468 0 : ereport(WARNING,
1469 : (errcode_for_file_access(),
1470 : errmsg("could not remove directory \"%s\"", path)));
1471 : }
1472 0 : fsync_fname("pg_replslot", true);
1473 0 : return;
1474 : }
1475 :
1476 : /* nothing can be active yet, don't lock anything */
1477 0 : for (i = 0; i < max_replication_slots; i++)
1478 : {
1479 : ReplicationSlot *slot;
1480 :
1481 0 : slot = &ReplicationSlotCtl->replication_slots[i];
1482 :
1483 0 : if (slot->in_use)
1484 0 : continue;
1485 :
1486 : /* restore the entire set of persistent data */
1487 0 : memcpy(&slot->data, &cp.slotdata,
1488 : sizeof(ReplicationSlotPersistentData));
1489 :
1490 : /* initialize in memory state */
1491 0 : slot->effective_xmin = cp.slotdata.xmin;
1492 0 : slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
1493 :
1494 0 : slot->candidate_catalog_xmin = InvalidTransactionId;
1495 0 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
1496 0 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
1497 0 : slot->candidate_restart_valid = InvalidXLogRecPtr;
1498 :
1499 0 : slot->in_use = true;
1500 0 : slot->active_pid = 0;
1501 :
1502 0 : restored = true;
1503 0 : break;
1504 : }
1505 :
1506 0 : if (!restored)
1507 0 : ereport(PANIC,
1508 : (errmsg("too many replication slots active before shutdown"),
1509 : errhint("Increase max_replication_slots and try again.")));
1510 : }