Store GID of 2PC in commit/abort WAL records when wal_level = logical.
This allows logical decoding to send the SAME gid to subscribers
across restarts of logical replication.
Track relica origin replay progress for 2PC.
(Edited from patch 0003 in the logical decoding 2PC series.)
Authors: Nikhil Sontakke, Stas Kelvich
Reviewed-by: Simon Riggs, Andres Freund
parsed->twophase_xid = xl_twophase->xid;
data += sizeof(xl_xact_twophase);
+
+ if (parsed->xinfo & XACT_XINFO_HAS_GID)
+ {
+ int gidlen;
+ strcpy(parsed->twophase_gid, data);
+ gidlen = strlen(parsed->twophase_gid) + 1;
+ data += MAXALIGN(gidlen);
+ }
}
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
data += sizeof(xl_xact_xinfo);
}
+ if (parsed->xinfo & XACT_XINFO_HAS_DBINFO)
+ {
+ xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data;
+
+ parsed->dbId = xl_dbinfo->dbId;
+ parsed->tsId = xl_dbinfo->tsId;
+
+ data += sizeof(xl_xact_dbinfo);
+ }
+
if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
{
xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
parsed->twophase_xid = xl_twophase->xid;
data += sizeof(xl_xact_twophase);
+
+ if (parsed->xinfo & XACT_XINFO_HAS_GID)
+ {
+ int gidlen;
+ strcpy(parsed->twophase_gid, data);
+ gidlen = strlen(parsed->twophase_gid) + 1;
+ data += MAXALIGN(gidlen);
+ }
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ {
+ xl_xact_origin xl_origin;
+
+ /* we're only guaranteed 4 byte alignment, so copy onto stack */
+ memcpy(&xl_origin, data, sizeof(xl_origin));
+
+ parsed->origin_lsn = xl_origin.origin_lsn;
+ parsed->origin_timestamp = xl_origin.origin_timestamp;
+
+ data += sizeof(xl_xact_origin);
}
}
*
* typedef struct GlobalTransactionData *GlobalTransaction appears in
* twophase.h
- *
- * Note that the max value of GIDSIZE must fit in the uint16 gidlen,
- * specified in TwoPhaseFileHeader.
*/
-#define GIDSIZE 200
typedef struct GlobalTransactionData
{
RelFileNode *rels,
int ninvalmsgs,
SharedInvalidationMessage *invalmsgs,
- bool initfileinval);
+ bool initfileinval,
+ const char *gid);
static void RecordTransactionAbortPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
- RelFileNode *rels);
+ RelFileNode *rels,
+ const char *gid);
static void ProcessRecords(char *bufptr, TransactionId xid,
const TwoPhaseCallback callbacks[]);
static void RemoveGXact(GlobalTransaction gxact);
/*
* Header for a 2PC state file
*/
-#define TWOPHASE_MAGIC 0x57F94533 /* format identifier */
+#define TWOPHASE_MAGIC 0x57F94534 /* format identifier */
typedef struct TwoPhaseFileHeader
{
int32 ninvalmsgs; /* number of cache invalidation messages */
bool initfileinval; /* does relcache init file need invalidation? */
uint16 gidlen; /* length of the GID - GID follows the header */
+ XLogRecPtr origin_lsn; /* lsn of this record at origin node */
+ TimestampTz origin_timestamp; /* time of prepare at origin node */
} TwoPhaseFileHeader;
/*
{
TwoPhaseFileHeader *hdr;
StateFileChunk *record;
+ bool replorigin;
/* Add the end sentinel to the list of 2PC records */
RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
Assert(hdr->magic == TWOPHASE_MAGIC);
hdr->total_len = records.total_len + sizeof(pg_crc32c);
+ replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+ replorigin_session_origin != DoNotReplicateId);
+
+ if (replorigin)
+ {
+ Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
+ hdr->origin_lsn = replorigin_session_origin_lsn;
+ hdr->origin_timestamp = replorigin_session_origin_timestamp;
+ }
+ else
+ {
+ hdr->origin_lsn = InvalidXLogRecPtr;
+ hdr->origin_timestamp = 0;
+ }
+
/*
* If the data size exceeds MaxAllocSize, we won't be able to read it in
* ReadTwoPhaseFile. Check for that now, rather than fail in the case
XLogBeginInsert();
for (record = records.head; record != NULL; record = record->next)
XLogRegisterData(record->data, record->len);
+
+ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
+
+ if (replorigin)
+ /* Move LSNs forward for this replication origin */
+ replorigin_session_advance(replorigin_session_origin_lsn,
+ gxact->prepare_end_lsn);
+
XLogFlush(gxact->prepare_end_lsn);
/* If we crash now, we have prepared: WAL replay will fix things */
return buf;
}
+/*
+ * ParsePrepareRecord
+ */
+void
+ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
+{
+ TwoPhaseFileHeader *hdr;
+ char *bufptr;
+
+ hdr = (TwoPhaseFileHeader *) xlrec;
+ bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
+
+ parsed->origin_lsn = hdr->origin_lsn;
+ parsed->origin_timestamp = hdr->origin_timestamp;
+ parsed->twophase_xid = hdr->xid;
+ parsed->dbId = hdr->database;
+ parsed->nsubxacts = hdr->nsubxacts;
+ parsed->nrels = hdr->ncommitrels;
+ parsed->nabortrels = hdr->nabortrels;
+ parsed->nmsgs = hdr->ninvalmsgs;
+
+ strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
+ bufptr += MAXALIGN(hdr->gidlen);
+
+ parsed->subxacts = (TransactionId *) bufptr;
+ bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+
+ parsed->xnodes = (RelFileNode *) bufptr;
+ bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+
+ parsed->abortnodes = (RelFileNode *) bufptr;
+ bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+
+ parsed->msgs = (SharedInvalidationMessage *) bufptr;
+ bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
+}
+
+
/*
* Reads 2PC data from xlog. During checkpoint this data will be moved to
hdr->nsubxacts, children,
hdr->ncommitrels, commitrels,
hdr->ninvalmsgs, invalmsgs,
- hdr->initfileinval);
+ hdr->initfileinval, gid);
else
RecordTransactionAbortPrepared(xid,
hdr->nsubxacts, children,
- hdr->nabortrels, abortrels);
+ hdr->nabortrels, abortrels,
+ gid);
ProcArrayRemove(proc, latestXid);
if (buf == NULL)
continue;
- PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr);
+ PrepareRedoAdd(buf, InvalidXLogRecPtr,
+ InvalidXLogRecPtr, InvalidRepOriginId);
}
}
LWLockRelease(TwoPhaseStateLock);
RelFileNode *rels,
int ninvalmsgs,
SharedInvalidationMessage *invalmsgs,
- bool initfileinval)
+ bool initfileinval,
+ const char *gid)
{
XLogRecPtr recptr;
TimestampTz committs = GetCurrentTimestamp();
ninvalmsgs, invalmsgs,
initfileinval, false,
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
- xid);
+ xid, gid);
if (replorigin)
int nchildren,
TransactionId *children,
int nrels,
- RelFileNode *rels)
+ RelFileNode *rels,
+ const char *gid)
{
XLogRecPtr recptr;
nchildren, children,
nrels, rels,
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
- xid);
+ xid, gid);
/* Always flush, since we're about to remove the 2PC state file */
XLogFlush(recptr);
* data, the entry is marked as located on disk.
*/
void
-PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
+PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
+ XLogRecPtr end_lsn, RepOriginId origin_id)
{
TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
char *bufptr;
Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
+ if (origin_id != InvalidRepOriginId)
+ {
+ /* recover apply progress */
+ replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
+ false /* backward */ , false /* WAL */ );
+ }
+
elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
}
nmsgs, invalMessages,
RelcacheInitFileInval, forceSyncCommit,
MyXactFlags,
- InvalidTransactionId /* plain commit */ );
+ InvalidTransactionId, NULL /* plain commit */ );
if (replorigin)
/* Move LSNs forward for this replication origin */
XactLogAbortRecord(xact_time,
nchildren, children,
nrels, rels,
- MyXactFlags, InvalidTransactionId);
+ MyXactFlags, InvalidTransactionId,
+ NULL);
/*
* Report the latest async abort LSN, so that the WAL writer knows to
int nrels, RelFileNode *rels,
int nmsgs, SharedInvalidationMessage *msgs,
bool relcacheInval, bool forceSync,
- int xactflags, TransactionId twophase_xid)
+ int xactflags, TransactionId twophase_xid,
+ const char *twophase_gid)
{
xl_xact_commit xlrec;
xl_xact_xinfo xl_xinfo;
xl_xact_origin xl_origin;
uint8 info;
+ int gidlen = 0;
Assert(CritSectionCount > 0);
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
xl_twophase.xid = twophase_xid;
+ Assert(twophase_gid != NULL);
+
+ if (XLogLogicalInfoActive())
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
+ gidlen = strlen(twophase_gid) + 1; /* include '\0' */
+ }
}
/* dump transaction origin information */
}
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+ {
XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
+ {
+ static const char zeroes[MAXIMUM_ALIGNOF] = { 0 };
+ XLogRegisterData((char*) twophase_gid, gidlen);
+ if (MAXALIGN(gidlen) != gidlen)
+ XLogRegisterData((char*) zeroes, MAXALIGN(gidlen) - gidlen);
+ }
+ }
if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
XactLogAbortRecord(TimestampTz abort_time,
int nsubxacts, TransactionId *subxacts,
int nrels, RelFileNode *rels,
- int xactflags, TransactionId twophase_xid)
+ int xactflags, TransactionId twophase_xid,
+ const char *twophase_gid)
{
xl_xact_abort xlrec;
xl_xact_xinfo xl_xinfo;
xl_xact_subxacts xl_subxacts;
xl_xact_relfilenodes xl_relfilenodes;
xl_xact_twophase xl_twophase;
+ xl_xact_dbinfo xl_dbinfo;
+ xl_xact_origin xl_origin;
uint8 info;
+ int gidlen = 0;
Assert(CritSectionCount > 0);
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
xl_twophase.xid = twophase_xid;
+ Assert(twophase_gid != NULL);
+
+ if (XLogLogicalInfoActive())
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
+ gidlen = strlen(twophase_gid) + 1; /* include '\0' */
+ }
+ }
+
+ if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive())
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
+ xl_dbinfo.dbId = MyDatabaseId;
+ xl_dbinfo.tsId = MyDatabaseTableSpace;
+ }
+
+ /* dump transaction origin information only for abort prepared */
+ if ( (replorigin_session_origin != InvalidRepOriginId) &&
+ TransactionIdIsValid(twophase_xid) &&
+ XLogLogicalInfoActive())
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
+
+ xl_origin.origin_lsn = replorigin_session_origin_lsn;
+ xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
}
if (xl_xinfo.xinfo != 0)
if (xl_xinfo.xinfo != 0)
XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo));
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+ XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
+
if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
{
XLogRegisterData((char *) (&xl_subxacts),
}
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+ {
XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
+ {
+ static const char zeroes[MAXIMUM_ALIGNOF] = { 0 };
+ XLogRegisterData((char*) twophase_gid, gidlen);
+ if (MAXALIGN(gidlen) != gidlen)
+ XLogRegisterData((char*) zeroes, MAXALIGN(gidlen) - gidlen);
+ }
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
+ XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
+
+ if (TransactionIdIsValid(twophase_xid))
+ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
return XLogInsert(RM_XACT_ID, info);
}
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
PrepareRedoAdd(XLogRecGetData(record),
record->ReadRecPtr,
- record->EndRecPtr);
+ record->EndRecPtr,
+ XLogRecGetOrigin(record));
LWLockRelease(TwoPhaseStateLock);
}
else if (info == XLOG_XACT_ASSIGNMENT)
#define TWOPHASE_H
#include "access/xlogdefs.h"
+#include "access/xact.h"
#include "datatype/timestamp.h"
#include "storage/lock.h"
extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p,
int *nxids_p);
+extern void ParsePrepareRecord(uint8 info, char *xlrec,
+ xl_xact_parsed_prepare *parsed);
extern void StandbyRecoverPreparedTransactions(void);
extern void RecoverPreparedTransactions(void);
extern void FinishPreparedTransaction(const char *gid, bool isCommit);
extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
- XLogRecPtr end_lsn);
+ XLogRecPtr end_lsn, RepOriginId origin_id);
extern void PrepareRedoRemove(TransactionId xid, bool giveWarning);
extern void restoreTwoPhaseData(void);
#endif /* TWOPHASE_H */
#include "storage/sinval.h"
#include "utils/datetime.h"
+/*
+ * Maximum size of Global Transaction ID (including '\0').
+ *
+ * Note that the max value of GIDSIZE must fit in the uint16 gidlen,
+ * specified in TwoPhaseFileHeader.
+ */
+#define GIDSIZE 200
/*
* Xact isolation levels
#define XACT_XINFO_HAS_TWOPHASE (1U << 4)
#define XACT_XINFO_HAS_ORIGIN (1U << 5)
#define XACT_XINFO_HAS_AE_LOCKS (1U << 6)
+#define XACT_XINFO_HAS_GID (1U << 7)
/*
* Also stored in xinfo, these indicating a variety of additional actions that
typedef struct xl_xact_parsed_commit
{
TimestampTz xact_time;
-
uint32 xinfo;
Oid dbId; /* MyDatabaseId */
SharedInvalidationMessage *msgs;
TransactionId twophase_xid; /* only for 2PC */
+ char twophase_gid[GIDSIZE]; /* only for 2PC */
+ int nabortrels; /* only for 2PC */
+ RelFileNode *abortnodes; /* only for 2PC */
XLogRecPtr origin_lsn;
TimestampTz origin_timestamp;
} xl_xact_parsed_commit;
+typedef xl_xact_parsed_commit xl_xact_parsed_prepare;
+
typedef struct xl_xact_parsed_abort
{
TimestampTz xact_time;
uint32 xinfo;
+ Oid dbId; /* MyDatabaseId */
+ Oid tsId; /* MyDatabaseTableSpace */
+
int nsubxacts;
TransactionId *subxacts;
RelFileNode *xnodes;
TransactionId twophase_xid; /* only for 2PC */
+ char twophase_gid[GIDSIZE]; /* only for 2PC */
+
+ XLogRecPtr origin_lsn;
+ TimestampTz origin_timestamp;
} xl_xact_parsed_abort;
int nmsgs, SharedInvalidationMessage *msgs,
bool relcacheInval, bool forceSync,
int xactflags,
- TransactionId twophase_xid);
+ TransactionId twophase_xid,
+ const char *twophase_gid);
extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
int nsubxacts, TransactionId *subxacts,
int nrels, RelFileNode *rels,
- int xactflags, TransactionId twophase_xid);
+ int xactflags, TransactionId twophase_xid,
+ const char *twophase_gid);
extern void xact_redo(XLogReaderState *record);
/* xactdesc.c */