From: Simon Riggs <simon@2ndQuadrant.com>
Date: Sat, 3 Sep 2016 16:48:01 +0000 (+0100)
Subject: New recovery target recovery_target_lsn
X-Git-Tag: REL_10_BETA1~1786
X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=35250b6ad7a8ece5cfe54c0316c180df19f36c13;p=postgresql

New recovery target recovery_target_lsn

Michael Paquier
---

diff --git a/doc/src/sgml/recovery-config.sgml b/doc/src/sgml/recovery-config.sgml
index 26af221745..de3fb10f5b 100644
--- a/doc/src/sgml/recovery-config.sgml
+++ b/doc/src/sgml/recovery-config.sgml
@@ -157,9 +157,10 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       By default, recovery will recover to the end of the WAL log. The
       following parameters can be used to specify an earlier stopping point.
       At most one of <varname>recovery_target</>,
-      <varname>recovery_target_name</>, <varname>recovery_target_time</>, or
-      <varname>recovery_target_xid</> can be used; if more than one of these
-      is specified in the configuration file, the last entry will be used.
+      <varname>recovery_target_lsn</>, <varname>recovery_target_name</>,
+      <varname>recovery_target_time</>, or <varname>recovery_target_xid</>
+      can be used; if more than one of these is specified in the configuration
+      file, the last entry will be used.
      </para>
 
      <variablelist>
@@ -232,6 +233,23 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
        </para>
       </listitem>
      </varlistentry>
+
+     <varlistentry id="recovery-target-lsn" xreflabel="recovery_target_lsn">
+      <term><varname>recovery_target_lsn</varname> (<type>pg_lsn</type>)
+      <indexterm>
+        <primary><varname>recovery_target_lsn</> recovery parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        This parameter specifies the LSN of the transaction log location up
+        to which recovery will proceed. The precise stopping point is also
+        influenced by <xref linkend="recovery-target-inclusive">. This
+        parameter is parsed using the system data type
+        <link linkend="datatype-pg-lsn"><type>pg_lsn</></link>.
+       </para>
+      </listitem>
+     </varlistentry>
      </variablelist>
 
      <para>
diff --git a/src/backend/access/transam/recovery.conf.sample b/src/backend/access/transam/recovery.conf.sample
index b777400d03..7a16751541 100644
--- a/src/backend/access/transam/recovery.conf.sample
+++ b/src/backend/access/transam/recovery.conf.sample
@@ -67,8 +67,8 @@
 # must set a recovery target.
 #
 # You may set a recovery target either by transactionId, by name,
-# or by timestamp. Recovery may either include or exclude the
-# transaction(s) with the recovery target value (ie, stop either
+# by timestamp or by WAL position (LSN). Recovery may either include or
+# exclude the transaction(s) with the recovery target value (ie, stop either
 # just after or just before the given target, respectively).
 #
 #
@@ -78,6 +78,8 @@
 #
 #recovery_target_xid = ''
 #
+#recovery_target_lsn = ''	# e.g. '0/70006B8'
+#
 #recovery_target_inclusive = true
 #
 #
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0b991bb91d..2189c22c64 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -67,6 +67,7 @@
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
+#include "utils/pg_lsn.h"
 #include "utils/ps_status.h"
 #include "utils/relmapper.h"
 #include "utils/snapmgr.h"
@@ -254,6 +255,7 @@ static RecoveryTargetAction recoveryTargetAction = RECOVERY_TARGET_ACTION_PAUSE;
 static TransactionId recoveryTargetXid;
 static TimestampTz recoveryTargetTime;
 static char *recoveryTargetName;
+static XLogRecPtr recoveryTargetLSN;
 static int	recovery_min_apply_delay = 0;
 static TimestampTz recoveryDelayUntilTime;
 
@@ -275,6 +277,7 @@ static bool fast_promote = false;
  */
 static TransactionId recoveryStopXid;
 static TimestampTz recoveryStopTime;
+static XLogRecPtr recoveryStopLSN;
 static char recoveryStopName[MAXFNAMELEN];
 static bool recoveryStopAfter;
 
@@ -5078,6 +5081,23 @@ readRecoveryCommandFile(void)
 					(errmsg_internal("recovery_target_name = '%s'",
 									 recoveryTargetName)));
 		}
+		else if (strcmp(item->name, "recovery_target_lsn") == 0)
+		{
+			recoveryTarget = RECOVERY_TARGET_LSN;
+
+			/*
+			 * Convert the LSN string given by the user to XLogRecPtr form.
+			 */
+			recoveryTargetLSN =
+				DatumGetLSN(DirectFunctionCall3(pg_lsn_in,
+												CStringGetDatum(item->value),
+												ObjectIdGetDatum(InvalidOid),
+														Int32GetDatum(-1)));
+			ereport(DEBUG2,
+					(errmsg_internal("recovery_target_lsn = '%X/%X'",
+									 (uint32) (recoveryTargetLSN >> 32),
+									 (uint32) recoveryTargetLSN)));
+		}
 		else if (strcmp(item->name, "recovery_target") == 0)
 		{
 			if (strcmp(item->value, "immediate") == 0)
@@ -5400,8 +5420,26 @@ recoveryStopsBefore(XLogReaderState *record)
 
 		recoveryStopAfter = false;
 		recoveryStopXid = InvalidTransactionId;
+		recoveryStopLSN = InvalidXLogRecPtr;
+		recoveryStopTime = 0;
+		recoveryStopName[0] = '\0';
+		return true;
+	}
+
+	/* Check if target LSN has been reached */
+	if (recoveryTarget == RECOVERY_TARGET_LSN &&
+		!recoveryTargetInclusive &&
+		record->ReadRecPtr >= recoveryTargetLSN)
+	{
+		recoveryStopAfter = false;
+		recoveryStopXid = InvalidTransactionId;
+		recoveryStopLSN = record->ReadRecPtr;
 		recoveryStopTime = 0;
 		recoveryStopName[0] = '\0';
+		ereport(LOG,
+				(errmsg("recovery stopping before WAL position (LSN) \"%X/%X\"",
+						(uint32) (recoveryStopLSN >> 32),
+						(uint32) recoveryStopLSN)));
 		return true;
 	}
 
@@ -5479,6 +5517,7 @@ recoveryStopsBefore(XLogReaderState *record)
 		recoveryStopAfter = false;
 		recoveryStopXid = recordXid;
 		recoveryStopTime = recordXtime;
+		recoveryStopLSN = InvalidXLogRecPtr;
 		recoveryStopName[0] = '\0';
 
 		if (isCommit)
@@ -5532,6 +5571,7 @@ recoveryStopsAfter(XLogReaderState *record)
 		{
 			recoveryStopAfter = true;
 			recoveryStopXid = InvalidTransactionId;
+			recoveryStopLSN = InvalidXLogRecPtr;
 			(void) getRecordTimestamp(record, &recoveryStopTime);
 			strlcpy(recoveryStopName, recordRestorePointData->rp_name, MAXFNAMELEN);
 
@@ -5543,6 +5583,23 @@ recoveryStopsAfter(XLogReaderState *record)
 		}
 	}
 
+	/* Check if the target LSN has been reached */
+	if (recoveryTarget == RECOVERY_TARGET_LSN &&
+		recoveryTargetInclusive &&
+		record->ReadRecPtr >= recoveryTargetLSN)
+	{
+		recoveryStopAfter = true;
+		recoveryStopXid = InvalidTransactionId;
+		recoveryStopLSN = record->ReadRecPtr;
+		recoveryStopTime = 0;
+		recoveryStopName[0] = '\0';
+		ereport(LOG,
+				(errmsg("recovery stopping after WAL position (LSN) \"%X/%X\"",
+						(uint32) (recoveryStopLSN >> 32),
+						(uint32) recoveryStopLSN)));
+		return true;
+	}
+
 	if (rmid != RM_XACT_ID)
 		return false;
 
@@ -5598,6 +5655,7 @@ recoveryStopsAfter(XLogReaderState *record)
 			recoveryStopAfter = true;
 			recoveryStopXid = recordXid;
 			recoveryStopTime = recordXtime;
+			recoveryStopLSN = InvalidXLogRecPtr;
 			recoveryStopName[0] = '\0';
 
 			if (xact_info == XLOG_XACT_COMMIT ||
@@ -5629,6 +5687,7 @@ recoveryStopsAfter(XLogReaderState *record)
 		recoveryStopAfter = true;
 		recoveryStopXid = InvalidTransactionId;
 		recoveryStopTime = 0;
+		recoveryStopLSN = InvalidXLogRecPtr;
 		recoveryStopName[0] = '\0';
 		return true;
 	}
@@ -6055,6 +6114,11 @@ StartupXLOG(void)
 			ereport(LOG,
 					(errmsg("starting point-in-time recovery to \"%s\"",
 							recoveryTargetName)));
+		else if (recoveryTarget == RECOVERY_TARGET_LSN)
+			ereport(LOG,
+					(errmsg("starting point-in-time recovery to WAL position (LSN) \"%X/%X\"",
+							(uint32) (recoveryTargetLSN >> 32),
+							(uint32) recoveryTargetLSN)));
 		else if (recoveryTarget == RECOVERY_TARGET_IMMEDIATE)
 			ereport(LOG,
 					(errmsg("starting point-in-time recovery to earliest consistent point")));
@@ -7124,6 +7188,12 @@ StartupXLOG(void)
 					 "%s %s\n",
 					 recoveryStopAfter ? "after" : "before",
 					 timestamptz_to_str(recoveryStopTime));
+		else if (recoveryTarget == RECOVERY_TARGET_LSN)
+			snprintf(reason, sizeof(reason),
+					 "%s LSN %X/%X\n",
+					 recoveryStopAfter ? "after" : "before",
+					 (uint32 ) (recoveryStopLSN >> 32),
+					 (uint32) recoveryStopLSN);
 		else if (recoveryTarget == RECOVERY_TARGET_NAME)
 			snprintf(reason, sizeof(reason),
 					 "at restore point \"%s\"",
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 14b7f7f459..c9f332c908 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -83,6 +83,7 @@ typedef enum
 	RECOVERY_TARGET_XID,
 	RECOVERY_TARGET_TIME,
 	RECOVERY_TARGET_NAME,
+	RECOVERY_TARGET_LSN,
 	RECOVERY_TARGET_IMMEDIATE
 } RecoveryTargetType;
 
diff --git a/src/test/recovery/t/003_recovery_targets.pl b/src/test/recovery/t/003_recovery_targets.pl
index d1f6d78388..a82545bf6f 100644
--- a/src/test/recovery/t/003_recovery_targets.pl
+++ b/src/test/recovery/t/003_recovery_targets.pl
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 7;
+use Test::More tests => 9;
 
 # Create and test a standby from given backup, with a certain
 # recovery target.
@@ -86,6 +86,16 @@ my $lsn4 =
 $node_master->safe_psql('postgres',
 	"SELECT pg_create_restore_point('$recovery_name');");
 
+# And now for a recovery target LSN
+$node_master->safe_psql('postgres',
+	"INSERT INTO tab_int VALUES (generate_series(4001,5000))");
+my $recovery_lsn = $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location()");
+my $lsn5 =
+  $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location();");
+
+$node_master->safe_psql('postgres',
+	"INSERT INTO tab_int VALUES (generate_series(5001,6000))");
+
 # Force archiving of WAL file
 $node_master->safe_psql('postgres', "SELECT pg_switch_xlog()");
 
@@ -102,6 +112,9 @@ test_recovery_standby('time', 'standby_3', $node_master, \@recovery_params,
 @recovery_params = ("recovery_target_name = '$recovery_name'");
 test_recovery_standby('name', 'standby_4', $node_master, \@recovery_params,
 	"4000", $lsn4);
+@recovery_params = ("recovery_target_lsn = '$recovery_lsn'");
+test_recovery_standby('LSN', 'standby_5', $node_master, \@recovery_params,
+	"5000", $lsn5);
 
 # Multiple targets
 # Last entry has priority (note that an array respects the order of items
@@ -111,16 +124,23 @@ test_recovery_standby('name', 'standby_4', $node_master, \@recovery_params,
 	"recovery_target_xid  = '$recovery_txid'",
 	"recovery_target_time = '$recovery_time'");
 test_recovery_standby('name + XID + time',
-	'standby_5', $node_master, \@recovery_params, "3000", $lsn3);
+	'standby_6', $node_master, \@recovery_params, "3000", $lsn3);
 @recovery_params = (
 	"recovery_target_time = '$recovery_time'",
 	"recovery_target_name = '$recovery_name'",
 	"recovery_target_xid  = '$recovery_txid'");
 test_recovery_standby('time + name + XID',
-	'standby_6', $node_master, \@recovery_params, "2000", $lsn2);
+	'standby_7', $node_master, \@recovery_params, "2000", $lsn2);
 @recovery_params = (
 	"recovery_target_xid  = '$recovery_txid'",
 	"recovery_target_time = '$recovery_time'",
 	"recovery_target_name = '$recovery_name'");
 test_recovery_standby('XID + time + name',
-	'standby_7', $node_master, \@recovery_params, "4000", $lsn4);
+	'standby_8', $node_master, \@recovery_params, "4000", $lsn4);
+@recovery_params = (
+	"recovery_target_xid  = '$recovery_txid'",
+	"recovery_target_time = '$recovery_time'",
+	"recovery_target_name = '$recovery_name'",
+	"recovery_target_lsn = '$recovery_lsn'",);
+test_recovery_standby('XID + time + name + LSN',
+	'standby_9', $node_master, \@recovery_params, "5000", $lsn5);