From 1eb6d6527aae264b3e0b9c95aa70bb7a594ad1cf Mon Sep 17 00:00:00 2001 From: Simon Riggs Date: Wed, 28 Mar 2018 17:42:50 +0100 Subject: [PATCH] Store 2PC GID in commit/abort WAL recs for logical decoding Store GID of 2PC in commit/abort WAL records when wal_level = logical. This allows logical decoding to send the SAME gid to subscribers across restarts of logical replication. Track relica origin replay progress for 2PC. (Edited from patch 0003 in the logical decoding 2PC series.) Authors: Nikhil Sontakke, Stas Kelvich Reviewed-by: Simon Riggs, Andres Freund --- src/backend/access/rmgrdesc/xactdesc.c | 39 +++++++++ src/backend/access/transam/twophase.c | 105 +++++++++++++++++++++---- src/backend/access/transam/xact.c | 78 ++++++++++++++++-- src/include/access/twophase.h | 5 +- src/include/access/xact.h | 27 ++++++- 5 files changed, 230 insertions(+), 24 deletions(-) diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index e5eef9ea43..b3e2fc3036 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -102,6 +102,14 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars parsed->twophase_xid = xl_twophase->xid; data += sizeof(xl_xact_twophase); + + if (parsed->xinfo & XACT_XINFO_HAS_GID) + { + int gidlen; + strcpy(parsed->twophase_gid, data); + gidlen = strlen(parsed->twophase_gid) + 1; + data += MAXALIGN(gidlen); + } } if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) @@ -139,6 +147,16 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed) data += sizeof(xl_xact_xinfo); } + if (parsed->xinfo & XACT_XINFO_HAS_DBINFO) + { + xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data; + + parsed->dbId = xl_dbinfo->dbId; + parsed->tsId = xl_dbinfo->tsId; + + data += sizeof(xl_xact_dbinfo); + } + if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS) { xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data; @@ -168,6 +186,27 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed) parsed->twophase_xid = xl_twophase->xid; data += sizeof(xl_xact_twophase); + + if (parsed->xinfo & XACT_XINFO_HAS_GID) + { + int gidlen; + strcpy(parsed->twophase_gid, data); + gidlen = strlen(parsed->twophase_gid) + 1; + data += MAXALIGN(gidlen); + } + } + + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + { + xl_xact_origin xl_origin; + + /* we're only guaranteed 4 byte alignment, so copy onto stack */ + memcpy(&xl_origin, data, sizeof(xl_origin)); + + parsed->origin_lsn = xl_origin.origin_lsn; + parsed->origin_timestamp = xl_origin.origin_timestamp; + + data += sizeof(xl_xact_origin); } } diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index c479c4881b..d6e4b7980f 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -144,11 +144,7 @@ int max_prepared_xacts = 0; * * typedef struct GlobalTransactionData *GlobalTransaction appears in * twophase.h - * - * Note that the max value of GIDSIZE must fit in the uint16 gidlen, - * specified in TwoPhaseFileHeader. */ -#define GIDSIZE 200 typedef struct GlobalTransactionData { @@ -211,12 +207,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid, RelFileNode *rels, int ninvalmsgs, SharedInvalidationMessage *invalmsgs, - bool initfileinval); + bool initfileinval, + const char *gid); static void RecordTransactionAbortPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, - RelFileNode *rels); + RelFileNode *rels, + const char *gid); static void ProcessRecords(char *bufptr, TransactionId xid, const TwoPhaseCallback callbacks[]); static void RemoveGXact(GlobalTransaction gxact); @@ -898,7 +896,7 @@ TwoPhaseGetDummyProc(TransactionId xid) /* * Header for a 2PC state file */ -#define TWOPHASE_MAGIC 0x57F94533 /* format identifier */ +#define TWOPHASE_MAGIC 0x57F94534 /* format identifier */ typedef struct TwoPhaseFileHeader { @@ -914,6 +912,8 @@ typedef struct TwoPhaseFileHeader int32 ninvalmsgs; /* number of cache invalidation messages */ bool initfileinval; /* does relcache init file need invalidation? */ uint16 gidlen; /* length of the GID - GID follows the header */ + XLogRecPtr origin_lsn; /* lsn of this record at origin node */ + TimestampTz origin_timestamp; /* time of prepare at origin node */ } TwoPhaseFileHeader; /* @@ -1065,6 +1065,7 @@ EndPrepare(GlobalTransaction gxact) { TwoPhaseFileHeader *hdr; StateFileChunk *record; + bool replorigin; /* Add the end sentinel to the list of 2PC records */ RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0, @@ -1075,6 +1076,21 @@ EndPrepare(GlobalTransaction gxact) Assert(hdr->magic == TWOPHASE_MAGIC); hdr->total_len = records.total_len + sizeof(pg_crc32c); + replorigin = (replorigin_session_origin != InvalidRepOriginId && + replorigin_session_origin != DoNotReplicateId); + + if (replorigin) + { + Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr); + hdr->origin_lsn = replorigin_session_origin_lsn; + hdr->origin_timestamp = replorigin_session_origin_timestamp; + } + else + { + hdr->origin_lsn = InvalidXLogRecPtr; + hdr->origin_timestamp = 0; + } + /* * If the data size exceeds MaxAllocSize, we won't be able to read it in * ReadTwoPhaseFile. Check for that now, rather than fail in the case @@ -1107,7 +1123,16 @@ EndPrepare(GlobalTransaction gxact) XLogBeginInsert(); for (record = records.head; record != NULL; record = record->next) XLogRegisterData(record->data, record->len); + + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE); + + if (replorigin) + /* Move LSNs forward for this replication origin */ + replorigin_session_advance(replorigin_session_origin_lsn, + gxact->prepare_end_lsn); + XLogFlush(gxact->prepare_end_lsn); /* If we crash now, we have prepared: WAL replay will fix things */ @@ -1283,6 +1308,44 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings) return buf; } +/* + * ParsePrepareRecord + */ +void +ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed) +{ + TwoPhaseFileHeader *hdr; + char *bufptr; + + hdr = (TwoPhaseFileHeader *) xlrec; + bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader)); + + parsed->origin_lsn = hdr->origin_lsn; + parsed->origin_timestamp = hdr->origin_timestamp; + parsed->twophase_xid = hdr->xid; + parsed->dbId = hdr->database; + parsed->nsubxacts = hdr->nsubxacts; + parsed->nrels = hdr->ncommitrels; + parsed->nabortrels = hdr->nabortrels; + parsed->nmsgs = hdr->ninvalmsgs; + + strncpy(parsed->twophase_gid, bufptr, hdr->gidlen); + bufptr += MAXALIGN(hdr->gidlen); + + parsed->subxacts = (TransactionId *) bufptr; + bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId)); + + parsed->xnodes = (RelFileNode *) bufptr; + bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); + + parsed->abortnodes = (RelFileNode *) bufptr; + bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); + + parsed->msgs = (SharedInvalidationMessage *) bufptr; + bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); +} + + /* * Reads 2PC data from xlog. During checkpoint this data will be moved to @@ -1435,11 +1498,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit) hdr->nsubxacts, children, hdr->ncommitrels, commitrels, hdr->ninvalmsgs, invalmsgs, - hdr->initfileinval); + hdr->initfileinval, gid); else RecordTransactionAbortPrepared(xid, hdr->nsubxacts, children, - hdr->nabortrels, abortrels); + hdr->nabortrels, abortrels, + gid); ProcArrayRemove(proc, latestXid); @@ -1752,7 +1816,8 @@ restoreTwoPhaseData(void) if (buf == NULL) continue; - PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr); + PrepareRedoAdd(buf, InvalidXLogRecPtr, + InvalidXLogRecPtr, InvalidRepOriginId); } } LWLockRelease(TwoPhaseStateLock); @@ -2165,7 +2230,8 @@ RecordTransactionCommitPrepared(TransactionId xid, RelFileNode *rels, int ninvalmsgs, SharedInvalidationMessage *invalmsgs, - bool initfileinval) + bool initfileinval, + const char *gid) { XLogRecPtr recptr; TimestampTz committs = GetCurrentTimestamp(); @@ -2193,7 +2259,7 @@ RecordTransactionCommitPrepared(TransactionId xid, ninvalmsgs, invalmsgs, initfileinval, false, MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK, - xid); + xid, gid); if (replorigin) @@ -2255,7 +2321,8 @@ RecordTransactionAbortPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, - RelFileNode *rels) + RelFileNode *rels, + const char *gid) { XLogRecPtr recptr; @@ -2278,7 +2345,7 @@ RecordTransactionAbortPrepared(TransactionId xid, nchildren, children, nrels, rels, MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK, - xid); + xid, gid); /* Always flush, since we're about to remove the 2PC state file */ XLogFlush(recptr); @@ -2309,7 +2376,8 @@ RecordTransactionAbortPrepared(TransactionId xid, * data, the entry is marked as located on disk. */ void -PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) +PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, + XLogRecPtr end_lsn, RepOriginId origin_id) { TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf; char *bufptr; @@ -2358,6 +2426,13 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; + if (origin_id != InvalidRepOriginId) + { + /* recover apply progress */ + replorigin_advance(origin_id, hdr->origin_lsn, end_lsn, + false /* backward */ , false /* WAL */ ); + } + elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid); } diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index cfc62011b5..b88d4ccf74 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1226,7 +1226,7 @@ RecordTransactionCommit(void) nmsgs, invalMessages, RelcacheInitFileInval, forceSyncCommit, MyXactFlags, - InvalidTransactionId /* plain commit */ ); + InvalidTransactionId, NULL /* plain commit */ ); if (replorigin) /* Move LSNs forward for this replication origin */ @@ -1578,7 +1578,8 @@ RecordTransactionAbort(bool isSubXact) XactLogAbortRecord(xact_time, nchildren, children, nrels, rels, - MyXactFlags, InvalidTransactionId); + MyXactFlags, InvalidTransactionId, + NULL); /* * Report the latest async abort LSN, so that the WAL writer knows to @@ -5234,7 +5235,8 @@ XactLogCommitRecord(TimestampTz commit_time, int nrels, RelFileNode *rels, int nmsgs, SharedInvalidationMessage *msgs, bool relcacheInval, bool forceSync, - int xactflags, TransactionId twophase_xid) + int xactflags, TransactionId twophase_xid, + const char *twophase_gid) { xl_xact_commit xlrec; xl_xact_xinfo xl_xinfo; @@ -5246,6 +5248,7 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xact_origin xl_origin; uint8 info; + int gidlen = 0; Assert(CritSectionCount > 0); @@ -5308,6 +5311,13 @@ XactLogCommitRecord(TimestampTz commit_time, { xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE; xl_twophase.xid = twophase_xid; + Assert(twophase_gid != NULL); + + if (XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_GID; + gidlen = strlen(twophase_gid) + 1; /* include '\0' */ + } } /* dump transaction origin information */ @@ -5358,7 +5368,16 @@ XactLogCommitRecord(TimestampTz commit_time, } if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) + { XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID) + { + static const char zeroes[MAXIMUM_ALIGNOF] = { 0 }; + XLogRegisterData((char*) twophase_gid, gidlen); + if (MAXALIGN(gidlen) != gidlen) + XLogRegisterData((char*) zeroes, MAXALIGN(gidlen) - gidlen); + } + } if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin)); @@ -5379,15 +5398,19 @@ XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileNode *rels, - int xactflags, TransactionId twophase_xid) + int xactflags, TransactionId twophase_xid, + const char *twophase_gid) { xl_xact_abort xlrec; xl_xact_xinfo xl_xinfo; xl_xact_subxacts xl_subxacts; xl_xact_relfilenodes xl_relfilenodes; xl_xact_twophase xl_twophase; + xl_xact_dbinfo xl_dbinfo; + xl_xact_origin xl_origin; uint8 info; + int gidlen = 0; Assert(CritSectionCount > 0); @@ -5423,6 +5446,31 @@ XactLogAbortRecord(TimestampTz abort_time, { xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE; xl_twophase.xid = twophase_xid; + Assert(twophase_gid != NULL); + + if (XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_GID; + gidlen = strlen(twophase_gid) + 1; /* include '\0' */ + } + } + + if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO; + xl_dbinfo.dbId = MyDatabaseId; + xl_dbinfo.tsId = MyDatabaseTableSpace; + } + + /* dump transaction origin information only for abort prepared */ + if ( (replorigin_session_origin != InvalidRepOriginId) && + TransactionIdIsValid(twophase_xid) && + XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN; + + xl_origin.origin_lsn = replorigin_session_origin_lsn; + xl_origin.origin_timestamp = replorigin_session_origin_timestamp; } if (xl_xinfo.xinfo != 0) @@ -5437,6 +5485,10 @@ XactLogAbortRecord(TimestampTz abort_time, if (xl_xinfo.xinfo != 0) XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO) + XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo)); + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS) { XLogRegisterData((char *) (&xl_subxacts), @@ -5454,7 +5506,22 @@ XactLogAbortRecord(TimestampTz abort_time, } if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) + { XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID) + { + static const char zeroes[MAXIMUM_ALIGNOF] = { 0 }; + XLogRegisterData((char*) twophase_gid, gidlen); + if (MAXALIGN(gidlen) != gidlen) + XLogRegisterData((char*) zeroes, MAXALIGN(gidlen) - gidlen); + } + } + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) + XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin)); + + if (TransactionIdIsValid(twophase_xid)) + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); return XLogInsert(RM_XACT_ID, info); } @@ -5777,7 +5844,8 @@ xact_redo(XLogReaderState *record) LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); PrepareRedoAdd(XLogRecGetData(record), record->ReadRecPtr, - record->EndRecPtr); + record->EndRecPtr, + XLogRecGetOrigin(record)); LWLockRelease(TwoPhaseStateLock); } else if (info == XLOG_XACT_ASSIGNMENT) diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 34d9470811..f05cde202f 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -15,6 +15,7 @@ #define TWOPHASE_H #include "access/xlogdefs.h" +#include "access/xact.h" #include "datatype/timestamp.h" #include "storage/lock.h" @@ -46,6 +47,8 @@ extern bool StandbyTransactionIdIsPrepared(TransactionId xid); extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p); +extern void ParsePrepareRecord(uint8 info, char *xlrec, + xl_xact_parsed_prepare *parsed); extern void StandbyRecoverPreparedTransactions(void); extern void RecoverPreparedTransactions(void); @@ -54,7 +57,7 @@ extern void CheckPointTwoPhase(XLogRecPtr redo_horizon); extern void FinishPreparedTransaction(const char *gid, bool isCommit); extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, - XLogRecPtr end_lsn); + XLogRecPtr end_lsn, RepOriginId origin_id); extern void PrepareRedoRemove(TransactionId xid, bool giveWarning); extern void restoreTwoPhaseData(void); #endif /* TWOPHASE_H */ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 87ae2cd4df..a46396f2d9 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -21,6 +21,13 @@ #include "storage/sinval.h" #include "utils/datetime.h" +/* + * Maximum size of Global Transaction ID (including '\0'). + * + * Note that the max value of GIDSIZE must fit in the uint16 gidlen, + * specified in TwoPhaseFileHeader. + */ +#define GIDSIZE 200 /* * Xact isolation levels @@ -156,6 +163,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XACT_XINFO_HAS_TWOPHASE (1U << 4) #define XACT_XINFO_HAS_ORIGIN (1U << 5) #define XACT_XINFO_HAS_AE_LOCKS (1U << 6) +#define XACT_XINFO_HAS_GID (1U << 7) /* * Also stored in xinfo, these indicating a variety of additional actions that @@ -286,7 +294,6 @@ typedef struct xl_xact_abort typedef struct xl_xact_parsed_commit { TimestampTz xact_time; - uint32 xinfo; Oid dbId; /* MyDatabaseId */ @@ -302,16 +309,24 @@ typedef struct xl_xact_parsed_commit SharedInvalidationMessage *msgs; TransactionId twophase_xid; /* only for 2PC */ + char twophase_gid[GIDSIZE]; /* only for 2PC */ + int nabortrels; /* only for 2PC */ + RelFileNode *abortnodes; /* only for 2PC */ XLogRecPtr origin_lsn; TimestampTz origin_timestamp; } xl_xact_parsed_commit; +typedef xl_xact_parsed_commit xl_xact_parsed_prepare; + typedef struct xl_xact_parsed_abort { TimestampTz xact_time; uint32 xinfo; + Oid dbId; /* MyDatabaseId */ + Oid tsId; /* MyDatabaseTableSpace */ + int nsubxacts; TransactionId *subxacts; @@ -319,6 +334,10 @@ typedef struct xl_xact_parsed_abort RelFileNode *xnodes; TransactionId twophase_xid; /* only for 2PC */ + char twophase_gid[GIDSIZE]; /* only for 2PC */ + + XLogRecPtr origin_lsn; + TimestampTz origin_timestamp; } xl_xact_parsed_abort; @@ -386,12 +405,14 @@ extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time, int nmsgs, SharedInvalidationMessage *msgs, bool relcacheInval, bool forceSync, int xactflags, - TransactionId twophase_xid); + TransactionId twophase_xid, + const char *twophase_gid); extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileNode *rels, - int xactflags, TransactionId twophase_xid); + int xactflags, TransactionId twophase_xid, + const char *twophase_gid); extern void xact_redo(XLogReaderState *record); /* xactdesc.c */ -- 2.40.0