]> granicus.if.org Git - postgresql/commitdiff
Allow pg_create_physical_replication_slot() to reserve WAL.
authorAndres Freund <andres@anarazel.de>
Tue, 11 Aug 2015 10:34:31 +0000 (12:34 +0200)
committerAndres Freund <andres@anarazel.de>
Tue, 11 Aug 2015 10:34:31 +0000 (12:34 +0200)
When creating a physical slot it's often useful to immediately reserve
the current WAL position instead of only doing after the first feedback
message arrives. That e.g. allows slots to guarantee that all the WAL
for a base backup will be available afterwards.

Logical slots already have to reserve WAL during creation, so generalize
that logic into being usable for both physical and logical slots.

Catversion bump because of the new parameter.

Author: Gurjeet Singh
Reviewed-By: Andres Freund
Discussion: CABwTF4Wh_dBCzTU=49pFXR6coR4NW1ynb+vBqT+Po=7fuq5iCw@mail.gmail.com

doc/src/sgml/func.sgml
src/backend/catalog/system_views.sql
src/backend/replication/logical/logical.c
src/backend/replication/slot.c
src/backend/replication/slotfuncs.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.h
src/include/replication/slot.h

index 4fcc4fe5aaaf77b06b8ef9b4cac29bc1cab75d93..b1df55d7f6295b532898551c6dc16d8f365cabab 100644 (file)
@@ -17211,7 +17211,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
         <indexterm>
          <primary>pg_create_physical_replication_slot</primary>
         </indexterm>
-        <literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type>)</function></literal>
+        <literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type><optional>, <parameter>immediately_reserve</> <type>boolean</> </optional>)</function></literal>
        </entry>
        <entry>
         (<parameter>slot_name</parameter> <type>name</type>, <parameter>xlog_position</parameter> <type>pg_lsn</type>)
@@ -17221,7 +17221,11 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
         <parameter>slot_name</parameter>. Streaming changes from a physical slot
         is only possible with the streaming-replication protocol - see <xref
         linkend="protocol-replication">. Corresponds to the replication protocol
-        command <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>.
+        command <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>. The optional
+        second parameter,  when <literal>true</>, specifies that the <acronym>LSN</>
+        for this replication slot be reserved immediately; the <acronym<LSN</>
+        is otherwise reserved on first connection from a streaming replication
+        client.
        </entry>
       </row>
       <row>
index 3190c7f7e018965f6afb974547d99bc4b152bb14..ccc030fd7fb9956a37b7387d1808ce8620a3f62a 100644 (file)
@@ -917,6 +917,13 @@ LANGUAGE INTERNAL
 VOLATILE ROWS 1000 COST 1000
 AS 'pg_logical_slot_peek_binary_changes';
 
+CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
+    IN slot_name name, IN immediately_reserve boolean DEFAULT false,
+    OUT slot_name name, OUT xlog_position pg_lsn)
+RETURNS RECORD
+LANGUAGE INTERNAL
+AS 'pg_create_physical_replication_slot';
+
 CREATE OR REPLACE FUNCTION
   make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0,
                 days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0,
index 5411e599eb2e2d6ed6ec3a70729736544a13057f..5a07e1d9a69c946e2a57988aba79f9d2264b8abb 100644 (file)
@@ -250,52 +250,7 @@ CreateInitDecodingContext(char *plugin,
        StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
        SpinLockRelease(&slot->mutex);
 
-       /*
-        * The replication slot mechanism is used to prevent removal of required
-        * WAL. As there is no interlock between this and checkpoints required WAL
-        * could be removed before ReplicationSlotsComputeRequiredLSN() has been
-        * called to prevent that. In the very unlikely case that this happens
-        * we'll just retry.
-        */
-       while (true)
-       {
-               XLogSegNo       segno;
-
-               /*
-                * Let's start with enough information if we can, so log a standby
-                * snapshot and start decoding at exactly that position.
-                */
-               if (!RecoveryInProgress())
-               {
-                       XLogRecPtr      flushptr;
-
-                       /* start at current insert position */
-                       slot->data.restart_lsn = GetXLogInsertRecPtr();
-
-                       /* make sure we have enough information to start */
-                       flushptr = LogStandbySnapshot();
-
-                       /* and make sure it's fsynced to disk */
-                       XLogFlush(flushptr);
-               }
-               else
-                       slot->data.restart_lsn = GetRedoRecPtr();
-
-               /* prevent WAL removal as fast as possible */
-               ReplicationSlotsComputeRequiredLSN();
-
-               /*
-                * If all required WAL is still there, great, otherwise retry. The
-                * slot should prevent further removal of WAL, unless there's a
-                * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
-                * the new restart_lsn above, so normally we should never need to loop
-                * more than twice.
-                */
-               XLByteToSeg(slot->data.restart_lsn, segno);
-               if (XLogGetLastRemovedSegno() < segno)
-                       break;
-       }
-
+       ReplicationSlotReserveWal();
 
        /* ----
         * This is a bit tricky: We need to determine a safe xmin horizon to start
index 1f013af8871a8ad03ca6901b90777a378118c5bc..c66619cda299090c24bfb6cd22757e1bdf3657b8 100644 (file)
@@ -40,6 +40,7 @@
 #include <sys/stat.h>
 
 #include "access/transam.h"
+#include "access/xlog_internal.h"
 #include "common/string.h"
 #include "miscadmin.h"
 #include "replication/slot.h"
@@ -781,6 +782,76 @@ CheckSlotRequirements(void)
                                 errmsg("replication slots can only be used if wal_level >= archive")));
 }
 
+/*
+ * Reserve WAL for the currently active slot.
+ *
+ * Compute and set restart_lsn in a manner that's appropriate for the type of
+ * the slot and concurrency safe.
+ */
+void
+ReplicationSlotReserveWal(void)
+{
+       ReplicationSlot *slot = MyReplicationSlot;
+
+       Assert(slot != NULL);
+       Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
+
+       /*
+        * The replication slot mechanism is used to prevent removal of required
+        * WAL. As there is no interlock between this routine and checkpoints, WAL
+        * segments could concurrently be removed when a now stale return value of
+        * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
+        * this happens we'll just retry.
+        */
+       while (true)
+       {
+               XLogSegNo       segno;
+
+               /*
+                * For logical slots log a standby snapshot and start logical decoding
+                * at exactly that position. That allows the slot to start up more
+                * quickly.
+                *
+                * That's not needed (or indeed helpful) for physical slots as they'll
+                * start replay at the last logged checkpoint anyway. Instead return
+                * the location of the last redo LSN. While that slightly increases
+                * the chance that we have to retry, it's where a base backup has to
+                * start replay at.
+                */
+               if (!RecoveryInProgress() && SlotIsLogical(slot))
+               {
+                       XLogRecPtr      flushptr;
+
+                       /* start at current insert position */
+                       slot->data.restart_lsn = GetXLogInsertRecPtr();
+
+                       /* make sure we have enough information to start */
+                       flushptr = LogStandbySnapshot();
+
+                       /* and make sure it's fsynced to disk */
+                       XLogFlush(flushptr);
+               }
+               else
+               {
+                       slot->data.restart_lsn = GetRedoRecPtr();
+               }
+
+               /* prevent WAL removal as fast as possible */
+               ReplicationSlotsComputeRequiredLSN();
+
+               /*
+                * If all required WAL is still there, great, otherwise retry. The
+                * slot should prevent further removal of WAL, unless there's a
+                * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
+                * the new restart_lsn above, so normally we should never need to loop
+                * more than twice.
+                */
+               XLByteToSeg(slot->data.restart_lsn, segno);
+               if (XLogGetLastRemovedSegno() < segno)
+                       break;
+       }
+}
+
 /*
  * Flush all replication slots to disk.
  *
index ecfcb0754bda3e84bdc6b8553a70f0247e47bc23..2dc682799002fc5bebb6771756d921889f024f49 100644 (file)
@@ -40,6 +40,7 @@ Datum
 pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 {
        Name            name = PG_GETARG_NAME(0);
+       bool            immediately_reserve = PG_GETARG_BOOL(1);
        Datum           values[2];
        bool            nulls[2];
        TupleDesc       tupdesc;
@@ -59,9 +60,25 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
        ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
 
        values[0] = NameGetDatum(&MyReplicationSlot->data.name);
-
        nulls[0] = false;
-       nulls[1] = true;
+
+       if (immediately_reserve)
+       {
+               /* Reserve WAL as the user asked for it */
+               ReplicationSlotReserveWal();
+
+               /* Write this slot to disk */
+               ReplicationSlotMarkDirty();
+               ReplicationSlotSave();
+
+               values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
+               nulls[1] = false;
+       }
+       else
+       {
+               values[0] = NameGetDatum(&MyReplicationSlot->data.name);
+               nulls[1] = true;
+       }
 
        tuple = heap_form_tuple(tupdesc, values, nulls);
        result = HeapTupleGetDatum(tuple);
index 8cd677298795a89323e75e854868792069519d26..b58fe464476010231623e80332c16458e7735f2a 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     201508101
+#define CATALOG_VERSION_NO     201508111
 
 #endif
index 51639624a9eb0d8b9558fa0a6dba54d3476ff057..ddf7c6737103295a07c25ddce36d34ad984e03f6 100644 (file)
@@ -5193,7 +5193,7 @@ DATA(insert OID = 3473 (  spg_range_quad_leaf_consistent  PGNSP PGUID 12 1 0 0 0
 DESCR("SP-GiST support for quad tree over range");
 
 /* replication slots */
-DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2249 "19" "{19,19,3220}" "{i,o,o}" "{slot_name,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
+DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,immediately_reserve,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
 DESCR("create a physical replication slot");
 DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
 DESCR("drop a replication slot");
index 367ef0a38dc3836b89fe68981164e1b165a0333d..20dd7a283c94c2bace1cb3178922d6d3d642c2f7 100644 (file)
@@ -166,6 +166,7 @@ extern void ReplicationSlotMarkDirty(void);
 
 /* misc stuff */
 extern bool ReplicationSlotValidateName(const char *name, int elevel);
+extern void ReplicationSlotReserveWal(void);
 extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
 extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);