]> granicus.if.org Git - postgresql/commitdiff
Add facility to copy replication slots
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 5 Apr 2019 17:52:45 +0000 (14:52 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 5 Apr 2019 21:05:18 +0000 (18:05 -0300)
This allows the user to create duplicates of existing replication slots,
either logical or physical, and even changing properties such as whether
they are temporary or the output plugin used.

There are multiple uses for this, such as initializing multiple replicas
using the slot for one base backup; when doing investigation of logical
replication issues; and to select a different output plugins.

Author: Masahiko Sawada
Reviewed-by: Michael Paquier, Andres Freund, Petr Jelinek
Discussion: https://postgr.es/m/CAD21AoAm7XX8y_tOPP6j4Nzzch12FvA1wPqiO690RCk+uYVstg@mail.gmail.com

contrib/test_decoding/expected/slot.out
contrib/test_decoding/sql/slot.sql
doc/src/sgml/func.sgml
src/backend/replication/logical/logical.c
src/backend/replication/slotfuncs.c
src/backend/replication/walsender.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/replication/logical.h

index 523621a705db0b4bff9d50fda029562ed43d6272..3da6b0be633fcf101db3f2480d2af27f5cdeaf04 100644 (file)
@@ -150,3 +150,237 @@ SELECT pg_drop_replication_slot('regression_slot3');
  
 (1 row)
 
+--
+-- Test copy functions for logical replication slots
+--
+-- Create and copy logical slots
+SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', false, 'pgoutput');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin_temp', true, 'pgoutput');
+ ?column? 
+----------
+ copy
+(1 row)
+
+-- Check all copied slots status
+SELECT
+    o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+FROM
+    (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+    LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn  AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
+WHERE
+    o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+ slot_name  |    plugin     | temporary |            slot_name            |    plugin     | temporary 
+------------+---------------+-----------+---------------------------------+---------------+-----------
+ orig_slot1 | test_decoding | f         | copied_slot1_change_plugin      | pgoutput      | f
+ orig_slot1 | test_decoding | f         | copied_slot1_change_plugin_temp | pgoutput      | t
+ orig_slot1 | test_decoding | f         | copied_slot1_no_change          | test_decoding | f
+(3 rows)
+
+-- Now we have maximum 4 replication slots. Check slots are properly
+-- released even when raise error during creating the target slot.
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error
+ERROR:  all replication slots are in use
+HINT:  Free one or increase max_replication_slots.
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('orig_slot1');
+ pg_drop_replication_slot 
+--------------------------
+(1 row)
+
+SELECT pg_drop_replication_slot('copied_slot1_no_change');
+ pg_drop_replication_slot 
+--------------------------
+(1 row)
+
+SELECT pg_drop_replication_slot('copied_slot1_change_plugin');
+ pg_drop_replication_slot 
+--------------------------
+(1 row)
+
+-- Test based on the temporary logical slot
+SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot2', 'test_decoding', true);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_no_change');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin', true, 'pgoutput');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin_temp', false, 'pgoutput');
+ ?column? 
+----------
+ copy
+(1 row)
+
+-- Check all copied slots status
+SELECT
+    o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+FROM
+    (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+    LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn  AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
+WHERE
+    o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+ slot_name  |    plugin     | temporary |            slot_name            |    plugin     | temporary 
+------------+---------------+-----------+---------------------------------+---------------+-----------
+ orig_slot2 | test_decoding | t         | copied_slot2_change_plugin      | pgoutput      | t
+ orig_slot2 | test_decoding | t         | copied_slot2_change_plugin_temp | pgoutput      | f
+ orig_slot2 | test_decoding | t         | copied_slot2_no_change          | test_decoding | t
+(3 rows)
+
+-- Cannot copy a logical slot to a physical slot
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error
+ERROR:  cannot copy physical replication slot "orig_slot2" as a logical replication slot
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('copied_slot2_change_plugin_temp');
+ pg_drop_replication_slot 
+--------------------------
+(1 row)
+
+--
+-- Test copy functions for physical replication slots
+--
+-- Create and copy physical slots
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot1', true);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', false);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_no_change');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_temp', true);
+ ?column? 
+----------
+ copy
+(1 row)
+
+-- Check all copied slots status. Since all slots don't reserve WAL we check only other fields.
+SELECT slot_name, slot_type, temporary FROM pg_replication_slots;
+       slot_name        | slot_type | temporary 
+------------------------+-----------+-----------
+ orig_slot1             | physical  | f
+ orig_slot2             | physical  | f
+ copied_slot1_no_change | physical  | f
+ copied_slot1_temp      | physical  | t
+(4 rows)
+
+-- Cannot copy a physical slot to a logical slot
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error
+ERROR:  cannot copy logical replication slot "orig_slot1" as a physical replication slot
+-- Cannot copy a physical slot that doesn't reserve WAL
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error
+ERROR:  cannot copy a replication slot that doesn't reserve WAL
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('orig_slot1');
+ pg_drop_replication_slot 
+--------------------------
+(1 row)
+
+SELECT pg_drop_replication_slot('orig_slot2');
+ pg_drop_replication_slot 
+--------------------------
+(1 row)
+
+SELECT pg_drop_replication_slot('copied_slot1_no_change');
+ pg_drop_replication_slot 
+--------------------------
+(1 row)
+
+-- Test based on the temporary physical slot
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', true, true);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_no_change');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_notemp', false);
+ ?column? 
+----------
+ copy
+(1 row)
+
+-- Check all copied slots status
+SELECT
+    o.slot_name, o.temporary, c.slot_name, c.temporary
+FROM
+    (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+    LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn
+WHERE
+    o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+ slot_name  | temporary |       slot_name        | temporary 
+------------+-----------+------------------------+-----------
+ orig_slot2 | t         | copied_slot2_no_change | t
+ orig_slot2 | t         | copied_slot2_notemp    | f
+(2 rows)
+
+SELECT pg_drop_replication_slot('orig_slot2');
+ pg_drop_replication_slot 
+--------------------------
+(1 row)
+
+SELECT pg_drop_replication_slot('copied_slot2_no_change');
+ pg_drop_replication_slot 
+--------------------------
+(1 row)
+
+SELECT pg_drop_replication_slot('copied_slot2_notemp');
+ pg_drop_replication_slot 
+--------------------------
+(1 row)
+
index c8d08f85417e703f480297b4bac48ef5da55fd53..6d83fb26782f362049934fc39fd408c35067625b 100644 (file)
@@ -76,3 +76,97 @@ SELECT slot_name FROM pg_create_physical_replication_slot('regression_slot3');
 SELECT pg_replication_slot_advance('regression_slot3', '0/0'); -- invalid LSN
 SELECT pg_replication_slot_advance('regression_slot3', '0/1'); -- error
 SELECT pg_drop_replication_slot('regression_slot3');
+
+--
+-- Test copy functions for logical replication slots
+--
+
+-- Create and copy logical slots
+SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false);
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change');
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', false, 'pgoutput');
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin_temp', true, 'pgoutput');
+
+-- Check all copied slots status
+SELECT
+    o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+FROM
+    (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+    LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn  AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
+WHERE
+    o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+
+-- Now we have maximum 4 replication slots. Check slots are properly
+-- released even when raise error during creating the target slot.
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error
+
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('orig_slot1');
+SELECT pg_drop_replication_slot('copied_slot1_no_change');
+SELECT pg_drop_replication_slot('copied_slot1_change_plugin');
+
+-- Test based on the temporary logical slot
+SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot2', 'test_decoding', true);
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_no_change');
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin', true, 'pgoutput');
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin_temp', false, 'pgoutput');
+
+-- Check all copied slots status
+SELECT
+    o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+FROM
+    (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+    LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn  AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
+WHERE
+    o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+
+-- Cannot copy a logical slot to a physical slot
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error
+
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('copied_slot2_change_plugin_temp');
+
+--
+-- Test copy functions for physical replication slots
+--
+
+-- Create and copy physical slots
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot1', true);
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', false);
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_no_change');
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_temp', true);
+
+-- Check all copied slots status. Since all slots don't reserve WAL we check only other fields.
+SELECT slot_name, slot_type, temporary FROM pg_replication_slots;
+
+-- Cannot copy a physical slot to a logical slot
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error
+
+-- Cannot copy a physical slot that doesn't reserve WAL
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error
+
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('orig_slot1');
+SELECT pg_drop_replication_slot('orig_slot2');
+SELECT pg_drop_replication_slot('copied_slot1_no_change');
+
+-- Test based on the temporary physical slot
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', true, true);
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_no_change');
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_notemp', false);
+
+-- Check all copied slots status
+SELECT
+    o.slot_name, o.temporary, c.slot_name, c.temporary
+FROM
+    (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+    LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn
+WHERE
+    o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+
+SELECT pg_drop_replication_slot('orig_slot2');
+SELECT pg_drop_replication_slot('copied_slot2_no_change');
+SELECT pg_drop_replication_slot('copied_slot2_notemp');
index 2aa1d1fc29e9346f7f02ad0f680f255dbe7b13df..5c3724ab9e373fd7142c2fc5f3408cd1e3d30584 100644 (file)
@@ -20431,6 +20431,47 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
        </entry>
       </row>
 
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_copy_physical_replication_slot</primary>
+        </indexterm>
+        <literal><function>pg_copy_physical_replication_slot(<parameter>src_slot_name</parameter> <type>name</type>, <parameter>dst_slot_name</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type></optional>)</function></literal>
+       </entry>
+       <entry>
+        (<parameter>slot_name</parameter> <type>name</type>, <parameter>lsn</parameter> <type>pg_lsn</type>)
+       </entry>
+       <entry>
+        Copies an existing physical replication slot name <parameter>src_slot_name</parameter>
+        to a physical replication slot named <parameter>dst_slot_name</parameter>.
+        The copied physical slot starts to reserve WAL from the same <acronym>LSN</acronym> as the
+        source slot.
+        <parameter>temporary</parameter> is optional. If <parameter>temporary</parameter>
+        is omitted, the same value as the source slot is used.
+       </entry>
+      </row>
+
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_copy_logical_replication_slot</primary>
+        </indexterm>
+        <literal><function>pg_copy_logical_replication_slot(<parameter>src_slot_name</parameter> <type>name</type>, <parameter>dst_slot_name</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type> <optional>, <parameter>plugin</parameter> <type>name</type></optional></optional>)</function></literal>
+       </entry>
+       <entry>
+        (<parameter>slot_name</parameter> <type>name</type>, <parameter>lsn</parameter> <type>pg_lsn</type>)
+       </entry>
+       <entry>
+        Copies an existing logical replication slot name <parameter>src_slot_name</parameter>
+        to a logical replication slot named <parameter>dst_slot_name</parameter>
+        while changing the output plugin and persistence. The copied logical slot starts
+        from the same <acronym>LSN</acronym> as the source logical slot. Both
+        <parameter>temporary</parameter> and <parameter>plugin</parameter> are optional.
+        If <parameter>temporary</parameter> or <parameter>plugin</parameter> are omitted,
+        the same values as the source logical slot are used.
+       </entry>
+      </row>
+
       <row>
        <entry>
         <indexterm>
index 6e5bc12e779a473fde14938b2b69492a89587dff..424fe86a1b68eb03803ec831f71e09ad0bbe34b4 100644 (file)
@@ -211,11 +211,15 @@ StartupDecodingContext(List *output_plugin_options,
 /*
  * Create a new decoding context, for a new logical slot.
  *
- * plugin contains the name of the output plugin
- * output_plugin_options contains options passed to the output plugin
- * read_page, prepare_write, do_write, update_progress
- *             callbacks that have to be filled to perform the use-case dependent,
- *             actual, work.
+ * plugin -- contains the name of the output plugin
+ * output_plugin_options -- contains options passed to the output plugin
+ * restart_lsn -- if given as invalid, it's this routine's responsibility to
+ *             mark WAL as reserved by setting a convenient restart_lsn for the slot.
+ *             Otherwise, we set for decoding to start from the given LSN without
+ *             marking WAL reserved beforehand.  In that scenario, it's up to the
+ *             caller to guarantee that WAL remains available.
+ * read_page, prepare_write, do_write, update_progress --
+ *             callbacks that perform the use-case dependent, actual, work.
  *
  * Needs to be called while in a memory context that's at least as long lived
  * as the decoding context because further memory contexts will be created
@@ -228,6 +232,7 @@ LogicalDecodingContext *
 CreateInitDecodingContext(char *plugin,
                                                  List *output_plugin_options,
                                                  bool need_full_snapshot,
+                                                 XLogRecPtr restart_lsn,
                                                  XLogPageReadCB read_page,
                                                  LogicalOutputPluginWriterPrepareWrite prepare_write,
                                                  LogicalOutputPluginWriterWrite do_write,
@@ -271,7 +276,14 @@ CreateInitDecodingContext(char *plugin,
        StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
        SpinLockRelease(&slot->mutex);
 
-       ReplicationSlotReserveWal();
+       if (XLogRecPtrIsInvalid(restart_lsn))
+               ReplicationSlotReserveWal();
+       else
+       {
+               SpinLockAcquire(&slot->mutex);
+               slot->data.restart_lsn = restart_lsn;
+               SpinLockRelease(&slot->mutex);
+       }
 
        /* ----
         * This is a bit tricky: We need to determine a safe xmin horizon to start
@@ -316,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
        ReplicationSlotMarkDirty();
        ReplicationSlotSave();
 
-       ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
+       ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
                                                                 need_full_snapshot, false,
                                                                 read_page, prepare_write, do_write,
                                                                 update_progress);
index 224dd920c8dba4edc41c971d960255f5aacbd64e..d7c53c54bdb94f43c9dc55687ca48a7bf527d915 100644 (file)
  *
  *-------------------------------------------------------------------------
  */
-
 #include "postgres.h"
 
+#include "access/htup_details.h"
+#include "access/xlog_internal.h"
 #include "funcapi.h"
 #include "miscadmin.h"
-
-#include "access/htup_details.h"
 #include "replication/decode.h"
 #include "replication/slot.h"
 #include "replication/logical.h"
@@ -35,6 +34,38 @@ check_permissions(void)
                                 (errmsg("must be superuser or replication role to use replication slots"))));
 }
 
+/*
+ * Helper function for creating a new physical replication slot with
+ * given arguments. Note that this function doesn't release the created
+ * slot.
+ *
+ * If restart_lsn is a valid value, we use it without WAL reservation
+ * routine. So the caller must guarantee that WAL is available.
+ */
+static void
+create_physical_replication_slot(char *name, bool immediately_reserve,
+                                                                bool temporary, XLogRecPtr restart_lsn)
+{
+       Assert(!MyReplicationSlot);
+
+       /* acquire replication slot, this will check for conflicting names */
+       ReplicationSlotCreate(name, false,
+                                                 temporary ? RS_TEMPORARY : RS_PERSISTENT);
+
+       if (immediately_reserve)
+       {
+               /* Reserve WAL as the user asked for it */
+               if (XLogRecPtrIsInvalid(restart_lsn))
+                       ReplicationSlotReserveWal();
+               else
+                       MyReplicationSlot->data.restart_lsn = restart_lsn;
+
+               /* Write this slot to disk */
+               ReplicationSlotMarkDirty();
+               ReplicationSlotSave();
+       }
+}
+
 /*
  * SQL function for creating a new physical (streaming replication)
  * replication slot.
@@ -51,8 +82,6 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
        HeapTuple       tuple;
        Datum           result;
 
-       Assert(!MyReplicationSlot);
-
        if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
                elog(ERROR, "return type must be a row type");
 
@@ -60,29 +89,21 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 
        CheckSlotRequirements();
 
-       /* acquire replication slot, this will check for conflicting names */
-       ReplicationSlotCreate(NameStr(*name), false,
-                                                 temporary ? RS_TEMPORARY : RS_PERSISTENT);
+       create_physical_replication_slot(NameStr(*name),
+                                                                        immediately_reserve,
+                                                                        temporary,
+                                                                        InvalidXLogRecPtr);
 
        values[0] = NameGetDatum(&MyReplicationSlot->data.name);
        nulls[0] = false;
 
        if (immediately_reserve)
        {
-               /* Reserve WAL as the user asked for it */
-               ReplicationSlotReserveWal();
-
-               /* Write this slot to disk */
-               ReplicationSlotMarkDirty();
-               ReplicationSlotSave();
-
                values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
                nulls[1] = false;
        }
        else
-       {
                nulls[1] = true;
-       }
 
        tuple = heap_form_tuple(tupdesc, values, nulls);
        result = HeapTupleGetDatum(tuple);
@@ -94,32 +115,18 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 
 
 /*
- * SQL function for creating a new logical replication slot.
+ * Helper function for creating a new logical replication slot with
+ * given arguments. Note that this function doesn't release the created
+ * slot.
  */
-Datum
-pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
+static void
+create_logical_replication_slot(char *name, char *plugin,
+                                                               bool temporary, XLogRecPtr restart_lsn)
 {
-       Name            name = PG_GETARG_NAME(0);
-       Name            plugin = PG_GETARG_NAME(1);
-       bool            temporary = PG_GETARG_BOOL(2);
-
        LogicalDecodingContext *ctx = NULL;
 
-       TupleDesc       tupdesc;
-       HeapTuple       tuple;
-       Datum           result;
-       Datum           values[2];
-       bool            nulls[2];
-
        Assert(!MyReplicationSlot);
 
-       if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
-               elog(ERROR, "return type must be a row type");
-
-       check_permissions();
-
-       CheckLogicalDecodingRequirements();
-
        /*
         * Acquire a logical decoding slot, this will check for conflicting names.
         * Initially create persistent slot as ephemeral - that allows us to
@@ -128,25 +135,54 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
         * slots can be created as temporary from beginning as they get dropped on
         * error as well.
         */
-       ReplicationSlotCreate(NameStr(*name), true,
+       ReplicationSlotCreate(name, true,
                                                  temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 
        /*
         * Create logical decoding context, to build the initial snapshot.
         */
-       ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
+       ctx = CreateInitDecodingContext(plugin, NIL,
                                                                        false,  /* do not build snapshot */
+                                                                       restart_lsn,
                                                                        logical_read_local_xlog_page, NULL, NULL,
                                                                        NULL);
 
        /* build initial snapshot, might take a while */
        DecodingContextFindStartpoint(ctx);
 
-       values[0] = NameGetDatum(&MyReplicationSlot->data.name);
-       values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
-
        /* don't need the decoding context anymore */
        FreeDecodingContext(ctx);
+}
+
+/*
+ * SQL function for creating a new logical replication slot.
+ */
+Datum
+pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
+{
+       Name            name = PG_GETARG_NAME(0);
+       Name            plugin = PG_GETARG_NAME(1);
+       bool            temporary = PG_GETARG_BOOL(2);
+       Datum           result;
+       TupleDesc       tupdesc;
+       HeapTuple       tuple;
+       Datum           values[2];
+       bool            nulls[2];
+
+       if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+               elog(ERROR, "return type must be a row type");
+
+       check_permissions();
+
+       CheckLogicalDecodingRequirements();
+
+       create_logical_replication_slot(NameStr(*name),
+                                                                       NameStr(*plugin),
+                                                                       temporary,
+                                                                       InvalidXLogRecPtr);
+
+       values[0] = NameGetDatum(&MyReplicationSlot->data.name);
+       values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
 
        memset(nulls, 0, sizeof(nulls));
 
@@ -558,3 +594,235 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 
        PG_RETURN_DATUM(result);
 }
+
+/*
+ * Helper function of copying a replication slot.
+ */
+static Datum
+copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
+{
+       Name            src_name = PG_GETARG_NAME(0);
+       Name            dst_name = PG_GETARG_NAME(1);
+       ReplicationSlot *src = NULL;
+       XLogRecPtr      src_restart_lsn;
+       bool            src_islogical;
+       bool            temporary;
+       char       *plugin;
+       Datum           values[2];
+       bool            nulls[2];
+       Datum           result;
+       TupleDesc       tupdesc;
+       HeapTuple       tuple;
+
+       if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+               elog(ERROR, "return type must be a row type");
+
+       check_permissions();
+
+       if (logical_slot)
+               CheckLogicalDecodingRequirements();
+       else
+               CheckSlotRequirements();
+
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+       /*
+        * We need to prevent the source slot's reserved WAL from being removed,
+        * but we don't want to lock that slot for very long, and it can advance
+        * in the meantime.  So obtain the source slot's data, and create a new
+        * slot using its restart_lsn.  Afterwards we lock the source slot again
+        * and verify that the data we copied (name, type) has not changed
+        * incompatibly.  No inconvenient WAL removal can occur once the new slot
+        * is created -- but since WAL removal could have occurred before we
+        * managed to create the new slot, we advance the new slot's restart_lsn
+        * to the source slot's updated restart_lsn the second time we lock it.
+        */
+       for (int i = 0; i < max_replication_slots; i++)
+       {
+               ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+               if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
+               {
+                       SpinLockAcquire(&s->mutex);
+                       src_islogical = SlotIsLogical(s);
+                       src_restart_lsn = s->data.restart_lsn;
+                       temporary = s->data.persistency == RS_TEMPORARY;
+                       plugin = logical_slot ? pstrdup(NameStr(s->data.plugin)) : NULL;
+                       SpinLockRelease(&s->mutex);
+
+                       src = s;
+                       break;
+               }
+       }
+
+       LWLockRelease(ReplicationSlotControlLock);
+
+       if (src == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_OBJECT),
+                                errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
+
+       /* Check type of replication slot */
+       if (src_islogical != logical_slot)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                src_islogical ?
+                                errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
+                                               NameStr(*src_name)) :
+                                errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
+                                               NameStr(*src_name))));
+
+       /* Copying non-reserved slot doesn't make sense */
+       if (XLogRecPtrIsInvalid(src_restart_lsn))
+       {
+               Assert(!logical_slot);
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                (errmsg("cannot copy a replication slot that doesn't reserve WAL"))));
+       }
+
+       /* Overwrite params from optional arguments */
+       if (PG_NARGS() >= 3)
+               temporary = PG_GETARG_BOOL(2);
+       if (PG_NARGS() >= 4)
+       {
+               Assert(logical_slot);
+               plugin = NameStr(*(PG_GETARG_NAME(3)));
+       }
+
+       /* Create new slot and acquire it */
+       if (logical_slot)
+               create_logical_replication_slot(NameStr(*dst_name),
+                                                                               plugin,
+                                                                               temporary,
+                                                                               src_restart_lsn);
+       else
+               create_physical_replication_slot(NameStr(*dst_name),
+                                                                                true,
+                                                                                temporary,
+                                                                                src_restart_lsn);
+
+       /*
+        * Update the destination slot to current values of the source slot;
+        * recheck that the source slot is still the one we saw previously.
+        */
+       {
+               TransactionId copy_effective_xmin;
+               TransactionId copy_effective_catalog_xmin;
+               TransactionId copy_xmin;
+               TransactionId copy_catalog_xmin;
+               XLogRecPtr      copy_restart_lsn;
+               bool            copy_islogical;
+               char       *copy_name;
+
+               /* Copy data of source slot again */
+               SpinLockAcquire(&src->mutex);
+               copy_effective_xmin = src->effective_xmin;
+               copy_effective_catalog_xmin = src->effective_catalog_xmin;
+
+               copy_xmin = src->data.xmin;
+               copy_catalog_xmin = src->data.catalog_xmin;
+               copy_restart_lsn = src->data.restart_lsn;
+
+               /* for existence check */
+               copy_name = pstrdup(NameStr(src->data.name));
+               copy_islogical = SlotIsLogical(src);
+               SpinLockRelease(&src->mutex);
+
+               /*
+                * Check if the source slot still exists and is valid. We regards it
+                * as invalid if the type of replication slot or name has been
+                * changed, or the restart_lsn either is invalid or has gone backward.
+                * (The restart_lsn could go backwards if the source slot is dropped
+                * and copied from an older slot during installation.)
+                *
+                * Since erroring out will release and drop the destination slot we
+                * don't need to release it here.
+                */
+               if (copy_restart_lsn < src_restart_lsn ||
+                       src_islogical != copy_islogical ||
+                       strcmp(copy_name, NameStr(*src_name)) != 0)
+                       ereport(ERROR,
+                                       (errmsg("could not copy replication slot \"%s\"",
+                                                       NameStr(*src_name)),
+                                        errdetail("The source replication slot was modified incompatibly during the copy operation.")));
+
+               /* Install copied values again */
+               SpinLockAcquire(&MyReplicationSlot->mutex);
+               MyReplicationSlot->effective_xmin = copy_effective_xmin;
+               MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
+
+               MyReplicationSlot->data.xmin = copy_xmin;
+               MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
+               MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
+               SpinLockRelease(&MyReplicationSlot->mutex);
+
+               ReplicationSlotMarkDirty();
+               ReplicationSlotsComputeRequiredXmin(false);
+               ReplicationSlotsComputeRequiredLSN();
+               ReplicationSlotSave();
+
+#ifdef USE_ASSERT_CHECKING
+               /* Check that the restart_lsn is available */
+               {
+                       XLogSegNo       segno;
+
+                       XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
+                       Assert(XLogGetLastRemovedSegno() < segno);
+               }
+#endif
+       }
+
+       /* target slot fully created, mark as persistent if needed */
+       if (logical_slot && !temporary)
+               ReplicationSlotPersist();
+
+       /* All done.  Set up the return values */
+       values[0] = NameGetDatum(dst_name);
+       nulls[0] = false;
+       if (!XLogRecPtrIsInvalid(MyReplicationSlot->data.confirmed_flush))
+       {
+               values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
+               nulls[1] = false;
+       }
+       else
+               nulls[1] = true;
+
+       tuple = heap_form_tuple(tupdesc, values, nulls);
+       result = HeapTupleGetDatum(tuple);
+
+       ReplicationSlotRelease();
+
+       PG_RETURN_DATUM(result);
+}
+
+/* The wrappers below are all to appease opr_sanity */
+Datum
+pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
+{
+       return copy_replication_slot(fcinfo, true);
+}
+
+Datum
+pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
+{
+       return copy_replication_slot(fcinfo, true);
+}
+
+Datum
+pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
+{
+       return copy_replication_slot(fcinfo, true);
+}
+
+Datum
+pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
+{
+       return copy_replication_slot(fcinfo, false);
+}
+
+Datum
+pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
+{
+       return copy_replication_slot(fcinfo, false);
+}
index 21f5c868f18f5e40607f70684afba10d79643de8..aae6adc15c19eb726840bd9ddec2cf2bf700abb3 100644 (file)
@@ -934,6 +934,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
                }
 
                ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
+                                                                               InvalidXLogRecPtr,
                                                                                logical_read_xlog_page,
                                                                                WalSndPrepareWrite, WalSndWriteData,
                                                                                WalSndUpdateProgress);
index 1fe54c266533f52befc4c4edd91edd07fb587244..bfd2bfc186c9c131a04181434f5bd889820dd3cd 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     201904031
+#define CATALOG_VERSION_NO     201904051
 
 #endif
index fb257c17c8924935af2dfed400839997eb23d780..ad4519e0011c847bfca1239d1ebbf0e4d6d89000 100644 (file)
   proargmodes => '{i,i,i,o,o}',
   proargnames => '{slot_name,immediately_reserve,temporary,slot_name,lsn}',
   prosrc => 'pg_create_physical_replication_slot' },
+{ oid => '4220', descr => 'copy a physical replication slot, changing temporality',
+  proname => 'pg_copy_physical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool',
+  proallargtypes => '{name,name,bool,name,pg_lsn}',
+  proargmodes => '{i,i,i,o,o}',
+  proargnames => '{src_slot_name,dst_slot_name,temporary,slot_name,lsn}',
+  prosrc => 'pg_copy_physical_replication_slot_a' },
+{ oid => '4221', descr => 'copy a physical replication slot',
+  proname => 'pg_copy_physical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name',
+  proallargtypes => '{name,name,name,pg_lsn}',
+  proargmodes => '{i,i,o,o}',
+  proargnames => '{src_slot_name,dst_slot_name,slot_name,lsn}',
+  prosrc => 'pg_copy_physical_replication_slot_b' },
 { oid => '3780', descr => 'drop a replication slot',
   proname => 'pg_drop_replication_slot', provolatile => 'v', proparallel => 'u',
   prorettype => 'void', proargtypes => 'name',
   proargmodes => '{i,i,i,o,o}',
   proargnames => '{slot_name,plugin,temporary,slot_name,lsn}',
   prosrc => 'pg_create_logical_replication_slot' },
+{ oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin',
+  proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool name',
+  proallargtypes => '{name,name,bool,name,name,pg_lsn}',
+  proargmodes => '{i,i,i,i,o,o}',
+  proargnames => '{src_slot_name,dst_slot_name,temporary,plugin,slot_name,lsn}',
+  prosrc => 'pg_copy_logical_replication_slot_a' },
+{ oid => '4223', descr => 'copy a logical replication slot, changing temporality',
+  proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool',
+  proallargtypes => '{name,name,bool,name,pg_lsn}',
+  proargmodes => '{i,i,i,o,o}',
+  proargnames => '{src_slot_name,dst_slot_name,temporary,slot_name,lsn}',
+  prosrc => 'pg_copy_logical_replication_slot_b' },
+{ oid => '4224', descr => 'copy a logical replication slot',
+  proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name',
+  proallargtypes => '{name,name,name,pg_lsn}',
+  proargmodes => '{i,i,o,o}',
+  proargnames => '{src_slot_name,dst_slot_name,slot_name,lsn}',
+  prosrc => 'pg_copy_logical_replication_slot_c' },
 { oid => '3782', descr => 'get changes from replication slot',
   proname => 'pg_logical_slot_get_changes', procost => '1000',
   prorows => '1000', provariadic => 'text', proisstrict => 'f',
index c8ffc4c43477686c43fb0278e9ae8759dc544b58..0a2a63a48c8034c4a13841b5ec15eaf9a8a30c1e 100644 (file)
@@ -97,6 +97,7 @@ extern void CheckLogicalDecodingRequirements(void);
 extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
                                                  List *output_plugin_options,
                                                  bool need_full_snapshot,
+                                                 XLogRecPtr restart_lsn,
                                                  XLogPageReadCB read_page,
                                                  LogicalOutputPluginWriterPrepareWrite prepare_write,
                                                  LogicalOutputPluginWriterWrite do_write,