]> granicus.if.org Git - postgresql/commitdiff
Cleanup slots during drop database
authorSimon Riggs <simon@2ndQuadrant.com>
Tue, 28 Mar 2017 14:05:21 +0000 (10:05 -0400)
committerSimon Riggs <simon@2ndQuadrant.com>
Tue, 28 Mar 2017 14:05:21 +0000 (10:05 -0400)
Automatically drop all logical replication slots associated with a
database when the database is dropped. Previously we threw an ERROR
if a slot existed. Now we throw ERROR only if a slot is active in
the database being dropped.

Craig Ringer

doc/src/sgml/func.sgml
doc/src/sgml/protocol.sgml
src/backend/commands/dbcommands.c
src/backend/replication/slot.c
src/include/replication/slot.h
src/test/recovery/t/006_logical_decoding.pl
src/test/recovery/t/010_logical_decoding_timelines.pl

index ba6f8dd8d2db5f82d155d6c98f6e1bd2d1e07188..78508d74ece1ff63837babe7dfc03727ece27512 100644 (file)
@@ -18876,7 +18876,8 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
        <entry>
         Drops the physical or logical replication slot
         named <parameter>slot_name</parameter>. Same as replication protocol
-        command <literal>DROP_REPLICATION_SLOT</>.
+        command <literal>DROP_REPLICATION_SLOT</>. For logical slots, this must
+        be called when connected to the same database the slot was created on.
        </entry>
       </row>
 
index b3a50261c333206901364fdce25acb12a8e6e565..5f971412ae70acc5ad3b6866f3f0dc6548f8f7f7 100644 (file)
@@ -2034,6 +2034,8 @@ The commands accepted in walsender mode are:
      <para>
       Drops a replication slot, freeing any reserved server-side resources. If
       the slot is currently in use by an active connection, this command fails.
+      If the slot is a logical slot that was created in a database other than
+      the database the walsender is connected to, this command fails.
      </para>
      <variablelist>
       <varlistentry>
index 5a63b1abcbe0b563e515b1e700c8a4b7b7521b9f..c0ba2b451a7fa6f2d03ffa12682328e6d0102c1f 100644 (file)
@@ -845,19 +845,22 @@ dropdb(const char *dbname, bool missing_ok)
                                 errmsg("cannot drop the currently open database")));
 
        /*
-        * Check whether there are, possibly unconnected, logical slots that refer
-        * to the to-be-dropped database. The database lock we are holding
-        * prevents the creation of new slots using the database.
+        * Check whether there are active logical slots that refer to the
+        * to-be-dropped database. The database lock we are holding prevents the
+        * creation of new slots using the database or existing slots becoming
+        * active.
         */
-       if (ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active))
+       (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
+       if (nslots_active)
+       {
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_IN_USE),
-                         errmsg("database \"%s\" is used by a logical replication slot",
+                         errmsg("database \"%s\" is used by an active logical replication slot",
                                         dbname),
-                                errdetail_plural("There is %d slot, %d of them active.",
-                                                                 "There are %d slots, %d of them active.",
-                                                                 nslots,
-                                                                 nslots, nslots_active)));
+                                errdetail_plural("There is %d active slot",
+                                                                 "There are %d active slots",
+                                                                 nslots_active, nslots_active)));
+       }
 
        /*
         * Check for other backends in the target database.  (Because we hold the
@@ -914,6 +917,11 @@ dropdb(const char *dbname, bool missing_ok)
         */
        dropDatabaseDependencies(db_id);
 
+       /*
+        * Drop db-specific replication slots.
+        */
+       ReplicationSlotsDropDBSlots(db_id);
+
        /*
         * Drop pages for this database that are in the shared buffer cache. This
         * is important to ensure that no remaining backend tries to write out a
@@ -2124,11 +2132,17 @@ dbase_redo(XLogReaderState *record)
                         * InitPostgres() cannot fully re-execute concurrently. This
                         * avoids backends re-connecting automatically to same database,
                         * which can happen in some cases.
+                        *
+                        * This will lock out walsenders trying to connect to db-specific
+                        * slots for logical decoding too, so it's safe for us to drop slots.
                         */
                        LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
                        ResolveRecoveryConflictWithDatabase(xlrec->db_id);
                }
 
+               /* Drop any database-specific replication slots */
+               ReplicationSlotsDropDBSlots(xlrec->db_id);
+
                /* Drop pages for this database that are in the shared buffer cache */
                DropDatabaseBuffers(xlrec->db_id);
 
index 5237a9fb078142bfe75ce402e06c1b1cde3e5c68..6c5ec7a00e8d646e34805ec9ebaf02174196ec9f 100644 (file)
@@ -796,6 +796,94 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
        return false;
 }
 
+/*
+ * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
+ * passed database oid. The caller should hold an exclusive lock on the
+ * pg_database oid for the database to prevent creation of new slots on the db
+ * or replay from existing slots.
+ *
+ * This routine isn't as efficient as it could be - but we don't drop databases
+ * often, especially databases with lots of slots.
+ *
+ * Another session that concurrently acquires an existing slot on the target DB
+ * (most likely to drop it) may cause this function to ERROR. If that happens
+ * it may have dropped some but not all slots.
+ */
+void
+ReplicationSlotsDropDBSlots(Oid dboid)
+{
+       int                     i;
+
+       if (max_replication_slots <= 0)
+               return;
+
+restart:
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+       for (i = 0; i < max_replication_slots; i++)
+       {
+               ReplicationSlot *s;
+               NameData slotname;
+               int active_pid;
+
+               s = &ReplicationSlotCtl->replication_slots[i];
+
+               /* cannot change while ReplicationSlotCtlLock is held */
+               if (!s->in_use)
+                       continue;
+
+               /* only logical slots are database specific, skip */
+               if (!SlotIsLogical(s))
+                       continue;
+
+               /* not our database, skip */
+               if (s->data.database != dboid)
+                       continue;
+
+               /* Claim the slot, as if ReplicationSlotAcquire()ing. */
+               SpinLockAcquire(&s->mutex);
+               strncpy(NameStr(slotname), NameStr(s->data.name), NAMEDATALEN);
+               NameStr(slotname)[NAMEDATALEN-1] = '\0';
+               active_pid = s->active_pid;
+               if (active_pid == 0)
+               {
+                       MyReplicationSlot = s;
+                       s->active_pid = MyProcPid;
+               }
+               SpinLockRelease(&s->mutex);
+
+               /*
+                * We might fail here if the slot was active. Even though we hold an
+                * exclusive lock on the database object a logical slot for that DB can
+                * still be active if it's being dropped by a backend connected to
+                * another DB or is otherwise acquired.
+                *
+                * It's an unlikely race that'll only arise from concurrent user action,
+                * so we'll just bail out.
+                */
+               if (active_pid)
+                       elog(ERROR, "replication slot %s is in use by pid %d",
+                                               NameStr(slotname), active_pid);
+
+               /*
+                * To avoid largely duplicating ReplicationSlotDropAcquired() or
+                * complicating it with already_locked flags for ProcArrayLock,
+                * ReplicationSlotControlLock and ReplicationSlotAllocationLock, we
+                * just release our ReplicationSlotControlLock to drop the slot.
+                *
+                * For safety we'll restart our scan from the beginning each
+                * time we release the lock.
+                */
+               LWLockRelease(ReplicationSlotControlLock);
+               ReplicationSlotDropAcquired();
+               goto restart;
+       }
+       LWLockRelease(ReplicationSlotControlLock);
+
+       /* recompute limits once after all slots are dropped */
+       ReplicationSlotsComputeRequiredXmin(false);
+       ReplicationSlotsComputeRequiredLSN();
+}
+
 
 /*
  * Check whether the server's configuration supports using replication
index 62cacdb38440d3120a5c603b8b82e518584cebb9..9a2dbd7b61c54cde8a3ab4df481562141b8c25f4 100644 (file)
@@ -177,6 +177,7 @@ extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
 extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
+extern void ReplicationSlotsDropDBSlots(Oid dboid);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
index 66d5e4ad09fa611f4c0ca6c59ad5cde357dc1135..bf9b50a6a3582a3205cc2ac4f12a6d9840be5e62 100644 (file)
@@ -7,7 +7,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 5;
+use Test::More tests => 16;
 
 # Initialize master node
 my $node_master = get_new_node('master');
@@ -54,7 +54,7 @@ my $stdout_sql = $node_master->safe_psql('postgres', qq[SELECT data FROM pg_logi
 is($stdout_sql, $expected, 'got expected output from SQL decoding session');
 
 my $endpos = $node_master->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;");
-diag "waiting to replay $endpos";
+print "waiting to replay $endpos\n";
 
 my $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
 chomp($stdout_recv);
@@ -64,5 +64,41 @@ $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpo
 chomp($stdout_recv);
 is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot');
 
+$node_master->safe_psql('postgres', 'CREATE DATABASE otherdb');
+
+is($node_master->psql('otherdb', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;"), 3,
+       'replaying logical slot from another database fails');
+
+$node_master->safe_psql('otherdb', qq[SELECT pg_create_logical_replication_slot('otherdb_slot', 'test_decoding');]);
+
+# make sure you can't drop a slot while active
+my $pg_recvlogical = IPC::Run::start(['pg_recvlogical', '-d', $node_master->connstr('otherdb'), '-S', 'otherdb_slot', '-f', '-', '--start']);
+$node_master->poll_query_until('otherdb', "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NOT NULL)");
+is($node_master->psql('postgres', 'DROP DATABASE otherdb'), 3,
+       'dropping a DB with inactive logical slots fails');
+$pg_recvlogical->kill_kill;
+is($node_master->slot('otherdb_slot')->{'slot_name'}, undef,
+       'logical slot still exists');
+
+$node_master->poll_query_until('otherdb', "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NULL)");
+is($node_master->psql('postgres', 'DROP DATABASE otherdb'), 0,
+       'dropping a DB with inactive logical slots succeeds');
+is($node_master->slot('otherdb_slot')->{'slot_name'}, undef,
+       'logical slot was actually dropped with DB');
+
+# Restarting a node with wal_level = logical that has existing
+# slots must succeed, but decoding from those slots must fail.
+$node_master->safe_psql('postgres', 'ALTER SYSTEM SET wal_level = replica');
+is($node_master->safe_psql('postgres', 'SHOW wal_level'), 'logical', 'wal_level is still logical before restart');
+$node_master->restart;
+is($node_master->safe_psql('postgres', 'SHOW wal_level'), 'replica', 'wal_level is replica');
+isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0',
+       'restored slot catalog_xmin is nonzero');
+is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3,
+       'reading from slot with wal_level < logical fails');
+is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0,
+       'can drop logical slot while wal_level = replica');
+is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped');
+
 # done with the node
 $node_master->stop;
index 4561a06143b48f4f24c27a762b4179911210bebe..b618132e2b73b2d4dba991991bf7623a13af56ad 100644 (file)
 # This module uses the first approach to show that timeline following
 # on a logical slot works.
 #
+# (For convenience, it also tests some recovery-related operations
+# on logical slots).
+#
 use strict;
 use warnings;
 
 use PostgresNode;
 use TestLib;
-use Test::More tests => 10;
+use Test::More tests => 13;
 use RecursiveCopy;
 use File::Copy;
 use IPC::Run ();
@@ -50,6 +53,16 @@ $node_master->safe_psql('postgres',
 $node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
 $node_master->safe_psql('postgres',
        "INSERT INTO decoding(blah) VALUES ('beforebb');");
+
+# We also want to verify that DROP DATABASE on a standby with a logical
+# slot works. This isn't strictly related to timeline following, but
+# the only way to get a logical slot on a standby right now is to use
+# the same physical copy trick, so:
+$node_master->safe_psql('postgres', 'CREATE DATABASE dropme;');
+$node_master->safe_psql('dropme',
+"SELECT pg_create_logical_replication_slot('dropme_slot', 'test_decoding');"
+);
+
 $node_master->safe_psql('postgres', 'CHECKPOINT;');
 
 my $backup_name = 'b1';
@@ -68,6 +81,17 @@ $node_replica->append_conf(
 
 $node_replica->start;
 
+# If we drop 'dropme' on the master, the standby should drop the
+# db and associated slot.
+is($node_master->psql('postgres', 'DROP DATABASE dropme'), 0,
+       'dropped DB with logical slot OK on master');
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('insert'));
+is($node_replica->safe_psql('postgres', q[SELECT 1 FROM pg_database WHERE datname = 'dropme']), '',
+       'dropped DB dropme on standby');
+is($node_master->slot('dropme_slot')->{'slot_name'}, undef,
+       'logical slot was actually dropped on standby');
+
+# Back to testing failover...
 $node_master->safe_psql('postgres',
 "SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
 );
@@ -99,10 +123,13 @@ isnt($phys_slot->{'catalog_xmin'}, '',
 cmp_ok($phys_slot->{'xmin'}, '>=', $phys_slot->{'catalog_xmin'},
           'xmin on physical slot must not be lower than catalog_xmin');
 
+$node_master->safe_psql('postgres', 'CHECKPOINT');
+
 # Boom, crash
 $node_master->stop('immediate');
 
 $node_replica->promote;
+print "waiting for replica to come up\n";
 $node_replica->poll_query_until('postgres',
        "SELECT NOT pg_is_in_recovery();");
 
@@ -154,5 +181,4 @@ $stdout = $node_replica->pg_recvlogical_upto('postgres', 'before_basebackup',
 chomp($stdout);
 is($stdout, $final_expected_output_bb, 'got same output from walsender via pg_recvlogical on before_basebackup');
 
-# We don't need the standby anymore
 $node_replica->teardown_node();