1 /*-------------------------------------------------------------------------
4 * Replication slot management.
7 * Copyright (c) 2012-2017, PostgreSQL Global Development Group
11 * src/backend/replication/slot.c
15 * Replication slots are used to keep state about replication streams
16 * originating from this cluster. Their primary purpose is to prevent the
17 * premature removal of WAL or of old tuple versions in a manner that would
18 * interfere with replication; they are also useful for monitoring purposes.
19 * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 * on standbys (to support cascading setups). The requirement that slots be
21 * usable on standbys precludes storing them in the system catalogs.
23 * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24 * directory. Inside that directory the state file will contain the slot's
25 * own data. Additional data can be stored alongside that file if required.
26 * While the server is running, the state data is also cached in memory for
29 * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 * to iterate over the slots, and in exclusive mode to change the in_use flag
32 * of a slot. The remaining data in each slot is protected by its mutex.
34 *-------------------------------------------------------------------------
#include "postgres.h"

#include <unistd.h>
#include <sys/stat.h>

#include "access/transam.h"
#include "access/xlog_internal.h"
#include "common/string.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/slot.h"
#include "storage/fd.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
54 * Replication slot on-disk data structure.
56 typedef struct ReplicationSlotOnDisk
58 /* first part of this struct needs to be version independent */
60 /* data not covered by checksum */
64 /* data covered by checksum */
69 * The actual data in the slot that follows can differ based on the above
73 ReplicationSlotPersistentData slotdata;
74 } ReplicationSlotOnDisk;
/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)
/*
 * Size of the part of the slot not covered by the checksum.
 *
 * NOTE(review): the SnapBuild* names below look like a copy-paste from
 * snapbuild.c; they are kept unchanged here because other code in this file
 * references them, but a rename to ReplicationSlotOnDisk* would be clearer.
 */
#define SnapBuildOnDiskNotChecksummedSize \
	offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
/* parenthesized so the macro expands safely inside larger expressions */
#define SnapBuildOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	2		/* version for new files */
92 /* Control array for replication slot management */
93 ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
95 /* My backend's replication slot in the shared memory array */
96 ReplicationSlot *MyReplicationSlot = NULL;
99 int max_replication_slots = 0; /* the maximum number of replication
102 static void ReplicationSlotDropAcquired(void);
103 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
105 /* internal persistency functions */
106 static void RestoreSlotFromDisk(const char *name);
107 static void CreateSlotOnDisk(ReplicationSlot *slot);
108 static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
111 * Report shared-memory space needed by ReplicationSlotShmemInit.
114 ReplicationSlotsShmemSize(void)
118 if (max_replication_slots == 0)
121 size = offsetof(ReplicationSlotCtlData, replication_slots);
122 size = add_size(size,
123 mul_size(max_replication_slots, sizeof(ReplicationSlot)));
129 * Allocate and initialize walsender-related shared memory.
132 ReplicationSlotsShmemInit(void)
136 if (max_replication_slots == 0)
139 ReplicationSlotCtl = (ReplicationSlotCtlData *)
140 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
143 LWLockRegisterTranche(LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS,
144 "replication_slot_io");
150 /* First time through, so initialize */
151 MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
153 for (i = 0; i < max_replication_slots; i++)
155 ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
157 /* everything else is zeroed by the memset above */
158 SpinLockInit(&slot->mutex);
159 LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
165 * Check whether the passed slot name is valid and report errors at elevel.
167 * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
168 * the name to be used as a directory name on every supported OS.
170 * Returns whether the directory name is valid or not if elevel < ERROR.
173 ReplicationSlotValidateName(const char *name, int elevel)
177 if (strlen(name) == 0)
180 (errcode(ERRCODE_INVALID_NAME),
181 errmsg("replication slot name \"%s\" is too short",
186 if (strlen(name) >= NAMEDATALEN)
189 (errcode(ERRCODE_NAME_TOO_LONG),
190 errmsg("replication slot name \"%s\" is too long",
195 for (cp = name; *cp; cp++)
197 if (!((*cp >= 'a' && *cp <= 'z')
198 || (*cp >= '0' && *cp <= '9')
202 (errcode(ERRCODE_INVALID_NAME),
203 errmsg("replication slot name \"%s\" contains invalid character",
205 errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
213 * Create a new replication slot and mark it as used by this backend.
215 * name: Name of the slot
216 * db_specific: logical decoding is db specific; if the slot is going to
217 * be used for that pass true, otherwise false.
220 ReplicationSlotCreate(const char *name, bool db_specific,
221 ReplicationSlotPersistency persistency)
223 ReplicationSlot *slot = NULL;
226 Assert(MyReplicationSlot == NULL);
228 ReplicationSlotValidateName(name, ERROR);
231 * If some other backend ran this code concurrently with us, we'd likely
232 * both allocate the same slot, and that would be bad. We'd also be at
233 * risk of missing a name collision. Also, we don't want to try to create
234 * a new slot while somebody's busy cleaning up an old one, because we
235 * might both be monkeying with the same directory.
237 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
240 * Check for name collision, and identify an allocatable slot. We need to
241 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
242 * else can change the in_use flags while we're looking at them.
244 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
245 for (i = 0; i < max_replication_slots; i++)
247 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
249 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
251 (errcode(ERRCODE_DUPLICATE_OBJECT),
252 errmsg("replication slot \"%s\" already exists", name)));
253 if (!s->in_use && slot == NULL)
256 LWLockRelease(ReplicationSlotControlLock);
258 /* If all slots are in use, we're out of luck. */
261 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
262 errmsg("all replication slots are in use"),
263 errhint("Free one or increase max_replication_slots.")));
266 * Since this slot is not in use, nobody should be looking at any part of
267 * it other than the in_use field unless they're trying to allocate it.
268 * And since we hold ReplicationSlotAllocationLock, nobody except us can
269 * be doing that. So it's safe to initialize the slot.
271 Assert(!slot->in_use);
272 Assert(slot->active_pid == 0);
274 /* first initialize persistent data */
275 memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
276 StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
277 slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
278 slot->data.persistency = persistency;
280 /* and then data only present in shared memory */
281 slot->just_dirtied = false;
283 slot->effective_xmin = InvalidTransactionId;
284 slot->effective_catalog_xmin = InvalidTransactionId;
285 slot->candidate_catalog_xmin = InvalidTransactionId;
286 slot->candidate_xmin_lsn = InvalidXLogRecPtr;
287 slot->candidate_restart_valid = InvalidXLogRecPtr;
288 slot->candidate_restart_lsn = InvalidXLogRecPtr;
291 * Create the slot on disk. We haven't actually marked the slot allocated
292 * yet, so no special cleanup is required if this errors out.
294 CreateSlotOnDisk(slot);
297 * We need to briefly prevent any other backend from iterating over the
298 * slots while we flip the in_use flag. We also need to set the active
299 * flag while holding the ControlLock as otherwise a concurrent
300 * SlotAcquire() could acquire the slot as well.
302 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
306 /* We can now mark the slot active, and that makes it our slot. */
307 SpinLockAcquire(&slot->mutex);
308 Assert(slot->active_pid == 0);
309 slot->active_pid = MyProcPid;
310 SpinLockRelease(&slot->mutex);
311 MyReplicationSlot = slot;
313 LWLockRelease(ReplicationSlotControlLock);
316 * Now that the slot has been marked as in_use and in_active, it's safe to
317 * let somebody else try to allocate a slot.
319 LWLockRelease(ReplicationSlotAllocationLock);
323 * Find a previously created slot and mark it as used by this backend.
326 ReplicationSlotAcquire(const char *name)
328 ReplicationSlot *slot = NULL;
330 int active_pid = 0; /* Keep compiler quiet */
332 Assert(MyReplicationSlot == NULL);
334 /* Search for the named slot and mark it active if we find it. */
335 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
336 for (i = 0; i < max_replication_slots; i++)
338 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
340 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
342 SpinLockAcquire(&s->mutex);
343 active_pid = s->active_pid;
345 active_pid = s->active_pid = MyProcPid;
346 SpinLockRelease(&s->mutex);
351 LWLockRelease(ReplicationSlotControlLock);
353 /* If we did not find the slot or it was already active, error out. */
356 (errcode(ERRCODE_UNDEFINED_OBJECT),
357 errmsg("replication slot \"%s\" does not exist", name)));
358 if (active_pid != MyProcPid)
360 (errcode(ERRCODE_OBJECT_IN_USE),
361 errmsg("replication slot \"%s\" is active for PID %d",
364 /* We made this slot active, so it's ours now. */
365 MyReplicationSlot = slot;
369 * Release a replication slot, this or another backend can ReAcquire it
370 * later. Resources this slot requires will be preserved.
373 ReplicationSlotRelease(void)
375 ReplicationSlot *slot = MyReplicationSlot;
377 Assert(slot != NULL && slot->active_pid != 0);
379 if (slot->data.persistency == RS_EPHEMERAL)
382 * Delete the slot. There is no !PANIC case where this is allowed to
383 * fail, all that may happen is an incomplete cleanup of the on-disk
386 ReplicationSlotDropAcquired();
388 else if (slot->data.persistency == RS_PERSISTENT)
391 * Mark persistent slot inactive. We're not freeing it, just
394 SpinLockAcquire(&slot->mutex);
395 slot->active_pid = 0;
396 SpinLockRelease(&slot->mutex);
401 * If slot needed to temporarily restrain both data and catalog xmin to
402 * create the catalog snapshot, remove that temporary constraint.
403 * Snapshots can only be exported while the initial snapshot is still
406 if (!TransactionIdIsValid(slot->data.xmin) &&
407 TransactionIdIsValid(slot->effective_xmin))
409 SpinLockAcquire(&slot->mutex);
410 slot->effective_xmin = InvalidTransactionId;
411 SpinLockRelease(&slot->mutex);
412 ReplicationSlotsComputeRequiredXmin(false);
415 MyReplicationSlot = NULL;
417 /* might not have been set when we've been a plain slot */
418 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
419 MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
420 LWLockRelease(ProcArrayLock);
424 * Cleanup all temporary slots created in current session.
427 ReplicationSlotCleanup(void)
431 Assert(MyReplicationSlot == NULL);
434 * No need for locking as we are only interested in slots active in
435 * current process and those are not touched by other processes.
437 for (i = 0; i < max_replication_slots; i++)
439 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
441 if (s->active_pid == MyProcPid)
443 Assert(s->in_use && s->data.persistency == RS_TEMPORARY);
445 ReplicationSlotDropPtr(s);
451 * Permanently drop replication slot identified by the passed in name.
454 ReplicationSlotDrop(const char *name)
456 Assert(MyReplicationSlot == NULL);
458 ReplicationSlotAcquire(name);
460 ReplicationSlotDropAcquired();
464 * Permanently drop the currently acquired replication slot.
467 ReplicationSlotDropAcquired(void)
469 ReplicationSlot *slot = MyReplicationSlot;
471 Assert(MyReplicationSlot != NULL);
473 /* slot isn't acquired anymore */
474 MyReplicationSlot = NULL;
476 ReplicationSlotDropPtr(slot);
480 * Permanently drop the replication slot which will be released by the point
481 * this function returns.
484 ReplicationSlotDropPtr(ReplicationSlot *slot)
486 char path[MAXPGPATH];
487 char tmppath[MAXPGPATH];
490 * If some other backend ran this code concurrently with us, we might try
491 * to delete a slot with a certain name while someone else was trying to
492 * create a slot with the same name.
494 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
496 /* Generate pathnames. */
497 sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
498 sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
501 * Rename the slot directory on disk, so that we'll no longer recognize
502 * this as a valid slot. Note that if this fails, we've got to mark the
503 * slot inactive before bailing out. If we're dropping an ephemeral or a
504 * temporary slot, we better never fail hard as the caller won't expect
505 * the slot to survive and this might get called during error handling.
507 if (rename(path, tmppath) == 0)
510 * We need to fsync() the directory we just renamed and its parent to
511 * make sure that our changes are on disk in a crash-safe fashion. If
512 * fsync() fails, we can't be sure whether the changes are on disk or
513 * not. For now, we handle that by panicking;
514 * StartupReplicationSlots() will try to straighten it out after
517 START_CRIT_SECTION();
518 fsync_fname(tmppath, true);
519 fsync_fname("pg_replslot", true);
524 bool fail_softly = slot->data.persistency != RS_PERSISTENT;
526 SpinLockAcquire(&slot->mutex);
527 slot->active_pid = 0;
528 SpinLockRelease(&slot->mutex);
530 ereport(fail_softly ? WARNING : ERROR,
531 (errcode_for_file_access(),
532 errmsg("could not rename file \"%s\" to \"%s\": %m",
537 * The slot is definitely gone. Lock out concurrent scans of the array
538 * long enough to kill it. It's OK to clear the active flag here without
539 * grabbing the mutex because nobody else can be scanning the array here,
540 * and nobody can be attached to this slot and thus access it without
541 * scanning the array.
543 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
544 slot->active_pid = 0;
545 slot->in_use = false;
546 LWLockRelease(ReplicationSlotControlLock);
549 * Slot is dead and doesn't prevent resource removal anymore, recompute
552 ReplicationSlotsComputeRequiredXmin(false);
553 ReplicationSlotsComputeRequiredLSN();
556 * If removing the directory fails, the worst thing that will happen is
557 * that the user won't be able to create a new slot with the same name
558 * until the next server restart. We warn about it, but that's all.
560 if (!rmtree(tmppath, true))
562 (errcode_for_file_access(),
563 errmsg("could not remove directory \"%s\"", tmppath)));
566 * We release this at the very end, so that nobody starts trying to create
567 * a slot while we're still cleaning up the detritus of the old one.
569 LWLockRelease(ReplicationSlotAllocationLock);
573 * Serialize the currently acquired slot's state from memory to disk, thereby
574 * guaranteeing the current state will survive a crash.
577 ReplicationSlotSave(void)
579 char path[MAXPGPATH];
581 Assert(MyReplicationSlot != NULL);
583 sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
584 SaveSlotToPath(MyReplicationSlot, path, ERROR);
588 * Signal that it would be useful if the currently acquired slot would be
589 * flushed out to disk.
591 * Note that the actual flush to disk can be delayed for a long time, if
592 * required for correctness explicitly do a ReplicationSlotSave().
595 ReplicationSlotMarkDirty(void)
597 ReplicationSlot *slot = MyReplicationSlot;
599 Assert(MyReplicationSlot != NULL);
601 SpinLockAcquire(&slot->mutex);
602 MyReplicationSlot->just_dirtied = true;
603 MyReplicationSlot->dirty = true;
604 SpinLockRelease(&slot->mutex);
608 * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
609 * guaranteeing it will be there after an eventual crash.
612 ReplicationSlotPersist(void)
614 ReplicationSlot *slot = MyReplicationSlot;
616 Assert(slot != NULL);
617 Assert(slot->data.persistency != RS_PERSISTENT);
619 SpinLockAcquire(&slot->mutex);
620 slot->data.persistency = RS_PERSISTENT;
621 SpinLockRelease(&slot->mutex);
623 ReplicationSlotMarkDirty();
624 ReplicationSlotSave();
628 * Compute the oldest xmin across all slots and store it in the ProcArray.
630 * If already_locked is true, ProcArrayLock has already been acquired
634 ReplicationSlotsComputeRequiredXmin(bool already_locked)
637 TransactionId agg_xmin = InvalidTransactionId;
638 TransactionId agg_catalog_xmin = InvalidTransactionId;
640 Assert(ReplicationSlotCtl != NULL);
642 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
644 for (i = 0; i < max_replication_slots; i++)
646 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
647 TransactionId effective_xmin;
648 TransactionId effective_catalog_xmin;
653 SpinLockAcquire(&s->mutex);
654 effective_xmin = s->effective_xmin;
655 effective_catalog_xmin = s->effective_catalog_xmin;
656 SpinLockRelease(&s->mutex);
658 /* check the data xmin */
659 if (TransactionIdIsValid(effective_xmin) &&
660 (!TransactionIdIsValid(agg_xmin) ||
661 TransactionIdPrecedes(effective_xmin, agg_xmin)))
662 agg_xmin = effective_xmin;
664 /* check the catalog xmin */
665 if (TransactionIdIsValid(effective_catalog_xmin) &&
666 (!TransactionIdIsValid(agg_catalog_xmin) ||
667 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
668 agg_catalog_xmin = effective_catalog_xmin;
671 LWLockRelease(ReplicationSlotControlLock);
673 ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
677 * Compute the oldest restart LSN across all slots and inform xlog module.
680 ReplicationSlotsComputeRequiredLSN(void)
683 XLogRecPtr min_required = InvalidXLogRecPtr;
685 Assert(ReplicationSlotCtl != NULL);
687 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
688 for (i = 0; i < max_replication_slots; i++)
690 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
691 XLogRecPtr restart_lsn;
696 SpinLockAcquire(&s->mutex);
697 restart_lsn = s->data.restart_lsn;
698 SpinLockRelease(&s->mutex);
700 if (restart_lsn != InvalidXLogRecPtr &&
701 (min_required == InvalidXLogRecPtr ||
702 restart_lsn < min_required))
703 min_required = restart_lsn;
705 LWLockRelease(ReplicationSlotControlLock);
707 XLogSetReplicationSlotMinimumLSN(min_required);
711 * Compute the oldest WAL LSN required by *logical* decoding slots..
713 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
716 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
717 * ignores physical replication slots.
719 * The results aren't required frequently, so we don't maintain a precomputed
720 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
723 ReplicationSlotsComputeLogicalRestartLSN(void)
725 XLogRecPtr result = InvalidXLogRecPtr;
728 if (max_replication_slots <= 0)
729 return InvalidXLogRecPtr;
731 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
733 for (i = 0; i < max_replication_slots; i++)
736 XLogRecPtr restart_lsn;
738 s = &ReplicationSlotCtl->replication_slots[i];
740 /* cannot change while ReplicationSlotCtlLock is held */
744 /* we're only interested in logical slots */
745 if (!SlotIsLogical(s))
748 /* read once, it's ok if it increases while we're checking */
749 SpinLockAcquire(&s->mutex);
750 restart_lsn = s->data.restart_lsn;
751 SpinLockRelease(&s->mutex);
753 if (result == InvalidXLogRecPtr ||
754 restart_lsn < result)
755 result = restart_lsn;
758 LWLockRelease(ReplicationSlotControlLock);
764 * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
765 * passed database oid.
767 * Returns true if there are any slots referencing the database. *nslots will
768 * be set to the absolute number of slots in the database, *nactive to ones
772 ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
776 *nslots = *nactive = 0;
778 if (max_replication_slots <= 0)
781 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
782 for (i = 0; i < max_replication_slots; i++)
786 s = &ReplicationSlotCtl->replication_slots[i];
788 /* cannot change while ReplicationSlotCtlLock is held */
792 /* only logical slots are database specific, skip */
793 if (!SlotIsLogical(s))
796 /* not our database, skip */
797 if (s->data.database != dboid)
800 /* count slots with spinlock held */
801 SpinLockAcquire(&s->mutex);
803 if (s->active_pid != 0)
805 SpinLockRelease(&s->mutex);
807 LWLockRelease(ReplicationSlotControlLock);
815 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
816 * passed database oid. The caller should hold an exclusive lock on the
817 * pg_database oid for the database to prevent creation of new slots on the db
818 * or replay from existing slots.
820 * Another session that concurrently acquires an existing slot on the target DB
821 * (most likely to drop it) may cause this function to ERROR. If that happens
822 * it may have dropped some but not all slots.
824 * This routine isn't as efficient as it could be - but we don't drop
825 * databases often, especially databases with lots of slots.
828 ReplicationSlotsDropDBSlots(Oid dboid)
832 if (max_replication_slots <= 0)
836 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
837 for (i = 0; i < max_replication_slots; i++)
843 s = &ReplicationSlotCtl->replication_slots[i];
845 /* cannot change while ReplicationSlotCtlLock is held */
849 /* only logical slots are database specific, skip */
850 if (!SlotIsLogical(s))
853 /* not our database, skip */
854 if (s->data.database != dboid)
857 /* acquire slot, so ReplicationSlotDropAcquired can be reused */
858 SpinLockAcquire(&s->mutex);
859 /* can't change while ReplicationSlotControlLock is held */
860 slotname = NameStr(s->data.name);
861 active_pid = s->active_pid;
864 MyReplicationSlot = s;
865 s->active_pid = MyProcPid;
867 SpinLockRelease(&s->mutex);
870 * Even though we hold an exclusive lock on the database object a
871 * logical slot for that DB can still be active, e.g. if it's
872 * concurrently being dropped by a backend connected to another DB.
874 * That's fairly unlikely in practice, so we'll just bail out.
878 (errcode(ERRCODE_OBJECT_IN_USE),
879 errmsg("replication slot \"%s\" is active for PID %d",
880 slotname, active_pid)));
883 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
884 * holding ReplicationSlotControlLock over filesystem operations,
885 * release ReplicationSlotControlLock and use
886 * ReplicationSlotDropAcquired.
888 * As that means the set of slots could change, restart scan from the
889 * beginning each time we release the lock.
891 LWLockRelease(ReplicationSlotControlLock);
892 ReplicationSlotDropAcquired();
895 LWLockRelease(ReplicationSlotControlLock);
900 * Check whether the server's configuration supports using replication
904 CheckSlotRequirements(void)
906 if (max_replication_slots == 0)
908 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
909 (errmsg("replication slots can only be used if max_replication_slots > 0"))));
911 if (wal_level < WAL_LEVEL_REPLICA)
913 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
914 errmsg("replication slots can only be used if wal_level >= replica")));
918 * Reserve WAL for the currently active slot.
920 * Compute and set restart_lsn in a manner that's appropriate for the type of
921 * the slot and concurrency safe.
924 ReplicationSlotReserveWal(void)
926 ReplicationSlot *slot = MyReplicationSlot;
928 Assert(slot != NULL);
929 Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
932 * The replication slot mechanism is used to prevent removal of required
933 * WAL. As there is no interlock between this routine and checkpoints, WAL
934 * segments could concurrently be removed when a now stale return value of
935 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
936 * this happens we'll just retry.
943 * For logical slots log a standby snapshot and start logical decoding
944 * at exactly that position. That allows the slot to start up more
947 * That's not needed (or indeed helpful) for physical slots as they'll
948 * start replay at the last logged checkpoint anyway. Instead return
949 * the location of the last redo LSN. While that slightly increases
950 * the chance that we have to retry, it's where a base backup has to
953 if (!RecoveryInProgress() && SlotIsLogical(slot))
957 /* start at current insert position */
958 slot->data.restart_lsn = GetXLogInsertRecPtr();
960 /* make sure we have enough information to start */
961 flushptr = LogStandbySnapshot();
963 /* and make sure it's fsynced to disk */
968 slot->data.restart_lsn = GetRedoRecPtr();
971 /* prevent WAL removal as fast as possible */
972 ReplicationSlotsComputeRequiredLSN();
975 * If all required WAL is still there, great, otherwise retry. The
976 * slot should prevent further removal of WAL, unless there's a
977 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
978 * the new restart_lsn above, so normally we should never need to loop
981 XLByteToSeg(slot->data.restart_lsn, segno);
982 if (XLogGetLastRemovedSegno() < segno)
988 * Flush all replication slots to disk.
990 * This needn't actually be part of a checkpoint, but it's a convenient
994 CheckPointReplicationSlots(void)
998 elog(DEBUG1, "performing replication slot checkpoint");
1001 * Prevent any slot from being created/dropped while we're active. As we
1002 * explicitly do *not* want to block iterating over replication_slots or
1003 * acquiring a slot we cannot take the control lock - but that's OK,
1004 * because holding ReplicationSlotAllocationLock is strictly stronger, and
1005 * enough to guarantee that nobody can change the in_use bits on us.
1007 LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
1009 for (i = 0; i < max_replication_slots; i++)
1011 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1012 char path[MAXPGPATH];
1017 /* save the slot to disk, locking is handled in SaveSlotToPath() */
1018 sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
1019 SaveSlotToPath(s, path, LOG);
1021 LWLockRelease(ReplicationSlotAllocationLock);
1025 * Load all replication slots from disk into memory at server startup. This
1026 * needs to be run before we start crash recovery.
1029 StartupReplicationSlots(void)
1031 DIR *replication_dir;
1032 struct dirent *replication_de;
1034 elog(DEBUG1, "starting up replication slots");
1036 /* restore all slots by iterating over all on-disk entries */
1037 replication_dir = AllocateDir("pg_replslot");
1038 while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
1040 struct stat statbuf;
1041 char path[MAXPGPATH + 12];
1043 if (strcmp(replication_de->d_name, ".") == 0 ||
1044 strcmp(replication_de->d_name, "..") == 0)
1047 snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
1049 /* we're only creating directories here, skip if it's not our's */
1050 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
1053 /* we crashed while a slot was being setup or deleted, clean up */
1054 if (pg_str_endswith(replication_de->d_name, ".tmp"))
1056 if (!rmtree(path, true))
1059 (errcode_for_file_access(),
1060 errmsg("could not remove directory \"%s\"", path)));
1063 fsync_fname("pg_replslot", true);
1067 /* looks like a slot in a normal state, restore */
1068 RestoreSlotFromDisk(replication_de->d_name);
1070 FreeDir(replication_dir);
1072 /* currently no slots exist, we're done. */
1073 if (max_replication_slots <= 0)
1076 /* Now that we have recovered all the data, compute replication xmin */
1077 ReplicationSlotsComputeRequiredXmin(false);
1078 ReplicationSlotsComputeRequiredLSN();
1082 * Manipulation of on-disk state of replication slots
1084 * NB: none of the routines below should take any notice whether a slot is the
1085 * current one or not, that's all handled a layer above.
1089 CreateSlotOnDisk(ReplicationSlot *slot)
1091 char tmppath[MAXPGPATH];
1092 char path[MAXPGPATH];
1096 * No need to take out the io_in_progress_lock, nobody else can see this
1097 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
1098 * takes out the lock, if we'd take the lock here, we'd deadlock.
1101 sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
1102 sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
1105 * It's just barely possible that some previous effort to create or drop a
1106 * slot with this name left a temp directory lying around. If that seems
1107 * to be the case, try to remove it. If the rmtree() fails, we'll error
1108 * out at the mkdir() below, so we don't bother checking success.
1110 if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
1111 rmtree(tmppath, true);
1113 /* Create and fsync the temporary slot directory. */
1114 if (mkdir(tmppath, S_IRWXU) < 0)
1116 (errcode_for_file_access(),
1117 errmsg("could not create directory \"%s\": %m",
1119 fsync_fname(tmppath, true);
1121 /* Write the actual state file. */
1122 slot->dirty = true; /* signal that we really need to write */
1123 SaveSlotToPath(slot, tmppath, ERROR);
1125 /* Rename the directory into place. */
1126 if (rename(tmppath, path) != 0)
1128 (errcode_for_file_access(),
1129 errmsg("could not rename file \"%s\" to \"%s\": %m",
1133 * If we'd now fail - really unlikely - we wouldn't know whether this slot
1134 * would persist after an OS crash or not - so, force a restart. The
1135 * restart would try to fsync this again till it works.
1137 START_CRIT_SECTION();
1139 fsync_fname(path, true);
1140 fsync_fname("pg_replslot", true);
1146 * Shared functionality between saving and creating a replication slot.
/*
 * Serialize slot->data to "<dir>/state": build a ReplicationSlotOnDisk
 * image, write it to "<dir>/state.tmp", fsync it, rename() it into place,
 * then fsync the file, the slot directory and pg_replslot.  Errors during
 * the write phase are reported at 'elevel'; once the rename has succeeded
 * we enter a critical section so a later fsync failure forces a restart
 * (otherwise we could not tell whether the state would survive an OS
 * crash).
 */
1149 SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
1151 char tmppath[MAXPGPATH];
1152 char path[MAXPGPATH];
1154 ReplicationSlotOnDisk cp;
/*
 * Read and reset the dirty-tracking flags under the slot's spinlock:
 * 'dirty' says whether a write-out is needed at all; clearing
 * 'just_dirtied' lets us detect concurrent re-dirtying while we write.
 */
1157 /* first check whether there's something to write out */
1158 SpinLockAcquire(&slot->mutex);
1159 was_dirty = slot->dirty;
1160 slot->just_dirtied = false;
1161 SpinLockRelease(&slot->mutex);
1163 /* and don't do anything if there's nothing to write */
/* Serialize against concurrent savers of this slot's state file. */
1167 LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
1169 /* silence valgrind :( */
1170 memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
1172 sprintf(tmppath, "%s/state.tmp", dir);
1173 sprintf(path, "%s/state", dir);
/* O_EXCL: any leftover temp file should have been removed at startup */
1175 fd = OpenTransientFile(tmppath,
1176 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
1181 (errcode_for_file_access(),
1182 errmsg("could not create file \"%s\": %m",
/* Fill in the version-independent header of the on-disk format. */
1187 cp.magic = SLOT_MAGIC;
1188 INIT_CRC32C(cp.checksum);
1189 cp.version = SLOT_VERSION;
1190 cp.length = ReplicationSlotOnDiskV2Size;
/* Snapshot the persistent part of the slot under its spinlock. */
1192 SpinLockAcquire(&slot->mutex);
1194 memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
1196 SpinLockRelease(&slot->mutex);
/*
 * Checksum everything past the not-checksummed header.  NOTE(review): the
 * SnapBuildOnDisk* size macros carry snapbuild.c-style names; presumably
 * they are this file's own offsets into ReplicationSlotOnDisk -- confirm
 * against the macro definitions before relying on the name.
 */
1198 COMP_CRC32C(cp.checksum,
1199 (char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
1200 SnapBuildOnDiskChecksummedSize);
1201 FIN_CRC32C(cp.checksum);
1203 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
1204 if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
/* Preserve write()'s errno across the cleanup calls for the report. */
1206 int save_errno = errno;
1208 pgstat_report_wait_end();
1209 CloseTransientFile(fd);
1212 (errcode_for_file_access(),
1213 errmsg("could not write to file \"%s\": %m",
1217 pgstat_report_wait_end();
1219 /* fsync the temporary file */
1220 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
1221 if (pg_fsync(fd) != 0)
1223 int save_errno = errno;
1225 pgstat_report_wait_end();
1226 CloseTransientFile(fd);
1229 (errcode_for_file_access(),
1230 errmsg("could not fsync file \"%s\": %m",
1234 pgstat_report_wait_end();
1236 CloseTransientFile(fd);
1238 /* rename to permanent file, fsync file and directory */
1239 if (rename(tmppath, path) != 0)
1242 (errcode_for_file_access(),
1243 errmsg("could not rename file \"%s\" to \"%s\": %m",
1248 /* Check CreateSlot() for the reasoning of using a crit. section. */
1249 START_CRIT_SECTION();
/* fsync the state file, the slot's directory, and pg_replslot itself */
1251 fsync_fname(path, false);
1252 fsync_fname(dir, true);
1253 fsync_fname("pg_replslot", true);
/*
 * Clear the dirty flag only if nobody re-dirtied the slot while we were
 * writing; otherwise the next save must write it out again.
 */
1258 * Successfully wrote, unset dirty bit, unless somebody dirtied again
1261 SpinLockAcquire(&slot->mutex);
1262 if (!slot->just_dirtied)
1263 slot->dirty = false;
1264 SpinLockRelease(&slot->mutex);
1266 LWLockRelease(&slot->io_in_progress_lock);
1270 * Load a single slot from disk into memory.
/*
 * Reads pg_replslot/<name>/state, validates it (magic, version, length,
 * CRC), and copies the persistent data into a free shared-memory slot
 * entry.  Runs during startup before any concurrent slot access is
 * possible, hence no locking.  A non-persistent slot left over from a
 * crash is deleted instead of restored.
 */
1273 RestoreSlotFromDisk(const char *name)
1275 ReplicationSlotOnDisk cp;
/* extra 22 bytes leaves room for the "pg_replslot/.../state.tmp" suffix */
1277 char path[MAXPGPATH + 22];
1279 bool restored = false;
1283 /* no need to lock here, no concurrent access allowed yet */
/* A state.tmp file is a leftover from an interrupted save; discard it. */
1285 /* delete temp file if it exists */
1286 sprintf(path, "pg_replslot/%s/state.tmp", name);
1287 if (unlink(path) < 0 && errno != ENOENT)
1289 (errcode_for_file_access(),
1290 errmsg("could not remove file \"%s\": %m", path)));
1292 sprintf(path, "pg_replslot/%s/state", name);
1294 elog(DEBUG1, "restoring replication slot from \"%s\"", path);
1296 fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
1299 * We do not need to handle this as we are rename()ing the directory into
1300 * place only after we fsync()ed the state file.
1304 (errcode_for_file_access(),
1305 errmsg("could not open file \"%s\": %m", path)));
1308 * Sync state file before we're reading from it. We might have crashed
1309 * while it wasn't synced yet and we shouldn't continue on that basis.
1311 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
1312 if (pg_fsync(fd) != 0)
1314 CloseTransientFile(fd);
1316 (errcode_for_file_access(),
1317 errmsg("could not fsync file \"%s\": %m",
1320 pgstat_report_wait_end();
/*
 * NOTE(review): 'path' still names the state file here, yet it is passed
 * with isdir = true while the comment speaks of the parent directory --
 * verify this fsyncs the intended target.
 */
1322 /* Also sync the parent directory */
1323 START_CRIT_SECTION();
1324 fsync_fname(path, true);
1327 /* read part of statefile that's guaranteed to be version independent */
1328 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
1329 readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
1330 pgstat_report_wait_end();
1331 if (readBytes != ReplicationSlotOnDiskConstantSize)
/* save errno before CloseTransientFile() can clobber it */
1333 int saved_errno = errno;
1335 CloseTransientFile(fd);
1336 errno = saved_errno;
1338 (errcode_for_file_access(),
1339 errmsg("could not read file \"%s\", read %d of %u: %m",
1341 (uint32) ReplicationSlotOnDiskConstantSize)));
/* verify magic */
1345 if (cp.magic != SLOT_MAGIC)
1347 (errcode_for_file_access(),
1348 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
1349 path, cp.magic, SLOT_MAGIC)));
1351 /* verify version */
1352 if (cp.version != SLOT_VERSION)
1354 (errcode_for_file_access(),
1355 errmsg("replication slot file \"%s\" has unsupported version %u",
1356 path, cp.version)));
1358 /* boundary check on length */
1359 if (cp.length != ReplicationSlotOnDiskV2Size)
1361 (errcode_for_file_access(),
1362 errmsg("replication slot file \"%s\" has corrupted length %u",
1365 /* Now that we know the size, read the entire file */
1366 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
1367 readBytes = read(fd,
1368 (char *) &cp + ReplicationSlotOnDiskConstantSize,
1370 pgstat_report_wait_end();
1371 if (readBytes != cp.length)
1373 int saved_errno = errno;
1375 CloseTransientFile(fd);
1376 errno = saved_errno;
1378 (errcode_for_file_access(),
1379 errmsg("could not read file \"%s\", read %d of %u: %m",
1380 path, readBytes, cp.length)));
1383 CloseTransientFile(fd);
1385 /* now verify the CRC */
/*
 * NOTE(review): as in SaveSlotToPath(), the SnapBuildOnDisk* size macros
 * are presumably this file's own offsets into ReplicationSlotOnDisk
 * despite the snapbuild-style names -- confirm against their definitions.
 */
1386 INIT_CRC32C(checksum);
1387 COMP_CRC32C(checksum,
1388 (char *) &cp + SnapBuildOnDiskNotChecksummedSize,
1389 SnapBuildOnDiskChecksummedSize);
1390 FIN_CRC32C(checksum);
1392 if (!EQ_CRC32C(checksum, cp.checksum))
1394 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
1395 path, checksum, cp.checksum)));
/*
 * A non-persistent slot is only meaningful to the session that created
 * it; after a crash, remove its whole directory instead of restoring it.
 */
1398 * If we crashed with an ephemeral slot active, don't restore but delete
1401 if (cp.slotdata.persistency != RS_PERSISTENT)
1403 sprintf(path, "pg_replslot/%s", name);
1405 if (!rmtree(path, true))
1408 (errcode_for_file_access(),
1409 errmsg("could not remove directory \"%s\"", path)));
/* make the removal durable before continuing */
1411 fsync_fname("pg_replslot", true);
/* Find a free shared-memory slot entry and copy the restored state in. */
1415 /* nothing can be active yet, don't lock anything */
1416 for (i = 0; i < max_replication_slots; i++)
1418 ReplicationSlot *slot;
1420 slot = &ReplicationSlotCtl->replication_slots[i];
1425 /* restore the entire set of persistent data */
1426 memcpy(&slot->data, &cp.slotdata,
1427 sizeof(ReplicationSlotPersistentData));
1429 /* initialize in memory state */
/* effective xmins start out equal to the persisted xmins */
1430 slot->effective_xmin = cp.slotdata.xmin;
1431 slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
/* no candidate values can be pending right after a restart */
1433 slot->candidate_catalog_xmin = InvalidTransactionId;
1434 slot->candidate_xmin_lsn = InvalidXLogRecPtr;
1435 slot->candidate_restart_lsn = InvalidXLogRecPtr;
1436 slot->candidate_restart_valid = InvalidXLogRecPtr;
1438 slot->in_use = true;
1439 slot->active_pid = 0;
/*
 * Reached when every in-memory entry is taken: the server was started
 * with a smaller max_replication_slots than the number of on-disk slots.
 */
1447 (errmsg("too many replication slots active before shutdown"),
1448 errhint("Increase max_replication_slots and try again.")));