granicus.if.org Git - postgresql/commitdiff
Support synchronization of snapshots through an export/import procedure.
author Tom Lane <tgl@sss.pgh.pa.us>
Sat, 22 Oct 2011 22:22:45 +0000 (18:22 -0400)
committer Tom Lane <tgl@sss.pgh.pa.us>
Sat, 22 Oct 2011 22:23:30 +0000 (18:23 -0400)
A transaction can export a snapshot with pg_export_snapshot(), and then
others can import it with SET TRANSACTION SNAPSHOT.  The data does not
leave the server so there are no security issues.  A snapshot can only
be imported while the exporting transaction is still running, and there
are some other restrictions.

I'm not totally convinced that we've covered all the bases for SSI (true
serializable) mode, but it works fine for lesser isolation modes.

Joachim Wieland, reviewed by Marko Tiikkaja, and rather heavily modified
by Tom Lane

17 files changed:
doc/src/sgml/func.sgml
doc/src/sgml/ref/set_transaction.sgml
doc/src/sgml/storage.sgml
src/backend/access/transam/xact.c
src/backend/access/transam/xlog.c
src/backend/parser/gram.y
src/backend/storage/ipc/procarray.c
src/backend/storage/lmgr/predicate.c
src/backend/utils/misc/guc.c
src/backend/utils/time/snapmgr.c
src/bin/initdb/initdb.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.h
src/include/parser/kwlist.h
src/include/storage/predicate.h
src/include/storage/procarray.h
src/include/utils/snapmgr.h

index 45b995669732baf3f891ec99f187619f111b9cf0..8dd69337f113dca19c28e5b2631483cd124b9847 100644 (file)
@@ -13802,6 +13802,14 @@ SELECT typlen FROM pg_type WHERE oid = pg_typeof(33);
   <sect1 id="functions-admin">
    <title>System Administration Functions</title>
 
+   <para>
+    The functions described in this section are used to control and
+    monitor a <productname>PostgreSQL</> installation.
+   </para>
+
+  <sect2 id="functions-admin-set">
+   <title>Configuration Settings Functions</title>
+
    <para>
     <xref linkend="functions-admin-set-table"> shows the functions
     available to query and alter run-time configuration parameters.
@@ -13889,6 +13897,11 @@ SELECT set_config('log_statement_stats', 'off', false);
 </programlisting>
    </para>
 
+  </sect2>
+
+  <sect2 id="functions-admin-signal">
+   <title>Server Signalling Functions</title>
+
    <indexterm>
     <primary>pg_cancel_backend</primary>
    </indexterm>
@@ -13985,6 +13998,11 @@ SELECT set_config('log_statement_stats', 'off', false);
     subprocess.
    </para>
 
+  </sect2>
+
+  <sect2 id="functions-admin-backup">
+   <title>Backup Control Functions</title>
+
    <indexterm>
     <primary>backup</primary>
    </indexterm>
@@ -14181,6 +14199,11 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
     <xref linkend="continuous-archiving">.
    </para>
 
+  </sect2>
+
+  <sect2 id="functions-recovery-control">
+   <title>Recovery Control Functions</title>
+
    <indexterm>
     <primary>pg_is_in_recovery</primary>
    </indexterm>
@@ -14198,7 +14221,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
     The functions shown in <xref
     linkend="functions-recovery-info-table"> provide information
     about the current status of the standby.
-    These functions may be executed during both recovery and in normal running.
+    These functions may be executed both during recovery and in normal running.
    </para>
 
    <table id="functions-recovery-info-table">
@@ -14333,6 +14356,87 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
     the pause, the rate of WAL generation and available disk space.
    </para>
 
+  </sect2>
+
+  <sect2 id="functions-snapshot-synchronization">
+   <title>Snapshot Synchronization Functions</title>
+
+   <indexterm>
+     <primary>pg_export_snapshot</primary>
+   </indexterm>
+
+   <para>
+    <productname>PostgreSQL</> allows database sessions to synchronize their
+    snapshots. A <firstterm>snapshot</> determines which data is visible to the
+    transaction that is using the snapshot. Synchronized snapshots are
+    necessary when two or more sessions need to see identical content in the
+    database. If two sessions just start their transactions independently,
+    there is always a possibility that some third transaction commits
+    between the executions of the two <command>START TRANSACTION</> commands,
+    so that one session sees the effects of that transaction and the other
+    does not.
+   </para>
+
+   <para>
+    To solve this problem, <productname>PostgreSQL</> allows a transaction to
+    <firstterm>export</> the snapshot it is using.  As long as the exporting
+    transaction remains open, other transactions can <firstterm>import</> its
+    snapshot, and thereby be guaranteed that they see exactly the same view
+    of the database that the first transaction sees.  But note that any
+    database changes made by any one of these transactions remain invisible
+    to the other transactions, as is usual for changes made by uncommitted
+    transactions.  So the transactions are synchronized with respect to
+    pre-existing data, but act normally for changes they make themselves.
+   </para>
+
+   <para>
+    Snapshots are exported with the <function>pg_export_snapshot</> function,
+    shown in <xref linkend="functions-snapshot-synchronization-table">, and
+    imported with the <xref linkend="sql-set-transaction"> command.
+   </para>
+
+   <table id="functions-snapshot-synchronization-table">
+    <title>Snapshot Synchronization Functions</title>
+    <tgroup cols="3">
+     <thead>
+      <row><entry>Name</entry> <entry>Return Type</entry> <entry>Description</entry>
+      </row>
+     </thead>
+
+     <tbody>
+      <row>
+       <entry>
+        <literal><function>pg_export_snapshot()</function></literal>
+       </entry>
+       <entry><type>text</type></entry>
+       <entry>Save the current snapshot and return its identifier</entry>
+      </row>
+     </tbody>
+    </tgroup>
+   </table>
+
+   <para>
+    The function <function>pg_export_snapshot</> saves the current snapshot
+    and returns a <type>text</> string identifying the snapshot.  This string
+    must be passed (outside the database) to clients that want to import the
+    snapshot.  The snapshot is available for import only until the end of the
+    transaction that exported it.  A transaction can export more than one
+    snapshot, if needed.  Note that doing so is only useful in <literal>READ
+    COMMITTED</> transactions, since in <literal>REPEATABLE READ</> and
+    higher isolation levels, transactions use the same snapshot throughout
+    their lifetime.  Once a transaction has exported any snapshots, it cannot
+    be prepared with <xref linkend="sql-prepare-transaction">.
+   </para>
+
+   <para>
+    See <xref linkend="sql-set-transaction"> for details of how to use an
+    exported snapshot.
+   </para>
+  </sect2>
+
+  <sect2 id="functions-admin-dbobject">
+   <title>Database Object Management Functions</title>
+
    <para>
     The functions shown in <xref linkend="functions-admin-dbsize"> calculate
     the disk space usage of database objects.
@@ -14591,9 +14695,14 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
     the relation.
    </para>
 
+  </sect2>
+
+  <sect2 id="functions-admin-genfile">
+   <title>Generic File Access Functions</title>
+
    <para>
     The functions shown in <xref
-    linkend="functions-admin-genfile"> provide native access to
+    linkend="functions-admin-genfile-table"> provide native access to
     files on the machine hosting the server. Only files within the
     database cluster directory and the <varname>log_directory</> can be
     accessed.  Use a relative path for files in the cluster directory,
@@ -14601,7 +14710,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
     for log files.  Use of these functions is restricted to superusers.
    </para>
 
-   <table id="functions-admin-genfile">
+   <table id="functions-admin-genfile-table">
     <title>Generic File Access Functions</title>
     <tgroup cols="3">
      <thead>
@@ -14694,13 +14803,18 @@ SELECT (pg_stat_file('filename')).modification;
 </programlisting>
    </para>
 
+  </sect2>
+
+  <sect2 id="functions-advisory-locks">
+   <title>Advisory Lock Functions</title>
+
    <para>
-    The functions shown in <xref linkend="functions-advisory-locks"> manage
-    advisory locks.  For details about proper use of these functions, see
-    <xref linkend="advisory-locks">.
+    The functions shown in <xref linkend="functions-advisory-locks-table">
+    manage advisory locks.  For details about proper use of these functions,
+    see <xref linkend="advisory-locks">.
    </para>
 
-   <table id="functions-advisory-locks">
+   <table id="functions-advisory-locks-table">
     <title>Advisory Lock Functions</title>
     <tgroup cols="3">
      <thead>
@@ -14972,6 +15086,8 @@ SELECT (pg_stat_file('filename')).modification;
     at session end, even if the client disconnects ungracefully.)
    </para>
 
+  </sect2>
+
   </sect1>
 
   <sect1 id="functions-trigger">
index e28a7e1cde24c50cdc2fa2dd5bd09050473e91af..4327ca51a69defd473293f0a87023763376ac1cf 100644 (file)
@@ -33,6 +33,7 @@
  <refsynopsisdiv>
 <synopsis>
 SET TRANSACTION <replaceable class="parameter">transaction_mode</replaceable> [, ...]
+SET TRANSACTION SNAPSHOT <replaceable class="parameter">snapshot_id</replaceable>
 SET SESSION CHARACTERISTICS AS TRANSACTION <replaceable class="parameter">transaction_mode</replaceable> [, ...]
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
@@ -60,6 +61,8 @@ SET SESSION CHARACTERISTICS AS TRANSACTION <replaceable class="parameter">transa
    The available transaction characteristics are the transaction
    isolation level, the transaction access mode (read/write or
    read-only), and the deferrable mode.
+   In addition, a snapshot can be selected, though only for the current
+   transaction, not as a session default.
   </para>
 
   <para>
@@ -98,7 +101,7 @@ SET SESSION CHARACTERISTICS AS TRANSACTION <replaceable class="parameter">transa
        serializable transactions would create a situation which could not
        have occurred for any serial (one-at-a-time) execution of those
        transactions, one of them will be rolled back with a
-       <literal>serialization_failure</literal> <literal>SQLSTATE</literal>.
+       <literal>serialization_failure</literal> error.
       </para>
      </listitem>
     </varlistentry>
@@ -139,13 +142,41 @@ SET SESSION CHARACTERISTICS AS TRANSACTION <replaceable class="parameter">transa
   <para>
    The <literal>DEFERRABLE</literal> transaction property has no effect
    unless the transaction is also <literal>SERIALIZABLE</literal> and
-   <literal>READ ONLY</literal>.  When all of these properties are set on a
+   <literal>READ ONLY</literal>.  When all three of these properties are
+   selected for a
    transaction, the transaction may block when first acquiring its snapshot,
    after which it is able to run without the normal overhead of a
    <literal>SERIALIZABLE</literal> transaction and without any risk of
    contributing to or being canceled by a serialization failure.  This mode
    is well suited for long-running reports or backups.
   </para>
+
+  <para>
+   The <literal>SET TRANSACTION SNAPSHOT</literal> command allows a new
+   transaction to run with the same <firstterm>snapshot</> as an existing
+   transaction.  The pre-existing transaction must have exported its snapshot
+   with the <literal>pg_export_snapshot</literal> function (see <xref
+   linkend="functions-snapshot-synchronization">).  That function returns a
+   snapshot identifier, which must be given to <literal>SET TRANSACTION
+   SNAPSHOT</literal> to specify which snapshot is to be imported.  The
+   identifier must be written as a string literal in this command, for example
+   <literal>'000003A1-1'</>.
+   <literal>SET TRANSACTION SNAPSHOT</literal> can only be executed at the
+   start of a transaction, before the first query or
+   data-modification statement (<command>SELECT</command>,
+   <command>INSERT</command>, <command>DELETE</command>,
+   <command>UPDATE</command>, <command>FETCH</command>, or
+   <command>COPY</command>) of the transaction.  Furthermore, the transaction
+   must already be set to <literal>SERIALIZABLE</literal> or
+   <literal>REPEATABLE READ</literal> isolation level (otherwise, the snapshot
+   would be discarded immediately, since <literal>READ COMMITTED</> mode takes
+   a new snapshot for each command).  If the importing transaction uses
+   <literal>SERIALIZABLE</literal> isolation level, then the transaction that
+   exported the snapshot must also use that isolation level.  Also, a
+   non-read-only serializable transaction cannot import a snapshot from a
+   read-only transaction.
+  </para>
+
  </refsect1>
 
  <refsect1>
@@ -163,6 +194,8 @@ SET SESSION CHARACTERISTICS AS TRANSACTION <replaceable class="parameter">transa
    by instead specifying the desired <replaceable
    class="parameter">transaction_modes</replaceable> in
    <command>BEGIN</command> or <command>START TRANSACTION</command>.
+   But that option is not available for <command>SET TRANSACTION
+   SNAPSHOT</command>.
   </para>
 
   <para>
@@ -178,11 +211,45 @@ SET SESSION CHARACTERISTICS AS TRANSACTION <replaceable class="parameter">transa
   </para>
  </refsect1>
 
+ <refsect1>
+  <title>Examples</title>
+
+  <para>
+   To begin a new transaction with the same snapshot as an already
+   existing transaction, first export the snapshot from the existing
+   transaction. That will return the snapshot identifier, for example:
+
+<programlisting>
+BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
+SELECT pg_export_snapshot();
+ pg_export_snapshot
+--------------------
+ 000003A1-1
+(1 row)
+</programlisting>
+
+   Then give the snapshot identifier in a <command>SET TRANSACTION
+   SNAPSHOT</command> command at the beginning of the newly opened
+   transaction:
+
+<programlisting>
+BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
+SET TRANSACTION SNAPSHOT '000003A1-1';
+</programlisting>
+  </para>
+ </refsect1>
+
  <refsect1 id="R1-SQL-SET-TRANSACTION-3">
   <title>Compatibility</title>
 
   <para>
-   Both commands are defined in the <acronym>SQL</acronym> standard.
+   These commands are defined in the <acronym>SQL</acronym> standard,
+   except for the <literal>DEFERRABLE</literal> transaction mode
+   and the <command>SET TRANSACTION SNAPSHOT</> form, which are
+   <productname>PostgreSQL</productname> extensions.
+  </para>
+
+  <para>
    <literal>SERIALIZABLE</literal> is the default transaction
    isolation level in the standard.  In
    <productname>PostgreSQL</productname> the default is ordinarily
@@ -197,12 +264,6 @@ SET SESSION CHARACTERISTICS AS TRANSACTION <replaceable class="parameter">transa
    not implemented in the <productname>PostgreSQL</productname> server.
   </para>
 
-  <para>
-   The <literal>DEFERRABLE</literal>
-   <replaceable class="parameter">transaction_mode</replaceable>
-   is a <productname>PostgreSQL</productname> language extension.
-  </para>
-
   <para>
    The SQL standard requires commas between successive <replaceable
    class="parameter">transaction_modes</replaceable>, but for historical
index 0a133bb7c7e815e91808419ac4786a66497855f6..cb2f60e1eeef408ada17ed4651803cb5e3b96b31 100644 (file)
@@ -87,6 +87,11 @@ Item
  <entry>Subdirectory containing information about committed serializable transactions</entry>
 </row>
 
+<row>
+ <entry><filename>pg_snapshots</></entry>
+ <entry>Subdirectory containing exported snapshots</entry>
+</row>
+
 <row>
  <entry><filename>pg_stat_tmp</></entry>
  <entry>Subdirectory containing temporary files for the statistics
index 3dab45c2da60a1010d566fccf658a4a55ee3ece3..c151d3be191361ab84f9d3cf57567ba56a4cfb80 100644 (file)
@@ -2067,6 +2067,16 @@ PrepareTransaction(void)
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 errmsg("cannot PREPARE a transaction that has operated on temporary tables")));
 
+       /*
+        * Likewise, don't allow PREPARE after pg_export_snapshot.  This could be
+        * supported if we added cleanup logic to twophase.c, but for now it
+        * doesn't seem worth the trouble.
+        */
+       if (XactHasExportedSnapshots())
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("cannot PREPARE a transaction that has exported snapshots")));
+
        /* Prevent cancel/die interrupt while cleaning up */
        HOLD_INTERRUPTS();
 
index 1c17348472e7513e3d1425030ae26b86e502ba17..5fec88691a33eefa61a98c5031e62d5d9842ea3c 100644 (file)
@@ -58,6 +58,7 @@
 #include "utils/guc.h"
 #include "utils/ps_status.h"
 #include "utils/relmapper.h"
+#include "utils/snapmgr.h"
 #include "utils/timestamp.h"
 #include "pg_trace.h"
 
@@ -6381,6 +6382,12 @@ StartupXLOG(void)
                 */
                ResetUnloggedRelations(UNLOGGED_RELATION_CLEANUP);
 
+               /*
+                * Likewise, delete any saved transaction snapshot files that got
+                * left behind by crashed backends.
+                */
+               DeleteAllExportedSnapshotFiles();
+
                /*
                 * Initialize for Hot Standby, if enabled. We won't let backends in
                 * yet, not until we've reached the min recovery point specified in
index e9f3896badb55ef56b46aee270b94d5487412d40..e2edcde024e2b757a4270083bf77ee2662502469 100644 (file)
@@ -553,8 +553,8 @@ static void processCASbits(int cas_bits, int location, const char *constrType,
 
        SAVEPOINT SCHEMA SCROLL SEARCH SECOND_P SECURITY SELECT SEQUENCE SEQUENCES
        SERIALIZABLE SERVER SESSION SESSION_USER SET SETOF SHARE
-       SHOW SIMILAR SIMPLE SMALLINT SOME STABLE STANDALONE_P START STATEMENT
-       STATISTICS STDIN STDOUT STORAGE STRICT_P STRIP_P SUBSTRING
+       SHOW SIMILAR SIMPLE SMALLINT SNAPSHOT SOME STABLE STANDALONE_P START
+       STATEMENT STATISTICS STDIN STDOUT STORAGE STRICT_P STRIP_P SUBSTRING
        SYMMETRIC SYSID SYSTEM_P
 
        TABLE TABLES TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN TIME TIMESTAMP
@@ -1352,6 +1352,15 @@ set_rest:        /* Generic SET syntaxes: */
                                        n->args = list_make1(makeStringConst($3 == XMLOPTION_DOCUMENT ? "DOCUMENT" : "CONTENT", @3));
                                        $$ = n;
                                }
+                       /* Special syntaxes invented by PostgreSQL: */
+                       | TRANSACTION SNAPSHOT Sconst
+                               {
+                                       VariableSetStmt *n = makeNode(VariableSetStmt);
+                                       n->kind = VAR_SET_MULTI;
+                                       n->name = "TRANSACTION SNAPSHOT";
+                                       n->args = list_make1(makeStringConst($3, @3));
+                                       $$ = n;
+                               }
                ;
 
 var_name:      ColId                                                           { $$ = $1; }
index 7d44a34d025df347fd77c3f3a3a0ab2cb6ab4756..a8ff54037c4c9ee74eeb7c3f6aafb860878d3eeb 100644 (file)
@@ -1122,6 +1122,28 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
        return result;
 }
 
+/*
+ * GetMaxSnapshotXidCount -- get max size for snapshot XID array
+ * (procArray->maxProcs, the size GetSnapshotData allocates for xip).
+ * We have to export this for use by snapmgr.c.
+ */
+int
+GetMaxSnapshotXidCount(void)
+{
+       return procArray->maxProcs;
+}
+
+/*
+ * GetMaxSnapshotSubxidCount -- get max size for snapshot sub-XID array
+ * (TOTAL_MAX_CACHED_SUBXIDS, the size GetSnapshotData allocates for subxip).
+ * We have to export this for use by snapmgr.c.
+ */
+int
+GetMaxSnapshotSubxidCount(void)
+{
+       return TOTAL_MAX_CACHED_SUBXIDS;
+}
+
 /*
  * GetSnapshotData -- returns information about running transactions.
  *
@@ -1187,14 +1209,14 @@ GetSnapshotData(Snapshot snapshot)
                 * we are in recovery, see later comments.
                 */
                snapshot->xip = (TransactionId *)
-                       malloc(arrayP->maxProcs * sizeof(TransactionId));
+                       malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
                if (snapshot->xip == NULL)
                        ereport(ERROR,
                                        (errcode(ERRCODE_OUT_OF_MEMORY),
                                         errmsg("out of memory")));
                Assert(snapshot->subxip == NULL);
                snapshot->subxip = (TransactionId *)
-                       malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
+                       malloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
                if (snapshot->subxip == NULL)
                        ereport(ERROR,
                                        (errcode(ERRCODE_OUT_OF_MEMORY),
@@ -1376,6 +1398,77 @@ GetSnapshotData(Snapshot snapshot)
        return snapshot;
 }
 
+/*
+ * ProcArrayInstallImportedXmin -- install imported xmin into MyProc->xmin
+ *
+ * This is called when installing a snapshot imported from another
+ * transaction.  To ensure that OldestXmin doesn't go backwards, we must
+ * check that the source transaction is still running, and we'd better do
+ * that atomically with installing the new xmin.
+ *
+ * Returns TRUE if successful, FALSE if no usable running source xact is found.
+ */
+bool
+ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
+{
+       bool            result = false;
+       ProcArrayStruct *arrayP = procArray;
+       int                     index;
+
+       Assert(TransactionIdIsNormal(xmin));
+       if (!TransactionIdIsNormal(sourcexid))
+               return false;
+
+       /* Hold ProcArrayLock (shared) so source xact can't end meanwhile */
+       LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+       for (index = 0; index < arrayP->numProcs; index++)
+       {
+               volatile PGPROC *proc = arrayP->procs[index];
+               TransactionId xid;
+
+               /* Ignore procs running LAZY VACUUM */
+               if (proc->vacuumFlags & PROC_IN_VACUUM)
+                       continue;
+
+               xid = proc->xid;        /* fetch just once */
+               if (xid != sourcexid)
+                       continue;
+
+               /*
+                * We check the transaction's database ID for paranoia's sake: if
+                * it's in another DB then its xmin does not cover us.  Caller should
+                * have detected this already, so we just treat any funny cases as
+                * "transaction not found".
+                */
+               if (proc->databaseId != MyDatabaseId)
+                       continue;
+
+               /*
+                * Likewise, let's just make real sure its xmin does cover us:
+                * it must be a normal XID that is not later than the imported xmin.
+                */
+               xid = proc->xmin;       /* fetch just once */
+               if (!TransactionIdIsNormal(xid) ||
+                       !TransactionIdPrecedesOrEquals(xid, xmin))
+                       continue;
+
+               /*
+                * We're good.  Install the new xmin.  As in GetSnapshotData, set
+                * TransactionXmin too.  (Note that because snapmgr.c called
+                * GetSnapshotData first, we'll be overwriting a valid xmin here,
+                * so we don't check that.)
+                */
+               MyProc->xmin = TransactionXmin = xmin;
+
+               result = true;
+               break;
+       }
+
+       LWLockRelease(ProcArrayLock);
+
+       return result;
+}
+
 /*
  * GetRunningTransactionData -- returns information about running transactions.
  *
index d39f8975f8816d9bba02c0c398323ca470f1340b..345f6f56a69557269d51382e17f36a8630e5fcc5 100644 (file)
  *
  * predicate lock maintenance
  *             GetSerializableTransactionSnapshot(Snapshot snapshot)
+ *             SetSerializableTransactionSnapshot(Snapshot snapshot,
+ *                                                                                TransactionId sourcexid)
  *             RegisterPredicateLockingXid(void)
  *             PredicateLockRelation(Relation relation, Snapshot snapshot)
  *             PredicateLockPage(Relation relation, BlockNumber blkno,
@@ -417,7 +419,8 @@ static void OldSerXidSetActiveSerXmin(TransactionId xid);
 static uint32 predicatelock_hash(const void *key, Size keysize);
 static void SummarizeOldestCommittedSxact(void);
 static Snapshot GetSafeSnapshot(Snapshot snapshot);
-static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot);
+static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot,
+                                                                         TransactionId sourcexid);
 static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
 static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
                                                  PREDICATELOCKTARGETTAG *parent);
@@ -1505,7 +1508,8 @@ GetSafeSnapshot(Snapshot origSnapshot)
                 * our caller passed to us.  The pointer returned is actually the same
                 * one passed to it, but we avoid assuming that here.
                 */
-               snapshot = GetSerializableTransactionSnapshotInt(origSnapshot);
+               snapshot = GetSerializableTransactionSnapshotInt(origSnapshot,
+                                                                                                                InvalidTransactionId);
 
                if (MySerializableXact == InvalidSerializableXact)
                        return snapshot;        /* no concurrent r/w xacts; it's safe */
@@ -1574,11 +1578,52 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
        if (XactReadOnly && XactDeferrable)
                return GetSafeSnapshot(snapshot);
 
-       return GetSerializableTransactionSnapshotInt(snapshot);
+       return GetSerializableTransactionSnapshotInt(snapshot,
+                                                                                                InvalidTransactionId);
 }
 
+/*
+ * Import a snapshot, exported by xact sourcexid, for the current transaction.
+ *
+ * This is nearly the same as GetSerializableTransactionSnapshot, except that
+ * we don't take a new snapshot, but rather use the data we're handed.
+ *
+ * The caller must have verified that the snapshot came from a serializable
+ * transaction; and if we're read-write, the source transaction must not be
+ * read-only.
+ */
+void
+SetSerializableTransactionSnapshot(Snapshot snapshot,
+                                                                  TransactionId sourcexid)
+{
+       Assert(IsolationIsSerializable());
+
+       /*
+        * We do not allow SERIALIZABLE READ ONLY DEFERRABLE transactions to
+        * import snapshots, since there's no way to wait for a safe snapshot
+        * when we're using the snap we're told to.  (XXX instead of throwing
+        * an error, we could just ignore the XactDeferrable flag?)
+        */
+       if (XactReadOnly && XactDeferrable)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE")));
+
+       (void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid);
+}
+
+/*
+ * Guts of GetSerializableTransactionSnapshot
+ *
+ * If sourcexid is valid, this is actually an import operation and we should
+ * skip calling GetSnapshotData, because the snapshot contents are already
+ * loaded up.  HOWEVER: to avoid race conditions, we must check that the
+ * source xact is still running after we acquire SerializableXactHashLock.
+ * We do that by calling ProcArrayInstallImportedXmin.
+ */
 static Snapshot
-GetSerializableTransactionSnapshotInt(Snapshot snapshot)
+GetSerializableTransactionSnapshotInt(Snapshot snapshot,
+                                                                         TransactionId sourcexid)
 {
        PGPROC     *proc;
        VirtualTransactionId vxid;
@@ -1598,6 +1643,14 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot)
        /*
         * First we get the sxact structure, which may involve looping and access
         * to the "finished" list to free a structure for use.
+        *
+        * We must hold SerializableXactHashLock when taking/checking the snapshot
+        * to avoid race conditions, for much the same reasons that
+        * GetSnapshotData takes the ProcArrayLock.  Since we might have to release
+        * SerializableXactHashLock to call SummarizeOldestCommittedSxact, this
+        * means we have to create the sxact first, which is a bit annoying (in
+        * particular, an elog(ERROR) in procarray.c would cause us to leak the
+        * sxact).  Consider refactoring to avoid this.
         */
 #ifdef TEST_OLDSERXID
        SummarizeOldestCommittedSxact();
@@ -1615,8 +1668,19 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot)
                }
        } while (!sxact);
 
-       /* Get the snapshot */
-       snapshot = GetSnapshotData(snapshot);
+       /* Get the snapshot, or check that it's safe to use */
+       if (!TransactionIdIsValid(sourcexid))
+               snapshot = GetSnapshotData(snapshot);
+       else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcexid))
+       {
+               ReleasePredXact(sxact);
+               LWLockRelease(SerializableXactHashLock);
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("could not import the requested snapshot"),
+                                errdetail("The source transaction %u is not running anymore.",
+                                                  sourcexid)));
+       }
 
        /*
         * If there are no serializable transactions which are not read-only, we
index f1d35a9a1129a9ca2da72d03a5aa4a15d08c4735..73e600107110c28520913da3ec32189c22219fbb 100644 (file)
@@ -72,6 +72,7 @@
 #include "utils/plancache.h"
 #include "utils/portal.h"
 #include "utils/ps_status.h"
+#include "utils/snapmgr.h"
 #include "utils/tzparser.h"
 #include "utils/xml.h"
 
@@ -6093,8 +6094,11 @@ ExecSetVariableStmt(VariableSetStmt *stmt)
                case VAR_SET_MULTI:
 
                        /*
-                        * Special case for special SQL syntax that effectively sets more
-                        * than one variable per statement.
+                        * Special-case SQL syntaxes.  The TRANSACTION and SESSION
+                        * CHARACTERISTICS cases effectively set more than one variable
+                        * per statement.  TRANSACTION SNAPSHOT only takes one argument,
+                        * but we put it here anyway since it's a special case and not
+                        * related to any GUC variable.
                         */
                        if (strcmp(stmt->name, "TRANSACTION") == 0)
                        {
@@ -6140,6 +6144,18 @@ ExecSetVariableStmt(VariableSetStmt *stmt)
                                                         item->defname);
                                }
                        }
+                       else if (strcmp(stmt->name, "TRANSACTION SNAPSHOT") == 0)
+                       {
+                               A_Const    *con = (A_Const *) linitial(stmt->args);
+
+                               if (stmt->is_local)
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                        errmsg("SET LOCAL TRANSACTION SNAPSHOT is not implemented")));
+                               Assert(IsA(con, A_Const));
+                               Assert(nodeTag(&con->val) == T_String);
+                               ImportSnapshot(strVal(&con->val));
+                       }
                        else
                                elog(ERROR, "unexpected SET MULTI element: %s",
                                         stmt->name);
index 518aaf1af0cfa66f88d3b19272d3118f172c15c0..50fb78057d8978e5d26f784d32806d99908bb05f 100644 (file)
  * handle this reference as an internally-tracked registration, so that this
  * module is entirely lower-level than ResourceOwners.
  *
+ * Likewise, any snapshots that have been exported by pg_export_snapshot
+ * have regd_count = 1 and are counted in RegisteredSnapshots, but are not
+ * tracked by any resource owner.
+ *
  * These arrangements let us reset MyProc->xmin when there are no snapshots
  * referenced by this transaction.     (One possible improvement would be to be
  * able to advance Xmin when the snapshot with the earliest Xmin is no longer
  * referenced. That's a bit harder though, it requires more locking, and
- * anyway it should be rather uncommon to keep snapshots referenced for too
- * long.)
+ * anyway it should be rather uncommon to keep temporary snapshots referenced
+ * for too long.)
  *
  *
  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
  */
 #include "postgres.h"
 
+#include <sys/stat.h>
+#include <unistd.h>
+
 #include "access/transam.h"
 #include "access/xact.h"
+#include "miscadmin.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
-#include "utils/memutils.h"
+#include "utils/builtins.h"
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
 #include "utils/tqual.h"
@@ -111,6 +119,15 @@ bool               FirstSnapshotSet = false;
  */
 static Snapshot FirstXactSnapshot = NULL;
 
+/*
+ * Define pathname of exported-snapshot files.
+ *
+ * XactExportFilePath formats into "path" a name of the form
+ * SNAPSHOT_EXPORT_DIR/XXXXXXXX-N followed by the given suffix, where
+ * XXXXXXXX is xid printed as at least eight uppercase hex digits and N is
+ * num printed in decimal.  Because the macro uses sizeof(path), "path" must
+ * be an actual char array, not a pointer.  The macro's value is the result
+ * of snprintf.
+ */
+#define SNAPSHOT_EXPORT_DIR "pg_snapshots"
+#define XactExportFilePath(path, xid, num, suffix) \
+       snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%d%s", \
+                        xid, num, suffix)
+
+/* Current xact's exported snapshots (a list of Snapshot structs) */
+static List *exportedSnapshots = NIL;
+
 
 static Snapshot CopySnapshot(Snapshot snapshot);
 static void FreeSnapshot(Snapshot snapshot);
@@ -139,7 +156,8 @@ GetTransactionSnapshot(void)
                 * In transaction-snapshot mode, the first snapshot must live until
                 * end of xact regardless of what the caller does with it, so we must
                 * make a copy of it rather than returning CurrentSnapshotData
-                * directly.
+                * directly.  Furthermore, if we're running in serializable mode,
+                * predicate.c needs to wrap the snapshot fetch in its own processing.
                 */
                if (IsolationUsesXactSnapshot())
                {
@@ -203,6 +221,88 @@ SnapshotSetCommandId(CommandId curcid)
                SecondarySnapshot->curcid = curcid;
 }
 
+/*
+ * SetTransactionSnapshot
+ *             Set the transaction's snapshot from an imported MVCC snapshot.
+ *
+ * Note that this is very closely tied to GetTransactionSnapshot --- it
+ * must take care of all the same considerations as the first-snapshot case
+ * in GetTransactionSnapshot.
+ *
+ * sourcesnap is the snapshot to adopt: its xmin, xmax, xip[] and subxip[]
+ * contents, suboverflowed and takenDuringRecovery are copied into
+ * CurrentSnapshotData (curcid is deliberately left alone).  sourcexid is the
+ * XID of the transaction the snapshot was taken from; it is handed to
+ * ProcArrayInstallImportedXmin and, in serializable mode, to
+ * SetSerializableTransactionSnapshot.
+ *
+ * An ERROR is raised if ProcArrayInstallImportedXmin returns false.  On
+ * success FirstSnapshotSet is set to true; in transaction-snapshot isolation
+ * modes a copy of the snapshot is additionally saved as FirstXactSnapshot
+ * and counted in RegisteredSnapshots.
+ */
+static void
+SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid)
+{
+       /* Caller should have checked this already */
+       Assert(!FirstSnapshotSet);
+
+       Assert(RegisteredSnapshots == 0);
+       Assert(FirstXactSnapshot == NULL);
+
+       /*
+        * Even though we are not going to use the snapshot it computes, we must
+        * call GetSnapshotData, for two reasons: (1) to be sure that
+        * CurrentSnapshotData's XID arrays have been allocated, and (2) to update
+        * RecentXmin and RecentGlobalXmin.  (We could alternatively include those
+        * two variables in exported snapshot files, but it seems better to have
+        * snapshot importers compute reasonably up-to-date values for them.)
+        */
+       CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
+
+       /*
+        * Now copy appropriate fields from the source snapshot.
+        */
+       CurrentSnapshot->xmin = sourcesnap->xmin;
+       CurrentSnapshot->xmax = sourcesnap->xmax;
+       CurrentSnapshot->xcnt = sourcesnap->xcnt;
+       Assert(sourcesnap->xcnt <= GetMaxSnapshotXidCount());
+       memcpy(CurrentSnapshot->xip, sourcesnap->xip,
+                  sourcesnap->xcnt * sizeof(TransactionId));
+       CurrentSnapshot->subxcnt = sourcesnap->subxcnt;
+       Assert(sourcesnap->subxcnt <= GetMaxSnapshotSubxidCount());
+       memcpy(CurrentSnapshot->subxip, sourcesnap->subxip,
+                  sourcesnap->subxcnt * sizeof(TransactionId));
+       CurrentSnapshot->suboverflowed = sourcesnap->suboverflowed;
+       CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery;
+       /* NB: curcid should NOT be copied, it's a local matter */
+
+       /*
+        * Now we have to fix what GetSnapshotData did with MyProc->xmin and
+        * TransactionXmin.  There is a race condition: to make sure we are not
+        * causing the global xmin to go backwards, we have to test that the
+        * source transaction is still running, and that has to be done atomically.
+        * So let procarray.c do it.
+        *
+        * Note: in serializable mode, predicate.c will do this a second time.
+        * It doesn't seem worth contorting the logic here to avoid two calls,
+        * especially since it's not clear that predicate.c *must* do this.
+        */
+       if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid))
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("could not import the requested snapshot"),
+                                errdetail("The source transaction %u is not running anymore.",
+                                                  sourcexid)));
+
+       /*
+        * In transaction-snapshot mode, the first snapshot must live until end of
+        * xact, so we must make a copy of it.  Furthermore, if we're running in
+        * serializable mode, predicate.c needs to do its own processing.
+        */
+       if (IsolationUsesXactSnapshot())
+       {
+               if (IsolationIsSerializable())
+                       SetSerializableTransactionSnapshot(CurrentSnapshot, sourcexid);
+               /* Make a saved copy */
+               CurrentSnapshot = CopySnapshot(CurrentSnapshot);
+               FirstXactSnapshot = CurrentSnapshot;
+               /* Mark it as "registered" in FirstXactSnapshot */
+               FirstXactSnapshot->regd_count++;
+               RegisteredSnapshots++;
+       }
+
+       FirstSnapshotSet = true;
+}
+
 /*
  * CopySnapshot
  *             Copy the given snapshot.
@@ -558,6 +658,42 @@ AtEOXact_Snapshot(bool isCommit)
        }
        FirstXactSnapshot = NULL;
 
+       /*
+        * If we exported any snapshots, clean them up.
+        */
+       if (exportedSnapshots != NIL)
+       {
+               TransactionId myxid = GetTopTransactionId();
+               int                     i;
+               char            buf[MAXPGPATH];
+
+               /*
+                * Get rid of the files.  Unlink failure is only a WARNING because
+                * (1) it's too late to abort the transaction, and (2) leaving a
+                * leaked file around has little real consequence anyway.
+                */
+               for (i = 1; i <= list_length(exportedSnapshots); i++)
+               {
+                       XactExportFilePath(buf, myxid, i, "");
+                       if (unlink(buf))
+                               elog(WARNING, "could not unlink file \"%s\": %m", buf);
+               }
+
+               /*
+                * As with the FirstXactSnapshot, we needn't spend any effort on
+                * cleaning up the per-snapshot data structures, but we do need to
+                * adjust the RegisteredSnapshots count to prevent a warning below.
+                *
+                * Note: you might be thinking "why do we have the exportedSnapshots
+                * list at all?  All we need is a counter!".  You're right, but we do
+                * it this way in case we ever feel like improving xmin management.
+                */
+               Assert(RegisteredSnapshots >= list_length(exportedSnapshots));
+               RegisteredSnapshots -= list_length(exportedSnapshots);
+
+               exportedSnapshots = NIL;
+       }
+
        /* On commit, complain about leftover snapshots */
        if (isCommit)
        {
@@ -586,3 +722,464 @@ AtEOXact_Snapshot(bool isCommit)
 
        SnapshotResetXmin();
 }
+
+
+/*
+ * ExportSnapshot
+ *             Export the snapshot to a file so that other backends can import it.
+ *             Returns the token (the file name) that can be used to import this
+ *             snapshot.
+ *
+ * The file is created under SNAPSHOT_EXPORT_DIR.  Its name is built by
+ * XactExportFilePath from the top-level transaction ID and the length of the
+ * exportedSnapshots list after this snapshot has been appended to it.  The
+ * data is first written under a ".tmp" suffix and then renamed to the final
+ * name.  The return value is a pstrdup'd copy of the final name without the
+ * directory prefix.
+ *
+ * Raises ERROR if called inside a subtransaction, or if the file cannot be
+ * created, written, closed, or renamed.  Note that the snapshot copy is
+ * added to exportedSnapshots and counted in RegisteredSnapshots before any
+ * file I/O is attempted.
+ */
+static char *
+ExportSnapshot(Snapshot snapshot)
+{
+       TransactionId topXid;
+       TransactionId *children;
+       int                     nchildren;
+       int                     addTopXid;
+       StringInfoData buf;
+       FILE       *f;
+       int                     i;
+       MemoryContext oldcxt;
+       char            path[MAXPGPATH];
+       char            pathtmp[MAXPGPATH];
+
+       /*
+        * It's tempting to call RequireTransactionChain here, since it's not
+        * very useful to export a snapshot that will disappear immediately
+        * afterwards.  However, we haven't got enough information to do that,
+        * since we don't know if we're at top level or not.  For example, we
+        * could be inside a plpgsql function that is going to fire off other
+        * transactions via dblink.  Rather than disallow perfectly legitimate
+        * usages, don't make a check.
+        *
+        * Also note that we don't make any restriction on the transaction's
+        * isolation level; however, importers must check the level if they
+        * are serializable.
+        */
+
+       /*
+        * This will assign a transaction ID if we do not yet have one.
+        */
+       topXid = GetTopTransactionId();
+
+       /*
+        * We cannot export a snapshot from a subtransaction because there's no
+        * easy way for importers to verify that the same subtransaction is still
+        * running.
+        */
+       if (IsSubTransaction())
+               ereport(ERROR,
+                               (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+                                errmsg("cannot export a snapshot from a subtransaction")));
+
+       /*
+        * We do however allow previous committed subtransactions to exist.
+        * Importers of the snapshot must see them as still running, so get their
+        * XIDs to add them to the snapshot.
+        */
+       nchildren = xactGetCommittedChildren(&children);
+
+       /*
+        * Copy the snapshot into TopTransactionContext, add it to the
+        * exportedSnapshots list, and mark it pseudo-registered.  We do this to
+        * ensure that the snapshot's xmin is honored for the rest of the
+        * transaction.  (Right now, because SnapshotResetXmin is so stupid, this
+        * is overkill; but later we might make that routine smarter.)
+        */
+       snapshot = CopySnapshot(snapshot);
+
+       oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+       exportedSnapshots = lappend(exportedSnapshots, snapshot);
+       MemoryContextSwitchTo(oldcxt);
+
+       snapshot->regd_count++;
+       RegisteredSnapshots++;
+
+       /*
+        * Fill buf with a text serialization of the snapshot, plus identification
+        * data about this transaction.  The format expected by ImportSnapshot
+        * is pretty rigid: each line must be fieldname:value.
+        *
+        * The lines are emitted in this order: xid, dbid, iso, ro, xmin, xmax,
+        * xcnt, one xip line per top-level XID, sof, then (only when sof is 0)
+        * sxcnt and one sxp line per subtransaction XID, and finally rec.
+        */
+       initStringInfo(&buf);
+
+       appendStringInfo(&buf, "xid:%u\n", topXid);
+       appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId);
+       appendStringInfo(&buf, "iso:%d\n", XactIsoLevel);
+       appendStringInfo(&buf, "ro:%d\n", XactReadOnly);
+
+       appendStringInfo(&buf, "xmin:%u\n", snapshot->xmin);
+       appendStringInfo(&buf, "xmax:%u\n", snapshot->xmax);
+
+       /*
+        * We must include our own top transaction ID in the top-xid data, since
+        * by definition we will still be running when the importing transaction
+        * adopts the snapshot, but GetSnapshotData never includes our own XID in
+        * the snapshot.  (There must, therefore, be enough room to add it.)
+        *
+        * However, it could be that our topXid is after the xmax, in which case
+        * we shouldn't include it because xip[] members are expected to be before
+        * xmax.  (We need not make the same check for subxip[] members, see
+        * snapshot.h.)
+        */
+       addTopXid = TransactionIdPrecedes(topXid, snapshot->xmax) ? 1 : 0;
+       appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid);
+       for (i = 0; i < snapshot->xcnt; i++)
+               appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]);
+       if (addTopXid)
+               appendStringInfo(&buf, "xip:%u\n", topXid);
+
+       /*
+        * Similarly, we add our subcommitted child XIDs to the subxid data.
+        * Here, we have to cope with possible overflow.
+        */
+       if (snapshot->suboverflowed ||
+               snapshot->subxcnt + nchildren > GetMaxSnapshotSubxidCount())
+               appendStringInfoString(&buf, "sof:1\n");
+       else
+       {
+               appendStringInfoString(&buf, "sof:0\n");
+               appendStringInfo(&buf, "sxcnt:%d\n", snapshot->subxcnt + nchildren);
+               for (i = 0; i < snapshot->subxcnt; i++)
+                       appendStringInfo(&buf, "sxp:%u\n", snapshot->subxip[i]);
+               for (i = 0; i < nchildren; i++)
+                       appendStringInfo(&buf, "sxp:%u\n", children[i]);
+       }
+       appendStringInfo(&buf, "rec:%u\n", snapshot->takenDuringRecovery);
+
+       /*
+        * Now write the text representation into a file.  We first write to a
+        * ".tmp" filename, and rename to final filename if no error.  This
+        * ensures that no other backend can read an incomplete file
+        * (ImportSnapshot won't allow it because of its valid-characters check).
+        */
+       XactExportFilePath(pathtmp, topXid, list_length(exportedSnapshots), ".tmp");
+       if (!(f = AllocateFile(pathtmp, PG_BINARY_W)))
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not create file \"%s\": %m", pathtmp)));
+
+       if (fwrite(buf.data, buf.len, 1, f) != 1)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not write to file \"%s\": %m", pathtmp)));
+
+       /* no fsync() since file need not survive a system crash */
+
+       if (FreeFile(f))
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not write to file \"%s\": %m", pathtmp)));
+
+       /*
+        * Now that we have written everything into a .tmp file, rename the file
+        * to remove the .tmp suffix.
+        */
+       XactExportFilePath(path, topXid, list_length(exportedSnapshots), "");
+
+       if (rename(pathtmp, path) < 0)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not rename file \"%s\" to \"%s\": %m",
+                                               pathtmp, path)));
+
+       /*
+        * The basename of the file is what we return from pg_export_snapshot().
+        * It's already in path in a textual format and we know that the path
+        * starts with SNAPSHOT_EXPORT_DIR.  Skip over the prefix and the slash
+        * and pstrdup it so as not to return the address of a local variable.
+        */
+       return pstrdup(path + strlen(SNAPSHOT_EXPORT_DIR) + 1);
+}
+
+/*
+ * pg_export_snapshot
+ *             SQL-callable wrapper for ExportSnapshot.
+ *
+ * Exports the snapshot returned by GetActiveSnapshot() and returns the
+ * identifier produced by ExportSnapshot, converted to a text datum.
+ */
+Datum
+pg_export_snapshot(PG_FUNCTION_ARGS)
+{
+       char       *snapshotName;
+
+       snapshotName = ExportSnapshot(GetActiveSnapshot());
+       PG_RETURN_TEXT_P(cstring_to_text(snapshotName));
+}
+
+
+/*
+ * Parsing subroutines for ImportSnapshot: parse a line with the given
+ * prefix followed by a value, and advance *s to the next line.  The
+ * filename is provided for use in error messages.
+ *
+ * parseIntFromText reads the value with "%d" and returns it as an int;
+ * parseXidFromText reads it with "%u" and returns it as a TransactionId.
+ *
+ * Both raise ERROR ("invalid snapshot data in file ...") if the text at *s
+ * does not begin with the prefix, if sscanf cannot read a number right after
+ * the prefix, or if no newline follows.  Whatever lies between the number
+ * and the newline is skipped without being examined.  *s is only updated
+ * when the line was parsed successfully.
+ */
+static int
+parseIntFromText(const char *prefix, char **s, const char *filename)
+{
+       char       *ptr = *s;
+       int                     prefixlen = strlen(prefix);
+       int                     val;
+
+       if (strncmp(ptr, prefix, prefixlen) != 0)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+                                errmsg("invalid snapshot data in file \"%s\"", filename)));
+       ptr += prefixlen;
+       if (sscanf(ptr, "%d", &val) != 1)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+                                errmsg("invalid snapshot data in file \"%s\"", filename)));
+       ptr = strchr(ptr, '\n');
+       if (!ptr)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+                                errmsg("invalid snapshot data in file \"%s\"", filename)));
+       *s = ptr + 1;
+       return val;
+}
+
+static TransactionId
+parseXidFromText(const char *prefix, char **s, const char *filename)
+{
+       char       *ptr = *s;
+       int                     prefixlen = strlen(prefix);
+       TransactionId val;
+
+       if (strncmp(ptr, prefix, prefixlen) != 0)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+                                errmsg("invalid snapshot data in file \"%s\"", filename)));
+       ptr += prefixlen;
+       if (sscanf(ptr, "%u", &val) != 1)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+                                errmsg("invalid snapshot data in file \"%s\"", filename)));
+       ptr = strchr(ptr, '\n');
+       if (!ptr)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+                                errmsg("invalid snapshot data in file \"%s\"", filename)));
+       *s = ptr + 1;
+       return val;
+}
+
+/*
+ * ImportSnapshot
+ *      Import a previously exported snapshot.  The argument should be a
+ *      filename in SNAPSHOT_EXPORT_DIR.  Load the snapshot from that file.
+ *      This is called by "SET TRANSACTION SNAPSHOT 'foo'".
+ *
+ * The file is parsed as a fixed sequence of lines: xid, dbid, iso, ro, xmin,
+ * xmax, xcnt, then xcnt xip lines, sof, then (only when sof is zero) sxcnt
+ * followed by sxcnt sxp lines, and finally rec.
+ *
+ * Raises ERROR if a snapshot has already been set in this transaction, an
+ * XID has already been assigned, or we are in a subtransaction; if the
+ * isolation level does not use a transaction snapshot; if idstr contains
+ * characters other than 0-9, A-F and hyphen; if the file cannot be opened,
+ * read or parsed, or fails the sanity checks; if a serializable importer is
+ * given a snapshot it may not adopt; or if the snapshot was exported from a
+ * different database.  On success the snapshot is installed by calling
+ * SetTransactionSnapshot.
+ */
+void
+ImportSnapshot(const char *idstr)
+{
+       char            path[MAXPGPATH];
+       FILE       *f;
+       struct stat     stat_buf;
+       char       *filebuf;
+       int                     xcnt;
+       int                     i;
+       TransactionId src_xid;
+       Oid                     src_dbid;
+       int                     src_isolevel;
+       bool            src_readonly;
+       SnapshotData snapshot;
+
+       /*
+        * Must be at top level of a fresh transaction.  Note in particular that
+        * we check we haven't acquired an XID --- if we have, it's conceivable
+        * that the snapshot would show it as not running, making for very
+        * screwy behavior.
+        */
+       if (FirstSnapshotSet ||
+               GetTopTransactionIdIfAny() != InvalidTransactionId ||
+               IsSubTransaction())
+               ereport(ERROR,
+                               (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+                                errmsg("SET TRANSACTION SNAPSHOT must be called before any query")));
+
+       /*
+        * If we are in read committed mode then the next query would execute
+        * with a new snapshot thus making this function call quite useless.
+        */
+       if (!IsolationUsesXactSnapshot())
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("a snapshot-importing transaction must have isolation level SERIALIZABLE or REPEATABLE READ")));
+
+       /*
+        * Verify the identifier: only 0-9, A-F and hyphens are allowed.  We do
+        * this mainly to prevent reading arbitrary files.
+        */
+       if (strspn(idstr, "0123456789ABCDEF-") != strlen(idstr))
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                errmsg("invalid snapshot identifier \"%s\"", idstr)));
+
+       /* OK, read the file */
+       snprintf(path, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%s", idstr);
+
+       f = AllocateFile(path, PG_BINARY_R);
+       if (!f)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                errmsg("invalid snapshot identifier \"%s\"", idstr)));
+
+       /* get the size of the file so that we know how much memory we need */
+       if (fstat(fileno(f), &stat_buf))
+               elog(ERROR, "could not stat file \"%s\": %m", path);
+
+       /* and read the file into a palloc'd string */
+       filebuf = (char *) palloc(stat_buf.st_size + 1);
+       if (fread(filebuf, stat_buf.st_size, 1, f) != 1)
+               elog(ERROR, "could not read file \"%s\": %m", path);
+
+       filebuf[stat_buf.st_size] = '\0';
+
+       FreeFile(f);
+
+       /*
+        * Construct a snapshot struct by parsing the file content.
+        *
+        * Each parse call is handed &filebuf and moves filebuf past the line it
+        * consumed, so after this point filebuf no longer points at the start of
+        * the palloc'd buffer.
+        */
+       memset(&snapshot, 0, sizeof(snapshot));
+
+       src_xid = parseXidFromText("xid:", &filebuf, path);
+       /* we abuse parseXidFromText a bit here ... */
+       src_dbid = parseXidFromText("dbid:", &filebuf, path);
+       src_isolevel = parseIntFromText("iso:", &filebuf, path);
+       src_readonly = parseIntFromText("ro:", &filebuf, path);
+
+       snapshot.xmin = parseXidFromText("xmin:", &filebuf, path);
+       snapshot.xmax = parseXidFromText("xmax:", &filebuf, path);
+
+       snapshot.xcnt = xcnt = parseIntFromText("xcnt:", &filebuf, path);
+
+       /* sanity-check the xid count before palloc */
+       if (xcnt < 0 || xcnt > GetMaxSnapshotXidCount())
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+                                errmsg("invalid snapshot data in file \"%s\"", path)));
+
+       snapshot.xip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
+       for (i = 0; i < xcnt; i++)
+               snapshot.xip[i] = parseXidFromText("xip:", &filebuf, path);
+
+       snapshot.suboverflowed = parseIntFromText("sof:", &filebuf, path);
+
+       if (!snapshot.suboverflowed)
+       {
+               snapshot.subxcnt = xcnt = parseIntFromText("sxcnt:", &filebuf, path);
+
+               /* sanity-check the xid count before palloc */
+               if (xcnt < 0 || xcnt > GetMaxSnapshotSubxidCount())
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+                                        errmsg("invalid snapshot data in file \"%s\"", path)));
+
+               snapshot.subxip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
+               for (i = 0; i < xcnt; i++)
+                       snapshot.subxip[i] = parseXidFromText("sxp:", &filebuf, path);
+       }
+       else
+       {
+               snapshot.subxcnt = 0;
+               snapshot.subxip = NULL;
+       }
+
+       snapshot.takenDuringRecovery = parseIntFromText("rec:", &filebuf, path);
+
+       /*
+        * Do some additional sanity checking, just to protect ourselves.  We
+        * don't trouble to check the array elements, just the most critical
+        * fields.
+        */
+       if (!TransactionIdIsNormal(src_xid) ||
+               !OidIsValid(src_dbid) ||
+               !TransactionIdIsNormal(snapshot.xmin) ||
+               !TransactionIdIsNormal(snapshot.xmax))
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+                                errmsg("invalid snapshot data in file \"%s\"", path)));
+
+       /*
+        * If we're serializable, the source transaction must be too, otherwise
+        * predicate.c has problems (SxactGlobalXmin could go backwards).  Also,
+        * a non-read-only transaction can't adopt a snapshot from a read-only
+        * transaction, as predicate.c handles the cases very differently.
+        */
+       if (IsolationIsSerializable())
+       {
+               if (src_isolevel != XACT_SERIALIZABLE)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                        errmsg("a serializable transaction cannot import a snapshot from a non-serializable transaction")));
+               if (src_readonly && !XactReadOnly)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                        errmsg("a non-read-only serializable transaction cannot import a snapshot from a read-only transaction")));
+       }
+
+       /*
+        * We cannot import a snapshot that was taken in a different database,
+        * because vacuum calculates OldestXmin on a per-database basis; so the
+        * source transaction's xmin doesn't protect us from data loss.  This
+        * restriction could be removed if the source transaction were to mark
+        * its xmin as being globally applicable.  But that would require some
+        * additional syntax, since that has to be known when the snapshot is
+        * initially taken.  (See pgsql-hackers discussion of 2011-10-21.)
+        */
+       if (src_dbid != MyDatabaseId)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("cannot import a snapshot from a different database")));
+
+       /* OK, install the snapshot */
+       SetTransactionSnapshot(&snapshot, src_xid);
+}
+
+/*
+ * XactHasExportedSnapshots
+ *             Test whether current transaction has exported any snapshots.
+ *
+ * Returns true exactly when the exportedSnapshots list is non-empty.
+ */
+bool
+XactHasExportedSnapshots(void)
+{
+       return (exportedSnapshots != NIL);
+}
+
+/*
+ * DeleteAllExportedSnapshotFiles
+ *             Clean up any files that have been left behind by a crashed backend
+ *             that had exported snapshots before it died.
+ *
+ * This should be called during database startup or crash recovery.
+ *
+ * Every entry found in SNAPSHOT_EXPORT_DIR other than "." and ".." is
+ * unlinked, whatever its name.  If the directory cannot be opened, or an
+ * entry cannot be unlinked, that is reported at LOG level here rather than
+ * raised as an error; in the former case the function simply returns.
+ */
+void
+DeleteAllExportedSnapshotFiles(void)
+{
+       char            buf[MAXPGPATH];
+       DIR                *s_dir;
+       struct dirent *s_de;
+
+       if (!(s_dir = AllocateDir(SNAPSHOT_EXPORT_DIR)))
+       {
+               /*
+                * We really should have that directory in a sane cluster setup. But
+                * then again if we don't, it's not fatal enough to make it FATAL.
+                * Since we're running in the postmaster, LOG is our best bet.
+                */
+               elog(LOG, "could not open directory \"%s\": %m", SNAPSHOT_EXPORT_DIR);
+               return;
+       }
+
+       while ((s_de = ReadDir(s_dir, SNAPSHOT_EXPORT_DIR)) != NULL)
+       {
+               if (strcmp(s_de->d_name, ".") == 0 ||
+                       strcmp(s_de->d_name, "..") == 0)
+                       continue;
+
+               snprintf(buf, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%s", s_de->d_name);
+               /* Again, unlink failure is not worthy of FATAL */
+               if (unlink(buf))
+                       elog(LOG, "could not unlink file \"%s\": %m", buf);
+       }
+
+       FreeDir(s_dir);
+}
index e535fdad1e91e556a49ec8deba6802d99ae8bbfa..29000095cba936ca5afda3220807b6128c6ef5aa 100644 (file)
@@ -2555,6 +2555,7 @@ main(int argc, char *argv[])
                "pg_clog",
                "pg_notify",
                "pg_serial",
+               "pg_snapshots",
                "pg_subtrans",
                "pg_twophase",
                "pg_multixact/members",
index 8097545faaaafb95f0832a659ed846b938451d4a..c6273c12671d7e37874a301f7b29cf7d762ebe0f 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     201110161
+#define CATALOG_VERSION_NO     201110221
 
 #endif
index 96f43fe0b1fbb1944460f91f654a7074d7689b34..64b7a6a314d1084665caa5923fda8629ebd2f81b 100644 (file)
@@ -2870,6 +2870,9 @@ DESCR("xlog filename and byte offset, given an xlog location");
 DATA(insert OID = 2851 ( pg_xlogfile_name                      PGNSP PGUID 12 1 0 0 0 f f f t f i 1 0 25 "25" _null_ _null_ _null_ _null_ pg_xlogfile_name _null_ _null_ _null_ ));
 DESCR("xlog filename, given an xlog location");
 
+DATA(insert OID = 3809 ( pg_export_snapshot            PGNSP PGUID 12 1 0 0 0 f f f t f v 0 0 25 "" _null_ _null_ _null_ _null_ pg_export_snapshot _null_ _null_ _null_ ));
+DESCR("export a snapshot");
+
 DATA(insert OID = 3810 (  pg_is_in_recovery            PGNSP PGUID 12 1 0 0 0 f f f t f v 0 0 16 "" _null_ _null_ _null_ _null_ pg_is_in_recovery _null_ _null_ _null_ ));
 DESCR("true if server is in recovery");
 
index 12c2faf3de8fbfb3169c99a360cc15465e425d79..3d170bc3679ff50ca75c9ab250891d835a68d2c1 100644 (file)
@@ -337,6 +337,7 @@ PG_KEYWORD("show", SHOW, UNRESERVED_KEYWORD)
 PG_KEYWORD("similar", SIMILAR, TYPE_FUNC_NAME_KEYWORD)
 PG_KEYWORD("simple", SIMPLE, UNRESERVED_KEYWORD)
 PG_KEYWORD("smallint", SMALLINT, COL_NAME_KEYWORD)
+PG_KEYWORD("snapshot", SNAPSHOT, UNRESERVED_KEYWORD)
 PG_KEYWORD("some", SOME, RESERVED_KEYWORD)
 PG_KEYWORD("stable", STABLE, UNRESERVED_KEYWORD)
 PG_KEYWORD("standalone", STANDALONE_P, UNRESERVED_KEYWORD)
index 9603b10ad4052e55109b976d521d6054b656dc8a..7f9d5fc51c188a32061a3187da993ec5b1d35346 100644 (file)
@@ -43,6 +43,8 @@ extern bool PageIsPredicateLocked(Relation relation, BlockNumber blkno);
 
 /* predicate lock maintenance */
 extern Snapshot GetSerializableTransactionSnapshot(Snapshot snapshot);
+extern void SetSerializableTransactionSnapshot(Snapshot snapshot,
+                                                                                          TransactionId sourcexid);
 extern void RegisterPredicateLockingXid(TransactionId xid);
 extern void PredicateLockRelation(Relation relation, Snapshot snapshot);
 extern void PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot);
index a11d4385b7df448031c9a68517c2c94a2ef9fb06..71c82437cdf84322e4d743109d5b6a403ed806bf 100644 (file)
@@ -37,10 +37,16 @@ extern void ExpireTreeKnownAssignedTransactionIds(TransactionId xid,
 extern void ExpireAllKnownAssignedTransactionIds(void);
 extern void ExpireOldKnownAssignedTransactionIds(TransactionId xid);
 
-extern RunningTransactions GetRunningTransactionData(void);
+extern int     GetMaxSnapshotXidCount(void);
+extern int     GetMaxSnapshotSubxidCount(void);
 
 extern Snapshot GetSnapshotData(Snapshot snapshot);
 
+extern bool ProcArrayInstallImportedXmin(TransactionId xmin,
+                                                                                TransactionId sourcexid);
+
+extern RunningTransactions GetRunningTransactionData(void);
+
 extern bool TransactionIdIsInProgress(TransactionId xid);
 extern bool TransactionIdIsActive(TransactionId xid);
 extern TransactionId GetOldestXmin(bool allDbs, bool ignoreVacuum);
index e665a28aff8571e50ce318d8a0ca4710ec503b3c..1e5cb866b171dad2c80180d84f1f450a2ce0ca79 100644 (file)
@@ -42,4 +42,9 @@ extern void AtSubCommit_Snapshot(int level);
 extern void AtSubAbort_Snapshot(int level);
 extern void AtEOXact_Snapshot(bool isCommit);
 
+extern Datum pg_export_snapshot(PG_FUNCTION_ARGS);
+extern void ImportSnapshot(const char *idstr);
+extern bool XactHasExportedSnapshots(void);
+extern void DeleteAllExportedSnapshotFiles(void);
+
 #endif   /* SNAPMGR_H */