]> granicus.if.org Git - postgresql/blob - src/test/recovery/t/006_logical_decoding.pl
Avoid race condition in logical replication test
[postgresql] / src / test / recovery / t / 006_logical_decoding.pl
1 # Testing of logical decoding using SQL interface and/or pg_recvlogical
2 #
3 # Most logical decoding tests are in contrib/test_decoding. This module
4 # is for work that doesn't fit well there, like where server restarts
5 # are required.
6 use strict;
7 use warnings;
8 use PostgresNode;
9 use TestLib;
10 use Test::More tests => 16;
11 use Config;
12
13 # Initialize master node
14 my $node_master = get_new_node('master');
15 $node_master->init(allows_streaming => 1);
16 $node_master->append_conf(
17         'postgresql.conf', qq(
18 wal_level = logical
19 ));
20 $node_master->start;
21 my $backup_name = 'master_backup';
22
23 $node_master->safe_psql('postgres',
24         qq[CREATE TABLE decoding_test(x integer, y text);]);
25
26 $node_master->safe_psql('postgres',
27 qq[SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');]);
28
29 $node_master->safe_psql('postgres',
30 qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]
31 );
32
33 # Basic decoding works
34 my ($result) = $node_master->safe_psql('postgres',
35         qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
36 is(scalar(my @foobar = split /^/m, $result),
37         12, 'Decoding produced 12 rows inc BEGIN/COMMIT');
38
39 # If we immediately crash the server we might lose the progress we just made
40 # and replay the same changes again. But a clean shutdown should never repeat
41 # the same changes when we use the SQL decoding interface.
42 $node_master->restart('fast');
43
44 # There are no new writes, so the result should be empty.
45 $result = $node_master->safe_psql('postgres',
46         qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
47 chomp($result);
48 is($result, '', 'Decoding after fast restart repeats no rows');
49
50 # Insert some rows and verify that we get the same results from pg_recvlogical
51 # and the SQL interface.
52 $node_master->safe_psql('postgres',
53 qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
54 );
55
56 my $expected = q{BEGIN
57 table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
58 table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
59 table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
60 table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
61 COMMIT};
62
63 my $stdout_sql = $node_master->safe_psql('postgres',
64 qq[SELECT data FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
65 );
66 is($stdout_sql, $expected, 'got expected output from SQL decoding session');
67
68 my $endpos = $node_master->safe_psql('postgres',
69 "SELECT lsn FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
70 );
71 print "waiting to replay $endpos\n";
72
73 my $stdout_recv = $node_master->pg_recvlogical_upto(
74         'postgres', 'test_slot', $endpos, 10,
75         'include-xids'     => '0',
76         'skip-empty-xacts' => '1');
77 chomp($stdout_recv);
78 is($stdout_recv, $expected,
79         'got same expected output from pg_recvlogical decoding session');
80
81 $node_master->poll_query_until('postgres',
82 "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'test_slot' AND active_pid IS NULL)"
83 )
84   or die "slot never became inactive";
85
86 $stdout_recv = $node_master->pg_recvlogical_upto(
87         'postgres', 'test_slot', $endpos, 10,
88         'include-xids'     => '0',
89         'skip-empty-xacts' => '1');
90 chomp($stdout_recv);
91 is($stdout_recv, '',
92         'pg_recvlogical acknowledged changes, nothing pending on slot');
93
94 $node_master->safe_psql('postgres', 'CREATE DATABASE otherdb');
95
96 is( $node_master->psql(
97                 'otherdb',
98 "SELECT lsn FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
99         ),
100         3,
101         'replaying logical slot from another database fails');
102
103 $node_master->safe_psql('otherdb',
104 qq[SELECT pg_create_logical_replication_slot('otherdb_slot', 'test_decoding');]
105 );
106
107 # make sure you can't drop a slot while active
108 SKIP:
109 {
110
111    # some Windows Perls at least don't like IPC::Run's start/kill_kill regime.
112         skip "Test fails on Windows perl", 2 if $Config{osname} eq 'MSWin32';
113
114         my $pg_recvlogical = IPC::Run::start(
115                 [   'pg_recvlogical', '-d', $node_master->connstr('otherdb'),
116                         '-S', 'otherdb_slot', '-f', '-', '--start' ]);
117         $node_master->poll_query_until('otherdb',
118 "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NOT NULL)"
119         ) or die "slot never became active";
120         is($node_master->psql('postgres', 'DROP DATABASE otherdb'),
121                 3, 'dropping a DB with active logical slots fails');
122         $pg_recvlogical->kill_kill;
123         is($node_master->slot('otherdb_slot')->{'slot_name'},
124                 undef, 'logical slot still exists');
125 }
126
127 $node_master->poll_query_until('otherdb',
128 "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NULL)"
129 ) or die "slot never became inactive";
130
131 is($node_master->psql('postgres', 'DROP DATABASE otherdb'),
132         0, 'dropping a DB with inactive logical slots succeeds');
133 is($node_master->slot('otherdb_slot')->{'slot_name'},
134         undef, 'logical slot was actually dropped with DB');
135
136 # Restarting a node with wal_level = logical that has existing
137 # slots must succeed, but decoding from those slots must fail.
138 $node_master->safe_psql('postgres', 'ALTER SYSTEM SET wal_level = replica');
139 is($node_master->safe_psql('postgres', 'SHOW wal_level'),
140         'logical', 'wal_level is still logical before restart');
141 $node_master->restart;
142 is($node_master->safe_psql('postgres', 'SHOW wal_level'),
143         'replica', 'wal_level is replica');
144 isnt($node_master->slot('test_slot')->{'catalog_xmin'},
145         '0', 'restored slot catalog_xmin is nonzero');
146 is( $node_master->psql(
147                 'postgres',
148                 qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]),
149         3,
150         'reading from slot with wal_level < logical fails');
151 is( $node_master->psql(
152                 'postgres', q[SELECT pg_drop_replication_slot('test_slot')]),
153         0,
154         'can drop logical slot while wal_level = replica');
155 is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped');
156
157 # done with the node
158 $node_master->stop;