1 /*-------------------------------------------------------------------------
4 * Two-phase commit support functions.
6 * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * src/backend/access/transam/twophase.c
13 * Each global transaction is associated with a global transaction
14 * identifier (GID). The client assigns a GID to a postgres
15 * transaction with the PREPARE TRANSACTION command.
17 * We keep all active global transactions in a shared memory array.
18 * When the PREPARE TRANSACTION command is issued, the GID is
19 * reserved for the transaction in the array. This is done before
20 * a WAL entry is made, because the reservation checks for duplicate
21 * GIDs and aborts the transaction if there already is a global
22 * transaction in prepared state with the same GID.
24 * A global transaction (gxact) also has dummy PGXACT and PGPROC; this is
25 * what keeps the XID considered running by TransactionIdIsInProgress.
26 * It is also convenient as a PGPROC to hook the gxact's locks to.
28 * In order to survive crashes and shutdowns, all prepared
29 * transactions must be stored in permanent storage. This includes
30 * locking information, pending notifications etc. All that state
31 * information is written to the per-transaction state file in
32 * the pg_twophase directory.
34 *-------------------------------------------------------------------------
40 #include <sys/types.h>
44 #include "access/htup_details.h"
45 #include "access/subtrans.h"
46 #include "access/transam.h"
47 #include "access/twophase.h"
48 #include "access/twophase_rmgr.h"
49 #include "access/xact.h"
50 #include "access/xlog.h"
51 #include "access/xloginsert.h"
52 #include "access/xlogutils.h"
53 #include "catalog/pg_type.h"
54 #include "catalog/storage.h"
56 #include "miscadmin.h"
59 #include "replication/walsender.h"
60 #include "replication/syncrep.h"
61 #include "storage/fd.h"
62 #include "storage/ipc.h"
63 #include "storage/predicate.h"
64 #include "storage/proc.h"
65 #include "storage/procarray.h"
66 #include "storage/sinvaladt.h"
67 #include "storage/smgr.h"
68 #include "utils/builtins.h"
69 #include "utils/memutils.h"
70 #include "utils/timestamp.h"
74 * Directory where Two-phase commit files reside within PGDATA
76 #define TWOPHASE_DIR "pg_twophase"
78 /* GUC variable, can't be changed after startup */
79 int max_prepared_xacts = 0;
82 * This struct describes one global transaction that is in prepared state
83 * or attempting to become prepared.
85 * The lifecycle of a global transaction is:
87 * 1. After checking that the requested GID is not in use, set up an entry in
88 * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
89 * and mark it as locked by my backend.
91 * 2. After successfully completing prepare, set valid = true and enter the
92 * referenced PGPROC into the global ProcArray.
94 * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
95 * valid and not locked, then mark the entry as locked by storing my current
96 * backend ID into locking_backend. This prevents concurrent attempts to
97 * commit or rollback the same prepared xact.
99 * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
100 * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
101 * the freelist (see RemoveGXact()).
103 * Note that if the preparing transaction fails between steps 1 and 2, the
104 * entry must be removed so that the GID and the GlobalTransaction struct
105 * can be reused. See AtAbort_Twophase().
107 * typedef struct GlobalTransactionData *GlobalTransaction appears in
112 typedef struct GlobalTransactionData
114 GlobalTransaction next; /* list link for free list */
115 int pgprocno; /* ID of associated dummy PGPROC */
116 BackendId dummyBackendId; /* similar to backend id for backends */
117 TimestampTz prepared_at; /* time of preparation */
118 XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */
119 Oid owner; /* ID of user that executed the xact */
120 BackendId locking_backend; /* backend currently working on the xact */
121 bool valid; /* TRUE if PGPROC entry is in proc array */
122 char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
123 } GlobalTransactionData;
126 * Two Phase Commit shared state. Access to this struct is protected
127 * by TwoPhaseStateLock.
129 typedef struct TwoPhaseStateData
131 /* Head of linked list of free GlobalTransactionData structs */
132 GlobalTransaction freeGXacts;
134 /* Number of valid prepXacts entries. */
137 /* There are max_prepared_xacts items in this array */
138 GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
141 static TwoPhaseStateData *TwoPhaseState;
144 * Global transaction entry currently locked by us, if any.
146 static GlobalTransaction MyLockedGxact = NULL;
148 static bool twophaseExitRegistered = false;
150 static void RecordTransactionCommitPrepared(TransactionId xid,
152 TransactionId *children,
156 SharedInvalidationMessage *invalmsgs,
158 static void RecordTransactionAbortPrepared(TransactionId xid,
160 TransactionId *children,
163 static void ProcessRecords(char *bufptr, TransactionId xid,
164 const TwoPhaseCallback callbacks[]);
165 static void RemoveGXact(GlobalTransaction gxact);
169 * Initialization of shared memory
172 TwoPhaseShmemSize(void)
176 /* Need the fixed struct, the array of pointers, and the GTD structs */
177 size = offsetof(TwoPhaseStateData, prepXacts);
178 size = add_size(size, mul_size(max_prepared_xacts,
179 sizeof(GlobalTransaction)));
180 size = MAXALIGN(size);
181 size = add_size(size, mul_size(max_prepared_xacts,
182 sizeof(GlobalTransactionData)));
188 TwoPhaseShmemInit(void)
192 TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
195 if (!IsUnderPostmaster)
197 GlobalTransaction gxacts;
201 TwoPhaseState->freeGXacts = NULL;
202 TwoPhaseState->numPrepXacts = 0;
205 * Initialize the linked list of free GlobalTransactionData structs
207 gxacts = (GlobalTransaction)
208 ((char *) TwoPhaseState +
209 MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
210 sizeof(GlobalTransaction) * max_prepared_xacts));
211 for (i = 0; i < max_prepared_xacts; i++)
213 /* insert into linked list */
214 gxacts[i].next = TwoPhaseState->freeGXacts;
215 TwoPhaseState->freeGXacts = &gxacts[i];
217 /* associate it with a PGPROC assigned by InitProcGlobal */
218 gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
221 * Assign a unique ID for each dummy proc, so that the range of
222 * dummy backend IDs immediately follows the range of normal
223 * backend IDs. We don't dare to assign a real backend ID to dummy
224 * procs, because prepared transactions don't take part in cache
225 * invalidation like a real backend ID would imply, but having a
226 * unique ID for them is nevertheless handy. This arrangement
227 * allows you to allocate an array of size (MaxBackends +
228 * max_prepared_xacts + 1), and have a slot for every backend and
229 * prepared transaction. Currently multixact.c uses that
232 gxacts[i].dummyBackendId = MaxBackends + 1 + i;
240 * Exit hook to unlock the global transaction entry we're working on.
243 AtProcExit_Twophase(int code, Datum arg)
245 /* same logic as abort */
250 * Abort hook to unlock the global transaction entry we're working on.
253 AtAbort_Twophase(void)
255 if (MyLockedGxact == NULL)
259 * What to do with the locked global transaction entry? If we were in
260 * the process of preparing the transaction, but haven't written the WAL
261 * record and state file yet, the transaction must not be considered as
262 * prepared. Likewise, if we are in the process of finishing an
263 * already-prepared transaction, and fail after having already written
264 * the 2nd phase commit or rollback record to the WAL, the transaction
265 * should not be considered as prepared anymore. In those cases, just
266 * remove the entry from shared memory.
268 * Otherwise, the entry must be left in place so that the transaction
269 * can be finished later, so just unlock it.
271 * If we abort during prepare, after having written the WAL record, we
272 * might not have transferred all locks and other state to the prepared
273 * transaction yet. Likewise, if we abort during commit or rollback,
274 * after having written the WAL record, we might not have released
275 * all the resources held by the transaction yet. In those cases, the
276 * in-memory state can be wrong, but it's too late to back out.
278 if (!MyLockedGxact->valid)
280 RemoveGXact(MyLockedGxact);
284 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
286 MyLockedGxact->locking_backend = InvalidBackendId;
288 LWLockRelease(TwoPhaseStateLock);
290 MyLockedGxact = NULL;
294 * This is called after we have finished transferring state to the prepared
295 * transaction; it unlocks the gxact entry we were working on.
298 PostPrepare_Twophase()
300 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
301 MyLockedGxact->locking_backend = InvalidBackendId;
302 LWLockRelease(TwoPhaseStateLock);
304 MyLockedGxact = NULL;
310 * Reserve the GID for the given transaction.
312 * Internally, this creates a gxact struct and puts it into the active array.
313 * NOTE: this is also used when reloading a gxact after a crash; so avoid
314 * assuming that we can use very much backend context.
317 MarkAsPreparing(TransactionId xid, const char *gid,
318 TimestampTz prepared_at, Oid owner, Oid databaseid)
320 GlobalTransaction gxact;
325 if (strlen(gid) >= GIDSIZE)
327 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
328 errmsg("transaction identifier \"%s\" is too long",
331 /* fail immediately if feature is disabled */
332 if (max_prepared_xacts == 0)
334 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
335 errmsg("prepared transactions are disabled"),
336 errhint("Set max_prepared_transactions to a nonzero value.")));
338 /* on first call, register the exit hook */
339 if (!twophaseExitRegistered)
341 before_shmem_exit(AtProcExit_Twophase, 0);
342 twophaseExitRegistered = true;
345 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
347 /* Check for conflicting GID */
348 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
350 gxact = TwoPhaseState->prepXacts[i];
351 if (strcmp(gxact->gid, gid) == 0)
354 (errcode(ERRCODE_DUPLICATE_OBJECT),
355 errmsg("transaction identifier \"%s\" is already in use",
360 /* Get a free gxact from the freelist */
361 if (TwoPhaseState->freeGXacts == NULL)
363 (errcode(ERRCODE_OUT_OF_MEMORY),
364 errmsg("maximum number of prepared transactions reached"),
365 errhint("Increase max_prepared_transactions (currently %d).",
366 max_prepared_xacts)));
367 gxact = TwoPhaseState->freeGXacts;
368 TwoPhaseState->freeGXacts = gxact->next;
370 proc = &ProcGlobal->allProcs[gxact->pgprocno];
371 pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
373 /* Initialize the PGPROC entry */
374 MemSet(proc, 0, sizeof(PGPROC));
375 proc->pgprocno = gxact->pgprocno;
376 SHMQueueElemInit(&(proc->links));
377 proc->waitStatus = STATUS_OK;
378 /* We set up the gxact's VXID as InvalidBackendId/XID */
379 proc->lxid = (LocalTransactionId) xid;
381 pgxact->xmin = InvalidTransactionId;
382 pgxact->delayChkpt = false;
383 pgxact->vacuumFlags = 0;
385 proc->backendId = InvalidBackendId;
386 proc->databaseId = databaseid;
387 proc->roleId = owner;
388 proc->lwWaiting = false;
389 proc->lwWaitMode = 0;
390 proc->waitLock = NULL;
391 proc->waitProcLock = NULL;
392 for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
393 SHMQueueInit(&(proc->myProcLocks[i]));
394 /* subxid data must be filled later by GXactLoadSubxactData */
395 pgxact->overflowed = false;
398 gxact->prepared_at = prepared_at;
399 /* initialize LSN to 0 (start of WAL) */
400 gxact->prepare_lsn = 0;
401 gxact->owner = owner;
402 gxact->locking_backend = MyBackendId;
403 gxact->valid = false;
404 strcpy(gxact->gid, gid);
406 /* And insert it into the active array */
407 Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
408 TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
411 * Remember that we have this GlobalTransaction entry locked for us.
412 * If we abort after this, we must release it.
414 MyLockedGxact = gxact;
416 LWLockRelease(TwoPhaseStateLock);
422 * GXactLoadSubxactData
424 * If the transaction being persisted had any subtransactions, this must
425 * be called before MarkAsPrepared() to load information into the dummy
429 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
430 TransactionId *children)
432 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
433 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
435 /* We need no extra lock since the GXACT isn't valid yet */
436 if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
438 pgxact->overflowed = true;
439 nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
443 memcpy(proc->subxids.xids, children,
444 nsubxacts * sizeof(TransactionId));
445 pgxact->nxids = nsubxacts;
451 * Mark the GXACT as fully valid, and enter it into the global ProcArray.
454 MarkAsPrepared(GlobalTransaction gxact)
456 /* Lock here may be overkill, but I'm not convinced of that ... */
457 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
458 Assert(!gxact->valid);
460 LWLockRelease(TwoPhaseStateLock);
463 * Put it into the global ProcArray so TransactionIdIsInProgress considers
464 * the XID as still running.
466 ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
471 * Locate the prepared transaction and mark it busy for COMMIT PREPARED or
472 * ROLLBACK PREPARED.
473 static GlobalTransaction
474 LockGXact(const char *gid, Oid user)
478 /* on first call, register the exit hook */
479 if (!twophaseExitRegistered)
481 before_shmem_exit(AtProcExit_Twophase, 0);
482 twophaseExitRegistered = true;
485 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
487 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
489 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
490 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
492 /* Ignore not-yet-valid GIDs */
495 if (strcmp(gxact->gid, gid) != 0)
498 /* Found it, but has someone else got it locked? */
499 if (gxact->locking_backend != InvalidBackendId)
501 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
502 errmsg("prepared transaction with identifier \"%s\" is busy",
505 if (user != gxact->owner && !superuser_arg(user))
507 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
508 errmsg("permission denied to finish prepared transaction"),
509 errhint("Must be superuser or the user that prepared the transaction.")));
512 * Note: it probably would be possible to allow committing from
513 * another database; but at the moment NOTIFY is known not to work and
514 * there may be some other issues as well. Hence disallow until
515 * someone gets motivated to make it work.
517 if (MyDatabaseId != proc->databaseId)
519 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
520 errmsg("prepared transaction belongs to another database"),
521 errhint("Connect to the database where the transaction was prepared to finish it.")));
523 /* OK for me to lock it */
524 gxact->locking_backend = MyBackendId;
525 MyLockedGxact = gxact;
527 LWLockRelease(TwoPhaseStateLock);
532 LWLockRelease(TwoPhaseStateLock);
535 (errcode(ERRCODE_UNDEFINED_OBJECT),
536 errmsg("prepared transaction with identifier \"%s\" does not exist",
545 * Remove the prepared transaction from the shared memory array.
547 * NB: caller should have already removed it from ProcArray
550 RemoveGXact(GlobalTransaction gxact)
554 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
556 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
558 if (gxact == TwoPhaseState->prepXacts[i])
560 /* remove from the active array */
561 TwoPhaseState->numPrepXacts--;
562 TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
564 /* and put it back in the freelist */
565 gxact->next = TwoPhaseState->freeGXacts;
566 TwoPhaseState->freeGXacts = gxact;
568 LWLockRelease(TwoPhaseStateLock);
574 LWLockRelease(TwoPhaseStateLock);
576 elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
580 * TransactionIdIsPrepared
581 * True iff transaction associated with the identifier is prepared
582 * for two-phase commit
584 * Note: only gxacts marked "valid" are considered; but notice we do not
585 * check the locking status.
587 * This is not currently exported, because it is only needed internally.
590 TransactionIdIsPrepared(TransactionId xid)
595 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
597 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
599 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
600 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
602 if (gxact->valid && pgxact->xid == xid)
609 LWLockRelease(TwoPhaseStateLock);
615 * Returns an array of all prepared transactions for the user-level
616 * function pg_prepared_xact.
618 * The returned array and all its elements are copies of internal data
619 * structures, to minimize the time we need to hold the TwoPhaseStateLock.
621 * WARNING -- we return even those transactions that are not fully prepared
622 * yet (valid == false). The caller should filter them out if it doesn't
623 * want them.
624 * The returned array is palloc'd.
627 GetPreparedTransactionList(GlobalTransaction *gxacts)
629 GlobalTransaction array;
633 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
635 if (TwoPhaseState->numPrepXacts == 0)
637 LWLockRelease(TwoPhaseStateLock);
643 num = TwoPhaseState->numPrepXacts;
644 array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
646 for (i = 0; i < num; i++)
647 memcpy(array + i, TwoPhaseState->prepXacts[i],
648 sizeof(GlobalTransactionData));
650 LWLockRelease(TwoPhaseStateLock);
656 /* Working status for pg_prepared_xact */
659 GlobalTransaction array;
666 * Produce a view with one row per prepared transaction.
668 * This function is here so we don't have to export the
669 * GlobalTransactionData struct definition.
672 pg_prepared_xact(PG_FUNCTION_ARGS)
674 FuncCallContext *funcctx;
675 Working_State *status;
677 if (SRF_IS_FIRSTCALL())
680 MemoryContext oldcontext;
682 /* create a function context for cross-call persistence */
683 funcctx = SRF_FIRSTCALL_INIT();
686 * Switch to memory context appropriate for multiple function calls
688 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
690 /* build tupdesc for result tuples */
691 /* this had better match pg_prepared_xacts view in system_views.sql */
692 tupdesc = CreateTemplateTupleDesc(5, false);
693 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
695 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
697 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
698 TIMESTAMPTZOID, -1, 0);
699 TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
701 TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
704 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
707 * Collect all the 2PC status information that we will format and send
708 * out as a result set.
710 status = (Working_State *) palloc(sizeof(Working_State));
711 funcctx->user_fctx = (void *) status;
713 status->ngxacts = GetPreparedTransactionList(&status->array);
716 MemoryContextSwitchTo(oldcontext);
719 funcctx = SRF_PERCALL_SETUP();
720 status = (Working_State *) funcctx->user_fctx;
722 while (status->array != NULL && status->currIdx < status->ngxacts)
724 GlobalTransaction gxact = &status->array[status->currIdx++];
725 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
726 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
736 * Form tuple with appropriate data.
738 MemSet(values, 0, sizeof(values));
739 MemSet(nulls, 0, sizeof(nulls));
741 values[0] = TransactionIdGetDatum(pgxact->xid);
742 values[1] = CStringGetTextDatum(gxact->gid);
743 values[2] = TimestampTzGetDatum(gxact->prepared_at);
744 values[3] = ObjectIdGetDatum(gxact->owner);
745 values[4] = ObjectIdGetDatum(proc->databaseId);
747 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
748 result = HeapTupleGetDatum(tuple);
749 SRF_RETURN_NEXT(funcctx, result);
752 SRF_RETURN_DONE(funcctx);
757 * Get the GlobalTransaction struct for a prepared transaction
758 * specified by XID
760 static GlobalTransaction
761 TwoPhaseGetGXact(TransactionId xid)
763 GlobalTransaction result = NULL;
766 static TransactionId cached_xid = InvalidTransactionId;
767 static GlobalTransaction cached_gxact = NULL;
770 * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
771 * repeatedly for the same XID. We can save work with a simple cache.
773 if (xid == cached_xid)
776 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
778 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
780 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
781 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
783 if (pgxact->xid == xid)
790 LWLockRelease(TwoPhaseStateLock);
792 if (result == NULL) /* should not happen */
793 elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
796 cached_gxact = result;
802 * TwoPhaseGetDummyBackendId
803 * Get the dummy backend ID for prepared transaction specified by XID
805 * Dummy backend IDs are similar to real backend IDs of real backends.
806 * They start at MaxBackends + 1, and are unique across all currently active
807 * real backends and prepared transactions.
810 TwoPhaseGetDummyBackendId(TransactionId xid)
812 GlobalTransaction gxact = TwoPhaseGetGXact(xid);
814 return gxact->dummyBackendId;
818 * TwoPhaseGetDummyProc
819 * Get the PGPROC that represents a prepared transaction specified by XID
822 TwoPhaseGetDummyProc(TransactionId xid)
824 GlobalTransaction gxact = TwoPhaseGetGXact(xid);
826 return &ProcGlobal->allProcs[gxact->pgprocno];
829 /************************************************************************/
830 /* State file support */
831 /************************************************************************/
833 #define TwoPhaseFilePath(path, xid) \
834 snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
837 * 2PC state file format:
839 * 1. TwoPhaseFileHeader
840 * 2. TransactionId[] (subtransactions)
841 * 3. RelFileNode[] (files to be deleted at commit)
842 * 4. RelFileNode[] (files to be deleted at abort)
843 * 5. SharedInvalidationMessage[] (inval messages to be sent at commit)
844 * 6. TwoPhaseRecordOnDisk, followed by that record's rmgr data
845 * 7. ... (further TwoPhaseRecordOnDisk + rmgr data pairs)
846 * 8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
847 * 9. checksum (CRC-32C)
849 * Each segment except the final checksum is MAXALIGN'd.
853 * Header for a 2PC state file
855 #define TWOPHASE_MAGIC 0x57F94532 /* format identifier */
857 typedef struct TwoPhaseFileHeader
859 uint32 magic; /* format identifier */
860 uint32 total_len; /* actual file length */
861 TransactionId xid; /* original transaction XID */
862 Oid database; /* OID of database it was in */
863 TimestampTz prepared_at; /* time of preparation */
864 Oid owner; /* user running the transaction */
865 int32 nsubxacts; /* number of following subxact XIDs */
866 int32 ncommitrels; /* number of delete-on-commit rels */
867 int32 nabortrels; /* number of delete-on-abort rels */
868 int32 ninvalmsgs; /* number of cache invalidation messages */
869 bool initfileinval; /* does relcache init file need invalidation? */
870 char gid[GIDSIZE]; /* GID for transaction */
871 } TwoPhaseFileHeader;
874 * Header for each record in a state file
876 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
877 * The rmgr data will be stored starting on a MAXALIGN boundary.
879 typedef struct TwoPhaseRecordOnDisk
881 uint32 len; /* length of rmgr data */
882 TwoPhaseRmgrId rmid; /* resource manager for this record */
883 uint16 info; /* flag bits for use by rmgr */
884 } TwoPhaseRecordOnDisk;
887 * During prepare, the state file is assembled in memory before writing it
888 * to WAL and the actual state file. We use a chain of StateFileChunk blocks
889 * for that, so the buffer can grow one chunk at a time.
891 typedef struct StateFileChunk
895 struct StateFileChunk *next;
900 StateFileChunk *head; /* first data block in the chain */
901 StateFileChunk *tail; /* last block in chain */
903 uint32 bytes_free; /* free bytes left in tail block */
904 uint32 total_len; /* total data bytes in chain */
909 * Append a block of data to the in-memory records chain.
911 * NB: each block is padded to a MAXALIGN multiple. This must be
912 * accounted for when the file is later read!
914 * The data is copied, so the caller is free to modify it afterwards.
917 save_state_data(const void *data, uint32 len)
919 uint32 padlen = MAXALIGN(len);
921 if (padlen > records.bytes_free)
923 records.tail->next = palloc0(sizeof(StateFileChunk));
924 records.tail = records.tail->next;
925 records.tail->len = 0;
926 records.tail->next = NULL;
927 records.num_chunks++;
929 records.bytes_free = Max(padlen, 512);
930 records.tail->data = palloc(records.bytes_free);
933 memcpy(((char *) records.tail->data) + records.tail->len, data, len);
934 records.tail->len += padlen;
935 records.bytes_free -= padlen;
936 records.total_len += padlen;
940 * Start preparing a state file.
942 * Initializes data structure and inserts the 2PC file header record.
945 StartPrepare(GlobalTransaction gxact)
947 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
948 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
949 TransactionId xid = pgxact->xid;
950 TwoPhaseFileHeader hdr;
951 TransactionId *children;
952 RelFileNode *commitrels;
953 RelFileNode *abortrels;
954 SharedInvalidationMessage *invalmsgs;
956 /* Initialize linked list */
957 records.head = palloc0(sizeof(StateFileChunk));
958 records.head->len = 0;
959 records.head->next = NULL;
961 records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
962 records.head->data = palloc(records.bytes_free);
964 records.tail = records.head;
965 records.num_chunks = 1;
967 records.total_len = 0;
970 hdr.magic = TWOPHASE_MAGIC;
971 hdr.total_len = 0; /* EndPrepare will fill this in */
973 hdr.database = proc->databaseId;
974 hdr.prepared_at = gxact->prepared_at;
975 hdr.owner = gxact->owner;
976 hdr.nsubxacts = xactGetCommittedChildren(&children);
977 hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
978 hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
979 hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
981 StrNCpy(hdr.gid, gxact->gid, GIDSIZE);
983 save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
986 * Add the additional info about subxacts, deletable files and cache
987 * invalidation messages.
989 if (hdr.nsubxacts > 0)
991 save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
992 /* While we have the child-xact data, stuff it in the gxact too */
993 GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
995 if (hdr.ncommitrels > 0)
997 save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
1000 if (hdr.nabortrels > 0)
1002 save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
1005 if (hdr.ninvalmsgs > 0)
1007 save_state_data(invalmsgs,
1008 hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
1014 * Finish preparing state file.
1016 * Calculates CRC and writes state file to WAL and in pg_twophase directory.
1019 EndPrepare(GlobalTransaction gxact)
1021 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1022 TransactionId xid = pgxact->xid;
1023 TwoPhaseFileHeader *hdr;
1024 char path[MAXPGPATH];
1025 StateFileChunk *record;
1026 pg_crc32 statefile_crc;
1030 /* Add the end sentinel to the list of 2PC records */
1031 RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
1034 /* Go back and fill in total_len in the file header record */
1035 hdr = (TwoPhaseFileHeader *) records.head->data;
1036 Assert(hdr->magic == TWOPHASE_MAGIC);
1037 hdr->total_len = records.total_len + sizeof(pg_crc32);
1040 * If the file size exceeds MaxAllocSize, we won't be able to read it in
1041 * ReadTwoPhaseFile. Check for that now, rather than fail at commit time.
1043 if (hdr->total_len > MaxAllocSize)
1045 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1046 errmsg("two-phase state file maximum length exceeded")));
1049 * Create the 2PC state file.
1051 TwoPhaseFilePath(path, xid);
1053 fd = OpenTransientFile(path,
1054 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
1058 (errcode_for_file_access(),
1059 errmsg("could not create two-phase state file \"%s\": %m",
1062 /* Write data to file, and calculate CRC as we pass over it */
1063 INIT_CRC32C(statefile_crc);
1065 for (record = records.head; record != NULL; record = record->next)
1067 COMP_CRC32C(statefile_crc, record->data, record->len);
1068 if ((write(fd, record->data, record->len)) != record->len)
1070 CloseTransientFile(fd);
1072 (errcode_for_file_access(),
1073 errmsg("could not write two-phase state file: %m")));
1077 FIN_CRC32C(statefile_crc);
1080 * Write a deliberately bogus CRC to the state file; this is just paranoia
1081 * to catch the case where four more bytes will run us out of disk space.
1083 bogus_crc = ~statefile_crc;
1085 if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
1087 CloseTransientFile(fd);
1089 (errcode_for_file_access(),
1090 errmsg("could not write two-phase state file: %m")));
1093 /* Back up to prepare for rewriting the CRC */
1094 if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
1096 CloseTransientFile(fd);
1098 (errcode_for_file_access(),
1099 errmsg("could not seek in two-phase state file: %m")));
1103 * The state file isn't valid yet, because we haven't written the correct
1104 * CRC yet. Before we do that, insert entry in WAL and flush it to disk.
1106 * Between the time we have written the WAL entry and the time we write
1107 * out the correct state file CRC, we have an inconsistency: the xact is
1108 * prepared according to WAL but not according to our on-disk state. We
1109 * use a critical section to force a PANIC if we are unable to complete
1110 * the write --- then, WAL replay should repair the inconsistency. The
1111 * odds of a PANIC actually occurring should be very tiny given that we
1112 * were able to write the bogus CRC above.
1114 * We have to set delayChkpt here, too; otherwise a checkpoint starting
1115 * immediately after the WAL record is inserted could complete without
1116 * fsync'ing our state file. (This is essentially the same kind of race
1117 * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
1118 * uses delayChkpt for; see notes there.)
1120 * We save the PREPARE record's location in the gxact for later use by
1121 * CheckPointTwoPhase.
1123 XLogEnsureRecordSpace(0, records.num_chunks);
1125 START_CRIT_SECTION();
1127 MyPgXact->delayChkpt = true;
1130 for (record = records.head; record != NULL; record = record->next)
1131 XLogRegisterData(record->data, record->len);
1132 gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
1133 XLogFlush(gxact->prepare_lsn);
1135 /* If we crash now, we have prepared: WAL replay will fix things */
1137 /* write correct CRC and close file */
1138 if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
1140 CloseTransientFile(fd);
1142 (errcode_for_file_access(),
1143 errmsg("could not write two-phase state file: %m")));
1146 if (CloseTransientFile(fd) != 0)
1148 (errcode_for_file_access(),
1149 errmsg("could not close two-phase state file: %m")));
1152 * Mark the prepared transaction as valid. As soon as xact.c marks
1153 * MyPgXact as not running our XID (which it will do immediately after
1154 * this function returns), others can commit/rollback the xact.
1156 * NB: a side effect of this is to make a dummy ProcArray entry for the
1157 * prepared XID. This must happen before we clear the XID from MyPgXact,
1158 * else there is a window where the XID is not running according to
1159 * TransactionIdIsInProgress, and onlookers would be entitled to assume
1160 * the xact crashed. Instead we have a window where the same XID appears
1161 * twice in ProcArray, which is OK.
1163 MarkAsPrepared(gxact);
1166 * Now we can mark ourselves as out of the commit critical section: a
1167 * checkpoint starting after this will certainly see the gxact as a
1168 * candidate for fsyncing.
1170 MyPgXact->delayChkpt = false;
1173 * Remember that we have this GlobalTransaction entry locked for us. If
1174 * we crash after this point, it's too late to abort, but we must unlock
1175 * it so that the prepared transaction can be committed or rolled back.
1177 MyLockedGxact = gxact;
1182 * Wait for synchronous replication, if required.
1184 * Note that at this stage we have marked the prepare, but still show as
1185 * running in the procarray (twice!) and continue to hold locks.
1187 SyncRepWaitForLSN(gxact->prepare_lsn);
1189 records.tail = records.head = NULL;
1190 records.num_chunks = 0;
1194 * Register a 2PC record to be written to state file.
1197 RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1198 const void *data, uint32 len)
1200 TwoPhaseRecordOnDisk record;
1205 save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1207 save_state_data(data, len);
1212 * Read and validate the state file for xid.
1214 * If it looks OK (has a valid magic number and CRC), return the palloc'd
1215 * contents of the file. Otherwise return NULL.
1218 ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
1220 char path[MAXPGPATH];
1222 TwoPhaseFileHeader *hdr;
1229 TwoPhaseFilePath(path, xid);
1231 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1236 (errcode_for_file_access(),
1237 errmsg("could not open two-phase state file \"%s\": %m",
1243 * Check file length. We can determine a lower bound pretty easily. We
1244 * set an upper bound to avoid palloc() failure on a corrupt file, though
1245 * we can't guarantee that we won't get an out of memory error anyway,
1246 * even on a valid file.
1248 if (fstat(fd, &stat))
1250 CloseTransientFile(fd);
1253 (errcode_for_file_access(),
1254 errmsg("could not stat two-phase state file \"%s\": %m",
1259 if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1260 MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1261 sizeof(pg_crc32)) ||
1262 stat.st_size > MaxAllocSize)
1264 CloseTransientFile(fd);
1268 crc_offset = stat.st_size - sizeof(pg_crc32);
1269 if (crc_offset != MAXALIGN(crc_offset))
1271 CloseTransientFile(fd);
1276 * OK, slurp in the file.
1278 buf = (char *) palloc(stat.st_size);
1280 if (read(fd, buf, stat.st_size) != stat.st_size)
1282 CloseTransientFile(fd);
1285 (errcode_for_file_access(),
1286 errmsg("could not read two-phase state file \"%s\": %m",
1292 CloseTransientFile(fd);
1294 hdr = (TwoPhaseFileHeader *) buf;
1295 if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
1301 INIT_CRC32C(calc_crc);
1302 COMP_CRC32C(calc_crc, buf, crc_offset);
1303 FIN_CRC32C(calc_crc);
1305 file_crc = *((pg_crc32 *) (buf + crc_offset));
1307 if (!EQ_CRC32C(calc_crc, file_crc))
1317 * Confirms an xid is prepared, during recovery
1320 StandbyTransactionIdIsPrepared(TransactionId xid)
1323 TwoPhaseFileHeader *hdr;
1326 Assert(TransactionIdIsValid(xid));
1328 if (max_prepared_xacts <= 0)
1329 return false; /* nothing to do */
1331 /* Read and validate file */
1332 buf = ReadTwoPhaseFile(xid, false);
1336 /* Check header also */
1337 hdr = (TwoPhaseFileHeader *) buf;
1338 result = TransactionIdEquals(hdr->xid, xid);
1345 * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1348 FinishPreparedTransaction(const char *gid, bool isCommit)
1350 GlobalTransaction gxact;
1356 TwoPhaseFileHeader *hdr;
1357 TransactionId latestXid;
1358 TransactionId *children;
1359 RelFileNode *commitrels;
1360 RelFileNode *abortrels;
1361 RelFileNode *delrels;
1363 SharedInvalidationMessage *invalmsgs;
1367 * Validate the GID, and lock the GXACT to ensure that two backends do not
1368 * try to commit the same GID at once.
1370 gxact = LockGXact(gid, GetUserId());
1371 proc = &ProcGlobal->allProcs[gxact->pgprocno];
1372 pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1376 * Read and validate the state file
1378 buf = ReadTwoPhaseFile(xid, true);
1381 (errcode(ERRCODE_DATA_CORRUPTED),
1382 errmsg("two-phase state file for transaction %u is corrupt",
1386 * Disassemble the header area
1388 hdr = (TwoPhaseFileHeader *) buf;
1389 Assert(TransactionIdEquals(hdr->xid, xid));
1390 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1391 children = (TransactionId *) bufptr;
1392 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1393 commitrels = (RelFileNode *) bufptr;
1394 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1395 abortrels = (RelFileNode *) bufptr;
1396 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1397 invalmsgs = (SharedInvalidationMessage *) bufptr;
1398 bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1400 /* compute latestXid among all children */
1401 latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
1404 * The order of operations here is critical: make the XLOG entry for
1405 * commit or abort, then mark the transaction committed or aborted in
1406 * pg_clog, then remove its PGPROC from the global ProcArray (which means
1407 * TransactionIdIsInProgress will stop saying the prepared xact is in
1408 * progress), then run the post-commit or post-abort callbacks. The
1409 * callbacks will release the locks the transaction held.
1412 RecordTransactionCommitPrepared(xid,
1413 hdr->nsubxacts, children,
1414 hdr->ncommitrels, commitrels,
1415 hdr->ninvalmsgs, invalmsgs,
1416 hdr->initfileinval);
1418 RecordTransactionAbortPrepared(xid,
1419 hdr->nsubxacts, children,
1420 hdr->nabortrels, abortrels);
1422 ProcArrayRemove(proc, latestXid);
1425 * In case we fail while running the callbacks, mark the gxact invalid so
1426 * no one else will try to commit/rollback, and so it will be recycled
1427 * if we fail after this point. It is still locked by our backend so it
1428 * won't go away yet.
1430 * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1432 gxact->valid = false;
1435 * We have to remove any files that were supposed to be dropped. For
1436 * consistency with the regular xact.c code paths, must do this before
1437 * releasing locks, so do it before running the callbacks.
1439 * NB: this code knows that we couldn't be dropping any temp rels ...
1443 delrels = commitrels;
1444 ndelrels = hdr->ncommitrels;
1448 delrels = abortrels;
1449 ndelrels = hdr->nabortrels;
1451 for (i = 0; i < ndelrels; i++)
1453 SMgrRelation srel = smgropen(delrels[i], InvalidBackendId);
1455 smgrdounlink(srel, false);
1460 * Handle cache invalidation messages.
1462 * Relcache init file invalidation requires processing both before and
1463 * after we send the SI messages. See AtEOXact_Inval()
1465 if (hdr->initfileinval)
1466 RelationCacheInitFilePreInvalidate();
1467 SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
1468 if (hdr->initfileinval)
1469 RelationCacheInitFilePostInvalidate();
1471 /* And now do the callbacks */
1473 ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
1475 ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
1477 PredicateLockTwoPhaseFinish(xid, isCommit);
1479 /* Count the prepared xact as committed or aborted */
1480 AtEOXact_PgStat(isCommit);
1483 * And now we can clean up our mess.
1485 RemoveTwoPhaseFile(xid, true);
1488 MyLockedGxact = NULL;
1494 * Scan a 2PC state file (already read into memory by ReadTwoPhaseFile)
1495 * and call the indicated callbacks for each 2PC record.
1498 ProcessRecords(char *bufptr, TransactionId xid,
1499 const TwoPhaseCallback callbacks[])
1503 TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1505 Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1506 if (record->rmid == TWOPHASE_RM_END_ID)
1509 bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1511 if (callbacks[record->rmid] != NULL)
1512 callbacks[record->rmid] (xid, record->info,
1513 (void *) bufptr, record->len);
1515 bufptr += MAXALIGN(record->len);
1520 * Remove the 2PC file for the specified XID.
1522 * If giveWarning is false, do not complain about file-not-present;
1523 * this is an expected case during WAL replay.
1526 RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1528 char path[MAXPGPATH];
1530 TwoPhaseFilePath(path, xid);
1532 if (errno != ENOENT || giveWarning)
1534 (errcode_for_file_access(),
1535 errmsg("could not remove two-phase state file \"%s\": %m",
1540 * Recreates a state file. This is used in WAL replay.
1542 * Note: content and len don't include CRC.
1545 RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1547 char path[MAXPGPATH];
1548 pg_crc32 statefile_crc;
1552 INIT_CRC32C(statefile_crc);
1553 COMP_CRC32C(statefile_crc, content, len);
1554 FIN_CRC32C(statefile_crc);
1556 TwoPhaseFilePath(path, xid);
1558 fd = OpenTransientFile(path,
1559 O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
1563 (errcode_for_file_access(),
1564 errmsg("could not recreate two-phase state file \"%s\": %m",
1567 /* Write content and CRC */
1568 if (write(fd, content, len) != len)
1570 CloseTransientFile(fd);
1572 (errcode_for_file_access(),
1573 errmsg("could not write two-phase state file: %m")));
1575 if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32))
1577 CloseTransientFile(fd);
1579 (errcode_for_file_access(),
1580 errmsg("could not write two-phase state file: %m")));
1584 * We must fsync the file because the end-of-replay checkpoint will not do
1585 * so, there being no GXACT in shared memory yet to tell it to.
1587 if (pg_fsync(fd) != 0)
1589 CloseTransientFile(fd);
1591 (errcode_for_file_access(),
1592 errmsg("could not fsync two-phase state file: %m")));
1595 if (CloseTransientFile(fd) != 0)
1597 (errcode_for_file_access(),
1598 errmsg("could not close two-phase state file: %m")));
1602 * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1604 * We must fsync the state file of any GXACT that is valid and has a PREPARE
1605 * LSN <= the checkpoint's redo horizon. (If the gxact isn't valid yet or
1606 * has a later LSN, this checkpoint is not responsible for fsyncing it.)
1608 * This is deliberately run as late as possible in the checkpoint sequence,
1609 * because GXACTs ordinarily have short lifespans, and so it is quite
1610 * possible that GXACTs that were valid at checkpoint start will no longer
1611 * exist if we wait a little bit.
1613 * If a GXACT remains valid across multiple checkpoints, it'll be fsynced
1614 * each time. This is considered unusual enough that we don't bother to
1615 * expend any extra code to avoid the redundant fsyncs. (They should be
1616 * reasonably cheap anyway, since they won't cause I/O.)
1619 CheckPointTwoPhase(XLogRecPtr redo_horizon)
1621 TransactionId *xids;
1623 char path[MAXPGPATH];
1627 * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
1628 * it just long enough to make a list of the XIDs that require fsyncing,
1629 * and then do the I/O afterwards.
1631 * This approach creates a race condition: someone else could delete a
1632 * GXACT between the time we release TwoPhaseStateLock and the time we try
1633 * to open its state file. We handle this by special-casing ENOENT
1634 * failures: if we see that, we verify that the GXACT is no longer valid,
1635 * and if so ignore the failure.
1637 if (max_prepared_xacts <= 0)
1638 return; /* nothing to do */
1640 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1642 xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
1645 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1647 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1649 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1650 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1653 gxact->prepare_lsn <= redo_horizon)
1654 xids[nxids++] = pgxact->xid;
1657 LWLockRelease(TwoPhaseStateLock);
1659 for (i = 0; i < nxids; i++)
1661 TransactionId xid = xids[i];
1664 TwoPhaseFilePath(path, xid);
1666 fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
1669 if (errno == ENOENT)
1671 /* OK if gxact is no longer valid */
1672 if (!TransactionIdIsPrepared(xid))
1674 /* Restore errno in case it was changed */
1678 (errcode_for_file_access(),
1679 errmsg("could not open two-phase state file \"%s\": %m",
1683 if (pg_fsync(fd) != 0)
1685 CloseTransientFile(fd);
1687 (errcode_for_file_access(),
1688 errmsg("could not fsync two-phase state file \"%s\": %m",
1692 if (CloseTransientFile(fd) != 0)
1694 (errcode_for_file_access(),
1695 errmsg("could not close two-phase state file \"%s\": %m",
1701 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1705 * PrescanPreparedTransactions
1707 * Scan the pg_twophase directory and determine the range of valid XIDs
1708 * present. This is run during database startup, after we have completed
1709 * reading WAL. ShmemVariableCache->nextXid has been set to one more than
1710 * the highest XID for which evidence exists in WAL.
1712 * We throw away any prepared xacts with main XID beyond nextXid --- if any
1713 * are present, it suggests that the DBA has done a PITR recovery to an
1714 * earlier point in time without cleaning out pg_twophase. We dare not
1715 * try to recover such prepared xacts since they likely depend on database
1716 * state that doesn't exist now.
1718 * However, we will advance nextXid beyond any subxact XIDs belonging to
1719 * valid prepared xacts. We need to do this since subxact commit doesn't
1720 * write a WAL entry, and so there might be no evidence in WAL of those
1723 * Our other responsibility is to determine and return the oldest valid XID
1724 * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
1725 * This is needed to synchronize pg_subtrans startup properly.
1727 * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1728 * top-level xids is stored in *xids_p. The number of entries in the array
1729 * is returned in *nxids_p.
1732 PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1734 TransactionId origNextXid = ShmemVariableCache->nextXid;
1735 TransactionId result = origNextXid;
1737 struct dirent *clde;
1738 TransactionId *xids = NULL;
1742 cldir = AllocateDir(TWOPHASE_DIR);
1743 while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1745 if (strlen(clde->d_name) == 8 &&
1746 strspn(clde->d_name, "0123456789ABCDEF") == 8)
1750 TwoPhaseFileHeader *hdr;
1751 TransactionId *subxids;
1754 xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1756 /* Reject XID if too new */
1757 if (TransactionIdFollowsOrEquals(xid, origNextXid))
1760 (errmsg("removing future two-phase state file \"%s\"",
1762 RemoveTwoPhaseFile(xid, true);
1767 * Note: we can't check if already processed because clog
1768 * subsystem isn't up yet.
1771 /* Read and validate file */
1772 buf = ReadTwoPhaseFile(xid, true);
1776 (errmsg("removing corrupt two-phase state file \"%s\"",
1778 RemoveTwoPhaseFile(xid, true);
1782 /* Deconstruct header */
1783 hdr = (TwoPhaseFileHeader *) buf;
1784 if (!TransactionIdEquals(hdr->xid, xid))
1787 (errmsg("removing corrupt two-phase state file \"%s\"",
1789 RemoveTwoPhaseFile(xid, true);
1795 * OK, we think this file is valid. Incorporate xid into the
1796 * running-minimum result.
1798 if (TransactionIdPrecedes(xid, result))
1802 * Examine subtransaction XIDs ... they should all follow main
1803 * XID, and they may force us to advance nextXid.
1805 * We don't expect anyone else to modify nextXid, hence we don't
1806 * need to hold a lock while examining it. We still acquire the
1807 * lock to modify it, though.
1809 subxids = (TransactionId *)
1810 (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
1811 for (i = 0; i < hdr->nsubxacts; i++)
1813 TransactionId subxid = subxids[i];
1815 Assert(TransactionIdFollows(subxid, xid));
1816 if (TransactionIdFollowsOrEquals(subxid,
1817 ShmemVariableCache->nextXid))
1819 LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
1820 ShmemVariableCache->nextXid = subxid;
1821 TransactionIdAdvance(ShmemVariableCache->nextXid);
1822 LWLockRelease(XidGenLock);
1829 if (nxids == allocsize)
1834 xids = palloc(allocsize * sizeof(TransactionId));
1838 allocsize = allocsize * 2;
1839 xids = repalloc(xids, allocsize * sizeof(TransactionId));
1842 xids[nxids++] = xid;
1860 * StandbyRecoverPreparedTransactions
1862 * Scan the pg_twophase directory and setup all the required information to
1863 * allow standby queries to treat prepared transactions as still active.
1864 * This is never called at the end of recovery - we use
1865 * RecoverPreparedTransactions() at that point.
1867 * Currently we simply call SubTransSetParent() for any subxids of prepared
1868 * transactions. If overwriteOK is true, it's OK if some XIDs have already
1869 * been marked in pg_subtrans.
1872 StandbyRecoverPreparedTransactions(bool overwriteOK)
1875 struct dirent *clde;
1877 cldir = AllocateDir(TWOPHASE_DIR);
1878 while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1880 if (strlen(clde->d_name) == 8 &&
1881 strspn(clde->d_name, "0123456789ABCDEF") == 8)
1885 TwoPhaseFileHeader *hdr;
1886 TransactionId *subxids;
1889 xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1891 /* Already processed? */
1892 if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
1895 (errmsg("removing stale two-phase state file \"%s\"",
1897 RemoveTwoPhaseFile(xid, true);
1901 /* Read and validate file */
1902 buf = ReadTwoPhaseFile(xid, true);
1906 (errmsg("removing corrupt two-phase state file \"%s\"",
1908 RemoveTwoPhaseFile(xid, true);
1912 /* Deconstruct header */
1913 hdr = (TwoPhaseFileHeader *) buf;
1914 if (!TransactionIdEquals(hdr->xid, xid))
1917 (errmsg("removing corrupt two-phase state file \"%s\"",
1919 RemoveTwoPhaseFile(xid, true);
1925 * Examine subtransaction XIDs ... they should all follow main
1928 subxids = (TransactionId *)
1929 (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
1930 for (i = 0; i < hdr->nsubxacts; i++)
1932 TransactionId subxid = subxids[i];
1934 Assert(TransactionIdFollows(subxid, xid));
/*
 * Link the subxact to its top-level transaction in pg_subtrans. The
 * argument order is (child xid, parent xid, overwriteOK), the same order
 * RecoverPreparedTransactions uses. Passing (xid, subxid) would instead
 * record each subxid as the parent of the top-level xid and leave the
 * subxids themselves unlinked.
 */
1935 SubTransSetParent(subxid, xid, overwriteOK);
1943 * RecoverPreparedTransactions
1945 * Scan the pg_twophase directory and reload shared-memory state for each
1946 * prepared transaction (reacquire locks, etc). This is run during database
1950 RecoverPreparedTransactions(void)
1952 char dir[MAXPGPATH];
1954 struct dirent *clde;
1955 bool overwriteOK = false;
1957 snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
1959 cldir = AllocateDir(dir);
1960 while ((clde = ReadDir(cldir, dir)) != NULL)
1962 if (strlen(clde->d_name) == 8 &&
1963 strspn(clde->d_name, "0123456789ABCDEF") == 8)
1968 TwoPhaseFileHeader *hdr;
1969 TransactionId *subxids;
1970 GlobalTransaction gxact;
1973 xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1975 /* Already processed? */
1976 if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
1979 (errmsg("removing stale two-phase state file \"%s\"",
1981 RemoveTwoPhaseFile(xid, true);
1985 /* Read and validate file */
1986 buf = ReadTwoPhaseFile(xid, true);
1990 (errmsg("removing corrupt two-phase state file \"%s\"",
1992 RemoveTwoPhaseFile(xid, true);
1997 (errmsg("recovering prepared transaction %u", xid)));
1999 /* Deconstruct header */
2000 hdr = (TwoPhaseFileHeader *) buf;
2001 Assert(TransactionIdEquals(hdr->xid, xid));
2002 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2003 subxids = (TransactionId *) bufptr;
2004 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
2005 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
2006 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
2007 bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2010 * It's possible that SubTransSetParent has been set before, if
2011 * the prepared transaction generated xid assignment records. Test
2012 * here must match one used in AssignTransactionId().
2014 if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS ||
2015 XLogLogicalInfoActive()))
2019 * Reconstruct subtrans state for the transaction --- needed
2020 * because pg_subtrans is not preserved over a restart. Note that
2021 * we are linking all the subtransactions directly to the
2022 * top-level XID; there may originally have been a more complex
2023 * hierarchy, but there's no need to restore that exactly.
2025 for (i = 0; i < hdr->nsubxacts; i++)
2026 SubTransSetParent(subxids[i], xid, overwriteOK);
2029 * Recreate its GXACT and dummy PGPROC
2031 * Note: since we don't have the PREPARE record's WAL location at
2032 * hand, we leave prepare_lsn zeroes. This means the GXACT will
2033 * be fsync'd on every future checkpoint. We assume this
2034 * situation is infrequent enough that the performance cost is
2035 * negligible (especially since we know the state file has already
2038 gxact = MarkAsPreparing(xid, hdr->gid,
2040 hdr->owner, hdr->database);
2041 GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
2042 MarkAsPrepared(gxact);
2045 * Recover other state (notably locks) using resource managers
2047 ProcessRecords(bufptr, xid, twophase_recover_callbacks);
2050 * Release locks held by the standby process after we process each
2051 * prepared transaction. As a result, we don't need too many
2052 * additional locks at any one time.
2055 StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
2064 * RecordTransactionCommitPrepared
2066 * This is basically the same as RecordTransactionCommit: in particular,
2067 * we must set the delayChkpt flag to avoid a race condition.
2069 * We know the transaction made at least one XLOG entry (its PREPARE),
2070 * so it is never possible to optimize out the commit record.
2073 RecordTransactionCommitPrepared(TransactionId xid,
2075 TransactionId *children,
2079 SharedInvalidationMessage *invalmsgs,
2084 START_CRIT_SECTION();
2086 /* See notes in RecordTransactionCommit */
2087 MyPgXact->delayChkpt = true;
2089 /* Emit the XLOG commit record */
2090 recptr = XactLogCommitRecord(GetCurrentTimestamp(),
2091 nchildren, children, nrels, rels,
2092 ninvalmsgs, invalmsgs,
2093 initfileinval, false,
2097 * We don't currently try to sleep before flush here ... nor is there any
2098 * support for async commit of a prepared xact (the very idea is probably
2102 /* Flush XLOG to disk */
2105 /* Mark the transaction committed in pg_clog */
2106 TransactionIdCommitTree(xid, nchildren, children);
2108 /* Checkpoint can proceed now */
2109 MyPgXact->delayChkpt = false;
2114 * Wait for synchronous replication, if required.
2116 * Note that at this stage we have marked clog, but still show as running
2117 * in the procarray and continue to hold locks.
2119 SyncRepWaitForLSN(recptr);
2123 * RecordTransactionAbortPrepared
2125 * This is basically the same as RecordTransactionAbort.
2127 * We know the transaction made at least one XLOG entry (its PREPARE),
2128 * so it is never possible to optimize out the abort record.
2131 RecordTransactionAbortPrepared(TransactionId xid,
2133 TransactionId *children,
2140 * Catch the scenario where we aborted partway through
2141 * RecordTransactionCommitPrepared ...
2143 if (TransactionIdDidCommit(xid))
2144 elog(PANIC, "cannot abort transaction %u, it was already committed",
2147 START_CRIT_SECTION();
2149 /* Emit the XLOG abort record */
2150 recptr = XactLogAbortRecord(GetCurrentTimestamp(),
2151 nchildren, children,
2155 /* Always flush, since we're about to remove the 2PC state file */
2159 * Mark the transaction aborted in clog. This is not absolutely necessary
2160 * but we may as well do it while we are here.
2162 TransactionIdAbortTree(xid, nchildren, children);
2167 * Wait for synchronous replication, if required.
2169 * Note that at this stage we have marked clog, but still show as running
2170 * in the procarray and continue to hold locks.
2172 SyncRepWaitForLSN(recptr);