/*
 * Provenance: src/backend/replication/slot.c, PostgreSQL 9.5 branch
 * (pgindent run for 9.5), as mirrored on granicus.if.org git.
 */
1 /*-------------------------------------------------------------------------
2  *
3  * slot.c
4  *         Replication slot management.
5  *
6  *
7  * Copyright (c) 2012-2015, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/replication/slot.c
12  *
13  * NOTES
14  *
15  * Replication slots are used to keep state about replication streams
16  * originating from this cluster.  Their primary purpose is to prevent the
17  * premature removal of WAL or of old tuple versions in a manner that would
18  * interfere with replication; they are also useful for monitoring purposes.
19  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20  * on standbys (to support cascading setups).  The requirement that slots be
21  * usable on standbys precludes storing them in the system catalogs.
22  *
23  * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24  * directory. Inside that directory the state file will contain the slot's
25  * own data. Additional data can be stored alongside that file if required.
26  * While the server is running, the state data is also cached in memory for
27  * efficiency.
28  *
29  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31  * to iterate over the slots, and in exclusive mode to change the in_use flag
32  * of a slot.  The remaining data in each slot is protected by its mutex.
33  *
34  *-------------------------------------------------------------------------
35  */
36
37 #include "postgres.h"
38
39 #include <unistd.h>
40 #include <sys/stat.h>
41
42 #include "access/transam.h"
43 #include "common/string.h"
44 #include "miscadmin.h"
45 #include "replication/slot.h"
46 #include "storage/fd.h"
47 #include "storage/proc.h"
48 #include "storage/procarray.h"
49
/*
 * Replication slot on-disk data structure.
 *
 * This is the layout of the "state" file kept in each slot's directory
 * under $PGDATA/pg_replslot/ (see SaveSlotToPath/RestoreSlotFromDisk).
 */
typedef struct ReplicationSlotOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;			/* format identifier; see SLOT_MAGIC */
	pg_crc32c	checksum;		/* CRC of the checksummed part below */

	/* data covered by checksum */
	uint32		version;		/* file format version; see SLOT_VERSION */
	uint32		length;			/* length of the version-dependent part */

	/*
	 * The actual data in the slot that follows can differ based on the above
	 * 'version'.
	 */

	ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;
72
/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)

/*
 * Size of the part of the slot not covered by the checksum.
 *
 * NOTE(review): the "SnapBuild" prefix on the two macros below looks like a
 * copy-and-paste leftover (they describe ReplicationSlotOnDisk, not snapshot
 * builder state).  The names are kept because other code in this file uses
 * them; renaming would be a wider change.
 */
#define SnapBuildOnDiskNotChecksummedSize \
	offsetof(ReplicationSlotOnDisk, version)

/*
 * Size of the part covered by the checksum.  The expansion is parenthesized
 * so the subtraction cannot bind unexpectedly when the macro is used inside
 * a larger expression.
 */
#define SnapBuildOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize)

/* size of the slot data that is version dependent (parenthesized likewise) */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC		0x1051CA1		/* format identifier */
#define SLOT_VERSION	2				/* version for new files */
88
/* Control array for replication slot management (lives in shared memory) */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

/* My backend's replication slot in the shared memory array */
ReplicationSlot *MyReplicationSlot = NULL;

/* GUCs */
int			max_replication_slots = 0;	/* the maximum number of replication
										 * slots */

/* forward declaration: drops the slot this backend currently has acquired */
static void ReplicationSlotDropAcquired(void);

/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
105
106 /*
107  * Report shared-memory space needed by ReplicationSlotShmemInit.
108  */
109 Size
110 ReplicationSlotsShmemSize(void)
111 {
112         Size            size = 0;
113
114         if (max_replication_slots == 0)
115                 return size;
116
117         size = offsetof(ReplicationSlotCtlData, replication_slots);
118         size = add_size(size,
119                                         mul_size(max_replication_slots, sizeof(ReplicationSlot)));
120
121         return size;
122 }
123
124 /*
125  * Allocate and initialize walsender-related shared memory.
126  */
127 void
128 ReplicationSlotsShmemInit(void)
129 {
130         bool            found;
131
132         if (max_replication_slots == 0)
133                 return;
134
135         ReplicationSlotCtl = (ReplicationSlotCtlData *)
136                 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
137                                                 &found);
138
139         if (!found)
140         {
141                 int                     i;
142
143                 /* First time through, so initialize */
144                 MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
145
146                 for (i = 0; i < max_replication_slots; i++)
147                 {
148                         ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
149
150                         /* everything else is zeroed by the memset above */
151                         SpinLockInit(&slot->mutex);
152                         slot->io_in_progress_lock = LWLockAssign();
153                 }
154         }
155 }
156
157 /*
158  * Check whether the passed slot name is valid and report errors at elevel.
159  *
160  * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
161  * the name to be used as a directory name on every supported OS.
162  *
163  * Returns whether the directory name is valid or not if elevel < ERROR.
164  */
165 bool
166 ReplicationSlotValidateName(const char *name, int elevel)
167 {
168         const char *cp;
169
170         if (strlen(name) == 0)
171         {
172                 ereport(elevel,
173                                 (errcode(ERRCODE_INVALID_NAME),
174                                  errmsg("replication slot name \"%s\" is too short",
175                                                 name)));
176                 return false;
177         }
178
179         if (strlen(name) >= NAMEDATALEN)
180         {
181                 ereport(elevel,
182                                 (errcode(ERRCODE_NAME_TOO_LONG),
183                                  errmsg("replication slot name \"%s\" is too long",
184                                                 name)));
185                 return false;
186         }
187
188         for (cp = name; *cp; cp++)
189         {
190                 if (!((*cp >= 'a' && *cp <= 'z')
191                           || (*cp >= '0' && *cp <= '9')
192                           || (*cp == '_')))
193                 {
194                         ereport(elevel,
195                                         (errcode(ERRCODE_INVALID_NAME),
196                         errmsg("replication slot name \"%s\" contains invalid character",
197                                    name),
198                                          errhint("Replication slot names may only contain letters, numbers, and the underscore character.")));
199                         return false;
200                 }
201         }
202         return true;
203 }
204
/*
 * Create a new replication slot and mark it as used by this backend.
 *
 * name: Name of the slot
 * db_specific: logical decoding is db specific; if the slot is going to
 *	   be used for that pass true, otherwise false.
 * persistency: RS_PERSISTENT or RS_EPHEMERAL; an ephemeral slot is dropped
 *	   again when released (see ReplicationSlotRelease).
 *
 * On success the new slot is acquired by this backend: MyReplicationSlot
 * points at it and its active_pid is set to MyProcPid.  All failures are
 * reported via ereport(ERROR), so there is no failure return value.
 */
void
ReplicationSlotCreate(const char *name, bool db_specific,
					  ReplicationSlotPersistency persistency)
{
	ReplicationSlot *slot = NULL;
	int			i;

	Assert(MyReplicationSlot == NULL);

	/* elevel is ERROR, so an invalid name aborts the command right here */
	ReplicationSlotValidateName(name, ERROR);

	/*
	 * If some other backend ran this code currently with us, we'd likely both
	 * allocate the same slot, and that would be bad.  We'd also be at risk of
	 * missing a name collision.  Also, we don't want to try to create a new
	 * slot while somebody's busy cleaning up an old one, because we might
	 * both be monkeying with the same directory.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * Check for name collision, and identify an allocatable slot.  We need to
	 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
	 * else can change the in_use flags while we're looking at them.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_OBJECT),
					 errmsg("replication slot \"%s\" already exists", name)));
		if (!s->in_use && slot == NULL)
			slot = s;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If all slots are in use, we're out of luck. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("all replication slots are in use"),
				 errhint("Free one or increase max_replication_slots.")));

	/*
	 * Since this slot is not in use, nobody should be looking at any part of
	 * it other than the in_use field unless they're trying to allocate it.
	 * And since we hold ReplicationSlotAllocationLock, nobody except us can
	 * be doing that.  So it's safe to initialize the slot.
	 */
	Assert(!slot->in_use);
	Assert(slot->active_pid == 0);
	slot->data.persistency = persistency;
	slot->data.xmin = InvalidTransactionId;
	slot->effective_xmin = InvalidTransactionId;
	StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
	slot->data.restart_lsn = InvalidXLogRecPtr;

	/*
	 * Create the slot on disk.  We haven't actually marked the slot allocated
	 * yet, so no special cleanup is required if this errors out.
	 */
	CreateSlotOnDisk(slot);

	/*
	 * We need to briefly prevent any other backend from iterating over the
	 * slots while we flip the in_use flag. We also need to set the active
	 * flag while holding the ControlLock as otherwise a concurrent
	 * SlotAcquire() could acquire the slot as well.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);

	slot->in_use = true;

	/* We can now mark the slot active, and that makes it our slot. */
	{
		volatile ReplicationSlot *vslot = slot;

		/* active_pid is changed under the slot's spinlock, like elsewhere */
		SpinLockAcquire(&slot->mutex);
		Assert(vslot->active_pid == 0);
		vslot->active_pid = MyProcPid;
		SpinLockRelease(&slot->mutex);
		MyReplicationSlot = slot;
	}

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Now that the slot has been marked as in_use and in_active, it's safe to
	 * let somebody else try to allocate a slot.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);
}
308
/*
 * Find a previously created slot and mark it as used by this backend.
 *
 * Errors out if the slot does not exist or is already active in some other
 * backend.  On success MyReplicationSlot points at the slot and its
 * active_pid is set to MyProcPid.
 */
void
ReplicationSlotAcquire(const char *name)
{
	ReplicationSlot *slot = NULL;
	int			i;
	int			active_pid = 0;

	Assert(MyReplicationSlot == NULL);

	ReplicationSlotValidateName(name, ERROR);

	/* Search for the named slot and mark it active if we find it. */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
		{
			volatile ReplicationSlot *vslot = s;

			/*
			 * Claim the slot atomically: under the slot's spinlock, read the
			 * current active_pid and set it to our PID only if nobody holds
			 * the slot.  The value read is kept so the error report below
			 * can name the current holder.
			 */
			SpinLockAcquire(&s->mutex);
			active_pid = vslot->active_pid;
			if (active_pid == 0)
				vslot->active_pid = MyProcPid;
			SpinLockRelease(&s->mutex);
			slot = s;
			break;
		}
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If we did not find the slot or it was already active, error out. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("replication slot \"%s\" does not exist", name)));
	if (active_pid != 0)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_IN_USE),
			   errmsg("replication slot \"%s\" is already active for pid %d",
					  name, active_pid)));

	/* We made this slot active, so it's ours now. */
	MyReplicationSlot = slot;
}
358
/*
 * Release a replication slot, this or another backend can ReAcquire it
 * later. Resources this slot requires will be preserved.
 *
 * Exception: an RS_EPHEMERAL slot is not preserved but dropped outright.
 */
void
ReplicationSlotRelease(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(slot != NULL && slot->active_pid != 0);

	if (slot->data.persistency == RS_EPHEMERAL)
	{
		/*
		 * Delete the slot. There is no !PANIC case where this is allowed to
		 * fail, all that may happen is an incomplete cleanup of the on-disk
		 * data.
		 */
		ReplicationSlotDropAcquired();
	}
	else
	{
		/* Mark slot inactive.  We're not freeing it, just disconnecting. */
		volatile ReplicationSlot *vslot = slot;

		SpinLockAcquire(&slot->mutex);
		vslot->active_pid = 0;
		SpinLockRelease(&slot->mutex);
	}

	MyReplicationSlot = NULL;

	/* might not have been set when we've been a plain slot */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
	LWLockRelease(ProcArrayLock);
}
396
397 /*
398  * Permanently drop replication slot identified by the passed in name.
399  */
400 void
401 ReplicationSlotDrop(const char *name)
402 {
403         Assert(MyReplicationSlot == NULL);
404
405         ReplicationSlotAcquire(name);
406
407         ReplicationSlotDropAcquired();
408 }
409
/*
 * Permanently drop the currently acquired replication slot which will be
 * released by the point this function returns.
 *
 * The on-disk directory is first renamed to "<name>.tmp" (making the slot
 * unrecognizable after a crash; StartupReplicationSlots removes leftover
 * .tmp directories), then the in-memory entry is freed, and finally the
 * .tmp directory is removed.
 */
static void
ReplicationSlotDropAcquired(void)
{
	char		path[MAXPGPATH];
	char		tmppath[MAXPGPATH];
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(MyReplicationSlot != NULL);

	/* slot isn't acquired anymore */
	MyReplicationSlot = NULL;

	/*
	 * If some other backend ran this code concurrently with us, we might try
	 * to delete a slot with a certain name while someone else was trying to
	 * create a slot with the same name.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/* Generate pathnames. */
	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));

	/*
	 * Rename the slot directory on disk, so that we'll no longer recognize
	 * this as a valid slot.  Note that if this fails, we've got to mark the
	 * slot inactive before bailing out.  If we're dropping an ephemeral slot,
	 * we better never fail hard as the caller won't expect the slot to
	 * survive and this might get called during error handling.
	 */
	if (rename(path, tmppath) == 0)
	{
		/*
		 * We need to fsync() the directory we just renamed and its parent to
		 * make sure that our changes are on disk in a crash-safe fashion.  If
		 * fsync() fails, we can't be sure whether the changes are on disk or
		 * not.  For now, we handle that by panicking;
		 * StartupReplicationSlots() will try to straighten it out after
		 * restart.
		 */
		START_CRIT_SECTION();
		fsync_fname(tmppath, true);
		fsync_fname("pg_replslot", true);
		END_CRIT_SECTION();
	}
	else
	{
		volatile ReplicationSlot *vslot = slot;

		/* ephemeral slots must not raise ERROR here; see comment above */
		bool		fail_softly = slot->data.persistency == RS_EPHEMERAL;

		SpinLockAcquire(&slot->mutex);
		vslot->active_pid = 0;
		SpinLockRelease(&slot->mutex);

		ereport(fail_softly ? WARNING : ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						path, tmppath)));
	}

	/*
	 * The slot is definitely gone.  Lock out concurrent scans of the array
	 * long enough to kill it.  It's OK to clear the active flag here without
	 * grabbing the mutex because nobody else can be scanning the array here,
	 * and nobody can be attached to this slot and thus access it without
	 * scanning the array.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
	slot->active_pid = 0;
	slot->in_use = false;
	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Slot is dead and doesn't prevent resource removal anymore, recompute
	 * limits.
	 */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();

	/*
	 * If removing the directory fails, the worst thing that will happen is
	 * that the user won't be able to create a new slot with the same name
	 * until the next server restart.  We warn about it, but that's all.
	 */
	if (!rmtree(tmppath, true))
		ereport(WARNING,
				(errcode_for_file_access(),
				 errmsg("could not remove directory \"%s\"", tmppath)));

	/*
	 * We release this at the very end, so that nobody starts trying to create
	 * a slot while we're still cleaning up the detritus of the old one.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);
}
509
510 /*
511  * Serialize the currently acquired slot's state from memory to disk, thereby
512  * guaranteeing the current state will survive a crash.
513  */
514 void
515 ReplicationSlotSave(void)
516 {
517         char            path[MAXPGPATH];
518
519         Assert(MyReplicationSlot != NULL);
520
521         sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
522         SaveSlotToPath(MyReplicationSlot, path, ERROR);
523 }
524
525 /*
526  * Signal that it would be useful if the currently acquired slot would be
527  * flushed out to disk.
528  *
529  * Note that the actual flush to disk can be delayed for a long time, if
530  * required for correctness explicitly do a ReplicationSlotSave().
531  */
532 void
533 ReplicationSlotMarkDirty(void)
534 {
535         Assert(MyReplicationSlot != NULL);
536
537         {
538                 volatile ReplicationSlot *vslot = MyReplicationSlot;
539
540                 SpinLockAcquire(&vslot->mutex);
541                 MyReplicationSlot->just_dirtied = true;
542                 MyReplicationSlot->dirty = true;
543                 SpinLockRelease(&vslot->mutex);
544         }
545 }
546
547 /*
548  * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
549  * guaranteeing it will be there after an eventual crash.
550  */
551 void
552 ReplicationSlotPersist(void)
553 {
554         ReplicationSlot *slot = MyReplicationSlot;
555
556         Assert(slot != NULL);
557         Assert(slot->data.persistency != RS_PERSISTENT);
558
559         {
560                 volatile ReplicationSlot *vslot = slot;
561
562                 SpinLockAcquire(&slot->mutex);
563                 vslot->data.persistency = RS_PERSISTENT;
564                 SpinLockRelease(&slot->mutex);
565         }
566
567         ReplicationSlotMarkDirty();
568         ReplicationSlotSave();
569 }
570
/*
 * Compute the oldest xmin across all slots and store it in the ProcArray.
 *
 * already_locked: when true, the locks this function would otherwise take
 * are assumed to be held by the caller; it then skips acquiring
 * ReplicationSlotControlLock and forwards the flag to
 * ProcArraySetReplicationSlotXmin.  (NOTE(review): which ProcArray-side lock
 * the flag refers to is defined in procarray.c — confirm there.)
 */
void
ReplicationSlotsComputeRequiredXmin(bool already_locked)
{
	int			i;
	TransactionId agg_xmin = InvalidTransactionId;
	TransactionId agg_catalog_xmin = InvalidTransactionId;

	Assert(ReplicationSlotCtl != NULL);

	if (!already_locked)
		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
		TransactionId effective_xmin;
		TransactionId effective_catalog_xmin;

		if (!s->in_use)
			continue;

		/* fetch both values consistently under the slot's spinlock */
		{
			volatile ReplicationSlot *vslot = s;

			SpinLockAcquire(&s->mutex);
			effective_xmin = vslot->effective_xmin;
			effective_catalog_xmin = vslot->effective_catalog_xmin;
			SpinLockRelease(&s->mutex);
		}

		/* check the data xmin */
		if (TransactionIdIsValid(effective_xmin) &&
			(!TransactionIdIsValid(agg_xmin) ||
			 TransactionIdPrecedes(effective_xmin, agg_xmin)))
			agg_xmin = effective_xmin;

		/* check the catalog xmin */
		if (TransactionIdIsValid(effective_catalog_xmin) &&
			(!TransactionIdIsValid(agg_catalog_xmin) ||
			 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
			agg_catalog_xmin = effective_catalog_xmin;
	}

	if (!already_locked)
		LWLockRelease(ReplicationSlotControlLock);

	ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
}
622
623 /*
624  * Compute the oldest restart LSN across all slots and inform xlog module.
625  */
626 void
627 ReplicationSlotsComputeRequiredLSN(void)
628 {
629         int                     i;
630         XLogRecPtr      min_required = InvalidXLogRecPtr;
631
632         Assert(ReplicationSlotCtl != NULL);
633
634         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
635         for (i = 0; i < max_replication_slots; i++)
636         {
637                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
638                 XLogRecPtr      restart_lsn;
639
640                 if (!s->in_use)
641                         continue;
642
643                 {
644                         volatile ReplicationSlot *vslot = s;
645
646                         SpinLockAcquire(&s->mutex);
647                         restart_lsn = vslot->data.restart_lsn;
648                         SpinLockRelease(&s->mutex);
649                 }
650
651                 if (restart_lsn != InvalidXLogRecPtr &&
652                         (min_required == InvalidXLogRecPtr ||
653                          restart_lsn < min_required))
654                         min_required = restart_lsn;
655         }
656         LWLockRelease(ReplicationSlotControlLock);
657
658         XLogSetReplicationSlotMinimumLSN(min_required);
659 }
660
/*
 * Compute the oldest WAL LSN required by *logical* decoding slots..
 *
 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logicals
 * slots exist.
 *
 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
 * ignores physical replication slots.
 *
 * The results aren't required frequently, so we don't maintain a precomputed
 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
 */
XLogRecPtr
ReplicationSlotsComputeLogicalRestartLSN(void)
{
	XLogRecPtr	result = InvalidXLogRecPtr;
	int			i;

	if (max_replication_slots <= 0)
		return InvalidXLogRecPtr;

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	for (i = 0; i < max_replication_slots; i++)
	{
		volatile ReplicationSlot *s;
		XLogRecPtr	restart_lsn;

		s = &ReplicationSlotCtl->replication_slots[i];

		/* cannot change while ReplicationSlotCtlLock is held */
		if (!s->in_use)
			continue;

		/* we're only interested in logical slots */
		if (s->data.database == InvalidOid)
			continue;

		/* read once, it's ok if it increases while we're checking */
		SpinLockAcquire(&s->mutex);
		restart_lsn = s->data.restart_lsn;
		SpinLockRelease(&s->mutex);

		/* keep the minimum restart_lsn seen so far */
		if (result == InvalidXLogRecPtr ||
			restart_lsn < result)
			result = restart_lsn;
	}

	LWLockRelease(ReplicationSlotControlLock);

	return result;
}
713
714 /*
715  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
716  * passed database oid.
717  *
718  * Returns true if there are any slots referencing the database. *nslots will
719  * be set to the absolute number of slots in the database, *nactive to ones
720  * currently active.
721  */
722 bool
723 ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
724 {
725         int                     i;
726
727         *nslots = *nactive = 0;
728
729         if (max_replication_slots <= 0)
730                 return false;
731
732         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
733         for (i = 0; i < max_replication_slots; i++)
734         {
735                 volatile ReplicationSlot *s;
736
737                 s = &ReplicationSlotCtl->replication_slots[i];
738
739                 /* cannot change while ReplicationSlotCtlLock is held */
740                 if (!s->in_use)
741                         continue;
742
743                 /* not database specific, skip */
744                 if (s->data.database == InvalidOid)
745                         continue;
746
747                 /* not our database, skip */
748                 if (s->data.database != dboid)
749                         continue;
750
751                 /* count slots with spinlock held */
752                 SpinLockAcquire(&s->mutex);
753                 (*nslots)++;
754                 if (s->active_pid != 0)
755                         (*nactive)++;
756                 SpinLockRelease(&s->mutex);
757         }
758         LWLockRelease(ReplicationSlotControlLock);
759
760         if (*nslots > 0)
761                 return true;
762         return false;
763 }
764
765
766 /*
767  * Check whether the server's configuration supports using replication
768  * slots.
769  */
770 void
771 CheckSlotRequirements(void)
772 {
773         if (max_replication_slots == 0)
774                 ereport(ERROR,
775                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
776                                  (errmsg("replication slots can only be used if max_replication_slots > 0"))));
777
778         if (wal_level < WAL_LEVEL_ARCHIVE)
779                 ereport(ERROR,
780                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
781                                  errmsg("replication slots can only be used if wal_level >= archive")));
782 }
783
784 /*
785  * Flush all replication slots to disk.
786  *
787  * This needn't actually be part of a checkpoint, but it's a convenient
788  * location.
789  */
790 void
791 CheckPointReplicationSlots(void)
792 {
793         int                     i;
794
795         elog(DEBUG1, "performing replication slot checkpoint");
796
797         /*
798          * Prevent any slot from being created/dropped while we're active. As we
799          * explicitly do *not* want to block iterating over replication_slots or
800          * acquiring a slot we cannot take the control lock - but that's OK,
801          * because holding ReplicationSlotAllocationLock is strictly stronger, and
802          * enough to guarantee that nobody can change the in_use bits on us.
803          */
804         LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
805
806         for (i = 0; i < max_replication_slots; i++)
807         {
808                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
809                 char            path[MAXPGPATH];
810
811                 if (!s->in_use)
812                         continue;
813
814                 /* save the slot to disk, locking is handled in SaveSlotToPath() */
815                 sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
816                 SaveSlotToPath(s, path, LOG);
817         }
818         LWLockRelease(ReplicationSlotAllocationLock);
819 }
820
821 /*
822  * Load all replication slots from disk into memory at server startup. This
823  * needs to be run before we start crash recovery.
824  */
825 void
826 StartupReplicationSlots(void)
827 {
828         DIR                *replication_dir;
829         struct dirent *replication_de;
830
831         elog(DEBUG1, "starting up replication slots");
832
833         /* restore all slots by iterating over all on-disk entries */
834         replication_dir = AllocateDir("pg_replslot");
835         while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
836         {
837                 struct stat statbuf;
838                 char            path[MAXPGPATH];
839
840                 if (strcmp(replication_de->d_name, ".") == 0 ||
841                         strcmp(replication_de->d_name, "..") == 0)
842                         continue;
843
844                 snprintf(path, MAXPGPATH, "pg_replslot/%s", replication_de->d_name);
845
846                 /* we're only creating directories here, skip if it's not our's */
847                 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
848                         continue;
849
850                 /* we crashed while a slot was being setup or deleted, clean up */
851                 if (pg_str_endswith(replication_de->d_name, ".tmp"))
852                 {
853                         if (!rmtree(path, true))
854                         {
855                                 ereport(WARNING,
856                                                 (errcode_for_file_access(),
857                                                  errmsg("could not remove directory \"%s\"", path)));
858                                 continue;
859                         }
860                         fsync_fname("pg_replslot", true);
861                         continue;
862                 }
863
864                 /* looks like a slot in a normal state, restore */
865                 RestoreSlotFromDisk(replication_de->d_name);
866         }
867         FreeDir(replication_dir);
868
869         /* currently no slots exist, we're done. */
870         if (max_replication_slots <= 0)
871                 return;
872
873         /* Now that we have recovered all the data, compute replication xmin */
874         ReplicationSlotsComputeRequiredXmin(false);
875         ReplicationSlotsComputeRequiredLSN();
876 }
877
878 /* ----
879  * Manipulation of ondisk state of replication slots
880  *
881  * NB: none of the routines below should take any notice whether a slot is the
882  * current one or not, that's all handled a layer above.
883  * ----
884  */
885 static void
886 CreateSlotOnDisk(ReplicationSlot *slot)
887 {
888         char            tmppath[MAXPGPATH];
889         char            path[MAXPGPATH];
890         struct stat st;
891
892         /*
893          * No need to take out the io_in_progress_lock, nobody else can see this
894          * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
895          * takes out the lock, if we'd take the lock here, we'd deadlock.
896          */
897
898         sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
899         sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
900
901         /*
902          * It's just barely possible that some previous effort to create or drop a
903          * slot with this name left a temp directory lying around. If that seems
904          * to be the case, try to remove it.  If the rmtree() fails, we'll error
905          * out at the mkdir() below, so we don't bother checking success.
906          */
907         if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
908                 rmtree(tmppath, true);
909
910         /* Create and fsync the temporary slot directory. */
911         if (mkdir(tmppath, S_IRWXU) < 0)
912                 ereport(ERROR,
913                                 (errcode_for_file_access(),
914                                  errmsg("could not create directory \"%s\": %m",
915                                                 tmppath)));
916         fsync_fname(tmppath, true);
917
918         /* Write the actual state file. */
919         slot->dirty = true;                     /* signal that we really need to write */
920         SaveSlotToPath(slot, tmppath, ERROR);
921
922         /* Rename the directory into place. */
923         if (rename(tmppath, path) != 0)
924                 ereport(ERROR,
925                                 (errcode_for_file_access(),
926                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
927                                                 tmppath, path)));
928
929         /*
930          * If we'd now fail - really unlikely - we wouldn't know whether this slot
931          * would persist after an OS crash or not - so, force a restart. The
932          * restart would try to fysnc this again till it works.
933          */
934         START_CRIT_SECTION();
935
936         fsync_fname(path, true);
937         fsync_fname("pg_replslot", true);
938
939         END_CRIT_SECTION();
940 }
941
942 /*
943  * Shared functionality between saving and creating a replication slot.
944  */
945 static void
946 SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
947 {
948         char            tmppath[MAXPGPATH];
949         char            path[MAXPGPATH];
950         int                     fd;
951         ReplicationSlotOnDisk cp;
952         bool            was_dirty;
953
954         /* first check whether there's something to write out */
955         {
956                 volatile ReplicationSlot *vslot = slot;
957
958                 SpinLockAcquire(&vslot->mutex);
959                 was_dirty = vslot->dirty;
960                 vslot->just_dirtied = false;
961                 SpinLockRelease(&vslot->mutex);
962         }
963
964         /* and don't do anything if there's nothing to write */
965         if (!was_dirty)
966                 return;
967
968         LWLockAcquire(slot->io_in_progress_lock, LW_EXCLUSIVE);
969
970         /* silence valgrind :( */
971         memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
972
973         sprintf(tmppath, "%s/state.tmp", dir);
974         sprintf(path, "%s/state", dir);
975
976         fd = OpenTransientFile(tmppath,
977                                                    O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
978                                                    S_IRUSR | S_IWUSR);
979         if (fd < 0)
980         {
981                 ereport(elevel,
982                                 (errcode_for_file_access(),
983                                  errmsg("could not create file \"%s\": %m",
984                                                 tmppath)));
985                 return;
986         }
987
988         cp.magic = SLOT_MAGIC;
989         INIT_CRC32C(cp.checksum);
990         cp.version = SLOT_VERSION;
991         cp.length = ReplicationSlotOnDiskV2Size;
992
993         SpinLockAcquire(&slot->mutex);
994
995         memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
996
997         SpinLockRelease(&slot->mutex);
998
999         COMP_CRC32C(cp.checksum,
1000                                 (char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
1001                                 SnapBuildOnDiskChecksummedSize);
1002         FIN_CRC32C(cp.checksum);
1003
1004         if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
1005         {
1006                 int                     save_errno = errno;
1007
1008                 CloseTransientFile(fd);
1009                 errno = save_errno;
1010                 ereport(elevel,
1011                                 (errcode_for_file_access(),
1012                                  errmsg("could not write to file \"%s\": %m",
1013                                                 tmppath)));
1014                 return;
1015         }
1016
1017         /* fsync the temporary file */
1018         if (pg_fsync(fd) != 0)
1019         {
1020                 int                     save_errno = errno;
1021
1022                 CloseTransientFile(fd);
1023                 errno = save_errno;
1024                 ereport(elevel,
1025                                 (errcode_for_file_access(),
1026                                  errmsg("could not fsync file \"%s\": %m",
1027                                                 tmppath)));
1028                 return;
1029         }
1030
1031         CloseTransientFile(fd);
1032
1033         /* rename to permanent file, fsync file and directory */
1034         if (rename(tmppath, path) != 0)
1035         {
1036                 ereport(elevel,
1037                                 (errcode_for_file_access(),
1038                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
1039                                                 tmppath, path)));
1040                 return;
1041         }
1042
1043         /* Check CreateSlot() for the reasoning of using a crit. section. */
1044         START_CRIT_SECTION();
1045
1046         fsync_fname(path, false);
1047         fsync_fname((char *) dir, true);
1048         fsync_fname("pg_replslot", true);
1049
1050         END_CRIT_SECTION();
1051
1052         /*
1053          * Successfully wrote, unset dirty bit, unless somebody dirtied again
1054          * already.
1055          */
1056         {
1057                 volatile ReplicationSlot *vslot = slot;
1058
1059                 SpinLockAcquire(&vslot->mutex);
1060                 if (!vslot->just_dirtied)
1061                         vslot->dirty = false;
1062                 SpinLockRelease(&vslot->mutex);
1063         }
1064
1065         LWLockRelease(slot->io_in_progress_lock);
1066 }
1067
/*
 * Load a single slot from disk into memory.
 *
 * 'name' is the slot's directory name under pg_replslot.  Runs only during
 * startup (from StartupReplicationSlots), before any concurrent access is
 * possible, so no locking is done here.  Any validation failure PANICs,
 * since a slot we cannot restore could otherwise silently stop holding
 * back WAL/xmin.
 */
static void
RestoreSlotFromDisk(const char *name)
{
	ReplicationSlotOnDisk cp;
	int			i;
	char		path[MAXPGPATH];
	int			fd;
	bool		restored = false;
	int			readBytes;
	pg_crc32c	checksum;

	/* no need to lock here, no concurrent access allowed yet */

	/* delete temp file if it exists (leftover from a crashed save) */
	sprintf(path, "pg_replslot/%s/state.tmp", name);
	if (unlink(path) < 0 && errno != ENOENT)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not remove file \"%s\": %m", path)));

	sprintf(path, "pg_replslot/%s/state", name);

	elog(DEBUG1, "restoring replication slot from \"%s\"", path);

	fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);

	/*
	 * We do not need to handle this as we are rename()ing the directory into
	 * place only after we fsync()ed the state file.
	 */
	if (fd < 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));

	/*
	 * Sync state file before we're reading from it. We might have crashed
	 * while it wasn't synced yet and we shouldn't continue on that basis.
	 */
	if (pg_fsync(fd) != 0)
	{
		CloseTransientFile(fd);
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						path)));
	}

	/*
	 * Also sync the parent directory.
	 *
	 * NOTE(review): 'path' here is the state *file*, yet isdir=true is
	 * passed — verify against fsync_fname()'s contract whether this
	 * actually syncs the containing directory as the comment claims.
	 */
	START_CRIT_SECTION();
	fsync_fname(path, true);
	END_CRIT_SECTION();

	/* read part of statefile that's guaranteed to be version independent */
	readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
	if (readBytes != ReplicationSlotOnDiskConstantSize)
	{
		/* save errno across CloseTransientFile, which may clobber it */
		int			saved_errno = errno;

		CloseTransientFile(fd);
		errno = saved_errno;
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not read file \"%s\", read %d of %u: %m",
						path, readBytes,
						(uint32) ReplicationSlotOnDiskConstantSize)));
	}

	/* verify magic */
	if (cp.magic != SLOT_MAGIC)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("replication slot file \"%s\" has wrong magic %u instead of %u",
						path, cp.magic, SLOT_MAGIC)));

	/* verify version */
	if (cp.version != SLOT_VERSION)
		ereport(PANIC,
				(errcode_for_file_access(),
			errmsg("replication slot file \"%s\" has unsupported version %u",
				   path, cp.version)));

	/* boundary check on length, before trusting it for the next read */
	if (cp.length != ReplicationSlotOnDiskV2Size)
		ereport(PANIC,
				(errcode_for_file_access(),
			   errmsg("replication slot file \"%s\" has corrupted length %u",
					  path, cp.length)));

	/* Now that we know the size, read the entire file */
	readBytes = read(fd,
					 (char *) &cp + ReplicationSlotOnDiskConstantSize,
					 cp.length);
	if (readBytes != cp.length)
	{
		int			saved_errno = errno;

		CloseTransientFile(fd);
		errno = saved_errno;
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not read file \"%s\", read %d of %u: %m",
						path, readBytes, cp.length)));
	}

	CloseTransientFile(fd);

	/*
	 * Now verify the CRC.
	 *
	 * NOTE(review): the SnapBuildOnDisk* macros used here are defined in
	 * this file over ReplicationSlotOnDisk (misnamed copy-and-paste from
	 * snapbuild.c); confirm before renaming, as SaveSlotToPath() must
	 * checksum the identical region.
	 */
	INIT_CRC32C(checksum);
	COMP_CRC32C(checksum,
				(char *) &cp + SnapBuildOnDiskNotChecksummedSize,
				SnapBuildOnDiskChecksummedSize);
	FIN_CRC32C(checksum);

	if (!EQ_CRC32C(checksum, cp.checksum))
		ereport(PANIC,
				(errmsg("replication slot file %s: checksum mismatch, is %u, should be %u",
						path, checksum, cp.checksum)));

	/*
	 * If we crashed with an ephemeral slot active, don't restore but delete
	 * it.
	 */
	if (cp.slotdata.persistency != RS_PERSISTENT)
	{
		sprintf(path, "pg_replslot/%s", name);

		if (!rmtree(path, true))
		{
			ereport(WARNING,
					(errcode_for_file_access(),
					 errmsg("could not remove directory \"%s\"", path)));
		}
		fsync_fname("pg_replslot", true);
		return;
	}

	/* nothing can be active yet, don't lock anything */
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *slot;

		slot = &ReplicationSlotCtl->replication_slots[i];

		/* find the first free in-memory slot entry */
		if (slot->in_use)
			continue;

		/* restore the entire set of persistent data */
		memcpy(&slot->data, &cp.slotdata,
			   sizeof(ReplicationSlotPersistentData));

		/* initialize in memory state */
		slot->effective_xmin = cp.slotdata.xmin;
		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;

		slot->candidate_catalog_xmin = InvalidTransactionId;
		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_valid = InvalidXLogRecPtr;

		slot->in_use = true;
		slot->active_pid = 0;

		restored = true;
		break;
	}

	/* every on-disk slot must fit; otherwise the GUC was lowered too far */
	if (!restored)
		ereport(PANIC,
				(errmsg("too many replication slots active before shutdown"),
				 errhint("Increase max_replication_slots and try again.")));
}