(1 row)
+--
+-- Test copy functions for logical replication slots
+--
+-- Create and copy logical slots
+SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false);
+ ?column?
+----------
+ init
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change');
+ ?column?
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', false, 'pgoutput');
+ ?column?
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin_temp', true, 'pgoutput');
+ ?column?
+----------
+ copy
+(1 row)
+
+-- Check all copied slots status
+SELECT
+ o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+FROM
+ (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+ LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
+WHERE
+ o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+ slot_name | plugin | temporary | slot_name | plugin | temporary
+------------+---------------+-----------+---------------------------------+---------------+-----------
+ orig_slot1 | test_decoding | f | copied_slot1_change_plugin | pgoutput | f
+ orig_slot1 | test_decoding | f | copied_slot1_change_plugin_temp | pgoutput | t
+ orig_slot1 | test_decoding | f | copied_slot1_no_change | test_decoding | f
+(3 rows)
+
+-- Now we have maximum 4 replication slots. Check slots are properly
+-- released even when raise error during creating the target slot.
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error
+ERROR: all replication slots are in use
+HINT: Free one or increase max_replication_slots.
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('orig_slot1');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
+SELECT pg_drop_replication_slot('copied_slot1_no_change');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
+SELECT pg_drop_replication_slot('copied_slot1_change_plugin');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
+-- Test based on the temporary logical slot
+SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot2', 'test_decoding', true);
+ ?column?
+----------
+ init
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_no_change');
+ ?column?
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin', true, 'pgoutput');
+ ?column?
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin_temp', false, 'pgoutput');
+ ?column?
+----------
+ copy
+(1 row)
+
+-- Check all copied slots status
+SELECT
+ o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+FROM
+ (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+ LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
+WHERE
+ o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+ slot_name | plugin | temporary | slot_name | plugin | temporary
+------------+---------------+-----------+---------------------------------+---------------+-----------
+ orig_slot2 | test_decoding | t | copied_slot2_change_plugin | pgoutput | t
+ orig_slot2 | test_decoding | t | copied_slot2_change_plugin_temp | pgoutput | f
+ orig_slot2 | test_decoding | t | copied_slot2_no_change | test_decoding | t
+(3 rows)
+
+-- Cannot copy a logical slot to a physical slot
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error
+ERROR: cannot copy physical replication slot "orig_slot2" as a logical replication slot
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('copied_slot2_change_plugin_temp');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
+--
+-- Test copy functions for physical replication slots
+--
+-- Create and copy physical slots
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot1', true);
+ ?column?
+----------
+ init
+(1 row)
+
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', false);
+ ?column?
+----------
+ init
+(1 row)
+
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_no_change');
+ ?column?
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_temp', true);
+ ?column?
+----------
+ copy
+(1 row)
+
+-- Check all copied slots status. Since all slots don't reserve WAL we check only other fields.
+SELECT slot_name, slot_type, temporary FROM pg_replication_slots;
+ slot_name | slot_type | temporary
+------------------------+-----------+-----------
+ orig_slot1 | physical | f
+ orig_slot2 | physical | f
+ copied_slot1_no_change | physical | f
+ copied_slot1_temp | physical | t
+(4 rows)
+
+-- Cannot copy a physical slot to a logical slot
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error
+ERROR: cannot copy logical replication slot "orig_slot1" as a physical replication slot
+-- Cannot copy a physical slot that doesn't reserve WAL
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error
+ERROR: cannot copy a replication slot that doesn't reserve WAL
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('orig_slot1');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
+SELECT pg_drop_replication_slot('orig_slot2');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
+SELECT pg_drop_replication_slot('copied_slot1_no_change');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
+-- Test based on the temporary physical slot
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', true, true);
+ ?column?
+----------
+ init
+(1 row)
+
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_no_change');
+ ?column?
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_notemp', false);
+ ?column?
+----------
+ copy
+(1 row)
+
+-- Check all copied slots status
+SELECT
+ o.slot_name, o.temporary, c.slot_name, c.temporary
+FROM
+ (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+ LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn
+WHERE
+ o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+ slot_name | temporary | slot_name | temporary
+------------+-----------+------------------------+-----------
+ orig_slot2 | t | copied_slot2_no_change | t
+ orig_slot2 | t | copied_slot2_notemp | f
+(2 rows)
+
+SELECT pg_drop_replication_slot('orig_slot2');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
+SELECT pg_drop_replication_slot('copied_slot2_no_change');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
+SELECT pg_drop_replication_slot('copied_slot2_notemp');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
SELECT pg_replication_slot_advance('regression_slot3', '0/0'); -- invalid LSN
SELECT pg_replication_slot_advance('regression_slot3', '0/1'); -- error
SELECT pg_drop_replication_slot('regression_slot3');
+
+--
+-- Test copy functions for logical replication slots
+--
+
+-- Create and copy logical slots
+SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false);
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change');
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', false, 'pgoutput');
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin_temp', true, 'pgoutput');
+
+-- Check all copied slots status
+SELECT
+ o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+FROM
+ (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+ LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
+WHERE
+ o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+
+-- Now we have maximum 4 replication slots. Check slots are properly
+-- released even when raise error during creating the target slot.
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error
+
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('orig_slot1');
+SELECT pg_drop_replication_slot('copied_slot1_no_change');
+SELECT pg_drop_replication_slot('copied_slot1_change_plugin');
+
+-- Test based on the temporary logical slot
+SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot2', 'test_decoding', true);
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_no_change');
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin', true, 'pgoutput');
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin_temp', false, 'pgoutput');
+
+-- Check all copied slots status
+SELECT
+ o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+FROM
+ (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+ LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
+WHERE
+ o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+
+-- Cannot copy a logical slot to a physical slot
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error
+
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('copied_slot2_change_plugin_temp');
+
+--
+-- Test copy functions for physical replication slots
+--
+
+-- Create and copy physical slots
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot1', true);
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', false);
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_no_change');
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_temp', true);
+
+-- Check all copied slots status. Since all slots don't reserve WAL we check only other fields.
+SELECT slot_name, slot_type, temporary FROM pg_replication_slots;
+
+-- Cannot copy a physical slot to a logical slot
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error
+
+-- Cannot copy a physical slot that doesn't reserve WAL
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error
+
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('orig_slot1');
+SELECT pg_drop_replication_slot('orig_slot2');
+SELECT pg_drop_replication_slot('copied_slot1_no_change');
+
+-- Test based on the temporary physical slot
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', true, true);
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_no_change');
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_notemp', false);
+
+-- Check all copied slots status
+SELECT
+ o.slot_name, o.temporary, c.slot_name, c.temporary
+FROM
+ (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+ LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn
+WHERE
+ o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+
+SELECT pg_drop_replication_slot('orig_slot2');
+SELECT pg_drop_replication_slot('copied_slot2_no_change');
+SELECT pg_drop_replication_slot('copied_slot2_notemp');
</entry>
</row>
+ <row>
+ <entry>
+ <indexterm>
+ <primary>pg_copy_physical_replication_slot</primary>
+ </indexterm>
+ <literal><function>pg_copy_physical_replication_slot(<parameter>src_slot_name</parameter> <type>name</type>, <parameter>dst_slot_name</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type></optional>)</function></literal>
+ </entry>
+ <entry>
+ (<parameter>slot_name</parameter> <type>name</type>, <parameter>lsn</parameter> <type>pg_lsn</type>)
+ </entry>
+ <entry>
+ Copies an existing physical replication slot name <parameter>src_slot_name</parameter>
+ to a physical replication slot named <parameter>dst_slot_name</parameter>.
+ The copied physical slot starts to reserve WAL from the same <acronym>LSN</acronym> as the
+ source slot.
+ <parameter>temporary</parameter> is optional. If <parameter>temporary</parameter>
+ is omitted, the same value as the source slot is used.
+ </entry>
+ </row>
+
+ <row>
+ <entry>
+ <indexterm>
+ <primary>pg_copy_logical_replication_slot</primary>
+ </indexterm>
+ <literal><function>pg_copy_logical_replication_slot(<parameter>src_slot_name</parameter> <type>name</type>, <parameter>dst_slot_name</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type> <optional>, <parameter>plugin</parameter> <type>name</type></optional></optional>)</function></literal>
+ </entry>
+ <entry>
+ (<parameter>slot_name</parameter> <type>name</type>, <parameter>lsn</parameter> <type>pg_lsn</type>)
+ </entry>
+ <entry>
+ Copies an existing logical replication slot name <parameter>src_slot_name</parameter>
+ to a logical replication slot named <parameter>dst_slot_name</parameter>
+ while changing the output plugin and persistence. The copied logical slot starts
+ from the same <acronym>LSN</acronym> as the source logical slot. Both
+ <parameter>temporary</parameter> and <parameter>plugin</parameter> are optional.
+ If <parameter>temporary</parameter> or <parameter>plugin</parameter> are omitted,
+ the same values as the source logical slot are used.
+ </entry>
+ </row>
+
<row>
<entry>
<indexterm>
/*
* Create a new decoding context, for a new logical slot.
*
- * plugin contains the name of the output plugin
- * output_plugin_options contains options passed to the output plugin
- * read_page, prepare_write, do_write, update_progress
- * callbacks that have to be filled to perform the use-case dependent,
- * actual, work.
+ * plugin -- contains the name of the output plugin
+ * output_plugin_options -- contains options passed to the output plugin
+ * restart_lsn -- if given as invalid, it's this routine's responsibility to
+ * mark WAL as reserved by setting a convenient restart_lsn for the slot.
+ * Otherwise, we set for decoding to start from the given LSN without
+ * marking WAL reserved beforehand. In that scenario, it's up to the
+ * caller to guarantee that WAL remains available.
+ * read_page, prepare_write, do_write, update_progress --
+ * callbacks that perform the use-case dependent, actual, work.
*
* Needs to be called while in a memory context that's at least as long lived
* as the decoding context because further memory contexts will be created
CreateInitDecodingContext(char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
+ XLogRecPtr restart_lsn,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
SpinLockRelease(&slot->mutex);
- ReplicationSlotReserveWal();
+ if (XLogRecPtrIsInvalid(restart_lsn))
+ ReplicationSlotReserveWal();
+ else
+ {
+ SpinLockAcquire(&slot->mutex);
+ slot->data.restart_lsn = restart_lsn;
+ SpinLockRelease(&slot->mutex);
+ }
/* ----
* This is a bit tricky: We need to determine a safe xmin horizon to start
ReplicationSlotMarkDirty();
ReplicationSlotSave();
- ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
+ ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
need_full_snapshot, false,
read_page, prepare_write, do_write,
update_progress);
*
*-------------------------------------------------------------------------
*/
-
#include "postgres.h"
+#include "access/htup_details.h"
+#include "access/xlog_internal.h"
#include "funcapi.h"
#include "miscadmin.h"
-
-#include "access/htup_details.h"
#include "replication/decode.h"
#include "replication/slot.h"
#include "replication/logical.h"
(errmsg("must be superuser or replication role to use replication slots"))));
}
+/*
+ * Helper function for creating a new physical replication slot with
+ * given arguments. Note that this function doesn't release the created
+ * slot.
+ *
+ * If restart_lsn is a valid value, we use it without WAL reservation
+ * routine. So the caller must guarantee that WAL is available.
+ */
+static void
+create_physical_replication_slot(char *name, bool immediately_reserve,
+ bool temporary, XLogRecPtr restart_lsn)
+{
+ Assert(!MyReplicationSlot);
+
+ /* acquire replication slot, this will check for conflicting names */
+ ReplicationSlotCreate(name, false,
+ temporary ? RS_TEMPORARY : RS_PERSISTENT);
+
+ if (immediately_reserve)
+ {
+ /* Reserve WAL as the user asked for it */
+ if (XLogRecPtrIsInvalid(restart_lsn))
+ ReplicationSlotReserveWal();
+ else
+ MyReplicationSlot->data.restart_lsn = restart_lsn;
+
+ /* Write this slot to disk */
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+ }
+}
+
/*
* SQL function for creating a new physical (streaming replication)
* replication slot.
HeapTuple tuple;
Datum result;
- Assert(!MyReplicationSlot);
-
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
CheckSlotRequirements();
- /* acquire replication slot, this will check for conflicting names */
- ReplicationSlotCreate(NameStr(*name), false,
- temporary ? RS_TEMPORARY : RS_PERSISTENT);
+ create_physical_replication_slot(NameStr(*name),
+ immediately_reserve,
+ temporary,
+ InvalidXLogRecPtr);
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
nulls[0] = false;
if (immediately_reserve)
{
- /* Reserve WAL as the user asked for it */
- ReplicationSlotReserveWal();
-
- /* Write this slot to disk */
- ReplicationSlotMarkDirty();
- ReplicationSlotSave();
-
values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
nulls[1] = false;
}
else
- {
nulls[1] = true;
- }
tuple = heap_form_tuple(tupdesc, values, nulls);
result = HeapTupleGetDatum(tuple);
/*
- * SQL function for creating a new logical replication slot.
+ * Helper function for creating a new logical replication slot with
+ * given arguments. Note that this function doesn't release the created
+ * slot.
*/
-Datum
-pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
+static void
+create_logical_replication_slot(char *name, char *plugin,
+ bool temporary, XLogRecPtr restart_lsn)
{
- Name name = PG_GETARG_NAME(0);
- Name plugin = PG_GETARG_NAME(1);
- bool temporary = PG_GETARG_BOOL(2);
-
LogicalDecodingContext *ctx = NULL;
- TupleDesc tupdesc;
- HeapTuple tuple;
- Datum result;
- Datum values[2];
- bool nulls[2];
-
Assert(!MyReplicationSlot);
- if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
- elog(ERROR, "return type must be a row type");
-
- check_permissions();
-
- CheckLogicalDecodingRequirements();
-
/*
* Acquire a logical decoding slot, this will check for conflicting names.
* Initially create persistent slot as ephemeral - that allows us to
* slots can be created as temporary from beginning as they get dropped on
* error as well.
*/
- ReplicationSlotCreate(NameStr(*name), true,
+ ReplicationSlotCreate(name, true,
temporary ? RS_TEMPORARY : RS_EPHEMERAL);
/*
* Create logical decoding context, to build the initial snapshot.
*/
- ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
+ ctx = CreateInitDecodingContext(plugin, NIL,
false, /* do not build snapshot */
+ restart_lsn,
logical_read_local_xlog_page, NULL, NULL,
NULL);
/* build initial snapshot, might take a while */
DecodingContextFindStartpoint(ctx);
- values[0] = NameGetDatum(&MyReplicationSlot->data.name);
- values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
-
/* don't need the decoding context anymore */
FreeDecodingContext(ctx);
+}
+
+/*
+ * SQL function for creating a new logical replication slot.
+ */
+Datum
+pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
+{
+ Name name = PG_GETARG_NAME(0);
+ Name plugin = PG_GETARG_NAME(1);
+ bool temporary = PG_GETARG_BOOL(2);
+ Datum result;
+ TupleDesc tupdesc;
+ HeapTuple tuple;
+ Datum values[2];
+ bool nulls[2];
+
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ check_permissions();
+
+ CheckLogicalDecodingRequirements();
+
+ create_logical_replication_slot(NameStr(*name),
+ NameStr(*plugin),
+ temporary,
+ InvalidXLogRecPtr);
+
+ values[0] = NameGetDatum(&MyReplicationSlot->data.name);
+ values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
memset(nulls, 0, sizeof(nulls));
PG_RETURN_DATUM(result);
}
+
+/*
+ * Helper function of copying a replication slot.
+ */
+static Datum
+copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
+{
+ Name src_name = PG_GETARG_NAME(0);
+ Name dst_name = PG_GETARG_NAME(1);
+ ReplicationSlot *src = NULL;
+ XLogRecPtr src_restart_lsn;
+ bool src_islogical;
+ bool temporary;
+ char *plugin;
+ Datum values[2];
+ bool nulls[2];
+ Datum result;
+ TupleDesc tupdesc;
+ HeapTuple tuple;
+
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ check_permissions();
+
+ if (logical_slot)
+ CheckLogicalDecodingRequirements();
+ else
+ CheckSlotRequirements();
+
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+ /*
+ * We need to prevent the source slot's reserved WAL from being removed,
+ * but we don't want to lock that slot for very long, and it can advance
+ * in the meantime. So obtain the source slot's data, and create a new
+ * slot using its restart_lsn. Afterwards we lock the source slot again
+ * and verify that the data we copied (name, type) has not changed
+ * incompatibly. No inconvenient WAL removal can occur once the new slot
+ * is created -- but since WAL removal could have occurred before we
+ * managed to create the new slot, we advance the new slot's restart_lsn
+ * to the source slot's updated restart_lsn the second time we lock it.
+ */
+ for (int i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+ if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
+ {
+ SpinLockAcquire(&s->mutex);
+ src_islogical = SlotIsLogical(s);
+ src_restart_lsn = s->data.restart_lsn;
+ temporary = s->data.persistency == RS_TEMPORARY;
+ plugin = logical_slot ? pstrdup(NameStr(s->data.plugin)) : NULL;
+ SpinLockRelease(&s->mutex);
+
+ src = s;
+ break;
+ }
+ }
+
+ LWLockRelease(ReplicationSlotControlLock);
+
+ if (src == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
+
+ /* Check type of replication slot */
+ if (src_islogical != logical_slot)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ src_islogical ?
+ errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
+ NameStr(*src_name)) :
+ errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
+ NameStr(*src_name))));
+
+ /* Copying non-reserved slot doesn't make sense */
+ if (XLogRecPtrIsInvalid(src_restart_lsn))
+ {
+ Assert(!logical_slot);
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ (errmsg("cannot copy a replication slot that doesn't reserve WAL"))));
+ }
+
+ /* Overwrite params from optional arguments */
+ if (PG_NARGS() >= 3)
+ temporary = PG_GETARG_BOOL(2);
+ if (PG_NARGS() >= 4)
+ {
+ Assert(logical_slot);
+ plugin = NameStr(*(PG_GETARG_NAME(3)));
+ }
+
+ /* Create new slot and acquire it */
+ if (logical_slot)
+ create_logical_replication_slot(NameStr(*dst_name),
+ plugin,
+ temporary,
+ src_restart_lsn);
+ else
+ create_physical_replication_slot(NameStr(*dst_name),
+ true,
+ temporary,
+ src_restart_lsn);
+
+ /*
+ * Update the destination slot to current values of the source slot;
+ * recheck that the source slot is still the one we saw previously.
+ */
+ {
+ TransactionId copy_effective_xmin;
+ TransactionId copy_effective_catalog_xmin;
+ TransactionId copy_xmin;
+ TransactionId copy_catalog_xmin;
+ XLogRecPtr copy_restart_lsn;
+ bool copy_islogical;
+ char *copy_name;
+
+ /* Copy data of source slot again */
+ SpinLockAcquire(&src->mutex);
+ copy_effective_xmin = src->effective_xmin;
+ copy_effective_catalog_xmin = src->effective_catalog_xmin;
+
+ copy_xmin = src->data.xmin;
+ copy_catalog_xmin = src->data.catalog_xmin;
+ copy_restart_lsn = src->data.restart_lsn;
+
+ /* for existence check */
+ copy_name = pstrdup(NameStr(src->data.name));
+ copy_islogical = SlotIsLogical(src);
+ SpinLockRelease(&src->mutex);
+
+ /*
+ * Check if the source slot still exists and is valid. We regards it
+ * as invalid if the type of replication slot or name has been
+ * changed, or the restart_lsn either is invalid or has gone backward.
+ * (The restart_lsn could go backwards if the source slot is dropped
+ * and copied from an older slot during installation.)
+ *
+ * Since erroring out will release and drop the destination slot we
+ * don't need to release it here.
+ */
+ if (copy_restart_lsn < src_restart_lsn ||
+ src_islogical != copy_islogical ||
+ strcmp(copy_name, NameStr(*src_name)) != 0)
+ ereport(ERROR,
+ (errmsg("could not copy replication slot \"%s\"",
+ NameStr(*src_name)),
+ errdetail("The source replication slot was modified incompatibly during the copy operation.")));
+
+ /* Install copied values again */
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ MyReplicationSlot->effective_xmin = copy_effective_xmin;
+ MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
+
+ MyReplicationSlot->data.xmin = copy_xmin;
+ MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
+ MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
+ ReplicationSlotMarkDirty();
+ ReplicationSlotsComputeRequiredXmin(false);
+ ReplicationSlotsComputeRequiredLSN();
+ ReplicationSlotSave();
+
+#ifdef USE_ASSERT_CHECKING
+ /* Check that the restart_lsn is available */
+ {
+ XLogSegNo segno;
+
+ XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
+ Assert(XLogGetLastRemovedSegno() < segno);
+ }
+#endif
+ }
+
+ /* target slot fully created, mark as persistent if needed */
+ if (logical_slot && !temporary)
+ ReplicationSlotPersist();
+
+ /* All done. Set up the return values */
+ values[0] = NameGetDatum(dst_name);
+ nulls[0] = false;
+ if (!XLogRecPtrIsInvalid(MyReplicationSlot->data.confirmed_flush))
+ {
+ values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
+ nulls[1] = false;
+ }
+ else
+ nulls[1] = true;
+
+ tuple = heap_form_tuple(tupdesc, values, nulls);
+ result = HeapTupleGetDatum(tuple);
+
+ ReplicationSlotRelease();
+
+ PG_RETURN_DATUM(result);
+}
+
+/* The wrappers below are all to appease opr_sanity */
+Datum
+pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
+{
+ return copy_replication_slot(fcinfo, true);
+}
+
+Datum
+pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
+{
+ return copy_replication_slot(fcinfo, true);
+}
+
+Datum
+pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
+{
+ return copy_replication_slot(fcinfo, true);
+}
+
+Datum
+pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
+{
+ return copy_replication_slot(fcinfo, false);
+}
+
+Datum
+pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
+{
+ return copy_replication_slot(fcinfo, false);
+}
}
ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
+ InvalidXLogRecPtr,
logical_read_xlog_page,
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 201904031
+#define CATALOG_VERSION_NO 201904051
#endif
proargmodes => '{i,i,i,o,o}',
proargnames => '{slot_name,immediately_reserve,temporary,slot_name,lsn}',
prosrc => 'pg_create_physical_replication_slot' },
+{ oid => '4220', descr => 'copy a physical replication slot, changing temporality',
+ proname => 'pg_copy_physical_replication_slot', provolatile => 'v',
+ proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool',
+ proallargtypes => '{name,name,bool,name,pg_lsn}',
+ proargmodes => '{i,i,i,o,o}',
+ proargnames => '{src_slot_name,dst_slot_name,temporary,slot_name,lsn}',
+ prosrc => 'pg_copy_physical_replication_slot_a' },
+{ oid => '4221', descr => 'copy a physical replication slot',
+ proname => 'pg_copy_physical_replication_slot', provolatile => 'v',
+ proparallel => 'u', prorettype => 'record', proargtypes => 'name name',
+ proallargtypes => '{name,name,name,pg_lsn}',
+ proargmodes => '{i,i,o,o}',
+ proargnames => '{src_slot_name,dst_slot_name,slot_name,lsn}',
+ prosrc => 'pg_copy_physical_replication_slot_b' },
{ oid => '3780', descr => 'drop a replication slot',
proname => 'pg_drop_replication_slot', provolatile => 'v', proparallel => 'u',
prorettype => 'void', proargtypes => 'name',
proargmodes => '{i,i,i,o,o}',
proargnames => '{slot_name,plugin,temporary,slot_name,lsn}',
prosrc => 'pg_create_logical_replication_slot' },
+{ oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin',
+ proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+ proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool name',
+ proallargtypes => '{name,name,bool,name,name,pg_lsn}',
+ proargmodes => '{i,i,i,i,o,o}',
+ proargnames => '{src_slot_name,dst_slot_name,temporary,plugin,slot_name,lsn}',
+ prosrc => 'pg_copy_logical_replication_slot_a' },
+{ oid => '4223', descr => 'copy a logical replication slot, changing temporality',
+ proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+ proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool',
+ proallargtypes => '{name,name,bool,name,pg_lsn}',
+ proargmodes => '{i,i,i,o,o}',
+ proargnames => '{src_slot_name,dst_slot_name,temporary,slot_name,lsn}',
+ prosrc => 'pg_copy_logical_replication_slot_b' },
+{ oid => '4224', descr => 'copy a logical replication slot',
+ proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+ proparallel => 'u', prorettype => 'record', proargtypes => 'name name',
+ proallargtypes => '{name,name,name,pg_lsn}',
+ proargmodes => '{i,i,o,o}',
+ proargnames => '{src_slot_name,dst_slot_name,slot_name,lsn}',
+ prosrc => 'pg_copy_logical_replication_slot_c' },
{ oid => '3782', descr => 'get changes from replication slot',
proname => 'pg_logical_slot_get_changes', procost => '1000',
prorows => '1000', provariadic => 'text', proisstrict => 'f',
extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
+ XLogRecPtr restart_lsn,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,