]> granicus.if.org Git - postgresql/commitdiff
Teach xlogreader to follow timeline switches
authorSimon Riggs <simon@2ndQuadrant.com>
Wed, 22 Mar 2017 07:05:12 +0000 (07:05 +0000)
committerSimon Riggs <simon@2ndQuadrant.com>
Wed, 22 Mar 2017 07:05:12 +0000 (07:05 +0000)
Uses page-based mechanism to ensure we’re using the correct timeline.

Tests are included to exercise the functionality using a cold disk-level copy
of the master that's started up as a replica with slots intact, but the
intended use of the functionality is with later features.

Craig Ringer, reviewed by Simon Riggs and Andres Freund

src/backend/access/transam/xlogutils.c
src/backend/replication/logical/logicalfuncs.c
src/backend/replication/walsender.c
src/include/access/xlogreader.h
src/include/access/xlogutils.h
src/test/recovery/Makefile
src/test/recovery/t/010_logical_decoding_timelines.pl [new file with mode: 0644]

index b2b9fcbebb028a3dea348f0fe496cca6f991cc9c..28c07d37c17648cd872263b5eac859905fdc3203 100644 (file)
@@ -19,6 +19,7 @@
 
 #include <unistd.h>
 
+#include "access/timeline.h"
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
@@ -662,6 +663,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
        /* state maintained across calls */
        static int      sendFile = -1;
        static XLogSegNo sendSegNo = 0;
+       static TimeLineID sendTLI = 0;
        static uint32 sendOff = 0;
 
        p = buf;
@@ -677,7 +679,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
                startoff = recptr % XLogSegSize;
 
                /* Do we need to switch to a different xlog segment? */
-               if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+               if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) ||
+                       sendTLI != tli)
                {
                        char            path[MAXPGPATH];
 
@@ -704,6 +707,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
                                                                        path)));
                        }
                        sendOff = 0;
+                       sendTLI = tli;
                }
 
                /* Need to seek in the file? */
@@ -753,6 +757,133 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
        }
 }
 
+/*
+ * Determine which timeline to read an xlog page from and set the
+ * XLogReaderState's currTLI to that timeline ID.
+ *
+ * We care about timelines in xlogreader when we might be reading xlog
+ * generated prior to a promotion, either if we're currently a standby in
+ * recovery or if we're a promoted master reading xlogs generated by the old
+ * master before our promotion.
+ *
+ * wantPage must be set to the start address of the page to read and
+ * wantLength to the amount of the page that will be read, up to
+ * XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ.
+ *
+ * We switch to an xlog segment from the new timeline eagerly when on a
+ * historical timeline, as soon as we reach the start of the xlog segment
+ * containing the timeline switch.  The server copied the segment to the new
+ * timeline so all the data up to the switch point is the same, but there's no
+ * guarantee the old segment will still exist. It may have been deleted or
+ * renamed with a .partial suffix so we can't necessarily keep reading from
+ * the old TLI even though tliSwitchPoint says it's OK.
+ *
+ * We can't just check the timeline when we read a page on a different segment
+ * to the last page. We could've received a timeline switch from a cascading
+ * upstream, so the current segment ends apruptly (possibly getting renamed to
+ * .partial) and we have to switch to a new one.  Even in the middle of reading
+ * a page we could have to dump the cached page and switch to a new TLI.
+ *
+ * Because of this, callers MAY NOT assume that currTLI is the timeline that
+ * will be in a page's xlp_tli; the page may begin on an older timeline or we
+ * might be reading from historical timeline data on a segment that's been
+ * copied to a new timeline.
+ *
+ * The caller must also make sure it doesn't read past the current replay
+ * position (using GetWalRcvWriteRecPtr) if executing in recovery, so it
+ * doesn't fail to notice that the current timeline became historical. The
+ * caller must also update ThisTimeLineID with the result of
+ * GetWalRcvWriteRecPtr and must check RecoveryInProgress().
+ */
+void
+XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
+{
+       const XLogRecPtr lastReadPage = state->readSegNo * XLogSegSize + state->readOff;
+
+       Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
+       Assert(wantLength <= XLOG_BLCKSZ);
+       Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ);
+
+       /*
+        * If the desired page is currently read in and valid, we have nothing to do.
+        *
+        * The caller should've ensured that it didn't previously advance readOff
+        * past the valid limit of this timeline, so it doesn't matter if the current
+        * TLI has since become historical.
+        */
+       if (lastReadPage == wantPage &&
+               state->readLen != 0 &&
+               lastReadPage + state->readLen >= wantPage + Min(wantLength,XLOG_BLCKSZ-1))
+               return;
+
+       /*
+        * If we're reading from the current timeline, it hasn't become historical
+        * and the page we're reading is after the last page read, we can again
+        * just carry on. (Seeking backwards requires a check to make sure the older
+        * page isn't on a prior timeline).
+        *
+        * ThisTimeLineID might've become historical since we last looked, but the
+        * caller is required not to read past the flush limit it saw at the time
+        * it looked up the timeline. There's nothing we can do about it if
+        * StartupXLOG() renames it to .partial concurrently.
+        */
+       if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage)
+       {
+               Assert(state->currTLIValidUntil == InvalidXLogRecPtr);
+               return;
+       }
+
+       /*
+        * If we're just reading pages from a previously validated historical
+        * timeline and the timeline we're reading from is valid until the
+        * end of the current segment we can just keep reading.
+        */
+       if (state->currTLIValidUntil != InvalidXLogRecPtr &&
+               state->currTLI != ThisTimeLineID &&
+               state->currTLI != 0 &&
+               (wantPage + wantLength) / XLogSegSize < state->currTLIValidUntil / XLogSegSize)
+               return;
+
+       /*
+        * If we reach this point we're either looking up a page for random access,
+        * the current timeline just became historical, or we're reading from a new
+        * segment containing a timeline switch. In all cases we need to determine
+        * the newest timeline on the segment.
+        *
+        * If it's the current timeline we can just keep reading from here unless
+        * we detect a timeline switch that makes the current timeline historical.
+        * If it's a historical timeline we can read all the segment on the newest
+        * timeline because it contains all the old timelines' data too. So only
+        * one switch check is required.
+        */
+       {
+               /*
+                * We need to re-read the timeline history in case it's been changed
+                * by a promotion or replay from a cascaded replica.
+                */
+               List *timelineHistory = readTimeLineHistory(ThisTimeLineID);
+
+               XLogRecPtr endOfSegment = (((wantPage / XLogSegSize) + 1) * XLogSegSize) - 1;
+
+               Assert(wantPage / XLogSegSize == endOfSegment / XLogSegSize);
+
+               /* Find the timeline of the last LSN on the segment containing wantPage. */
+               state->currTLI = tliOfPointInHistory(endOfSegment, timelineHistory);
+               state->currTLIValidUntil = tliSwitchPoint(state->currTLI, timelineHistory,
+                       &state->nextTLI);
+
+               Assert(state->currTLIValidUntil == InvalidXLogRecPtr ||
+                               wantPage + wantLength < state->currTLIValidUntil);
+
+               list_free_deep(timelineHistory);
+
+               elog(DEBUG3, "switched to timeline %u valid until %X/%X",
+                               state->currTLI,
+                               (uint32)(state->currTLIValidUntil >> 32),
+                               (uint32)(state->currTLIValidUntil));
+       }
+}
+
 /*
  * read_page callback for reading local xlog files
  *
@@ -774,28 +905,84 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
        int                     count;
 
        loc = targetPagePtr + reqLen;
+
+       /* Loop waiting for xlog to be available if necessary */
        while (1)
        {
                /*
-                * TODO: we're going to have to do something more intelligent about
-                * timelines on standbys. Use readTimeLineHistory() and
-                * tliOfPointInHistory() to get the proper LSN? For now we'll catch
-                * that case earlier, but the code and TODO is left in here for when
-                * that changes.
+                * Determine the limit of xlog we can currently read to, and what the
+                * most recent timeline is.
+                *
+                * RecoveryInProgress() will update ThisTimeLineID when it first
+                * notices recovery finishes, so we only have to maintain it for the
+                * local process until recovery ends.
                 */
                if (!RecoveryInProgress())
-               {
-                       *pageTLI = ThisTimeLineID;
                        read_upto = GetFlushRecPtr();
-               }
                else
-                       read_upto = GetXLogReplayRecPtr(pageTLI);
+                       read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
 
-               if (loc <= read_upto)
-                       break;
+               *pageTLI = ThisTimeLineID;
+
+               /*
+                * Check which timeline to get the record from.
+                *
+                * We have to do it each time through the loop because if we're in
+                * recovery as a cascading standby, the current timeline might've
+                * become historical. We can't rely on RecoveryInProgress() because
+                * in a standby configuration like
+                *
+                *    A => B => C
+                *
+                * if we're a logical decoding session on C, and B gets promoted, our
+                * timeline will change while we remain in recovery.
+                *
+                * We can't just keep reading from the old timeline as the last WAL
+                * archive in the timeline will get renamed to .partial by StartupXLOG().
+                *
+                * If that happens after our caller updated ThisTimeLineID but before
+                * we actually read the xlog page, we might still try to read from the
+                * old (now renamed) segment and fail. There's not much we can do about
+                * this, but it can only happen when we're a leaf of a cascading
+                * standby whose master gets promoted while we're decoding, so a
+                * one-off ERROR isn't too bad.
+                */
+               XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+
+               if (state->currTLI == ThisTimeLineID)
+               {
 
-               CHECK_FOR_INTERRUPTS();
-               pg_usleep(1000L);
+                       if (loc <= read_upto)
+                               break;
+
+                       CHECK_FOR_INTERRUPTS();
+                       pg_usleep(1000L);
+               }
+               else
+               {
+                       /*
+                        * We're on a historical timeline, so limit reading to the switch
+                        * point where we moved to the next timeline.
+                        *
+                        * We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know
+                        * about the new timeline, so we must've received past the end of
+                        * it.
+                        */
+                       read_upto = state->currTLIValidUntil;
+
+                       /*
+                        * Setting pageTLI to our wanted record's TLI is slightly wrong;
+                        * the page might begin on an older timeline if it contains a
+                        * timeline switch, since its xlog segment will have been copied
+                        * from the prior timeline. This is pretty harmless though, as
+                        * nothing cares so long as the timeline doesn't go backwards.  We
+                        * should read the page header instead; FIXME someday.
+                        */
+                       *pageTLI = state->currTLI;
+
+                       /* No need to wait on a historical timeline */
+                       break;
+               }
        }
 
        if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
index 41c50005d7ff6b39ba1d98afb44af6b2c95d8ab4..c251b92f57bcbc248a9afde17b47328a03c55978 100644 (file)
@@ -235,11 +235,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
        rsinfo->setResult = p->tupstore;
        rsinfo->setDesc = p->tupdesc;
 
-       /* compute the current end-of-wal */
+       /*
+        * Compute the current end-of-wal and maintain ThisTimeLineID.
+        * RecoveryInProgress() will update ThisTimeLineID on promotion.
+        */
        if (!RecoveryInProgress())
                end_of_wal = GetFlushRecPtr();
        else
-               end_of_wal = GetXLogReplayRecPtr(NULL);
+               end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
 
        ReplicationSlotAcquire(NameStr(*name));
 
@@ -280,6 +283,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
                /* invalidate non-timetravel entries */
                InvalidateSystemCaches();
 
+               /* Decode until we run out of records */
                while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
                           (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
                {
index 0f6b828336f718e7f49aba68a6c45835ad4c8ddd..75617709ecfbfa58cd7a49db1bace68431c25466 100644 (file)
@@ -48,6 +48,7 @@
 #include "access/transam.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/xlogutils.h"
 
 #include "catalog/pg_type.h"
 #include "commands/dbcommands.h"
@@ -721,6 +722,12 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
        XLogRecPtr      flushptr;
        int                     count;
 
+       XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+       sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
+       sendTimeLine = state->currTLI;
+       sendTimeLineValidUpto = state->currTLIValidUntil;
+       sendTimeLineNextTLI = state->nextTLI;
+
        /* make sure we have enough WAL available */
        flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
 
@@ -974,10 +981,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
        pq_endmessage(&buf);
        pq_flush();
 
-       /* setup state for XLogReadPage */
-       sendTimeLineIsHistoric = false;
-       sendTimeLine = ThisTimeLineID;
-
        /*
         * Initialize position to the last ack'ed one, then the xlog records begin
         * to be shipped from that position.
index 663d3e7890b7c2ad14492f1245d06ab20a614781..a1beeb54965c8ef4b3d7e5efa5ef98a52e3dec08 100644 (file)
@@ -161,6 +161,22 @@ struct XLogReaderState
 
        /* beginning of the WAL record being read. */
        XLogRecPtr      currRecPtr;
+       /* timeline to read it from, 0 if a lookup is required */
+       TimeLineID      currTLI;
+       /*
+        * Safe point to read to in currTLI if current TLI is historical
+        * (tliSwitchPoint) or InvalidXLogRecPtr if on current timeline.
+        *
+        * Actually set to the start of the segment containing the timeline
+        * switch that ends currTLI's validity, not the LSN of the switch
+        * its self, since we can't assume the old segment will be present.
+        */
+       XLogRecPtr      currTLIValidUntil;
+       /*
+        * If currTLI is not the most recent known timeline, the next timeline to
+        * read from when currTLIValidUntil is reached.
+        */
+       TimeLineID      nextTLI;
 
        /* Buffer for current ReadRecord result (expandable) */
        char       *readRecordBuf;
index 567a7f3d8710e7e46311e65ff92c522333fd2b00..25a99422c1ae8905f56172e0547aabe3daf2ce0f 100644 (file)
@@ -52,4 +52,7 @@ extern int read_local_xlog_page(XLogReaderState *state,
                                         XLogRecPtr targetRecPtr, char *cur_page,
                                         TimeLineID *pageTLI);
 
+extern void XLogReadDetermineTimeline(XLogReaderState *state,
+                                       XLogRecPtr wantPage, uint32 wantLength);
+
 #endif
index 9d03d337d546beabed6bd4037c301da5395d50f6..142a1b8de2ee4db901be58d5c838f0a1c143f2e1 100644 (file)
@@ -9,6 +9,8 @@
 #
 #-------------------------------------------------------------------------
 
+EXTRA_INSTALL=contrib/test_decoding
+
 subdir = src/test/recovery
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
diff --git a/src/test/recovery/t/010_logical_decoding_timelines.pl b/src/test/recovery/t/010_logical_decoding_timelines.pl
new file mode 100644 (file)
index 0000000..09830dc
--- /dev/null
@@ -0,0 +1,130 @@
+# Demonstrate that logical can follow timeline switches.
+#
+# Logical replication slots can follow timeline switches but it's
+# normally not possible to have a logical slot on a replica where
+# promotion and a timeline switch can occur. The only ways
+# we can create that circumstance are:
+#
+# * By doing a filesystem-level copy of the DB, since pg_basebackup
+#   excludes pg_replslot but we can copy it directly; or
+#
+# * by creating a slot directly at the C level on the replica and
+#   advancing it as we go using the low level APIs. It can't be done
+#   from SQL since logical decoding isn't allowed on replicas.
+#
+# This module uses the first approach to show that timeline following
+# on a logical slot works.
+#
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 7;
+use RecursiveCopy;
+use File::Copy;
+use IPC::Run ();
+use Scalar::Util qw(blessed);
+
+my ($stdout, $stderr, $ret);
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
+$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
+$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
+$node_master->dump_info;
+$node_master->start;
+
+diag "Testing logical timeline following with a filesystem-level copy";
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
+$node_master->safe_psql('postgres',
+       "INSERT INTO decoding(blah) VALUES ('beforebb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+my $backup_name = 'b1';
+$node_master->backup_fs_hot($backup_name);
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+       $node_master, $backup_name,
+       has_streaming => 1,
+       has_restoring => 1);
+$node_replica->start;
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres',
+       "INSERT INTO decoding(blah) VALUES ('afterbb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+# Verify that only the before base_backup slot is on the replica
+$stdout = $node_replica->safe_psql('postgres',
+       'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, 'before_basebackup',
+       'Expected to find only slot before_basebackup on replica');
+
+# Boom, crash
+$node_master->stop('immediate');
+
+$node_replica->promote;
+$node_replica->poll_query_until('postgres',
+       "SELECT NOT pg_is_in_recovery();");
+
+$node_replica->safe_psql('postgres',
+       "INSERT INTO decoding(blah) VALUES ('after failover');");
+
+# Shouldn't be able to read from slot created after base backup
+($ret, $stdout, $stderr) = $node_replica->psql('postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');"
+);
+is($ret, 3, 'replaying from after_basebackup slot fails');
+like(
+       $stderr,
+       qr/replication slot "after_basebackup" does not exist/,
+       'after_basebackup slot missing');
+
+# Should be able to read from slot created before base backup
+($ret, $stdout, $stderr) = $node_replica->psql(
+       'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+       timeout => 30);
+is($ret, 0, 'replay from slot before_basebackup succeeds');
+
+my $final_expected_output_bb = q(BEGIN
+table public.decoding: INSERT: blah[text]:'beforebb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT);
+is($stdout, $final_expected_output_bb, 'decoded expected data from slot before_basebackup');
+is($stderr, '', 'replay from slot before_basebackup produces no stderr');
+
+# So far we've peeked the slots, so when we fetch the same info over
+# pg_recvlogical we should get complete results. First, find out the commit lsn
+# of the last transaction. There's no max(pg_lsn), so:
+
+my $endpos = $node_replica->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL) ORDER BY location DESC LIMIT 1;");
+
+# now use the walsender protocol to peek the slot changes and make sure we see
+# the same results.
+
+$stdout = $node_replica->pg_recvlogical_upto('postgres', 'before_basebackup',
+       $endpos, 30, 'include-xids' => '0', 'skip-empty-xacts' => '1');
+
+# walsender likes to add a newline
+chomp($stdout);
+is($stdout, $final_expected_output_bb, 'got same output from walsender via pg_recvlogical on before_basebackup');
+
+# We don't need the standby anymore
+$node_replica->teardown_node();