From a924c327e2793d2025b19e18de7917110dc8afd8 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Thu, 8 Dec 2016 12:00:00 -0500 Subject: [PATCH] Add support for temporary replication slots This allows creating temporary replication slots that are removed automatically at the end of the session or on error. From: Petr Jelinek --- contrib/test_decoding/Makefile | 2 +- contrib/test_decoding/expected/ddl.out | 4 +- contrib/test_decoding/expected/slot.out | 58 +++++++++++++++++++++ contrib/test_decoding/sql/slot.sql | 20 +++++++ doc/src/sgml/func.sgml | 16 ++++-- doc/src/sgml/protocol.sgml | 13 ++++- src/backend/catalog/system_views.sql | 11 ++++ src/backend/replication/repl_gram.y | 22 +++++--- src/backend/replication/repl_scanner.l | 1 + src/backend/replication/slot.c | 69 ++++++++++++++++++++----- src/backend/replication/slotfuncs.c | 24 ++++++--- src/backend/replication/walsender.c | 28 ++++++---- src/backend/storage/lmgr/proc.c | 3 ++ src/backend/tcop/postgres.c | 3 ++ src/include/catalog/pg_proc.h | 6 +-- src/include/nodes/replnodes.h | 1 + src/include/replication/slot.h | 4 +- src/test/regress/expected/rules.out | 3 +- 18 files changed, 237 insertions(+), 51 deletions(-) create mode 100644 contrib/test_decoding/expected/slot.out create mode 100644 contrib/test_decoding/sql/slot.sql diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index a6641f5040..d2bc8b8350 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -39,7 +39,7 @@ submake-test_decoding: REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ - spill + spill slot regresscheck: | submake-regress submake-test_decoding temp-install $(MKDIR_P) regression_output diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out index a9ba615b5b..c104c4802d 100644 --- a/contrib/test_decoding/expected/ddl.out +++ b/contrib/test_decoding/expected/ddl.out @@ -702,7 +702,7 @@ SELECT pg_drop_replication_slot('regression_slot'); /* check that the slot is gone */ SELECT * FROM pg_replication_slots; - slot_name | plugin | slot_type | datoid | database | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn ------------+--------+-----------+--------+----------+--------+------------+------+--------------+-------------+--------------------- + slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn +-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+--------------------- (0 rows) diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out new file mode 100644 index 0000000000..5e6b70ba38 --- /dev/null +++ b/contrib/test_decoding/expected/slot.out @@ -0,0 +1,58 @@ +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test_decoding'); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t', 'test_decoding', true); + ?column? +---------- + init +(1 row) + +SELECT pg_drop_replication_slot('regression_slot_p'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test_decoding', false); + ?column? +---------- + init +(1 row) + +-- reconnect to clean temp slots +\c +SELECT pg_drop_replication_slot('regression_slot_p'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +-- should fail because the temporary slot was dropped automatically +SELECT pg_drop_replication_slot('regression_slot_t'); +ERROR: replication slot "regression_slot_t" does not exist +-- test switching between slots in a session +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding', true); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot2', 'test_decoding', true); + ?column? +---------- + init +(1 row) + +SELECT * FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL); + location | xid | data +----------+-----+------ +(0 rows) + +SELECT * FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL); + location | xid | data +----------+-----+------ +(0 rows) + diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql new file mode 100644 index 0000000000..3b0aecd6a8 --- /dev/null +++ b/contrib/test_decoding/sql/slot.sql @@ -0,0 +1,20 @@ +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test_decoding'); +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t', 'test_decoding', true); + +SELECT pg_drop_replication_slot('regression_slot_p'); +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test_decoding', false); + +-- reconnect to clean temp slots +\c + +SELECT pg_drop_replication_slot('regression_slot_p'); + +-- should fail because the temporary slot was dropped automatically +SELECT pg_drop_replication_slot('regression_slot_t'); + + +-- test switching between slots in a session +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding', true); +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot2', 'test_decoding', true); +SELECT * FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL); +SELECT * FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL); diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index eca98dfd34..0f9c9bf129 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -18465,7 +18465,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); pg_create_physical_replication_slot - pg_create_physical_replication_slot(slot_name name , immediately_reserve boolean ) + pg_create_physical_replication_slot(slot_name name , immediately_reserve boolean, temporary boolean) (slot_name name, xlog_position pg_lsn) @@ -18478,7 +18478,11 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); the LSN is reserved on first connection from a streaming replication client. Streaming changes from a physical slot is only possible with the streaming-replication protocol — - see . This function corresponds + see . The optional third + parameter, temporary, when set to true, specifies that + the slot should not be permanently stored to disk and is only meant + for use by current session. Temporary slots are also + released upon any error. This function corresponds to the replication protocol command CREATE_REPLICATION_SLOT ... PHYSICAL. @@ -18505,7 +18509,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); pg_create_logical_replication_slot - pg_create_logical_replication_slot(slot_name name, plugin name) + pg_create_logical_replication_slot(slot_name name, plugin name , temporary boolean) (slot_name name, xlog_position pg_lsn) @@ -18513,7 +18517,11 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); Creates a new logical (decoding) replication slot named slot_name using the output plugin - plugin. A call to this function has the same + plugin. The optional third + parameter, temporary, when set to true, specifies that + the slot should not be permanently stored to disk and is only meant + for use by current session. Temporary slots are also + released upon any error. A call to this function has the same effect as the replication protocol command CREATE_REPLICATION_SLOT ... LOGICAL. diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 50cf527427..9ba147cae5 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1434,7 +1434,7 @@ The commands accepted in walsender mode are: - CREATE_REPLICATION_SLOT slot_name { PHYSICAL [ RESERVE_WAL ] | LOGICAL output_plugin } + CREATE_REPLICATION_SLOT slot_name [ TEMPORARY ] { PHYSICAL [ RESERVE_WAL ] | LOGICAL output_plugin } CREATE_REPLICATION_SLOT @@ -1464,6 +1464,17 @@ The commands accepted in walsender mode are: + + TEMPORARY + + + Specify that this replication slot is a temporary one. Temporary + slots are not saved to disk and are automatically dropped on error + or when the session has finished. + + + + RESERVE_WAL diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index df59d1819c..48e7c4b7f9 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -726,6 +726,7 @@ CREATE VIEW pg_replication_slots AS L.slot_type, L.datoid, D.datname AS database, + L.temporary, L.active, L.active_pid, L.xmin, @@ -991,12 +992,22 @@ AS 'pg_logical_slot_peek_binary_changes'; CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot( IN slot_name name, IN immediately_reserve boolean DEFAULT false, + IN temporary boolean DEFAULT false, OUT slot_name name, OUT xlog_position pg_lsn) RETURNS RECORD LANGUAGE INTERNAL STRICT VOLATILE AS 'pg_create_physical_replication_slot'; +CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot( + IN slot_name name, IN plugin name, + IN temporary boolean DEFAULT false, + OUT slot_name text, OUT xlog_position pg_lsn) +RETURNS RECORD +LANGUAGE INTERNAL +STRICT VOLATILE +AS 'pg_create_logical_replication_slot'; + CREATE OR REPLACE FUNCTION make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0, days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0, diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index fd0fa6dde0..e75516c8d2 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -77,6 +77,7 @@ Node *replication_parse_result; %token K_LOGICAL %token K_SLOT %token K_RESERVE_WAL +%token K_TEMPORARY %type command %type base_backup start_replication start_logical_replication @@ -89,7 +90,7 @@ Node *replication_parse_result; %type plugin_opt_elem %type plugin_opt_arg %type opt_slot -%type opt_reserve_wal +%type opt_reserve_wal opt_temporary %% @@ -183,24 +184,26 @@ base_backup_opt: ; create_replication_slot: - /* CREATE_REPLICATION_SLOT slot PHYSICAL RESERVE_WAL */ - K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL opt_reserve_wal + /* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */ + K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL opt_reserve_wal { CreateReplicationSlotCmd *cmd; cmd = makeNode(CreateReplicationSlotCmd); cmd->kind = REPLICATION_KIND_PHYSICAL; cmd->slotname = $2; - cmd->reserve_wal = $4; + cmd->temporary = $3; + cmd->reserve_wal = $5; $$ = (Node *) cmd; } - /* CREATE_REPLICATION_SLOT slot LOGICAL plugin */ - | K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT + /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */ + | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT { CreateReplicationSlotCmd *cmd; cmd = makeNode(CreateReplicationSlotCmd); cmd->kind = REPLICATION_KIND_LOGICAL; cmd->slotname = $2; - cmd->plugin = $4; + cmd->temporary = $3; + cmd->plugin = $5; $$ = (Node *) cmd; } ; @@ -276,6 +279,11 @@ opt_reserve_wal: | /* EMPTY */ { $$ = false; } ; +opt_temporary: + K_TEMPORARY { $$ = true; } + | /* EMPTY */ { $$ = false; } + ; + opt_slot: K_SLOT IDENT { $$ = $2; } diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index f83ec538b6..9f50ce64a5 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -98,6 +98,7 @@ PHYSICAL { return K_PHYSICAL; } RESERVE_WAL { return K_RESERVE_WAL; } LOGICAL { return K_LOGICAL; } SLOT { return K_SLOT; } +TEMPORARY { return K_TEMPORARY; } "," { return ','; } ";" { return ';'; } diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 0b2575ee9d..d8ed005e7e 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -47,6 +47,7 @@ #include "storage/fd.h" #include "storage/proc.h" #include "storage/procarray.h" +#include "utils/builtins.h" /* * Replication slot on-disk data structure. @@ -98,7 +99,9 @@ int max_replication_slots = 0; /* the maximum number of replication * slots */ static LWLockTranche ReplSlotIOLWLockTranche; + static void ReplicationSlotDropAcquired(void); +static void ReplicationSlotDropPtr(ReplicationSlot *slot); /* internal persistency functions */ static void RestoreSlotFromDisk(const char *name); @@ -329,7 +332,7 @@ ReplicationSlotAcquire(const char *name) { ReplicationSlot *slot = NULL; int i; - int active_pid = 0; + int active_pid = 0; /* Keep compiler quiet */ Assert(MyReplicationSlot == NULL); @@ -346,7 +349,7 @@ ReplicationSlotAcquire(const char *name) SpinLockAcquire(&s->mutex); active_pid = s->active_pid; if (active_pid == 0) - s->active_pid = MyProcPid; + active_pid = s->active_pid = MyProcPid; SpinLockRelease(&s->mutex); slot = s; break; @@ -359,7 +362,7 @@ ReplicationSlotAcquire(const char *name) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("replication slot \"%s\" does not exist", name))); - if (active_pid != 0) + if (active_pid != MyProcPid) ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), errmsg("replication slot \"%s\" is active for PID %d", @@ -389,9 +392,12 @@ ReplicationSlotRelease(void) */ ReplicationSlotDropAcquired(); } - else + else if (slot->data.persistency == RS_PERSISTENT) { - /* Mark slot inactive. We're not freeing it, just disconnecting. */ + /* + * Mark persistent slot inactive. We're not freeing it, just + * disconnecting. + */ SpinLockAcquire(&slot->mutex); slot->active_pid = 0; SpinLockRelease(&slot->mutex); @@ -405,6 +411,33 @@ ReplicationSlotRelease(void) LWLockRelease(ProcArrayLock); } +/* + * Cleanup all temporary slots created in current session. + */ +void +ReplicationSlotCleanup() +{ + int i; + + Assert(MyReplicationSlot == NULL); + + /* + * No need for locking as we are only interested in slots active in + * current process and those are not touched by other processes. + */ + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->active_pid == MyProcPid) + { + Assert(s->in_use && s->data.persistency == RS_TEMPORARY); + + ReplicationSlotDropPtr(s); + } + } +} + /* * Permanently drop replication slot identified by the passed in name. */ @@ -419,14 +452,11 @@ ReplicationSlotDrop(const char *name) } /* - * Permanently drop the currently acquired replication slot which will be - * released by the point this function returns. + * Permanently drop the currently acquired replication slot. */ static void ReplicationSlotDropAcquired(void) { - char path[MAXPGPATH]; - char tmppath[MAXPGPATH]; ReplicationSlot *slot = MyReplicationSlot; Assert(MyReplicationSlot != NULL); @@ -434,6 +464,19 @@ ReplicationSlotDropAcquired(void) /* slot isn't acquired anymore */ MyReplicationSlot = NULL; + ReplicationSlotDropPtr(slot); +} + +/* + * Permanently drop the replication slot which will be released by the point + * this function returns. + */ +static void +ReplicationSlotDropPtr(ReplicationSlot *slot) +{ + char path[MAXPGPATH]; + char tmppath[MAXPGPATH]; + /* * If some other backend ran this code concurrently with us, we might try * to delete a slot with a certain name while someone else was trying to @@ -448,9 +491,9 @@ ReplicationSlotDropAcquired(void) /* * Rename the slot directory on disk, so that we'll no longer recognize * this as a valid slot. Note that if this fails, we've got to mark the - * slot inactive before bailing out. If we're dropping an ephemeral slot, - * we better never fail hard as the caller won't expect the slot to - * survive and this might get called during error handling. + * slot inactive before bailing out. If we're dropping an ephemeral or + * a temporary slot, we better never fail hard as the caller won't expect + * the slot to survive and this might get called during error handling. */ if (rename(path, tmppath) == 0) { @@ -469,7 +512,7 @@ ReplicationSlotDropAcquired(void) } else { - bool fail_softly = slot->data.persistency == RS_EPHEMERAL; + bool fail_softly = slot->data.persistency != RS_PERSISTENT; SpinLockAcquire(&slot->mutex); slot->active_pid = 0; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index f9087619d2..1f1c56cc21 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -41,6 +41,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) { Name name = PG_GETARG_NAME(0); bool immediately_reserve = PG_GETARG_BOOL(1); + bool temporary = PG_GETARG_BOOL(2); Datum values[2]; bool nulls[2]; TupleDesc tupdesc; @@ -57,7 +58,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) CheckSlotRequirements(); /* acquire replication slot, this will check for conflicting names */ - ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT); + ReplicationSlotCreate(NameStr(*name), false, + temporary ? RS_TEMPORARY : RS_PERSISTENT); values[0] = NameGetDatum(&MyReplicationSlot->data.name); nulls[0] = false; @@ -96,6 +98,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) { Name name = PG_GETARG_NAME(0); Name plugin = PG_GETARG_NAME(1); + bool temporary = PG_GETARG_BOOL(2); LogicalDecodingContext *ctx = NULL; @@ -116,11 +119,14 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) /* * Acquire a logical decoding slot, this will check for conflicting names. - * Initially create it as ephemeral - that allows us to nicely handle - * errors during initialization because it'll get dropped if this + * Initially create persisent slot as ephemeral - that allows us to nicely + * handle errors during initialization because it'll get dropped if this * transaction fails. We'll make it persistent at the end. + * Temporary slots can be created as temporary from beginning as they get + * dropped on error as well. */ - ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL); + ReplicationSlotCreate(NameStr(*name), true, + temporary ? RS_TEMPORARY : RS_EPHEMERAL); /* * Create logical decoding context, to build the initial snapshot. @@ -143,8 +149,9 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) tuple = heap_form_tuple(tupdesc, values, nulls); result = HeapTupleGetDatum(tuple); - /* ok, slot is now fully created, mark it as persistent */ - ReplicationSlotPersist(); + /* ok, slot is now fully created, mark it as persistent if needed */ + if (!temporary) + ReplicationSlotPersist(); ReplicationSlotRelease(); PG_RETURN_DATUM(result); @@ -174,7 +181,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 10 +#define PG_GET_REPLICATION_SLOTS_COLS 11 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -219,6 +226,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) Datum values[PG_GET_REPLICATION_SLOTS_COLS]; bool nulls[PG_GET_REPLICATION_SLOTS_COLS]; + ReplicationSlotPersistency persistency; TransactionId xmin; TransactionId catalog_xmin; XLogRecPtr restart_lsn; @@ -246,6 +254,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) namecpy(&plugin, &slot->data.plugin); active_pid = slot->active_pid; + persistency = slot->data.persistency; } SpinLockRelease(&slot->mutex); @@ -269,6 +278,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) else values[i++] = database; + values[i++] = BoolGetDatum(persistency == RS_TEMPORARY); values[i++] = BoolGetDatum(active_pid != 0); if (active_pid != 0) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index aa42d59610..b14d82153a 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -266,6 +266,8 @@ WalSndErrorCleanup(void) if (MyReplicationSlot != NULL) ReplicationSlotRelease(); + ReplicationSlotCleanup(); + replication_active = false; if (walsender_ready_to_stop) proc_exit(0); @@ -796,18 +798,22 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) if (cmd->kind == REPLICATION_KIND_PHYSICAL) { - ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT); + ReplicationSlotCreate(cmd->slotname, false, + cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT); } else { CheckLogicalDecodingRequirements(); /* - * Initially create the slot as ephemeral - that allows us to nicely - * handle errors during initialization because it'll get dropped if - * this transaction fails. We'll make it persistent at the end. + * Initially create persisent slot as ephemeral - that allows us to + * nicely handle errors during initialization because it'll get + * dropped if this transaction fails. We'll make it persistent at the + * end. Temporary slots can be created as temporary from beginning as + * they get dropped on error as well. */ - ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL); + ReplicationSlotCreate(cmd->slotname, true, + cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL); } initStringInfo(&output_message); @@ -841,15 +847,18 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) /* don't need the decoding context anymore */ FreeDecodingContext(ctx); - ReplicationSlotPersist(); + if (!cmd->temporary) + ReplicationSlotPersist(); } else if (cmd->kind == REPLICATION_KIND_PHYSICAL && cmd->reserve_wal) { ReplicationSlotReserveWal(); - /* Write this slot to disk */ ReplicationSlotMarkDirty(); - ReplicationSlotSave(); + + /* Write this slot to disk if it's permanent one. */ + if (!cmd->temporary) + ReplicationSlotSave(); } snprintf(xpos, sizeof(xpos), "%X/%X", @@ -933,9 +942,6 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) pq_endmessage(&buf); - /* - * release active status again, START_REPLICATION will reacquire it - */ ReplicationSlotRelease(); } diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 83e9ca15d1..276261bd7b 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -810,6 +810,9 @@ ProcKill(int code, Datum arg) if (MyReplicationSlot != NULL) ReplicationSlotRelease(); + /* Also cleanup all the temporary slots. */ + ReplicationSlotCleanup(); + /* * Detach from any lock group of which we are a member. If the leader * exist before all other group members, it's PGPROC will remain allocated diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index cc847548a9..b17923106a 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -3878,6 +3878,9 @@ PostgresMain(int argc, char *argv[], if (MyReplicationSlot != NULL) ReplicationSlotRelease(); + /* We also want to cleanup temporary slots on error. */ + ReplicationSlotCleanup(); + /* * Now return to normal top-level context and clear ErrorContext for * next time. diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 96e77ec437..cd7b909812 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5178,13 +5178,13 @@ DATA(insert OID = 5016 ( spg_box_quad_leaf_consistent PGNSP PGUID 12 1 0 0 0 f DESCR("SP-GiST support for quad tree over box"); /* replication slots */ -DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,immediately_reserve,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ )); +DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 16 16" "{19,16,16,19,3220}" "{i,i,i,o,o}" "{slot_name,immediately_reserve,temporary,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ )); DESCR("create a physical replication slot"); DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ )); DESCR("drop a replication slot"); -DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ )); +DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ )); DESCR("information about replication slots currently in use"); -DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ )); +DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,temporary,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ )); DESCR("set up a logical replication slot"); DATA(insert OID = 3782 ( pg_logical_slot_get_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,25}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_get_changes _null_ _null_ _null_ )); DESCR("get changes from replication slot"); diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index d2f1edbf0d..024b965a24 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -55,6 +55,7 @@ typedef struct CreateReplicationSlotCmd char *slotname; ReplicationKind kind; char *plugin; + bool temporary; bool reserve_wal; } CreateReplicationSlotCmd; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index e00562d274..b653e5c196 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -28,7 +28,8 @@ typedef enum ReplicationSlotPersistency { RS_PERSISTENT, - RS_EPHEMERAL + RS_EPHEMERAL, + RS_TEMPORARY } ReplicationSlotPersistency; /* @@ -165,6 +166,7 @@ extern void ReplicationSlotDrop(const char *name); extern void ReplicationSlotAcquire(const char *name); extern void ReplicationSlotRelease(void); +extern void ReplicationSlotCleanup(void); extern void ReplicationSlotSave(void); extern void ReplicationSlotMarkDirty(void); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index a8f35a76fa..5314b9c207 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1427,13 +1427,14 @@ pg_replication_slots| SELECT l.slot_name, l.slot_type, l.datoid, d.datname AS database, + l.temporary, l.active, l.active_pid, l.xmin, l.catalog_xmin, l.restart_lsn, l.confirmed_flush_lsn - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn) + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, -- 2.40.0