]> granicus.if.org Git - postgresql/commitdiff
Keep track of transaction commit timestamps
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 3 Dec 2014 14:53:02 +0000 (11:53 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 3 Dec 2014 14:53:02 +0000 (11:53 -0300)
Transactions can now set their commit timestamp directly as they commit,
or an external transaction commit timestamp can be fed from an outside
system using the new function TransactionTreeSetCommitTsData().  This
data is crash-safe, and truncated at Xid freeze point, same as pg_clog.

This module is disabled by default because it causes a performance hit,
but can be enabled in postgresql.conf requiring only a server restart.

A new test in src/test/modules is included.

Catalog version bumped due to the new subdirectory within PGDATA and a
couple of new SQL functions.

Authors: Álvaro Herrera and Petr Jelínek

Reviewed to varying degrees by Michael Paquier, Andres Freund, Robert
Haas, Amit Kapila, Fujii Masao, Jaime Casanova, Simon Riggs, Steven
Singer, Peter Eisentraut

43 files changed:
contrib/pg_upgrade/pg_upgrade.c
contrib/pg_xlogdump/rmgrdesc.c
doc/src/sgml/config.sgml
doc/src/sgml/func.sgml
doc/src/sgml/ref/pg_resetxlog.sgml
doc/src/sgml/storage.sgml
src/backend/access/rmgrdesc/Makefile
src/backend/access/rmgrdesc/committsdesc.c [new file with mode: 0644]
src/backend/access/rmgrdesc/xlogdesc.c
src/backend/access/transam/Makefile
src/backend/access/transam/README
src/backend/access/transam/commit_ts.c [new file with mode: 0644]
src/backend/access/transam/rmgr.c
src/backend/access/transam/slru.c
src/backend/access/transam/varsup.c
src/backend/access/transam/xact.c
src/backend/access/transam/xlog.c
src/backend/commands/vacuum.c
src/backend/libpq/hba.c
src/backend/replication/logical/decode.c
src/backend/storage/ipc/ipci.c
src/backend/storage/lmgr/lwlock.c
src/backend/utils/misc/guc.c
src/backend/utils/misc/postgresql.conf.sample
src/bin/initdb/initdb.c
src/bin/pg_controldata/pg_controldata.c
src/bin/pg_resetxlog/pg_resetxlog.c
src/include/access/commit_ts.h [new file with mode: 0644]
src/include/access/rmgrlist.h
src/include/access/transam.h
src/include/access/xlog_internal.h
src/include/catalog/catversion.h
src/include/catalog/pg_control.h
src/include/catalog/pg_proc.h
src/include/storage/lwlock.h
src/include/utils/builtins.h
src/test/modules/Makefile
src/test/modules/commit_ts/.gitignore [new file with mode: 0644]
src/test/modules/commit_ts/Makefile [new file with mode: 0644]
src/test/modules/commit_ts/commit_ts.conf [new file with mode: 0644]
src/test/modules/commit_ts/expected/commit_timestamp.out [new file with mode: 0644]
src/test/modules/commit_ts/expected/commit_timestamp_1.out [new file with mode: 0644]
src/test/modules/commit_ts/sql/commit_timestamp.sql [new file with mode: 0644]

index 3b8241b37e2e897b34508bafb0da3fad341c5cc2..dd1113ecc259428b2bd10f430efddd27f9b91f22 100644 (file)
@@ -430,6 +430,13 @@ copy_clog_xlog_xid(void)
                          "\"%s/pg_resetxlog\" -f -e %u \"%s\"",
                          new_cluster.bindir, old_cluster.controldata.chkpnt_nxtepoch,
                          new_cluster.pgdata);
+       /* must reset commit timestamp limits also */
+       exec_prog(UTILITY_LOG_FILE, NULL, true,
+                         "\"%s/pg_resetxlog\" -f -c %u,%u \"%s\"",
+                         new_cluster.bindir,
+                         old_cluster.controldata.chkpnt_nxtxid,
+                         old_cluster.controldata.chkpnt_nxtxid,
+                         new_cluster.pgdata);
        check_ok();
 
        /*
index 93971982390967aac3a30d3075ae8a105861eda2..180818d68b5e82d8e76ad723e6612833f5f809cb 100644 (file)
@@ -10,6 +10,7 @@
 
 #include "access/brin_xlog.h"
 #include "access/clog.h"
+#include "access/commit_ts.h"
 #include "access/gin.h"
 #include "access/gist_private.h"
 #include "access/hash.h"
index ab8c2637d750dab2f5742aaac6473332523b9740..e3713d3b490513f7ac20a46c29cf06dc56972e77 100644 (file)
@@ -2673,6 +2673,20 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp">
+      <term><varname>track_commit_timestamp</varname> (<type>bool</type>)</term>
+      <indexterm>
+       <primary><varname>track_commit_timestamp</> configuration parameter</primary>
+      </indexterm>
+      <listitem>
+       <para>
+        Record commit time of transactions. This parameter
+        can only be set in <filename>postgresql.conf</> file or on the server
+        command line. The default value is <literal>off</literal>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
     </sect2>
 
index baf81ee0404a3886a4be866173885c356fa5dace..62ec275a9e5524b702b6b53735611543850583cb 100644 (file)
@@ -15938,6 +15938,45 @@ SELECT collation for ('foo' COLLATE "de_DE");
     For example <literal>10:20:10,14,15</literal> means
     <literal>xmin=10, xmax=20, xip_list=10, 14, 15</literal>.
    </para>
+
+   <para>
+    The functions shown in <xref linkend="functions-commit-timestamp">
+    provide information about transactions that have been already committed.
+    These functions mainly provide information about when the transactions
+    were committed. They only provide useful data when
+    <xref linkend="guc-track-commit-timestamp"> configuration option is enabled
+    and only for transactions that were committed after it was enabled.
+   </para>
+
+   <table id="functions-commit-timestamp">
+    <title>Committed transaction information</title>
+    <tgroup cols="3">
+     <thead>
+      <row><entry>Name</entry> <entry>Return Type</entry> <entry>Description</entry></row>
+     </thead>
+
+     <tbody>
+      <row>
+       <entry>
+        <indexterm><primary>pg_xact_commit_timestamp</primary></indexterm>
+        <literal><function>pg_xact_commit_timestamp(<parameter>xid</parameter>)</function></literal>
+       </entry>
+       <entry><type>timestamp with time zone</type></entry>
+       <entry>get commit timestamp of a transaction</entry>
+      </row>
+
+      <row>
+       <entry>
+        <indexterm><primary>pg_last_committed_xact</primary></indexterm>
+        <literal><function>pg_last_committed_xact()</function></literal>
+       </entry>
+       <entry><parameter>xid</> <type>xid</>, <parameter>timestamp</> <type>timestamp with time zone</></entry>
+       <entry>get transaction ID and commit timestamp of latest committed transaction</entry>
+      </row>
+     </tbody>
+    </tgroup>
+   </table>
+
   </sect1>
 
   <sect1 id="functions-admin">
index aba7185f35592780673aa2add2febed6f5e8f36c..59280f01cb1596b318dbcabb7cd1cd2f8bcb66b6 100644 (file)
@@ -22,6 +22,7 @@ PostgreSQL documentation
  <refsynopsisdiv>
   <cmdsynopsis>
    <command>pg_resetxlog</command>
+   <arg choice="opt"><option>-c</option> <replaceable class="parameter">xid</replaceable>,<replaceable class="parameter">xid</replaceable></arg>
    <arg choice="opt"><option>-f</option></arg>
    <arg choice="opt"><option>-n</option></arg>
    <arg choice="opt"><option>-o</option> <replaceable class="parameter">oid</replaceable></arg>
@@ -79,9 +80,12 @@ PostgreSQL documentation
   <para>
    The <option>-o</>, <option>-x</>, <option>-e</>,
    <option>-m</>, <option>-O</>,
+   <option>-c</>
    and <option>-l</>
    options allow the next OID, next transaction ID, next transaction ID's
-   epoch, next and oldest multitransaction ID, next multitransaction offset, and WAL
+   epoch, next and oldest multitransaction ID, next multitransaction offset,
+   oldest and newest transaction IDs for which the commit time can be retrieved,
+   and WAL
    starting address values to be set manually.  These are only needed when
    <command>pg_resetxlog</command> is unable to determine appropriate values
    by reading <filename>pg_control</>.  Safe values can be determined as
@@ -128,6 +132,19 @@ PostgreSQL documentation
      </para>
     </listitem>
 
+    <listitem>
+     <para>
+      A safe value for the oldest transaction ID for which the commit time can
+      be retrieved (first part of <option>-c</>) can be determined by looking
+      for the numerically smallest file name in the directory
+      <filename>pg_commit_ts</> under the data directory.  Conversely, a safe
+      value for the newest transaction ID for which the commit time can be
+      retrieved (second part of <option>-c</>) can be determined by looking for
+      the numerically greatest file name in the same directory.  As above, the
+      file names are in hexadecimal.
+     </para>
+    </listitem>
+
     <listitem>
      <para>
       The WAL starting address (<option>-l</>) should be
index 920b5f0dc3bc3b90fb19f672137ed0843052f8d8..cb76b98dc363a621757ad2e0d3bbda9789ab1740 100644 (file)
@@ -66,6 +66,11 @@ Item
  <structname>pg_database</></entry>
 </row>
 
+<row>
+ <entry><filename>pg_commit_ts</></entry>
+ <entry>Subdirectory containing transaction commit timestamp data</entry>
+</row>
+
 <row>
  <entry><filename>pg_clog</></entry>
  <entry>Subdirectory containing transaction commit status data</entry>
index 32cb985036c2d2124b750abfdb97f8c25becc843..d18e8ec99802ba79523f2e8e5059f4c00c7e7d6f 100644 (file)
@@ -8,7 +8,7 @@ subdir = src/backend/access/rmgrdesc
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = brindesc.o clogdesc.o dbasedesc.o gindesc.o gistdesc.o \
+OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o gindesc.o gistdesc.o \
           hashdesc.o heapdesc.o \
           mxactdesc.o nbtdesc.o relmapdesc.o seqdesc.o smgrdesc.o spgdesc.o \
           standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
diff --git a/src/backend/access/rmgrdesc/committsdesc.c b/src/backend/access/rmgrdesc/committsdesc.c
new file mode 100644 (file)
index 0000000..6efabd9
--- /dev/null
@@ -0,0 +1,82 @@
+/*-------------------------------------------------------------------------
+ *
+ * committsdesc.c
+ *    rmgr descriptor routines for access/transam/commit_ts.c
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *    src/backend/access/rmgrdesc/committsdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/commit_ts.h"
+#include "utils/timestamp.h"
+
+
+/*
+ * commit_ts_desc
+ *
+ * Append a human-readable description of a commit_ts WAL record to buf.
+ * ZEROPAGE and TRUNCATE records print the page number they carry; SETTS
+ * records print the timestamp, node id, main xid and any subxids.  Records
+ * of any other type produce no output.
+ */
+void
+commit_ts_desc(StringInfo buf, XLogReaderState *record)
+{
+       char       *data = XLogRecGetData(record);
+       uint8           rectype = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+       switch (rectype)
+       {
+               case COMMIT_TS_ZEROPAGE:
+               case COMMIT_TS_TRUNCATE:
+                       {
+                               /* both record types carry just a page number */
+                               int                     page;
+
+                               memcpy(&page, data, sizeof(int));
+                               appendStringInfo(buf, "%d", page);
+                               break;
+                       }
+
+               case COMMIT_TS_SETTS:
+                       {
+                               xl_commit_ts_set *setrec = (xl_commit_ts_set *) data;
+                               int                     count;
+
+                               appendStringInfo(buf, "set %s/%d for: %u",
+                                                                timestamptz_to_str(setrec->timestamp),
+                                                                setrec->nodeid,
+                                                                setrec->mainxid);
+
+                               /* whatever follows the fixed-size header is the subxid array */
+                               count = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
+                                                sizeof(TransactionId));
+                               if (count > 0)
+                               {
+                                       TransactionId *children;
+                                       int                     n = 0;
+
+                                       /* copy out of the record before reading the xids */
+                                       children = palloc(sizeof(TransactionId) * count);
+                                       memcpy(children,
+                                                  data + SizeOfCommitTsSet,
+                                                  sizeof(TransactionId) * count);
+                                       while (n < count)
+                                       {
+                                               appendStringInfo(buf, ", %u", children[n]);
+                                               n++;
+                                       }
+                                       pfree(children);
+                               }
+                               break;
+                       }
+
+               default:
+                       break;
+       }
+}
+
+/*
+ * commit_ts_identify
+ *
+ * Map a commit_ts record type code to its symbolic name, or NULL if the
+ * code is not one of the known record types.
+ */
+const char *
+commit_ts_identify(uint8 info)
+{
+       const char *name = NULL;
+
+       if (info == COMMIT_TS_ZEROPAGE)
+               name = "ZEROPAGE";
+       else if (info == COMMIT_TS_TRUNCATE)
+               name = "TRUNCATE";
+       else if (info == COMMIT_TS_SETTS)
+               name = "SETTS";
+
+       return name;
+}
index eba046d1fa9fe51fbc9ee0f8bc47429a4f621249..6b5fea96d7bc536e7eb53faff98cde6a9b37cf26 100644 (file)
@@ -45,6 +45,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
                appendStringInfo(buf, "redo %X/%X; "
                                                 "tli %u; prev tli %u; fpw %s; xid %u/%u; oid %u; multi %u; offset %u; "
                                                 "oldest xid %u in DB %u; oldest multi %u in DB %u; "
+                                                "oldest/newest commit timestamp xid: %u/%u; "
                                                 "oldest running xid %u; %s",
                                (uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo,
                                                 checkpoint->ThisTimeLineID,
@@ -58,6 +59,8 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
                                                 checkpoint->oldestXidDB,
                                                 checkpoint->oldestMulti,
                                                 checkpoint->oldestMultiDB,
+                                                checkpoint->oldestCommitTs,
+                                                checkpoint->newestCommitTs,
                                                 checkpoint->oldestActiveXid,
                                 (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
        }
index 82a6c7695fc4078512fd6bbbc50ecac30fbafb74..9d4d5dbc9753a2829c2cbcaa33151287a139483f 100644 (file)
@@ -12,8 +12,9 @@ subdir = src/backend/access/transam
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = clog.o transam.o varsup.o xact.o rmgr.o slru.o subtrans.o multixact.o \
-       timeline.o twophase.o twophase_rmgr.o xlog.o xlogarchive.o xlogfuncs.o \
+OBJS = clog.o commit_ts.o multixact.o rmgr.o slru.o subtrans.o \
+       timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \
+       xact.o xlog.o xlogarchive.o xlogfuncs.o \
        xloginsert.o xlogreader.o xlogutils.o
 
 include $(top_srcdir)/src/backend/common.mk
index b619de5ad3a7cb35335cd4e6d2d704bda89197e9..bc68b470e09f817e6ddd4d7c44c7ca08db55d0f0 100644 (file)
@@ -840,7 +840,7 @@ parent transaction to complete.
 
 Not all transactional behaviour is emulated, for example we do not insert
 a transaction entry into the lock table, nor do we maintain the transaction
-stack in memory. Clog and multixact entries are made normally.
+stack in memory. Clog, multixact and commit_ts entries are made normally.
 Subtrans is maintained during recovery but the details of the transaction
 tree are ignored and all subtransactions reference the top-level TransactionId
 directly. Since commit is atomic this provides correct lock wait behaviour
diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c
new file mode 100644 (file)
index 0000000..ca074da
--- /dev/null
@@ -0,0 +1,902 @@
+/*-------------------------------------------------------------------------
+ *
+ * commit_ts.c
+ *             PostgreSQL commit timestamp manager
+ *
+ * This module is a pg_clog-like system that stores the commit timestamp
+ * for each transaction.
+ *
+ * XLOG interactions: this module generates an XLOG record whenever a new
+ * CommitTs page is initialized to zeroes.  Also, one XLOG record is
+ * generated for setting of values when the caller requests it; this allows
+ * us to support values coming from places other than transaction commit.
+ * Other writes of CommitTS come from recording of transaction commit in
+ * xact.c, which generates its own XLOG records for these events and will
+ * re-perform the status update on redo; so we need make no additional XLOG
+ * entry here.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/commit_ts.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/commit_ts.h"
+#include "access/htup_details.h"
+#include "access/slru.h"
+#include "access/transam.h"
+#include "catalog/pg_type.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pg_trace.h"
+#include "utils/builtins.h"
+#include "utils/snapmgr.h"
+#include "utils/timestamp.h"
+
+/*
+ * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
+ * everywhere else in Postgres.
+ *
+ * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
+ * CommitTs page numbering also wraps around at
+ * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
+ * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
+ * explicit notice of that fact in this module, except when comparing segment
+ * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
+ */
+
+/*
+ * Layout of the per-transaction entry stored in CommitTs pages.
+ *
+ * We need 8+4 bytes per xact.  Note that enlarging this struct might mean
+ * the largest possible file name is more than 5 chars long; see
+ * SlruScanDirectory.
+ *
+ * SizeOfCommitTimestampEntry is computed as the offset of the last member
+ * plus that member's size, rather than with sizeof(), so that any trailing
+ * padding the compiler adds to the struct is not counted.
+ */
+typedef struct CommitTimestampEntry
+{
+       TimestampTz             time;
+       CommitTsNodeId  nodeid;
+} CommitTimestampEntry;
+
+#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
+                                                                       sizeof(CommitTsNodeId))
+
+#define COMMIT_TS_XACTS_PER_PAGE \
+       (BLCKSZ / SizeOfCommitTimestampEntry)
+
+#define TransactionIdToCTsPage(xid)    \
+       ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
+#define TransactionIdToCTsEntry(xid)   \
+       ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
+
+/*
+ * Link to shared-memory data structures for CommitTs control
+ */
+static SlruCtlData CommitTsCtlData;
+
+#define CommitTsCtl (&CommitTsCtlData)
+
+/*
+ * We keep a cache of the last value set in shared memory.  This is protected
+ * by CommitTsLock.
+ */
+typedef struct CommitTimestampShared
+{
+       TransactionId   xidLastCommit;
+       CommitTimestampEntry dataLastCommit;
+} CommitTimestampShared;
+
+CommitTimestampShared  *commitTsShared;
+
+
+/* GUC variable */
+bool   track_commit_timestamp;
+
+static CommitTsNodeId default_node_id = InvalidCommitTsNodeId;
+
+static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
+                                        TransactionId *subxids, TimestampTz ts,
+                                        CommitTsNodeId nodeid, int pageno);
+static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
+                                                 CommitTsNodeId nodeid, int slotno);
+static int     ZeroCommitTsPage(int pageno, bool writeXlog);
+static bool CommitTsPagePrecedes(int page1, int page2);
+static void WriteZeroPageXlogRec(int pageno);
+static void WriteTruncateXlogRec(int pageno);
+static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
+                                                TransactionId *subxids, TimestampTz timestamp,
+                                                CommitTsNodeId nodeid);
+
+
+/*
+ * CommitTsSetDefaultNodeId
+ *
+ * Set default nodeid for current backend.  The value is kept in the
+ * file-local variable default_node_id and can be read back with
+ * CommitTsGetDefaultNodeId.
+ */
+void
+CommitTsSetDefaultNodeId(CommitTsNodeId nodeid)
+{
+       default_node_id = nodeid;
+}
+
+/*
+ * CommitTsGetDefaultNodeId
+ *
+ * Return the default nodeid for current backend; this is
+ * InvalidCommitTsNodeId unless CommitTsSetDefaultNodeId has been called.
+ */
+CommitTsNodeId
+CommitTsGetDefaultNodeId(void)
+{
+       return default_node_id;
+}
+
+/*
+ * TransactionTreeSetCommitTsData
+ *
+ * Record the final commit timestamp of transaction entries in the commit log
+ * for a transaction and its subtransaction tree, as efficiently as possible.
+ *
+ * xid is the top level transaction id.
+ *
+ * subxids is an array of xids of length nsubxids, representing subtransactions
+ * in the tree of xid. In various cases nsubxids may be zero.
+ * The reason why tracking just the parent xid commit timestamp is not enough
+ * is that the subtrans SLRU does not stay valid across crashes (it's not
+ * permanent) so we need to keep the information about them here. If the
+ * subtrans implementation changes in the future, we might want to revisit the
+ * decision of storing timestamp info for each subxid.
+ *
+ * timestamp and nodeid are the values stored for xid and for every subxid.
+ *
+ * The do_xlog parameter tells us whether to include a XLog record of this
+ * or not.  Normal path through RecordTransactionCommit() will be related
+ * to a transaction commit XLog record, and so should pass "false" here.
+ * Other callers probably want to pass true, so that the given values persist
+ * in case of crashes.
+ *
+ * This is a no-op when track_commit_timestamp is disabled.
+ */
+void
+TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
+                                                          TransactionId *subxids, TimestampTz timestamp,
+                                                          CommitTsNodeId nodeid, bool do_xlog)
+{
+       int                     i;
+       TransactionId headxid;
+       TransactionId newestXact;
+
+       if (!track_commit_timestamp)
+               return;
+
+       /*
+        * Comply with the WAL-before-data rule: if caller specified it wants
+        * this value to be recorded in WAL, do so before touching the data.
+        */
+       if (do_xlog)
+               WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
+
+       /*
+        * Figure out the latest Xid in this batch: either the last subxid if
+        * there's any, otherwise the parent xid.
+        */
+       if (nsubxids > 0)
+               newestXact = subxids[nsubxids - 1];
+       else
+               newestXact = xid;
+
+       /*
+        * We split the xids to set the timestamp to in groups belonging to the
+        * same SLRU page; the first element in each such set is its head.  The
+        * first group has the main XID as the head; subsequent sets use the
+        * first subxid not on the previous page as head.  This way, we only have
+        * to lock/modify each SLRU page once.
+        */
+       headxid = xid;
+       i = 0;
+       for (;;)
+       {
+               int                     pageno = TransactionIdToCTsPage(headxid);
+               int                     j;
+
+               for (j = i; j < nsubxids; j++)
+               {
+                       if (TransactionIdToCTsPage(subxids[j]) != pageno)
+                               break;
+               }
+               /* subxids[i..j-1] are on the same page as the head */
+
+               SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
+                                                        pageno);
+
+               /*
+                * If we wrote out all subxids, we're done.  We must only stop when
+                * j has run off the end of the array: if j == nsubxids - 1, then
+                * subxids[j] lives on a different page and has not been written
+                * yet, so it has to become the head of one more group.
+                */
+               if (j >= nsubxids)
+                       break;
+
+               /*
+                * Set the new head and skip over it, as well as over the subxids
+                * we just wrote.
+                */
+               headxid = subxids[j];
+               i = j + 1;
+       }
+
+       /* update the cached value in shared memory */
+       LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+       commitTsShared->xidLastCommit = xid;
+       commitTsShared->dataLastCommit.time = timestamp;
+       commitTsShared->dataLastCommit.nodeid = nodeid;
+
+       /* and move forwards our endpoint, if needed */
+       if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTs, newestXact))
+               ShmemVariableCache->newestCommitTs = newestXact;
+       LWLockRelease(CommitTsLock);
+}
+
+/*
+ * Record the commit timestamp of transaction entries in the commit log for all
+ * entries on a single page.  Atomic only on this page.
+ *
+ * xid and the nsubxids entries of subxids are all written into page pageno,
+ * so the caller is responsible for passing only xids that belong to that
+ * page.  Every entry receives the same ts and nodeid.
+ *
+ * The page is read in and modified while holding CommitTsControlLock in
+ * exclusive mode, and is marked dirty before the lock is released.
+ */
+static void
+SetXidCommitTsInPage(TransactionId xid, int nsubxids,
+                                        TransactionId *subxids, TimestampTz ts,
+                                        CommitTsNodeId nodeid, int pageno)
+{
+       int                     slotno;
+       int                     i;
+
+       LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+
+       slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
+
+       TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
+       for (i = 0; i < nsubxids; i++)
+               TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
+
+       CommitTsCtl->shared->page_dirty[slotno] = true;
+
+       LWLockRelease(CommitTsControlLock);
+}
+
+/*
+ * Sets the commit timestamp of a single transaction.
+ *
+ * The entry for xid is overwritten in the page buffer identified by slotno,
+ * at the offset derived from xid; the caller must pass the slot that holds
+ * the page xid belongs to.  Only SizeOfCommitTimestampEntry bytes are
+ * copied, so struct padding is never written to the page.
+ *
+ * Must be called with CommitTsControlLock held
+ */
+static void
+TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
+                                                CommitTsNodeId nodeid, int slotno)
+{
+       int                     entryno = TransactionIdToCTsEntry(xid);
+       CommitTimestampEntry entry;
+
+       Assert(TransactionIdIsNormal(xid));
+
+       entry.time = ts;
+       entry.nodeid = nodeid;
+
+       memcpy(CommitTsCtl->shared->page_buffer[slotno] +
+                  SizeOfCommitTimestampEntry * entryno,
+                  &entry, SizeOfCommitTimestampEntry);
+}
+
+/*
+ * Interrogate the commit timestamp of a transaction.
+ *
+ * ts and nodeid are output parameters; either may be passed as NULL if the
+ * caller is not interested in that value.  When xid lies outside the range
+ * for which commit timestamp data is kept, they are set to 0 and
+ * InvalidCommitTsNodeId respectively and false is returned.
+ *
+ * Return value indicates whether commit timestamp record was found for
+ * given xid (that is, whether the stored timestamp is nonzero).
+ *
+ * Raises an error if track_commit_timestamp is disabled, or if xid is not a
+ * normal transaction id.
+ */
+bool
+TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
+                                                        CommitTsNodeId *nodeid)
+{
+       int                     pageno = TransactionIdToCTsPage(xid);
+       int                     entryno = TransactionIdToCTsEntry(xid);
+       int                     slotno;
+       CommitTimestampEntry entry;
+       bool            cached = false;
+       TransactionId oldestCommitTs;
+       TransactionId newestCommitTs;
+
+       /* Error if module not enabled */
+       if (!track_commit_timestamp)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("could not get commit timestamp data"),
+                                errhint("Make sure the configuration parameter \"%s\" is set.",
+                                                "track_commit_timestamp")));
+
+       /* error if the given Xid doesn't normally commit */
+       if (!TransactionIdIsNormal(xid))
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
+
+       /*
+        * Return empty if the requested value is outside our valid range.
+        */
+       LWLockAcquire(CommitTsLock, LW_SHARED);
+       oldestCommitTs = ShmemVariableCache->oldestCommitTs;
+       newestCommitTs = ShmemVariableCache->newestCommitTs;
+       /* neither is invalid, or both are */
+       Assert(TransactionIdIsValid(oldestCommitTs) == TransactionIdIsValid(newestCommitTs));
+       LWLockRelease(CommitTsLock);
+
+       if (!TransactionIdIsValid(oldestCommitTs) ||
+               TransactionIdPrecedes(xid, oldestCommitTs) ||
+               TransactionIdPrecedes(newestCommitTs, xid))
+       {
+               if (ts)
+                       *ts = 0;
+               if (nodeid)
+                       *nodeid = InvalidCommitTsNodeId;
+               return false;
+       }
+
+       /*
+        * Use an unlocked atomic read on our cached value in shared memory; if
+        * it's a hit, acquire a lock and read the data, after verifying that it's
+        * still what we initially read.  Otherwise, fall through to read from
+        * SLRU.
+        */
+       if (commitTsShared->xidLastCommit == xid)
+       {
+               LWLockAcquire(CommitTsLock, LW_SHARED);
+               if (commitTsShared->xidLastCommit == xid)
+               {
+                       entry.time = commitTsShared->dataLastCommit.time;
+                       entry.nodeid = commitTsShared->dataLastCommit.nodeid;
+                       cached = true;
+               }
+               LWLockRelease(CommitTsLock);
+       }
+
+       if (!cached)
+       {
+               /* lock is acquired by SimpleLruReadPage_ReadOnly */
+               slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
+               memcpy(&entry,
+                          CommitTsCtl->shared->page_buffer[slotno] +
+                          SizeOfCommitTimestampEntry * entryno,
+                          SizeOfCommitTimestampEntry);
+               LWLockRelease(CommitTsControlLock);
+       }
+
+       if (ts)
+               *ts = entry.time;
+       if (nodeid)
+               *nodeid = entry.nodeid;
+
+       /*
+        * Decide on the local copy rather than on *ts: ts is optional, so
+        * dereferencing it here would crash callers that pass NULL.
+        */
+       return entry.time != 0;
+}
+
+/*
+ * Return the Xid of the latest committed transaction.  (As far as this module
+ * is concerned, anyway; it's up to the caller to ensure the value is useful
+ * for its purposes.)
+ *
+ * ts and nodeid are filled with the corresponding data; they can be passed
+ * as NULL if not wanted.
+ *
+ * The values come from the cache kept in shared memory, read under
+ * CommitTsLock.  Raises an error if track_commit_timestamp is disabled.
+ */
+TransactionId
+GetLatestCommitTsData(TimestampTz *ts, CommitTsNodeId *nodeid)
+{
+       TransactionId   xid;
+
+       /* Error if module not enabled */
+       if (!track_commit_timestamp)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("could not get commit timestamp data"),
+                                errhint("Make sure the configuration parameter \"%s\" is set.",
+                                                "track_commit_timestamp")));
+
+       LWLockAcquire(CommitTsLock, LW_SHARED);
+       xid = commitTsShared->xidLastCommit;
+       if (ts)
+               *ts = commitTsShared->dataLastCommit.time;
+       if (nodeid)
+               *nodeid = commitTsShared->dataLastCommit.nodeid;
+       LWLockRelease(CommitTsLock);
+
+       return xid;
+}
+
+/*
+ * pg_xact_commit_timestamp
+ *
+ * SQL-callable function: returns the commit timestamp recorded for the xid
+ * given as first argument, or NULL when no commit timestamp is found for it.
+ */
+Datum
+pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
+{
+       TimestampTz             commit_time;
+
+       if (!TransactionIdGetCommitTsData(PG_GETARG_UINT32(0), &commit_time, NULL))
+               PG_RETURN_NULL();
+
+       PG_RETURN_TIMESTAMPTZ(commit_time);
+}
+
+
+/*
+ * pg_last_committed_xact
+ *
+ * SQL-callable function: returns a two-column row (xid, timestamp)
+ * describing the latest committed transaction known to this module.  Both
+ * columns are NULL when the cached xid is not a normal transaction id.
+ */
+Datum
+pg_last_committed_xact(PG_FUNCTION_ARGS)
+{
+       TransactionId   lastxid;
+       TimestampTz             committime;
+       Datum       row[2];
+       bool        isnull[2];
+       bool        have_data;
+       TupleDesc   desc;
+
+       /* fetch the data we are going to report */
+       lastxid = GetLatestCommitTsData(&committime, NULL);
+
+       /*
+        * Build the descriptor of the result row.  The column names and types
+        * must match this function's pg_proc entry!
+        */
+       desc = CreateTemplateTupleDesc(2, false);
+       TupleDescInitEntry(desc, (AttrNumber) 1, "xid",
+                                          XIDOID, -1, 0);
+       TupleDescInitEntry(desc, (AttrNumber) 2, "timestamp",
+                                          TIMESTAMPTZOID, -1, 0);
+       desc = BlessTupleDesc(desc);
+
+       /* either both columns carry data, or both are NULL */
+       have_data = TransactionIdIsNormal(lastxid);
+       isnull[0] = !have_data;
+       isnull[1] = !have_data;
+       if (have_data)
+       {
+               row[0] = TransactionIdGetDatum(lastxid);
+               row[1] = TimestampTzGetDatum(committime);
+       }
+
+       PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc, row, isnull)));
+}
+
+
+/*
+ * CommitTsShmemBuffers
+ *
+ * Number of shared CommitTS buffers: NBuffers / 1024, clamped to the range
+ * 4..16.
+ *
+ * The logic is very similar to the one used for the number of CLOG buffers;
+ * see comments in CLOGShmemBuffers.
+ */
+Size
+CommitTsShmemBuffers(void)
+{
+       int                     nbufs = NBuffers / 1024;
+
+       if (nbufs < 4)
+               nbufs = 4;
+       if (nbufs > 16)
+               nbufs = 16;
+
+       return nbufs;
+}
+
+/*
+ * CommitTsShmemSize
+ *
+ * Amount of shared memory needed by CommitTs: the SLRU area for
+ * CommitTsShmemBuffers() buffers plus the CommitTimestampShared struct.
+ */
+Size
+CommitTsShmemSize(void)
+{
+       Size            total;
+
+       total = SimpleLruShmemSize(CommitTsShmemBuffers(), 0);
+       total += sizeof(CommitTimestampShared);
+
+       return total;
+}
+
+/*
+ * CommitTsShmemInit
+ *
+ * Set up the CommitTs SLRU control data and the CommitTimestampShared struct
+ * at system startup (postmaster start or standalone backend).
+ *
+ * The shared struct's fields are given their initial "nothing committed yet"
+ * values only when not running under the postmaster; otherwise the struct is
+ * expected to exist already.
+ */
+void
+CommitTsShmemInit(void)
+{
+       bool    already_present;
+
+       CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
+       SimpleLruInit(CommitTsCtl, "CommitTs Ctl", CommitTsShmemBuffers(), 0,
+                                 CommitTsControlLock, "pg_commit_ts");
+
+       commitTsShared = ShmemInitStruct("CommitTs shared",
+                                                                        sizeof(CommitTimestampShared),
+                                                                        &already_present);
+
+       if (IsUnderPostmaster)
+       {
+               /* somebody else must have created and initialized the struct */
+               Assert(already_present);
+               return;
+       }
+
+       /* we are the creator: fill in the starting values */
+       Assert(!already_present);
+
+       commitTsShared->xidLastCommit = InvalidTransactionId;
+       TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
+       commitTsShared->dataLastCommit.nodeid = InvalidCommitTsNodeId;
+}
+
+/*
+ * This function must be called ONCE on system install.
+ *
+ * It is intentionally a no-op; it exists so that CommitTs has the same set
+ * of entry points as the other SLRU modules.
+ *
+ * (The CommitTs directory is assumed to have been created by initdb, and
+ * CommitTsShmemInit must have been called already.)
+ */
+void
+BootStrapCommitTs(void)
+{
+       /*
+        * Nothing to do here at present, unlike most other SLRU modules; segments
+        * are created when the server is started with this module enabled.
+        * See CompleteCommitTsInitialization, which creates the currently active
+        * segment if it does not exist yet.
+        */
+}
+
+/*
+ * ZeroCommitTsPage
+ *
+ * Set up (or reset) the given CommitTs page as all-zeroes in shared memory
+ * and return the slot number that now holds it.  The page is only prepared
+ * in memory here; it is not written out.
+ *
+ * When writeXlog is TRUE a ZEROPAGE XLOG record is emitted for the page as
+ * well.
+ *
+ * The caller must hold the control lock; it is still held on return.
+ */
+static int
+ZeroCommitTsPage(int pageno, bool writeXlog)
+{
+       int                     zeroed_slot = SimpleLruZeroPage(CommitTsCtl, pageno);
+
+       if (writeXlog)
+               WriteZeroPageXlogRec(pageno);
+
+       return zeroed_slot;
+}
+
+/*
+ * StartupCommitTs
+ *
+ * Record, in the SLRU shared state, the page holding the next XID to be
+ * assigned as the latest CommitTs page.
+ *
+ * To be called ONCE during postmaster or standalone-backend startup, once
+ * StartupXLOG has initialized ShmemVariableCache->nextXid.
+ */
+void
+StartupCommitTs(void)
+{
+       TransactionId next_xid;
+       int                     latest_page;
+
+       next_xid = ShmemVariableCache->nextXid;
+       latest_page = TransactionIdToCTsPage(next_xid);
+
+       LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+       CommitTsCtl->shared->latest_page_number = latest_page;
+       LWLockRelease(CommitTsControlLock);
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend startup,
+ * when commit timestamp is enabled.  Must be called after recovery has
+ * finished.
+ *
+ * This is in charge of creating the currently active segment, if it's not
+ * already there.  The reason for this is that the server might have been
+ * running with this module disabled for a while and thus might have skipped
+ * the normal creation point.
+ *
+ * Note that the function also handles the case where the module is disabled:
+ * it then resets the oldest/newest limits to InvalidTransactionId, truncates
+ * away old segments and returns without creating anything.
+ */
+void
+CompleteCommitTsInitialization(void)
+{
+       TransactionId xid = ShmemVariableCache->nextXid;
+       int                     pageno = TransactionIdToCTsPage(xid);
+
+       /*
+        * Re-Initialize our idea of the latest page number.  This is done
+        * regardless of whether the module is enabled.
+        */
+       LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+       CommitTsCtl->shared->latest_page_number = pageno;
+       LWLockRelease(CommitTsControlLock);
+
+       /*
+        * If this module is not currently enabled, make sure we don't hand back
+        * possibly-invalid data; also remove segments of old data.  The cutoff
+        * passed to TruncateCommitTs is the next XID to be assigned, so only
+        * segments before the one holding that XID are candidates for removal.
+        */
+       if (!track_commit_timestamp)
+       {
+               LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+               ShmemVariableCache->oldestCommitTs = InvalidTransactionId;
+               ShmemVariableCache->newestCommitTs = InvalidTransactionId;
+               LWLockRelease(CommitTsLock);
+
+               TruncateCommitTs(ReadNewTransactionId());
+
+               return;
+       }
+
+       /*
+        * If CommitTs is enabled, but it wasn't in the previous server run, we
+        * need to set the oldest and newest values to the next Xid; that way, we
+        * will not try to read data that might not have been set.
+        *
+        * "Wasn't enabled" is detected here by oldestCommitTs still being
+        * InvalidTransactionId.
+        *
+        * XXX does this have a problem if a server is started with commitTs
+        * enabled, then started with commitTs disabled, then restarted with it
+        * enabled again?  It doesn't look like it does, because there should be a
+        * checkpoint that sets the value to InvalidTransactionId at end of
+        * recovery; and so any chance of injecting new transactions without
+        * CommitTs values would occur after the oldestCommitTs has been set to
+        * Invalid temporarily.
+        */
+       LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+       if (ShmemVariableCache->oldestCommitTs == InvalidTransactionId)
+       {
+               ShmemVariableCache->oldestCommitTs =
+                       ShmemVariableCache->newestCommitTs = ReadNewTransactionId();
+       }
+       LWLockRelease(CommitTsLock);
+
+       /*
+        * Finally, create the current segment file, if necessary.  The page is
+        * zeroed without emitting an XLOG record (writeXlog is false) and is
+        * written out right away, after which it must no longer be dirty.
+        */
+       if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
+       {
+               int             slotno;
+
+               LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+               slotno = ZeroCommitTsPage(pageno, false);
+               SimpleLruWritePage(CommitTsCtl, slotno);
+               Assert(!CommitTsCtl->shared->page_dirty[slotno]);
+               LWLockRelease(CommitTsControlLock);
+       }
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend shutdown
+ *
+ * It flushes the CommitTs SLRU.  The only difference from CheckPointCommitTs
+ * is the second argument passed to SimpleLruFlush (false here).
+ * NOTE(review): that argument is presumably SimpleLruFlush's "checkpoint"
+ * flag; confirm against slru.c.
+ */
+void
+ShutdownCommitTs(void)
+{
+       /* Flush dirty CommitTs pages to disk */
+       SimpleLruFlush(CommitTsCtl, false);
+}
+
+/*
+ * Perform a checkpoint --- either during shutdown, or on-the-fly
+ *
+ * It flushes the CommitTs SLRU.  The only difference from ShutdownCommitTs
+ * is the second argument passed to SimpleLruFlush (true here).
+ * NOTE(review): that argument is presumably SimpleLruFlush's "checkpoint"
+ * flag; confirm against slru.c.
+ */
+void
+CheckPointCommitTs(void)
+{
+       /* Flush dirty CommitTs pages to disk */
+       SimpleLruFlush(CommitTsCtl, true);
+}
+
+/*
+ * ExtendCommitTs
+ *
+ * Make sure that CommitTs has room for a newly-allocated XID, by zeroing the
+ * page that newestXact starts (if it starts one).  Does nothing when
+ * track_commit_timestamp is off.
+ *
+ * NB: this is called while holding XidGenLock.  We want it to be very fast
+ * most of the time; even when it's not so fast, no actual I/O need happen
+ * unless we're forced to write out a dirty CommitTs or xlog page to make room
+ * in shared memory.
+ *
+ * NB: the current implementation relies on track_commit_timestamp being
+ * PGC_POSTMASTER.
+ */
+void
+ExtendCommitTs(TransactionId newestXact)
+{
+       bool            starts_page;
+       int                     target_page;
+
+       /* module disabled: there are no pages to maintain */
+       if (!track_commit_timestamp)
+               return;
+
+       /*
+        * Only the first XID of a page requires work.  Just after wraparound,
+        * though, the first XID of page zero is FirstNormalTransactionId, so
+        * that one counts as a page start too.
+        */
+       starts_page = (TransactionIdToCTsEntry(newestXact) == 0 ||
+                                  TransactionIdEquals(newestXact, FirstNormalTransactionId));
+       if (!starts_page)
+               return;
+
+       target_page = TransactionIdToCTsPage(newestXact);
+
+       LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+
+       /* zero the page; an XLOG entry is made unless we are in recovery */
+       ZeroCommitTsPage(target_page, !InRecovery);
+
+       LWLockRelease(CommitTsControlLock);
+}
+
+/*
+ * TruncateCommitTs
+ *
+ * Remove all CommitTs segments that precede the one holding oldestXact.  A
+ * TRUNCATE XLOG record is written before the segments are removed; if the
+ * directory scan finds nothing removable, neither happens.
+ *
+ * Note that we don't need to flush XLOG here.
+ */
+void
+TruncateCommitTs(TransactionId oldestXact)
+{
+       /*
+        * The cutoff point is the start of the segment containing oldestXact;
+        * what SimpleLruTruncate wants to be told is the *page* containing
+        * oldestXact.
+        */
+       int                     cutoffPage = TransactionIdToCTsPage(oldestXact);
+
+       /* act only if the scan reports a file that could be removed */
+       if (SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
+                                                 &cutoffPage))
+       {
+               /* XLOG record first ... */
+               WriteTruncateXlogRec(cutoffPage);
+
+               /* ... then get rid of the old CommitTs segment(s) */
+               SimpleLruTruncate(CommitTsCtl, cutoffPage);
+       }
+}
+
+/*
+ * SetCommitTsLimit
+ *
+ * Adjust the limit values between which commit TS can be consulted.
+ *
+ * Under CommitTsLock: when the stored oldestCommitTs is InvalidTransactionId
+ * (which signals a disabled committs), nothing is changed.  Otherwise
+ * oldestCommitTs is replaced by oldestXact if the stored value precedes it,
+ * and newestCommitTs is replaced by newestXact if newestXact precedes the
+ * stored value.
+ *
+ * NOTE(review): since the stored values are left alone while oldestCommitTs
+ * is invalid, values passed in at that point are discarded; confirm that
+ * callers running right after shared memory creation expect that.
+ */
+void
+SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
+{
+       LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+
+       if (ShmemVariableCache->oldestCommitTs == InvalidTransactionId)
+       {
+               /* disabled committs: the other limit must be invalid as well */
+               Assert(ShmemVariableCache->newestCommitTs == InvalidTransactionId);
+       }
+       else
+       {
+               /*
+                * Take care not to overwrite values that are further into the
+                * "future" than what we were handed.
+                */
+               if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
+                       ShmemVariableCache->oldestCommitTs = oldestXact;
+               if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTs))
+                       ShmemVariableCache->newestCommitTs = newestXact;
+       }
+
+       LWLockRelease(CommitTsLock);
+}
+
+/*
+ * AdvanceOldestCommitTs
+ *
+ * Move forwards the oldest commitTS value that can be consulted.  The stored
+ * value is left alone when it is InvalidTransactionId, and also when it does
+ * not precede oldestXact.
+ */
+void
+AdvanceOldestCommitTs(TransactionId oldestXact)
+{
+       LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+
+       if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId)
+       {
+               if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
+                       ShmemVariableCache->oldestCommitTs = oldestXact;
+       }
+
+       LWLockRelease(CommitTsLock);
+}
+
+
+/*
+ * CommitTsPagePrecedes
+ *
+ * Decide which of two CommitTs page numbers is "older" for truncation
+ * purposes.
+ *
+ * The pages are compared through TransactionIds so that wraparound XID
+ * arithmetic is handled properly.  Page number zero would map to
+ * InvalidTransactionId, though, and TransactionIdPrecedes gets weird about
+ * permanent xact IDs; hence both xids are offset by
+ * FirstNormalTransactionId before comparing.
+ */
+static bool
+CommitTsPagePrecedes(int page1, int page2)
+{
+       TransactionId first_xid;
+       TransactionId second_xid;
+
+       first_xid = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE +
+               FirstNormalTransactionId;
+       second_xid = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE +
+               FirstNormalTransactionId;
+
+       return TransactionIdPrecedes(first_xid, second_xid);
+}
+
+
+/*
+ * WriteZeroPageXlogRec
+ *
+ * Emit a COMMIT_TS_ZEROPAGE xlog record; its only payload is the page number.
+ */
+static void
+WriteZeroPageXlogRec(int pageno)
+{
+       XLogBeginInsert();
+       XLogRegisterData((char *) &pageno, sizeof(pageno));
+
+       /* the returned record pointer is not needed */
+       (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
+}
+
+/*
+ * WriteTruncateXlogRec
+ *
+ * Emit a COMMIT_TS_TRUNCATE xlog record; its only payload is the page number.
+ */
+static void
+WriteTruncateXlogRec(int pageno)
+{
+       XLogBeginInsert();
+       XLogRegisterData((char *) &pageno, sizeof(pageno));
+
+       /* the returned record pointer is not needed */
+       (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
+}
+
+/*
+ * WriteSetTimestampXlogRec
+ *
+ * Emit a COMMIT_TS_SETTS xlog record.  The record consists of the leading
+ * part of an xl_commit_ts_set struct, up to and including its mainxid field,
+ * followed by the nsubxids subtransaction XIDs.
+ */
+static void
+WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
+                                                TransactionId *subxids, TimestampTz timestamp,
+                                                CommitTsNodeId nodeid)
+{
+       xl_commit_ts_set        xlrec;
+       int                     header_len;
+
+       xlrec.mainxid = mainxid;
+       xlrec.timestamp = timestamp;
+       xlrec.nodeid = nodeid;
+
+       /* fixed part ends right after mainxid */
+       header_len = offsetof(xl_commit_ts_set, mainxid) +
+               sizeof(TransactionId);
+
+       XLogBeginInsert();
+       XLogRegisterData((char *) &xlrec, header_len);
+       XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
+
+       /* the returned record pointer is not needed */
+       (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
+}
+
+/*
+ * CommitTS resource manager's routines
+ *
+ * commit_ts_redo replays a single commit_ts XLOG record.  Three record types
+ * are handled: COMMIT_TS_ZEROPAGE (zero a page and write it out),
+ * COMMIT_TS_TRUNCATE (remove the segments before the given page) and
+ * COMMIT_TS_SETTS (set the commit timestamp data of a transaction tree).
+ * Any other op code raises a PANIC.
+ */
+void
+commit_ts_redo(XLogReaderState *record)
+{
+       uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+       /* Backup blocks are not used in commit_ts records */
+       Assert(!XLogRecHasAnyBlockRefs(record));
+
+       if (info == COMMIT_TS_ZEROPAGE)
+       {
+               int                     pageno;
+               int                     slotno;
+
+               memcpy(&pageno, XLogRecGetData(record), sizeof(int));	/* payload is the page number */
+
+               LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+
+               slotno = ZeroCommitTsPage(pageno, false);	/* no XLOG record while replaying one */
+               SimpleLruWritePage(CommitTsCtl, slotno);
+               Assert(!CommitTsCtl->shared->page_dirty[slotno]);
+
+               LWLockRelease(CommitTsControlLock);
+       }
+       else if (info == COMMIT_TS_TRUNCATE)
+       {
+               int                     pageno;
+
+               memcpy(&pageno, XLogRecGetData(record), sizeof(int));	/* payload is the cutoff page */
+
+               /*
+                * During XLOG replay, latest_page_number isn't set up yet; insert a
+                * suitable value to bypass the sanity test in SimpleLruTruncate.
+                */
+               CommitTsCtl->shared->latest_page_number = pageno;
+
+               SimpleLruTruncate(CommitTsCtl, pageno);
+       }
+       else if (info == COMMIT_TS_SETTS)
+       {
+               xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
+               int                     nsubxids;
+               TransactionId *subxids;
+
+               /* whatever follows the fixed-size part of the record is the subxid array */
+               nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
+                                       sizeof(TransactionId));
+               if (nsubxids > 0)
+               {
+                       subxids = palloc(sizeof(TransactionId) * nsubxids);	/* private copy, freed below */
+                       memcpy(subxids,
+                                  XLogRecGetData(record) + SizeOfCommitTsSet,
+                                  sizeof(TransactionId) * nsubxids);
+               }
+               else
+                       subxids = NULL;
+
+               /* NOTE(review): final "false" is presumably the write-XLOG flag; confirm */
+               TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
+                                                                          setts->timestamp, setts->nodeid, false);
+               if (subxids)
+                       pfree(subxids);
+       }
+       else
+               elog(PANIC, "commit_ts_redo: unknown op code %u", info);
+}
index befd60f2d3777f1f3d072aac594e963d5a19c7a5..dcf423bdd73b7f3c238872d6454f1e73a9d81017 100644 (file)
@@ -8,6 +8,7 @@
 #include "postgres.h"
 
 #include "access/clog.h"
+#include "access/commit_ts.h"
 #include "access/gin.h"
 #include "access/gist_private.h"
 #include "access/hash.h"
index 1f9a100da852d34e1fd3cb85ee4e3d1241813750..15596c7c7f5f7c5d983f6ce0883b0936b14cfb8a 100644 (file)
@@ -1297,7 +1297,7 @@ SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
 
                len = strlen(clde->d_name);
 
-               if ((len == 4 || len == 5) &&
+               if ((len == 4 || len == 5 || len == 6) &&
                        strspn(clde->d_name, "0123456789ABCDEF") == len)
                {
                        segno = (int) strtol(clde->d_name, NULL, 16);
index d51cca406c7c5dadac0ce7ca2daedd7bc34045c6..c5411566686a196c656e8035f9ddd317640d53b5 100644 (file)
@@ -14,6 +14,7 @@
 #include "postgres.h"
 
 #include "access/clog.h"
+#include "access/commit_ts.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/xact.h"
@@ -158,9 +159,10 @@ GetNewTransactionId(bool isSubXact)
         * XID before we zero the page.  Fortunately, a page of the commit log
         * holds 32K or more transactions, so we don't have to do this very often.
         *
-        * Extend pg_subtrans too.
+        * Extend pg_subtrans and pg_commit_ts too.
         */
        ExtendCLOG(xid);
+       ExtendCommitTs(xid);
        ExtendSUBTRANS(xid);
 
        /*
index 763e9deb6f52bd32e6f80e5a22c3ef08f7eb9611..8b2f7140cfc1b6128428d2b59615442dd65025cc 100644 (file)
@@ -20,6 +20,7 @@
 #include <time.h>
 #include <unistd.h>
 
+#include "access/commit_ts.h"
 #include "access/multixact.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
@@ -1134,6 +1135,21 @@ RecordTransactionCommit(void)
                }
        }
 
+       /*
+        * We only need to log the commit timestamp separately if the node
+        * identifier is a valid value; the commit record above already contains
+        * the timestamp info otherwise, and will be used to load it.
+        */
+       if (markXidCommitted)
+       {
+               CommitTsNodeId          node_id;
+
+               node_id = CommitTsGetDefaultNodeId();
+               TransactionTreeSetCommitTsData(xid, nchildren, children,
+                                                                          xactStopTimestamp,
+                                                                          node_id, node_id != InvalidCommitTsNodeId);
+       }
+
        /*
         * Check if we want to commit asynchronously.  We can allow the XLOG flush
         * to happen asynchronously if synchronous_commit=off, or if the current
@@ -4644,6 +4660,7 @@ xactGetCommittedChildren(TransactionId **ptr)
  */
 static void
 xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
+                                                 TimestampTz commit_time,
                                                  TransactionId *sub_xids, int nsubxacts,
                                                  SharedInvalidationMessage *inval_msgs, int nmsgs,
                                                  RelFileNode *xnodes, int nrels,
@@ -4671,6 +4688,10 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
                LWLockRelease(XidGenLock);
        }
 
+       /* Set the transaction commit timestamp and metadata */
+       TransactionTreeSetCommitTsData(xid, nsubxacts, sub_xids,
+                                                                  commit_time, InvalidCommitTsNodeId, false);
+
        if (standbyState == STANDBY_DISABLED)
        {
                /*
@@ -4790,7 +4811,8 @@ xact_redo_commit(xl_xact_commit *xlrec,
        /* invalidation messages array follows subxids */
        inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
 
-       xact_redo_commit_internal(xid, lsn, subxacts, xlrec->nsubxacts,
+       xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
+                                                         subxacts, xlrec->nsubxacts,
                                                          inval_msgs, xlrec->nmsgs,
                                                          xlrec->xnodes, xlrec->nrels,
                                                          xlrec->dbId,
@@ -4805,7 +4827,8 @@ static void
 xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
                                                 TransactionId xid, XLogRecPtr lsn)
 {
-       xact_redo_commit_internal(xid, lsn, xlrec->subxacts, xlrec->nsubxacts,
+       xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
+                                                         xlrec->subxacts, xlrec->nsubxacts,
                                                          NULL, 0,      /* inval msgs */
                                                          NULL, 0,      /* relfilenodes */
                                                          InvalidOid,           /* dbId */
index a2ad5ebfe82b1d0287ad200a10b990862dc4c6c0..da28de90db6f62bef054faeb0fcd5e13b0a2e4d8 100644 (file)
@@ -22,6 +22,7 @@
 #include <unistd.h>
 
 #include "access/clog.h"
+#include "access/commit_ts.h"
 #include "access/multixact.h"
 #include "access/rewriteheap.h"
 #include "access/subtrans.h"
@@ -4518,6 +4519,8 @@ BootStrapXLOG(void)
        checkPoint.oldestXidDB = TemplateDbOid;
        checkPoint.oldestMulti = FirstMultiXactId;
        checkPoint.oldestMultiDB = TemplateDbOid;
+       checkPoint.oldestCommitTs = InvalidTransactionId;
+       checkPoint.newestCommitTs = InvalidTransactionId;
        checkPoint.time = (pg_time_t) time(NULL);
        checkPoint.oldestActiveXid = InvalidTransactionId;
 
@@ -4527,6 +4530,7 @@ BootStrapXLOG(void)
        MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
        SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
        SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
+       SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId);
 
        /* Set up the XLOG page header */
        page->xlp_magic = XLOG_PAGE_MAGIC;
@@ -4606,6 +4610,7 @@ BootStrapXLOG(void)
        ControlFile->max_locks_per_xact = max_locks_per_xact;
        ControlFile->wal_level = wal_level;
        ControlFile->wal_log_hints = wal_log_hints;
+       ControlFile->track_commit_timestamp = track_commit_timestamp;
        ControlFile->data_checksum_version = bootstrap_data_checksum_version;
 
        /* some additional ControlFile fields are set in WriteControlFile() */
@@ -4614,6 +4619,7 @@ BootStrapXLOG(void)
 
        /* Bootstrap the commit log, too */
        BootStrapCLOG();
+       BootStrapCommitTs();
        BootStrapSUBTRANS();
        BootStrapMultiXact();
 
@@ -5920,6 +5926,10 @@ StartupXLOG(void)
        ereport(DEBUG1,
                        (errmsg("oldest MultiXactId: %u, in database %u",
                                        checkPoint.oldestMulti, checkPoint.oldestMultiDB)));
+       ereport(DEBUG1,
+                       (errmsg("commit timestamp Xid oldest/newest: %u/%u",
+                                       checkPoint.oldestCommitTs,
+                                       checkPoint.newestCommitTs)));
        if (!TransactionIdIsNormal(checkPoint.nextXid))
                ereport(PANIC,
                                (errmsg("invalid next transaction ID")));
@@ -5931,6 +5941,8 @@ StartupXLOG(void)
        MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
        SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
        SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
+       SetCommitTsLimit(checkPoint.oldestCommitTs,
+                                        checkPoint.newestCommitTs);
        MultiXactSetSafeTruncate(checkPoint.oldestMulti);
        XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
        XLogCtl->ckptXid = checkPoint.nextXid;
@@ -6153,11 +6165,12 @@ StartupXLOG(void)
                        ProcArrayInitRecovery(ShmemVariableCache->nextXid);
 
                        /*
-                        * Startup commit log and subtrans only. MultiXact has already
-                        * been started up and other SLRUs are not maintained during
-                        * recovery and need not be started yet.
+                        * Startup commit log, commit timestamp and subtrans only.
+                        * MultiXact has already been started up and other SLRUs are not
+                        * maintained during recovery and need not be started yet.
                         */
                        StartupCLOG();
+                       StartupCommitTs();
                        StartupSUBTRANS(oldestActiveXID);
 
                        /*
@@ -6827,12 +6840,13 @@ StartupXLOG(void)
        LWLockRelease(ProcArrayLock);
 
        /*
-        * Start up the commit log and subtrans, if not already done for hot
-        * standby.
+        * Start up the commit log, commit timestamp and subtrans, if not already
+        * done for hot standby.
         */
        if (standbyState == STANDBY_DISABLED)
        {
                StartupCLOG();
+               StartupCommitTs();
                StartupSUBTRANS(oldestActiveXID);
        }
 
@@ -6867,6 +6881,12 @@ StartupXLOG(void)
        LocalSetXLogInsertAllowed();
        XLogReportParameters();
 
+       /*
+        * Local WAL inserts enabled, so it's time to finish initialization
+        * of commit timestamp.
+        */
+       CompleteCommitTsInitialization();
+
        /*
         * All done.  Allow backends to write WAL.  (Although the bool flag is
         * probably atomic in itself, we use the info_lck here to ensure that
@@ -7433,6 +7453,7 @@ ShutdownXLOG(int code, Datum arg)
                CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
        }
        ShutdownCLOG();
+       ShutdownCommitTs();
        ShutdownSUBTRANS();
        ShutdownMultiXact();
 
@@ -7769,6 +7790,11 @@ CreateCheckPoint(int flags)
        checkPoint.oldestXidDB = ShmemVariableCache->oldestXidDB;
        LWLockRelease(XidGenLock);
 
+       LWLockAcquire(CommitTsLock, LW_SHARED);
+       checkPoint.oldestCommitTs = ShmemVariableCache->oldestCommitTs;
+       checkPoint.newestCommitTs = ShmemVariableCache->newestCommitTs;
+       LWLockRelease(CommitTsLock);
+
        /* Increase XID epoch if we've wrapped around since last checkpoint */
        checkPoint.nextXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
        if (checkPoint.nextXid < ControlFile->checkPointCopy.nextXid)
@@ -8046,6 +8072,7 @@ static void
 CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 {
        CheckPointCLOG();
+       CheckPointCommitTs();
        CheckPointSUBTRANS();
        CheckPointMultiXact();
        CheckPointPredicate();
@@ -8474,7 +8501,8 @@ XLogReportParameters(void)
                MaxConnections != ControlFile->MaxConnections ||
                max_worker_processes != ControlFile->max_worker_processes ||
                max_prepared_xacts != ControlFile->max_prepared_xacts ||
-               max_locks_per_xact != ControlFile->max_locks_per_xact)
+               max_locks_per_xact != ControlFile->max_locks_per_xact ||
+               track_commit_timestamp != ControlFile->track_commit_timestamp)
        {
                /*
                 * The change in number of backend slots doesn't need to be WAL-logged
@@ -8494,6 +8522,7 @@ XLogReportParameters(void)
                        xlrec.max_locks_per_xact = max_locks_per_xact;
                        xlrec.wal_level = wal_level;
                        xlrec.wal_log_hints = wal_log_hints;
+                       xlrec.track_commit_timestamp = track_commit_timestamp;
 
                        XLogBeginInsert();
                        XLogRegisterData((char *) &xlrec, sizeof(xlrec));
@@ -8508,6 +8537,7 @@ XLogReportParameters(void)
                ControlFile->max_locks_per_xact = max_locks_per_xact;
                ControlFile->wal_level = wal_level;
                ControlFile->wal_log_hints = wal_log_hints;
+               ControlFile->track_commit_timestamp = track_commit_timestamp;
                UpdateControlFile();
        }
 }
@@ -8884,6 +8914,7 @@ xlog_redo(XLogReaderState *record)
                ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact;
                ControlFile->wal_level = xlrec.wal_level;
                ControlFile->wal_log_hints = wal_log_hints;
+               ControlFile->track_commit_timestamp = track_commit_timestamp;
 
                /*
                 * Update minRecoveryPoint to ensure that if recovery is aborted, we
index 6384dc7f1e8f86710acebf67ec53640ade8f2a21..e32e0399f8951a78b78c73dfeb81f9f8fc05326e 100644 (file)
@@ -23,6 +23,7 @@
 #include <math.h>
 
 #include "access/clog.h"
+#include "access/commit_ts.h"
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/htup_details.h"
@@ -1071,10 +1072,12 @@ vac_truncate_clog(TransactionId frozenXID,
                return;
 
        /*
-        * Truncate CLOG to the oldest computed value.  Note we don't truncate
-        * multixacts; that will be done by the next checkpoint.
+        * Truncate CLOG and CommitTs to the oldest computed value.
+        * Note we don't truncate multixacts; that will be done by the next
+        * checkpoint.
         */
        TruncateCLOG(frozenXID);
+       TruncateCommitTs(frozenXID);
 
        /*
         * Update the wrap limit for GetNewTransactionId and creation of new
@@ -1084,6 +1087,7 @@ vac_truncate_clog(TransactionId frozenXID,
         */
        SetTransactionIdLimit(frozenXID, oldestxid_datoid);
        SetMultiXactIdLimit(minMulti, minmulti_datoid);
+       AdvanceOldestCommitTs(frozenXID);
 }
 
 
index 800dcd998087582014468e71ce9c401a074c61c7..d43c8ff3fc98657f1f56d78bc0e6628325efc861 100644 (file)
@@ -1438,7 +1438,7 @@ parse_hba_auth_opt(char *name, char *val, HbaLine *hbaline, int line_num)
                                ereport(LOG,
                                                (errcode(ERRCODE_CONFIG_FILE_ERROR),
                                                 errmsg("client certificates can only be checked if a root certificate store is available"),
-                                                errhint("Make sure the configuration parameter \"ssl_ca_file\" is set."),
+                                                errhint("Make sure the configuration parameter \"%s\" is set.", "ssl_ca_file"),
                                                 errcontext("line %d of configuration file \"%s\"",
                                                                        line_num, HbaFileName)));
                                return false;
index 4e81322509f9c0aefb9cf8f4c9aa3a7a1e57c5c0..a455b9242fb9eb79b5d8fd8017029c8fbb0cb391 100644 (file)
@@ -133,6 +133,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
                case RM_SEQ_ID:
                case RM_SPGIST_ID:
                case RM_BRIN_ID:
+               case RM_COMMIT_TS_ID:
                        break;
                case RM_NEXT_ID:
                        elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
index 1d04c5508a9cd3e2bf7a117732462ed3d7aae2e9..b9577cd1d46632add93f3df2fb3e8827bf23acf0 100644 (file)
@@ -15,6 +15,7 @@
 #include "postgres.h"
 
 #include "access/clog.h"
+#include "access/commit_ts.h"
 #include "access/heapam.h"
 #include "access/multixact.h"
 #include "access/nbtree.h"
@@ -117,6 +118,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
                size = add_size(size, ProcGlobalShmemSize());
                size = add_size(size, XLOGShmemSize());
                size = add_size(size, CLOGShmemSize());
+               size = add_size(size, CommitTsShmemSize());
                size = add_size(size, SUBTRANSShmemSize());
                size = add_size(size, TwoPhaseShmemSize());
                size = add_size(size, BackgroundWorkerShmemSize());
@@ -198,6 +200,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
         */
        XLOGShmemInit();
        CLOGShmemInit();
+       CommitTsShmemInit();
        SUBTRANSShmemInit();
        MultiXactShmemInit();
        InitBufferPool();
index 719181c2089f3fbd29be66b1a2b52abdf8e833d3..c9f86571fd339967373987685137404d1bfe69d9 100644 (file)
@@ -29,6 +29,7 @@
 #include "postgres.h"
 
 #include "access/clog.h"
+#include "access/commit_ts.h"
 #include "access/multixact.h"
 #include "access/subtrans.h"
 #include "commands/async.h"
@@ -259,6 +260,9 @@ NumLWLocks(void)
        /* clog.c needs one per CLOG buffer */
        numLocks += CLOGShmemBuffers();
 
+       /* commit_ts.c needs one per CommitTs buffer */
+       numLocks += CommitTsShmemBuffers();
+
        /* subtrans.c needs one per SubTrans buffer */
        numLocks += NUM_SUBTRANS_BUFFERS;
 
index d4d74baedbc5e8eb9fcaa4c851f5cda015257ad9..b1bff7f35008c4eb8f5957bc0ef4b896a7ab37a6 100644 (file)
@@ -26,6 +26,7 @@
 #include <syslog.h>
 #endif
 
+#include "access/commit_ts.h"
 #include "access/gin.h"
 #include "access/transam.h"
 #include "access/twophase.h"
@@ -835,6 +836,15 @@ static struct config_bool ConfigureNamesBool[] =
                false,
                check_bonjour, NULL, NULL
        },
+       {
+               {"track_commit_timestamp", PGC_POSTMASTER, REPLICATION,
+                       gettext_noop("Collects transaction commit time."),
+                       NULL
+               },
+               &track_commit_timestamp,
+               false,
+               NULL, NULL, NULL
+       },
        {
                {"ssl", PGC_POSTMASTER, CONN_AUTH_SECURITY,
                        gettext_noop("Enables SSL connections."),
index 4a89cb7d9f86a0e7b2b85508e63de991222855af..c4b546ed12ec674e025b664b5e9536d28caf3e4c 100644 (file)
 
 #max_replication_slots = 0     # max number of replication slots
                                # (change requires restart)
+#track_commit_timestamp = off  # collect timestamp of transaction commit
+                               # (change requires restart)
 
 # - Master Server -
 
index 3b528673495413f03ecb39efd4d025f650aefb7c..3bee6573afce26e36c877b07af31e9349da08b34 100644 (file)
@@ -186,6 +186,7 @@ static const char *subdirs[] = {
        "pg_xlog",
        "pg_xlog/archive_status",
        "pg_clog",
+       "pg_commit_ts",
        "pg_dynshmem",
        "pg_notify",
        "pg_serial",
index b2e0793eb8f717392d42c369e66293a1bcfd5c38..a838bb5b2c30dea7db197c8192cd952c9c5b0311 100644 (file)
@@ -270,6 +270,8 @@ main(int argc, char *argv[])
                   ControlFile.checkPointCopy.oldestMulti);
        printf(_("Latest checkpoint's oldestMulti's DB: %u\n"),
                   ControlFile.checkPointCopy.oldestMultiDB);
+       printf(_("Latest checkpoint's oldestCommitTs:   %u\n"),
+                  ControlFile.checkPointCopy.oldestCommitTs);
        printf(_("Time of latest checkpoint:            %s\n"),
                   ckpttime_str);
        printf(_("Fake LSN counter for unlogged rels:   %X/%X\n"),
@@ -300,6 +302,8 @@ main(int argc, char *argv[])
                   ControlFile.max_prepared_xacts);
        printf(_("Current max_locks_per_xact setting:   %d\n"),
                   ControlFile.max_locks_per_xact);
+       printf(_("Current track_commit_timestamp setting: %s\n"),
+                  ControlFile.track_commit_timestamp ? _("on") : _("off"));
        printf(_("Maximum data alignment:               %u\n"),
                   ControlFile.maxAlign);
        /* we don't print floatFormat since can't say much useful about it */
index 666e8dbaa24b2ef73fcbe105248373c39f008947..f42d515a683c2fddddcedd8504271457b31a66a0 100644 (file)
@@ -63,6 +63,8 @@ static bool guessed = false;  /* T if we had to guess at any values */
 static const char *progname;
 static uint32 set_xid_epoch = (uint32) -1;
 static TransactionId set_xid = 0;
+static TransactionId set_oldest_commit_ts = 0;
+static TransactionId set_newest_commit_ts = 0;
 static Oid     set_oid = 0;
 static MultiXactId set_mxid = 0;
 static MultiXactOffset set_mxoff = (MultiXactOffset) -1;
@@ -112,7 +114,7 @@ main(int argc, char *argv[])
        }
 
 
-       while ((c = getopt(argc, argv, "D:fl:m:no:O:x:e:")) != -1)
+       while ((c = getopt(argc, argv, "c:D:e:fl:m:no:O:x:")) != -1)
        {
                switch (c)
                {
@@ -132,7 +134,9 @@ main(int argc, char *argv[])
                                set_xid_epoch = strtoul(optarg, &endptr, 0);
                                if (endptr == optarg || *endptr != '\0')
                                {
-                                       fprintf(stderr, _("%s: invalid argument for option -e\n"), progname);
+                                       /*------
+                                         translator: the second %s is a command line argument (-e, etc) */
+                                       fprintf(stderr, _("%s: invalid argument for option %s\n"), progname, "-e");
                                        fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
                                        exit(1);
                                }
@@ -147,7 +151,7 @@ main(int argc, char *argv[])
                                set_xid = strtoul(optarg, &endptr, 0);
                                if (endptr == optarg || *endptr != '\0')
                                {
-                                       fprintf(stderr, _("%s: invalid argument for option -x\n"), progname);
+                                       fprintf(stderr, _("%s: invalid argument for option %s\n"), progname, "-x");
                                        fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
                                        exit(1);
                                }
@@ -158,11 +162,42 @@ main(int argc, char *argv[])
                                }
                                break;
 
+                       case 'c':
+                               set_oldest_commit_ts = strtoul(optarg, &endptr, 0);
+                               if (endptr == optarg || *endptr != ',')
+                               {
+                                       fprintf(stderr, _("%s: invalid argument for option %s\n"), progname, "-c");
+                                       fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
+                                       exit(1);
+                               }
+                               set_newest_commit_ts = strtoul(endptr + 1, &endptr2, 0);
+                               if (endptr2 == endptr + 1 || *endptr2 != '\0')
+                               {
+                                       fprintf(stderr, _("%s: invalid argument for option %s\n"), progname, "-c");
+                                       fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
+                                       exit(1);
+                               }
+
+                               if (set_oldest_commit_ts < 2 &&
+                                       set_oldest_commit_ts != 0)
+                               {
+                                       fprintf(stderr, _("%s: transaction ID (-c) must be either 0 or greater than or equal to 2\n"), progname);
+                                       exit(1);
+                               }
+
+                               if (set_newest_commit_ts < 2 &&
+                                       set_newest_commit_ts != 0)
+                               {
+                                       fprintf(stderr, _("%s: transaction ID (-c) must be either 0 or greater than or equal to 2\n"), progname);
+                                       exit(1);
+                               }
+                               break;
+
                        case 'o':
                                set_oid = strtoul(optarg, &endptr, 0);
                                if (endptr == optarg || *endptr != '\0')
                                {
-                                       fprintf(stderr, _("%s: invalid argument for option -o\n"), progname);
+                                       fprintf(stderr, _("%s: invalid argument for option %s\n"), progname, "-o");
                                        fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
                                        exit(1);
                                }
@@ -177,7 +212,7 @@ main(int argc, char *argv[])
                                set_mxid = strtoul(optarg, &endptr, 0);
                                if (endptr == optarg || *endptr != ',')
                                {
-                                       fprintf(stderr, _("%s: invalid argument for option -m\n"), progname);
+                                       fprintf(stderr, _("%s: invalid argument for option %s\n"), progname, "-m");
                                        fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
                                        exit(1);
                                }
@@ -185,7 +220,7 @@ main(int argc, char *argv[])
                                set_oldestmxid = strtoul(endptr + 1, &endptr2, 0);
                                if (endptr2 == endptr + 1 || *endptr2 != '\0')
                                {
-                                       fprintf(stderr, _("%s: invalid argument for option -m\n"), progname);
+                                       fprintf(stderr, _("%s: invalid argument for option %s\n"), progname, "-m");
                                        fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
                                        exit(1);
                                }
@@ -211,7 +246,7 @@ main(int argc, char *argv[])
                                set_mxoff = strtoul(optarg, &endptr, 0);
                                if (endptr == optarg || *endptr != '\0')
                                {
-                                       fprintf(stderr, _("%s: invalid argument for option -O\n"), progname);
+                                       fprintf(stderr, _("%s: invalid argument for option %s\n"), progname, "-O");
                                        fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
                                        exit(1);
                                }
@@ -225,7 +260,7 @@ main(int argc, char *argv[])
                        case 'l':
                                if (strspn(optarg, "01234567890ABCDEFabcdef") != 24)
                                {
-                                       fprintf(stderr, _("%s: invalid argument for option -l\n"), progname);
+                                       fprintf(stderr, _("%s: invalid argument for option %s\n"), progname, "-l");
                                        fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
                                        exit(1);
                                }
@@ -345,6 +380,11 @@ main(int argc, char *argv[])
                ControlFile.checkPointCopy.oldestXidDB = InvalidOid;
        }
 
+       if (set_oldest_commit_ts != 0)
+               ControlFile.checkPointCopy.oldestCommitTs = set_oldest_commit_ts;
+       if (set_newest_commit_ts != 0)
+               ControlFile.checkPointCopy.newestCommitTs = set_newest_commit_ts;
+
        if (set_oid != 0)
                ControlFile.checkPointCopy.nextOid = set_oid;
 
@@ -539,6 +579,7 @@ GuessControlValues(void)
 
        ControlFile.wal_level = WAL_LEVEL_MINIMAL;
        ControlFile.wal_log_hints = false;
+       ControlFile.track_commit_timestamp = false;
        ControlFile.MaxConnections = 100;
        ControlFile.max_worker_processes = 8;
        ControlFile.max_prepared_xacts = 0;
@@ -621,6 +662,10 @@ PrintControlValues(bool guessed)
                   ControlFile.checkPointCopy.oldestMulti);
        printf(_("Latest checkpoint's oldestMulti's DB: %u\n"),
                   ControlFile.checkPointCopy.oldestMultiDB);
+       printf(_("Latest checkpoint's oldest CommitTs:  %u\n"),
+                  ControlFile.checkPointCopy.oldestCommitTs);
+       printf(_("Latest checkpoint's newest CommitTs:  %u\n"),
+                  ControlFile.checkPointCopy.newestCommitTs);
        printf(_("Maximum data alignment:               %u\n"),
                   ControlFile.maxAlign);
        /* we don't print floatFormat since can't say much useful about it */
@@ -702,6 +747,17 @@ PrintNewControlValues()
                printf(_("NextXID epoch:                        %u\n"),
                           ControlFile.checkPointCopy.nextXidEpoch);
        }
+
+       if (set_oldest_commit_ts != 0)
+       {
+               printf(_("oldestCommitTs:                       %u\n"),
+                          ControlFile.checkPointCopy.oldestCommitTs);
+       }
+       if (set_newest_commit_ts != 0)
+       {
+               printf(_("newestCommitTs:                       %u\n"),
+                          ControlFile.checkPointCopy.newestCommitTs);
+       }
 }
 
 
@@ -739,6 +795,7 @@ RewriteControlFile(void)
         */
        ControlFile.wal_level = WAL_LEVEL_MINIMAL;
        ControlFile.wal_log_hints = false;
+       ControlFile.track_commit_timestamp = false;
        ControlFile.MaxConnections = 100;
        ControlFile.max_worker_processes = 8;
        ControlFile.max_prepared_xacts = 0;
@@ -1099,6 +1156,8 @@ usage(void)
        printf(_("%s resets the PostgreSQL transaction log.\n\n"), progname);
        printf(_("Usage:\n  %s [OPTION]... {[-D] DATADIR}\n\n"), progname);
        printf(_("Options:\n"));
+       printf(_("  -c XID,XID       set oldest and newest transactions bearing commit timestamp\n"));
+       printf(_("                   (zero in either value means no change)\n"));
        printf(_("  -e XIDEPOCH      set next transaction ID epoch\n"));
        printf(_("  -f               force update to be done\n"));
        printf(_("  -l XLOGFILE      force minimum WAL starting location for new transaction log\n"));
diff --git a/src/include/access/commit_ts.h b/src/include/access/commit_ts.h
new file mode 100644 (file)
index 0000000..903c82c
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * commit_ts.h
+ *
+ * PostgreSQL commit timestamp manager
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/commit_ts.h
+ */
+#ifndef COMMIT_TS_H
+#define COMMIT_TS_H
+
+#include "access/xlog.h"
+#include "datatype/timestamp.h"
+#include "utils/guc.h"
+
+
+extern PGDLLIMPORT bool        track_commit_timestamp;  /* variable behind the "track_commit_timestamp" GUC (PGC_POSTMASTER, default false) */
+
+extern bool check_track_commit_timestamp(bool *newval, void **extra,
+                                                        GucSource source);     /* NOTE(review): the guc.c entry for track_commit_timestamp passes NULL as its check hook -- confirm this prototype is still needed */
+
+typedef uint32 CommitTsNodeId;
+#define InvalidCommitTsNodeId 0
+
+extern void CommitTsSetDefaultNodeId(CommitTsNodeId nodeid);
+extern CommitTsNodeId CommitTsGetDefaultNodeId(void);
+extern void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
+                                                          TransactionId *subxids, TimestampTz timestamp,
+                                                          CommitTsNodeId nodeid, bool do_xlog);
+extern bool TransactionIdGetCommitTsData(TransactionId xid,
+                                                        TimestampTz *ts, CommitTsNodeId *nodeid);
+extern TransactionId GetLatestCommitTsData(TimestampTz *ts,
+                                         CommitTsNodeId *nodeid);
+
+extern Size CommitTsShmemBuffers(void);
+extern Size CommitTsShmemSize(void);
+extern void CommitTsShmemInit(void);
+extern void BootStrapCommitTs(void);
+extern void StartupCommitTs(void);
+extern void CompleteCommitTsInitialization(void);
+extern void ShutdownCommitTs(void);
+extern void CheckPointCommitTs(void);
+extern void ExtendCommitTs(TransactionId newestXact);
+extern void TruncateCommitTs(TransactionId oldestXact);
+extern void SetCommitTsLimit(TransactionId oldestXact,
+                                TransactionId newestXact);
+extern void AdvanceOldestCommitTs(TransactionId oldestXact);
+
+/* XLOG stuff: record type codes for RM_COMMIT_TS_ID (presumably the xl_info values -- confirm in commit_ts.c) */
+#define COMMIT_TS_ZEROPAGE             0x00
+#define COMMIT_TS_TRUNCATE             0x10
+#define COMMIT_TS_SETTS                        0x20
+
+typedef struct xl_commit_ts_set        /* presumably the payload of a COMMIT_TS_SETTS record -- TODO confirm */
+{
+       TimestampTz             timestamp;      /* timestamp being recorded */
+       CommitTsNodeId  nodeid;        /* node id recorded together with the timestamp */
+       TransactionId   mainxid;       /* presumably the top-level xid of the transaction tree -- confirm */
+       /* subxact Xids follow the fixed-size fields above */
+} xl_commit_ts_set;
+
+#define SizeOfCommitTsSet      (offsetof(xl_commit_ts_set, mainxid) + \
+                                                        sizeof(TransactionId)) /* size of the fixed part: ends right after mainxid rather than using sizeof(struct) */
+
+
+extern void commit_ts_redo(XLogReaderState *record);   /* redo callback named in the RM_COMMIT_TS_ID PG_RMGR entry */
+extern void commit_ts_desc(StringInfo buf, XLogReaderState *record);   /* desc callback of the same entry */
+extern const char *commit_ts_identify(uint8 info);     /* identify callback of the same entry */
+
+#endif   /* COMMIT_TS_H */
index 0fc932f84ad2d7683a3f214096855b13da16ff7b..27168c35e6bf97362f4b731dfe856b86ef8c96fc 100644 (file)
@@ -43,3 +43,4 @@ PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_start
 PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL)
 PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup)
 PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
+PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
index 32d1b290e0cff20ae162c1b2950bd7ee753d7668..6666434e5f46a83ee966ce0434aaca76b47238a6 100644 (file)
@@ -123,6 +123,12 @@ typedef struct VariableCacheData
        TransactionId xidWrapLimit; /* where the world ends */
        Oid                     oldestXidDB;    /* database with minimum datfrozenxid */
 
+       /*
+        * These fields are protected by CommitTsLock.
+        */
+       TransactionId oldestCommitTs;
+       TransactionId newestCommitTs;
+
        /*
         * These fields are protected by ProcArrayLock.
         */
index 85b3fe76bb67f272a9559423ce9dbcab2bac78ba..825cf547cbdfb05954ddf2ffbc1d5fcaaad673cb 100644 (file)
@@ -186,6 +186,7 @@ typedef struct xl_parameter_change
        int                     max_locks_per_xact;
        int                     wal_level;
        bool            wal_log_hints;
+       bool            track_commit_timestamp;
 } xl_parameter_change;
 
 /* logs restore point */
index 0b7c18237c63ec8cf8e0e31b0011e9bfde5b5f39..bf8397671e6683e619e308202ce5a26a7a4c3896 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     201412021
+#define CATALOG_VERSION_NO     201412022
 
 #endif
index 15f81e4a44d432e69cec2ce10a67438d2c0f6ce3..6e9cac91d071aa0a459e1926e8a8b062557f34bf 100644 (file)
@@ -46,6 +46,8 @@ typedef struct CheckPoint
        MultiXactId oldestMulti;        /* cluster-wide minimum datminmxid */
        Oid                     oldestMultiDB;  /* database with minimum datminmxid */
        pg_time_t       time;                   /* time stamp of checkpoint */
+       TransactionId oldestCommitTs; /* oldest Xid with valid commit timestamp */
+       TransactionId newestCommitTs; /* newest Xid with valid commit timestamp */
 
        /*
         * Oldest XID still running. This is only needed to initialize hot standby
@@ -177,6 +179,7 @@ typedef struct ControlFileData
        int                     max_worker_processes;
        int                     max_prepared_xacts;
        int                     max_locks_per_xact;
+       bool            track_commit_timestamp;
 
        /*
         * This data is used to check for hardware-architecture compatibility of
index ce482557ee932802f22e575e1453ee759d954860..deba4b1fa21a38c0b5971957f9a605a2d1ad44bf 100644 (file)
@@ -3023,6 +3023,12 @@ DESCR("view two-phase transactions");
 DATA(insert OID = 3819 (  pg_get_multixact_members PGNSP PGUID 12 1 1000 0 0 f f f f t t v 1 0 2249 "28" "{28,28,25}" "{i,o,o}" "{multixid,xid,mode}" _null_ pg_get_multixact_members _null_ _null_ _null_ ));
 DESCR("view members of a multixactid");
 
+DATA(insert OID = 3581 ( pg_xact_commit_timestamp PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 1184 "28" _null_ _null_ _null_ _null_ pg_xact_commit_timestamp _null_ _null_ _null_ ));
+DESCR("get commit timestamp of a transaction");
+
+DATA(insert OID = 3583 ( pg_last_committed_xact PGNSP PGUID 12 1 0 0 0 f f f f t f s 0 0 2249 "" "{28,1184}" "{o,o}" "{xid,timestamp}" _null_ pg_last_committed_xact _null_ _null_ _null_ ));
+DESCR("get transaction Id and commit timestamp of latest transaction commit");
+
 DATA(insert OID = 3537 (  pg_describe_object           PGNSP PGUID 12 1 0 0 0 f f f f t f s 3 0 25 "26 26 23" _null_ _null_ _null_ _null_ pg_describe_object _null_ _null_ _null_ ));
 DESCR("get identification of SQL object");
 
index 91cab876a2e16376869dd4293f4e57bc66c4ecb1..09654a873e3ded3c170cdbf327c0d7f598722b25 100644 (file)
@@ -127,7 +127,10 @@ extern PGDLLIMPORT LWLockPadded *MainLWLockArray;
 #define AutoFileLock                           (&MainLWLockArray[35].lock)
 #define ReplicationSlotAllocationLock  (&MainLWLockArray[36].lock)
 #define ReplicationSlotControlLock             (&MainLWLockArray[37].lock)
-#define NUM_INDIVIDUAL_LWLOCKS         38
+#define CommitTsControlLock                    (&MainLWLockArray[38].lock)
+#define CommitTsLock                           (&MainLWLockArray[39].lock)
+
+#define NUM_INDIVIDUAL_LWLOCKS         40
 
 /*
  * It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS
index 417fd1771a8ed2efab0ae3822d0d09570b382c80..565cff3e0d2dfa4e98f85fbb564ed1b667e73f53 100644 (file)
@@ -1187,6 +1187,10 @@ extern Datum pg_prepared_xact(PG_FUNCTION_ARGS);
 /* access/transam/multixact.c */
 extern Datum pg_get_multixact_members(PG_FUNCTION_ARGS);
 
+/* access/transam/commit_ts.c */
+extern Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS);
+extern Datum pg_last_committed_xact(PG_FUNCTION_ARGS);
+
 /* catalogs/dependency.c */
 extern Datum pg_describe_object(PG_FUNCTION_ARGS);
 extern Datum pg_identify_object(PG_FUNCTION_ARGS);
index 4cb6af01f8fdc31ebdf6e92ddb22550bd2bbc371..93d93af2cdb0e6422e4f52ab5757d0d45d947350 100644 (file)
@@ -5,6 +5,7 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 SUBDIRS = \
+                 commit_ts \
                  worker_spi \
                  dummy_seclabel \
                  test_shm_mq \
diff --git a/src/test/modules/commit_ts/.gitignore b/src/test/modules/commit_ts/.gitignore
new file mode 100644 (file)
index 0000000..5dcb3ff
--- /dev/null
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/commit_ts/Makefile b/src/test/modules/commit_ts/Makefile
new file mode 100644 (file)
index 0000000..b3cb315
--- /dev/null
@@ -0,0 +1,15 @@
+# src/test/modules/commit_ts/Makefile
+
+REGRESS = commit_timestamp
+REGRESS_OPTS = --temp-config=$(top_srcdir)/src/test/modules/commit_ts/commit_ts.conf
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/commit_ts
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/commit_ts/commit_ts.conf b/src/test/modules/commit_ts/commit_ts.conf
new file mode 100644 (file)
index 0000000..d221a60
--- /dev/null
@@ -0,0 +1 @@
+track_commit_timestamp = on
\ No newline at end of file
diff --git a/src/test/modules/commit_ts/expected/commit_timestamp.out b/src/test/modules/commit_ts/expected/commit_timestamp.out
new file mode 100644 (file)
index 0000000..e40e28c
--- /dev/null
@@ -0,0 +1,39 @@
+--
+-- Commit Timestamp
+--
+SHOW track_commit_timestamp;
+ track_commit_timestamp 
+------------------------
+ on
+(1 row)
+
+CREATE TABLE committs_test(id serial, ts timestamptz default now());
+INSERT INTO committs_test DEFAULT VALUES;
+INSERT INTO committs_test DEFAULT VALUES;
+INSERT INTO committs_test DEFAULT VALUES;
+SELECT id,
+       pg_xact_commit_timestamp(xmin) >= ts,
+       pg_xact_commit_timestamp(xmin) < now(),
+       pg_xact_commit_timestamp(xmin) - ts < '60s' -- 60s should give a lot of reserve
+FROM committs_test
+ORDER BY id;
+ id | ?column? | ?column? | ?column? 
+----+----------+----------+----------
+  1 | t        | t        | t
+  2 | t        | t        | t
+  3 | t        | t        | t
+(3 rows)
+
+DROP TABLE committs_test;
+SELECT pg_xact_commit_timestamp('0'::xid);
+ERROR:  cannot retrieve commit timestamp for transaction 0
+SELECT pg_xact_commit_timestamp('1'::xid);
+ERROR:  cannot retrieve commit timestamp for transaction 1
+SELECT pg_xact_commit_timestamp('2'::xid);
+ERROR:  cannot retrieve commit timestamp for transaction 2
+SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp < now() FROM pg_last_committed_xact() x;
+ ?column? | ?column? | ?column? 
+----------+----------+----------
+ t        | t        | t
+(1 row)
+
diff --git a/src/test/modules/commit_ts/expected/commit_timestamp_1.out b/src/test/modules/commit_ts/expected/commit_timestamp_1.out
new file mode 100644 (file)
index 0000000..51f89c2
--- /dev/null
@@ -0,0 +1,34 @@
+--
+-- Commit Timestamp
+--
+SHOW track_commit_timestamp;
+ track_commit_timestamp 
+------------------------
+ off
+(1 row)
+
+CREATE TABLE committs_test(id serial, ts timestamptz default now());
+INSERT INTO committs_test DEFAULT VALUES;
+INSERT INTO committs_test DEFAULT VALUES;
+INSERT INTO committs_test DEFAULT VALUES;
+SELECT id,
+       pg_xact_commit_timestamp(xmin) >= ts,
+       pg_xact_commit_timestamp(xmin) < now(),
+       pg_xact_commit_timestamp(xmin) - ts < '60s' -- 60s should give a lot of reserve
+FROM committs_test
+ORDER BY id;
+ERROR:  could not get commit timestamp data
+HINT:  Make sure the configuration parameter "track_commit_timestamp" is set.
+DROP TABLE committs_test;
+SELECT pg_xact_commit_timestamp('0'::xid);
+ERROR:  could not get commit timestamp data
+HINT:  Make sure the configuration parameter "track_commit_timestamp" is set.
+SELECT pg_xact_commit_timestamp('1'::xid);
+ERROR:  could not get commit timestamp data
+HINT:  Make sure the configuration parameter "track_commit_timestamp" is set.
+SELECT pg_xact_commit_timestamp('2'::xid);
+ERROR:  could not get commit timestamp data
+HINT:  Make sure the configuration parameter "track_commit_timestamp" is set.
+SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp < now() FROM pg_last_committed_xact() x;
+ERROR:  could not get commit timestamp data
+HINT:  Make sure the configuration parameter "track_commit_timestamp" is set.
diff --git a/src/test/modules/commit_ts/sql/commit_timestamp.sql b/src/test/modules/commit_ts/sql/commit_timestamp.sql
new file mode 100644 (file)
index 0000000..9beb78a
--- /dev/null
@@ -0,0 +1,24 @@
+--
+-- Commit Timestamp
+--
+SHOW track_commit_timestamp;
+CREATE TABLE committs_test(id serial, ts timestamptz default now());
+
+INSERT INTO committs_test DEFAULT VALUES;
+INSERT INTO committs_test DEFAULT VALUES;
+INSERT INTO committs_test DEFAULT VALUES;
+
+SELECT id,
+       pg_xact_commit_timestamp(xmin) >= ts,
+       pg_xact_commit_timestamp(xmin) < now(),
+       pg_xact_commit_timestamp(xmin) - ts < '60s' -- 60s should give a lot of reserve
+FROM committs_test
+ORDER BY id;
+
+DROP TABLE committs_test;
+
+SELECT pg_xact_commit_timestamp('0'::xid);
+SELECT pg_xact_commit_timestamp('1'::xid);
+SELECT pg_xact_commit_timestamp('2'::xid);
+
+SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp < now() FROM pg_last_committed_xact() x;