return NULL;
}
-#ifndef FRONTEND
- /* Will be loaded on first read */
- state->timelineHistory = NIL;
-#endif
-
return state;
}
pfree(state->errormsg_buf);
if (state->readRecordBuf)
pfree(state->readRecordBuf);
-#ifndef FRONTEND
- if (state->timelineHistory)
- list_free_deep(state->timelineHistory);
-#endif
pfree(state->readBuf);
pfree(state);
}
#include <unistd.h>
-#include "access/timeline.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
/* state maintained across calls */
static int sendFile = -1;
static XLogSegNo sendSegNo = 0;
- static TimeLineID sendTLI = 0;
static uint32 sendOff = 0;
p = buf;
startoff = recptr % XLogSegSize;
/* Do we need to switch to a different xlog segment? */
- if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) ||
- sendTLI != tli)
+ if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
{
char path[MAXPGPATH];
path)));
}
sendOff = 0;
- sendTLI = tli;
}
/* Need to seek in the file? */
}
}
-/*
- * Determine XLogReaderState->currTLI and ->currTLIValidUntil;
- * XLogReaderState->EndRecPtr, ->currRecPtr and ThisTimeLineID affect the
- * decision. This may later be used to determine which xlog segment file to
- * open, etc.
- *
- * We switch to an xlog segment from the new timeline eagerly when on a
- * historical timeline, as soon as we reach the start of the xlog segment
- * containing the timeline switch. The server copied the segment to the new
- * timeline so all the data up to the switch point is the same, but there's no
- * guarantee the old segment will still exist. It may have been deleted or
- * renamed with a .partial suffix so we can't necessarily keep reading from
- * the old TLI even though tliSwitchPoint says it's OK.
- *
- * Because of this, callers MAY NOT assume that currTLI is the timeline that
- * will be in a page's xlp_tli; the page may begin on an older timeline or we
- * might be reading from historical timeline data on a segment that's been
- * copied to a new timeline.
- */
-static void
-XLogReadDetermineTimeline(XLogReaderState *state)
-{
- /* Read the history on first time through */
- if (state->timelineHistory == NIL)
- state->timelineHistory = readTimeLineHistory(ThisTimeLineID);
-
- /*
- * Are we reading the record immediately following the one we read last
- * time? If not, then don't use the cached timeline info.
- */
- if (state->currRecPtr != state->EndRecPtr)
- {
- state->currTLI = 0;
- state->currTLIValidUntil = InvalidXLogRecPtr;
- }
-
- /*
- * Are we reading a timeline that used to be the latest one, but became
- * historical? This can happen in a replica that gets promoted, and in a
- * cascading replica whose upstream gets promoted. In either case,
- * re-read the timeline history data. We cannot read past the timeline
- * switch point, because either the records in the old timeline might be
- * invalid, or worse, they may valid but *different* from the ones we
- * should be reading.
- */
- if (state->currTLIValidUntil == InvalidXLogRecPtr &&
- state->currTLI != ThisTimeLineID &&
- state->currTLI != 0)
- {
- /* re-read timeline history */
- list_free_deep(state->timelineHistory);
- state->timelineHistory = readTimeLineHistory(ThisTimeLineID);
-
- elog(DEBUG2, "timeline %u became historical during decoding",
- state->currTLI);
-
- /* then invalidate the cached timeline info */
- state->currTLI = 0;
- state->currTLIValidUntil = InvalidXLogRecPtr;
- }
-
- /*
- * Are we reading a record immediately following a timeline switch? If
- * so, we must follow the switch too.
- */
- if (state->currRecPtr == state->EndRecPtr &&
- state->currTLI != 0 &&
- state->currTLIValidUntil != InvalidXLogRecPtr &&
- state->currRecPtr >= state->currTLIValidUntil)
- {
- elog(DEBUG2,
- "requested record %X/%X is on segment containing end of timeline %u valid until %X/%X, switching to next timeline",
- (uint32) (state->currRecPtr >> 32),
- (uint32) state->currRecPtr,
- state->currTLI,
- (uint32) (state->currTLIValidUntil >> 32),
- (uint32) (state->currTLIValidUntil));
-
- /* invalidate TLI info so we look up the next TLI */
- state->currTLI = 0;
- state->currTLIValidUntil = InvalidXLogRecPtr;
- }
-
- if (state->currTLI == 0)
- {
- /*
- * Something changed; work out what timeline this record is on. We
- * might read it from the segment on this TLI or, if the segment is
- * also contained by newer timelines, the copy from a newer TLI.
- */
- state->currTLI = tliOfPointInHistory(state->currRecPtr,
- state->timelineHistory);
-
- /*
- * Look for the most recent timeline that's on the same xlog segment
- * as this record, since that's the only one we can assume is still
- * readable.
- */
- while (state->currTLI != ThisTimeLineID &&
- state->currTLIValidUntil == InvalidXLogRecPtr)
- {
- XLogRecPtr tliSwitch;
- TimeLineID nextTLI;
-
- CHECK_FOR_INTERRUPTS();
-
- tliSwitch = tliSwitchPoint(state->currTLI, state->timelineHistory,
- &nextTLI);
-
- /* round ValidUntil down to start of seg containing the switch */
- state->currTLIValidUntil =
- ((tliSwitch / XLogSegSize) * XLogSegSize);
-
- if (state->currRecPtr >= state->currTLIValidUntil)
- {
- /*
- * The new currTLI ends on this WAL segment so check the next
- * TLI to see if it's the last one on the segment.
- *
- * If that's the current TLI we'll stop searching.
- */
- state->currTLI = nextTLI;
- state->currTLIValidUntil = InvalidXLogRecPtr;
- }
- }
-
- /*
- * We're now either reading from the first xlog segment in the current
- * server's timeline or the most recent historical timeline that
- * exists on the target segment.
- */
- elog(DEBUG2, "XLog read ptr %X/%X is on segment with TLI %u valid until %X/%X, server current TLI is %u",
- (uint32) (state->currRecPtr >> 32),
- (uint32) state->currRecPtr,
- state->currTLI,
- (uint32) (state->currTLIValidUntil >> 32),
- (uint32) (state->currTLIValidUntil),
- ThisTimeLineID);
- }
-}
-
/*
* read_page callback for reading local xlog files
*
int count;
loc = targetPagePtr + reqLen;
-
- /* Make sure enough xlog is available... */
while (1)
{
/*
- * Check which timeline to get the record from.
- *
- * We have to do it each time through the loop because if we're in
- * recovery as a cascading standby, the current timeline might've
- * become historical.
+ * TODO: we're going to have to do something more intelligent about
+ * timelines on standbys. Use readTimeLineHistory() and
+ * tliOfPointInHistory() to get the proper LSN? For now we'll catch
+ * that case earlier, but the code and TODO are left in here for when
+ * that changes.
*/
- XLogReadDetermineTimeline(state);
-
- if (state->currTLI == ThisTimeLineID)
+ if (!RecoveryInProgress())
{
- /*
- * We're reading from the current timeline so we might have to
- * wait for the desired record to be generated (or, for a standby,
- * received & replayed)
- */
- if (!RecoveryInProgress())
- {
- *pageTLI = ThisTimeLineID;
- read_upto = GetFlushRecPtr();
- }
- else
- read_upto = GetXLogReplayRecPtr(pageTLI);
-
- if (loc <= read_upto)
- break;
-
- CHECK_FOR_INTERRUPTS();
- pg_usleep(1000L);
+ *pageTLI = ThisTimeLineID;
+ read_upto = GetFlushRecPtr();
}
else
- {
- /*
- * We're on a historical timeline, so limit reading to the switch
- * point where we moved to the next timeline.
- *
- * We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know
- * about the new timeline, so we must've received past the end of
- * it.
- */
- read_upto = state->currTLIValidUntil;
-
- /*
- * Setting pageTLI to our wanted record's TLI is slightly wrong;
- * the page might begin on an older timeline if it contains a
- * timeline switch, since its xlog segment will have been copied
- * from the prior timeline. This is pretty harmless though, as
- * nothing cares so long as the timeline doesn't go backwards. We
- * should read the page header instead; FIXME someday.
- */
- *pageTLI = state->currTLI;
-
- /* No need to wait on a historical timeline */
+ read_upto = GetXLogReplayRecPtr(pageTLI);
+
+ if (loc <= read_upto)
break;
- }
+
+ CHECK_FOR_INTERRUPTS();
+ pg_usleep(1000L);
}
if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
rsinfo->setResult = p->tupstore;
rsinfo->setDesc = p->tupdesc;
+ /* compute the current end-of-wal */
+ if (!RecoveryInProgress())
+ end_of_wal = GetFlushRecPtr();
+ else
+ end_of_wal = GetXLogReplayRecPtr(NULL);
+
ReplicationSlotAcquire(NameStr(*name));
PG_TRY();
/* invalidate non-timetravel entries */
InvalidateSystemCaches();
- if (!RecoveryInProgress())
- end_of_wal = GetFlushRecPtr();
- else
- end_of_wal = GetXLogReplayRecPtr(NULL);
-
- /* Decode until we run out of records */
while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
(ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
{
#include "access/xlogrecord.h"
-#ifndef FRONTEND
-#include "nodes/pg_list.h"
-#endif
-
typedef struct XLogReaderState XLogReaderState;
/* Function type definition for the read_page callback */
/* beginning of the WAL record being read. */
XLogRecPtr currRecPtr;
- /* timeline to read it from, 0 if a lookup is required */
- TimeLineID currTLI;
- /*
- * Safe point to read to in currTLI. If currTLI is historical, then this
- * is set to the end of the last whole segment that contains that TLI;
- * if currTLI is ThisTimeLineID, this is InvalidXLogRecPtr. This is *not*
- * the tliSwitchPoint.
- */
- XLogRecPtr currTLIValidUntil;
/* Buffer for current ReadRecord result (expandable) */
char *readRecordBuf;
uint32 readRecordBufSize;
-#ifndef FRONTEND
- /* cached timeline history, only available in backend */
- List *timelineHistory;
-#endif
-
/* Buffer to hold error message */
char *errormsg_buf;
};
test_parser \
test_rls_hooks \
test_shm_mq \
- test_slot_timelines \
worker_spi
all: submake-errcodes
+++ /dev/null
-results/
-tmp_check/
-log/
+++ /dev/null
-# src/test/modules/test_slot_timelines/Makefile
-
-MODULES = test_slot_timelines
-PGFILEDESC = "test_slot_timelines - test utility for slot timeline following"
-
-EXTENSION = test_slot_timelines
-DATA = test_slot_timelines--1.0.sql
-
-EXTRA_INSTALL=contrib/test_decoding
-REGRESS=load_extension
-REGRESS_OPTS = --temp-config=$(top_srcdir)/src/test/modules/test_slot_timelines/test_slot_timelines.conf
-
-ifdef USE_PGXS
-PG_CONFIG = pg_config
-PGXS := $(shell $(PG_CONFIG) --pgxs)
-include $(PGXS)
-else
-subdir = src/test/modules/test_slot_timelines
-top_builddir = ../../../..
-include $(top_builddir)/src/Makefile.global
-include $(top_srcdir)/contrib/contrib-global.mk
-endif
+++ /dev/null
-A test module for logical decoding failover and timeline following.
-
-This module provides a minimal way to maintain logical slots on replicas
-that mirror the state on the master. It doesn't make decoding possible,
-just tracking slot state so that a decoding client that's using the master
-can follow a physical failover to the standby. The master doesn't know
-about the slots on the standby, they're synced by a client that connects
-to both.
-
-This is intentionally not part of the test_decoding module because that's meant
-to serve as example code, where this module exercises internal server features
-by unsafely exposing internal state to SQL. It's not the right way to do
-failover, it's just a simple way to test it from the perl TAP framework to
-prove the feature works.
-
-In a practical implementation of this approach a bgworker on the master would
-monitor slot positions and relay them to a bgworker on the standby that applies
-the position updates without exposing slot internals to SQL. That's too complex
-for this test framework though.
+++ /dev/null
-CREATE EXTENSION test_slot_timelines;
-SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding');
- test_slot_timelines_create_logical_slot
------------------------------------------
-
-(1 row)
-
-SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current()::text::xid, txid_current()::text::xid, pg_current_xlog_location(), pg_current_xlog_location());
- test_slot_timelines_advance_logical_slot
-------------------------------------------
-
-(1 row)
-
-SELECT pg_drop_replication_slot('test_slot');
- pg_drop_replication_slot
---------------------------
-
-(1 row)
-
+++ /dev/null
-CREATE EXTENSION test_slot_timelines;
-SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding');
-ERROR: replication slots can only be used if max_replication_slots > 0
-SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current()::text::xid, txid_current()::text::xid, pg_current_xlog_location(), pg_current_xlog_location());
-ERROR: replication slots can only be used if max_replication_slots > 0
-SELECT pg_drop_replication_slot('test_slot');
-ERROR: replication slots can only be used if max_replication_slots > 0
+++ /dev/null
-CREATE EXTENSION test_slot_timelines;
-
-SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding');
-
-SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current()::text::xid, txid_current()::text::xid, pg_current_xlog_location(), pg_current_xlog_location());
-
-SELECT pg_drop_replication_slot('test_slot');
+++ /dev/null
--- complain if script is sourced in psql, rather than via CREATE EXTENSION
-\echo Use "CREATE EXTENSION test_slot_timelines" to load this file. \quit
-
-CREATE OR REPLACE FUNCTION test_slot_timelines_create_logical_slot(slot_name text, plugin text)
-RETURNS void
-STRICT LANGUAGE c AS 'MODULE_PATHNAME';
-
-COMMENT ON FUNCTION test_slot_timelines_create_logical_slot(text, text)
-IS 'Create a logical slot at a particular lsn and xid. Do not use in production servers, it is not safe. The slot is created with an invalid xmin and lsn.';
-
-CREATE OR REPLACE FUNCTION test_slot_timelines_advance_logical_slot(slot_name text, new_xmin xid, new_catalog_xmin xid, new_restart_lsn pg_lsn, new_confirmed_lsn pg_lsn)
-RETURNS void
-STRICT LANGUAGE c AS 'MODULE_PATHNAME';
-
-COMMENT ON FUNCTION test_slot_timelines_advance_logical_slot(text, xid, xid, pg_lsn, pg_lsn)
-IS 'Advance a logical slot directly. Do not use this in production servers, it is not safe.';
+++ /dev/null
-/*--------------------------------------------------------------------------
- *
- * test_slot_timelines.c
- * Test harness code for slot timeline following
- *
- * Copyright (c) 2016, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- * src/test/modules/test_slot_timelines/test_slot_timelines.c
- *
- * -------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include "access/transam.h"
-#include "fmgr.h"
-#include "miscadmin.h"
-#include "replication/slot.h"
-#include "utils/builtins.h"
-#include "utils/pg_lsn.h"
-
-PG_MODULE_MAGIC;
-
-PG_FUNCTION_INFO_V1(test_slot_timelines_create_logical_slot);
-PG_FUNCTION_INFO_V1(test_slot_timelines_advance_logical_slot);
-
-static void clear_slot_transient_state(void);
-
-/*
- * Create a new logical slot, with invalid LSN and xid, directly. This does not
- * use the snapshot builder or logical decoding machinery. It's only intended
- * for creating a slot on a replica that mirrors the state of a slot on an
- * upstream master.
- *
- * Note that this is test harness code. You shouldn't expose slot internals
- * to SQL like this for any real world usage. See the README.
- */
-Datum
-test_slot_timelines_create_logical_slot(PG_FUNCTION_ARGS)
-{
- char *slotname = text_to_cstring(PG_GETARG_TEXT_P(0));
- char *plugin = text_to_cstring(PG_GETARG_TEXT_P(1));
-
- CheckSlotRequirements();
-
- ReplicationSlotCreate(slotname, true, RS_PERSISTENT);
-
- /* register the plugin name with the slot */
- StrNCpy(NameStr(MyReplicationSlot->data.plugin), plugin, NAMEDATALEN);
-
- /*
- * Initialize persistent state to placeholders to be set by
- * test_slot_timelines_advance_logical_slot .
- */
- MyReplicationSlot->data.xmin = InvalidTransactionId;
- MyReplicationSlot->data.catalog_xmin = InvalidTransactionId;
- MyReplicationSlot->data.restart_lsn = InvalidXLogRecPtr;
- MyReplicationSlot->data.confirmed_flush = InvalidXLogRecPtr;
-
- clear_slot_transient_state();
-
- ReplicationSlotRelease();
-
- PG_RETURN_VOID();
-}
-
-/*
- * Set the state of a slot.
- *
- * This doesn't maintain the non-persistent state at all,
- * but since the slot isn't in use that's OK.
- *
- * There's intentionally no check to prevent slots going backwards
- * because they can actually go backwards if the master crashes when
- * it hasn't yet flushed slot state to disk then we copy the older
- * slot state after recovery.
- *
- * There's no checking done for xmin or catalog xmin either, since
- * we can't really do anything useful that accounts for xid wrap-around.
- *
- * Note that this is test harness code. You shouldn't expose slot internals
- * to SQL like this for any real world usage. See the README.
- */
-Datum
-test_slot_timelines_advance_logical_slot(PG_FUNCTION_ARGS)
-{
- char *slotname = text_to_cstring(PG_GETARG_TEXT_P(0));
- TransactionId new_xmin = DatumGetTransactionId(PG_GETARG_DATUM(1));
- TransactionId new_catalog_xmin = DatumGetTransactionId(PG_GETARG_DATUM(2));
- XLogRecPtr restart_lsn = PG_GETARG_LSN(3);
- XLogRecPtr confirmed_lsn = PG_GETARG_LSN(4);
-
- CheckSlotRequirements();
-
- ReplicationSlotAcquire(slotname);
-
- if (MyReplicationSlot->data.database != MyDatabaseId)
- elog(ERROR, "trying to update a slot on a different database");
-
- MyReplicationSlot->data.xmin = new_xmin;
- MyReplicationSlot->data.catalog_xmin = new_catalog_xmin;
- MyReplicationSlot->data.restart_lsn = restart_lsn;
- MyReplicationSlot->data.confirmed_flush = confirmed_lsn;
-
- clear_slot_transient_state();
-
- ReplicationSlotMarkDirty();
- ReplicationSlotSave();
- ReplicationSlotRelease();
-
- ReplicationSlotsComputeRequiredXmin(false);
- ReplicationSlotsComputeRequiredLSN();
-
- PG_RETURN_VOID();
-}
-
-static void
-clear_slot_transient_state(void)
-{
- Assert(MyReplicationSlot != NULL);
-
- /*
- * Make sure the slot state is the same as if it were newly loaded from
- * disk on recovery.
- */
- MyReplicationSlot->effective_xmin = MyReplicationSlot->data.xmin;
- MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
-
- MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
- MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
- MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
- MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
-}
+++ /dev/null
-max_replication_slots=2
-wal_level=logical
+++ /dev/null
-# test_slot_timelines extension
-comment = 'Test utility for slot timeline following and logical decoding'
-default_version = '1.0'
-module_pathname = '$libdir/test_slot_timelines'
-relocatable = true
#
#-------------------------------------------------------------------------
-EXTRA_INSTALL=contrib/test_decoding src/test/modules/test_slot_timelines
-
subdir = src/test/recovery
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
+++ /dev/null
-# Demonstrate that logical can follow timeline switches.
-#
-# Logical replication slots can follow timeline switches but it's
-# normally not possible to have a logical slot on a replica where
-# promotion and a timeline switch can occur. The only ways
-# we can create that circumstance are:
-#
-# * By doing a filesystem-level copy of the DB, since pg_basebackup
-# excludes pg_replslot but we can copy it directly; or
-#
-# * by creating a slot directly at the C level on the replica and
-# advancing it as we go using the low level APIs. It can't be done
-# from SQL since logical decoding isn't allowed on replicas.
-#
-# This module uses the first approach to show that timeline following
-# on a logical slot works.
-#
-use strict;
-use warnings;
-
-use PostgresNode;
-use TestLib;
-use Test::More tests => 20;
-use RecursiveCopy;
-use File::Copy;
-
-my ($stdout, $stderr, $ret);
-
-# Initialize master node
-my $node_master = get_new_node('master');
-$node_master->init(allows_streaming => 1, has_archiving => 1);
-$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
-$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
-$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
-$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
-$node_master->dump_info;
-$node_master->start;
-
-diag "Testing logical timeline following with a filesystem-level copy";
-
-$node_master->safe_psql('postgres',
-"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
-);
-$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
-$node_master->safe_psql('postgres',
- "INSERT INTO decoding(blah) VALUES ('beforebb');");
-$node_master->safe_psql('postgres', 'CHECKPOINT;');
-
-my $backup_name = 'b1';
-$node_master->backup_fs_hot($backup_name);
-
-my $node_replica = get_new_node('replica');
-$node_replica->init_from_backup(
- $node_master, $backup_name,
- has_streaming => 1,
- has_restoring => 1);
-$node_replica->start;
-
-$node_master->safe_psql('postgres',
-"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
-);
-$node_master->safe_psql('postgres',
- "INSERT INTO decoding(blah) VALUES ('afterbb');");
-$node_master->safe_psql('postgres', 'CHECKPOINT;');
-
-# Verify that only the before base_backup slot is on the replica
-$stdout = $node_replica->safe_psql('postgres',
- 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
-is($stdout, 'before_basebackup',
- 'Expected to find only slot before_basebackup on replica');
-
-# Boom, crash
-$node_master->stop('immediate');
-
-$node_replica->promote;
-$node_replica->poll_query_until('postgres',
- "SELECT NOT pg_is_in_recovery();");
-
-$node_replica->safe_psql('postgres',
- "INSERT INTO decoding(blah) VALUES ('after failover');");
-
-# Shouldn't be able to read from slot created after base backup
-($ret, $stdout, $stderr) = $node_replica->psql('postgres',
-"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');"
-);
-is($ret, 3, 'replaying from after_basebackup slot fails');
-like(
- $stderr,
- qr/replication slot "after_basebackup" does not exist/,
- 'after_basebackup slot missing');
-
-# Should be able to read from slot created before base backup
-($ret, $stdout, $stderr) = $node_replica->psql(
- 'postgres',
-"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
- timeout => 30);
-is($ret, 0, 'replay from slot before_basebackup succeeds');
-is( $stdout, q(BEGIN
-table public.decoding: INSERT: blah[text]:'beforebb'
-COMMIT
-BEGIN
-table public.decoding: INSERT: blah[text]:'afterbb'
-COMMIT
-BEGIN
-table public.decoding: INSERT: blah[text]:'after failover'
-COMMIT), 'decoded expected data from slot before_basebackup');
-is($stderr, '', 'replay from slot before_basebackup produces no stderr');
-
-# We don't need the standby anymore
-$node_replica->teardown_node();
-
-
-# OK, time to try the same thing again, but this time we'll be using slot
-# mirroring on the standby and a pg_basebackup of the master.
-
-diag "Testing logical timeline following with test_slot_timelines module";
-
-$node_master->start();
-
-# Clean up after the last test
-$node_master->safe_psql('postgres', 'DELETE FROM decoding;');
-is( $node_master->psql(
- 'postgres',
-'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;'),
- 0,
- 'dropping slots succeeds via pg_drop_replication_slot');
-
-# Same as before, we'll make one slot before basebackup, one after. This time
-# the basebackup will be with pg_basebackup so it'll omit both slots, then
-# we'll use SQL functions provided by the test_slot_timelines test module to sync
-# them to the replica, do some work, sync them and fail over then test again.
-# This time we should have both the before- and after-basebackup slots working.
-
-is( $node_master->psql(
- 'postgres',
-"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
- ),
- 0,
- 'creating slot before_basebackup succeeds');
-
-$node_master->safe_psql('postgres',
- "INSERT INTO decoding(blah) VALUES ('beforebb');");
-
-$backup_name = 'b2';
-$node_master->backup($backup_name);
-
-is( $node_master->psql(
- 'postgres',
-"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
- ),
- 0,
- 'creating slot after_basebackup succeeds');
-
-$node_master->safe_psql('postgres',
- "INSERT INTO decoding(blah) VALUES ('afterbb');");
-
-$node_replica = get_new_node('replica2');
-$node_replica->init_from_backup(
- $node_master, $backup_name,
- has_streaming => 1,
- has_restoring => 1);
-
-$node_replica->start;
-
-# Verify the slots are both absent on the replica
-$stdout = $node_replica->safe_psql('postgres',
- 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
-is($stdout, '', 'No slots exist on the replica');
-
-# Now do our magic to sync the slot states across. Normally
-# this would be being done continuously by a bgworker but
-# we're just doing it by hand for this test. This is exposing
-# postgres innards to SQL so it's unsafe except for testing.
-$node_master->safe_psql('postgres', 'CREATE EXTENSION test_slot_timelines;');
-
-my $slotinfo = $node_master->safe_psql(
- 'postgres',
- qq{SELECT slot_name, plugin,
- COALESCE(xmin, '0'), catalog_xmin,
- restart_lsn, confirmed_flush_lsn
- FROM pg_replication_slots ORDER BY slot_name}
-);
-diag "Copying slots to replica";
-open my $fh, '<', \$slotinfo or die $!;
-while (<$fh>)
-{
- print $_;
- chomp $_;
- my ($slot_name, $plugin, $xmin, $catalog_xmin, $restart_lsn,
- $confirmed_flush_lsn)
- = map { "'$_'" } split qr/\|/, $_;
-
- print
-"# Copying slot $slot_name,$plugin,$xmin,$catalog_xmin,$restart_lsn,$confirmed_flush_lsn\n";
- $node_replica->safe_psql('postgres',
- "SELECT test_slot_timelines_create_logical_slot($slot_name, $plugin);"
- );
- $node_replica->safe_psql('postgres',
-"SELECT test_slot_timelines_advance_logical_slot($slot_name, $xmin, $catalog_xmin, $restart_lsn, $confirmed_flush_lsn);"
- );
-}
-close $fh or die $!;
-
-# Now both slots are present on the replica and exactly match the master
-$stdout = $node_replica->safe_psql('postgres',
- 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
-is( $stdout,
- "after_basebackup\nbefore_basebackup",
- 'both slots now exist on replica');
-
-$stdout = $node_replica->safe_psql(
- 'postgres',
- qq{SELECT slot_name, plugin, COALESCE(xmin, '0'), catalog_xmin,
- restart_lsn, confirmed_flush_lsn
- FROM pg_replication_slots
- ORDER BY slot_name});
-is($stdout, $slotinfo,
- "slot data read back from replica matches slot data on master");
-
-# We now have to copy some extra WAL to satisfy the requirements of the oldest
-# replication slot. pg_basebackup doesn't know to copy the extra WAL for slots
-# so we have to help out. We know the WAL is still retained on the master
-# because we haven't advanced the slots there.
-#
-# Figure out what the oldest segment we need is by looking at the restart_lsn
-# of the oldest slot.
-#
-# It only makes sense to do this once the slots are created on the replica,
-# otherwise it might just delete the segments again.
-
-my $oldest_needed_segment = $node_master->safe_psql(
- 'postgres',
- qq{SELECT pg_xlogfile_name((
- SELECT restart_lsn
- FROM pg_replication_slots
- ORDER BY restart_lsn ASC
- LIMIT 1
- ));}
-);
-
-diag "oldest needed xlog seg is $oldest_needed_segment ";
-
-# WAL segment names sort lexically so we can just grab everything > than this
-# segment.
-opendir(my $pg_xlog, $node_master->data_dir . "/pg_xlog") or die $!;
-while (my $seg = readdir $pg_xlog)
-{
- next if $seg eq '.' or $seg eq '..';
- next unless $seg >= $oldest_needed_segment && $seg =~ /^[0-9]{24}/;
- diag "copying xlog seg $seg";
- copy(
- $node_master->data_dir . "/pg_xlog/" . $seg,
- $node_replica->data_dir . "/pg_xlog/" . $seg
- ) or die "copy of xlog seg $seg failed: $!";
-}
-closedir $pg_xlog;
-
-# Boom, crash the master
-$node_master->stop('immediate');
-
-$node_replica->promote;
-$node_replica->poll_query_until('postgres',
- "SELECT NOT pg_is_in_recovery();");
-
-$node_replica->safe_psql('postgres',
- "INSERT INTO decoding(blah) VALUES ('after failover');");
-
-# This time we can read from both slots
-($ret, $stdout, $stderr) = $node_replica->psql(
- 'postgres',
-"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
- timeout => 30);
-is($ret, 0, 'replay from slot after_basebackup succeeds');
-is( $stdout, q(BEGIN
-table public.decoding: INSERT: blah[text]:'afterbb'
-COMMIT
-BEGIN
-table public.decoding: INSERT: blah[text]:'after failover'
-COMMIT), 'decoded expected data from slot after_basebackup');
-is($stderr, '', 'replay from slot after_basebackup produces no stderr');
-
-# Should be able to read from slot created before base backup
-#
-# This would fail with an error about missing WAL segments if we hadn't
-# copied extra WAL earlier.
-($ret, $stdout, $stderr) = $node_replica->psql(
- 'postgres',
-"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
- timeout => 30);
-is($ret, 0, 'replay from slot before_basebackup succeeds');
-is( $stdout, q(BEGIN
-table public.decoding: INSERT: blah[text]:'beforebb'
-COMMIT
-BEGIN
-table public.decoding: INSERT: blah[text]:'afterbb'
-COMMIT
-BEGIN
-table public.decoding: INSERT: blah[text]:'after failover'
-COMMIT), 'decoded expected data from slot before_basebackup');
-is($stderr, '', 'replay from slot before_basebackup produces no stderr');
-
-($ret, $stdout, $stderr) = $node_replica->psql('postgres',
- 'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;');
-is($ret, 0, 'dropping slots succeeds via pg_drop_replication_slot');
-is($stderr, '', 'dropping slots produces no stderr output');
-
-1;