1 /*-------------------------------------------------------------------------
4 * Replication slot management.
7 * Copyright (c) 2012-2014, PostgreSQL Global Development Group
11 * src/backend/replication/slot.c
15 * Replication slots are used to keep state about replication streams
16 * originating from this cluster. Their primary purpose is to prevent the
17 * premature removal of WAL or of old tuple versions in a manner that would
18 * interfere with replication; they are also useful for monitoring purposes.
19 * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 * on standbys (to support cascading setups). The requirement that slots be
21 * usable on standbys precludes storing them in the system catalogs.
23 * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24 * directory. Inside that directory the state file will contain the slot's
25 * own data. Additional data can be stored alongside that file if required.
26 * While the server is running, the state data is also cached in memory for
29 * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 * to iterate over the slots, and in exclusive mode to change the in_use flag
32 * of a slot. The remaining data in each slot is protected by its mutex.
34 *-------------------------------------------------------------------------
42 #include "access/transam.h"
43 #include "miscadmin.h"
44 #include "replication/slot.h"
45 #include "storage/fd.h"
46 #include "storage/proc.h"
47 #include "storage/procarray.h"
50 * Replication slot on-disk data structure.
52 typedef struct ReplicationSlotOnDisk
54 /* first part of this struct needs to be version independent */
56 /* data not covered by checksum */
60 /* data covered by checksum */
64 ReplicationSlotPersistentData slotdata;
65 } ReplicationSlotOnDisk;
/*
 * Size of the part of the on-disk slot that is version independent, i.e.
 * the bytes of ReplicationSlotOnDisk that precede the slotdata member.
 */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)

/*
 * Size of the part of the on-disk slot that is not version independent,
 * i.e. everything in ReplicationSlotOnDisk from slotdata onwards.
 *
 * The replacement list is parenthesized so that the subtraction is evaluated
 * as a unit wherever the macro is expanded (e.g. "2 * X", "a - X" or a cast
 * applied to X); without the parentheses operator precedence would silently
 * change the result.
 */
#define ReplicationSlotOnDiskDynamicSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)
74 #define SLOT_MAGIC 0x1051CA1 /* format identifier */
75 #define SLOT_VERSION 1 /* version for new files */
77 /* Control array for replication slot management */
78 ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
80 /* My backend's replication slot in the shared memory array */
81 ReplicationSlot *MyReplicationSlot = NULL;
84 int max_replication_slots = 0; /* the maximum number of replication slots */
86 static void ReplicationSlotDropAcquired(void);
88 /* internal persistency functions */
89 static void RestoreSlotFromDisk(const char *name);
90 static void CreateSlotOnDisk(ReplicationSlot *slot);
91 static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
94 * Report shared-memory space needed by ReplicationSlotsShmemInit.
97 ReplicationSlotsShmemSize(void)
101 if (max_replication_slots == 0)
104 size = offsetof(ReplicationSlotCtlData, replication_slots);
105 size = add_size(size,
106 mul_size(max_replication_slots, sizeof(ReplicationSlot)));
112 * Allocate and initialize shared memory for replication slots.
115 ReplicationSlotsShmemInit(void)
119 if (max_replication_slots == 0)
122 ReplicationSlotCtl = (ReplicationSlotCtlData *)
123 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
130 /* First time through, so initialize */
131 MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
133 for (i = 0; i < max_replication_slots; i++)
135 ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
137 /* everything else is zeroed by the memset above */
138 SpinLockInit(&slot->mutex);
139 slot->io_in_progress_lock = LWLockAssign();
145 * Check whether the passed slot name is valid and report errors at elevel.
147 * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
148 * the name to be used as a directory name on every supported OS.
150 * Returns whether the directory name is valid or not if elevel < ERROR.
153 ReplicationSlotValidateName(const char *name, int elevel)
157 if (strlen(name) == 0)
160 (errcode(ERRCODE_INVALID_NAME),
161 errmsg("replication slot name \"%s\" is too short",
166 if (strlen(name) >= NAMEDATALEN)
169 (errcode(ERRCODE_NAME_TOO_LONG),
170 errmsg("replication slot name \"%s\" is too long",
175 for (cp = name; *cp; cp++)
177 if (!((*cp >= 'a' && *cp <= 'z')
178 || (*cp >= '0' && *cp <= '9')
182 (errcode(ERRCODE_INVALID_NAME),
183 errmsg("replication slot name \"%s\" contains invalid character",
185 errhint("Replication slot names may only contain letters, numbers and the underscore character.")));
193 * Create a new replication slot and mark it as used by this backend.
195 * name: Name of the slot
196 * db_specific: logical decoding is db specific; if the slot is going to
197 * be used for that pass true, otherwise false.
200 ReplicationSlotCreate(const char *name, bool db_specific,
201 ReplicationSlotPersistency persistency)
203 ReplicationSlot *slot = NULL;
206 Assert(MyReplicationSlot == NULL);
208 ReplicationSlotValidateName(name, ERROR);
211 * If some other backend ran this code concurrently with us, we'd likely
212 * both allocate the same slot, and that would be bad. We'd also be
213 * at risk of missing a name collision. Also, we don't want to try to
214 * create a new slot while somebody's busy cleaning up an old one, because
215 * we might both be monkeying with the same directory.
217 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
220 * Check for name collision, and identify an allocatable slot. We need
221 * to hold ReplicationSlotControlLock in shared mode for this, so that
222 * nobody else can change the in_use flags while we're looking at them.
224 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
225 for (i = 0; i < max_replication_slots; i++)
227 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
229 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
231 (errcode(ERRCODE_DUPLICATE_OBJECT),
232 errmsg("replication slot \"%s\" already exists", name)));
233 if (!s->in_use && slot == NULL)
236 LWLockRelease(ReplicationSlotControlLock);
238 /* If all slots are in use, we're out of luck. */
241 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
242 errmsg("all replication slots are in use"),
243 errhint("Free one or increase max_replication_slots.")));
246 * Since this slot is not in use, nobody should be looking at any
247 * part of it other than the in_use field unless they're trying to allocate
248 * it. And since we hold ReplicationSlotAllocationLock, nobody except us
249 * can be doing that. So it's safe to initialize the slot.
251 Assert(!slot->in_use);
252 Assert(!slot->active);
253 slot->data.persistency = persistency;
254 slot->data.xmin = InvalidTransactionId;
255 slot->effective_xmin = InvalidTransactionId;
256 strncpy(NameStr(slot->data.name), name, NAMEDATALEN);
257 NameStr(slot->data.name)[NAMEDATALEN - 1] = '\0';
258 slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
259 slot->data.restart_lsn = InvalidXLogRecPtr;
262 * Create the slot on disk. We haven't actually marked the slot allocated
263 * yet, so no special cleanup is required if this errors out.
265 CreateSlotOnDisk(slot);
268 * We need to briefly prevent any other backend from iterating over the
269 * slots while we flip the in_use flag. We also need to set the active
270 * flag while holding the ControlLock as otherwise a concurrent
271 * SlotAcquire() could acquire the slot as well.
273 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
277 /* We can now mark the slot active, and that makes it our slot. */
279 volatile ReplicationSlot *vslot = slot;
281 SpinLockAcquire(&slot->mutex);
282 Assert(!vslot->active);
283 vslot->active = true;
284 SpinLockRelease(&slot->mutex);
285 MyReplicationSlot = slot;
288 LWLockRelease(ReplicationSlotControlLock);
291 * Now that the slot has been marked as in_use and active, it's safe to
292 * let somebody else try to allocate a slot.
294 LWLockRelease(ReplicationSlotAllocationLock);
298 * Find a previously created slot and mark it as used by this backend.
301 ReplicationSlotAcquire(const char *name)
303 ReplicationSlot *slot = NULL;
307 Assert(MyReplicationSlot == NULL);
309 ReplicationSlotValidateName(name, ERROR);
311 /* Search for the named slot and mark it active if we find it. */
312 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
313 for (i = 0; i < max_replication_slots; i++)
315 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
317 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
319 volatile ReplicationSlot *vslot = s;
321 SpinLockAcquire(&s->mutex);
322 active = vslot->active;
323 vslot->active = true;
324 SpinLockRelease(&s->mutex);
329 LWLockRelease(ReplicationSlotControlLock);
331 /* If we did not find the slot or it was already active, error out. */
334 (errcode(ERRCODE_UNDEFINED_OBJECT),
335 errmsg("replication slot \"%s\" does not exist", name)));
338 (errcode(ERRCODE_OBJECT_IN_USE),
339 errmsg("replication slot \"%s\" is already active", name)));
341 /* We made this slot active, so it's ours now. */
342 MyReplicationSlot = slot;
346 * Release a replication slot; this or another backend can reacquire it
347 * later. Resources this slot requires will be preserved.
350 ReplicationSlotRelease(void)
352 ReplicationSlot *slot = MyReplicationSlot;
354 Assert(slot != NULL && slot->active);
356 if (slot->data.persistency == RS_EPHEMERAL)
359 * Delete the slot. There is no !PANIC case where this is allowed to
360 * fail, all that may happen is an incomplete cleanup of the on-disk
363 ReplicationSlotDropAcquired();
367 /* Mark slot inactive. We're not freeing it, just disconnecting. */
368 volatile ReplicationSlot *vslot = slot;
369 SpinLockAcquire(&slot->mutex);
370 vslot->active = false;
371 SpinLockRelease(&slot->mutex);
374 MyReplicationSlot = NULL;
376 /* might not have been set when we've been a plain slot */
377 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
378 MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
379 LWLockRelease(ProcArrayLock);
383 * Permanently drop replication slot identified by the passed in name.
386 ReplicationSlotDrop(const char *name)
388 Assert(MyReplicationSlot == NULL);
390 ReplicationSlotAcquire(name);
392 ReplicationSlotDropAcquired();
396 * Permanently drop the currently acquired replication slot which will be
397 * released by the point this function returns.
400 ReplicationSlotDropAcquired(void)
402 char path[MAXPGPATH];
403 char tmppath[MAXPGPATH];
404 ReplicationSlot *slot = MyReplicationSlot;
406 Assert(MyReplicationSlot != NULL);
408 /* slot isn't acquired anymore */
409 MyReplicationSlot = NULL;
412 * If some other backend ran this code concurrently with us, we might try
413 * to delete a slot with a certain name while someone else was trying to
414 * create a slot with the same name.
416 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
418 /* Generate pathnames. */
419 sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
420 sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
423 * Rename the slot directory on disk, so that we'll no longer recognize
424 * this as a valid slot. Note that if this fails, we've got to mark the
425 * slot inactive before bailing out. If we're dropping an ephemeral slot,
426 * we better never fail hard as the caller won't expect the slot to
427 * survive and this might get called during error handling.
429 if (rename(path, tmppath) == 0)
432 * We need to fsync() the directory we just renamed and its parent to
433 * make sure that our changes are on disk in a crash-safe fashion. If
434 * fsync() fails, we can't be sure whether the changes are on disk or
435 * not. For now, we handle that by panicking;
436 * StartupReplicationSlots() will try to straighten it out after
439 START_CRIT_SECTION();
440 fsync_fname(tmppath, true);
441 fsync_fname("pg_replslot", true);
446 volatile ReplicationSlot *vslot = slot;
447 bool fail_softly = slot->data.persistency == RS_EPHEMERAL;
449 SpinLockAcquire(&slot->mutex);
450 vslot->active = false;
451 SpinLockRelease(&slot->mutex);
453 ereport(fail_softly ? WARNING : ERROR,
454 (errcode_for_file_access(),
455 errmsg("could not rename \"%s\" to \"%s\": %m",
460 * The slot is definitely gone. Lock out concurrent scans of the array
461 * long enough to kill it. It's OK to clear the active flag here without
462 * grabbing the mutex because nobody else can be scanning the array here,
463 * and nobody can be attached to this slot and thus access it without
464 * scanning the array.
466 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
467 slot->active = false;
468 slot->in_use = false;
469 LWLockRelease(ReplicationSlotControlLock);
472 * Slot is dead and doesn't prevent resource removal anymore, recompute
475 ReplicationSlotsComputeRequiredXmin(false);
476 ReplicationSlotsComputeRequiredLSN();
479 * If removing the directory fails, the worst thing that will happen is
480 * that the user won't be able to create a new slot with the same name
481 * until the next server restart. We warn about it, but that's all.
483 if (!rmtree(tmppath, true))
485 (errcode_for_file_access(),
486 errmsg("could not remove directory \"%s\"", tmppath)));
489 * We release this at the very end, so that nobody starts trying to create
490 * a slot while we're still cleaning up the detritus of the old one.
492 LWLockRelease(ReplicationSlotAllocationLock);
496 * Serialize the currently acquired slot's state from memory to disk, thereby
497 * guaranteeing the current state will survive a crash.
500 ReplicationSlotSave(void)
502 char path[MAXPGPATH];
504 Assert(MyReplicationSlot != NULL);
506 sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
507 SaveSlotToPath(MyReplicationSlot, path, ERROR);
511 * Signal that it would be useful if the currently acquired slot would be
512 * flushed out to disk.
514 * Note that the actual flush to disk can be delayed for a long time; if a
515 * flush is required for correctness, explicitly do a ReplicationSlotSave().
518 ReplicationSlotMarkDirty(void)
520 Assert(MyReplicationSlot != NULL);
523 volatile ReplicationSlot *vslot = MyReplicationSlot;
525 SpinLockAcquire(&vslot->mutex);
526 MyReplicationSlot->just_dirtied = true;
527 MyReplicationSlot->dirty = true;
528 SpinLockRelease(&vslot->mutex);
533 * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
534 * guaranteeing it will be there after an eventual crash.
537 ReplicationSlotPersist(void)
539 ReplicationSlot *slot = MyReplicationSlot;
541 Assert(slot != NULL);
542 Assert(slot->data.persistency != RS_PERSISTENT);
545 volatile ReplicationSlot *vslot = slot;
547 SpinLockAcquire(&slot->mutex);
548 vslot->data.persistency = RS_PERSISTENT;
549 SpinLockRelease(&slot->mutex);
552 ReplicationSlotMarkDirty();
553 ReplicationSlotSave();
557 * Compute the oldest xmin across all slots and store it in the ProcArray.
560 ReplicationSlotsComputeRequiredXmin(bool already_locked)
563 TransactionId agg_xmin = InvalidTransactionId;
564 TransactionId agg_catalog_xmin = InvalidTransactionId;
566 Assert(ReplicationSlotCtl != NULL);
569 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
571 for (i = 0; i < max_replication_slots; i++)
573 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
574 TransactionId effective_xmin;
575 TransactionId effective_catalog_xmin;
581 volatile ReplicationSlot *vslot = s;
583 SpinLockAcquire(&s->mutex);
584 effective_xmin = vslot->effective_xmin;
585 effective_catalog_xmin = vslot->effective_catalog_xmin;
586 SpinLockRelease(&s->mutex);
589 /* check the data xmin */
590 if (TransactionIdIsValid(effective_xmin) &&
591 (!TransactionIdIsValid(agg_xmin) ||
592 TransactionIdPrecedes(effective_xmin, agg_xmin)))
593 agg_xmin = effective_xmin;
595 /* check the catalog xmin */
596 if (TransactionIdIsValid(effective_catalog_xmin) &&
597 (!TransactionIdIsValid(agg_catalog_xmin) ||
598 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
599 agg_catalog_xmin = effective_catalog_xmin;
603 LWLockRelease(ReplicationSlotControlLock);
605 ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
609 * Compute the oldest restart LSN across all slots and inform xlog module.
612 ReplicationSlotsComputeRequiredLSN(void)
615 XLogRecPtr min_required = InvalidXLogRecPtr;
617 Assert(ReplicationSlotCtl != NULL);
619 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
620 for (i = 0; i < max_replication_slots; i++)
622 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
623 XLogRecPtr restart_lsn;
629 volatile ReplicationSlot *vslot = s;
631 SpinLockAcquire(&s->mutex);
632 restart_lsn = vslot->data.restart_lsn;
633 SpinLockRelease(&s->mutex);
636 if (restart_lsn != InvalidXLogRecPtr &&
637 (min_required == InvalidXLogRecPtr ||
638 restart_lsn < min_required))
639 min_required = restart_lsn;
641 LWLockRelease(ReplicationSlotControlLock);
643 XLogSetReplicationSlotMinimumLSN(min_required);
647 * Compute the oldest WAL LSN required by *logical* decoding slots.
649 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logicals
652 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
653 * ignores physical replication slots.
655 * The results aren't required frequently, so we don't maintain a precomputed
656 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
659 ReplicationSlotsComputeLogicalRestartLSN(void)
661 XLogRecPtr result = InvalidXLogRecPtr;
664 if (max_replication_slots <= 0)
665 return InvalidXLogRecPtr;
667 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
669 for (i = 0; i < max_replication_slots; i++)
671 volatile ReplicationSlot *s;
672 XLogRecPtr restart_lsn;
674 s = &ReplicationSlotCtl->replication_slots[i];
676 /* cannot change while ReplicationSlotControlLock is held */
680 /* we're only interested in logical slots */
681 if (s->data.database == InvalidOid)
684 /* read once, it's ok if it increases while we're checking */
685 SpinLockAcquire(&s->mutex);
686 restart_lsn = s->data.restart_lsn;
687 SpinLockRelease(&s->mutex);
689 if (result == InvalidXLogRecPtr ||
690 restart_lsn < result)
691 result = restart_lsn;
694 LWLockRelease(ReplicationSlotControlLock);
700 * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
701 * passed database oid.
703 * Returns true if there are any slots referencing the database. *nslots will
704 * be set to the absolute number of slots in the database, *nactive to ones
708 ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
712 *nslots = *nactive = 0;
714 if (max_replication_slots <= 0)
717 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
718 for (i = 0; i < max_replication_slots; i++)
720 volatile ReplicationSlot *s;
722 s = &ReplicationSlotCtl->replication_slots[i];
724 /* cannot change while ReplicationSlotControlLock is held */
728 /* not database specific, skip */
729 if (s->data.database == InvalidOid)
731 /* not our database, skip */
732 if (s->data.database != dboid)
735 /* count slots with spinlock held */
736 SpinLockAcquire(&s->mutex);
740 SpinLockRelease(&s->mutex);
742 LWLockRelease(ReplicationSlotControlLock);
751 * Check whether the server's configuration supports using replication
755 CheckSlotRequirements(void)
757 if (max_replication_slots == 0)
759 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
760 (errmsg("replication slots can only be used if max_replication_slots > 0"))));
762 if (wal_level < WAL_LEVEL_ARCHIVE)
764 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
765 errmsg("replication slots can only be used if wal_level >= archive")));
769 * Returns whether the string `str' has the postfix `end'.
772 string_endswith(const char *str, const char *end)
774 size_t slen = strlen(str);
775 size_t elen = strlen(end);
777 /* can't be a postfix if longer */
781 /* compare the end of the strings */
783 return strcmp(str, end) == 0;
787 * Flush all replication slots to disk.
789 * This needn't actually be part of a checkpoint, but it's a convenient
793 CheckPointReplicationSlots(void)
798 (errmsg("performing replication slot checkpoint")));
801 * Prevent any slot from being created/dropped while we're active. As we
802 * explicitly do *not* want to block iterating over replication_slots or
803 * acquiring a slot we cannot take the control lock - but that's OK,
804 * because holding ReplicationSlotAllocationLock is strictly stronger,
805 * and enough to guarantee that nobody can change the in_use bits on us.
807 LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
809 for (i = 0; i < max_replication_slots; i++)
811 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
812 char path[MAXPGPATH];
817 /* save the slot to disk, locking is handled in SaveSlotToPath() */
818 sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
819 SaveSlotToPath(s, path, LOG);
821 LWLockRelease(ReplicationSlotAllocationLock);
825 * Load all replication slots from disk into memory at server startup. This
826 * needs to be run before we start crash recovery.
829 StartupReplicationSlots(XLogRecPtr checkPointRedo)
831 DIR *replication_dir;
832 struct dirent *replication_de;
835 (errmsg("starting up replication slots")));
837 /* restore all slots by iterating over all on-disk entries */
838 replication_dir = AllocateDir("pg_replslot");
839 while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
842 char path[MAXPGPATH];
844 if (strcmp(replication_de->d_name, ".") == 0 ||
845 strcmp(replication_de->d_name, "..") == 0)
848 snprintf(path, MAXPGPATH, "pg_replslot/%s", replication_de->d_name);
850 /* we're only creating directories here, skip if it's not ours */
851 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
854 /* we crashed while a slot was being set up or deleted, clean up */
855 if (string_endswith(replication_de->d_name, ".tmp"))
857 if (!rmtree(path, true))
860 (errcode_for_file_access(),
861 errmsg("could not remove directory \"%s\"", path)));
864 fsync_fname("pg_replslot", true);
868 /* looks like a slot in a normal state, restore */
869 RestoreSlotFromDisk(replication_de->d_name);
871 FreeDir(replication_dir);
873 /* currently no slots exist, we're done. */
874 if (max_replication_slots <= 0)
877 /* Now that we have recovered all the data, compute replication xmin */
878 ReplicationSlotsComputeRequiredXmin(false);
879 ReplicationSlotsComputeRequiredLSN();
883 * Manipulation of ondisk state of replication slots
885 * NB: none of the routines below should take any notice whether a slot is the
886 * current one or not, that's all handled a layer above.
890 CreateSlotOnDisk(ReplicationSlot *slot)
892 char tmppath[MAXPGPATH];
893 char path[MAXPGPATH];
897 * No need to take out the io_in_progress_lock, nobody else can see this
898 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
899 * takes out the lock, if we'd take the lock here, we'd deadlock.
902 sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
903 sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
906 * It's just barely possible that some previous effort to create or
907 * drop a slot with this name left a temp directory lying around.
908 * If that seems to be the case, try to remove it. If the rmtree()
909 * fails, we'll error out at the mkdir() below, so we don't bother
912 if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
913 rmtree(tmppath, true);
915 /* Create and fsync the temporary slot directory. */
916 if (mkdir(tmppath, S_IRWXU) < 0)
918 (errcode_for_file_access(),
919 errmsg("could not create directory \"%s\": %m",
921 fsync_fname(tmppath, true);
923 /* Write the actual state file. */
924 slot->dirty = true; /* signal that we really need to write */
925 SaveSlotToPath(slot, tmppath, ERROR);
927 /* Rename the directory into place. */
928 if (rename(tmppath, path) != 0)
930 (errcode_for_file_access(),
931 errmsg("could not rename file \"%s\" to \"%s\": %m",
935 * If we'd now fail - really unlikely - we wouldn't know whether this slot
936 * would persist after an OS crash or not - so, force a restart. The
937 * restart would try to fsync this again till it works.
939 START_CRIT_SECTION();
941 fsync_fname(path, true);
942 fsync_fname("pg_replslot", true);
948 * Shared functionality between saving and creating a replication slot.
951 SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
953 char tmppath[MAXPGPATH];
954 char path[MAXPGPATH];
956 ReplicationSlotOnDisk cp;
959 /* first check whether there's something to write out */
961 volatile ReplicationSlot *vslot = slot;
963 SpinLockAcquire(&vslot->mutex);
964 was_dirty = vslot->dirty;
965 vslot->just_dirtied = false;
966 SpinLockRelease(&vslot->mutex);
969 /* and don't do anything if there's nothing to write */
973 LWLockAcquire(slot->io_in_progress_lock, LW_EXCLUSIVE);
975 /* silence valgrind :( */
976 memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
978 sprintf(tmppath, "%s/state.tmp", dir);
979 sprintf(path, "%s/state", dir);
981 fd = OpenTransientFile(tmppath,
982 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
987 (errcode_for_file_access(),
988 errmsg("could not create file \"%s\": %m",
993 cp.magic = SLOT_MAGIC;
994 INIT_CRC32(cp.checksum);
996 cp.length = ReplicationSlotOnDiskDynamicSize;
998 SpinLockAcquire(&slot->mutex);
1000 memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
1002 SpinLockRelease(&slot->mutex);
1004 COMP_CRC32(cp.checksum,
1005 (char *)(&cp) + ReplicationSlotOnDiskConstantSize,
1006 ReplicationSlotOnDiskDynamicSize);
1008 if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
1010 int save_errno = errno;
1011 CloseTransientFile(fd);
1014 (errcode_for_file_access(),
1015 errmsg("could not write to file \"%s\": %m",
1020 /* fsync the temporary file */
1021 if (pg_fsync(fd) != 0)
1023 int save_errno = errno;
1024 CloseTransientFile(fd);
1027 (errcode_for_file_access(),
1028 errmsg("could not fsync file \"%s\": %m",
1033 CloseTransientFile(fd);
1035 /* rename to permanent file, fsync file and directory */
1036 if (rename(tmppath, path) != 0)
1039 (errcode_for_file_access(),
1040 errmsg("could not rename \"%s\" to \"%s\": %m",
1045 /* Check CreateSlotOnDisk() for the reasoning of using a crit. section. */
1046 START_CRIT_SECTION();
1048 fsync_fname(path, false);
1049 fsync_fname((char *) dir, true);
1050 fsync_fname("pg_replslot", true);
1055 * Successfully wrote, unset dirty bit, unless somebody dirtied again
1059 volatile ReplicationSlot *vslot = slot;
1061 SpinLockAcquire(&vslot->mutex);
1062 if (!vslot->just_dirtied)
1063 vslot->dirty = false;
1064 SpinLockRelease(&vslot->mutex);
1067 LWLockRelease(slot->io_in_progress_lock);
1071 * Load a single slot from disk into memory.
1074 RestoreSlotFromDisk(const char *name)
1076 ReplicationSlotOnDisk cp;
1078 char path[MAXPGPATH];
1080 bool restored = false;
1084 /* no need to lock here, no concurrent access allowed yet */
1086 /* delete temp file if it exists */
1087 sprintf(path, "pg_replslot/%s/state.tmp", name);
1088 if (unlink(path) < 0 && errno != ENOENT)
1090 (errcode_for_file_access(),
1091 errmsg("could not unlink file \"%s\": %m", path)));
1093 sprintf(path, "pg_replslot/%s/state", name);
1095 elog(DEBUG1, "restoring replication slot from \"%s\"", path);
1097 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1100 * We do not need to handle this as we are rename()ing the directory into
1101 * place only after we fsync()ed the state file.
1105 (errcode_for_file_access(),
1106 errmsg("could not open file \"%s\": %m", path)));
1109 * Sync state file before we're reading from it. We might have crashed
1110 * while it wasn't synced yet and we shouldn't continue on that basis.
1112 if (pg_fsync(fd) != 0)
1114 CloseTransientFile(fd);
1116 (errcode_for_file_access(),
1117 errmsg("could not fsync file \"%s\": %m",
1121 /* Also sync the parent directory */
1122 START_CRIT_SECTION();
1123 fsync_fname(path, true);
1126 /* read part of statefile that's guaranteed to be version independent */
1127 readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
1128 if (readBytes != ReplicationSlotOnDiskConstantSize)
1130 int saved_errno = errno;
1132 CloseTransientFile(fd);
1133 errno = saved_errno;
1135 (errcode_for_file_access(),
1136 errmsg("could not read file \"%s\", read %d of %u: %m",
1138 (uint32) ReplicationSlotOnDiskConstantSize)));
1142 if (cp.magic != SLOT_MAGIC)
1144 (errcode_for_file_access(),
1145 errmsg("replication slot file \"%s\" has wrong magic %u instead of %u",
1146 path, cp.magic, SLOT_MAGIC)));
1148 /* verify version */
1149 if (cp.version != SLOT_VERSION)
1151 (errcode_for_file_access(),
1152 errmsg("replication slot file \"%s\" has unsupported version %u",
1153 path, cp.version)));
1155 /* boundary check on length */
1156 if (cp.length != ReplicationSlotOnDiskDynamicSize)
1158 (errcode_for_file_access(),
1159 errmsg("replication slot file \"%s\" has corrupted length %u",
1162 /* Now that we know the size, read the entire file */
1163 readBytes = read(fd,
1164 (char *)&cp + ReplicationSlotOnDiskConstantSize,
1166 if (readBytes != cp.length)
1168 int saved_errno = errno;
1170 CloseTransientFile(fd);
1171 errno = saved_errno;
1173 (errcode_for_file_access(),
1174 errmsg("could not read file \"%s\", read %d of %u: %m",
1175 path, readBytes, cp.length)));
1178 CloseTransientFile(fd);
1180 /* now verify the CRC32 */
1181 INIT_CRC32(checksum);
1182 COMP_CRC32(checksum,
1183 (char *)&cp + ReplicationSlotOnDiskConstantSize,
1184 ReplicationSlotOnDiskDynamicSize);
1186 if (!EQ_CRC32(checksum, cp.checksum))
1188 (errmsg("replication slot file %s: checksum mismatch, is %u, should be %u",
1189 path, checksum, cp.checksum)));
1191 /* nothing can be active yet, don't lock anything */
1192 for (i = 0; i < max_replication_slots; i++)
1194 ReplicationSlot *slot;
1196 slot = &ReplicationSlotCtl->replication_slots[i];
1201 /* restore the entire set of persistent data */
1202 memcpy(&slot->data, &cp.slotdata,
1203 sizeof(ReplicationSlotPersistentData));
1205 /* Don't restore the slot if it's not marked as persistent. */
1206 if (slot->data.persistency != RS_PERSISTENT)
1209 /* initialize in memory state */
1210 slot->effective_xmin = cp.slotdata.xmin;
1211 slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
1213 slot->candidate_catalog_xmin = InvalidTransactionId;
1214 slot->candidate_xmin_lsn = InvalidXLogRecPtr;
1215 slot->candidate_restart_lsn = InvalidXLogRecPtr;
1216 slot->candidate_restart_valid = InvalidXLogRecPtr;
1218 slot->in_use = true;
1219 slot->active = false;
1227 (errmsg("too many replication slots active before shutdown"),
1228 errhint("Increase max_replication_slots and try again.")));