]> granicus.if.org Git - postgresql/commitdiff
Speedup 2PC by skipping two phase state files in normal path
authorSimon Riggs <simon@2ndQuadrant.com>
Thu, 21 Jan 2016 02:40:44 +0000 (18:40 -0800)
committerSimon Riggs <simon@2ndQuadrant.com>
Thu, 21 Jan 2016 02:40:44 +0000 (18:40 -0800)
2PC state info is written only to WAL at PREPARE, then read back from WAL at
COMMIT PREPARED/ABORT PREPARED. Prepared transactions that live past one bufmgr
checkpoint cycle will be written to disk in the same form as previously. Crash
recovery path is not altered. Measured performance gains of 50-100% for short
2PC transactions by completely avoiding writing files and fsyncing. Other
optimizations still available, further patches in related areas expected.

Stas Kelvich and heavily edited by Simon Riggs

Based upon earlier ideas and patches by Michael Paquier and Heikki Linnakangas,
a concrete example of how Postgres-XC has fed back ideas into PostgreSQL.

Reviewed by Michael Paquier, Jeff Janes and Andres Freund
Performance testing by Jesper Pedersen

src/backend/access/transam/twophase.c
src/backend/access/transam/xlog.c
src/include/access/xlog.h

index 2251b022da6e2072c59d87866430ed0a8531d3c9..8a22836406afd02c61ba190841b60e6b7e9758f3 100644 (file)
  *             what keeps the XID considered running by TransactionIdIsInProgress.
  *             It is also convenient as a PGPROC to hook the gxact's locks to.
  *
- *             In order to survive crashes and shutdowns, all prepared
- *             transactions must be stored in permanent storage. This includes
- *             locking information, pending notifications etc. All that state
- *             information is written to the per-transaction state file in
- *             the pg_twophase directory.
+ *             Information to recover prepared transactions in case of crash is
+ *             now stored in WAL for the common case. In some cases there will be
+ *             an extended period between preparing a GXACT and commit/abort, in
+ *             which case we need to separately record prepared transaction data
+ *             in permanent storage. This includes locking information, pending
+ *             notifications etc. All that state information is written to the
+ *             per-transaction state file in the pg_twophase directory.
+ *             All prepared transactions will be written prior to shutdown.
+ *
+ *             Life track of state data is following:
+ *
+ *             * On PREPARE TRANSACTION backend writes state data only to the WAL and
+ *               stores pointer to the start of the WAL record in
+ *               gxact->prepare_start_lsn.
+ *             * If COMMIT occurs before checkpoint then backend reads data from WAL
+ *               using prepare_start_lsn.
+ *             * On checkpoint state data copied to files in pg_twophase directory and
+ *               fsynced
+ *             * If COMMIT happens after checkpoint then backend reads state data from
+ *               files
+ *             * In case of crash replay will move data from xlog to files, if that
+ *               hasn't happened before. XXX TODO - move to shmem in replay also
  *
  *-------------------------------------------------------------------------
  */
@@ -51,6 +68,7 @@
 #include "access/xlog.h"
 #include "access/xloginsert.h"
 #include "access/xlogutils.h"
+#include "access/xlogreader.h"
 #include "catalog/pg_type.h"
 #include "catalog/storage.h"
 #include "funcapi.h"
@@ -117,10 +135,21 @@ typedef struct GlobalTransactionData
        int                     pgprocno;               /* ID of associated dummy PGPROC */
        BackendId       dummyBackendId; /* similar to backend id for backends */
        TimestampTz prepared_at;        /* time of preparation */
-       XLogRecPtr      prepare_lsn;    /* XLOG offset of prepare record */
+
+       /*
+        * Note that we need to keep track of two LSNs for each GXACT.
+        * We keep track of the start LSN because this is the address we must
+        * use to read state data back from WAL when committing a prepared GXACT.
+        * We keep track of the end LSN because that is the LSN we need to wait
+        * for prior to commit.
+        */
+       XLogRecPtr      prepare_start_lsn;      /* XLOG offset of prepare record start */
+       XLogRecPtr      prepare_end_lsn;        /* XLOG offset of prepare record end */
+
        Oid                     owner;                  /* ID of user that executed the xact */
        BackendId       locking_backend;        /* backend currently working on the xact */
        bool            valid;                  /* TRUE if PGPROC entry is in proc array */
+       bool            ondisk;                 /* TRUE if prepare state file is on disk */
        char            gid[GIDSIZE];   /* The GID assigned to the prepared xact */
 }      GlobalTransactionData;
 
@@ -166,6 +195,7 @@ static void ProcessRecords(char *bufptr, TransactionId xid,
                           const TwoPhaseCallback callbacks[]);
 static void RemoveGXact(GlobalTransaction gxact);
 
+static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
 
 /*
  * Initialization of shared memory
@@ -398,11 +428,13 @@ MarkAsPreparing(TransactionId xid, const char *gid,
        pgxact->nxids = 0;
 
        gxact->prepared_at = prepared_at;
-       /* initialize LSN to 0 (start of WAL) */
-       gxact->prepare_lsn = 0;
+       /* initialize LSN to InvalidXLogRecPtr */
+       gxact->prepare_start_lsn = InvalidXLogRecPtr;
+       gxact->prepare_end_lsn = InvalidXLogRecPtr;
        gxact->owner = owner;
        gxact->locking_backend = MyBackendId;
        gxact->valid = false;
+       gxact->ondisk = false;
        strcpy(gxact->gid, gid);
 
        /* And insert it into the active array */
@@ -578,41 +610,6 @@ RemoveGXact(GlobalTransaction gxact)
        elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
 }
 
-/*
- * TransactionIdIsPrepared
- *             True iff transaction associated with the identifier is prepared
- *             for two-phase commit
- *
- * Note: only gxacts marked "valid" are considered; but notice we do not
- * check the locking status.
- *
- * This is not currently exported, because it is only needed internally.
- */
-static bool
-TransactionIdIsPrepared(TransactionId xid)
-{
-       bool            result = false;
-       int                     i;
-
-       LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
-
-       for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
-       {
-               GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
-               PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
-
-               if (gxact->valid && pgxact->xid == xid)
-               {
-                       result = true;
-                       break;
-               }
-       }
-
-       LWLockRelease(TwoPhaseStateLock);
-
-       return result;
-}
-
 /*
  * Returns an array of all prepared transactions for the user-level
  * function pg_prepared_xact.
@@ -1013,21 +1010,13 @@ StartPrepare(GlobalTransaction gxact)
 }
 
 /*
- * Finish preparing state file.
- *
- * Calculates CRC and writes state file to WAL and in pg_twophase directory.
+ * Finish preparing state data and writing it to WAL.
  */
 void
 EndPrepare(GlobalTransaction gxact)
 {
-       PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
-       TransactionId xid = pgxact->xid;
        TwoPhaseFileHeader *hdr;
-       char            path[MAXPGPATH];
        StateFileChunk *record;
-       pg_crc32c       statefile_crc;
-       pg_crc32c       bogus_crc;
-       int                     fd;
 
        /* Add the end sentinel to the list of 2PC records */
        RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@@ -1039,8 +1028,9 @@ EndPrepare(GlobalTransaction gxact)
        hdr->total_len = records.total_len + sizeof(pg_crc32c);
 
        /*
-        * If the file size exceeds MaxAllocSize, we won't be able to read it in
-        * ReadTwoPhaseFile. Check for that now, rather than fail at commit time.
+        * If the data size exceeds MaxAllocSize, we won't be able to read it in
+        * ReadTwoPhaseFile. Check for that now, rather than fail in the case
+        * where we write data to file and then re-read at commit time.
         */
        if (hdr->total_len > MaxAllocSize)
                ereport(ERROR,
@@ -1048,70 +1038,8 @@ EndPrepare(GlobalTransaction gxact)
                                 errmsg("two-phase state file maximum length exceeded")));
 
        /*
-        * Create the 2PC state file.
-        */
-       TwoPhaseFilePath(path, xid);
-
-       fd = OpenTransientFile(path,
-                                                  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
-                                                  S_IRUSR | S_IWUSR);
-       if (fd < 0)
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                                errmsg("could not create two-phase state file \"%s\": %m",
-                                               path)));
-
-       /* Write data to file, and calculate CRC as we pass over it */
-       INIT_CRC32C(statefile_crc);
-
-       for (record = records.head; record != NULL; record = record->next)
-       {
-               COMP_CRC32C(statefile_crc, record->data, record->len);
-               if ((write(fd, record->data, record->len)) != record->len)
-               {
-                       CloseTransientFile(fd);
-                       ereport(ERROR,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not write two-phase state file: %m")));
-               }
-       }
-
-       FIN_CRC32C(statefile_crc);
-
-       /*
-        * Write a deliberately bogus CRC to the state file; this is just paranoia
-        * to catch the case where four more bytes will run us out of disk space.
-        */
-       bogus_crc = ~statefile_crc;
-
-       if ((write(fd, &bogus_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
-       {
-               CloseTransientFile(fd);
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                                errmsg("could not write two-phase state file: %m")));
-       }
-
-       /* Back up to prepare for rewriting the CRC */
-       if (lseek(fd, -((off_t) sizeof(pg_crc32c)), SEEK_CUR) < 0)
-       {
-               CloseTransientFile(fd);
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                                errmsg("could not seek in two-phase state file: %m")));
-       }
-
-       /*
-        * The state file isn't valid yet, because we haven't written the correct
-        * CRC yet.  Before we do that, insert entry in WAL and flush it to disk.
-        *
-        * Between the time we have written the WAL entry and the time we write
-        * out the correct state file CRC, we have an inconsistency: the xact is
-        * prepared according to WAL but not according to our on-disk state. We
-        * use a critical section to force a PANIC if we are unable to complete
-        * the write --- then, WAL replay should repair the inconsistency.  The
-        * odds of a PANIC actually occurring should be very tiny given that we
-        * were able to write the bogus CRC above.
+        * Now writing 2PC state data to WAL. We let the WAL's CRC protection
+        * cover us, so no need to calculate a separate CRC.
         *
         * We have to set delayChkpt here, too; otherwise a checkpoint starting
         * immediately after the WAL record is inserted could complete without
@@ -1131,24 +1059,13 @@ EndPrepare(GlobalTransaction gxact)
        XLogBeginInsert();
        for (record = records.head; record != NULL; record = record->next)
                XLogRegisterData(record->data, record->len);
-       gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
-       XLogFlush(gxact->prepare_lsn);
+       gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
+       XLogFlush(gxact->prepare_end_lsn);
 
        /* If we crash now, we have prepared: WAL replay will fix things */
 
-       /* write correct CRC and close file */
-       if ((write(fd, &statefile_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
-       {
-               CloseTransientFile(fd);
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                                errmsg("could not write two-phase state file: %m")));
-       }
-
-       if (CloseTransientFile(fd) != 0)
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                                errmsg("could not close two-phase state file: %m")));
+       /* Store record's start location to read that later on Commit */
+       gxact->prepare_start_lsn = ProcLastRecPtr;
 
        /*
         * Mark the prepared transaction as valid.  As soon as xact.c marks
@@ -1186,7 +1103,7 @@ EndPrepare(GlobalTransaction gxact)
         * Note that at this stage we have marked the prepare, but still show as
         * running in the procarray (twice!) and continue to hold locks.
         */
-       SyncRepWaitForLSN(gxact->prepare_lsn);
+       SyncRepWaitForLSN(gxact->prepare_end_lsn);
 
        records.tail = records.head = NULL;
        records.num_chunks = 0;
@@ -1315,6 +1232,57 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
        return buf;
 }
 
+
+/*
+ * Reads 2PC data from xlog. During checkpoint this data will be moved to
+ * twophase files and ReadTwoPhaseFile should be used instead.
+ *
+ * Note clearly that this function accesses WAL during normal operation, similarly
+ * to the way WALSender or Logical Decoding would do. It does not run during
+ * crash recovery or standby processing.
+ */
+static void
+XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
+{
+       XLogRecord *record;
+       XLogReaderState *xlogreader;
+       char       *errormsg;
+
+       Assert(!RecoveryInProgress());
+
+       xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL);
+       if (!xlogreader)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OUT_OF_MEMORY),
+                                errmsg("out of memory"),
+                                errdetail("Failed while allocating an XLog reading processor.")));
+
+       record = XLogReadRecord(xlogreader, lsn, &errormsg);
+       if (record == NULL)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not read two-phase state from xlog at %X/%X",
+                                                       (uint32) (lsn >> 32),
+                                                       (uint32) lsn)));
+
+       if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
+               (XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("expected two-phase state data is not present in xlog at %X/%X",
+                                                       (uint32) (lsn >> 32),
+                                                       (uint32) lsn)));
+
+       if (len != NULL)
+               *len = XLogRecGetDataLen(xlogreader);
+
+       *buf = palloc(sizeof(char)*XLogRecGetDataLen(xlogreader));
+       memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
+
+       XLogReaderFree(xlogreader);
+}
+
+
 /*
  * Confirms an xid is prepared, during recovery
  */
@@ -1375,14 +1343,16 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
        xid = pgxact->xid;
 
        /*
-        * Read and validate the state file
+        * Read and validate 2PC state data.
+        * State data will typically be stored in WAL files if the LSN is after the
+        * last checkpoint record, or moved to disk if for some reason they have
+        * lived for a long time.
         */
-       buf = ReadTwoPhaseFile(xid, true);
-       if (buf == NULL)
-               ereport(ERROR,
-                               (errcode(ERRCODE_DATA_CORRUPTED),
-                                errmsg("two-phase state file for transaction %u is corrupt",
-                                               xid)));
+       if (gxact->ondisk)
+               buf = ReadTwoPhaseFile(xid, true);
+       else
+               XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
+
 
        /*
         * Disassemble the header area
@@ -1482,9 +1452,10 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
        AtEOXact_PgStat(isCommit);
 
        /*
-        * And now we can clean up our mess.
+        * And now we can clean up any files we may have left.
         */
-       RemoveTwoPhaseFile(xid, true);
+       if (gxact->ondisk)
+               RemoveTwoPhaseFile(xid, true);
 
        RemoveGXact(gxact);
        MyLockedGxact = NULL;
@@ -1493,8 +1464,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 }
 
 /*
- * Scan a 2PC state file (already read into memory by ReadTwoPhaseFile)
- * and call the indicated callbacks for each 2PC record.
+ * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
  */
 static void
 ProcessRecords(char *bufptr, TransactionId xid,
@@ -1539,7 +1509,8 @@ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
 }
 
 /*
- * Recreates a state file. This is used in WAL replay.
+ * Recreates a state file. This is used in WAL replay and during
+ * checkpoint creation.
  *
  * Note: content and len don't include CRC.
  */
@@ -1610,97 +1581,71 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
  * This is deliberately run as late as possible in the checkpoint sequence,
  * because GXACTs ordinarily have short lifespans, and so it is quite
  * possible that GXACTs that were valid at checkpoint start will no longer
- * exist if we wait a little bit.
+ * exist if we wait a little bit. With typical checkpoint settings this
+ * will be about 3 minutes for an online checkpoint, so as a result we
+ * we expect that there will be no GXACTs that need to be copied to disk.
  *
- * If a GXACT remains valid across multiple checkpoints, it'll be fsynced
- * each time.  This is considered unusual enough that we don't bother to
- * expend any extra code to avoid the redundant fsyncs.  (They should be
- * reasonably cheap anyway, since they won't cause I/O.)
+ * If a GXACT remains valid across multiple checkpoints, it will already
+ * be on disk so we don't bother to repeat that write.
  */
 void
 CheckPointTwoPhase(XLogRecPtr redo_horizon)
 {
-       TransactionId *xids;
-       int                     nxids;
-       char            path[MAXPGPATH];
        int                     i;
+       int                     serialized_xacts = 0;
 
-       /*
-        * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
-        * it just long enough to make a list of the XIDs that require fsyncing,
-        * and then do the I/O afterwards.
-        *
-        * This approach creates a race condition: someone else could delete a
-        * GXACT between the time we release TwoPhaseStateLock and the time we try
-        * to open its state file.  We handle this by special-casing ENOENT
-        * failures: if we see that, we verify that the GXACT is no longer valid,
-        * and if so ignore the failure.
-        */
        if (max_prepared_xacts <= 0)
                return;                                 /* nothing to do */
 
        TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
 
-       xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
-       nxids = 0;
-
+       /*
+        * We are expecting there to be zero GXACTs that need to be
+        * copied to disk, so we perform all I/O while holding
+        * TwoPhaseStateLock for simplicity. This prevents any new xacts
+        * from preparing while this occurs, which shouldn't be a problem
+        * since the presence of long-lived prepared xacts indicates the
+        * transaction manager isn't active.
+        *
+        * It's also possible to move I/O out of the lock, but on
+        * every error we should check whether somebody commited our
+        * transaction in different backend. Let's leave this optimisation
+        * for future, if somebody will spot that this place cause
+        * bottleneck.
+        *
+        * Note that it isn't possible for there to be a GXACT with
+        * a prepare_end_lsn set prior to the last checkpoint yet
+        * is marked invalid, because of the efforts with delayChkpt.
+        */
        LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
-
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
                GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
                PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
                if (gxact->valid &&
-                       gxact->prepare_lsn <= redo_horizon)
-                       xids[nxids++] = pgxact->xid;
-       }
-
-       LWLockRelease(TwoPhaseStateLock);
-
-       for (i = 0; i < nxids; i++)
-       {
-               TransactionId xid = xids[i];
-               int                     fd;
-
-               TwoPhaseFilePath(path, xid);
-
-               fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
-               if (fd < 0)
+                       !gxact->ondisk &&
+                       gxact->prepare_end_lsn <= redo_horizon)
                {
-                       if (errno == ENOENT)
-                       {
-                               /* OK if gxact is no longer valid */
-                               if (!TransactionIdIsPrepared(xid))
-                                       continue;
-                               /* Restore errno in case it was changed */
-                               errno = ENOENT;
-                       }
-                       ereport(ERROR,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not open two-phase state file \"%s\": %m",
-                                                       path)));
-               }
+                       char       *buf;
+                       int             len;
 
-               if (pg_fsync(fd) != 0)
-               {
-                       CloseTransientFile(fd);
-                       ereport(ERROR,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not fsync two-phase state file \"%s\": %m",
-                                                       path)));
+                       XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
+                       RecreateTwoPhaseFile(pgxact->xid, buf, len);
+                       gxact->ondisk = true;
+                       pfree(buf);
+                       serialized_xacts++;
                }
-
-               if (CloseTransientFile(fd) != 0)
-                       ereport(ERROR,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not close two-phase state file \"%s\": %m",
-                                                       path)));
        }
-
-       pfree(xids);
+       LWLockRelease(TwoPhaseStateLock);
 
        TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
+
+       if (log_checkpoints && serialized_xacts > 0)
+               ereport(LOG,
+                               (errmsg("%u two-phase state files were written "
+                                               "for long-running prepared transactions",
+                                               serialized_xacts)));
 }
 
 /*
@@ -2029,17 +1974,11 @@ RecoverPreparedTransactions(void)
 
                        /*
                         * Recreate its GXACT and dummy PGPROC
-                        *
-                        * Note: since we don't have the PREPARE record's WAL location at
-                        * hand, we leave prepare_lsn zeroes.  This means the GXACT will
-                        * be fsync'd on every future checkpoint.  We assume this
-                        * situation is infrequent enough that the performance cost is
-                        * negligible (especially since we know the state file has already
-                        * been fsynced).
                         */
                        gxact = MarkAsPreparing(xid, hdr->gid,
                                                                        hdr->prepared_at,
                                                                        hdr->owner, hdr->database);
+                       gxact->ondisk = true;
                        GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
                        MarkAsPrepared(gxact);
 
index 7d5d493cdcd9a7e470190030091e07ba60e9d257..a2846c41b5843e88ad0b4c666519597c871383b6 100644 (file)
@@ -321,8 +321,7 @@ static TimeLineID curFileTLI;
  * stored here.  The parallel leader advances its own copy, when necessary,
  * in WaitForParallelWorkersToFinish.
  */
-static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
-
+XLogRecPtr     ProcLastRecPtr = InvalidXLogRecPtr;
 XLogRecPtr     XactLastRecEnd = InvalidXLogRecPtr;
 XLogRecPtr     XactLastCommitEnd = InvalidXLogRecPtr;
 
index 3de337a207c055f5b6a68e0314c1d448116b9f06..ecd30ce3ce219225bfb2725fa0d2c8fb14c5b3a6 100644 (file)
@@ -86,6 +86,7 @@ typedef enum
        RECOVERY_TARGET_IMMEDIATE
 } RecoveryTargetType;
 
+extern XLogRecPtr ProcLastRecPtr;
 extern XLogRecPtr XactLastRecEnd;
 extern PGDLLIMPORT XLogRecPtr XactLastCommitEnd;