1 /*-------------------------------------------------------------------------
2  *
3  * slot.c
4  *         Replication slot management.
5  *
6  *
7  * Copyright (c) 2012-2014, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/replication/slot.c
12  *
13  * NOTES
14  *
15  * Replication slots are used to keep state about replication streams
16  * originating from this cluster.  Their primary purpose is to prevent the
17  * premature removal of WAL or of old tuple versions in a manner that would
18  * interfere with replication; they are also useful for monitoring purposes.
19  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20  * on standbys (to support cascading setups).  The requirement that slots be
21  * usable on standbys precludes storing them in the system catalogs.
22  *
23  * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24  * directory. Inside that directory the state file will contain the slot's
25  * own data. Additional data can be stored alongside that file if required.
26  * While the server is running, the state data is also cached in memory for
27  * efficiency.
28  *
29  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31  * to iterate over the slots, and in exclusive mode to change the in_use flag
32  * of a slot.  The remaining data in each slot is protected by its mutex.
33  *
34  *-------------------------------------------------------------------------
35  */
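/*
 * For illustration (the slot name below is made up), a cluster with a single
 * slot called "node_a_slot" lays its state out on disk roughly like this,
 * with the state file holding a ReplicationSlotOnDisk struct and any
 * additional per-slot data stored alongside it:
 *
 *    $PGDATA/pg_replslot/
 *        node_a_slot/
 *            state
 */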
36
37 #include "postgres.h"
38
39 #include <unistd.h>
40 #include <sys/stat.h>
41
42 #include "access/transam.h"
43 #include "miscadmin.h"
44 #include "replication/slot.h"
45 #include "storage/fd.h"
46 #include "storage/proc.h"
47 #include "storage/procarray.h"
48
49 /*
50  * Replication slot on-disk data structure.
51  */
52 typedef struct ReplicationSlotOnDisk
53 {
54         /* first part of this struct needs to be version independent */
55
56         /* data not covered by checksum */
57         uint32          magic;
58         pg_crc32        checksum;
59
60         /* data covered by checksum */
61         uint32          version;
62         uint32          length;
63
64         ReplicationSlotPersistentData slotdata;
65 } ReplicationSlotOnDisk;
66
67 /* size of the part of the slot that is version independent */
68 #define ReplicationSlotOnDiskConstantSize \
69         offsetof(ReplicationSlotOnDisk, slotdata)
70 /* size of the part of the slot that is not version independent */
71 #define ReplicationSlotOnDiskDynamicSize \
72         (sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)
73
74 #define SLOT_MAGIC              0x1051CA1               /* format identifier */
75 #define SLOT_VERSION    1                               /* version for new files */
76
77 /* Control array for replication slot management */
78 ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
79
80 /* My backend's replication slot in the shared memory array */
81 ReplicationSlot *MyReplicationSlot = NULL;
82
83 /* GUCs */
84 int                     max_replication_slots = 0;      /* the maximum number of replication slots */
85
86 static void ReplicationSlotDropAcquired(void);
87
88 /* internal persistency functions */
89 static void RestoreSlotFromDisk(const char *name);
90 static void CreateSlotOnDisk(ReplicationSlot *slot);
91 static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
92
93 /*
94  * Report shared-memory space needed by ReplicationSlotsShmemInit.
95  */
96 Size
97 ReplicationSlotsShmemSize(void)
98 {
99         Size            size = 0;
100
101         if (max_replication_slots == 0)
102                 return size;
103
104         size = offsetof(ReplicationSlotCtlData, replication_slots);
105         size = add_size(size,
106                                         mul_size(max_replication_slots, sizeof(ReplicationSlot)));
107
108         return size;
109 }
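/*
 * A minimal sketch of the resulting layout, assuming replication_slots is a
 * flexible array member at the end of ReplicationSlotCtlData: with
 * max_replication_slots = 10 the segment is
 * offsetof(ReplicationSlotCtlData, replication_slots) +
 * 10 * sizeof(ReplicationSlot) bytes, i.e. a control header followed by ten
 * slot entries.
 */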
110
111 /*
112  * Allocate and initialize shared memory for replication slots.
113  */
114 void
115 ReplicationSlotsShmemInit(void)
116 {
117         bool            found;
118
119         if (max_replication_slots == 0)
120                 return;
121
122         ReplicationSlotCtl = (ReplicationSlotCtlData *)
123                 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
124                                                 &found);
125
126         if (!found)
127         {
128                 int                     i;
129
130                 /* First time through, so initialize */
131                 MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
132
133                 for (i = 0; i < max_replication_slots; i++)
134                 {
135                         ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
136
137                         /* everything else is zeroed by the memset above */
138                         SpinLockInit(&slot->mutex);
139                         slot->io_in_progress_lock = LWLockAssign();
140                 }
141         }
142 }
143
144 /*
145  * Check whether the passed slot name is valid and report errors at elevel.
146  *
147  * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1}, which should allow
148  * the name to be used as a directory name on every supported OS.
149  *
150  * Returns whether the name is valid or not if elevel < ERROR.
151  */
152 bool
153 ReplicationSlotValidateName(const char *name, int elevel)
154 {
155         const char *cp;
156
157         if (strlen(name) == 0)
158         {
159                 ereport(elevel,
160                                 (errcode(ERRCODE_INVALID_NAME),
161                                  errmsg("replication slot name \"%s\" is too short",
162                                                 name)));
163                 return false;
164         }
165
166         if (strlen(name) >= NAMEDATALEN)
167         {
168                 ereport(elevel,
169                                 (errcode(ERRCODE_NAME_TOO_LONG),
170                                  errmsg("replication slot name \"%s\" is too long",
171                                                 name)));
172                 return false;
173         }
174
175         for (cp = name; *cp; cp++)
176         {
177                 if (!((*cp >= 'a' && *cp <= 'z')
178                           || (*cp >= '0' && *cp <= '9')
179                           || (*cp == '_')))
180                 {
181                         ereport(elevel,
182                                         (errcode(ERRCODE_INVALID_NAME),
183                                          errmsg("replication slot name \"%s\" contains invalid character",
184                                                         name),
185                                          errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
186                         return false;
187                 }
188         }
189         return true;
190 }
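/*
 * Usage sketch with made-up inputs (not taken from real callers): calling
 * ReplicationSlotValidateName("node_a_slot", ERROR) returns true, while
 * ReplicationSlotValidateName("Node-A", WARNING) emits a warning that the
 * name contains an invalid character and returns false; with ERROR instead
 * of WARNING it would not return at all.
 */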
191
192 /*
193  * Create a new replication slot and mark it as used by this backend.
194  *
195  * name: Name of the slot
196  * db_specific: logical decoding is db specific; if the slot is going to
197  *     be used for that, pass true; otherwise false.
198  */
199 void
200 ReplicationSlotCreate(const char *name, bool db_specific,
201                                           ReplicationSlotPersistency persistency)
202 {
203         ReplicationSlot *slot = NULL;
204         int                     i;
205
206         Assert(MyReplicationSlot == NULL);
207
208         ReplicationSlotValidateName(name, ERROR);
209
210         /*
211          * If some other backend ran this code concurrently with us, we'd likely
212          * both allocate the same slot, and that would be bad.  We'd also be
213          * at risk of missing a name collision.  Also, we don't want to try to
214          * create a new slot while somebody's busy cleaning up an old one, because
215          * we might both be monkeying with the same directory.
216          */
217         LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
218
219         /*
220          * Check for name collision, and identify an allocatable slot.  We need
221          * to hold ReplicationSlotControlLock in shared mode for this, so that
222          * nobody else can change the in_use flags while we're looking at them.
223          */
224         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
225         for (i = 0; i < max_replication_slots; i++)
226         {
227                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
228
229                 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
230                         ereport(ERROR,
231                                         (errcode(ERRCODE_DUPLICATE_OBJECT),
232                                          errmsg("replication slot \"%s\" already exists", name)));
233                 if (!s->in_use && slot == NULL)
234                         slot = s;
235         }
236         LWLockRelease(ReplicationSlotControlLock);
237
238         /* If all slots are in use, we're out of luck. */
239         if (slot == NULL)
240                 ereport(ERROR,
241                                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
242                                  errmsg("all replication slots are in use"),
243                                  errhint("Free one or increase max_replication_slots.")));
244
245         /*
246          * Since this slot is not in use, nobody should be looking at any
247          * part of it other than the in_use field unless they're trying to allocate
248          * it.  And since we hold ReplicationSlotAllocationLock, nobody except us
249          * can be doing that.  So it's safe to initialize the slot.
250          */
251         Assert(!slot->in_use);
252         Assert(!slot->active);
253         slot->data.persistency = persistency;
254         slot->data.xmin = InvalidTransactionId;
255         slot->effective_xmin = InvalidTransactionId;
256         strncpy(NameStr(slot->data.name), name, NAMEDATALEN);
257         NameStr(slot->data.name)[NAMEDATALEN - 1] = '\0';
258         slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
259         slot->data.restart_lsn = InvalidXLogRecPtr;
260
261         /*
262          * Create the slot on disk.  We haven't actually marked the slot allocated
263          * yet, so no special cleanup is required if this errors out.
264          */
265         CreateSlotOnDisk(slot);
266
267         /*
268          * We need to briefly prevent any other backend from iterating over the
269          * slots while we flip the in_use flag. We also need to set the active
270          * flag while holding the ControlLock as otherwise a concurrent
271          * SlotAcquire() could acquire the slot as well.
272          */
273         LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
274
275         slot->in_use = true;
276
277         /* We can now mark the slot active, and that makes it our slot. */
278         {
279                 volatile ReplicationSlot *vslot = slot;
280
281                 SpinLockAcquire(&slot->mutex);
282                 Assert(!vslot->active);
283                 vslot->active = true;
284                 SpinLockRelease(&slot->mutex);
285                 MyReplicationSlot = slot;
286         }
287
288         LWLockRelease(ReplicationSlotControlLock);
289
290         /*
291          * Now that the slot has been marked as in_use and active, it's safe to
292          * let somebody else try to allocate a slot.
293          */
294         LWLockRelease(ReplicationSlotAllocationLock);
295 }
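/*
 * A minimal usage sketch, assuming a walsender-like caller creating a
 * physical slot (the slot name is made up):
 *
 *    CheckSlotRequirements();
 *    ReplicationSlotCreate("node_a_slot", false, RS_PERSISTENT);
 *    ... stream WAL, updating MyReplicationSlot's restart_lsn ...
 *    ReplicationSlotRelease();
 *
 * A later session can reattach with ReplicationSlotAcquire("node_a_slot")
 * or remove the slot with ReplicationSlotDrop("node_a_slot").
 */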
296
297 /*
298  * Find a previously created slot and mark it as used by this backend.
299  */
300 void
301 ReplicationSlotAcquire(const char *name)
302 {
303         ReplicationSlot *slot = NULL;
304         int                     i;
305         bool            active = false;
306
307         Assert(MyReplicationSlot == NULL);
308
309         ReplicationSlotValidateName(name, ERROR);
310
311         /* Search for the named slot and mark it active if we find it. */
312         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
313         for (i = 0; i < max_replication_slots; i++)
314         {
315                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
316
317                 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
318                 {
319                         volatile ReplicationSlot *vslot = s;
320
321                         SpinLockAcquire(&s->mutex);
322                         active = vslot->active;
323                         vslot->active = true;
324                         SpinLockRelease(&s->mutex);
325                         slot = s;
326                         break;
327                 }
328         }
329         LWLockRelease(ReplicationSlotControlLock);
330
331         /* If we did not find the slot or it was already active, error out. */
332         if (slot == NULL)
333                 ereport(ERROR,
334                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
335                                  errmsg("replication slot \"%s\" does not exist", name)));
336         if (active)
337                 ereport(ERROR,
338                                 (errcode(ERRCODE_OBJECT_IN_USE),
339                                  errmsg("replication slot \"%s\" is already active", name)));
340
341         /* We made this slot active, so it's ours now. */
342         MyReplicationSlot = slot;
343 }
344
345 /*
346  * Release a replication slot; this or another backend can reacquire it
347  * later. Resources this slot requires will be preserved.
348  */
349 void
350 ReplicationSlotRelease(void)
351 {
352         ReplicationSlot *slot = MyReplicationSlot;
353
354         Assert(slot != NULL && slot->active);
355
356         if (slot->data.persistency == RS_EPHEMERAL)
357         {
358                 /*
359                  * Delete the slot. There is no !PANIC case where this is allowed to
360                  * fail; all that may happen is an incomplete cleanup of the on-disk
361                  * data.
362                  */
363                 ReplicationSlotDropAcquired();
364         }
365         else
366         {
367                 /* Mark slot inactive.  We're not freeing it, just disconnecting. */
368                 volatile ReplicationSlot *vslot = slot;
369                 SpinLockAcquire(&slot->mutex);
370                 vslot->active = false;
371                 SpinLockRelease(&slot->mutex);
372         }
373
374         MyReplicationSlot = NULL;
375
376         /* The flag might not have been set if this was a plain (physical) slot. */
377         LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
378         MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
379         LWLockRelease(ProcArrayLock);
380 }
381
382 /*
383  * Permanently drop replication slot identified by the passed in name.
384  */
385 void
386 ReplicationSlotDrop(const char *name)
387 {
388         Assert(MyReplicationSlot == NULL);
389
390         ReplicationSlotAcquire(name);
391
392         ReplicationSlotDropAcquired();
393 }
394
395 /*
396  * Permanently drop the currently acquired replication slot, which will have
397  * been released by the time this function returns.
398  */
399 static void
400 ReplicationSlotDropAcquired(void)
401 {
402         char            path[MAXPGPATH];
403         char            tmppath[MAXPGPATH];
404         ReplicationSlot *slot = MyReplicationSlot;
405
406         Assert(MyReplicationSlot != NULL);
407
408         /* slot isn't acquired anymore */
409         MyReplicationSlot = NULL;
410
411         /*
412          * If some other backend ran this code concurrently with us, we might try
413          * to delete a slot with a certain name while someone else was trying to
414          * create a slot with the same name.
415          */
416         LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
417
418         /* Generate pathnames. */
419         sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
420         sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
421
422         /*
423          * Rename the slot directory on disk, so that we'll no longer recognize
424          * this as a valid slot.  Note that if this fails, we've got to mark the
425          * slot inactive before bailing out.  If we're dropping an ephemeral slot,
426          * we'd better not fail hard, as the caller won't expect the slot to
427          * survive and this might get called during error handling.
428          */
429         if (rename(path, tmppath) == 0)
430         {
431                 /*
432                  * We need to fsync() the directory we just renamed and its parent to
433                  * make sure that our changes are on disk in a crash-safe fashion.  If
434                  * fsync() fails, we can't be sure whether the changes are on disk or
435                  * not.  For now, we handle that by panicking;
436                  * StartupReplicationSlots() will try to straighten it out after
437                  * restart.
438                  */
439                 START_CRIT_SECTION();
440                 fsync_fname(tmppath, true);
441                 fsync_fname("pg_replslot", true);
442                 END_CRIT_SECTION();
443         }
444         else
445         {
446                 volatile ReplicationSlot *vslot = slot;
447                 bool fail_softly = slot->data.persistency == RS_EPHEMERAL;
448
449                 SpinLockAcquire(&slot->mutex);
450                 vslot->active = false;
451                 SpinLockRelease(&slot->mutex);
452
453                 ereport(fail_softly ? WARNING : ERROR,
454                                 (errcode_for_file_access(),
455                                  errmsg("could not rename \"%s\" to \"%s\": %m",
456                                                 path, tmppath)));
457         }
458
459         /*
460          * The slot is definitely gone.  Lock out concurrent scans of the array
461          * long enough to kill it.  It's OK to clear the active flag here without
462          * grabbing the mutex because nobody else can be scanning the array here,
463          * and nobody can be attached to this slot and thus access it without
464          * scanning the array.
465          */
466         LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
467         slot->active = false;
468         slot->in_use = false;
469         LWLockRelease(ReplicationSlotControlLock);
470
471         /*
472          * Slot is dead and doesn't prevent resource removal anymore, recompute
473          * limits.
474          */
475         ReplicationSlotsComputeRequiredXmin(false);
476         ReplicationSlotsComputeRequiredLSN();
477
478         /*
479          * If removing the directory fails, the worst thing that will happen is
480          * that the user won't be able to create a new slot with the same name
481          * until the next server restart.  We warn about it, but that's all.
482          */
483         if (!rmtree(tmppath, true))
484                 ereport(WARNING,
485                                 (errcode_for_file_access(),
486                                  errmsg("could not remove directory \"%s\"", tmppath)));
487
488         /*
489          * We release this at the very end, so that nobody starts trying to create
490          * a slot while we're still cleaning up the detritus of the old one.
491          */
492         LWLockRelease(ReplicationSlotAllocationLock);
493 }
494
495 /*
496  * Serialize the currently acquired slot's state from memory to disk, thereby
497  * guaranteeing the current state will survive a crash.
498  */
499 void
500 ReplicationSlotSave(void)
501 {
502         char            path[MAXPGPATH];
503
504         Assert(MyReplicationSlot != NULL);
505
506         sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
507         SaveSlotToPath(MyReplicationSlot, path, ERROR);
508 }
509
510 /*
511  * Signal that it would be useful if the currently acquired slot would be
512  * flushed out to disk.
513  *
514  * Note that the actual flush to disk can be delayed for a long time; if it
515  * is required for correctness, explicitly do a ReplicationSlotSave().
516  */
517 void
518 ReplicationSlotMarkDirty(void)
519 {
520         Assert(MyReplicationSlot != NULL);
521
522         {
523                 volatile ReplicationSlot *vslot = MyReplicationSlot;
524
525                 SpinLockAcquire(&vslot->mutex);
526                 MyReplicationSlot->just_dirtied = true;
527                 MyReplicationSlot->dirty = true;
528                 SpinLockRelease(&vslot->mutex);
529         }
530 }
531
532 /*
533  * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
534  * guaranteeing it will be there after an eventual crash.
535  */
536 void
537 ReplicationSlotPersist(void)
538 {
539         ReplicationSlot *slot = MyReplicationSlot;
540
541         Assert(slot != NULL);
542         Assert(slot->data.persistency != RS_PERSISTENT);
543
544         {
545                 volatile ReplicationSlot *vslot = slot;
546
547                 SpinLockAcquire(&slot->mutex);
548                 vslot->data.persistency = RS_PERSISTENT;
549                 SpinLockRelease(&slot->mutex);
550         }
551
552         ReplicationSlotMarkDirty();
553         ReplicationSlotSave();
554 }
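/*
 * A sketch of the intended lifecycle, inferred from the behavior in this
 * file (the slot name is made up): logical decoding can create its slot as
 * RS_EPHEMERAL, perform its failure-prone initialization, and only then make
 * the slot permanent; if an error causes the slot to be released before
 * ReplicationSlotPersist() is reached, ReplicationSlotRelease() drops the
 * still-ephemeral slot rather than leaving it behind.
 *
 *    ReplicationSlotCreate("decoding_slot", true, RS_EPHEMERAL);
 *    ... initialization that may error out ...
 *    ReplicationSlotPersist();
 *    ReplicationSlotRelease();
 */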
555
556 /*
557  * Compute the oldest xmin across all slots and store it in the ProcArray.
558  */
559 void
560 ReplicationSlotsComputeRequiredXmin(bool already_locked)
561 {
562         int                     i;
563         TransactionId agg_xmin = InvalidTransactionId;
564         TransactionId agg_catalog_xmin = InvalidTransactionId;
565
566         Assert(ReplicationSlotCtl != NULL);
567
568         if (!already_locked)
569                 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
570
571         for (i = 0; i < max_replication_slots; i++)
572         {
573                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
574                 TransactionId   effective_xmin;
575                 TransactionId   effective_catalog_xmin;
576
577                 if (!s->in_use)
578                         continue;
579
580                 {
581                         volatile ReplicationSlot *vslot = s;
582
583                         SpinLockAcquire(&s->mutex);
584                         effective_xmin = vslot->effective_xmin;
585                         effective_catalog_xmin = vslot->effective_catalog_xmin;
586                         SpinLockRelease(&s->mutex);
587                 }
588
589                 /* check the data xmin */
590                 if (TransactionIdIsValid(effective_xmin) &&
591                         (!TransactionIdIsValid(agg_xmin) ||
592                          TransactionIdPrecedes(effective_xmin, agg_xmin)))
593                         agg_xmin = effective_xmin;
594
595                 /* check the catalog xmin */
596                 if (TransactionIdIsValid(effective_catalog_xmin) &&
597                         (!TransactionIdIsValid(agg_catalog_xmin) ||
598                          TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
599                         agg_catalog_xmin = effective_catalog_xmin;
600         }
601
602         if (!already_locked)
603                 LWLockRelease(ReplicationSlotControlLock);
604
605         ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
606 }
607
608 /*
609  * Compute the oldest restart LSN across all slots and inform xlog module.
610  */
611 void
612 ReplicationSlotsComputeRequiredLSN(void)
613 {
614         int                     i;
615         XLogRecPtr  min_required = InvalidXLogRecPtr;
616
617         Assert(ReplicationSlotCtl != NULL);
618
619         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
620         for (i = 0; i < max_replication_slots; i++)
621         {
622                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
623                 XLogRecPtr restart_lsn;
624
625                 if (!s->in_use)
626                         continue;
627
628                 {
629                         volatile ReplicationSlot *vslot = s;
630
631                         SpinLockAcquire(&s->mutex);
632                         restart_lsn = vslot->data.restart_lsn;
633                         SpinLockRelease(&s->mutex);
634                 }
635
636                 if (restart_lsn != InvalidXLogRecPtr &&
637                         (min_required == InvalidXLogRecPtr ||
638                          restart_lsn < min_required))
639                         min_required = restart_lsn;
640         }
641         LWLockRelease(ReplicationSlotControlLock);
642
643         XLogSetReplicationSlotMinimumLSN(min_required);
644 }
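/*
 * Hypothetical sketch of how a consumer makes a new restart_lsn take effect
 * (new_lsn is made up; real callers live outside this file): update the
 * field under the slot's mutex, then recompute the global minimum so the
 * xlog machinery may recycle older WAL:
 *
 *    SpinLockAcquire(&MyReplicationSlot->mutex);
 *    MyReplicationSlot->data.restart_lsn = new_lsn;
 *    SpinLockRelease(&MyReplicationSlot->mutex);
 *
 *    ReplicationSlotMarkDirty();
 *    ReplicationSlotsComputeRequiredLSN();
 */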
645
646 /*
647  * Compute the oldest WAL LSN required by *logical* decoding slots.
648  *
649  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
650  * slots exist.
651  *
652  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
653  * ignores physical replication slots.
654  *
655  * The results aren't required frequently, so we don't maintain a precomputed
656  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
657  */
658 XLogRecPtr
659 ReplicationSlotsComputeLogicalRestartLSN(void)
660 {
661         XLogRecPtr      result = InvalidXLogRecPtr;
662         int                     i;
663
664         if (max_replication_slots <= 0)
665                 return InvalidXLogRecPtr;
666
667         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
668
669         for (i = 0; i < max_replication_slots; i++)
670         {
671                 volatile ReplicationSlot *s;
672                 XLogRecPtr              restart_lsn;
673
674                 s = &ReplicationSlotCtl->replication_slots[i];
675
676                 /* cannot change while ReplicationSlotControlLock is held */
677                 if (!s->in_use)
678                         continue;
679
680                 /* we're only interested in logical slots */
681                 if (s->data.database == InvalidOid)
682                         continue;
683
684                 /* read once, it's ok if it increases while we're checking */
685                 SpinLockAcquire(&s->mutex);
686                 restart_lsn = s->data.restart_lsn;
687                 SpinLockRelease(&s->mutex);
688
689                 if (result == InvalidXLogRecPtr ||
690                         restart_lsn < result)
691                         result = restart_lsn;
692         }
693
694         LWLockRelease(ReplicationSlotControlLock);
695
696         return result;
697 }
698
699 /*
700  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
701  * passed database oid.
702  *
703  * Returns true if there are any slots referencing the database. *nslots will
704  * be set to the total number of slots in the database, *nactive to the
705  * number currently active.
706  */
707 bool
708 ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
709 {
710         int                     i;
711
712         *nslots = *nactive = 0;
713
714         if (max_replication_slots <= 0)
715                 return false;
716
717         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
718         for (i = 0; i < max_replication_slots; i++)
719         {
720                 volatile ReplicationSlot *s;
721
722                 s = &ReplicationSlotCtl->replication_slots[i];
723
724                 /* cannot change while ReplicationSlotControlLock is held */
725                 if (!s->in_use)
726                         continue;
727
728                 /* not database specific, skip */
729                 if (s->data.database == InvalidOid)
730                         continue;
731                 /* not our database, skip */
732                 if (s->data.database != dboid)
733                         continue;
734
735                 /* count slots with spinlock held */
736                 SpinLockAcquire(&s->mutex);
737                 (*nslots)++;
738                 if (s->active)
739                         (*nactive)++;
740                 SpinLockRelease(&s->mutex);
741         }
742         LWLockRelease(ReplicationSlotControlLock);
743
744         if (*nslots > 0)
745                 return true;
746         return false;
747 }
748
749
750 /*
751  * Check whether the server's configuration supports using replication
752  * slots.
753  */
754 void
755 CheckSlotRequirements(void)
756 {
757         if (max_replication_slots == 0)
758                 ereport(ERROR,
759                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
760                                  (errmsg("replication slots can only be used if max_replication_slots > 0"))));
761
762         if (wal_level < WAL_LEVEL_ARCHIVE)
763                 ereport(ERROR,
764                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
765                                  errmsg("replication slots can only be used if wal_level >= archive")));
766 }
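/*
 * A minimal postgresql.conf sketch that satisfies both checks (the values
 * are illustrative, not recommendations):
 *
 *    max_replication_slots = 10
 *    wal_level = archive              # or any higher wal_level
 */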
767
768 /*
769  * Returns whether the string `str' has the postfix `end'.
770  */
771 static bool
772 string_endswith(const char *str, const char *end)
773 {
774         size_t slen = strlen(str);
775         size_t elen = strlen(end);
776
777         /* can't be a postfix if longer */
778         if (elen > slen)
779                 return false;
780
781         /* compare the end of the strings */
782         str += slen - elen;
783         return strcmp(str, end) == 0;
784 }
785
786 /*
787  * Flush all replication slots to disk.
788  *
789  * This needn't actually be part of a checkpoint, but it's a convenient
790  * location.
791  */
792 void
793 CheckPointReplicationSlots(void)
794 {
795         int                     i;
796
797         ereport(DEBUG1,
798                         (errmsg("performing replication slot checkpoint")));
799
800         /*
801          * Prevent any slot from being created/dropped while we're active. As we
802          * explicitly do *not* want to block iterating over replication_slots or
803          * acquiring a slot we cannot take the control lock - but that's OK,
804          * because holding ReplicationSlotAllocationLock is strictly stronger,
805          * and enough to guarantee that nobody can change the in_use bits on us.
806          */
807         LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
808
809         for (i = 0; i < max_replication_slots; i++)
810         {
811                 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
812                 char            path[MAXPGPATH];
813
814                 if (!s->in_use)
815                         continue;
816
817                 /* save the slot to disk, locking is handled in SaveSlotToPath() */
818                 sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
819                 SaveSlotToPath(s, path, LOG);
820         }
821         LWLockRelease(ReplicationSlotAllocationLock);
822 }
823
824 /*
825  * Load all replication slots from disk into memory at server startup. This
826  * needs to be run before we start crash recovery.
827  */
828 void
829 StartupReplicationSlots(XLogRecPtr checkPointRedo)
830 {
831         DIR                *replication_dir;
832         struct dirent *replication_de;
833
834         ereport(DEBUG1,
835                         (errmsg("starting up replication slots")));
836
837         /* restore all slots by iterating over all on-disk entries */
838         replication_dir = AllocateDir("pg_replslot");
839         while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
840         {
841                 struct stat     statbuf;
842                 char            path[MAXPGPATH];
843
844                 if (strcmp(replication_de->d_name, ".") == 0 ||
845                         strcmp(replication_de->d_name, "..") == 0)
846                         continue;
847
848                 snprintf(path, MAXPGPATH, "pg_replslot/%s", replication_de->d_name);
849
850                 /* we're only creating directories here, skip if it's not ours */
851                 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
852                         continue;
853
854                 /* we crashed while a slot was being set up or deleted; clean up */
855                 if (string_endswith(replication_de->d_name, ".tmp"))
856                 {
857                         if (!rmtree(path, true))
858                         {
859                                 ereport(WARNING,
860                                                 (errcode_for_file_access(),
861                                                  errmsg("could not remove directory \"%s\"", path)));
862                                 continue;
863                         }
864                         fsync_fname("pg_replslot", true);
865                         continue;
866                 }
867
868                 /* looks like a slot in a normal state, restore */
869                 RestoreSlotFromDisk(replication_de->d_name);
870         }
871         FreeDir(replication_dir);
872
873         /* if replication slots are disabled, no slots can exist; we're done */
874         if (max_replication_slots <= 0)
875                 return;
876
877         /* Now that we have recovered all the data, compute replication xmin */
878         ReplicationSlotsComputeRequiredXmin(false);
879         ReplicationSlotsComputeRequiredLSN();
880 }
881
882 /* ----
883  * Manipulation of on-disk state of replication slots
884  *
885  * NB: none of the routines below should take any notice of whether a slot is
886  * the current one or not; that's all handled a layer above.
887  * ----
888  */
889 static void
890 CreateSlotOnDisk(ReplicationSlot *slot)
891 {
892         char            tmppath[MAXPGPATH];
893         char            path[MAXPGPATH];
894         struct stat     st;
895
896         /*
897          * No need to take out the io_in_progress_lock, nobody else can see this
898          * slot yet, so nobody else will write. We're reusing SaveSlotToPath, which
899          * takes out the lock; if we took the lock here, we'd deadlock.
900          */
901
902         sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
903         sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
904
905         /*
906          * It's just barely possible that some previous effort to create or
907          * drop a slot with this name left a temp directory lying around.
908          * If that seems to be the case, try to remove it.  If the rmtree()
909          * fails, we'll error out at the mkdir() below, so we don't bother
910          * checking success.
911          */
912         if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
913                 rmtree(tmppath, true);
914
915         /* Create and fsync the temporary slot directory. */
916         if (mkdir(tmppath, S_IRWXU) < 0)
917                 ereport(ERROR,
918                                 (errcode_for_file_access(),
919                                  errmsg("could not create directory \"%s\": %m",
920                                                 tmppath)));
921         fsync_fname(tmppath, true);
922
923         /* Write the actual state file. */
924         slot->dirty = true; /* signal that we really need to write */
925         SaveSlotToPath(slot, tmppath, ERROR);
926
927         /* Rename the directory into place. */
928         if (rename(tmppath, path) != 0)
929                 ereport(ERROR,
930                                 (errcode_for_file_access(),
931                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
932                                                 tmppath, path)));
933
934         /*
935          * If we'd now fail - really unlikely - we wouldn't know whether this slot
936          * would persist after an OS crash or not - so, force a restart. The
937          * restart would try to fsync this again until it works.
938          */
939         START_CRIT_SECTION();
940
941         fsync_fname(path, true);
942         fsync_fname("pg_replslot", true);
943
944         END_CRIT_SECTION();
945 }
946
947 /*
948  * Shared functionality between saving and creating a replication slot.
949  */
950 static void
951 SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
952 {
953         char            tmppath[MAXPGPATH];
954         char            path[MAXPGPATH];
955         int                     fd;
956         ReplicationSlotOnDisk cp;
957         bool            was_dirty;
958
959         /* first check whether there's something to write out */
960         {
961                 volatile ReplicationSlot *vslot = slot;
962
963                 SpinLockAcquire(&vslot->mutex);
964                 was_dirty = vslot->dirty;
965                 vslot->just_dirtied = false;
966                 SpinLockRelease(&vslot->mutex);
967         }
968
969         /* and don't do anything if there's nothing to write */
970         if (!was_dirty)
971                 return;
972
973         LWLockAcquire(slot->io_in_progress_lock, LW_EXCLUSIVE);
974
975         /* silence valgrind :( */
976         memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
977
978         sprintf(tmppath, "%s/state.tmp", dir);
979         sprintf(path, "%s/state", dir);
980
981         fd = OpenTransientFile(tmppath,
982                                                    O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
983                                                    S_IRUSR | S_IWUSR);
984         if (fd < 0)
985         {
986                 ereport(elevel,
987                                 (errcode_for_file_access(),
988                                  errmsg("could not create file \"%s\": %m",
989                                                 tmppath)));
990                 return;
991         }
992
993         cp.magic = SLOT_MAGIC;
994         INIT_CRC32(cp.checksum);
995         cp.version = SLOT_VERSION;
996         cp.length = ReplicationSlotOnDiskDynamicSize;
997
998         SpinLockAcquire(&slot->mutex);
999
1000         memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
1001
1002         SpinLockRelease(&slot->mutex);
1003
1004         COMP_CRC32(cp.checksum,
1005                            (char *)(&cp) + ReplicationSlotOnDiskConstantSize,
1006                            ReplicationSlotOnDiskDynamicSize);
1007
1008         if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
1009         {
1010                 int save_errno = errno;
1011                 CloseTransientFile(fd);
1012                 errno = save_errno;
1013                 ereport(elevel,
1014                                 (errcode_for_file_access(),
1015                                  errmsg("could not write to file \"%s\": %m",
1016                                                 tmppath)));
1017                 return;
1018         }
1019
1020         /* fsync the temporary file */
1021         if (pg_fsync(fd) != 0)
1022         {
1023                 int save_errno = errno;
1024                 CloseTransientFile(fd);
1025                 errno = save_errno;
1026                 ereport(elevel,
1027                                 (errcode_for_file_access(),
1028                                  errmsg("could not fsync file \"%s\": %m",
1029                                                 tmppath)));
1030                 return;
1031         }
1032
1033         CloseTransientFile(fd);
1034
1035         /* rename to permanent file, fsync file and directory */
1036         if (rename(tmppath, path) != 0)
1037         {
1038                 ereport(elevel,
1039                                 (errcode_for_file_access(),
1040                                  errmsg("could not rename \"%s\" to \"%s\": %m",
1041                                                 tmppath, path)));
1042                 return;
1043         }
1044
1045         /* Check CreateSlotOnDisk() for the reasoning behind using a critical section. */
1046         START_CRIT_SECTION();
1047
1048         fsync_fname(path, false);
1049         fsync_fname((char *) dir, true);
1050         fsync_fname("pg_replslot", true);
1051
1052         END_CRIT_SECTION();
1053
1054         /*
1055          * Successfully wrote; unset the dirty bit, unless somebody dirtied it
1056          * again already.
1057          */
1058         {
1059                 volatile ReplicationSlot *vslot = slot;
1060
1061                 SpinLockAcquire(&vslot->mutex);
1062                 if (!vslot->just_dirtied)
1063                         vslot->dirty = false;
1064                 SpinLockRelease(&vslot->mutex);
1065         }
1066
1067         LWLockRelease(slot->io_in_progress_lock);
1068 }
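/*
 * For reference, the durability protocol used above is: write and fsync
 * <dir>/state.tmp, rename() it over <dir>/state, then fsync the state file,
 * the slot directory, and pg_replslot itself, so a crash leaves either the
 * old or the new state file but never a torn one.  The CRC-32 computed above
 * covers the slotdata part of ReplicationSlotOnDisk and is re-verified by
 * RestoreSlotFromDisk() at startup.
 */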
1069
1070 /*
1071  * Load a single slot from disk into memory.
1072  */
1073 static void
1074 RestoreSlotFromDisk(const char *name)
1075 {
1076         ReplicationSlotOnDisk cp;
1077         int                     i;
1078         char            path[MAXPGPATH];
1079         int                     fd;
1080         bool            restored = false;
1081         int                     readBytes;
1082         pg_crc32        checksum;
1083
1084         /* no need to lock here, no concurrent access allowed yet */
1085
1086         /* delete temp file if it exists */
1087         sprintf(path, "pg_replslot/%s/state.tmp", name);
1088         if (unlink(path) < 0 && errno != ENOENT)
1089                 ereport(PANIC,
1090                                 (errcode_for_file_access(),
1091                                  errmsg("could not unlink file \"%s\": %m", path)));
1092
1093         sprintf(path, "pg_replslot/%s/state", name);
1094
1095         elog(DEBUG1, "restoring replication slot from \"%s\"", path);
1096
1097         fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1098
1099         /*
1100          * We do not need to handle this as we are rename()ing the directory into
1101          * place only after we fsync()ed the state file.
1102          */
1103         if (fd < 0)
1104                 ereport(PANIC,
1105                                 (errcode_for_file_access(),
1106                                  errmsg("could not open file \"%s\": %m", path)));
1107
1108         /*
1109          * Sync state file before we're reading from it. We might have crashed
1110          * while it wasn't synced yet and we shouldn't continue on that basis.
1111          */
1112         if (pg_fsync(fd) != 0)
1113         {
1114                 CloseTransientFile(fd);
1115                 ereport(PANIC,
1116                                 (errcode_for_file_access(),
1117                                  errmsg("could not fsync file \"%s\": %m",
1118                                                 path)));
1119         }
1120
1121         /* Also sync the parent directory */
1122         START_CRIT_SECTION();
1123         fsync_fname(path, true);
1124         END_CRIT_SECTION();
1125
1126         /* read part of statefile that's guaranteed to be version independent */
1127         readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
1128         if (readBytes != ReplicationSlotOnDiskConstantSize)
1129         {
1130                 int                     saved_errno = errno;
1131
1132                 CloseTransientFile(fd);
1133                 errno = saved_errno;
1134                 ereport(PANIC,
1135                                 (errcode_for_file_access(),
1136                                  errmsg("could not read file \"%s\", read %d of %u: %m",
1137                                                 path, readBytes,
1138                                                 (uint32) ReplicationSlotOnDiskConstantSize)));
1139         }
1140
1141         /* verify magic */
1142         if (cp.magic != SLOT_MAGIC)
1143                 ereport(PANIC,
1144                                 (errcode_for_file_access(),
1145                                  errmsg("replication slot file \"%s\" has wrong magic %u instead of %u",
1146                                                 path, cp.magic, SLOT_MAGIC)));
1147
1148         /* verify version */
1149         if (cp.version != SLOT_VERSION)
1150                 ereport(PANIC,
1151                                 (errcode_for_file_access(),
1152                                  errmsg("replication slot file \"%s\" has unsupported version %u",
1153                                                 path, cp.version)));
1154
1155         /* boundary check on length */
1156         if (cp.length != ReplicationSlotOnDiskDynamicSize)
1157                 ereport(PANIC,
1158                                 (errcode_for_file_access(),
1159                                  errmsg("replication slot file \"%s\" has corrupted length %u",
1160                                                 path, cp.length)));
1161
1162         /* Now that we know the size, read the entire file */
1163         readBytes = read(fd,
1164                                          (char *)&cp + ReplicationSlotOnDiskConstantSize,
1165                                          cp.length);
1166         if (readBytes != cp.length)
1167         {
1168                 int                     saved_errno = errno;
1169
1170                 CloseTransientFile(fd);
1171                 errno = saved_errno;
1172                 ereport(PANIC,
1173                                 (errcode_for_file_access(),
1174                                  errmsg("could not read file \"%s\", read %d of %u: %m",
1175                                                 path, readBytes, cp.length)));
1176         }
1177
1178         CloseTransientFile(fd);
1179
1180         /* now verify the CRC32 */
1181         INIT_CRC32(checksum);
1182         COMP_CRC32(checksum,
1183                            (char *)&cp + ReplicationSlotOnDiskConstantSize,
1184                            ReplicationSlotOnDiskDynamicSize);
1185
1186         if (!EQ_CRC32(checksum, cp.checksum))
1187                 ereport(PANIC,
1188                                 (errmsg("replication slot file %s: checksum mismatch, is %u, should be %u",
1189                                                 path, checksum, cp.checksum)));
1190
1191         /* nothing can be active yet, don't lock anything */
1192         for (i = 0; i < max_replication_slots; i++)
1193         {
1194                 ReplicationSlot *slot;
1195
1196                 slot = &ReplicationSlotCtl->replication_slots[i];
1197
1198                 if (slot->in_use)
1199                         continue;
1200
1201                 /* restore the entire set of persistent data */
1202                 memcpy(&slot->data, &cp.slotdata,
1203                            sizeof(ReplicationSlotPersistentData));
1204
1205                 /* Don't restore the slot if it's not marked as persistent. */
1206                 if (slot->data.persistency != RS_PERSISTENT)
1207                         return;
1208
1209                 /* initialize in memory state */
1210                 slot->effective_xmin = cp.slotdata.xmin;
1211                 slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
1212
1213                 slot->candidate_catalog_xmin = InvalidTransactionId;
1214                 slot->candidate_xmin_lsn = InvalidXLogRecPtr;
1215                 slot->candidate_restart_lsn = InvalidXLogRecPtr;
1216                 slot->candidate_restart_valid = InvalidXLogRecPtr;
1217
1218                 slot->in_use = true;
1219                 slot->active = false;
1220
1221                 restored = true;
1222                 break;
1223         }
1224
1225         if (!restored)
1226                 ereport(PANIC,
1227                                 (errmsg("too many replication slots active before shutdown"),
1228                                  errhint("Increase max_replication_slots and try again.")));
1229 }