1 /*-------------------------------------------------------------------------
4 * Replication slot management.
7 * Copyright (c) 2012-2015, PostgreSQL Global Development Group
11 * src/backend/replication/slot.c
15 * Replication slots are used to keep state about replication streams
16 * originating from this cluster. Their primary purpose is to prevent the
17 * premature removal of WAL or of old tuple versions in a manner that would
18 * interfere with replication; they are also useful for monitoring purposes.
19 * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 * on standbys (to support cascading setups). The requirement that slots be
21 * usable on standbys precludes storing them in the system catalogs.
23 * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24 * directory. Inside that directory the state file will contain the slot's
25 * own data. Additional data can be stored alongside that file if required.
26 * While the server is running, the state data is also cached in memory for
29 * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 * to iterate over the slots, and in exclusive mode to change the in_use flag
32 * of a slot. The remaining data in each slot is protected by its mutex.
34 *-------------------------------------------------------------------------
42 #include "access/transam.h"
43 #include "common/string.h"
44 #include "miscadmin.h"
45 #include "replication/slot.h"
46 #include "storage/fd.h"
47 #include "storage/proc.h"
48 #include "storage/procarray.h"
51 * Replication slot on-disk data structure.
/*
 * On-disk layout of a replication slot state file.
 *
 * NOTE(review): this listing appears truncated -- the fields the size
 * macros below reference (magic, checksum, version, length) are not
 * visible here; confirm against the complete source file.
 */
53 typedef struct ReplicationSlotOnDisk
55 /* first part of this struct needs to be version independent */
57 /* data not covered by checksum */
61 /* data covered by checksum */
66 * The actual data in the slot that follows can differ based on the above
70 ReplicationSlotPersistentData slotdata;
71 } ReplicationSlotOnDisk;
73 /* size of version independent data */
74 #define ReplicationSlotOnDiskConstantSize \
75 offsetof(ReplicationSlotOnDisk, slotdata)
76 /* size of the part of the slot not covered by the checksum */
/*
 * NOTE(review): the "SnapBuild" prefix on the next two macros looks like a
 * copy-paste from snapbuild.c; they size ReplicationSlotOnDisk, not a
 * snapshot-builder structure.  Renaming would require touching all users
 * (SaveSlotToPath, RestoreSlotFromDisk) in one change.
 */
77 #define SnapBuildOnDiskNotChecksummedSize \
78 offsetof(ReplicationSlotOnDisk, version)
79 /* size of the part covered by the checksum */
80 #define SnapBuildOnDiskChecksummedSize \
81 sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize
82 /* size of the slot data that is version dependent */
83 #define ReplicationSlotOnDiskV2Size \
84 sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
86 #define SLOT_MAGIC 0x1051CA1 /* format identifier */
87 #define SLOT_VERSION 2 /* version for new files */
89 /* Control array for replication slot management */
90 ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
92 /* My backend's replication slot in the shared memory array */
93 ReplicationSlot *MyReplicationSlot = NULL;
/* GUC: sizes the shared-memory slot array; 0 disables slots entirely. */
96 int max_replication_slots = 0; /* the maximum number of replication
/* forward declaration: drop logic shared by ReplicationSlotDrop/Release */
99 static void ReplicationSlotDropAcquired(void);
101 /* internal persistency functions */
102 static void RestoreSlotFromDisk(const char *name);
103 static void CreateSlotOnDisk(ReplicationSlot *slot);
104 static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
107 * Report shared-memory space needed by ReplicationSlotShmemInit.
/*
 * Returns the size of the control struct header plus one ReplicationSlot
 * per allowed slot; zero when slots are disabled (max_replication_slots == 0).
 */
110 ReplicationSlotsShmemSize(void)
114 if (max_replication_slots == 0)
117 size = offsetof(ReplicationSlotCtlData, replication_slots);
118 size = add_size(size,
119 mul_size(max_replication_slots, sizeof(ReplicationSlot)));
125 * Allocate and initialize walsender-related shared memory.
/*
 * Creates (or attaches to) the shared ReplicationSlotCtl array.  On first
 * initialization, zeroes the whole structure and sets up each slot's
 * spinlock and per-slot I/O LWLock.
 */
128 ReplicationSlotsShmemInit(void)
132 if (max_replication_slots == 0)
135 ReplicationSlotCtl = (ReplicationSlotCtlData *)
136 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
143 /* First time through, so initialize */
144 MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize())
146 for (i = 0; i < max_replication_slots; i++)
148 ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
150 /* everything else is zeroed by the memset above */
151 SpinLockInit(&slot->mutex);
152 slot->io_in_progress_lock = LWLockAssign();
158 * Check whether the passed slot name is valid and report errors at elevel.
160 * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
161 * the name to be used as a directory name on every supported OS.
163 * Returns whether the directory name is valid or not if elevel < ERROR.
/*
 * Three checks: non-empty, shorter than NAMEDATALEN, and restricted to
 * lowercase letters, digits and underscore (safe as a directory name).
 */
166 ReplicationSlotValidateName(const char *name, int elevel)
170 if (strlen(name) == 0)
173 (errcode(ERRCODE_INVALID_NAME),
174 errmsg("replication slot name \"%s\" is too short",
179 if (strlen(name) >= NAMEDATALEN)
182 (errcode(ERRCODE_NAME_TOO_LONG),
183 errmsg("replication slot name \"%s\" is too long",
188 for (cp = name; *cp; cp++)
190 if (!((*cp >= 'a' && *cp <= 'z')
191 || (*cp >= '0' && *cp <= '9')
195 (errcode(ERRCODE_INVALID_NAME),
196 errmsg("replication slot name \"%s\" contains invalid character",
198 errhint("Replication slot names may only contain letters, numbers, and the underscore character.")));
206 * Create a new replication slot and mark it as used by this backend.
208 * name: Name of the slot
209 * db_specific: logical decoding is db specific; if the slot is going to
210 * be used for that pass true, otherwise false.
/*
 * On success, MyReplicationSlot points at the new slot and it is marked
 * active (active_pid = MyProcPid).  Errors out if the name is invalid,
 * already taken, or no free slot entry exists.
 */
213 ReplicationSlotCreate(const char *name, bool db_specific,
214 ReplicationSlotPersistency persistency)
216 ReplicationSlot *slot = NULL;
219 Assert(MyReplicationSlot == NULL);
221 ReplicationSlotValidateName(name, ERROR);
224 * If some other backend ran this code concurrently with us, we'd likely both
225 * allocate the same slot, and that would be bad. We'd also be at risk of
226 * missing a name collision. Also, we don't want to try to create a new
227 * slot while somebody's busy cleaning up an old one, because we might
228 * both be monkeying with the same directory.
230 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
233 * Check for name collision, and identify an allocatable slot. We need to
234 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
235 * else can change the in_use flags while we're looking at them.
237 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
238 for (i = 0; i < max_replication_slots; i++)
240 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
242 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
244 (errcode(ERRCODE_DUPLICATE_OBJECT),
245 errmsg("replication slot \"%s\" already exists", name)));
/* remember the first free entry; keep scanning for name collisions */
246 if (!s->in_use && slot == NULL)
249 LWLockRelease(ReplicationSlotControlLock);
251 /* If all slots are in use, we're out of luck. */
254 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
255 errmsg("all replication slots are in use"),
256 errhint("Free one or increase max_replication_slots.")));
259 * Since this slot is not in use, nobody should be looking at any part of
260 * it other than the in_use field unless they're trying to allocate it.
261 * And since we hold ReplicationSlotAllocationLock, nobody except us can
262 * be doing that. So it's safe to initialize the slot.
264 Assert(!slot->in_use);
265 Assert(slot->active_pid == 0);
266 slot->data.persistency = persistency;
267 slot->data.xmin = InvalidTransactionId;
268 slot->effective_xmin = InvalidTransactionId;
269 StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
270 slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
271 slot->data.restart_lsn = InvalidXLogRecPtr;
274 * Create the slot on disk. We haven't actually marked the slot allocated
275 * yet, so no special cleanup is required if this errors out.
277 CreateSlotOnDisk(slot);
280 * We need to briefly prevent any other backend from iterating over the
281 * slots while we flip the in_use flag. We also need to set the active
282 * flag while holding the ControlLock as otherwise a concurrent
283 * SlotAcquire() could acquire the slot as well.
285 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
289 /* We can now mark the slot active, and that makes it our slot. */
291 volatile ReplicationSlot *vslot = slot;
293 SpinLockAcquire(&slot->mutex);
294 Assert(vslot->active_pid == 0);
295 vslot->active_pid = MyProcPid;
296 SpinLockRelease(&slot->mutex);
297 MyReplicationSlot = slot;
300 LWLockRelease(ReplicationSlotControlLock);
303 * Now that the slot has been marked as in_use and active, it's safe to
304 * let somebody else try to allocate a slot.
306 LWLockRelease(ReplicationSlotAllocationLock);
310 * Find a previously created slot and mark it as used by this backend.
/*
 * Errors out if the slot doesn't exist or another backend currently has it
 * active; on success MyReplicationSlot points at the acquired slot.
 */
313 ReplicationSlotAcquire(const char *name)
315 ReplicationSlot *slot = NULL;
319 Assert(MyReplicationSlot == NULL);
321 ReplicationSlotValidateName(name, ERROR);
323 /* Search for the named slot and mark it active if we find it. */
324 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
325 for (i = 0; i < max_replication_slots; i++)
327 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
329 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
331 volatile ReplicationSlot *vslot = s;
/* read the previous owner's pid and claim the slot under the spinlock */
333 SpinLockAcquire(&s->mutex);
334 active_pid = vslot->active_pid;
336 vslot->active_pid = MyProcPid;
337 SpinLockRelease(&s->mutex);
342 LWLockRelease(ReplicationSlotControlLock);
344 /* If we did not find the slot or it was already active, error out. */
347 (errcode(ERRCODE_UNDEFINED_OBJECT),
348 errmsg("replication slot \"%s\" does not exist", name)));
351 (errcode(ERRCODE_OBJECT_IN_USE),
352 errmsg("replication slot \"%s\" is already active for pid %d",
355 /* We made this slot active, so it's ours now. */
356 MyReplicationSlot = slot;
360 * Release a replication slot, this or another backend can ReAcquire it
361 * later. Resources this slot requires will be preserved.
/*
 * Ephemeral slots are dropped outright instead of released; persistent
 * slots are merely marked inactive.  Also clears the backend's
 * logical-decoding vacuum flag.
 */
364 ReplicationSlotRelease(void)
366 ReplicationSlot *slot = MyReplicationSlot;
368 Assert(slot != NULL && slot->active_pid != 0);
370 if (slot->data.persistency == RS_EPHEMERAL)
373 * Delete the slot. There is no !PANIC case where this is allowed to
374 * fail, all that may happen is an incomplete cleanup of the on-disk
377 ReplicationSlotDropAcquired();
381 /* Mark slot inactive. We're not freeing it, just disconnecting. */
382 volatile ReplicationSlot *vslot = slot;
384 SpinLockAcquire(&slot->mutex);
385 vslot->active_pid = 0;
386 SpinLockRelease(&slot->mutex);
389 MyReplicationSlot = NULL;
391 /* might not have been set when we've been a plain slot */
392 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
393 MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
394 LWLockRelease(ProcArrayLock);
398 * Permanently drop replication slot identified by the passed in name.
/* Acquire first so only one backend can be dropping a given slot. */
401 ReplicationSlotDrop(const char *name)
403 Assert(MyReplicationSlot == NULL);
405 ReplicationSlotAcquire(name);
407 ReplicationSlotDropAcquired();
411 * Permanently drop the currently acquired replication slot which will be
412 * released by the point this function returns.
/*
 * Renames the slot directory to <name>.tmp first, so a crash mid-drop is
 * recognized and cleaned up by StartupReplicationSlots(); only then frees
 * the shared-memory entry and removes the directory tree.
 */
415 ReplicationSlotDropAcquired(void)
417 char path[MAXPGPATH];
418 char tmppath[MAXPGPATH];
419 ReplicationSlot *slot = MyReplicationSlot;
421 Assert(MyReplicationSlot != NULL);
423 /* slot isn't acquired anymore */
424 MyReplicationSlot = NULL;
427 * If some other backend ran this code concurrently with us, we might try
428 * to delete a slot with a certain name while someone else was trying to
429 * create a slot with the same name.
431 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
433 /* Generate pathnames. */
434 sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
435 sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
438 * Rename the slot directory on disk, so that we'll no longer recognize
439 * this as a valid slot. Note that if this fails, we've got to mark the
440 * slot inactive before bailing out. If we're dropping an ephemeral slot,
441 * we better never fail hard as the caller won't expect the slot to
442 * survive and this might get called during error handling.
444 if (rename(path, tmppath) == 0)
447 * We need to fsync() the directory we just renamed and its parent to
448 * make sure that our changes are on disk in a crash-safe fashion. If
449 * fsync() fails, we can't be sure whether the changes are on disk or
450 * not. For now, we handle that by panicking;
451 * StartupReplicationSlots() will try to straighten it out after
454 START_CRIT_SECTION();
455 fsync_fname(tmppath, true);
456 fsync_fname("pg_replslot", true);
/* rename failed: un-claim the slot, then report (softly for ephemeral) */
461 volatile ReplicationSlot *vslot = slot;
462 bool fail_softly = slot->data.persistency == RS_EPHEMERAL;
464 SpinLockAcquire(&slot->mutex);
465 vslot->active_pid = 0;
466 SpinLockRelease(&slot->mutex);
468 ereport(fail_softly ? WARNING : ERROR,
469 (errcode_for_file_access(),
470 errmsg("could not rename file \"%s\" to \"%s\": %m",
475 * The slot is definitely gone. Lock out concurrent scans of the array
476 * long enough to kill it. It's OK to clear the active flag here without
477 * grabbing the mutex because nobody else can be scanning the array here,
478 * and nobody can be attached to this slot and thus access it without
479 * scanning the array.
481 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
482 slot->active_pid = 0;
483 slot->in_use = false;
484 LWLockRelease(ReplicationSlotControlLock);
487 * Slot is dead and doesn't prevent resource removal anymore, recompute
490 ReplicationSlotsComputeRequiredXmin(false);
491 ReplicationSlotsComputeRequiredLSN();
494 * If removing the directory fails, the worst thing that will happen is
495 * that the user won't be able to create a new slot with the same name
496 * until the next server restart. We warn about it, but that's all.
498 if (!rmtree(tmppath, true))
500 (errcode_for_file_access(),
501 errmsg("could not remove directory \"%s\"", tmppath)));
504 * We release this at the very end, so that nobody starts trying to create
505 * a slot while we're still cleaning up the detritus of the old one.
507 LWLockRelease(ReplicationSlotAllocationLock);
511 * Serialize the currently acquired slot's state from memory to disk, thereby
512 * guaranteeing the current state will survive a crash.
/* Thin wrapper: builds the slot's directory path and calls SaveSlotToPath. */
515 ReplicationSlotSave(void)
517 char path[MAXPGPATH];
519 Assert(MyReplicationSlot != NULL);
521 sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
522 SaveSlotToPath(MyReplicationSlot, path, ERROR);
526 * Signal that it would be useful if the currently acquired slot would be
527 * flushed out to disk.
529 * Note that the actual flush to disk can be delayed for a long time, if
530 * required for correctness explicitly do a ReplicationSlotSave().
/*
 * Sets both dirty (needs writing) and just_dirtied (lets SaveSlotToPath
 * detect concurrent re-dirtying) under the slot's spinlock.
 */
533 ReplicationSlotMarkDirty(void)
535 Assert(MyReplicationSlot != NULL);
538 volatile ReplicationSlot *vslot = MyReplicationSlot;
540 SpinLockAcquire(&vslot->mutex);
541 MyReplicationSlot->just_dirtied = true;
542 MyReplicationSlot->dirty = true;
543 SpinLockRelease(&vslot->mutex);
548 * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
549 * guaranteeing it will be there after an eventual crash.
/* Flips persistency under the spinlock, then forces an immediate save. */
552 ReplicationSlotPersist(void)
554 ReplicationSlot *slot = MyReplicationSlot;
556 Assert(slot != NULL);
557 Assert(slot->data.persistency != RS_PERSISTENT);
560 volatile ReplicationSlot *vslot = slot;
562 SpinLockAcquire(&slot->mutex);
563 vslot->data.persistency = RS_PERSISTENT;
564 SpinLockRelease(&slot->mutex);
567 ReplicationSlotMarkDirty();
568 ReplicationSlotSave();
572 * Compute the oldest xmin across all slots and store it in the ProcArray.
/*
 * Scans every slot, taking the minimum of the effective data xmin and
 * effective catalog xmin separately, then publishes both via
 * ProcArraySetReplicationSlotXmin().  already_locked is passed through to
 * that call.
 */
575 ReplicationSlotsComputeRequiredXmin(bool already_locked)
578 TransactionId agg_xmin = InvalidTransactionId;
579 TransactionId agg_catalog_xmin = InvalidTransactionId;
581 Assert(ReplicationSlotCtl != NULL);
584 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
586 for (i = 0; i < max_replication_slots; i++)
588 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
589 TransactionId effective_xmin;
590 TransactionId effective_catalog_xmin;
596 volatile ReplicationSlot *vslot = s;
/* read both xmins consistently under the slot's spinlock */
598 SpinLockAcquire(&s->mutex);
599 effective_xmin = vslot->effective_xmin;
600 effective_catalog_xmin = vslot->effective_catalog_xmin;
601 SpinLockRelease(&s->mutex);
604 /* check the data xmin */
605 if (TransactionIdIsValid(effective_xmin) &&
606 (!TransactionIdIsValid(agg_xmin) ||
607 TransactionIdPrecedes(effective_xmin, agg_xmin)))
608 agg_xmin = effective_xmin;
610 /* check the catalog xmin */
611 if (TransactionIdIsValid(effective_catalog_xmin) &&
612 (!TransactionIdIsValid(agg_catalog_xmin) ||
613 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
614 agg_catalog_xmin = effective_catalog_xmin;
618 LWLockRelease(ReplicationSlotControlLock);
620 ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
624 * Compute the oldest restart LSN across all slots and inform xlog module.
/*
 * Minimum of all slots' restart_lsn; the result caps how much WAL the
 * checkpointer may recycle (XLogSetReplicationSlotMinimumLSN).
 */
627 ReplicationSlotsComputeRequiredLSN(void)
630 XLogRecPtr min_required = InvalidXLogRecPtr;
632 Assert(ReplicationSlotCtl != NULL);
634 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
635 for (i = 0; i < max_replication_slots; i++)
637 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
638 XLogRecPtr restart_lsn;
644 volatile ReplicationSlot *vslot = s;
646 SpinLockAcquire(&s->mutex);
647 restart_lsn = vslot->data.restart_lsn;
648 SpinLockRelease(&s->mutex);
651 if (restart_lsn != InvalidXLogRecPtr &&
652 (min_required == InvalidXLogRecPtr ||
653 restart_lsn < min_required))
654 min_required = restart_lsn;
656 LWLockRelease(ReplicationSlotControlLock);
658 XLogSetReplicationSlotMinimumLSN(min_required);
662 * Compute the oldest WAL LSN required by *logical* decoding slots.
664 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
665 * slots are in use.
667 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
668 * ignores physical replication slots.
670 * The results aren't required frequently, so we don't maintain a precomputed
671 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
674 ReplicationSlotsComputeLogicalRestartLSN(void)
676 XLogRecPtr result = InvalidXLogRecPtr;
679 if (max_replication_slots <= 0)
680 return InvalidXLogRecPtr;
682 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
684 for (i = 0; i < max_replication_slots; i++)
686 volatile ReplicationSlot *s;
687 XLogRecPtr restart_lsn;
689 s = &ReplicationSlotCtl->replication_slots[i];
691 /* cannot change while ReplicationSlotCtlLock is held */
695 /* we're only interested in logical slots */
696 if (s->data.database == InvalidOid)
699 /* read once, it's ok if it increases while we're checking */
700 SpinLockAcquire(&s->mutex);
701 restart_lsn = s->data.restart_lsn;
702 SpinLockRelease(&s->mutex);
704 if (result == InvalidXLogRecPtr ||
705 restart_lsn < result)
706 result = restart_lsn;
709 LWLockRelease(ReplicationSlotControlLock);
715 * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
716 * passed database oid.
718 * Returns true if there are any slots referencing the database. *nslots will
719 * be set to the absolute number of slots in the database, *nactive to ones
/*
 * Only database-specific (logical) slots are counted; physical slots have
 * database == InvalidOid and are skipped.
 */
723 ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
727 *nslots = *nactive = 0;
729 if (max_replication_slots <= 0)
732 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
733 for (i = 0; i < max_replication_slots; i++)
735 volatile ReplicationSlot *s;
737 s = &ReplicationSlotCtl->replication_slots[i];
739 /* cannot change while ReplicationSlotCtlLock is held */
743 /* not database specific, skip */
744 if (s->data.database == InvalidOid)
747 /* not our database, skip */
748 if (s->data.database != dboid)
751 /* count slots with spinlock held */
752 SpinLockAcquire(&s->mutex);
754 if (s->active_pid != 0)
756 SpinLockRelease(&s->mutex);
758 LWLockRelease(ReplicationSlotControlLock);
767 * Check whether the server's configuration supports using replication
/*
 * Errors out unless max_replication_slots > 0 and wal_level is at least
 * "archive".
 */
771 CheckSlotRequirements(void)
773 if (max_replication_slots == 0)
775 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
776 (errmsg("replication slots can only be used if max_replication_slots > 0"))));
778 if (wal_level < WAL_LEVEL_ARCHIVE)
780 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
781 errmsg("replication slots can only be used if wal_level >= archive")));
785 * Flush all replication slots to disk.
787 * This needn't actually be part of a checkpoint, but it's a convenient
/*
 * Called from the checkpointer.  Save failures are logged (elevel LOG)
 * rather than raised, so one bad slot doesn't abort the checkpoint.
 */
791 CheckPointReplicationSlots(void)
795 elog(DEBUG1, "performing replication slot checkpoint");
798 * Prevent any slot from being created/dropped while we're active. As we
799 * explicitly do *not* want to block iterating over replication_slots or
800 * acquiring a slot we cannot take the control lock - but that's OK,
801 * because holding ReplicationSlotAllocationLock is strictly stronger, and
802 * enough to guarantee that nobody can change the in_use bits on us.
804 LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
806 for (i = 0; i < max_replication_slots; i++)
808 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
809 char path[MAXPGPATH];
814 /* save the slot to disk, locking is handled in SaveSlotToPath() */
815 sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
816 SaveSlotToPath(s, path, LOG);
818 LWLockRelease(ReplicationSlotAllocationLock);
822 * Load all replication slots from disk into memory at server startup. This
823 * needs to be run before we start crash recovery.
/*
 * Iterates pg_replslot/: removes leftover "*.tmp" directories from
 * interrupted create/drop operations, restores every other slot directory,
 * then recomputes the global xmin and restart-LSN horizons.
 */
826 StartupReplicationSlots(void)
828 DIR *replication_dir;
829 struct dirent *replication_de;
831 elog(DEBUG1, "starting up replication slots");
833 /* restore all slots by iterating over all on-disk entries */
834 replication_dir = AllocateDir("pg_replslot");
835 while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
838 char path[MAXPGPATH];
840 if (strcmp(replication_de->d_name, ".") == 0 ||
841 strcmp(replication_de->d_name, "..") == 0)
844 snprintf(path, MAXPGPATH, "pg_replslot/%s", replication_de->d_name);
846 /* we're only creating directories here, skip if it's not ours */
847 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
850 /* we crashed while a slot was being setup or deleted, clean up */
851 if (pg_str_endswith(replication_de->d_name, ".tmp"))
853 if (!rmtree(path, true))
856 (errcode_for_file_access(),
857 errmsg("could not remove directory \"%s\"", path)));
860 fsync_fname("pg_replslot", true);
864 /* looks like a slot in a normal state, restore */
865 RestoreSlotFromDisk(replication_de->d_name);
867 FreeDir(replication_dir);
869 /* currently no slots exist, we're done. */
870 if (max_replication_slots <= 0)
873 /* Now that we have recovered all the data, compute replication xmin */
874 ReplicationSlotsComputeRequiredXmin(false);
875 ReplicationSlotsComputeRequiredLSN();
879 * Manipulation of ondisk state of replication slots
881 * NB: none of the routines below should take any notice whether a slot is the
882 * current one or not, that's all handled a layer above.
/*
 * Create the on-disk directory and state file for a new slot.
 * Builds everything in "<name>.tmp" first and rename()s into place, so a
 * crash can never leave a half-created slot that looks valid.
 */
886 CreateSlotOnDisk(ReplicationSlot *slot)
888 char tmppath[MAXPGPATH];
889 char path[MAXPGPATH];
893 * No need to take out the io_in_progress_lock, nobody else can see this
894 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
895 * takes out the lock, if we'd take the lock here, we'd deadlock.
898 sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
899 sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
902 * It's just barely possible that some previous effort to create or drop a
903 * slot with this name left a temp directory lying around. If that seems
904 * to be the case, try to remove it. If the rmtree() fails, we'll error
905 * out at the mkdir() below, so we don't bother checking success.
907 if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
908 rmtree(tmppath, true);
910 /* Create and fsync the temporary slot directory. */
911 if (mkdir(tmppath, S_IRWXU) < 0)
913 (errcode_for_file_access(),
914 errmsg("could not create directory \"%s\": %m",
916 fsync_fname(tmppath, true);
918 /* Write the actual state file. */
919 slot->dirty = true; /* signal that we really need to write */
920 SaveSlotToPath(slot, tmppath, ERROR);
922 /* Rename the directory into place. */
923 if (rename(tmppath, path) != 0)
925 (errcode_for_file_access(),
926 errmsg("could not rename file \"%s\" to \"%s\": %m",
930 * If we'd now fail - really unlikely - we wouldn't know whether this slot
931 * would persist after an OS crash or not - so, force a restart. The
932 * restart would try to fsync this again till it works.
934 START_CRIT_SECTION();
936 fsync_fname(path, true);
937 fsync_fname("pg_replslot", true);
943 * Shared functionality between saving and creating a replication slot.
/*
 * Serialize slot->data into "<dir>/state" via a write-to-temp-then-rename
 * protocol, checksummed with CRC32C.  Skips work entirely if the slot is
 * not dirty.  I/O errors are reported at the caller-supplied elevel
 * (ERROR for user-initiated saves, LOG from the checkpointer).
 */
946 SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
948 char tmppath[MAXPGPATH];
949 char path[MAXPGPATH];
951 ReplicationSlotOnDisk cp;
954 /* first check whether there's something to write out */
956 volatile ReplicationSlot *vslot = slot;
/* clearing just_dirtied here lets us detect concurrent re-dirtying below */
958 SpinLockAcquire(&vslot->mutex);
959 was_dirty = vslot->dirty;
960 vslot->just_dirtied = false;
961 SpinLockRelease(&vslot->mutex);
964 /* and don't do anything if there's nothing to write */
968 LWLockAcquire(slot->io_in_progress_lock, LW_EXCLUSIVE);
970 /* silence valgrind :( */
971 memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
973 sprintf(tmppath, "%s/state.tmp", dir);
974 sprintf(path, "%s/state", dir);
976 fd = OpenTransientFile(tmppath,
977 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
982 (errcode_for_file_access(),
983 errmsg("could not create file \"%s\": %m",
988 cp.magic = SLOT_MAGIC;
989 INIT_CRC32C(cp.checksum);
990 cp.version = SLOT_VERSION;
991 cp.length = ReplicationSlotOnDiskV2Size;
993 SpinLockAcquire(&slot->mutex);
995 memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
997 SpinLockRelease(&slot->mutex);
999 COMP_CRC32C(cp.checksum,
1000 (char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
1001 SnapBuildOnDiskChecksummedSize);
1002 FIN_CRC32C(cp.checksum);
1004 if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
1006 int save_errno = errno;
1008 CloseTransientFile(fd);
1011 (errcode_for_file_access(),
1012 errmsg("could not write to file \"%s\": %m",
1017 /* fsync the temporary file */
1018 if (pg_fsync(fd) != 0)
1020 int save_errno = errno;
1022 CloseTransientFile(fd);
1025 (errcode_for_file_access(),
1026 errmsg("could not fsync file \"%s\": %m",
1031 CloseTransientFile(fd);
1033 /* rename to permanent file, fsync file and directory */
1034 if (rename(tmppath, path) != 0)
1037 (errcode_for_file_access(),
1038 errmsg("could not rename file \"%s\" to \"%s\": %m",
1043 /* Check CreateSlot() for the reasoning of using a crit. section. */
1044 START_CRIT_SECTION();
1046 fsync_fname(path, false);
1047 fsync_fname((char *) dir, true);
1048 fsync_fname("pg_replslot", true);
1053 * Successfully wrote, unset dirty bit, unless somebody dirtied again
1057 volatile ReplicationSlot *vslot = slot;
1059 SpinLockAcquire(&vslot->mutex);
1060 if (!vslot->just_dirtied)
1061 vslot->dirty = false;
1062 SpinLockRelease(&vslot->mutex);
1065 LWLockRelease(slot->io_in_progress_lock);
1069 * Load a single slot from disk into memory.
/*
 * Reads and validates "pg_replslot/<name>/state" (magic, version, length,
 * CRC32C), then copies the persistent data into the first free shared
 * slot entry.  Non-persistent (ephemeral) slots found on disk are remnants
 * of a crash and are deleted instead of restored.
 */
1072 RestoreSlotFromDisk(const char *name)
1074 ReplicationSlotOnDisk cp;
1076 char path[MAXPGPATH];
1078 bool restored = false;
1082 /* no need to lock here, no concurrent access allowed yet */
1084 /* delete temp file if it exists */
1085 sprintf(path, "pg_replslot/%s/state.tmp", name);
1086 if (unlink(path) < 0 && errno != ENOENT)
1088 (errcode_for_file_access(),
1089 errmsg("could not remove file \"%s\": %m", path)));
1091 sprintf(path, "pg_replslot/%s/state", name);
1093 elog(DEBUG1, "restoring replication slot from \"%s\"", path);
1095 fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
1098 * We do not need to handle this as we are rename()ing the directory into
1099 * place only after we fsync()ed the state file.
1103 (errcode_for_file_access(),
1104 errmsg("could not open file \"%s\": %m", path)));
1107 * Sync state file before we're reading from it. We might have crashed
1108 * while it wasn't synced yet and we shouldn't continue on that basis.
1110 if (pg_fsync(fd) != 0)
1112 CloseTransientFile(fd);
1114 (errcode_for_file_access(),
1115 errmsg("could not fsync file \"%s\": %m",
1119 /* Also sync the parent directory */
1120 START_CRIT_SECTION();
1121 fsync_fname(path, true);
1124 /* read part of statefile that's guaranteed to be version independent */
1125 readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
1126 if (readBytes != ReplicationSlotOnDiskConstantSize)
1128 int saved_errno = errno;
1130 CloseTransientFile(fd);
1131 errno = saved_errno;
1133 (errcode_for_file_access(),
1134 errmsg("could not read file \"%s\", read %d of %u: %m",
1136 (uint32) ReplicationSlotOnDiskConstantSize)));
1140 if (cp.magic != SLOT_MAGIC)
1142 (errcode_for_file_access(),
1143 errmsg("replication slot file \"%s\" has wrong magic %u instead of %u",
1144 path, cp.magic, SLOT_MAGIC)));
1146 /* verify version */
1147 if (cp.version != SLOT_VERSION)
1149 (errcode_for_file_access(),
1150 errmsg("replication slot file \"%s\" has unsupported version %u",
1151 path, cp.version)));
1153 /* boundary check on length */
1154 if (cp.length != ReplicationSlotOnDiskV2Size)
1156 (errcode_for_file_access(),
1157 errmsg("replication slot file \"%s\" has corrupted length %u",
1160 /* Now that we know the size, read the entire file */
1161 readBytes = read(fd,
1162 (char *) &cp + ReplicationSlotOnDiskConstantSize,
1164 if (readBytes != cp.length)
1166 int saved_errno = errno;
1168 CloseTransientFile(fd);
1169 errno = saved_errno;
1171 (errcode_for_file_access(),
1172 errmsg("could not read file \"%s\", read %d of %u: %m",
1173 path, readBytes, cp.length)));
1176 CloseTransientFile(fd);
1178 /* now verify the CRC */
1179 INIT_CRC32C(checksum);
1180 COMP_CRC32C(checksum,
1181 (char *) &cp + SnapBuildOnDiskNotChecksummedSize,
1182 SnapBuildOnDiskChecksummedSize);
1183 FIN_CRC32C(checksum);
1185 if (!EQ_CRC32C(checksum, cp.checksum))
1187 (errmsg("replication slot file %s: checksum mismatch, is %u, should be %u",
1188 path, checksum, cp.checksum)));
1191 * If we crashed with an ephemeral slot active, don't restore but delete
1194 if (cp.slotdata.persistency != RS_PERSISTENT)
1196 sprintf(path, "pg_replslot/%s", name);
1198 if (!rmtree(path, true))
1201 (errcode_for_file_access(),
1202 errmsg("could not remove directory \"%s\"", path)));
1204 fsync_fname("pg_replslot", true);
1208 /* nothing can be active yet, don't lock anything */
1209 for (i = 0; i < max_replication_slots; i++)
1211 ReplicationSlot *slot;
1213 slot = &ReplicationSlotCtl->replication_slots[i];
1218 /* restore the entire set of persistent data */
1219 memcpy(&slot->data, &cp.slotdata,
1220 sizeof(ReplicationSlotPersistentData));
1222 /* initialize in memory state */
1223 slot->effective_xmin = cp.slotdata.xmin;
1224 slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
1226 slot->candidate_catalog_xmin = InvalidTransactionId;
1227 slot->candidate_xmin_lsn = InvalidXLogRecPtr;
1228 slot->candidate_restart_lsn = InvalidXLogRecPtr;
1229 slot->candidate_restart_valid = InvalidXLogRecPtr;
1231 slot->in_use = true;
1232 slot->active_pid = 0;
/* reached only when every shared slot entry is already occupied */
1240 (errmsg("too many replication slots active before shutdown"),
1241 errhint("Increase max_replication_slots and try again.")));