]> granicus.if.org Git - postgresql/blob - src/backend/replication/slot.c
Phase 2 of pgindent updates.
[postgresql] / src / backend / replication / slot.c
1 /*-------------------------------------------------------------------------
2  *
3  * slot.c
4  *         Replication slot management.
5  *
6  *
7  * Copyright (c) 2012-2017, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/replication/slot.c
12  *
13  * NOTES
14  *
15  * Replication slots are used to keep state about replication streams
16  * originating from this cluster.  Their primary purpose is to prevent the
17  * premature removal of WAL or of old tuple versions in a manner that would
18  * interfere with replication; they are also useful for monitoring purposes.
19  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20  * on standbys (to support cascading setups).  The requirement that slots be
21  * usable on standbys precludes storing them in the system catalogs.
22  *
23  * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24  * directory. Inside that directory the state file will contain the slot's
25  * own data. Additional data can be stored alongside that file if required.
26  * While the server is running, the state data is also cached in memory for
27  * efficiency.
28  *
29  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31  * to iterate over the slots, and in exclusive mode to change the in_use flag
32  * of a slot.  The remaining data in each slot is protected by its mutex.
33  *
34  *-------------------------------------------------------------------------
35  */
36
37 #include "postgres.h"
38
39 #include <unistd.h>
40 #include <sys/stat.h>
41
42 #include "access/transam.h"
43 #include "access/xlog_internal.h"
44 #include "common/string.h"
45 #include "miscadmin.h"
46 #include "pgstat.h"
47 #include "replication/slot.h"
48 #include "storage/fd.h"
49 #include "storage/proc.h"
50 #include "storage/procarray.h"
51 #include "utils/builtins.h"
52
/*
 * Replication slot on-disk data structure.
 *
 * The struct starts with a small version-independent header (magic, checksum,
 * version, length) followed by the slot's persistent data.  The size macros
 * defined right after this struct are derived from these field offsets, so
 * the order of the members must not be changed casually.
 */
typedef struct ReplicationSlotOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;			/* format identifier, see SLOT_MAGIC */
	pg_crc32c	checksum;		/* checksum over everything from 'version' on */

	/* data covered by checksum */
	uint32		version;		/* on-disk format version, see SLOT_VERSION */
	uint32		length;			/* NOTE(review): presumably the size of the
								 * version-dependent part that follows --
								 * confirm against the save/restore code */

	/*
	 * The actual data in the slot that follows can differ based on the above
	 * 'version'.
	 */

	ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;
75
/*
 * Size helpers for ReplicationSlotOnDisk.
 *
 * Every replacement list is fully parenthesized so that the macros evaluate
 * as a single value when used inside a larger expression (for instance
 * "2 * ReplicationSlotOnDiskV2Size").
 *
 * NOTE(review): the "SnapBuild" prefix on two of the names does not match
 * this file's naming; the names are kept unchanged because code outside this
 * view may reference them.
 */

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	(offsetof(ReplicationSlotOnDisk, slotdata))
/* size of the part of the slot not covered by the checksum */
#define SnapBuildOnDiskNotChecksummedSize \
	(offsetof(ReplicationSlotOnDisk, version))
/* size of the part covered by the checksum */
#define SnapBuildOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	2		/* version for new files */
91
/*
 * Control array for replication slot management.
 *
 * Set up by ReplicationSlotsShmemInit(); it stays NULL when
 * max_replication_slots is 0 because that function returns early then.
 */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

/*
 * My backend's replication slot in the shared memory array, or NULL while
 * this backend has no slot acquired.
 */
ReplicationSlot *MyReplicationSlot = NULL;

/* GUCs */
int			max_replication_slots = 0;	/* the maximum number of replication
										 * slots */

/* helpers for dropping a slot; both are defined further down in this file */
static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);

/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
109
110 /*
111  * Report shared-memory space needed by ReplicationSlotShmemInit.
112  */
113 Size
114 ReplicationSlotsShmemSize(void)
115 {
116         Size            size = 0;
117
118         if (max_replication_slots == 0)
119                 return size;
120
121         size = offsetof(ReplicationSlotCtlData, replication_slots);
122         size = add_size(size,
123                                         mul_size(max_replication_slots, sizeof(ReplicationSlot)));
124
125         return size;
126 }
127
128 /*
129  * Allocate and initialize walsender-related shared memory.
130  */
131 void
132 ReplicationSlotsShmemInit(void)
133 {
134         bool            found;
135
136         if (max_replication_slots == 0)
137                 return;
138
139         ReplicationSlotCtl = (ReplicationSlotCtlData *)
140                 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
141                                                 &found);
142
143         LWLockRegisterTranche(LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS,
144                                                   "replication_slot_io");
145
146         if (!found)
147         {
148                 int                     i;
149
150                 /* First time through, so initialize */
151                 MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
152
153                 for (i = 0; i < max_replication_slots; i++)
154                 {
155                         ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
156
157                         /* everything else is zeroed by the memset above */
158                         SpinLockInit(&slot->mutex);
159                         LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
160                 }
161         }
162 }
163
164 /*
165  * Check whether the passed slot name is valid and report errors at elevel.
166  *
167  * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
168  * the name to be used as a directory name on every supported OS.
169  *
170  * Returns whether the directory name is valid or not if elevel < ERROR.
171  */
172 bool
173 ReplicationSlotValidateName(const char *name, int elevel)
174 {
175         const char *cp;
176
177         if (strlen(name) == 0)
178         {
179                 ereport(elevel,
180                                 (errcode(ERRCODE_INVALID_NAME),
181                                  errmsg("replication slot name \"%s\" is too short",
182                                                 name)));
183                 return false;
184         }
185
186         if (strlen(name) >= NAMEDATALEN)
187         {
188                 ereport(elevel,
189                                 (errcode(ERRCODE_NAME_TOO_LONG),
190                                  errmsg("replication slot name \"%s\" is too long",
191                                                 name)));
192                 return false;
193         }
194
195         for (cp = name; *cp; cp++)
196         {
197                 if (!((*cp >= 'a' && *cp <= 'z')
198                           || (*cp >= '0' && *cp <= '9')
199                           || (*cp == '_')))
200                 {
201                         ereport(elevel,
202                                         (errcode(ERRCODE_INVALID_NAME),
203                         errmsg("replication slot name \"%s\" contains invalid character",
204                                    name),
205                                          errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
206                         return false;
207                 }
208         }
209         return true;
210 }
211
212 /*
213  * Create a new replication slot and mark it as used by this backend.
214  *
215  * name: Name of the slot
216  * db_specific: logical decoding is db specific; if the slot is going to
217  *         be used for that pass true, otherwise false.
218  */
219 void
220 ReplicationSlotCreate(const char *name, bool db_specific,
221                                           ReplicationSlotPersistency persistency)
222 {
223         ReplicationSlot *slot = NULL;
224         int                     i;
225
226         Assert(MyReplicationSlot == NULL);
227
228         ReplicationSlotValidateName(name, ERROR);
229
230         /*
231          * If some other backend ran this code concurrently with us, we'd likely
232          * both allocate the same slot, and that would be bad.  We'd also be at
233          * risk of missing a name collision.  Also, we don't want to try to create
234          * a new slot while somebody's busy cleaning up an old one, because we
235          * might both be monkeying with the same directory.
236          */
237         LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
238
239         /*
240          * Check for name collision, and identify an allocatable slot.  We need to
241          * hold ReplicationSlotControlLock in shared mode for this, so that nobody
242          * else can change the in_use flags while we're looking at them.
243          */
244         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
245         for (i = 0; i < max_replication_slots; i++)
246         {
247                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
248
249                 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
250                         ereport(ERROR,
251                                         (errcode(ERRCODE_DUPLICATE_OBJECT),
252                                          errmsg("replication slot \"%s\" already exists", name)));
253                 if (!s->in_use && slot == NULL)
254                         slot = s;
255         }
256         LWLockRelease(ReplicationSlotControlLock);
257
258         /* If all slots are in use, we're out of luck. */
259         if (slot == NULL)
260                 ereport(ERROR,
261                                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
262                                  errmsg("all replication slots are in use"),
263                                  errhint("Free one or increase max_replication_slots.")));
264
265         /*
266          * Since this slot is not in use, nobody should be looking at any part of
267          * it other than the in_use field unless they're trying to allocate it.
268          * And since we hold ReplicationSlotAllocationLock, nobody except us can
269          * be doing that.  So it's safe to initialize the slot.
270          */
271         Assert(!slot->in_use);
272         Assert(slot->active_pid == 0);
273
274         /* first initialize persistent data */
275         memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
276         StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
277         slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
278         slot->data.persistency = persistency;
279
280         /* and then data only present in shared memory */
281         slot->just_dirtied = false;
282         slot->dirty = false;
283         slot->effective_xmin = InvalidTransactionId;
284         slot->effective_catalog_xmin = InvalidTransactionId;
285         slot->candidate_catalog_xmin = InvalidTransactionId;
286         slot->candidate_xmin_lsn = InvalidXLogRecPtr;
287         slot->candidate_restart_valid = InvalidXLogRecPtr;
288         slot->candidate_restart_lsn = InvalidXLogRecPtr;
289
290         /*
291          * Create the slot on disk.  We haven't actually marked the slot allocated
292          * yet, so no special cleanup is required if this errors out.
293          */
294         CreateSlotOnDisk(slot);
295
296         /*
297          * We need to briefly prevent any other backend from iterating over the
298          * slots while we flip the in_use flag. We also need to set the active
299          * flag while holding the ControlLock as otherwise a concurrent
300          * SlotAcquire() could acquire the slot as well.
301          */
302         LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
303
304         slot->in_use = true;
305
306         /* We can now mark the slot active, and that makes it our slot. */
307         SpinLockAcquire(&slot->mutex);
308         Assert(slot->active_pid == 0);
309         slot->active_pid = MyProcPid;
310         SpinLockRelease(&slot->mutex);
311         MyReplicationSlot = slot;
312
313         LWLockRelease(ReplicationSlotControlLock);
314
315         /*
316          * Now that the slot has been marked as in_use and in_active, it's safe to
317          * let somebody else try to allocate a slot.
318          */
319         LWLockRelease(ReplicationSlotAllocationLock);
320 }
321
322 /*
323  * Find a previously created slot and mark it as used by this backend.
324  */
325 void
326 ReplicationSlotAcquire(const char *name)
327 {
328         ReplicationSlot *slot = NULL;
329         int                     i;
330         int                     active_pid = 0; /* Keep compiler quiet */
331
332         Assert(MyReplicationSlot == NULL);
333
334         /* Search for the named slot and mark it active if we find it. */
335         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
336         for (i = 0; i < max_replication_slots; i++)
337         {
338                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
339
340                 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
341                 {
342                         SpinLockAcquire(&s->mutex);
343                         active_pid = s->active_pid;
344                         if (active_pid == 0)
345                                 active_pid = s->active_pid = MyProcPid;
346                         SpinLockRelease(&s->mutex);
347                         slot = s;
348                         break;
349                 }
350         }
351         LWLockRelease(ReplicationSlotControlLock);
352
353         /* If we did not find the slot or it was already active, error out. */
354         if (slot == NULL)
355                 ereport(ERROR,
356                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
357                                  errmsg("replication slot \"%s\" does not exist", name)));
358         if (active_pid != MyProcPid)
359                 ereport(ERROR,
360                                 (errcode(ERRCODE_OBJECT_IN_USE),
361                                  errmsg("replication slot \"%s\" is active for PID %d",
362                                                 name, active_pid)));
363
364         /* We made this slot active, so it's ours now. */
365         MyReplicationSlot = slot;
366 }
367
368 /*
369  * Release a replication slot, this or another backend can ReAcquire it
370  * later. Resources this slot requires will be preserved.
371  */
372 void
373 ReplicationSlotRelease(void)
374 {
375         ReplicationSlot *slot = MyReplicationSlot;
376
377         Assert(slot != NULL && slot->active_pid != 0);
378
379         if (slot->data.persistency == RS_EPHEMERAL)
380         {
381                 /*
382                  * Delete the slot. There is no !PANIC case where this is allowed to
383                  * fail, all that may happen is an incomplete cleanup of the on-disk
384                  * data.
385                  */
386                 ReplicationSlotDropAcquired();
387         }
388         else if (slot->data.persistency == RS_PERSISTENT)
389         {
390                 /*
391                  * Mark persistent slot inactive.  We're not freeing it, just
392                  * disconnecting.
393                  */
394                 SpinLockAcquire(&slot->mutex);
395                 slot->active_pid = 0;
396                 SpinLockRelease(&slot->mutex);
397         }
398
399
400         /*
401          * If slot needed to temporarily restrain both data and catalog xmin to
402          * create the catalog snapshot, remove that temporary constraint.
403          * Snapshots can only be exported while the initial snapshot is still
404          * acquired.
405          */
406         if (!TransactionIdIsValid(slot->data.xmin) &&
407                 TransactionIdIsValid(slot->effective_xmin))
408         {
409                 SpinLockAcquire(&slot->mutex);
410                 slot->effective_xmin = InvalidTransactionId;
411                 SpinLockRelease(&slot->mutex);
412                 ReplicationSlotsComputeRequiredXmin(false);
413         }
414
415         MyReplicationSlot = NULL;
416
417         /* might not have been set when we've been a plain slot */
418         LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
419         MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
420         LWLockRelease(ProcArrayLock);
421 }
422
423 /*
424  * Cleanup all temporary slots created in current session.
425  */
426 void
427 ReplicationSlotCleanup(void)
428 {
429         int                     i;
430
431         Assert(MyReplicationSlot == NULL);
432
433         /*
434          * No need for locking as we are only interested in slots active in
435          * current process and those are not touched by other processes.
436          */
437         for (i = 0; i < max_replication_slots; i++)
438         {
439                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
440
441                 if (s->active_pid == MyProcPid)
442                 {
443                         Assert(s->in_use && s->data.persistency == RS_TEMPORARY);
444
445                         ReplicationSlotDropPtr(s);
446                 }
447         }
448 }
449
450 /*
451  * Permanently drop replication slot identified by the passed in name.
452  */
453 void
454 ReplicationSlotDrop(const char *name)
455 {
456         Assert(MyReplicationSlot == NULL);
457
458         ReplicationSlotAcquire(name);
459
460         ReplicationSlotDropAcquired();
461 }
462
463 /*
464  * Permanently drop the currently acquired replication slot.
465  */
466 static void
467 ReplicationSlotDropAcquired(void)
468 {
469         ReplicationSlot *slot = MyReplicationSlot;
470
471         Assert(MyReplicationSlot != NULL);
472
473         /* slot isn't acquired anymore */
474         MyReplicationSlot = NULL;
475
476         ReplicationSlotDropPtr(slot);
477 }
478
479 /*
480  * Permanently drop the replication slot which will be released by the point
481  * this function returns.
482  */
483 static void
484 ReplicationSlotDropPtr(ReplicationSlot *slot)
485 {
486         char            path[MAXPGPATH];
487         char            tmppath[MAXPGPATH];
488
489         /*
490          * If some other backend ran this code concurrently with us, we might try
491          * to delete a slot with a certain name while someone else was trying to
492          * create a slot with the same name.
493          */
494         LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
495
496         /* Generate pathnames. */
497         sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
498         sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
499
500         /*
501          * Rename the slot directory on disk, so that we'll no longer recognize
502          * this as a valid slot.  Note that if this fails, we've got to mark the
503          * slot inactive before bailing out.  If we're dropping an ephemeral or a
504          * temporary slot, we better never fail hard as the caller won't expect
505          * the slot to survive and this might get called during error handling.
506          */
507         if (rename(path, tmppath) == 0)
508         {
509                 /*
510                  * We need to fsync() the directory we just renamed and its parent to
511                  * make sure that our changes are on disk in a crash-safe fashion.  If
512                  * fsync() fails, we can't be sure whether the changes are on disk or
513                  * not.  For now, we handle that by panicking;
514                  * StartupReplicationSlots() will try to straighten it out after
515                  * restart.
516                  */
517                 START_CRIT_SECTION();
518                 fsync_fname(tmppath, true);
519                 fsync_fname("pg_replslot", true);
520                 END_CRIT_SECTION();
521         }
522         else
523         {
524                 bool            fail_softly = slot->data.persistency != RS_PERSISTENT;
525
526                 SpinLockAcquire(&slot->mutex);
527                 slot->active_pid = 0;
528                 SpinLockRelease(&slot->mutex);
529
530                 ereport(fail_softly ? WARNING : ERROR,
531                                 (errcode_for_file_access(),
532                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
533                                                 path, tmppath)));
534         }
535
536         /*
537          * The slot is definitely gone.  Lock out concurrent scans of the array
538          * long enough to kill it.  It's OK to clear the active flag here without
539          * grabbing the mutex because nobody else can be scanning the array here,
540          * and nobody can be attached to this slot and thus access it without
541          * scanning the array.
542          */
543         LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
544         slot->active_pid = 0;
545         slot->in_use = false;
546         LWLockRelease(ReplicationSlotControlLock);
547
548         /*
549          * Slot is dead and doesn't prevent resource removal anymore, recompute
550          * limits.
551          */
552         ReplicationSlotsComputeRequiredXmin(false);
553         ReplicationSlotsComputeRequiredLSN();
554
555         /*
556          * If removing the directory fails, the worst thing that will happen is
557          * that the user won't be able to create a new slot with the same name
558          * until the next server restart.  We warn about it, but that's all.
559          */
560         if (!rmtree(tmppath, true))
561                 ereport(WARNING,
562                                 (errcode_for_file_access(),
563                                  errmsg("could not remove directory \"%s\"", tmppath)));
564
565         /*
566          * We release this at the very end, so that nobody starts trying to create
567          * a slot while we're still cleaning up the detritus of the old one.
568          */
569         LWLockRelease(ReplicationSlotAllocationLock);
570 }
571
572 /*
573  * Serialize the currently acquired slot's state from memory to disk, thereby
574  * guaranteeing the current state will survive a crash.
575  */
576 void
577 ReplicationSlotSave(void)
578 {
579         char            path[MAXPGPATH];
580
581         Assert(MyReplicationSlot != NULL);
582
583         sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
584         SaveSlotToPath(MyReplicationSlot, path, ERROR);
585 }
586
587 /*
588  * Signal that it would be useful if the currently acquired slot would be
589  * flushed out to disk.
590  *
591  * Note that the actual flush to disk can be delayed for a long time, if
592  * required for correctness explicitly do a ReplicationSlotSave().
593  */
594 void
595 ReplicationSlotMarkDirty(void)
596 {
597         ReplicationSlot *slot = MyReplicationSlot;
598
599         Assert(MyReplicationSlot != NULL);
600
601         SpinLockAcquire(&slot->mutex);
602         MyReplicationSlot->just_dirtied = true;
603         MyReplicationSlot->dirty = true;
604         SpinLockRelease(&slot->mutex);
605 }
606
607 /*
608  * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
609  * guaranteeing it will be there after an eventual crash.
610  */
611 void
612 ReplicationSlotPersist(void)
613 {
614         ReplicationSlot *slot = MyReplicationSlot;
615
616         Assert(slot != NULL);
617         Assert(slot->data.persistency != RS_PERSISTENT);
618
619         SpinLockAcquire(&slot->mutex);
620         slot->data.persistency = RS_PERSISTENT;
621         SpinLockRelease(&slot->mutex);
622
623         ReplicationSlotMarkDirty();
624         ReplicationSlotSave();
625 }
626
627 /*
628  * Compute the oldest xmin across all slots and store it in the ProcArray.
629  *
630  * If already_locked is true, ProcArrayLock has already been acquired
631  * exclusively.
632  */
633 void
634 ReplicationSlotsComputeRequiredXmin(bool already_locked)
635 {
636         int                     i;
637         TransactionId agg_xmin = InvalidTransactionId;
638         TransactionId agg_catalog_xmin = InvalidTransactionId;
639
640         Assert(ReplicationSlotCtl != NULL);
641
642         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
643
644         for (i = 0; i < max_replication_slots; i++)
645         {
646                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
647                 TransactionId effective_xmin;
648                 TransactionId effective_catalog_xmin;
649
650                 if (!s->in_use)
651                         continue;
652
653                 SpinLockAcquire(&s->mutex);
654                 effective_xmin = s->effective_xmin;
655                 effective_catalog_xmin = s->effective_catalog_xmin;
656                 SpinLockRelease(&s->mutex);
657
658                 /* check the data xmin */
659                 if (TransactionIdIsValid(effective_xmin) &&
660                         (!TransactionIdIsValid(agg_xmin) ||
661                          TransactionIdPrecedes(effective_xmin, agg_xmin)))
662                         agg_xmin = effective_xmin;
663
664                 /* check the catalog xmin */
665                 if (TransactionIdIsValid(effective_catalog_xmin) &&
666                         (!TransactionIdIsValid(agg_catalog_xmin) ||
667                          TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
668                         agg_catalog_xmin = effective_catalog_xmin;
669         }
670
671         LWLockRelease(ReplicationSlotControlLock);
672
673         ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
674 }
675
676 /*
677  * Compute the oldest restart LSN across all slots and inform xlog module.
678  */
679 void
680 ReplicationSlotsComputeRequiredLSN(void)
681 {
682         int                     i;
683         XLogRecPtr      min_required = InvalidXLogRecPtr;
684
685         Assert(ReplicationSlotCtl != NULL);
686
687         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
688         for (i = 0; i < max_replication_slots; i++)
689         {
690                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
691                 XLogRecPtr      restart_lsn;
692
693                 if (!s->in_use)
694                         continue;
695
696                 SpinLockAcquire(&s->mutex);
697                 restart_lsn = s->data.restart_lsn;
698                 SpinLockRelease(&s->mutex);
699
700                 if (restart_lsn != InvalidXLogRecPtr &&
701                         (min_required == InvalidXLogRecPtr ||
702                          restart_lsn < min_required))
703                         min_required = restart_lsn;
704         }
705         LWLockRelease(ReplicationSlotControlLock);
706
707         XLogSetReplicationSlotMinimumLSN(min_required);
708 }
709
710 /*
711  * Compute the oldest WAL LSN required by *logical* decoding slots..
712  *
713  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
714  * slots exist.
715  *
716  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
717  * ignores physical replication slots.
718  *
719  * The results aren't required frequently, so we don't maintain a precomputed
720  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
721  */
722 XLogRecPtr
723 ReplicationSlotsComputeLogicalRestartLSN(void)
724 {
725         XLogRecPtr      result = InvalidXLogRecPtr;
726         int                     i;
727
728         if (max_replication_slots <= 0)
729                 return InvalidXLogRecPtr;
730
731         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
732
733         for (i = 0; i < max_replication_slots; i++)
734         {
735                 ReplicationSlot *s;
736                 XLogRecPtr      restart_lsn;
737
738                 s = &ReplicationSlotCtl->replication_slots[i];
739
740                 /* cannot change while ReplicationSlotCtlLock is held */
741                 if (!s->in_use)
742                         continue;
743
744                 /* we're only interested in logical slots */
745                 if (!SlotIsLogical(s))
746                         continue;
747
748                 /* read once, it's ok if it increases while we're checking */
749                 SpinLockAcquire(&s->mutex);
750                 restart_lsn = s->data.restart_lsn;
751                 SpinLockRelease(&s->mutex);
752
753                 if (result == InvalidXLogRecPtr ||
754                         restart_lsn < result)
755                         result = restart_lsn;
756         }
757
758         LWLockRelease(ReplicationSlotControlLock);
759
760         return result;
761 }
762
763 /*
764  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
765  * passed database oid.
766  *
767  * Returns true if there are any slots referencing the database. *nslots will
768  * be set to the absolute number of slots in the database, *nactive to ones
769  * currently active.
770  */
771 bool
772 ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
773 {
774         int                     i;
775
776         *nslots = *nactive = 0;
777
778         if (max_replication_slots <= 0)
779                 return false;
780
781         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
782         for (i = 0; i < max_replication_slots; i++)
783         {
784                 ReplicationSlot *s;
785
786                 s = &ReplicationSlotCtl->replication_slots[i];
787
788                 /* cannot change while ReplicationSlotCtlLock is held */
789                 if (!s->in_use)
790                         continue;
791
792                 /* only logical slots are database specific, skip */
793                 if (!SlotIsLogical(s))
794                         continue;
795
796                 /* not our database, skip */
797                 if (s->data.database != dboid)
798                         continue;
799
800                 /* count slots with spinlock held */
801                 SpinLockAcquire(&s->mutex);
802                 (*nslots)++;
803                 if (s->active_pid != 0)
804                         (*nactive)++;
805                 SpinLockRelease(&s->mutex);
806         }
807         LWLockRelease(ReplicationSlotControlLock);
808
809         if (*nslots > 0)
810                 return true;
811         return false;
812 }
813
814 /*
815  * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
816  * passed database oid. The caller should hold an exclusive lock on the
817  * pg_database oid for the database to prevent creation of new slots on the db
818  * or replay from existing slots.
819  *
820  * Another session that concurrently acquires an existing slot on the target DB
821  * (most likely to drop it) may cause this function to ERROR. If that happens
822  * it may have dropped some but not all slots.
823  *
824  * This routine isn't as efficient as it could be - but we don't drop
825  * databases often, especially databases with lots of slots.
826  */
827 void
828 ReplicationSlotsDropDBSlots(Oid dboid)
829 {
830         int                     i;
831
832         if (max_replication_slots <= 0)
833                 return;
834
835 restart:
836         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
837         for (i = 0; i < max_replication_slots; i++)
838         {
839                 ReplicationSlot *s;
840                 char       *slotname;
841                 int                     active_pid;
842
843                 s = &ReplicationSlotCtl->replication_slots[i];
844
845                 /* cannot change while ReplicationSlotCtlLock is held */
846                 if (!s->in_use)
847                         continue;
848
849                 /* only logical slots are database specific, skip */
850                 if (!SlotIsLogical(s))
851                         continue;
852
853                 /* not our database, skip */
854                 if (s->data.database != dboid)
855                         continue;
856
857                 /* acquire slot, so ReplicationSlotDropAcquired can be reused  */
858                 SpinLockAcquire(&s->mutex);
859                 /* can't change while ReplicationSlotControlLock is held */
860                 slotname = NameStr(s->data.name);
861                 active_pid = s->active_pid;
862                 if (active_pid == 0)
863                 {
864                         MyReplicationSlot = s;
865                         s->active_pid = MyProcPid;
866                 }
867                 SpinLockRelease(&s->mutex);
868
869                 /*
870                  * Even though we hold an exclusive lock on the database object a
871                  * logical slot for that DB can still be active, e.g. if it's
872                  * concurrently being dropped by a backend connected to another DB.
873                  *
874                  * That's fairly unlikely in practice, so we'll just bail out.
875                  */
876                 if (active_pid)
877                         ereport(ERROR,
878                                         (errcode(ERRCODE_OBJECT_IN_USE),
879                                          errmsg("replication slot \"%s\" is active for PID %d",
880                                                         slotname, active_pid)));
881
882                 /*
883                  * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
884                  * holding ReplicationSlotControlLock over filesystem operations,
885                  * release ReplicationSlotControlLock and use
886                  * ReplicationSlotDropAcquired.
887                  *
888                  * As that means the set of slots could change, restart scan from the
889                  * beginning each time we release the lock.
890                  */
891                 LWLockRelease(ReplicationSlotControlLock);
892                 ReplicationSlotDropAcquired();
893                 goto restart;
894         }
895         LWLockRelease(ReplicationSlotControlLock);
896 }
897
898
899 /*
900  * Check whether the server's configuration supports using replication
901  * slots.
902  */
903 void
904 CheckSlotRequirements(void)
905 {
906         if (max_replication_slots == 0)
907                 ereport(ERROR,
908                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
909                                  (errmsg("replication slots can only be used if max_replication_slots > 0"))));
910
911         if (wal_level < WAL_LEVEL_REPLICA)
912                 ereport(ERROR,
913                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
914                                  errmsg("replication slots can only be used if wal_level >= replica")));
915 }
916
917 /*
918  * Reserve WAL for the currently active slot.
919  *
920  * Compute and set restart_lsn in a manner that's appropriate for the type of
921  * the slot and concurrency safe.
922  */
923 void
924 ReplicationSlotReserveWal(void)
925 {
926         ReplicationSlot *slot = MyReplicationSlot;
927
928         Assert(slot != NULL);
929         Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
930
931         /*
932          * The replication slot mechanism is used to prevent removal of required
933          * WAL. As there is no interlock between this routine and checkpoints, WAL
934          * segments could concurrently be removed when a now stale return value of
935          * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
936          * this happens we'll just retry.
937          */
938         while (true)
939         {
940                 XLogSegNo       segno;
941
942                 /*
943                  * For logical slots log a standby snapshot and start logical decoding
944                  * at exactly that position. That allows the slot to start up more
945                  * quickly.
946                  *
947                  * That's not needed (or indeed helpful) for physical slots as they'll
948                  * start replay at the last logged checkpoint anyway. Instead return
949                  * the location of the last redo LSN. While that slightly increases
950                  * the chance that we have to retry, it's where a base backup has to
951                  * start replay at.
952                  */
953                 if (!RecoveryInProgress() && SlotIsLogical(slot))
954                 {
955                         XLogRecPtr      flushptr;
956
957                         /* start at current insert position */
958                         slot->data.restart_lsn = GetXLogInsertRecPtr();
959
960                         /* make sure we have enough information to start */
961                         flushptr = LogStandbySnapshot();
962
963                         /* and make sure it's fsynced to disk */
964                         XLogFlush(flushptr);
965                 }
966                 else
967                 {
968                         slot->data.restart_lsn = GetRedoRecPtr();
969                 }
970
971                 /* prevent WAL removal as fast as possible */
972                 ReplicationSlotsComputeRequiredLSN();
973
974                 /*
975                  * If all required WAL is still there, great, otherwise retry. The
976                  * slot should prevent further removal of WAL, unless there's a
977                  * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
978                  * the new restart_lsn above, so normally we should never need to loop
979                  * more than twice.
980                  */
981                 XLByteToSeg(slot->data.restart_lsn, segno);
982                 if (XLogGetLastRemovedSegno() < segno)
983                         break;
984         }
985 }
986
987 /*
988  * Flush all replication slots to disk.
989  *
990  * This needn't actually be part of a checkpoint, but it's a convenient
991  * location.
992  */
993 void
994 CheckPointReplicationSlots(void)
995 {
996         int                     i;
997
998         elog(DEBUG1, "performing replication slot checkpoint");
999
1000         /*
1001          * Prevent any slot from being created/dropped while we're active. As we
1002          * explicitly do *not* want to block iterating over replication_slots or
1003          * acquiring a slot we cannot take the control lock - but that's OK,
1004          * because holding ReplicationSlotAllocationLock is strictly stronger, and
1005          * enough to guarantee that nobody can change the in_use bits on us.
1006          */
1007         LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
1008
1009         for (i = 0; i < max_replication_slots; i++)
1010         {
1011                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1012                 char            path[MAXPGPATH];
1013
1014                 if (!s->in_use)
1015                         continue;
1016
1017                 /* save the slot to disk, locking is handled in SaveSlotToPath() */
1018                 sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
1019                 SaveSlotToPath(s, path, LOG);
1020         }
1021         LWLockRelease(ReplicationSlotAllocationLock);
1022 }
1023
1024 /*
1025  * Load all replication slots from disk into memory at server startup. This
1026  * needs to be run before we start crash recovery.
1027  */
1028 void
1029 StartupReplicationSlots(void)
1030 {
1031         DIR                *replication_dir;
1032         struct dirent *replication_de;
1033
1034         elog(DEBUG1, "starting up replication slots");
1035
1036         /* restore all slots by iterating over all on-disk entries */
1037         replication_dir = AllocateDir("pg_replslot");
1038         while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
1039         {
1040                 struct stat statbuf;
1041                 char            path[MAXPGPATH + 12];
1042
1043                 if (strcmp(replication_de->d_name, ".") == 0 ||
1044                         strcmp(replication_de->d_name, "..") == 0)
1045                         continue;
1046
1047                 snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
1048
1049                 /* we're only creating directories here, skip if it's not our's */
1050                 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
1051                         continue;
1052
1053                 /* we crashed while a slot was being setup or deleted, clean up */
1054                 if (pg_str_endswith(replication_de->d_name, ".tmp"))
1055                 {
1056                         if (!rmtree(path, true))
1057                         {
1058                                 ereport(WARNING,
1059                                                 (errcode_for_file_access(),
1060                                                  errmsg("could not remove directory \"%s\"", path)));
1061                                 continue;
1062                         }
1063                         fsync_fname("pg_replslot", true);
1064                         continue;
1065                 }
1066
1067                 /* looks like a slot in a normal state, restore */
1068                 RestoreSlotFromDisk(replication_de->d_name);
1069         }
1070         FreeDir(replication_dir);
1071
1072         /* currently no slots exist, we're done. */
1073         if (max_replication_slots <= 0)
1074                 return;
1075
1076         /* Now that we have recovered all the data, compute replication xmin */
1077         ReplicationSlotsComputeRequiredXmin(false);
1078         ReplicationSlotsComputeRequiredLSN();
1079 }
1080
1081 /* ----
1082  * Manipulation of on-disk state of replication slots
1083  *
1084  * NB: none of the routines below should take any notice whether a slot is the
1085  * current one or not, that's all handled a layer above.
1086  * ----
1087  */
1088 static void
1089 CreateSlotOnDisk(ReplicationSlot *slot)
1090 {
1091         char            tmppath[MAXPGPATH];
1092         char            path[MAXPGPATH];
1093         struct stat st;
1094
1095         /*
1096          * No need to take out the io_in_progress_lock, nobody else can see this
1097          * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
1098          * takes out the lock, if we'd take the lock here, we'd deadlock.
1099          */
1100
1101         sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
1102         sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
1103
1104         /*
1105          * It's just barely possible that some previous effort to create or drop a
1106          * slot with this name left a temp directory lying around. If that seems
1107          * to be the case, try to remove it.  If the rmtree() fails, we'll error
1108          * out at the mkdir() below, so we don't bother checking success.
1109          */
1110         if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
1111                 rmtree(tmppath, true);
1112
1113         /* Create and fsync the temporary slot directory. */
1114         if (mkdir(tmppath, S_IRWXU) < 0)
1115                 ereport(ERROR,
1116                                 (errcode_for_file_access(),
1117                                  errmsg("could not create directory \"%s\": %m",
1118                                                 tmppath)));
1119         fsync_fname(tmppath, true);
1120
1121         /* Write the actual state file. */
1122         slot->dirty = true;                     /* signal that we really need to write */
1123         SaveSlotToPath(slot, tmppath, ERROR);
1124
1125         /* Rename the directory into place. */
1126         if (rename(tmppath, path) != 0)
1127                 ereport(ERROR,
1128                                 (errcode_for_file_access(),
1129                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
1130                                                 tmppath, path)));
1131
1132         /*
1133          * If we'd now fail - really unlikely - we wouldn't know whether this slot
1134          * would persist after an OS crash or not - so, force a restart. The
1135          * restart would try to fsync this again till it works.
1136          */
1137         START_CRIT_SECTION();
1138
1139         fsync_fname(path, true);
1140         fsync_fname("pg_replslot", true);
1141
1142         END_CRIT_SECTION();
1143 }
1144
1145 /*
1146  * Shared functionality between saving and creating a replication slot.
1147  */
1148 static void
1149 SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
1150 {
1151         char            tmppath[MAXPGPATH];
1152         char            path[MAXPGPATH];
1153         int                     fd;
1154         ReplicationSlotOnDisk cp;
1155         bool            was_dirty;
1156
1157         /* first check whether there's something to write out */
1158         SpinLockAcquire(&slot->mutex);
1159         was_dirty = slot->dirty;
1160         slot->just_dirtied = false;
1161         SpinLockRelease(&slot->mutex);
1162
1163         /* and don't do anything if there's nothing to write */
1164         if (!was_dirty)
1165                 return;
1166
1167         LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
1168
1169         /* silence valgrind :( */
1170         memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
1171
1172         sprintf(tmppath, "%s/state.tmp", dir);
1173         sprintf(path, "%s/state", dir);
1174
1175         fd = OpenTransientFile(tmppath,
1176                                                    O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
1177                                                    S_IRUSR | S_IWUSR);
1178         if (fd < 0)
1179         {
1180                 ereport(elevel,
1181                                 (errcode_for_file_access(),
1182                                  errmsg("could not create file \"%s\": %m",
1183                                                 tmppath)));
1184                 return;
1185         }
1186
1187         cp.magic = SLOT_MAGIC;
1188         INIT_CRC32C(cp.checksum);
1189         cp.version = SLOT_VERSION;
1190         cp.length = ReplicationSlotOnDiskV2Size;
1191
1192         SpinLockAcquire(&slot->mutex);
1193
1194         memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
1195
1196         SpinLockRelease(&slot->mutex);
1197
1198         COMP_CRC32C(cp.checksum,
1199                                 (char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
1200                                 SnapBuildOnDiskChecksummedSize);
1201         FIN_CRC32C(cp.checksum);
1202
1203         pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
1204         if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
1205         {
1206                 int                     save_errno = errno;
1207
1208                 pgstat_report_wait_end();
1209                 CloseTransientFile(fd);
1210                 errno = save_errno;
1211                 ereport(elevel,
1212                                 (errcode_for_file_access(),
1213                                  errmsg("could not write to file \"%s\": %m",
1214                                                 tmppath)));
1215                 return;
1216         }
1217         pgstat_report_wait_end();
1218
1219         /* fsync the temporary file */
1220         pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
1221         if (pg_fsync(fd) != 0)
1222         {
1223                 int                     save_errno = errno;
1224
1225                 pgstat_report_wait_end();
1226                 CloseTransientFile(fd);
1227                 errno = save_errno;
1228                 ereport(elevel,
1229                                 (errcode_for_file_access(),
1230                                  errmsg("could not fsync file \"%s\": %m",
1231                                                 tmppath)));
1232                 return;
1233         }
1234         pgstat_report_wait_end();
1235
1236         CloseTransientFile(fd);
1237
1238         /* rename to permanent file, fsync file and directory */
1239         if (rename(tmppath, path) != 0)
1240         {
1241                 ereport(elevel,
1242                                 (errcode_for_file_access(),
1243                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
1244                                                 tmppath, path)));
1245                 return;
1246         }
1247
1248         /* Check CreateSlot() for the reasoning of using a crit. section. */
1249         START_CRIT_SECTION();
1250
1251         fsync_fname(path, false);
1252         fsync_fname(dir, true);
1253         fsync_fname("pg_replslot", true);
1254
1255         END_CRIT_SECTION();
1256
1257         /*
1258          * Successfully wrote, unset dirty bit, unless somebody dirtied again
1259          * already.
1260          */
1261         SpinLockAcquire(&slot->mutex);
1262         if (!slot->just_dirtied)
1263                 slot->dirty = false;
1264         SpinLockRelease(&slot->mutex);
1265
1266         LWLockRelease(&slot->io_in_progress_lock);
1267 }
1268
1269 /*
1270  * Load a single slot from disk into memory.
1271  */
1272 static void
1273 RestoreSlotFromDisk(const char *name)
1274 {
1275         ReplicationSlotOnDisk cp;
1276         int                     i;
1277         char            path[MAXPGPATH + 22];
1278         int                     fd;
1279         bool            restored = false;
1280         int                     readBytes;
1281         pg_crc32c       checksum;
1282
1283         /* no need to lock here, no concurrent access allowed yet */
1284
1285         /* delete temp file if it exists */
1286         sprintf(path, "pg_replslot/%s/state.tmp", name);
1287         if (unlink(path) < 0 && errno != ENOENT)
1288                 ereport(PANIC,
1289                                 (errcode_for_file_access(),
1290                                  errmsg("could not remove file \"%s\": %m", path)));
1291
1292         sprintf(path, "pg_replslot/%s/state", name);
1293
1294         elog(DEBUG1, "restoring replication slot from \"%s\"", path);
1295
1296         fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
1297
1298         /*
1299          * We do not need to handle this as we are rename()ing the directory into
1300          * place only after we fsync()ed the state file.
1301          */
1302         if (fd < 0)
1303                 ereport(PANIC,
1304                                 (errcode_for_file_access(),
1305                                  errmsg("could not open file \"%s\": %m", path)));
1306
1307         /*
1308          * Sync state file before we're reading from it. We might have crashed
1309          * while it wasn't synced yet and we shouldn't continue on that basis.
1310          */
1311         pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
1312         if (pg_fsync(fd) != 0)
1313         {
1314                 CloseTransientFile(fd);
1315                 ereport(PANIC,
1316                                 (errcode_for_file_access(),
1317                                  errmsg("could not fsync file \"%s\": %m",
1318                                                 path)));
1319         }
1320         pgstat_report_wait_end();
1321
1322         /* Also sync the parent directory */
1323         START_CRIT_SECTION();
1324         fsync_fname(path, true);
1325         END_CRIT_SECTION();
1326
1327         /* read part of statefile that's guaranteed to be version independent */
1328         pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
1329         readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
1330         pgstat_report_wait_end();
1331         if (readBytes != ReplicationSlotOnDiskConstantSize)
1332         {
1333                 int                     saved_errno = errno;
1334
1335                 CloseTransientFile(fd);
1336                 errno = saved_errno;
1337                 ereport(PANIC,
1338                                 (errcode_for_file_access(),
1339                                  errmsg("could not read file \"%s\", read %d of %u: %m",
1340                                                 path, readBytes,
1341                                                 (uint32) ReplicationSlotOnDiskConstantSize)));
1342         }
1343
1344         /* verify magic */
1345         if (cp.magic != SLOT_MAGIC)
1346                 ereport(PANIC,
1347                                 (errcode_for_file_access(),
1348                                  errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
1349                                                 path, cp.magic, SLOT_MAGIC)));
1350
1351         /* verify version */
1352         if (cp.version != SLOT_VERSION)
1353                 ereport(PANIC,
1354                                 (errcode_for_file_access(),
1355                         errmsg("replication slot file \"%s\" has unsupported version %u",
1356                                    path, cp.version)));
1357
1358         /* boundary check on length */
1359         if (cp.length != ReplicationSlotOnDiskV2Size)
1360                 ereport(PANIC,
1361                                 (errcode_for_file_access(),
1362                            errmsg("replication slot file \"%s\" has corrupted length %u",
1363                                           path, cp.length)));
1364
1365         /* Now that we know the size, read the entire file */
1366         pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
1367         readBytes = read(fd,
1368                                          (char *) &cp + ReplicationSlotOnDiskConstantSize,
1369                                          cp.length);
1370         pgstat_report_wait_end();
1371         if (readBytes != cp.length)
1372         {
1373                 int                     saved_errno = errno;
1374
1375                 CloseTransientFile(fd);
1376                 errno = saved_errno;
1377                 ereport(PANIC,
1378                                 (errcode_for_file_access(),
1379                                  errmsg("could not read file \"%s\", read %d of %u: %m",
1380                                                 path, readBytes, cp.length)));
1381         }
1382
1383         CloseTransientFile(fd);
1384
1385         /* now verify the CRC */
1386         INIT_CRC32C(checksum);
1387         COMP_CRC32C(checksum,
1388                                 (char *) &cp + SnapBuildOnDiskNotChecksummedSize,
1389                                 SnapBuildOnDiskChecksummedSize);
1390         FIN_CRC32C(checksum);
1391
1392         if (!EQ_CRC32C(checksum, cp.checksum))
1393                 ereport(PANIC,
1394                                 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
1395                                                 path, checksum, cp.checksum)));
1396
1397         /*
1398          * If we crashed with an ephemeral slot active, don't restore but delete
1399          * it.
1400          */
1401         if (cp.slotdata.persistency != RS_PERSISTENT)
1402         {
1403                 sprintf(path, "pg_replslot/%s", name);
1404
1405                 if (!rmtree(path, true))
1406                 {
1407                         ereport(WARNING,
1408                                         (errcode_for_file_access(),
1409                                          errmsg("could not remove directory \"%s\"", path)));
1410                 }
1411                 fsync_fname("pg_replslot", true);
1412                 return;
1413         }
1414
1415         /* nothing can be active yet, don't lock anything */
1416         for (i = 0; i < max_replication_slots; i++)
1417         {
1418                 ReplicationSlot *slot;
1419
1420                 slot = &ReplicationSlotCtl->replication_slots[i];
1421
1422                 if (slot->in_use)
1423                         continue;
1424
1425                 /* restore the entire set of persistent data */
1426                 memcpy(&slot->data, &cp.slotdata,
1427                            sizeof(ReplicationSlotPersistentData));
1428
1429                 /* initialize in memory state */
1430                 slot->effective_xmin = cp.slotdata.xmin;
1431                 slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
1432
1433                 slot->candidate_catalog_xmin = InvalidTransactionId;
1434                 slot->candidate_xmin_lsn = InvalidXLogRecPtr;
1435                 slot->candidate_restart_lsn = InvalidXLogRecPtr;
1436                 slot->candidate_restart_valid = InvalidXLogRecPtr;
1437
1438                 slot->in_use = true;
1439                 slot->active_pid = 0;
1440
1441                 restored = true;
1442                 break;
1443         }
1444
1445         if (!restored)
1446                 ereport(PANIC,
1447                                 (errmsg("too many replication slots active before shutdown"),
1448                                  errhint("Increase max_replication_slots and try again.")));
1449 }