1 /*-------------------------------------------------------------------------
4 * Two-phase commit support functions.
6 * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * src/backend/access/transam/twophase.c
13 * Each global transaction is associated with a global transaction
14 * identifier (GID). The client assigns a GID to a postgres
15 * transaction with the PREPARE TRANSACTION command.
17 * We keep all active global transactions in a shared memory array.
18 * When the PREPARE TRANSACTION command is issued, the GID is
19 * reserved for the transaction in the array. This is done before
20 * a WAL entry is made, because the reservation checks for duplicate
21 * GIDs and aborts the transaction if there already is a global
22 * transaction in prepared state with the same GID.
24 * A global transaction (gxact) also has dummy PGXACT and PGPROC; this is
25 * what keeps the XID considered running by TransactionIdIsInProgress.
26 * It is also convenient as a PGPROC to hook the gxact's locks to.
28 * In order to survive crashes and shutdowns, all prepared
29 * transactions must be stored in permanent storage. This includes
30 * locking information, pending notifications etc. All that state
31 * information is written to the per-transaction state file in
32 * the pg_twophase directory.
34 *-------------------------------------------------------------------------
40 #include <sys/types.h>
44 #include "access/htup_details.h"
45 #include "access/subtrans.h"
46 #include "access/transam.h"
47 #include "access/twophase.h"
48 #include "access/twophase_rmgr.h"
49 #include "access/xact.h"
50 #include "access/xlog.h"
51 #include "access/xlogutils.h"
52 #include "catalog/pg_type.h"
53 #include "catalog/storage.h"
55 #include "miscadmin.h"
58 #include "replication/walsender.h"
59 #include "replication/syncrep.h"
60 #include "storage/fd.h"
61 #include "storage/predicate.h"
62 #include "storage/proc.h"
63 #include "storage/procarray.h"
64 #include "storage/sinvaladt.h"
65 #include "storage/smgr.h"
66 #include "utils/builtins.h"
67 #include "utils/memutils.h"
68 #include "utils/timestamp.h"
72 * Directory where Two-phase commit files reside within PGDATA
74 #define TWOPHASE_DIR "pg_twophase"
/*
 * Upper bound on concurrently prepared transactions.  MarkAsPreparing
 * refuses PREPARE TRANSACTION outright while this is 0, and
 * TwoPhaseShmemSize uses it to size the shared-memory table, so it also
 * caps how many prepared-transaction slots clients can occupy.
 */
76 /* GUC variable, can't be changed after startup */
77 int max_prepared_xacts = 0;
80 * This struct describes one global transaction that is in prepared state
81 * or attempting to become prepared.
83 * The lifecycle of a global transaction is:
85 * 1. After checking that the requested GID is not in use, set up an
86 * entry in the TwoPhaseState->prepXacts array with the correct XID and GID,
87 * with locking_xid = my own XID and valid = false.
89 * 2. After successfully completing prepare, set valid = true and enter the
90 * referenced PGPROC into the global ProcArray.
92 * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry
93 * is valid and its locking_xid is no longer active, then store my current
94 * XID into locking_xid. This prevents concurrent attempts to commit or
95 * rollback the same prepared xact.
97 * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
98 * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
101 * Note that if the preparing transaction fails between steps 1 and 2, the
102 * entry will remain in prepXacts until recycled. We can detect recyclable
103 * entries by checking for valid = false and locking_xid no longer active.
105 * typedef struct GlobalTransactionData *GlobalTransaction appears in
/*
 * In-memory descriptor of one prepared (or preparing) transaction.
 *
 * Two fields carry the access-control and memory-safety contract of this
 * file: "owner" is the role LockGXact compares against the caller before
 * COMMIT PREPARED / ROLLBACK PREPARED is allowed, and "gid" is a fixed
 * GIDSIZE buffer that MarkAsPreparing fills with strcpy only after
 * rejecting identifiers whose strlen is >= GIDSIZE.
 */
110 typedef struct GlobalTransactionData
112 GlobalTransaction next; /* list link for free list */
113 int pgprocno; /* ID of associated dummy PGPROC */
114 BackendId dummyBackendId; /* similar to backend id for backends */
115 TimestampTz prepared_at; /* time of preparation */
116 XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */
117 Oid owner; /* ID of user that executed the xact */
118 TransactionId locking_xid; /* top-level XID of backend working on xact */
119 bool valid; /* TRUE if fully prepared */
120 char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
121 } GlobalTransactionData;
124 * Two Phase Commit shared state. Access to this struct is protected
125 * by TwoPhaseStateLock.
/*
 * Shared-memory table of prepared transactions.
 *
 * prepXacts is declared with one element but is indexed up to
 * numPrepXacts - 1; the real capacity is max_prepared_xacts, as allocated
 * by TwoPhaseShmemSize.  The only run-time bound on insertions visible in
 * this file is the free list running empty in MarkAsPreparing; the Assert
 * there is a debugging aid, not the guard.
 * NOTE(review): this listing omits some lines of the struct (the count
 * field declaration is not shown) -- check against the full file.
 */
127 typedef struct TwoPhaseStateData
129 /* Head of linked list of free GlobalTransactionData structs */
130 GlobalTransaction freeGXacts;
132 /* Number of valid prepXacts entries. */
136 * There are max_prepared_xacts items in this array, but C wants a
139 GlobalTransaction prepXacts[1]; /* VARIABLE LENGTH ARRAY */
140 } TwoPhaseStateData; /* VARIABLE LENGTH STRUCT */
/* Pointer to the table above; assigned from ShmemInitStruct in TwoPhaseShmemInit. */
142 static TwoPhaseStateData *TwoPhaseState;
/*
 * Forward declarations of file-local helpers.
 * NOTE(review): the parameter lists are incomplete in this listing and the
 * definitions are not visible here; consult the full file for the contracts.
 */
145 static void RecordTransactionCommitPrepared(TransactionId xid,
147 TransactionId *children,
151 SharedInvalidationMessage *invalmsgs,
153 static void RecordTransactionAbortPrepared(TransactionId xid,
155 TransactionId *children,
158 static void ProcessRecords(char *bufptr, TransactionId xid,
159 const TwoPhaseCallback callbacks[]);
163 * Initialization of shared memory
/*
 * Compute the shared-memory bytes needed for the prepared-transaction table:
 * the fixed header, max_prepared_xacts pointers, then (after MAXALIGN)
 * max_prepared_xacts GlobalTransactionData structs.
 *
 * The arithmetic goes through add_size/mul_size (defined outside this file;
 * in PostgreSQL they raise an error on size_t overflow) rather than raw
 * operators.  TwoPhaseShmemInit recomputes the same layout with plain
 * arithmetic, so the two functions must stay in step.
 */
166 TwoPhaseShmemSize(void)
170 /* Need the fixed struct, the array of pointers, and the GTD structs */
171 size = offsetof(TwoPhaseStateData, prepXacts);
172 size = add_size(size, mul_size(max_prepared_xacts,
173 sizeof(GlobalTransaction)));
174 size = MAXALIGN(size);
175 size = add_size(size, mul_size(max_prepared_xacts,
176 sizeof(GlobalTransactionData)));
/*
 * Attach to (or create) the shared prepared-transaction table and, when not
 * running under the postmaster, build the free list of
 * GlobalTransactionData slots.
 *
 * Every slot is pushed on the free list exactly once, so the free list is
 * what limits the table to max_prepared_xacts live entries.
 */
182 TwoPhaseShmemInit(void)
186 TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
189 if (!IsUnderPostmaster)
191 GlobalTransaction gxacts;
195 TwoPhaseState->freeGXacts = NULL;
196 TwoPhaseState->numPrepXacts = 0;
199 * Initialize the linked list of free GlobalTransactionData structs
/*
 * The slot array starts right after the pointer array.  This offset is
 * computed with unchecked arithmetic and must equal the one used in
 * TwoPhaseShmemSize, which already validated the total size.
 */
201 gxacts = (GlobalTransaction)
202 ((char *) TwoPhaseState +
203 MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
204 sizeof(GlobalTransaction) * max_prepared_xacts));
205 for (i = 0; i < max_prepared_xacts; i++)
207 /* insert into linked list */
208 gxacts[i].next = TwoPhaseState->freeGXacts;
209 TwoPhaseState->freeGXacts = &gxacts[i];
211 /* associate it with a PGPROC assigned by InitProcGlobal */
212 gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
215 * Assign a unique ID for each dummy proc, so that the range of
216 * dummy backend IDs immediately follows the range of normal
217 * backend IDs. We don't dare to assign a real backend ID to dummy
218 * procs, because prepared transactions don't take part in cache
219 * invalidation like a real backend ID would imply, but having a
220 * unique ID for them is nevertheless handy. This arrangement
221 * allows you to allocate an array of size (MaxBackends +
222 * max_prepared_xacts + 1), and have a slot for every backend and
223 * prepared transaction. Currently multixact.c uses that
226 gxacts[i].dummyBackendId = MaxBackends + 1 + i;
236 * Reserve the GID for the given transaction.
238 * Internally, this creates a gxact struct and puts it into the active array.
239 * NOTE: this is also used when reloading a gxact after a crash; so avoid
240 * assuming that we can use very much backend context.
/*
 * Reserve a client-chosen GID for transaction xid and set up its dummy
 * PGPROC/PGXACT, leaving the entry marked not-yet-valid.
 *
 * gid comes from the client (PREPARE TRANSACTION), so the checks below are
 * the input validation for everything that later stores or compares it:
 * length first, then feature enabled, then (under the exclusive lock)
 * duplicate detection and slot availability.
 * NOTE(review): several lines (local declarations, the ereport openers,
 * braces, the return) are missing from this listing.
 */
243 MarkAsPreparing(TransactionId xid, const char *gid,
244 TimestampTz prepared_at, Oid owner, Oid databaseid)
246 GlobalTransaction gxact;
/*
 * Length guard: ">=" reserves room for the terminating NUL, which is what
 * makes the unbounded strcpy into gxact->gid (char[GIDSIZE]) further down
 * safe.  Keep the two together if either is ever changed.
 */
251 if (strlen(gid) >= GIDSIZE)
253 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
/* The GID is passed as a %s argument, never as the format string itself. */
254 errmsg("transaction identifier \"%s\" is too long",
257 /* fail immediately if feature is disabled */
258 if (max_prepared_xacts == 0)
260 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
261 errmsg("prepared transactions are disabled"),
262 errhint("Set max_prepared_transactions to a nonzero value.")));
/*
 * Recycling, the duplicate-GID check and the insertion all happen under one
 * exclusive hold of TwoPhaseStateLock, so no other backend can reserve the
 * same GID in between.
 */
264 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
267 * First, find and recycle any gxacts that failed during prepare. We do
268 * this partly to ensure we don't mistakenly say their GIDs are still
269 * reserved, and partly so we don't fail on out-of-slots unnecessarily.
271 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
273 gxact = TwoPhaseState->prepXacts[i];
274 if (!gxact->valid && !TransactionIdIsActive(gxact->locking_xid))
276 /* It's dead Jim ... remove from the active array */
277 TwoPhaseState->numPrepXacts--;
278 TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
279 /* and put it back in the freelist */
280 gxact->next = TwoPhaseState->freeGXacts;
281 TwoPhaseState->freeGXacts = gxact;
282 /* Back up index count too, so we don't miss scanning one */
287 /* Check for conflicting GID */
/* Compared against every active entry, valid or not (no valid test is visible here). */
288 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
290 gxact = TwoPhaseState->prepXacts[i];
291 if (strcmp(gxact->gid, gid) == 0)
294 (errcode(ERRCODE_DUPLICATE_OBJECT),
295 errmsg("transaction identifier \"%s\" is already in use",
300 /* Get a free gxact from the freelist */
/*
 * An empty free list is the effective capacity check: it is what stops
 * prepXacts from growing past max_prepared_xacts in builds where Assert is
 * compiled out.
 */
301 if (TwoPhaseState->freeGXacts == NULL)
303 (errcode(ERRCODE_OUT_OF_MEMORY),
304 errmsg("maximum number of prepared transactions reached"),
305 errhint("Increase max_prepared_transactions (currently %d).",
306 max_prepared_xacts)));
307 gxact = TwoPhaseState->freeGXacts;
308 TwoPhaseState->freeGXacts = gxact->next;
310 proc = &ProcGlobal->allProcs[gxact->pgprocno];
311 pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
313 /* Initialize the PGPROC entry */
314 MemSet(proc, 0, sizeof(PGPROC));
315 proc->pgprocno = gxact->pgprocno;
316 SHMQueueElemInit(&(proc->links));
317 proc->waitStatus = STATUS_OK;
318 /* We set up the gxact's VXID as InvalidBackendId/XID */
319 proc->lxid = (LocalTransactionId) xid;
321 pgxact->xmin = InvalidTransactionId;
322 pgxact->delayChkpt = false;
323 pgxact->vacuumFlags = 0;
325 proc->backendId = InvalidBackendId;
/* Database and role are recorded here; LockGXact later checks both. */
326 proc->databaseId = databaseid;
327 proc->roleId = owner;
328 proc->lwWaiting = false;
329 proc->lwWaitMode = 0;
330 proc->lwWaitLink = NULL;
331 proc->waitLock = NULL;
332 proc->waitProcLock = NULL;
333 for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
334 SHMQueueInit(&(proc->myProcLocks[i]));
335 /* subxid data must be filled later by GXactLoadSubxactData */
336 pgxact->overflowed = false;
339 gxact->prepared_at = prepared_at;
340 /* initialize LSN to 0 (start of WAL) */
341 gxact->prepare_lsn = 0;
342 gxact->owner = owner;
343 gxact->locking_xid = xid;
344 gxact->valid = false;
/* Bounded by the strlen(gid) >= GIDSIZE rejection at the top of the function. */
345 strcpy(gxact->gid, gid);
347 /* And insert it into the active array */
348 Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
349 TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
351 LWLockRelease(TwoPhaseStateLock);
357 * GXactLoadSubxactData
359 * If the transaction being persisted had any subtransactions, this must
360 * be called before MarkAsPrepared() to load information into the dummy
/*
 * Copy up to PGPROC_MAX_CACHED_SUBXIDS child XIDs into the dummy PGPROC of
 * gxact and record how many were stored.
 *
 * The count is clamped before the memcpy so the fixed-size
 * proc->subxids.xids cache cannot be overrun; the overflowed flag records
 * that the cache is incomplete.
 * NOTE(review): the lines between the clamp and the memcpy are missing from
 * this listing -- confirm the copy is skipped when nsubxacts is not positive.
 */
364 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
365 TransactionId *children)
367 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
368 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
370 /* We need no extra lock since the GXACT isn't valid yet */
371 if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
373 pgxact->overflowed = true;
374 nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
378 memcpy(proc->subxids.xids, children,
379 nsubxacts * sizeof(TransactionId));
380 pgxact->nxids = nsubxacts;
386 * Mark the GXACT as fully valid, and enter it into the global ProcArray.
/*
 * Publish a prepared transaction: under the exclusive TwoPhaseStateLock the
 * entry must still be not-valid (asserted), then its dummy PGPROC is added
 * to the ProcArray.
 * NOTE(review): the statement that flips the valid flag is not present in
 * this listing; presumably it sits between the Assert and the release.
 */
389 MarkAsPrepared(GlobalTransaction gxact)
391 /* Lock here may be overkill, but I'm not convinced of that ... */
392 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
393 Assert(!gxact->valid);
395 LWLockRelease(TwoPhaseStateLock);
398 * Put it into the global ProcArray so TransactionIdIsInProgress considers
399 * the XID as still running.
401 ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
406 * Locate the prepared transaction and mark it busy for COMMIT PREPARED or ROLLBACK PREPARED.
/*
 * Authorisation gate for finishing a prepared transaction.
 *
 * Looks up gid among the valid entries and, in this order, rejects the
 * request if another active transaction holds the entry, if "user" is
 * neither the preparing role nor a superuser, or if the current database
 * differs from the one the transaction was prepared in.  On success the
 * caller's top-level XID is stored in locking_xid, which is what excludes
 * concurrent finishers.
 *
 * Because the checks run only after a GID match, the error a caller gets
 * (busy / permission denied / other database / does not exist) reveals
 * whether a given GID currently exists, whoever asks.
 * NOTE(review): the ereport openers, the valid-flag test, braces and return
 * statements are missing from this listing.
 */
408 static GlobalTransaction
409 LockGXact(const char *gid, Oid user)
/* Exclusive mode: the function writes locking_xid. */
413 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
415 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
417 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
418 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
420 /* Ignore not-yet-valid GIDs */
423 if (strcmp(gxact->gid, gid) != 0)
426 /* Found it, but has someone else got it locked? */
427 if (TransactionIdIsValid(gxact->locking_xid))
429 if (TransactionIdIsActive(gxact->locking_xid))
431 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
432 errmsg("prepared transaction with identifier \"%s\" is busy",
/* Previous locker is gone: clear the stale claim before proceeding. */
434 gxact->locking_xid = InvalidTransactionId;
/* Ownership check: only the preparing role or a superuser may finish it. */
437 if (user != gxact->owner && !superuser_arg(user))
439 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
440 errmsg("permission denied to finish prepared transaction"),
441 errhint("Must be superuser or the user that prepared the transaction.")));
444 * Note: it probably would be possible to allow committing from
445 * another database; but at the moment NOTIFY is known not to work and
446 * there may be some other issues as well. Hence disallow until
447 * someone gets motivated to make it work.
449 if (MyDatabaseId != proc->databaseId)
451 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
452 errmsg("prepared transaction belongs to another database"),
453 errhint("Connect to the database where the transaction was prepared to finish it.")));
455 /* OK for me to lock it */
456 gxact->locking_xid = GetTopTransactionId();
458 LWLockRelease(TwoPhaseStateLock);
/* No entry matched: drop the lock before raising the not-found error. */
463 LWLockRelease(TwoPhaseStateLock);
466 (errcode(ERRCODE_UNDEFINED_OBJECT),
467 errmsg("prepared transaction with identifier \"%s\" does not exist",
476 * Remove the prepared transaction from the shared memory array.
478 * NB: caller should have already removed it from ProcArray
/*
 * Unlink gxact from the active array (swap-with-last) and return its slot
 * to the free list, all under the exclusive TwoPhaseStateLock.  If the
 * pointer is not found in the array the lock is released and an ERROR is
 * raised.
 */
481 RemoveGXact(GlobalTransaction gxact)
485 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
487 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
489 if (gxact == TwoPhaseState->prepXacts[i])
491 /* remove from the active array */
492 TwoPhaseState->numPrepXacts--;
493 TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
495 /* and put it back in the freelist */
496 gxact->next = TwoPhaseState->freeGXacts;
497 TwoPhaseState->freeGXacts = gxact;
499 LWLockRelease(TwoPhaseStateLock);
505 LWLockRelease(TwoPhaseStateLock);
/* The message prints only the pointer value of the entry, not its GID. */
507 elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
511 * TransactionIdIsPrepared
512 * True iff transaction associated with the identifier is prepared
513 * for two-phase commit
515 * Note: only gxacts marked "valid" are considered; but notice we do not
516 * check the locking status.
518 * This is not currently exported, because it is only needed internally.
/*
 * Scan the active array under a shared lock for a valid entry whose dummy
 * PGXACT carries xid.  Read-only, hence LW_SHARED.
 * NOTE(review): the result assignment and return are not in this listing.
 */
521 TransactionIdIsPrepared(TransactionId xid)
526 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
528 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
530 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
531 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
533 if (gxact->valid && pgxact->xid == xid)
540 LWLockRelease(TwoPhaseStateLock);
546 * Returns an array of all prepared transactions for the user-level
547 * function pg_prepared_xact.
549 * The returned array and all its elements are copies of internal data
550 * structures, to minimize the time we need to hold the TwoPhaseStateLock.
552 * WARNING -- we return even those transactions that are not fully prepared
553 * yet. The caller should filter them out if it does not want them.
555 * The returned array is palloc'd.
/*
 * Snapshot the active array into a palloc'd copy while holding the shared
 * lock, so callers can format rows without holding TwoPhaseStateLock.
 *
 * The copy includes entries that are not yet valid and carries each entry's
 * GID and owner; nothing here filters by the calling role, so any
 * restriction on who may see other users' prepared transactions has to be
 * applied by the caller or at the SQL level.
 * The allocation is num * sizeof(GlobalTransactionData), with num read
 * under the lock from numPrepXacts.
 */
558 GetPreparedTransactionList(GlobalTransaction *gxacts)
560 GlobalTransaction array;
564 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
566 if (TwoPhaseState->numPrepXacts == 0)
568 LWLockRelease(TwoPhaseStateLock);
574 num = TwoPhaseState->numPrepXacts;
575 array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
577 for (i = 0; i < num; i++)
578 memcpy(array + i, TwoPhaseState->prepXacts[i],
579 sizeof(GlobalTransactionData));
581 LWLockRelease(TwoPhaseStateLock);
587 /* Working status for pg_prepared_xact */
/*
 * array holds the copy obtained from GetPreparedTransactionList in
 * pg_prepared_xact.
 * NOTE(review): the struct header and its other fields are missing from this
 * listing.
 */
590 GlobalTransaction array;
597 * Produce a view with one row per prepared transaction.
599 * This function is here so we don't have to export the
600 * GlobalTransactionData struct definition.
/*
 * Set-returning function behind the prepared-transactions view: one row per
 * prepared transaction with columns transaction, gid, prepared, ownerid and
 * dbid.
 *
 * On the first call it builds the tuple descriptor and takes a private
 * snapshot of the shared table in the multi-call memory context; later
 * calls only walk that snapshot.  The rows expose the GID, owner and
 * database of every entry in the snapshot; no per-caller privilege filter
 * is visible in this function.
 * NOTE(review): GetPreparedTransactionList also returns not-yet-valid
 * entries and the filtering lines are not present in this listing --
 * confirm against the full file that they are skipped in the loop.
 */
603 pg_prepared_xact(PG_FUNCTION_ARGS)
605 FuncCallContext *funcctx;
606 Working_State *status;
608 if (SRF_IS_FIRSTCALL())
611 MemoryContext oldcontext;
613 /* create a function context for cross-call persistence */
614 funcctx = SRF_FIRSTCALL_INIT();
617 * Switch to memory context appropriate for multiple function calls
619 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
621 /* build tupdesc for result tuples */
622 /* this had better match pg_prepared_xacts view in system_views.sql */
623 tupdesc = CreateTemplateTupleDesc(5, false);
624 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
626 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
628 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
629 TIMESTAMPTZOID, -1, 0);
630 TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
632 TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
635 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
638 * Collect all the 2PC status information that we will format and send
639 * out as a result set.
641 status = (Working_State *) palloc(sizeof(Working_State));
642 funcctx->user_fctx = (void *) status;
644 status->ngxacts = GetPreparedTransactionList(&status->array);
647 MemoryContextSwitchTo(oldcontext);
650 funcctx = SRF_PERCALL_SETUP();
651 status = (Working_State *) funcctx->user_fctx;
/* The NULL test covers the empty-table case, where no array was allocated. */
653 while (status->array != NULL && status->currIdx < status->ngxacts)
655 GlobalTransaction gxact = &status->array[status->currIdx++];
/*
 * gxact is the private copy, but proc and pgxact below point into live
 * shared memory and are read without TwoPhaseStateLock.
 */
656 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
657 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
667 * Form tuple with appropriate data.
669 MemSet(values, 0, sizeof(values));
670 MemSet(nulls, 0, sizeof(nulls));
672 values[0] = TransactionIdGetDatum(pgxact->xid);
673 values[1] = CStringGetTextDatum(gxact->gid);
674 values[2] = TimestampTzGetDatum(gxact->prepared_at);
675 values[3] = ObjectIdGetDatum(gxact->owner);
676 values[4] = ObjectIdGetDatum(proc->databaseId);
678 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
679 result = HeapTupleGetDatum(tuple);
680 SRF_RETURN_NEXT(funcctx, result);
683 SRF_RETURN_DONE(funcctx);
688 * Get the GlobalTransaction struct for a prepared transaction
/*
 * Map an XID to its GlobalTransaction entry, raising ERROR if none exists.
 *
 * A one-entry, function-static cache short-circuits repeated lookups of the
 * same XID.  The scan matches on XID only; the valid flag is not tested in
 * the visible lines.
 * NOTE(review): the cache-hit return and the lines that update cached_xid
 * are missing from this listing; a cache hit presumably returns the stored
 * pointer without taking TwoPhaseStateLock -- confirm the entry cannot be
 * recycled while cached.
 */
691 static GlobalTransaction
692 TwoPhaseGetGXact(TransactionId xid)
694 GlobalTransaction result = NULL;
697 static TransactionId cached_xid = InvalidTransactionId;
698 static GlobalTransaction cached_gxact = NULL;
701 * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
702 * repeatedly for the same XID. We can save work with a simple cache.
704 if (xid == cached_xid)
707 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
709 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
711 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
712 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
714 if (pgxact->xid == xid)
721 LWLockRelease(TwoPhaseStateLock);
723 if (result == NULL) /* should not happen */
724 elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
727 cached_gxact = result;
733 * TwoPhaseGetDummyBackendId
734 * Get the dummy backend ID for prepared transaction specified by XID
736 * Dummy backend IDs are similar to real backend IDs of real backends.
737 * They start at MaxBackends + 1, and are unique across all currently active
738 * real backends and prepared transactions.
/*
 * Return the dummy backend ID assigned to the prepared transaction xid.
 * TwoPhaseGetGXact raises ERROR when the XID is unknown, so the pointer
 * dereferenced here is never NULL.
 */
741 TwoPhaseGetDummyBackendId(TransactionId xid)
743 GlobalTransaction gxact = TwoPhaseGetGXact(xid);
745 return gxact->dummyBackendId;
749 * TwoPhaseGetDummyProc
750 * Get the PGPROC that represents a prepared transaction specified by XID
/*
 * Return the dummy PGPROC of the prepared transaction xid, as a pointer
 * into the shared ProcGlobal->allProcs array.  TwoPhaseGetGXact raises
 * ERROR when the XID is unknown.
 */
753 TwoPhaseGetDummyProc(TransactionId xid)
755 GlobalTransaction gxact = TwoPhaseGetGXact(xid);
757 return &ProcGlobal->allProcs[gxact->pgprocno];
760 /************************************************************************/
761 /* State file support */
762 /************************************************************************/
/*
 * Build the state-file path for xid into path (assumed to be a MAXPGPATH
 * buffer).  The name is derived only from the numeric XID, formatted as
 * eight hex digits; the client-chosen GID never becomes part of a file
 * name, and snprintf bounds the write.
 */
764 #define TwoPhaseFilePath(path, xid) \
765 snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
768 * 2PC state file format:
770 * 1. TwoPhaseFileHeader
771 * 2. TransactionId[] (subtransactions)
772 * 3. RelFileNode[] (files to be deleted at commit)
773 * 4. RelFileNode[] (files to be deleted at abort)
774 * 5. SharedInvalidationMessage[] (inval messages to be sent at commit)
775 * 6. TwoPhaseRecordOnDisk
777 * 8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
780 * Each segment except the final CRC32 is MAXALIGN'd.
784 * Header for a 2PC state file
786 #define TWOPHASE_MAGIC 0x57F94532 /* format identifier */
/*
 * On-disk header of a state file.
 *
 * The four n* counts are read back from disk and used by
 * FinishPreparedTransaction to step through the rest of the buffer.  The
 * checks visible in this file before that point are the ones in
 * ReadTwoPhaseFile (magic, total_len equal to the file size, CRC); no
 * per-field range check on the counts is visible here, and they are signed.
 */
788 typedef struct TwoPhaseFileHeader
790 uint32 magic; /* format identifier */
791 uint32 total_len; /* actual file length */
792 TransactionId xid; /* original transaction XID */
793 Oid database; /* OID of database it was in */
794 TimestampTz prepared_at; /* time of preparation */
795 Oid owner; /* user running the transaction */
796 int32 nsubxacts; /* number of following subxact XIDs */
797 int32 ncommitrels; /* number of delete-on-commit rels */
798 int32 nabortrels; /* number of delete-on-abort rels */
799 int32 ninvalmsgs; /* number of cache invalidation messages */
800 bool initfileinval; /* does relcache init file need invalidation? */
801 char gid[GIDSIZE]; /* GID for transaction */
802 } TwoPhaseFileHeader;
805 * Header for each record in a state file
807 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
808 * The rmgr data will be stored starting on a MAXALIGN boundary.
/*
 * Per-record header inside a state file.  len and rmid come from disk when
 * the file is replayed; the code that consumes them is outside this
 * listing.
 */
810 typedef struct TwoPhaseRecordOnDisk
812 uint32 len; /* length of rmgr data */
813 TwoPhaseRmgrId rmid; /* resource manager for this record */
814 uint16 info; /* flag bits for use by rmgr */
815 } TwoPhaseRecordOnDisk;
818 * During prepare, the state file is assembled in memory before writing it
819 * to WAL and the actual state file. We use a chain of XLogRecData blocks
820 * so that we will be able to pass the state file contents directly to
/*
 * Fields of the in-memory chain used while assembling a state file (the
 * code refers to it as "records").  Both byte counters are 32-bit;
 * EndPrepare compares the accumulated total against MaxAllocSize.
 * NOTE(review): the enclosing struct declaration is missing from this
 * listing.
 */
825 XLogRecData *head; /* first data block in the chain */
826 XLogRecData *tail; /* last block in chain */
827 uint32 bytes_free; /* free bytes left in tail block */
828 uint32 total_len; /* total data bytes in chain */
833 * Append a block of data to records data structure.
835 * NB: each block is padded to a MAXALIGN multiple. This must be
836 * accounted for when the file is later read!
838 * The data is copied, so the caller is free to modify it afterwards.
/*
 * Append len bytes to the state-file chain, padding the stored length to a
 * MAXALIGN multiple.
 *
 * When the tail block lacks room a new block of at least 512 bytes (or
 * padlen, if larger) is chained on, so the memcpy below always has padlen
 * bytes available.  Only len bytes are copied; the padding bytes up to
 * padlen are whatever palloc returned for the data block.
 * NOTE(review): padlen and total_len are uint32, so a len close to the
 * 32-bit limit, or a very large accumulated total, would wrap -- confirm
 * callers cannot reach that before the MaxAllocSize check in EndPrepare.
 */
841 save_state_data(const void *data, uint32 len)
843 uint32 padlen = MAXALIGN(len);
845 if (padlen > records.bytes_free)
/* Block header is zeroed (palloc0); its data buffer below is not (palloc). */
847 records.tail->next = palloc0(sizeof(XLogRecData));
848 records.tail = records.tail->next;
849 records.tail->buffer = InvalidBuffer;
850 records.tail->len = 0;
851 records.tail->next = NULL;
853 records.bytes_free = Max(padlen, 512);
854 records.tail->data = palloc(records.bytes_free);
857 memcpy(((char *) records.tail->data) + records.tail->len, data, len);
858 records.tail->len += padlen;
859 records.bytes_free -= padlen;
860 records.total_len += padlen;
864 * Start preparing a state file.
866 * Initializes data structure and inserts the 2PC file header record.
/*
 * Begin assembling the state file for gxact: reset the record chain, fill in
 * the file header from the transaction state, and append the subtransaction
 * XIDs, pending file deletions and invalidation messages.
 *
 * NOTE(review): hdr is a stack struct that is written out with
 * sizeof(TwoPhaseFileHeader).  The visible lines assign its fields one by
 * one and do not zero the struct first, so any compiler padding would be
 * stored as-is -- check the full file (some lines are missing from this
 * listing) and consider zeroing hdr before filling it.
 */
869 StartPrepare(GlobalTransaction gxact)
871 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
872 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
873 TransactionId xid = pgxact->xid;
874 TwoPhaseFileHeader hdr;
875 TransactionId *children;
876 RelFileNode *commitrels;
877 RelFileNode *abortrels;
878 SharedInvalidationMessage *invalmsgs;
880 /* Initialize linked list */
881 records.head = palloc0(sizeof(XLogRecData));
882 records.head->buffer = InvalidBuffer;
883 records.head->len = 0;
884 records.head->next = NULL;
886 records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
887 records.head->data = palloc(records.bytes_free);
889 records.tail = records.head;
891 records.total_len = 0;
894 hdr.magic = TWOPHASE_MAGIC;
895 hdr.total_len = 0; /* EndPrepare will fill this in */
897 hdr.database = proc->databaseId;
898 hdr.prepared_at = gxact->prepared_at;
899 hdr.owner = gxact->owner;
900 hdr.nsubxacts = xactGetCommittedChildren(&children);
901 hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
902 hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
903 hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
/* Bounded copy of the GID; the source was already length-checked in MarkAsPreparing. */
905 StrNCpy(hdr.gid, gxact->gid, GIDSIZE);
907 save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
910 * Add the additional info about subxacts, deletable files and cache
911 * invalidation messages.
/*
 * Each size below is count * sizeof(element), passed to a uint32 parameter;
 * the counts come from in-memory transaction state, not from the client.
 */
913 if (hdr.nsubxacts > 0)
915 save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
916 /* While we have the child-xact data, stuff it in the gxact too */
917 GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
919 if (hdr.ncommitrels > 0)
921 save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
924 if (hdr.nabortrels > 0)
926 save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
929 if (hdr.ninvalmsgs > 0)
931 save_state_data(invalmsgs,
932 hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
938 * Finish preparing state file.
940 * Calculates CRC and writes state file to WAL and in pg_twophase directory.
/*
 * Finish the state file for gxact: append the end sentinel, fix up
 * total_len, write the records with a deliberately wrong CRC, log and flush
 * the PREPARE WAL record, then overwrite the CRC with the correct value and
 * mark the transaction prepared.
 *
 * Ordering is the point of this function: the file only becomes valid
 * (correct CRC) after the WAL record is durable, and the window in between
 * runs inside a critical section with delayChkpt set.
 * The CRC protects against torn or corrupted files; it is a checksum, not
 * an authentication tag, so it gives no protection against deliberate
 * modification of files under pg_twophase.
 * NOTE(review): several lines (ereport openers, braces, some arguments
 * including the file mode passed to OpenTransientFile) are missing from
 * this listing.
 */
943 EndPrepare(GlobalTransaction gxact)
945 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
946 TransactionId xid = pgxact->xid;
947 TwoPhaseFileHeader *hdr;
948 char path[MAXPGPATH];
950 pg_crc32 statefile_crc;
954 /* Add the end sentinel to the list of 2PC records */
955 RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
958 /* Go back and fill in total_len in the file header record */
959 hdr = (TwoPhaseFileHeader *) records.head->data;
960 Assert(hdr->magic == TWOPHASE_MAGIC);
961 hdr->total_len = records.total_len + sizeof(pg_crc32);
964 * If the file size exceeds MaxAllocSize, we won't be able to read it in
965 * ReadTwoPhaseFile. Check for that now, rather than fail at commit time.
967 if (hdr->total_len > MaxAllocSize)
969 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
970 errmsg("two-phase state file maximum length exceeded")));
973 * Create the 2PC state file.
975 TwoPhaseFilePath(path, xid);
/* O_CREAT | O_EXCL: creation fails rather than reusing an existing file of that name. */
977 fd = OpenTransientFile(path,
978 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
982 (errcode_for_file_access(),
983 errmsg("could not create two-phase state file \"%s\": %m",
986 /* Write data to file, and calculate CRC as we pass over it */
987 INIT_CRC32(statefile_crc);
989 for (record = records.head; record != NULL; record = record->next)
991 COMP_CRC32(statefile_crc, record->data, record->len);
/* A short write is treated as failure; the descriptor is closed before the error is raised. */
992 if ((write(fd, record->data, record->len)) != record->len)
994 CloseTransientFile(fd);
996 (errcode_for_file_access(),
997 errmsg("could not write two-phase state file: %m")));
1001 FIN_CRC32(statefile_crc);
1004 * Write a deliberately bogus CRC to the state file; this is just paranoia
1005 * to catch the case where four more bytes will run us out of disk space.
1007 bogus_crc = ~statefile_crc;
1009 if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
1011 CloseTransientFile(fd);
1013 (errcode_for_file_access(),
1014 errmsg("could not write two-phase state file: %m")));
1017 /* Back up to prepare for rewriting the CRC */
1018 if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
1020 CloseTransientFile(fd);
1022 (errcode_for_file_access(),
1023 errmsg("could not seek in two-phase state file: %m")));
1027 * The state file isn't valid yet, because we haven't written the correct
1028 * CRC yet. Before we do that, insert entry in WAL and flush it to disk.
1030 * Between the time we have written the WAL entry and the time we write
1031 * out the correct state file CRC, we have an inconsistency: the xact is
1032 * prepared according to WAL but not according to our on-disk state. We
1033 * use a critical section to force a PANIC if we are unable to complete
1034 * the write --- then, WAL replay should repair the inconsistency. The
1035 * odds of a PANIC actually occurring should be very tiny given that we
1036 * were able to write the bogus CRC above.
1038 * We have to set delayChkpt here, too; otherwise a checkpoint starting
1039 * immediately after the WAL record is inserted could complete without
1040 * fsync'ing our state file. (This is essentially the same kind of race
1041 * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
1042 * uses delayChkpt for; see notes there.)
1044 * We save the PREPARE record's location in the gxact for later use by
1045 * CheckPointTwoPhase.
1047 START_CRIT_SECTION();
1049 MyPgXact->delayChkpt = true;
1051 gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
1053 XLogFlush(gxact->prepare_lsn);
1055 /* If we crash now, we have prepared: WAL replay will fix things */
1057 /* write correct CRC and close file */
1058 if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
1060 CloseTransientFile(fd);
1062 (errcode_for_file_access(),
1063 errmsg("could not write two-phase state file: %m")));
1066 if (CloseTransientFile(fd) != 0)
1068 (errcode_for_file_access(),
1069 errmsg("could not close two-phase state file: %m")));
1072 * Mark the prepared transaction as valid. As soon as xact.c marks
1073 * MyPgXact as not running our XID (which it will do immediately after
1074 * this function returns), others can commit/rollback the xact.
1076 * NB: a side effect of this is to make a dummy ProcArray entry for the
1077 * prepared XID. This must happen before we clear the XID from MyPgXact,
1078 * else there is a window where the XID is not running according to
1079 * TransactionIdIsInProgress, and onlookers would be entitled to assume
1080 * the xact crashed. Instead we have a window where the same XID appears
1081 * twice in ProcArray, which is OK.
1083 MarkAsPrepared(gxact);
1086 * Now we can mark ourselves as out of the commit critical section: a
1087 * checkpoint starting after this will certainly see the gxact as a
1088 * candidate for fsyncing.
1090 MyPgXact->delayChkpt = false;
1095 * Wait for synchronous replication, if required.
1097 * Note that at this stage we have marked the prepare, but still show as
1098 * running in the procarray (twice!) and continue to hold locks.
1100 SyncRepWaitForLSN(gxact->prepare_lsn);
/* Drop the chain pointers so a later prepare starts from a clean state. */
1102 records.tail = records.head = NULL;
1106 * Register a 2PC record to be written to state file.
/*
 * Append one resource-manager record to the state file being assembled: a
 * TwoPhaseRecordOnDisk header followed by len bytes of rmgr data, each
 * padded by save_state_data.
 * NOTE(review): the lines that fill in the record header (and any guard on
 * len before the second call) are missing from this listing.
 */
1109 RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1110 const void *data, uint32 len)
1112 TwoPhaseRecordOnDisk record;
1117 save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1119 save_state_data(data, len);
1124 * Read and validate the state file for xid.
1126 * If it looks OK (has a valid magic number and CRC), return the palloc'd
1127 * contents of the file. Otherwise return NULL.
/*
 * Load the state file for xid and validate it before anything parses it.
 *
 * Validation order, as far as visible here:
 *   1. file size at least header + one record header + CRC, and at most
 *      MaxAllocSize (bounds the palloc below);
 *   2. the CRC offset is MAXALIGNed (also keeps the pointer cast used to
 *      read the stored CRC aligned);
 *   3. the whole file is read, short reads are failures;
 *   4. magic matches and hdr->total_len equals the file size;
 *   5. CRC over everything before the stored CRC matches.
 * These checks detect truncated or corrupted files.  They do not validate
 * the individual counts inside the header, and a CRC does not defend
 * against intentional modification of the file, so protection of the
 * pg_twophase directory rests on file-system permissions.
 * NOTE(review): the failure branches (warnings, pfree, return NULL) are
 * mostly missing from this listing.
 */
1130 ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
1132 char path[MAXPGPATH];
1134 TwoPhaseFileHeader *hdr;
1141 TwoPhaseFilePath(path, xid);
1143 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1148 (errcode_for_file_access(),
1149 errmsg("could not open two-phase state file \"%s\": %m",
1155 * Check file length. We can determine a lower bound pretty easily. We
1156 * set an upper bound to avoid palloc() failure on a corrupt file, though
1157 * we can't guarantee that we won't get an out of memory error anyway,
1158 * even on a valid file.
/* Size is taken from the open descriptor (fstat), not from the path. */
1160 if (fstat(fd, &stat))
1162 CloseTransientFile(fd);
1165 (errcode_for_file_access(),
1166 errmsg("could not stat two-phase state file \"%s\": %m",
1171 if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1172 MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1173 sizeof(pg_crc32)) ||
1174 stat.st_size > MaxAllocSize)
1176 CloseTransientFile(fd);
1180 crc_offset = stat.st_size - sizeof(pg_crc32);
1181 if (crc_offset != MAXALIGN(crc_offset))
1183 CloseTransientFile(fd);
1188 * OK, slurp in the file.
1190 buf = (char *) palloc(stat.st_size);
1192 if (read(fd, buf, stat.st_size) != stat.st_size)
1194 CloseTransientFile(fd);
1197 (errcode_for_file_access(),
1198 errmsg("could not read two-phase state file \"%s\": %m",
1204 CloseTransientFile(fd);
1206 hdr = (TwoPhaseFileHeader *) buf;
1207 if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
1213 INIT_CRC32(calc_crc);
1214 COMP_CRC32(calc_crc, buf, crc_offset);
1215 FIN_CRC32(calc_crc);
1217 file_crc = *((pg_crc32 *) (buf + crc_offset));
1219 if (!EQ_CRC32(calc_crc, file_crc))
1229 * Confirms an xid is prepared, during recovery
/*
 * Report whether a usable state file exists for xid and names the same xid
 * in its header.
 *
 * Returns false at once when max_prepared_xacts <= 0. The file is read with
 * give_warnings set to false.
 *
 * NOTE(review): the handling of a NULL result from ReadTwoPhaseFile and the
 * release of buf are not visible in this listing; the NULL case has to be
 * handled before hdr is dereferenced.
 */
1232 StandbyTransactionIdIsPrepared(TransactionId xid)
1235 TwoPhaseFileHeader *hdr;
1238 Assert(TransactionIdIsValid(xid));
1240 if (max_prepared_xacts <= 0)
1241 return false; /* nothing to do */
1243 /* Read and validate file */
1244 buf = ReadTwoPhaseFile(xid, false);
1248 /* Check header also */
1249 hdr = (TwoPhaseFileHeader *) buf;
/* runtime comparison of the header xid with the requested xid */
1250 result = TransactionIdEquals(hdr->xid, xid);
1257 * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
/*
 * Complete the prepared transaction named by gid: commit it when isCommit
 * is true, otherwise roll it back.
 *
 * Sequence visible here: lock the GXACT, read the state file, split the
 * buffer into its arrays, write the commit or abort WAL record, remove the
 * dummy proc from the ProcArray, mark the gxact invalid, unlink relation
 * files, send invalidation messages, run the per-resource-manager
 * callbacks, and finally remove the state file.
 *
 * NOTE(review): everything after the read is driven by the state file: the
 * subxact list, the relfilenodes handed to smgrdounlink, the invalidation
 * messages and the callback records. The header counts are used for pointer
 * arithmetic without being compared with total_len here, so the only
 * integrity protection is the magic, length and CRC check in
 * ReadTwoPhaseFile.
 */
1260 FinishPreparedTransaction(const char *gid, bool isCommit)
1262 GlobalTransaction gxact;
1268 TwoPhaseFileHeader *hdr;
1269 TransactionId latestXid;
1270 TransactionId *children;
1271 RelFileNode *commitrels;
1272 RelFileNode *abortrels;
1273 RelFileNode *delrels;
1275 SharedInvalidationMessage *invalmsgs;
1279 * Validate the GID, and lock the GXACT to ensure that two backends do not
1280 * try to commit the same GID at once.
/*
 * The current user id is handed to LockGXact; presumably the check of who
 * may finish this GID is made there (not visible in this listing).
 */
1282 gxact = LockGXact(gid, GetUserId());
1283 proc = &ProcGlobal->allProcs[gxact->pgprocno];
1284 pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1288 * Read and validate the state file
1290 buf = ReadTwoPhaseFile(xid, true);
1293 (errcode(ERRCODE_DATA_CORRUPTED),
1294 errmsg("two-phase state file for transaction %u is corrupt",
1298 * Disassemble the header area
1300 hdr = (TwoPhaseFileHeader *) buf;
/*
 * Assert is a no-op in builds without assertion checking, so a mismatch
 * between the header xid and xid is not caught at runtime here;
 * PrescanPreparedTransactions makes the same comparison with a runtime test.
 */
1301 Assert(TransactionIdEquals(hdr->xid, xid));
/*
 * The arrays follow the header back to back, each padded with MAXALIGN.
 * The offsets are computed from the header counts as read from the file.
 */
1302 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1303 children = (TransactionId *) bufptr;
1304 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1305 commitrels = (RelFileNode *) bufptr;
1306 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1307 abortrels = (RelFileNode *) bufptr;
1308 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1309 invalmsgs = (SharedInvalidationMessage *) bufptr;
1310 bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1312 /* compute latestXid among all children */
1313 latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
1316 * The order of operations here is critical: make the XLOG entry for
1317 * commit or abort, then mark the transaction committed or aborted in
1318 * pg_clog, then remove its PGPROC from the global ProcArray (which means
1319 * TransactionIdIsInProgress will stop saying the prepared xact is in
1320 * progress), then run the post-commit or post-abort callbacks. The
1321 * callbacks will release the locks the transaction held.
1324 RecordTransactionCommitPrepared(xid,
1325 hdr->nsubxacts, children,
1326 hdr->ncommitrels, commitrels,
1327 hdr->ninvalmsgs, invalmsgs,
1328 hdr->initfileinval);
1330 RecordTransactionAbortPrepared(xid,
1331 hdr->nsubxacts, children,
1332 hdr->nabortrels, abortrels);
1334 ProcArrayRemove(proc, latestXid);
1337 * In case we fail while running the callbacks, mark the gxact invalid so
1338 * no one else will try to commit/rollback, and so it can be recycled
1339 * properly later. It is still locked by our XID so it won't go away yet.
1341 * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1343 gxact->valid = false;
1346 * We have to remove any files that were supposed to be dropped. For
1347 * consistency with the regular xact.c code paths, must do this before
1348 * releasing locks, so do it before running the callbacks.
1350 * NB: this code knows that we couldn't be dropping any temp rels ...
1354 delrels = commitrels;
1355 ndelrels = hdr->ncommitrels;
1359 delrels = abortrels;
1360 ndelrels = hdr->nabortrels;
1362 for (i = 0; i < ndelrels; i++)
1364 SMgrRelation srel = smgropen(delrels[i], InvalidBackendId);
1366 smgrdounlink(srel, false);
1371 * Handle cache invalidation messages.
1373 * Relcache init file invalidation requires processing both before and
1374 * after we send the SI messages. See AtEOXact_Inval()
1376 if (hdr->initfileinval)
1377 RelationCacheInitFilePreInvalidate();
1378 SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
1379 if (hdr->initfileinval)
1380 RelationCacheInitFilePostInvalidate();
1382 /* And now do the callbacks */
1384 ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
1386 ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
1388 PredicateLockTwoPhaseFinish(xid, isCommit);
1390 /* Count the prepared xact as committed or aborted */
1391 AtEOXact_PgStat(isCommit);
1394 * And now we can clean up our mess.
1396 RemoveTwoPhaseFile(xid, true);
1404 * Scan a 2PC state file (already read into memory by ReadTwoPhaseFile)
1405 * and call the indicated callbacks for each 2PC record.
/*
 * Walk the record area of a state file image and invoke callbacks[rmid]
 * for each record whose table entry is not NULL.
 *
 * bufptr:    position of the first record header in the image.
 * xid:       passed through to every callback.
 * callbacks: table indexed by record->rmid.
 *
 * The walk ends at a record whose rmid is TWOPHASE_RM_END_ID; the loop
 * construct itself is not visible in this listing.
 *
 * NOTE(review): rmid is range-checked only by Assert, which is a no-op in
 * builds without assertion checking, and record->len is not compared with
 * the end of the buffer. Both values come from the file image, so the walk
 * relies on ReadTwoPhaseFile having rejected damaged files.
 */
1408 ProcessRecords(char *bufptr, TransactionId xid,
1409 const TwoPhaseCallback callbacks[])
1413 TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1415 Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1416 if (record->rmid == TWOPHASE_RM_END_ID)
/* step over the record header to reach the payload */
1419 bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
/* rmid taken from the file selects the function that is called */
1421 if (callbacks[record->rmid] != NULL)
1422 callbacks[record->rmid] (xid, record->info,
1423 (void *) bufptr, record->len);
/* step over the payload, padded to MAXALIGN, to the next header */
1425 bufptr += MAXALIGN(record->len);
1430 * Remove the 2PC file for the specified XID.
1432 * If giveWarning is false, do not complain about file-not-present;
1433 * this is an expected case during WAL replay.
/*
 * Remove the state file for xid.
 *
 * The path is built from the xid by TwoPhaseFilePath, so callers cannot
 * direct the removal at an arbitrary file name. The removal call itself is
 * not visible in this listing.
 *
 * A failure is reported unless errno is ENOENT and giveWarning is false.
 */
1436 RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1438 char path[MAXPGPATH];
1440 TwoPhaseFilePath(path, xid);
/* a missing file is reported only when the caller asked for warnings */
1442 if (errno != ENOENT || giveWarning)
1444 (errcode_for_file_access(),
1445 errmsg("could not remove two-phase state file \"%s\": %m",
1450 * Recreates a state file. This is used in WAL replay.
1452 * Note: content and len don't include CRC.
/*
 * Write a state file for xid from content (len bytes, CRC not included),
 * append the CRC-32 of content, fsync the file and close it.
 *
 * The file is opened with O_CREAT | O_TRUNC | O_WRONLY, so an existing file
 * of the same name is truncated and overwritten in place. The creation mode
 * argument is not visible in this listing; it decides who can read the
 * recreated file.
 *
 * NOTE(review): each write is compared with the requested length, but a
 * short write that is not an error leaves errno unchanged, so the %m text
 * in the message may be unrelated - confirm.
 */
1455 RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1457 char path[MAXPGPATH];
1458 pg_crc32 statefile_crc;
/* CRC over the content as supplied by the caller */
1462 INIT_CRC32(statefile_crc);
1463 COMP_CRC32(statefile_crc, content, len);
1464 FIN_CRC32(statefile_crc);
1466 TwoPhaseFilePath(path, xid);
1468 fd = OpenTransientFile(path,
1469 O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
1473 (errcode_for_file_access(),
1474 errmsg("could not recreate two-phase state file \"%s\": %m",
1477 /* Write content and CRC */
1478 if (write(fd, content, len) != len)
1480 CloseTransientFile(fd);
1482 (errcode_for_file_access(),
1483 errmsg("could not write two-phase state file: %m")));
1485 if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32))
1487 CloseTransientFile(fd);
1489 (errcode_for_file_access(),
1490 errmsg("could not write two-phase state file: %m")));
1494 * We must fsync the file because the end-of-replay checkpoint will not do
1495 * so, there being no GXACT in shared memory yet to tell it to.
1497 if (pg_fsync(fd) != 0)
1499 CloseTransientFile(fd);
1501 (errcode_for_file_access(),
1502 errmsg("could not fsync two-phase state file: %m")));
/* the close result is checked as well, not just the fsync */
1505 if (CloseTransientFile(fd) != 0)
1507 (errcode_for_file_access(),
1508 errmsg("could not close two-phase state file: %m")));
1512 * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1514 * We must fsync the state file of any GXACT that is valid and has a PREPARE
1515 * LSN <= the checkpoint's redo horizon. (If the gxact isn't valid yet or
1516 * has a later LSN, this checkpoint is not responsible for fsyncing it.)
1518 * This is deliberately run as late as possible in the checkpoint sequence,
1519 * because GXACTs ordinarily have short lifespans, and so it is quite
1520 * possible that GXACTs that were valid at checkpoint start will no longer
1521 * exist if we wait a little bit.
1523 * If a GXACT remains valid across multiple checkpoints, it'll be fsynced
1524 * each time. This is considered unusual enough that we don't bother to
1525 * expend any extra code to avoid the redundant fsyncs. (They should be
1526 * reasonably cheap anyway, since they won't cause I/O.)
/*
 * Fsync the state file of every GXACT selected in the first loop.
 *
 * Phase 1 (TwoPhaseStateLock held in shared mode): collect the xids of
 * entries whose prepare_lsn is <= redo_horizon. The header comment says the
 * entry must also be valid; that part of the condition is not visible in
 * this listing.
 * Phase 2 (lock released): rebuild each path from the xid, then open, fsync
 * and close the file.
 *
 * xids has room for max_prepared_xacts entries while the collecting loop is
 * bounded by TwoPhaseState->numPrepXacts; presumably numPrepXacts never
 * exceeds max_prepared_xacts (TODO confirm).
 *
 * ENOENT on open is tolerated only when TransactionIdIsPrepared(xid) is
 * false, i.e. the transaction finished after the lock was released. Every
 * other open, fsync or close failure is reported.
 */
1529 CheckPointTwoPhase(XLogRecPtr redo_horizon)
1531 TransactionId *xids;
1533 char path[MAXPGPATH];
1537 * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
1538 * it just long enough to make a list of the XIDs that require fsyncing,
1539 * and then do the I/O afterwards.
1541 * This approach creates a race condition: someone else could delete a
1542 * GXACT between the time we release TwoPhaseStateLock and the time we try
1543 * to open its state file. We handle this by special-casing ENOENT
1544 * failures: if we see that, we verify that the GXACT is no longer valid,
1545 * and if so ignore the failure.
1547 if (max_prepared_xacts <= 0)
1548 return; /* nothing to do */
1550 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1552 xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
1555 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1557 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1559 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1560 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1563 gxact->prepare_lsn <= redo_horizon)
1564 xids[nxids++] = pgxact->xid;
1567 LWLockRelease(TwoPhaseStateLock);
1569 for (i = 0; i < nxids; i++)
1571 TransactionId xid = xids[i];
1574 TwoPhaseFilePath(path, xid);
1576 fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
1579 if (errno == ENOENT)
1581 /* OK if gxact is no longer valid */
1582 if (!TransactionIdIsPrepared(xid))
1584 /* Restore errno in case it was changed */
1588 (errcode_for_file_access(),
1589 errmsg("could not open two-phase state file \"%s\": %m",
1593 if (pg_fsync(fd) != 0)
1595 CloseTransientFile(fd);
1597 (errcode_for_file_access(),
1598 errmsg("could not fsync two-phase state file \"%s\": %m",
1602 if (CloseTransientFile(fd) != 0)
1604 (errcode_for_file_access(),
1605 errmsg("could not close two-phase state file \"%s\": %m",
1611 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1615 * PrescanPreparedTransactions
1617 * Scan the pg_twophase directory and determine the range of valid XIDs
1618 * present. This is run during database startup, after we have completed
1619 * reading WAL. ShmemVariableCache->nextXid has been set to one more than
1620 * the highest XID for which evidence exists in WAL.
1622 * We throw away any prepared xacts with main XID beyond nextXid --- if any
1623 * are present, it suggests that the DBA has done a PITR recovery to an
1624 * earlier point in time without cleaning out pg_twophase. We dare not
1625 * try to recover such prepared xacts since they likely depend on database
1626 * state that doesn't exist now.
1628 * However, we will advance nextXid beyond any subxact XIDs belonging to
1629 * valid prepared xacts. We need to do this since subxact commit doesn't
1630 * write a WAL entry, and so there might be no evidence in WAL of those
1633 * Our other responsibility is to determine and return the oldest valid XID
1634 * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
1635 * This is needed to synchronize pg_subtrans startup properly.
1637 * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1638 * top-level xids is stored in *xids_p. The number of entries in the array
1639 * is returned in *nxids_p.
/*
 * Scan TWOPHASE_DIR, drop unusable state files, and track the oldest xid
 * among the files kept, starting from the nextXid value read on entry.
 * Per the header comment that value is returned and the kept xids are
 * optionally handed back through xids_p / nxids_p; those statements are not
 * visible in this listing.
 *
 * Only directory entries whose name is exactly eight characters from
 * 0123456789ABCDEF are considered. The name is parsed to an xid, and file
 * access then goes through ReadTwoPhaseFile and RemoveTwoPhaseFile, which
 * rebuild the path from that xid with TwoPhaseFilePath, so the entry name
 * is never used as a path.
 *
 * A file is removed when its xid is at or beyond the nextXid seen on entry,
 * when ReadTwoPhaseFile rejects it, or when the xid in its header differs
 * from the xid parsed from its name.
 *
 * NOTE(review): hdr->nsubxacts bounds the subxid loop without being compared
 * with the file length, and a subxid at or beyond nextXid moves the shared
 * nextXid forward. Both values come from the file and are protected only by
 * the checks in ReadTwoPhaseFile; the ordering of subxid against xid is
 * checked by Assert alone.
 */
1642 PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1644 TransactionId origNextXid = ShmemVariableCache->nextXid;
1645 TransactionId result = origNextXid;
1647 struct dirent *clde;
1648 TransactionId *xids = NULL;
1652 cldir = AllocateDir(TWOPHASE_DIR);
1653 while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
/* accept only names made of exactly eight upper-case hex digits */
1655 if (strlen(clde->d_name) == 8 &&
1656 strspn(clde->d_name, "0123456789ABCDEF") == 8)
1660 TwoPhaseFileHeader *hdr;
1661 TransactionId *subxids;
1664 xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1666 /* Reject XID if too new */
1667 if (TransactionIdFollowsOrEquals(xid, origNextXid))
1670 (errmsg("removing future two-phase state file \"%s\"",
1672 RemoveTwoPhaseFile(xid, true);
1677 * Note: we can't check if already processed because clog
1678 * subsystem isn't up yet.
1681 /* Read and validate file */
1682 buf = ReadTwoPhaseFile(xid, true);
1686 (errmsg("removing corrupt two-phase state file \"%s\"",
1688 RemoveTwoPhaseFile(xid, true);
1692 /* Deconstruct header */
1693 hdr = (TwoPhaseFileHeader *) buf;
/* runtime check that the file content belongs to the xid in its name */
1694 if (!TransactionIdEquals(hdr->xid, xid))
1697 (errmsg("removing corrupt two-phase state file \"%s\"",
1699 RemoveTwoPhaseFile(xid, true);
1705 * OK, we think this file is valid. Incorporate xid into the
1706 * running-minimum result.
1708 if (TransactionIdPrecedes(xid, result))
1712 * Examine subtransaction XIDs ... they should all follow main
1713 * XID, and they may force us to advance nextXid.
1715 * We don't expect anyone else to modify nextXid, hence we don't
1716 * need to hold a lock while examining it. We still acquire the
1717 * lock to modify it, though.
/* the subxid array starts right after the MAXALIGNed file header */
1719 subxids = (TransactionId *)
1720 (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
1721 for (i = 0; i < hdr->nsubxacts; i++)
1723 TransactionId subxid = subxids[i];
1725 Assert(TransactionIdFollows(subxid, xid));
1726 if (TransactionIdFollowsOrEquals(subxid,
1727 ShmemVariableCache->nextXid))
1729 LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
1730 ShmemVariableCache->nextXid = subxid;
1731 TransactionIdAdvance(ShmemVariableCache->nextXid);
1732 LWLockRelease(XidGenLock);
/* grow the result array by doubling once it is full */
1739 if (nxids == allocsize)
1744 xids = palloc(allocsize * sizeof(TransactionId));
1748 allocsize = allocsize * 2;
1749 xids = repalloc(xids, allocsize * sizeof(TransactionId));
1752 xids[nxids++] = xid;
1770 * StandbyRecoverPreparedTransactions
1772 * Scan the pg_twophase directory and setup all the required information to
1773 * allow standby queries to treat prepared transactions as still active.
1774 * This is never called at the end of recovery - we use
1775 * RecoverPreparedTransactions() at that point.
1777 * Currently we simply call SubTransSetParent() for any subxids of prepared
1778 * transactions. If overwriteOK is true, it's OK if some XIDs have already
1779 * been marked in pg_subtrans.
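/*
 * For every usable state file in TWOPHASE_DIR, record the top-level xid as
 * the parent of each subtransaction listed in the file.
 *
 * overwriteOK: passed through to SubTransSetParent.
 *
 * Only directory entries whose name is exactly eight characters from
 * 0123456789ABCDEF are considered; the name is parsed to an xid and file
 * access then goes through ReadTwoPhaseFile and RemoveTwoPhaseFile, which
 * rebuild the path from that xid.
 *
 * A file is removed when its xid already committed or aborted, when
 * ReadTwoPhaseFile rejects it, or when the xid in its header differs from
 * the xid parsed from its name.
 *
 * NOTE(review): hdr->nsubxacts bounds the loop without being compared with
 * the file length; it is protected only by the checks in ReadTwoPhaseFile.
 */
1782 StandbyRecoverPreparedTransactions(bool overwriteOK)
1785 struct dirent *clde;
1787 cldir = AllocateDir(TWOPHASE_DIR);
1788 while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
/* accept only names made of exactly eight upper-case hex digits */
1790 if (strlen(clde->d_name) == 8 &&
1791 strspn(clde->d_name, "0123456789ABCDEF") == 8)
1795 TwoPhaseFileHeader *hdr;
1796 TransactionId *subxids;
1799 xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1801 /* Already processed? */
1802 if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
1805 (errmsg("removing stale two-phase state file \"%s\"",
1807 RemoveTwoPhaseFile(xid, true);
1811 /* Read and validate file */
1812 buf = ReadTwoPhaseFile(xid, true);
1816 (errmsg("removing corrupt two-phase state file \"%s\"",
1818 RemoveTwoPhaseFile(xid, true);
1822 /* Deconstruct header */
1823 hdr = (TwoPhaseFileHeader *) buf;
/* runtime check that the file content belongs to the xid in its name */
1824 if (!TransactionIdEquals(hdr->xid, xid))
1827 (errmsg("removing corrupt two-phase state file \"%s\"",
1829 RemoveTwoPhaseFile(xid, true);
1835 * Examine subtransaction XIDs ... they should all follow main
/* the subxid array starts right after the MAXALIGNed file header */
1838 subxids = (TransactionId *)
1839 (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
1840 for (i = 0; i < hdr->nsubxacts; i++)
1842 TransactionId subxid = subxids[i];
1844 Assert(TransactionIdFollows(subxid, xid));
/*
 * SubTransSetParent takes the child first and its parent second, as in
 * RecoverPreparedTransactions. The original call passed (xid, subxid),
 * which recorded the subtransaction as the parent of the top-level xid.
 */
1845 SubTransSetParent(subxid, xid, overwriteOK);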
1853 * RecoverPreparedTransactions
1855 * Scan the pg_twophase directory and reload shared-memory state for each
1856 * prepared transaction (reacquire locks, etc). This is run during database
/*
 * Rebuild shared-memory state for every prepared transaction that still has
 * a usable state file in TWOPHASE_DIR.
 *
 * Per file: remove it and move on if the xid already committed or aborted,
 * or if ReadTwoPhaseFile rejects it; otherwise set the parent of each
 * subxid to the top-level xid, recreate the GXACT with MarkAsPreparing,
 * GXactLoadSubxactData and MarkAsPrepared, replay the resource-manager
 * records through twophase_recover_callbacks, and call
 * StandbyReleaseLockTree (any condition guarding that call is not visible
 * in this listing).
 *
 * Only directory entries whose name is exactly eight characters from
 * 0123456789ABCDEF are considered; the name is parsed to an xid and the
 * file path is rebuilt from that xid by the helpers.
 *
 * NOTE(review): the gid, owner and database of the recreated GXACT are taken
 * from the file header, and the records replayed by ProcessRecords come from
 * the same file. The header xid is compared with the xid from the file name
 * only by Assert, which is a no-op in builds without assertion checking, and
 * the header counts used for the pointer arithmetic are not compared with
 * the file length. Protection rests on the checks in ReadTwoPhaseFile and
 * on the permissions of the state directory.
 */
1860 RecoverPreparedTransactions(void)
1862 char dir[MAXPGPATH];
1864 struct dirent *clde;
1865 bool overwriteOK = false;
1867 snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
1869 cldir = AllocateDir(dir);
1870 while ((clde = ReadDir(cldir, dir)) != NULL)
/* accept only names made of exactly eight upper-case hex digits */
1872 if (strlen(clde->d_name) == 8 &&
1873 strspn(clde->d_name, "0123456789ABCDEF") == 8)
1878 TwoPhaseFileHeader *hdr;
1879 TransactionId *subxids;
1880 GlobalTransaction gxact;
1883 xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1885 /* Already processed? */
1886 if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
1889 (errmsg("removing stale two-phase state file \"%s\"",
1891 RemoveTwoPhaseFile(xid, true);
1895 /* Read and validate file */
1896 buf = ReadTwoPhaseFile(xid, true);
1900 (errmsg("removing corrupt two-phase state file \"%s\"",
1902 RemoveTwoPhaseFile(xid, true);
1907 (errmsg("recovering prepared transaction %u", xid)));
1909 /* Deconstruct header */
1910 hdr = (TwoPhaseFileHeader *) buf;
1911 Assert(TransactionIdEquals(hdr->xid, xid));
/* skip the four header arrays to reach the first record header */
1912 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1913 subxids = (TransactionId *) bufptr;
1914 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1915 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1916 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1917 bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1920 * It's possible that SubTransSetParent has been set before, if
1921 * the prepared transaction generated xid assignment records. Test
1922 * here must match one used in AssignTransactionId().
1924 if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS ||
1925 XLogLogicalInfoActive()))
1929 * Reconstruct subtrans state for the transaction --- needed
1930 * because pg_subtrans is not preserved over a restart. Note that
1931 * we are linking all the subtransactions directly to the
1932 * top-level XID; there may originally have been a more complex
1933 * hierarchy, but there's no need to restore that exactly.
1935 for (i = 0; i < hdr->nsubxacts; i++)
1936 SubTransSetParent(subxids[i], xid, overwriteOK);
1939 * Recreate its GXACT and dummy PGPROC
1941 * Note: since we don't have the PREPARE record's WAL location at
1942 * hand, we leave prepare_lsn zeroes. This means the GXACT will
1943 * be fsync'd on every future checkpoint. We assume this
1944 * situation is infrequent enough that the performance cost is
1945 * negligible (especially since we know the state file has already
1948 gxact = MarkAsPreparing(xid, hdr->gid,
1950 hdr->owner, hdr->database);
1951 GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
1952 MarkAsPrepared(gxact);
1955 * Recover other state (notably locks) using resource managers
1957 ProcessRecords(bufptr, xid, twophase_recover_callbacks);
1960 * Release locks held by the standby process after we process each
1961 * prepared transaction. As a result, we don't need too many
1962 * additional locks at any one time.
1965 StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
1974 * RecordTransactionCommitPrepared
1976 * This is basically the same as RecordTransactionCommit: in particular,
1977 * we must set the delayChkpt flag to avoid a race condition.
1979 * We know the transaction made at least one XLOG entry (its PREPARE),
1980 * so it is never possible to optimize out the commit record.
/*
 * Write the COMMIT PREPARED WAL record for xid, then mark xid and its
 * children committed in pg_clog, and finally wait for synchronous
 * replication of the record.
 *
 * The record is a chain of up to four XLogRecData entries: the fixed part,
 * the relations to delete, the child xids and the invalidation messages.
 * Each optional entry takes its length from a caller-supplied count times
 * the element size. The conditions that decide which entries are linked,
 * several parameter lines and the flush call are not visible in this
 * listing.
 *
 * MyPgXact->delayChkpt is set after START_CRIT_SECTION and before
 * XLogInsert, and cleared after TransactionIdCommitTree.
 *
 * NOTE(review): when called from FinishPreparedTransaction the counts and
 * arrays come straight from the state file header, so the size of the WAL
 * record is determined by file content.
 */
1983 RecordTransactionCommitPrepared(TransactionId xid,
1985 TransactionId *children,
1989 SharedInvalidationMessage *invalmsgs,
1992 XLogRecData rdata[4];
1994 xl_xact_commit_prepared xlrec;
1997 START_CRIT_SECTION();
1999 /* See notes in RecordTransactionCommit */
2000 MyPgXact->delayChkpt = true;
2002 /* Emit the XLOG commit record */
2004 xlrec.crec.xact_time = GetCurrentTimestamp();
2005 xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
/* this zero is overwritten by the assignment of ninvalmsgs just below */
2006 xlrec.crec.nmsgs = 0;
2007 xlrec.crec.nrels = nrels;
2008 xlrec.crec.nsubxacts = nchildren;
2009 xlrec.crec.nmsgs = ninvalmsgs;
2011 rdata[0].data = (char *) (&xlrec);
2012 rdata[0].len = MinSizeOfXactCommitPrepared;
2013 rdata[0].buffer = InvalidBuffer;
2014 /* dump rels to delete */
2017 rdata[0].next = &(rdata[1]);
2018 rdata[1].data = (char *) rels;
2019 rdata[1].len = nrels * sizeof(RelFileNode);
2020 rdata[1].buffer = InvalidBuffer;
2023 /* dump committed child Xids */
2026 rdata[lastrdata].next = &(rdata[2]);
2027 rdata[2].data = (char *) children;
2028 rdata[2].len = nchildren * sizeof(TransactionId);
2029 rdata[2].buffer = InvalidBuffer;
2032 /* dump cache invalidation messages */
2035 rdata[lastrdata].next = &(rdata[3]);
2036 rdata[3].data = (char *) invalmsgs;
2037 rdata[3].len = ninvalmsgs * sizeof(SharedInvalidationMessage);
2038 rdata[3].buffer = InvalidBuffer;
/* terminate the chain at whichever entry was linked last */
2041 rdata[lastrdata].next = NULL;
2043 recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, rdata);
2046 * We don't currently try to sleep before flush here ... nor is there any
2047 * support for async commit of a prepared xact (the very idea is probably
2051 /* Flush XLOG to disk */
2054 /* Mark the transaction committed in pg_clog */
2055 TransactionIdCommitTree(xid, nchildren, children);
2057 /* Checkpoint can proceed now */
2058 MyPgXact->delayChkpt = false;
2063 * Wait for synchronous replication, if required.
2065 * Note that at this stage we have marked clog, but still show as running
2066 * in the procarray and continue to hold locks.
2068 SyncRepWaitForLSN(recptr);
2072 * RecordTransactionAbortPrepared
2074 * This is basically the same as RecordTransactionAbort.
2076 * We know the transaction made at least one XLOG entry (its PREPARE),
2077 * so it is never possible to optimize out the abort record.
/*
 * Write the ABORT PREPARED WAL record for xid, mark xid and its children
 * aborted in clog, and wait for synchronous replication of the record.
 *
 * Before anything is written, TransactionIdDidCommit(xid) is tested and the
 * server raises PANIC if the transaction is already marked committed, so a
 * commit that failed partway cannot be turned into an abort.
 *
 * The record is a chain of up to three XLogRecData entries: the fixed part,
 * the relations to delete and the child xids. Each optional entry takes its
 * length from a caller-supplied count times the element size. The conditions
 * that decide which entries are linked, several parameter lines and the
 * flush call are not visible in this listing.
 *
 * NOTE(review): when called from FinishPreparedTransaction the counts and
 * arrays come straight from the state file header, so the size of the WAL
 * record is determined by file content.
 */
2080 RecordTransactionAbortPrepared(TransactionId xid,
2082 TransactionId *children,
2086 XLogRecData rdata[3];
2088 xl_xact_abort_prepared xlrec;
2092 * Catch the scenario where we aborted partway through
2093 * RecordTransactionCommitPrepared ...
2095 if (TransactionIdDidCommit(xid))
2096 elog(PANIC, "cannot abort transaction %u, it was already committed",
2099 START_CRIT_SECTION();
2101 /* Emit the XLOG abort record */
2103 xlrec.arec.xact_time = GetCurrentTimestamp();
2104 xlrec.arec.nrels = nrels;
2105 xlrec.arec.nsubxacts = nchildren;
2106 rdata[0].data = (char *) (&xlrec);
2107 rdata[0].len = MinSizeOfXactAbortPrepared;
2108 rdata[0].buffer = InvalidBuffer;
2109 /* dump rels to delete */
2112 rdata[0].next = &(rdata[1]);
2113 rdata[1].data = (char *) rels;
2114 rdata[1].len = nrels * sizeof(RelFileNode);
2115 rdata[1].buffer = InvalidBuffer;
2118 /* dump committed child Xids */
2121 rdata[lastrdata].next = &(rdata[2]);
2122 rdata[2].data = (char *) children;
2123 rdata[2].len = nchildren * sizeof(TransactionId);
2124 rdata[2].buffer = InvalidBuffer;
/* terminate the chain at whichever entry was linked last */
2127 rdata[lastrdata].next = NULL;
2129 recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED, rdata);
2131 /* Always flush, since we're about to remove the 2PC state file */
2135 * Mark the transaction aborted in clog. This is not absolutely necessary
2136 * but we may as well do it while we are here.
2138 TransactionIdAbortTree(xid, nchildren, children);
2143 * Wait for synchronous replication, if required.
2145 * Note that at this stage we have marked clog, but still show as running
2146 * in the procarray and continue to hold locks.
2148 SyncRepWaitForLSN(recptr);