my $exc_save = $@;
if ($exc_save)
# IPC::Run::run threw an exception. re-throw unless it's a
# timeout, which we'll handle by testing is_expired
die $exc_save
if $ret == 1;
die "connection error: '$$stderr'\nwhile running '@psql_params'"
if $ret == 2;
- die "error running SQL: '$$stderr'\nwhile running '@psql_params'"
+ die "error running SQL: '$$stderr'\nwhile running '@psql_params' with sql '$sql'"
if $ret == 3;
die "psql returns $ret: '$$stderr'\nwhile running '@psql_params'";
+=pod $node->lsn(mode)
+Look up xlog positions on the server:
+* insert position (master only, error on replica)
+* write position (master only, error on replica)
+* flush position
+* receive position (always undef on master)
+* replay position
+mode must be specified.
+sub lsn
+ my ($self, $mode) = @_;
+ my %modes = ('insert' => 'pg_current_xlog_insert_location()',
+ 'flush' => 'pg_current_xlog_flush_location()',
+ 'write' => 'pg_current_xlog_location()',
+ 'receive' => 'pg_last_xlog_receive_location()',
+ 'replay' => 'pg_last_xlog_replay_location()');
+ $mode = '<undef>' if !defined($mode);
+ die "unknown mode for 'lsn': '$mode', valid modes are " . join(', ', keys %modes)
+ if !defined($modes{$mode});
+ my $result = $self->safe_psql('postgres', "SELECT $modes{$mode}");
+ chomp($result);
+ if ($result eq '')
+ {
+ return undef;
+ }
+ else
+ {
+ return $result;
+ }
+=pod $node->wait_for_catchup(standby_name, mode, target_lsn)
+Wait for the node with application_name standby_name (usually from node->name)
+until its replication position in pg_stat_replication equals or passes the
+upstream's xlog insert point at the time this function is called. By default
+the replay_location is waited for, but 'mode' may be specified to wait for any
+of sent|write|flush|replay.
+If there is no active replication connection from this peer, waits until
+poll_query_until timeout.
+Requires that the 'postgres' db exists and is accessible.
+target_lsn may be any arbitrary lsn, but is typically $master_node->lsn('insert').
+This is not a test. It die()s on failure.
+sub wait_for_catchup
+ my ($self, $standby_name, $mode, $target_lsn) = @_;
+ $mode = defined($mode) ? $mode : 'replay';
+ my %valid_modes = ( 'sent' => 1, 'write' => 1, 'flush' => 1, 'replay' => 1 );
+ die "unknown mode $mode for 'wait_for_catchup', valid modes are " . join(', ', keys(%valid_modes)) unless exists($valid_modes{$mode});
+ # Allow passing of a PostgresNode instance as shorthand
+ if ( blessed( $standby_name ) && $standby_name->isa("PostgresNode") )
+ {
+ $standby_name = $standby_name->name;
+ }
+ die 'target_lsn must be specified' unless defined($target_lsn);
+ print "Waiting for replication conn " . $standby_name . "'s " . $mode . "_location to pass " . $target_lsn . " on " . $self->name . "\n";
+ my $query = qq[SELECT '$target_lsn' <= ${mode}_location FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
+ $self->poll_query_until('postgres', $query)
+ or die "timed out waiting for catchup, current position is " . ($self->safe_psql('postgres', $query) || '(unknown)');
+ print "done\n";
+=pod $node->wait_for_slot_catchup(slot_name, mode, target_lsn)
+Wait for the named replication slot to equal or pass the supplied target_lsn.
+The position used is the restart_lsn unless mode is given, in which case it may
+be 'restart' or 'confirmed_flush'.
+Requires that the 'postgres' db exists and is accessible.
+This is not a test. It die()s on failure.
+If the slot is not active, will time out after poll_query_until's timeout.
+target_lsn may be any arbitrary lsn, but is typically $master_node->lsn('insert').
+Note that for logical slots, restart_lsn is held down by the oldest in-progress tx.
+sub wait_for_slot_catchup
+ my ($self, $slot_name, $mode, $target_lsn) = @_;
+ $mode = defined($mode) ? $mode : 'restart';
+ if (!($mode eq 'restart' || $mode eq 'confirmed_flush'))
+ {
+ die "valid modes are restart, confirmed_flush";
+ }
+ die 'target lsn must be specified' unless defined($target_lsn);
+ print "Waiting for replication slot " . $slot_name . "'s " . $mode . "_lsn to pass " . $target_lsn . " on " . $self->name . "\n";
+ my $query = qq[SELECT '$target_lsn' <= ${mode}_lsn FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name';];
+ $self->poll_query_until('postgres', $query)
+ or die "timed out waiting for catchup, current position is " . ($self->safe_psql('postgres', $query) || '(unknown)');
+ print "done\n";
+=pod $node->query_hash($dbname, $query, @columns)
+Execute $query on $dbname, replacing any appearance of the string __COLUMNS__
+within the query with a comma-separated list of @columns.
+If __COLUMNS__ does not appear in the query, its result columns must EXACTLY
+match the order and number (but not necessarily alias) of supplied @columns.
+The query must return zero or one rows.
+Return a hash-ref representation of the results of the query, with any empty
+or null results as defined keys with an empty-string value. There is no way
+to differentiate between null and empty-string result fields.
+If the query returns zero rows, return a hash with all columns empty. There
+is no way to differentiate between zero rows returned and a row with only
+null columns.
+sub query_hash
+ my ($self, $dbname, $query, @columns) = @_;
+ die 'calls in array context for multi-row results not supported yet' if (wantarray);
+ # Replace __COLUMNS__ if found
+ substr($query, index($query, '__COLUMNS__'), length('__COLUMNS__')) = join(', ', @columns)
+ if index($query, '__COLUMNS__') >= 0;
+ my $result = $self->safe_psql($dbname, $query);
+ # hash slice, see .
+ #
+ # Fills the hash with empty strings produced by x-operator element
+ # duplication if result is an empty row
+ #
+ my %val;
+ @val{@columns} = $result ne '' ? split(qr/\|/, $result) : ('',) x scalar(@columns);
+ return \%val;
+=pod $node->slot(slot_name)
+Return hash-ref of replication slot data for the named slot, or a hash-ref with
+all values '' if not found. Does not differentiate between null and empty string
+for fields, no field is ever undef.
+The restart_lsn and confirmed_flush_lsn fields are returned verbatim, and also
+as a 2-list of [highword, lowword] integer. Since we rely on Perl 5.8.8 we can't
+"use bigint", it's from 5.20, and we can't assume we have Math::Bigint from CPAN
+sub slot
+ my ($self, $slot_name) = @_;
+ my @columns = ('plugin', 'slot_type', 'datoid', 'database', 'active', 'active_pid', 'xmin', 'catalog_xmin', 'restart_lsn');
+ return $self->query_hash('postgres', "SELECT __COLUMNS__ FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'", @columns);
# Create some content on master
"CREATE TABLE tab_int AS SELECT generate_series(1,1000) AS a");
-my $until_lsn =
- $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location();");
# Wait until standby has replayed enough data on standby 1
-my $caughtup_query =
- "SELECT '$until_lsn'::pg_lsn <= pg_last_xlog_replay_location()";
-$node_standby_1->poll_query_until('postgres', $caughtup_query)
- or die "Timed out while waiting for standby to catch up";
+$node_master->wait_for_catchup($node_standby_1, 'replay', $node_master->lsn('write'));
# Stop and remove master, and promote standby 1, switching it to a new timeline
my $connstr_1 = $node_standby_1->connstr;
'recovery.conf', qq(
+primary_conninfo='$connstr_1 application_name=@{[$node_standby_2->name]}'
# to ensure that the timeline switch has been done.
"INSERT INTO tab_int VALUES (generate_series(1001,2000))");
-$until_lsn = $node_standby_1->safe_psql('postgres',
- "SELECT pg_current_xlog_location();");
-$caughtup_query =
- "SELECT '$until_lsn'::pg_lsn <= pg_last_xlog_replay_location()";
-$node_standby_2->poll_query_until('postgres', $caughtup_query)
- or die "Timed out while waiting for standby to catch up";
+$node_standby_1->wait_for_catchup($node_standby_2, 'replay', $node_standby_1->lsn('write'));
my $result =
$node_standby_2->safe_psql('postgres', "SELECT count(*) FROM tab_int");