]> granicus.if.org Git - postgresql/commitdiff
Store 2PC GID in commit/abort WAL recs for logical decoding
authorSimon Riggs <simon@2ndQuadrant.com>
Wed, 28 Mar 2018 16:42:50 +0000 (17:42 +0100)
committerSimon Riggs <simon@2ndQuadrant.com>
Wed, 28 Mar 2018 16:42:50 +0000 (17:42 +0100)
Store GID of 2PC in commit/abort WAL records when wal_level = logical.
This allows logical decoding to send the SAME gid to subscribers
across restarts of logical replication.

Track relica origin replay progress for 2PC.

(Edited from patch 0003 in the logical decoding 2PC series.)

Authors: Nikhil Sontakke, Stas Kelvich
Reviewed-by: Simon Riggs, Andres Freund
src/backend/access/rmgrdesc/xactdesc.c
src/backend/access/transam/twophase.c
src/backend/access/transam/xact.c
src/include/access/twophase.h
src/include/access/xact.h

index e5eef9ea43908877c21cf1d78d40686138570a09..b3e2fc3036c4a025e8aeb61052fb158da4d10e8f 100644 (file)
@@ -102,6 +102,14 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
                parsed->twophase_xid = xl_twophase->xid;
 
                data += sizeof(xl_xact_twophase);
+
+               if (parsed->xinfo & XACT_XINFO_HAS_GID)
+               {
+                       int gidlen;
+                       strcpy(parsed->twophase_gid, data);
+                       gidlen = strlen(parsed->twophase_gid) + 1;
+                       data += MAXALIGN(gidlen);
+               }
        }
 
        if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
@@ -139,6 +147,16 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
                data += sizeof(xl_xact_xinfo);
        }
 
+       if (parsed->xinfo & XACT_XINFO_HAS_DBINFO)
+       {
+               xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data;
+
+               parsed->dbId = xl_dbinfo->dbId;
+               parsed->tsId = xl_dbinfo->tsId;
+
+               data += sizeof(xl_xact_dbinfo);
+       }
+
        if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
        {
                xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
@@ -168,6 +186,27 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
                parsed->twophase_xid = xl_twophase->xid;
 
                data += sizeof(xl_xact_twophase);
+
+               if (parsed->xinfo & XACT_XINFO_HAS_GID)
+               {
+                       int gidlen;
+                       strcpy(parsed->twophase_gid, data);
+                       gidlen = strlen(parsed->twophase_gid) + 1;
+                       data += MAXALIGN(gidlen);
+               }
+       }
+
+       if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+       {
+               xl_xact_origin xl_origin;
+
+               /* we're only guaranteed 4 byte alignment, so copy onto stack */
+               memcpy(&xl_origin, data, sizeof(xl_origin));
+
+               parsed->origin_lsn = xl_origin.origin_lsn;
+               parsed->origin_timestamp = xl_origin.origin_timestamp;
+
+               data += sizeof(xl_xact_origin);
        }
 }
 
index c479c4881b9f3e576ece59ee513764f37b349782..d6e4b7980f3c08be0c193943cff546ee951a98af 100644 (file)
@@ -144,11 +144,7 @@ int                        max_prepared_xacts = 0;
  *
  * typedef struct GlobalTransactionData *GlobalTransaction appears in
  * twophase.h
- *
- * Note that the max value of GIDSIZE must fit in the uint16 gidlen,
- * specified in TwoPhaseFileHeader.
  */
-#define GIDSIZE 200
 
 typedef struct GlobalTransactionData
 {
@@ -211,12 +207,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
                                                                RelFileNode *rels,
                                                                int ninvalmsgs,
                                                                SharedInvalidationMessage *invalmsgs,
-                                                               bool initfileinval);
+                                                               bool initfileinval,
+                                                               const char *gid);
 static void RecordTransactionAbortPrepared(TransactionId xid,
                                                           int nchildren,
                                                           TransactionId *children,
                                                           int nrels,
-                                                          RelFileNode *rels);
+                                                          RelFileNode *rels,
+                                                          const char *gid);
 static void ProcessRecords(char *bufptr, TransactionId xid,
                           const TwoPhaseCallback callbacks[]);
 static void RemoveGXact(GlobalTransaction gxact);
@@ -898,7 +896,7 @@ TwoPhaseGetDummyProc(TransactionId xid)
 /*
  * Header for a 2PC state file
  */
-#define TWOPHASE_MAGIC 0x57F94533      /* format identifier */
+#define TWOPHASE_MAGIC 0x57F94534      /* format identifier */
 
 typedef struct TwoPhaseFileHeader
 {
@@ -914,6 +912,8 @@ typedef struct TwoPhaseFileHeader
        int32           ninvalmsgs;             /* number of cache invalidation messages */
        bool            initfileinval;  /* does relcache init file need invalidation? */
        uint16          gidlen;                 /* length of the GID - GID follows the header */
+       XLogRecPtr      origin_lsn;             /* lsn of this record at origin node */
+       TimestampTz origin_timestamp; /* time of prepare at origin node */
 } TwoPhaseFileHeader;
 
 /*
@@ -1065,6 +1065,7 @@ EndPrepare(GlobalTransaction gxact)
 {
        TwoPhaseFileHeader *hdr;
        StateFileChunk *record;
+       bool                    replorigin;
 
        /* Add the end sentinel to the list of 2PC records */
        RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@@ -1075,6 +1076,21 @@ EndPrepare(GlobalTransaction gxact)
        Assert(hdr->magic == TWOPHASE_MAGIC);
        hdr->total_len = records.total_len + sizeof(pg_crc32c);
 
+       replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+                                 replorigin_session_origin != DoNotReplicateId);
+
+       if (replorigin)
+       {
+               Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
+               hdr->origin_lsn = replorigin_session_origin_lsn;
+               hdr->origin_timestamp = replorigin_session_origin_timestamp;
+       }
+       else
+       {
+               hdr->origin_lsn = InvalidXLogRecPtr;
+               hdr->origin_timestamp = 0;
+       }
+
        /*
         * If the data size exceeds MaxAllocSize, we won't be able to read it in
         * ReadTwoPhaseFile. Check for that now, rather than fail in the case
@@ -1107,7 +1123,16 @@ EndPrepare(GlobalTransaction gxact)
        XLogBeginInsert();
        for (record = records.head; record != NULL; record = record->next)
                XLogRegisterData(record->data, record->len);
+
+       XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
        gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
+
+       if (replorigin)
+               /* Move LSNs forward for this replication origin */
+               replorigin_session_advance(replorigin_session_origin_lsn,
+                                                                  gxact->prepare_end_lsn);
+
        XLogFlush(gxact->prepare_end_lsn);
 
        /* If we crash now, we have prepared: WAL replay will fix things */
@@ -1283,6 +1308,44 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
        return buf;
 }
 
+/*
+ * ParsePrepareRecord
+ */
+void
+ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
+{
+       TwoPhaseFileHeader *hdr;
+       char *bufptr;
+
+       hdr = (TwoPhaseFileHeader *) xlrec;
+       bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
+
+       parsed->origin_lsn = hdr->origin_lsn;
+       parsed->origin_timestamp = hdr->origin_timestamp;
+       parsed->twophase_xid = hdr->xid;
+       parsed->dbId = hdr->database;
+       parsed->nsubxacts = hdr->nsubxacts;
+       parsed->nrels = hdr->ncommitrels;
+       parsed->nabortrels = hdr->nabortrels;
+       parsed->nmsgs = hdr->ninvalmsgs;
+
+       strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
+       bufptr += MAXALIGN(hdr->gidlen);
+
+       parsed->subxacts = (TransactionId *) bufptr;
+       bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+
+       parsed->xnodes = (RelFileNode *) bufptr;
+       bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+
+       parsed->abortnodes = (RelFileNode *) bufptr;
+       bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+
+       parsed->msgs = (SharedInvalidationMessage *) bufptr;
+       bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
+}
+
+
 
 /*
  * Reads 2PC data from xlog. During checkpoint this data will be moved to
@@ -1435,11 +1498,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
                                                                                hdr->nsubxacts, children,
                                                                                hdr->ncommitrels, commitrels,
                                                                                hdr->ninvalmsgs, invalmsgs,
-                                                                               hdr->initfileinval);
+                                                                               hdr->initfileinval, gid);
        else
                RecordTransactionAbortPrepared(xid,
                                                                           hdr->nsubxacts, children,
-                                                                          hdr->nabortrels, abortrels);
+                                                                          hdr->nabortrels, abortrels,
+                                                                          gid);
 
        ProcArrayRemove(proc, latestXid);
 
@@ -1752,7 +1816,8 @@ restoreTwoPhaseData(void)
                        if (buf == NULL)
                                continue;
 
-                       PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr);
+                       PrepareRedoAdd(buf, InvalidXLogRecPtr,
+                                                  InvalidXLogRecPtr, InvalidRepOriginId);
                }
        }
        LWLockRelease(TwoPhaseStateLock);
@@ -2165,7 +2230,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
                                                                RelFileNode *rels,
                                                                int ninvalmsgs,
                                                                SharedInvalidationMessage *invalmsgs,
-                                                               bool initfileinval)
+                                                               bool initfileinval,
+                                                               const char *gid)
 {
        XLogRecPtr      recptr;
        TimestampTz committs = GetCurrentTimestamp();
@@ -2193,7 +2259,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
                                                                 ninvalmsgs, invalmsgs,
                                                                 initfileinval, false,
                                                                 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
-                                                                xid);
+                                                                xid, gid);
 
 
        if (replorigin)
@@ -2255,7 +2321,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
                                                           int nchildren,
                                                           TransactionId *children,
                                                           int nrels,
-                                                          RelFileNode *rels)
+                                                          RelFileNode *rels,
+                                                          const char *gid)
 {
        XLogRecPtr      recptr;
 
@@ -2278,7 +2345,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
                                                                nchildren, children,
                                                                nrels, rels,
                                                                MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
-                                                               xid);
+                                                               xid, gid);
 
        /* Always flush, since we're about to remove the 2PC state file */
        XLogFlush(recptr);
@@ -2309,7 +2376,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
  * data, the entry is marked as located on disk.
  */
 void
-PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
+PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
+                          XLogRecPtr end_lsn, RepOriginId origin_id)
 {
        TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
        char       *bufptr;
@@ -2358,6 +2426,13 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
        Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
        TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
 
+       if (origin_id != InvalidRepOriginId)
+       {
+               /* recover apply progress */
+               replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
+                                                  false /* backward */ , false /* WAL */ );
+       }
+
        elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
 }
 
index cfc62011b506d6df780d4f75fdaede7643bd91b9..b88d4ccf746847878809e172b80745589fd451b4 100644 (file)
@@ -1226,7 +1226,7 @@ RecordTransactionCommit(void)
                                                        nmsgs, invalMessages,
                                                        RelcacheInitFileInval, forceSyncCommit,
                                                        MyXactFlags,
-                                                       InvalidTransactionId /* plain commit */ );
+                                                       InvalidTransactionId, NULL /* plain commit */ );
 
                if (replorigin)
                        /* Move LSNs forward for this replication origin */
@@ -1578,7 +1578,8 @@ RecordTransactionAbort(bool isSubXact)
        XactLogAbortRecord(xact_time,
                                           nchildren, children,
                                           nrels, rels,
-                                          MyXactFlags, InvalidTransactionId);
+                                          MyXactFlags, InvalidTransactionId,
+                                          NULL);
 
        /*
         * Report the latest async abort LSN, so that the WAL writer knows to
@@ -5234,7 +5235,8 @@ XactLogCommitRecord(TimestampTz commit_time,
                                        int nrels, RelFileNode *rels,
                                        int nmsgs, SharedInvalidationMessage *msgs,
                                        bool relcacheInval, bool forceSync,
-                                       int xactflags, TransactionId twophase_xid)
+                                       int xactflags, TransactionId twophase_xid,
+                                       const char *twophase_gid)
 {
        xl_xact_commit xlrec;
        xl_xact_xinfo xl_xinfo;
@@ -5246,6 +5248,7 @@ XactLogCommitRecord(TimestampTz commit_time,
        xl_xact_origin xl_origin;
 
        uint8           info;
+       int                     gidlen = 0;
 
        Assert(CritSectionCount > 0);
 
@@ -5308,6 +5311,13 @@ XactLogCommitRecord(TimestampTz commit_time,
        {
                xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
                xl_twophase.xid = twophase_xid;
+               Assert(twophase_gid != NULL);
+
+               if (XLogLogicalInfoActive())
+               {
+                       xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
+                       gidlen = strlen(twophase_gid) + 1; /* include '\0' */
+               }
        }
 
        /* dump transaction origin information */
@@ -5358,7 +5368,16 @@ XactLogCommitRecord(TimestampTz commit_time,
        }
 
        if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+       {
                XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+               if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
+               {
+                       static const char zeroes[MAXIMUM_ALIGNOF] = { 0 };
+                       XLogRegisterData((char*) twophase_gid, gidlen);
+                       if (MAXALIGN(gidlen) != gidlen)
+                               XLogRegisterData((char*) zeroes, MAXALIGN(gidlen) - gidlen);
+               }
+       }
 
        if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
                XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
@@ -5379,15 +5398,19 @@ XLogRecPtr
 XactLogAbortRecord(TimestampTz abort_time,
                                   int nsubxacts, TransactionId *subxacts,
                                   int nrels, RelFileNode *rels,
-                                  int xactflags, TransactionId twophase_xid)
+                                  int xactflags, TransactionId twophase_xid,
+                                  const char *twophase_gid)
 {
        xl_xact_abort xlrec;
        xl_xact_xinfo xl_xinfo;
        xl_xact_subxacts xl_subxacts;
        xl_xact_relfilenodes xl_relfilenodes;
        xl_xact_twophase xl_twophase;
+       xl_xact_dbinfo xl_dbinfo;
+       xl_xact_origin xl_origin;
 
        uint8           info;
+       int                     gidlen = 0;
 
        Assert(CritSectionCount > 0);
 
@@ -5423,6 +5446,31 @@ XactLogAbortRecord(TimestampTz abort_time,
        {
                xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
                xl_twophase.xid = twophase_xid;
+               Assert(twophase_gid != NULL);
+
+               if (XLogLogicalInfoActive())
+               {
+                       xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
+                       gidlen = strlen(twophase_gid) + 1; /* include '\0' */
+               }
+       }
+
+       if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive())
+       {
+               xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
+               xl_dbinfo.dbId = MyDatabaseId;
+               xl_dbinfo.tsId = MyDatabaseTableSpace;
+       }
+
+       /* dump transaction origin information only for abort prepared */
+       if ( (replorigin_session_origin != InvalidRepOriginId) &&
+                       TransactionIdIsValid(twophase_xid) &&
+                       XLogLogicalInfoActive())
+       {
+               xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
+
+               xl_origin.origin_lsn = replorigin_session_origin_lsn;
+               xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
        }
 
        if (xl_xinfo.xinfo != 0)
@@ -5437,6 +5485,10 @@ XactLogAbortRecord(TimestampTz abort_time,
        if (xl_xinfo.xinfo != 0)
                XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo));
 
+       if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+               XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
+
        if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
        {
                XLogRegisterData((char *) (&xl_subxacts),
@@ -5454,7 +5506,22 @@ XactLogAbortRecord(TimestampTz abort_time,
        }
 
        if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+       {
                XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+               if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
+               {
+                       static const char zeroes[MAXIMUM_ALIGNOF] = { 0 };
+                       XLogRegisterData((char*) twophase_gid, gidlen);
+                       if (MAXALIGN(gidlen) != gidlen)
+                               XLogRegisterData((char*) zeroes, MAXALIGN(gidlen) - gidlen);
+               }
+       }
+
+       if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
+               XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
+
+       if (TransactionIdIsValid(twophase_xid))
+               XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
        return XLogInsert(RM_XACT_ID, info);
 }
@@ -5777,7 +5844,8 @@ xact_redo(XLogReaderState *record)
                LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
                PrepareRedoAdd(XLogRecGetData(record),
                                           record->ReadRecPtr,
-                                          record->EndRecPtr);
+                                          record->EndRecPtr,
+                                          XLogRecGetOrigin(record));
                LWLockRelease(TwoPhaseStateLock);
        }
        else if (info == XLOG_XACT_ASSIGNMENT)
index 34d9470811bcd3c6b90ac76642dd94326c5077c2..f05cde202f710868f529bde6f37d2418e6601f0f 100644 (file)
@@ -15,6 +15,7 @@
 #define TWOPHASE_H
 
 #include "access/xlogdefs.h"
+#include "access/xact.h"
 #include "datatype/timestamp.h"
 #include "storage/lock.h"
 
@@ -46,6 +47,8 @@ extern bool StandbyTransactionIdIsPrepared(TransactionId xid);
 
 extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p,
                                                        int *nxids_p);
+extern void ParsePrepareRecord(uint8 info, char *xlrec,
+                                                       xl_xact_parsed_prepare *parsed);
 extern void StandbyRecoverPreparedTransactions(void);
 extern void RecoverPreparedTransactions(void);
 
@@ -54,7 +57,7 @@ extern void CheckPointTwoPhase(XLogRecPtr redo_horizon);
 extern void FinishPreparedTransaction(const char *gid, bool isCommit);
 
 extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
-                          XLogRecPtr end_lsn);
+                          XLogRecPtr end_lsn, RepOriginId origin_id);
 extern void PrepareRedoRemove(TransactionId xid, bool giveWarning);
 extern void restoreTwoPhaseData(void);
 #endif                                                 /* TWOPHASE_H */
index 87ae2cd4df66cb916111683211d22d872cf744e0..a46396f2d92f4e5d3f0ffc48dce3c1148d545bd9 100644 (file)
 #include "storage/sinval.h"
 #include "utils/datetime.h"
 
+/*
+ * Maximum size of Global Transaction ID (including '\0').
+ *
+ * Note that the max value of GIDSIZE must fit in the uint16 gidlen,
+ * specified in TwoPhaseFileHeader.
+ */
+#define GIDSIZE 200
 
 /*
  * Xact isolation levels
@@ -156,6 +163,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XACT_XINFO_HAS_TWOPHASE                        (1U << 4)
 #define XACT_XINFO_HAS_ORIGIN                  (1U << 5)
 #define XACT_XINFO_HAS_AE_LOCKS                        (1U << 6)
+#define XACT_XINFO_HAS_GID                             (1U << 7)
 
 /*
  * Also stored in xinfo, these indicating a variety of additional actions that
@@ -286,7 +294,6 @@ typedef struct xl_xact_abort
 typedef struct xl_xact_parsed_commit
 {
        TimestampTz xact_time;
-
        uint32          xinfo;
 
        Oid                     dbId;                   /* MyDatabaseId */
@@ -302,16 +309,24 @@ typedef struct xl_xact_parsed_commit
        SharedInvalidationMessage *msgs;
 
        TransactionId twophase_xid; /* only for 2PC */
+       char            twophase_gid[GIDSIZE]; /* only for 2PC */
+       int                     nabortrels;             /* only for 2PC */
+       RelFileNode *abortnodes;        /* only for 2PC */
 
        XLogRecPtr      origin_lsn;
        TimestampTz origin_timestamp;
 } xl_xact_parsed_commit;
 
+typedef xl_xact_parsed_commit xl_xact_parsed_prepare;
+
 typedef struct xl_xact_parsed_abort
 {
        TimestampTz xact_time;
        uint32          xinfo;
 
+       Oid                     dbId;                   /* MyDatabaseId */
+       Oid                     tsId;                   /* MyDatabaseTableSpace */
+
        int                     nsubxacts;
        TransactionId *subxacts;
 
@@ -319,6 +334,10 @@ typedef struct xl_xact_parsed_abort
        RelFileNode *xnodes;
 
        TransactionId twophase_xid; /* only for 2PC */
+       char            twophase_gid[GIDSIZE]; /* only for 2PC */
+
+       XLogRecPtr      origin_lsn;
+       TimestampTz origin_timestamp;
 } xl_xact_parsed_abort;
 
 
@@ -386,12 +405,14 @@ extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
                                        int nmsgs, SharedInvalidationMessage *msgs,
                                        bool relcacheInval, bool forceSync,
                                        int xactflags,
-                                       TransactionId twophase_xid);
+                                       TransactionId twophase_xid,
+                                       const char *twophase_gid);
 
 extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
                                   int nsubxacts, TransactionId *subxacts,
                                   int nrels, RelFileNode *rels,
-                                  int xactflags, TransactionId twophase_xid);
+                                  int xactflags, TransactionId twophase_xid,
+                                  const char *twophase_gid);
 extern void xact_redo(XLogReaderState *record);
 
 /* xactdesc.c */