From ff539da31691f2cd2694360250571c5c5fb7415e Mon Sep 17 00:00:00 2001 From: Simon Riggs Date: Tue, 28 Mar 2017 10:05:21 -0400 Subject: [PATCH] Cleanup slots during drop database Automatically drop all logical replication slots associated with a database when the database is dropped. Previously we threw an ERROR if a slot existed. Now we throw ERROR only if a slot is active in the database being dropped. Craig Ringer --- doc/src/sgml/func.sgml | 3 +- doc/src/sgml/protocol.sgml | 2 + src/backend/commands/dbcommands.c | 32 +++++-- src/backend/replication/slot.c | 88 +++++++++++++++++++ src/include/replication/slot.h | 1 + src/test/recovery/t/006_logical_decoding.pl | 40 ++++++++- .../t/010_logical_decoding_timelines.pl | 30 ++++++- 7 files changed, 182 insertions(+), 14 deletions(-) diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index ba6f8dd8d2..78508d74ec 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -18876,7 +18876,8 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup()); Drops the physical or logical replication slot named slot_name. Same as replication protocol - command DROP_REPLICATION_SLOT. + command DROP_REPLICATION_SLOT. For logical slots, this must + be called when connected to the same database the slot was created on. diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index b3a50261c3..5f971412ae 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2034,6 +2034,8 @@ The commands accepted in walsender mode are: Drops a replication slot, freeing any reserved server-side resources. If the slot is currently in use by an active connection, this command fails. + If the slot is a logical slot that was created in a database other than + the database the walsender is connected to, this command fails. diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index 5a63b1abcb..c0ba2b451a 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -845,19 +845,22 @@ dropdb(const char *dbname, bool missing_ok) errmsg("cannot drop the currently open database"))); /* - * Check whether there are, possibly unconnected, logical slots that refer - * to the to-be-dropped database. The database lock we are holding - * prevents the creation of new slots using the database. + * Check whether there are active logical slots that refer to the + * to-be-dropped database. The database lock we are holding prevents the + * creation of new slots using the database or existing slots becoming + * active. */ - if (ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active)) + (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active); + if (nslots_active) + { ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), - errmsg("database \"%s\" is used by a logical replication slot", + errmsg("database \"%s\" is used by an active logical replication slot", dbname), - errdetail_plural("There is %d slot, %d of them active.", - "There are %d slots, %d of them active.", - nslots, - nslots, nslots_active))); + errdetail_plural("There is %d active slot", + "There are %d active slots", + nslots_active, nslots_active))); + } /* * Check for other backends in the target database. (Because we hold the @@ -914,6 +917,11 @@ dropdb(const char *dbname, bool missing_ok) */ dropDatabaseDependencies(db_id); + /* + * Drop db-specific replication slots. + */ + ReplicationSlotsDropDBSlots(db_id); + /* * Drop pages for this database that are in the shared buffer cache. 
This * is important to ensure that no remaining backend tries to write out a @@ -2124,11 +2132,17 @@ dbase_redo(XLogReaderState *record) * InitPostgres() cannot fully re-execute concurrently. This * avoids backends re-connecting automatically to same database, * which can happen in some cases. + * + * This will lock out walsenders trying to connect to db-specific + * slots for logical decoding too, so it's safe for us to drop slots. */ LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock); ResolveRecoveryConflictWithDatabase(xlrec->db_id); } + /* Drop any database-specific replication slots */ + ReplicationSlotsDropDBSlots(xlrec->db_id); + /* Drop pages for this database that are in the shared buffer cache */ DropDatabaseBuffers(xlrec->db_id); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 5237a9fb07..6c5ec7a00e 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -796,6 +796,94 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive) return false; } +/* + * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the + * passed database oid. The caller should hold an exclusive lock on the + * pg_database oid for the database to prevent creation of new slots on the db + * or replay from existing slots. + * + * This routine isn't as efficient as it could be - but we don't drop databases + * often, especially databases with lots of slots. + * + * Another session that concurrently acquires an existing slot on the target DB + * (most likely to drop it) may cause this function to ERROR. If that happens + * it may have dropped some but not all slots. + */ +void +ReplicationSlotsDropDBSlots(Oid dboid) +{ + int i; + + if (max_replication_slots <= 0) + return; + +restart: + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s; + NameData slotname; + int active_pid; + + s = &ReplicationSlotCtl->replication_slots[i]; + + /* cannot change while ReplicationSlotCtlLock is held */ + if (!s->in_use) + continue; + + /* only logical slots are database specific, skip */ + if (!SlotIsLogical(s)) + continue; + + /* not our database, skip */ + if (s->data.database != dboid) + continue; + + /* Claim the slot, as if ReplicationSlotAcquire()ing. */ + SpinLockAcquire(&s->mutex); + strncpy(NameStr(slotname), NameStr(s->data.name), NAMEDATALEN); + NameStr(slotname)[NAMEDATALEN-1] = '\0'; + active_pid = s->active_pid; + if (active_pid == 0) + { + MyReplicationSlot = s; + s->active_pid = MyProcPid; + } + SpinLockRelease(&s->mutex); + + /* + * We might fail here if the slot was active. Even though we hold an + * exclusive lock on the database object a logical slot for that DB can + * still be active if it's being dropped by a backend connected to + * another DB or is otherwise acquired. + * + * It's an unlikely race that'll only arise from concurrent user action, + * so we'll just bail out. + */ + if (active_pid) + elog(ERROR, "replication slot %s is in use by pid %d", + NameStr(slotname), active_pid); + + /* + * To avoid largely duplicating ReplicationSlotDropAcquired() or + * complicating it with already_locked flags for ProcArrayLock, + * ReplicationSlotControlLock and ReplicationSlotAllocationLock, we + * just release our ReplicationSlotControlLock to drop the slot. + * + * For safety we'll restart our scan from the beginning each + * time we release the lock. 
+ */ + LWLockRelease(ReplicationSlotControlLock); + ReplicationSlotDropAcquired(); + goto restart; + } + LWLockRelease(ReplicationSlotControlLock); + + /* recompute limits once after all slots are dropped */ + ReplicationSlotsComputeRequiredXmin(false); + ReplicationSlotsComputeRequiredLSN(); +} + /* * Check whether the server's configuration supports using replication diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 62cacdb384..9a2dbd7b61 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -177,6 +177,7 @@ extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); extern void ReplicationSlotsComputeRequiredLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); +extern void ReplicationSlotsDropDBSlots(Oid dboid); extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(void); diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index 66d5e4ad09..bf9b50a6a3 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -7,7 +7,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 5; +use Test::More tests => 16; # Initialize master node my $node_master = get_new_node('master'); @@ -54,7 +54,7 @@ my $stdout_sql = $node_master->safe_psql('postgres', qq[SELECT data FROM pg_logi is($stdout_sql, $expected, 'got expected output from SQL decoding session'); my $endpos = $node_master->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;"); -diag "waiting to replay $endpos"; +print "waiting to replay $endpos\n"; my $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1'); chomp($stdout_recv); @@ -64,5 +64,41 @@ $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpo chomp($stdout_recv); is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot'); +$node_master->safe_psql('postgres', 'CREATE DATABASE otherdb'); + +is($node_master->psql('otherdb', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;"), 3, + 'replaying logical slot from another database fails'); + +$node_master->safe_psql('otherdb', qq[SELECT pg_create_logical_replication_slot('otherdb_slot', 'test_decoding');]); + +# make sure you can't drop the database while a slot in it is active +my $pg_recvlogical = IPC::Run::start(['pg_recvlogical', '-d', $node_master->connstr('otherdb'), '-S', 'otherdb_slot', '-f', '-', '--start']); +$node_master->poll_query_until('otherdb', "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NOT NULL)"); +is($node_master->psql('postgres', 'DROP DATABASE otherdb'), 3, + 'dropping a DB with active logical slots fails'); +$pg_recvlogical->kill_kill; +is($node_master->slot('otherdb_slot')->{'slot_name'}, 'otherdb_slot', + 'logical slot still exists'); + +$node_master->poll_query_until('otherdb', "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NULL)"); +is($node_master->psql('postgres', 'DROP DATABASE otherdb'), 0, + 'dropping a DB with inactive logical slots succeeds'); +is($node_master->slot('otherdb_slot')->{'slot_name'}, undef, + 'logical slot was 
actually dropped with DB'); + +# Restarting a node with wal_level = logical that has existing +# slots must succeed, but decoding from those slots must fail. +$node_master->safe_psql('postgres', 'ALTER SYSTEM SET wal_level = replica'); +is($node_master->safe_psql('postgres', 'SHOW wal_level'), 'logical', 'wal_level is still logical before restart'); +$node_master->restart; +is($node_master->safe_psql('postgres', 'SHOW wal_level'), 'replica', 'wal_level is replica'); +isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0', + 'restored slot catalog_xmin is nonzero'); +is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3, + 'reading from slot with wal_level < logical fails'); +is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0, + 'can drop logical slot while wal_level = replica'); +is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped'); + # done with the node $node_master->stop; diff --git a/src/test/recovery/t/010_logical_decoding_timelines.pl b/src/test/recovery/t/010_logical_decoding_timelines.pl index 4561a06143..b618132e2b 100644 --- a/src/test/recovery/t/010_logical_decoding_timelines.pl +++ b/src/test/recovery/t/010_logical_decoding_timelines.pl @@ -15,12 +15,15 @@ # This module uses the first approach to show that timeline following # on a logical slot works. # +# (For convenience, it also tests some recovery-related operations +# on logical slots). +# use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 10; +use Test::More tests => 13; use RecursiveCopy; use File::Copy; use IPC::Run (); @@ -50,6 +53,16 @@ $node_master->safe_psql('postgres', $node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);"); $node_master->safe_psql('postgres', "INSERT INTO decoding(blah) VALUES ('beforebb');"); + +# We also want to verify that DROP DATABASE on a standby with a logical +# slot works. This isn't strictly related to timeline following, but +# the only way to get a logical slot on a standby right now is to use +# the same physical copy trick, so: +$node_master->safe_psql('postgres', 'CREATE DATABASE dropme;'); +$node_master->safe_psql('dropme', +"SELECT pg_create_logical_replication_slot('dropme_slot', 'test_decoding');" ); + $node_master->safe_psql('postgres', 'CHECKPOINT;'); my $backup_name = 'b1'; @@ -68,6 +81,17 @@ $node_replica->append_conf( $node_replica->start; +# If we drop 'dropme' on the master, the standby should drop the +# db and associated slot. +is($node_master->psql('postgres', 'DROP DATABASE dropme'), 0, + 'dropped DB with logical slot OK on master'); +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('insert')); +is($node_replica->safe_psql('postgres', q[SELECT 1 FROM pg_database WHERE datname = 'dropme']), '', + 'dropped DB dropme on standby'); +is($node_replica->slot('dropme_slot')->{'slot_name'}, undef, + 'logical slot was actually dropped on standby'); + +# Back to testing failover... 
$node_master->safe_psql('postgres', "SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');" ); @@ -99,10 +123,13 @@ isnt($phys_slot->{'catalog_xmin'}, '', cmp_ok($phys_slot->{'xmin'}, '>=', $phys_slot->{'catalog_xmin'}, 'xmin on physical slot must not be lower than catalog_xmin'); +$node_master->safe_psql('postgres', 'CHECKPOINT'); + # Boom, crash $node_master->stop('immediate'); $node_replica->promote; +print "waiting for replica to come up\n"; $node_replica->poll_query_until('postgres', "SELECT NOT pg_is_in_recovery();"); @@ -154,5 +181,4 @@ $stdout = $node_replica->pg_recvlogical_upto('postgres', 'before_basebackup', chomp($stdout); is($stdout, $final_expected_output_bb, 'got same output from walsender via pg_recvlogical on before_basebackup'); -# We don't need the standby anymore $node_replica->teardown_node(); -- 2.40.0
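For anyone trying the patch out by hand, here is a minimal standalone TAP-style sketch (not part of the patch) built on the same PostgresNode/TestLib framework the tests above use. It creates a database containing an inactive logical slot and checks that DROP DATABASE now removes both. The node, database and slot names ('example', 'slotdb', 'slotdb_slot') are illustrative only, and it assumes the test_decoding plugin is available, as in the recovery test suite.

use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 3;

# Set up a node with logical decoding enabled.
my $node = get_new_node('example');
$node->init(allows_streaming => 1);
$node->append_conf('postgresql.conf', "wal_level = logical\n");
$node->append_conf('postgresql.conf', "max_replication_slots = 2\n");
$node->start;

# Create a database with an (inactive) logical slot in it.
$node->safe_psql('postgres', 'CREATE DATABASE slotdb');
$node->safe_psql('slotdb',
	"SELECT pg_create_logical_replication_slot('slotdb_slot', 'test_decoding');");
is($node->slot('slotdb_slot')->{'slot_name'}, 'slotdb_slot',
	'slot exists before the database is dropped');

# Before this patch the DROP would fail with "database ... is used by a
# logical replication slot"; now the inactive slot is dropped with the DB.
is($node->psql('postgres', 'DROP DATABASE slotdb'), 0,
	'DROP DATABASE succeeds despite the inactive logical slot');
is($node->slot('slotdb_slot')->{'slot_name'}, undef,
	'slot was removed together with its database');

$node->stop;

The active-slot case (DROP DATABASE must still fail while a client such as pg_recvlogical has the slot acquired) is exercised by the 006_logical_decoding.pl changes above.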