1 /*-------------------------------------------------------------------------
4 * Two-phase commit support functions.
6 * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.29 2007/04/03 16:34:35 tgl Exp $
13 * Each global transaction is associated with a global transaction
14 * identifier (GID). The client assigns a GID to a postgres
15 * transaction with the PREPARE TRANSACTION command.
17 * We keep all active global transactions in a shared memory array.
18 * When the PREPARE TRANSACTION command is issued, the GID is
19 * reserved for the transaction in the array. This is done before
20 * a WAL entry is made, because the reservation checks for duplicate
21 * GIDs and aborts the transaction if there already is a global
22 * transaction in prepared state with the same GID.
24 * A global transaction (gxact) also has a dummy PGPROC that is entered
25 * into the ProcArray; this is what keeps the XID considered
26 * running by TransactionIdIsInProgress. It is also convenient as a
27 * PGPROC to hook the gxact's locks to.
29 * In order to survive crashes and shutdowns, all prepared
30 * transactions must be stored in permanent storage. This includes
31 * locking information, pending notifications etc. All that state
32 * information is written to the per-transaction state file in
33 * the pg_twophase directory.
35 *-------------------------------------------------------------------------
41 #include <sys/types.h>
45 #include "access/heapam.h"
46 #include "access/subtrans.h"
47 #include "access/transam.h"
48 #include "access/twophase.h"
49 #include "access/twophase_rmgr.h"
50 #include "access/xact.h"
51 #include "catalog/pg_type.h"
53 #include "miscadmin.h"
55 #include "storage/fd.h"
56 #include "storage/procarray.h"
57 #include "storage/smgr.h"
58 #include "utils/builtins.h"
62 * Directory where Two-phase commit files reside within PGDATA
64 #define TWOPHASE_DIR "pg_twophase"
66 /* GUC variable, can't be changed after startup */
67 int max_prepared_xacts = 5;
70 * This struct describes one global transaction that is in prepared state
71 * or attempting to become prepared.
73 * The first component of the struct is a dummy PGPROC that is inserted
74 * into the global ProcArray so that the transaction appears to still be
75 * running and holding locks. It must be first because we cast pointers
76 * to PGPROC and pointers to GlobalTransactionData back and forth.
78 * The lifecycle of a global transaction is:
80 * 1. After checking that the requested GID is not in use, set up an
81 * entry in the TwoPhaseState->prepXacts array with the correct XID and GID,
82 * with locking_xid = my own XID and valid = false.
84 * 2. After successfully completing prepare, set valid = true and enter the
85 * contained PGPROC into the global ProcArray.
87 * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry
88 * is valid and its locking_xid is no longer active, then store my current
89 * XID into locking_xid. This prevents concurrent attempts to commit or
90 * rollback the same prepared xact.
92 * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
93 * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
96 * Note that if the preparing transaction fails between steps 1 and 2, the
97 * entry will remain in prepXacts until recycled. We can detect recyclable
98 * entries by checking for valid = false and locking_xid no longer active.
100 * typedef struct GlobalTransactionData *GlobalTransaction appears in
105 typedef struct GlobalTransactionData
107 PGPROC proc; /* dummy proc */
108 TimestampTz prepared_at; /* time of preparation */
109 XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */
110 Oid owner; /* ID of user that executed the xact */
111 TransactionId locking_xid; /* top-level XID of backend working on xact */
112 bool valid; /* TRUE if fully prepared */
113 char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
114 } GlobalTransactionData;
117 * Two Phase Commit shared state. Access to this struct is protected
118 * by TwoPhaseStateLock.
120 typedef struct TwoPhaseStateData
122 /* Head of linked list of free GlobalTransactionData structs */
123 SHMEM_OFFSET freeGXacts;
125 /* Number of valid prepXacts entries. */
129 * There are max_prepared_xacts items in this array, but C wants a
132 GlobalTransaction prepXacts[1]; /* VARIABLE LENGTH ARRAY */
133 } TwoPhaseStateData; /* VARIABLE LENGTH STRUCT */
135 static TwoPhaseStateData *TwoPhaseState;
138 static void RecordTransactionCommitPrepared(TransactionId xid,
140 TransactionId *children,
143 static void RecordTransactionAbortPrepared(TransactionId xid,
145 TransactionId *children,
148 static void ProcessRecords(char *bufptr, TransactionId xid,
149 const TwoPhaseCallback callbacks[]);
153 * Initialization of shared memory
156 TwoPhaseShmemSize(void)
160 /* Need the fixed struct, the array of pointers, and the GTD structs */
161 size = offsetof(TwoPhaseStateData, prepXacts);
162 size = add_size(size, mul_size(max_prepared_xacts,
163 sizeof(GlobalTransaction)));
164 size = MAXALIGN(size);
165 size = add_size(size, mul_size(max_prepared_xacts,
166 sizeof(GlobalTransactionData)));
172 TwoPhaseShmemInit(void)
176 TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
179 if (!IsUnderPostmaster)
181 GlobalTransaction gxacts;
185 TwoPhaseState->freeGXacts = INVALID_OFFSET;
186 TwoPhaseState->numPrepXacts = 0;
189 * Initialize the linked list of free GlobalTransactionData structs
191 gxacts = (GlobalTransaction)
192 ((char *) TwoPhaseState +
193 MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
194 sizeof(GlobalTransaction) * max_prepared_xacts));
195 for (i = 0; i < max_prepared_xacts; i++)
197 gxacts[i].proc.links.next = TwoPhaseState->freeGXacts;
198 TwoPhaseState->freeGXacts = MAKE_OFFSET(&gxacts[i]);
208 * Reserve the GID for the given transaction.
210 * Internally, this creates a gxact struct and puts it into the active array.
211 * NOTE: this is also used when reloading a gxact after a crash; so avoid
212 * assuming that we can use very much backend context.
215 MarkAsPreparing(TransactionId xid, const char *gid,
216 TimestampTz prepared_at, Oid owner, Oid databaseid)
218 GlobalTransaction gxact;
221 if (strlen(gid) >= GIDSIZE)
223 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
224 errmsg("transaction identifier \"%s\" is too long",
227 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
230 * First, find and recycle any gxacts that failed during prepare. We do
231 * this partly to ensure we don't mistakenly say their GIDs are still
232 * reserved, and partly so we don't fail on out-of-slots unnecessarily.
234 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
236 gxact = TwoPhaseState->prepXacts[i];
237 if (!gxact->valid && !TransactionIdIsActive(gxact->locking_xid))
239 /* It's dead Jim ... remove from the active array */
240 TwoPhaseState->numPrepXacts--;
241 TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
242 /* and put it back in the freelist */
243 gxact->proc.links.next = TwoPhaseState->freeGXacts;
244 TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);
245 /* Back up index count too, so we don't miss scanning one */
250 /* Check for conflicting GID */
251 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
253 gxact = TwoPhaseState->prepXacts[i];
254 if (strcmp(gxact->gid, gid) == 0)
257 (errcode(ERRCODE_DUPLICATE_OBJECT),
258 errmsg("transaction identifier \"%s\" is already in use",
263 /* Get a free gxact from the freelist */
264 if (TwoPhaseState->freeGXacts == INVALID_OFFSET)
266 (errcode(ERRCODE_OUT_OF_MEMORY),
267 errmsg("maximum number of prepared transactions reached"),
268 errhint("Increase max_prepared_transactions (currently %d).",
269 max_prepared_xacts)));
270 gxact = (GlobalTransaction) MAKE_PTR(TwoPhaseState->freeGXacts);
271 TwoPhaseState->freeGXacts = gxact->proc.links.next;
274 MemSet(&gxact->proc, 0, sizeof(PGPROC));
275 SHMQueueElemInit(&(gxact->proc.links));
276 gxact->proc.waitStatus = STATUS_OK;
277 gxact->proc.xid = xid;
278 gxact->proc.xmin = InvalidTransactionId;
280 gxact->proc.databaseId = databaseid;
281 gxact->proc.roleId = owner;
282 gxact->proc.inCommit = false;
283 gxact->proc.inVacuum = false;
284 gxact->proc.isAutovacuum = false;
285 gxact->proc.lwWaiting = false;
286 gxact->proc.lwExclusive = false;
287 gxact->proc.lwWaitLink = NULL;
288 gxact->proc.waitLock = NULL;
289 gxact->proc.waitProcLock = NULL;
290 for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
291 SHMQueueInit(&(gxact->proc.myProcLocks[i]));
292 /* subxid data must be filled later by GXactLoadSubxactData */
293 gxact->proc.subxids.overflowed = false;
294 gxact->proc.subxids.nxids = 0;
296 gxact->prepared_at = prepared_at;
297 /* initialize LSN to 0 (start of WAL) */
298 gxact->prepare_lsn.xlogid = 0;
299 gxact->prepare_lsn.xrecoff = 0;
300 gxact->owner = owner;
301 gxact->locking_xid = xid;
302 gxact->valid = false;
303 strcpy(gxact->gid, gid);
305 /* And insert it into the active array */
306 Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
307 TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
309 LWLockRelease(TwoPhaseStateLock);
315 * GXactLoadSubxactData
317 * If the transaction being persisted had any subtransactions, this must
318 * be called before MarkAsPrepared() to load information into the dummy
322 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
323 TransactionId *children)
325 /* We need no extra lock since the GXACT isn't valid yet */
326 if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
328 gxact->proc.subxids.overflowed = true;
329 nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
333 memcpy(gxact->proc.subxids.xids, children,
334 nsubxacts * sizeof(TransactionId));
335 gxact->proc.subxids.nxids = nsubxacts;
341 * Mark the GXACT as fully valid, and enter it into the global ProcArray.
344 MarkAsPrepared(GlobalTransaction gxact)
346 /* Lock here may be overkill, but I'm not convinced of that ... */
347 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
348 Assert(!gxact->valid);
350 LWLockRelease(TwoPhaseStateLock);
353 * Put it into the global ProcArray so TransactionIdIsInProgress considers
354 * the XID as still running.
356 ProcArrayAdd(&gxact->proc);
361 * Locate the prepared transaction and mark it busy for COMMIT PREPARED or ROLLBACK PREPARED.
363 static GlobalTransaction
364 LockGXact(const char *gid, Oid user)
368 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
370 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
372 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
374 /* Ignore not-yet-valid GIDs */
377 if (strcmp(gxact->gid, gid) != 0)
380 /* Found it, but has someone else got it locked? */
381 if (TransactionIdIsValid(gxact->locking_xid))
383 if (TransactionIdIsActive(gxact->locking_xid))
385 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
386 errmsg("prepared transaction with identifier \"%s\" is busy",
388 gxact->locking_xid = InvalidTransactionId;
391 if (user != gxact->owner && !superuser_arg(user))
393 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
394 errmsg("permission denied to finish prepared transaction"),
395 errhint("Must be superuser or the user that prepared the transaction.")));
398 * Note: it probably would be possible to allow committing from another
399 * database; but at the moment NOTIFY is known not to work and there
400 * may be some other issues as well. Hence disallow until someone
401 * gets motivated to make it work.
403 if (MyDatabaseId != gxact->proc.databaseId)
405 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
406 errmsg("prepared transaction belongs to another database"),
407 errhint("Connect to the database where the transaction was prepared to finish it.")));
409 /* OK for me to lock it */
410 gxact->locking_xid = GetTopTransactionId();
412 LWLockRelease(TwoPhaseStateLock);
417 LWLockRelease(TwoPhaseStateLock);
420 (errcode(ERRCODE_UNDEFINED_OBJECT),
421 errmsg("prepared transaction with identifier \"%s\" does not exist",
430 * Remove the prepared transaction from the shared memory array.
432 * NB: caller should have already removed it from ProcArray
435 RemoveGXact(GlobalTransaction gxact)
439 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
441 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
443 if (gxact == TwoPhaseState->prepXacts[i])
445 /* remove from the active array */
446 TwoPhaseState->numPrepXacts--;
447 TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
449 /* and put it back in the freelist */
450 gxact->proc.links.next = TwoPhaseState->freeGXacts;
451 TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);
453 LWLockRelease(TwoPhaseStateLock);
459 LWLockRelease(TwoPhaseStateLock);
461 elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
465 * TransactionIdIsPrepared
466 * True iff transaction associated with the identifier is prepared
467 * for two-phase commit
469 * Note: only gxacts marked "valid" are considered; but notice we do not
470 * check the locking status.
472 * This is not currently exported, because it is only needed internally.
475 TransactionIdIsPrepared(TransactionId xid)
480 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
482 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
484 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
486 if (gxact->valid && gxact->proc.xid == xid)
493 LWLockRelease(TwoPhaseStateLock);
499 * Returns an array of all prepared transactions for the user-level
500 * function pg_prepared_xact.
502 * The returned array and all its elements are copies of internal data
503 * structures, to minimize the time we need to hold the TwoPhaseStateLock.
505 * WARNING -- we return even those transactions that are not fully prepared
506 * yet. The caller should filter them out if it does not want them.
508 * The returned array is palloc'd.
511 GetPreparedTransactionList(GlobalTransaction *gxacts)
513 GlobalTransaction array;
517 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
519 if (TwoPhaseState->numPrepXacts == 0)
521 LWLockRelease(TwoPhaseStateLock);
527 num = TwoPhaseState->numPrepXacts;
528 array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
530 for (i = 0; i < num; i++)
531 memcpy(array + i, TwoPhaseState->prepXacts[i],
532 sizeof(GlobalTransactionData));
534 LWLockRelease(TwoPhaseStateLock);
540 /* Working status for pg_prepared_xact */
543 GlobalTransaction array;
550 * Produce a view with one row per prepared transaction.
552 * This function is here so we don't have to export the
553 * GlobalTransactionData struct definition.
556 pg_prepared_xact(PG_FUNCTION_ARGS)
558 FuncCallContext *funcctx;
559 Working_State *status;
561 if (SRF_IS_FIRSTCALL())
564 MemoryContext oldcontext;
566 /* create a function context for cross-call persistence */
567 funcctx = SRF_FIRSTCALL_INIT();
570 * Switch to memory context appropriate for multiple function calls
572 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
574 /* build tupdesc for result tuples */
575 /* this had better match pg_prepared_xacts view in system_views.sql */
576 tupdesc = CreateTemplateTupleDesc(5, false);
577 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
579 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
581 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
582 TIMESTAMPTZOID, -1, 0);
583 TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
585 TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
588 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
591 * Collect all the 2PC status information that we will format and send
592 * out as a result set.
594 status = (Working_State *) palloc(sizeof(Working_State));
595 funcctx->user_fctx = (void *) status;
597 status->ngxacts = GetPreparedTransactionList(&status->array);
600 MemoryContextSwitchTo(oldcontext);
603 funcctx = SRF_PERCALL_SETUP();
604 status = (Working_State *) funcctx->user_fctx;
606 while (status->array != NULL && status->currIdx < status->ngxacts)
608 GlobalTransaction gxact = &status->array[status->currIdx++];
618 * Form tuple with appropriate data.
620 MemSet(values, 0, sizeof(values));
621 MemSet(nulls, 0, sizeof(nulls));
623 values[0] = TransactionIdGetDatum(gxact->proc.xid);
624 values[1] = DirectFunctionCall1(textin, CStringGetDatum(gxact->gid));
625 values[2] = TimestampTzGetDatum(gxact->prepared_at);
626 values[3] = ObjectIdGetDatum(gxact->owner);
627 values[4] = ObjectIdGetDatum(gxact->proc.databaseId);
629 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
630 result = HeapTupleGetDatum(tuple);
631 SRF_RETURN_NEXT(funcctx, result);
634 SRF_RETURN_DONE(funcctx);
638 * TwoPhaseGetDummyProc
639 * Get the PGPROC that represents a prepared transaction specified by XID
642 TwoPhaseGetDummyProc(TransactionId xid)
644 PGPROC *result = NULL;
647 static TransactionId cached_xid = InvalidTransactionId;
648 static PGPROC *cached_proc = NULL;
651 * During a recovery, COMMIT PREPARED, or ROLLBACK PREPARED, we'll be called
652 * repeatedly for the same XID. We can save work with a simple cache.
654 if (xid == cached_xid)
657 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
659 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
661 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
663 if (gxact->proc.xid == xid)
665 result = &gxact->proc;
670 LWLockRelease(TwoPhaseStateLock);
672 if (result == NULL) /* should not happen */
673 elog(ERROR, "failed to find dummy PGPROC for xid %u", xid);
676 cached_proc = result;
681 /************************************************************************/
682 /* State file support */
683 /************************************************************************/
/*
 * TwoPhaseFilePath
 *		Format the state-file path for transaction "xid" into "path".
 *
 * The result is TWOPHASE_DIR, a slash, and the XID printed as eight
 * upper-case hexadecimal digits.  Output is bounded to MAXPGPATH bytes, so
 * "path" must be a buffer of at least that size.
 */
#define TwoPhaseFilePath(path, xid) \
	snprintf((path), MAXPGPATH, TWOPHASE_DIR "/%08X", (xid))
689 * 2PC state file format:
691 * 1. TwoPhaseFileHeader
692 * 2. TransactionId[] (subtransactions)
693 * 3. RelFileNode[] (files to be deleted at commit)
694 * 4. RelFileNode[] (files to be deleted at abort)
695 * 5. TwoPhaseRecordOnDisk
697 * 7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
700 * Each segment except the final CRC32 is MAXALIGN'd.
704 * Header for a 2PC state file
706 #define TWOPHASE_MAGIC 0x57F94531 /* format identifier */
708 typedef struct TwoPhaseFileHeader
710 uint32 magic; /* format identifier */
711 uint32 total_len; /* actual file length */
712 TransactionId xid; /* original transaction XID */
713 Oid database; /* OID of database it was in */
714 TimestampTz prepared_at; /* time of preparation */
715 Oid owner; /* user running the transaction */
716 int32 nsubxacts; /* number of following subxact XIDs */
717 int32 ncommitrels; /* number of delete-on-commit rels */
718 int32 nabortrels; /* number of delete-on-abort rels */
719 char gid[GIDSIZE]; /* GID for transaction */
720 } TwoPhaseFileHeader;
723 * Header for each record in a state file
725 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
726 * The rmgr data will be stored starting on a MAXALIGN boundary.
728 typedef struct TwoPhaseRecordOnDisk
730 uint32 len; /* length of rmgr data */
731 TwoPhaseRmgrId rmid; /* resource manager for this record */
732 uint16 info; /* flag bits for use by rmgr */
733 } TwoPhaseRecordOnDisk;
736 * During prepare, the state file is assembled in memory before writing it
737 * to WAL and the actual state file. We use a chain of XLogRecData blocks
738 * so that we will be able to pass the state file contents directly to
743 XLogRecData *head; /* first data block in the chain */
744 XLogRecData *tail; /* last block in chain */
745 uint32 bytes_free; /* free bytes left in tail block */
746 uint32 total_len; /* total data bytes in chain */
751 * Append a block of data to records data structure.
753 * NB: each block is padded to a MAXALIGN multiple. This must be
754 * accounted for when the file is later read!
756 * The data is copied, so the caller is free to modify it afterwards.
759 save_state_data(const void *data, uint32 len)
761 uint32 padlen = MAXALIGN(len);
763 if (padlen > records.bytes_free)
765 records.tail->next = palloc0(sizeof(XLogRecData));
766 records.tail = records.tail->next;
767 records.tail->buffer = InvalidBuffer;
768 records.tail->len = 0;
769 records.tail->next = NULL;
771 records.bytes_free = Max(padlen, 512);
772 records.tail->data = palloc(records.bytes_free);
775 memcpy(((char *) records.tail->data) + records.tail->len, data, len);
776 records.tail->len += padlen;
777 records.bytes_free -= padlen;
778 records.total_len += padlen;
782 * Start preparing a state file.
784 * Initializes data structure and inserts the 2PC file header record.
787 StartPrepare(GlobalTransaction gxact)
789 TransactionId xid = gxact->proc.xid;
790 TwoPhaseFileHeader hdr;
791 TransactionId *children;
792 RelFileNode *commitrels;
793 RelFileNode *abortrels;
795 /* Initialize linked list */
796 records.head = palloc0(sizeof(XLogRecData));
797 records.head->buffer = InvalidBuffer;
798 records.head->len = 0;
799 records.head->next = NULL;
801 records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
802 records.head->data = palloc(records.bytes_free);
804 records.tail = records.head;
806 records.total_len = 0;
809 hdr.magic = TWOPHASE_MAGIC;
810 hdr.total_len = 0; /* EndPrepare will fill this in */
812 hdr.database = gxact->proc.databaseId;
813 hdr.prepared_at = gxact->prepared_at;
814 hdr.owner = gxact->owner;
815 hdr.nsubxacts = xactGetCommittedChildren(&children);
816 hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
817 hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
818 StrNCpy(hdr.gid, gxact->gid, GIDSIZE);
820 save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
822 /* Add the additional info about subxacts and deletable files */
823 if (hdr.nsubxacts > 0)
825 save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
826 /* While we have the child-xact data, stuff it in the gxact too */
827 GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
830 if (hdr.ncommitrels > 0)
832 save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
835 if (hdr.nabortrels > 0)
837 save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
843 * Finish preparing state file.
845 * Calculates CRC and writes state file to WAL and in pg_twophase directory.
848 EndPrepare(GlobalTransaction gxact)
850 TransactionId xid = gxact->proc.xid;
851 TwoPhaseFileHeader *hdr;
852 char path[MAXPGPATH];
854 pg_crc32 statefile_crc;
858 /* Add the end sentinel to the list of 2PC records */
859 RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
862 /* Go back and fill in total_len in the file header record */
863 hdr = (TwoPhaseFileHeader *) records.head->data;
864 Assert(hdr->magic == TWOPHASE_MAGIC);
865 hdr->total_len = records.total_len + sizeof(pg_crc32);
868 * Create the 2PC state file.
870 * Note: because we use BasicOpenFile(), we are responsible for ensuring
871 * the FD gets closed in any error exit path. Once we get into the
872 * critical section, though, it doesn't matter since any failure causes
875 TwoPhaseFilePath(path, xid);
877 fd = BasicOpenFile(path,
878 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
882 (errcode_for_file_access(),
883 errmsg("could not create two-phase state file \"%s\": %m",
886 /* Write data to file, and calculate CRC as we pass over it */
887 INIT_CRC32(statefile_crc);
889 for (record = records.head; record != NULL; record = record->next)
891 COMP_CRC32(statefile_crc, record->data, record->len);
892 if ((write(fd, record->data, record->len)) != record->len)
896 (errcode_for_file_access(),
897 errmsg("could not write two-phase state file: %m")));
901 FIN_CRC32(statefile_crc);
904 * Write a deliberately bogus CRC to the state file; this is just paranoia
905 * to catch the case where four more bytes will run us out of disk space.
907 bogus_crc = ~statefile_crc;
909 if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
913 (errcode_for_file_access(),
914 errmsg("could not write two-phase state file: %m")));
917 /* Back up to prepare for rewriting the CRC */
918 if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
922 (errcode_for_file_access(),
923 errmsg("could not seek in two-phase state file: %m")));
927 * The state file isn't valid yet, because we haven't written the correct
928 * CRC yet. Before we do that, insert entry in WAL and flush it to disk.
930 * Between the time we have written the WAL entry and the time we write
931 * out the correct state file CRC, we have an inconsistency: the xact is
932 * prepared according to WAL but not according to our on-disk state. We
933 * use a critical section to force a PANIC if we are unable to complete
934 * the write --- then, WAL replay should repair the inconsistency. The
935 * odds of a PANIC actually occurring should be very tiny given that we
936 * were able to write the bogus CRC above.
938 * We have to set inCommit here, too; otherwise a checkpoint
939 * starting immediately after the WAL record is inserted could complete
940 * without fsync'ing our state file. (This is essentially the same kind
941 * of race condition as the COMMIT-to-clog-write case that
942 * RecordTransactionCommit uses inCommit for; see notes there.)
944 * We save the PREPARE record's location in the gxact for later use by
945 * CheckPointTwoPhase.
947 START_CRIT_SECTION();
949 MyProc->inCommit = true;
951 gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
953 XLogFlush(gxact->prepare_lsn);
955 /* If we crash now, we have prepared: WAL replay will fix things */
957 /* write correct CRC and close file */
958 if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
962 (errcode_for_file_access(),
963 errmsg("could not write two-phase state file: %m")));
968 (errcode_for_file_access(),
969 errmsg("could not close two-phase state file: %m")));
972 * Mark the prepared transaction as valid. As soon as xact.c marks MyProc
973 * as not running our XID (which it will do immediately after this
974 * function returns), others can commit/rollback the xact.
976 * NB: a side effect of this is to make a dummy ProcArray entry for the
977 * prepared XID. This must happen before we clear the XID from MyProc,
978 * else there is a window where the XID is not running according to
980 * TransactionIdIsInProgress, and onlookers would be entitled to assume the
980 * xact crashed. Instead we have a window where the same XID appears
981 * twice in ProcArray, which is OK.
983 MarkAsPrepared(gxact);
986 * Now we can mark ourselves as out of the commit critical section:
987 * a checkpoint starting after this will certainly see the gxact as a
988 * candidate for fsyncing.
990 MyProc->inCommit = false;
994 records.tail = records.head = NULL;
998 * Register a 2PC record to be written to state file.
1001 RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1002 const void *data, uint32 len)
1004 TwoPhaseRecordOnDisk record;
1009 save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1011 save_state_data(data, len);
1016 * Read and validate the state file for xid.
1018 * If it looks OK (has a valid magic number and CRC), return the palloc'd
1019 * contents of the file. Otherwise return NULL.
1022 ReadTwoPhaseFile(TransactionId xid)
1024 char path[MAXPGPATH];
1026 TwoPhaseFileHeader *hdr;
1033 TwoPhaseFilePath(path, xid);
1035 fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
1039 (errcode_for_file_access(),
1040 errmsg("could not open two-phase state file \"%s\": %m",
1046 * Check file length. We can determine a lower bound pretty easily. We
1047 * set an upper bound mainly to avoid palloc() failure on a corrupt file.
1049 if (fstat(fd, &stat))
1053 (errcode_for_file_access(),
1054 errmsg("could not stat two-phase state file \"%s\": %m",
1059 if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1060 MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1061 sizeof(pg_crc32)) ||
1062 stat.st_size > 10000000)
1068 crc_offset = stat.st_size - sizeof(pg_crc32);
1069 if (crc_offset != MAXALIGN(crc_offset))
1076 * OK, slurp in the file.
1078 buf = (char *) palloc(stat.st_size);
1080 if (read(fd, buf, stat.st_size) != stat.st_size)
1084 (errcode_for_file_access(),
1085 errmsg("could not read two-phase state file \"%s\": %m",
1093 hdr = (TwoPhaseFileHeader *) buf;
1094 if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
1100 INIT_CRC32(calc_crc);
1101 COMP_CRC32(calc_crc, buf, crc_offset);
1102 FIN_CRC32(calc_crc);
1104 file_crc = *((pg_crc32 *) (buf + crc_offset));
1106 if (!EQ_CRC32(calc_crc, file_crc))
1117 * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1120 FinishPreparedTransaction(const char *gid, bool isCommit)
1122 GlobalTransaction gxact;
1126 TwoPhaseFileHeader *hdr;
1127 TransactionId *children;
1128 RelFileNode *commitrels;
1129 RelFileNode *abortrels;
1133 * Validate the GID, and lock the GXACT to ensure that two backends do not
1134 * try to commit the same GID at once.
1136 gxact = LockGXact(gid, GetUserId());
1137 xid = gxact->proc.xid;
1140 * Read and validate the state file
1142 buf = ReadTwoPhaseFile(xid);
1145 (errcode(ERRCODE_DATA_CORRUPTED),
1146 errmsg("two-phase state file for transaction %u is corrupt",
1150 * Disassemble the header area
1152 hdr = (TwoPhaseFileHeader *) buf;
1153 Assert(TransactionIdEquals(hdr->xid, xid));
1154 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1155 children = (TransactionId *) bufptr;
1156 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1157 commitrels = (RelFileNode *) bufptr;
1158 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1159 abortrels = (RelFileNode *) bufptr;
1160 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1163 * The order of operations here is critical: make the XLOG entry for
1164 * commit or abort, then mark the transaction committed or aborted in
1165 * pg_clog, then remove its PGPROC from the global ProcArray (which means
1166 * TransactionIdIsInProgress will stop saying the prepared xact is in
1167 * progress), then run the post-commit or post-abort callbacks. The
1168 * callbacks will release the locks the transaction held.
1171 RecordTransactionCommitPrepared(xid,
1172 hdr->nsubxacts, children,
1173 hdr->ncommitrels, commitrels);
1175 RecordTransactionAbortPrepared(xid,
1176 hdr->nsubxacts, children,
1177 hdr->nabortrels, abortrels);
1179 ProcArrayRemove(&gxact->proc);
1182 * In case we fail while running the callbacks, mark the gxact invalid so
1183 * no one else will try to commit/rollback, and so it can be recycled
1184 * properly later. It is still locked by our XID so it won't go away yet.
1186 * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1188 gxact->valid = false;
1191 * We have to remove any files that were supposed to be dropped. For
1192 * consistency with the regular xact.c code paths, must do this before
1193 * releasing locks, so do it before running the callbacks.
1195 * NB: this code knows that we couldn't be dropping any temp rels ...
1199 for (i = 0; i < hdr->ncommitrels; i++)
1200 smgrdounlink(smgropen(commitrels[i]), false, false);
1204 for (i = 0; i < hdr->nabortrels; i++)
1205 smgrdounlink(smgropen(abortrels[i]), false, false);
1208 /* And now do the callbacks */
1210 ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
1212 ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
1214 pgstat_count_xact_commit();
1217 * And now we can clean up our mess.
1219 RemoveTwoPhaseFile(xid, true);
1227 * Scan a 2PC state file (already read into memory by ReadTwoPhaseFile)
1228 * and call the indicated callbacks for each 2PC record.
/*
 * ProcessRecords
 *
 *   bufptr:    address of the first record in the in-memory state file image
 *   xid:       passed through unchanged to every callback that is invoked
 *   callbacks: handler table indexed by a record's rmid; a NULL entry means
 *              records of that rmid are stepped over without a call
 *
 * Each record is a TwoPhaseRecordOnDisk header followed by record->len bytes
 * of payload.  Both parts are rounded up with MAXALIGN when bufptr is
 * advanced, so the next header is found at an aligned address.
 *
 * NOTE(review): the loop construct and the statement taken when the END
 * marker is seen are not visible in this listing; presumably the scan
 * repeats until that marker stops it -- confirm against the full source.
 */
1231 ProcessRecords(char *bufptr, TransactionId xid,
1232 const TwoPhaseCallback callbacks[])
/* Reinterpret the current position as a record header. */
1236 TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
/* rmid is used as an array index below, so it is range-checked first. */
1238 Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1239 if (record->rmid == TWOPHASE_RM_END_ID)
/* Step over the header; bufptr now addresses this record's payload. */
1242 bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
/* The callback receives the payload pointer and its unpadded length. */
1244 if (callbacks[record->rmid] != NULL)
1245 callbacks[record->rmid] (xid, record->info,
1246 (void *) bufptr, record->len);
/* Step over the payload (padded) to reach the next record header. */
1248 bufptr += MAXALIGN(record->len);
1253 * Remove the 2PC file for the specified XID.
1255 * If giveWarning is false, do not complain about file-not-present;
1256 * this is an expected case during WAL replay.
/*
 * RemoveTwoPhaseFile
 *
 *   xid:         transaction whose state file path is built by
 *                TwoPhaseFilePath
 *   giveWarning: when true, a failure is reported even if errno is ENOENT
 *
 * NOTE(review): the call that actually deletes the file (and sets errno) and
 * the reporting call that wraps the message below are not visible in this
 * listing -- confirm both against the full source.
 */
1259 RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1261 char path[MAXPGPATH];
1263 TwoPhaseFilePath(path, xid);
/*
 * The message is suppressed only in one case: the error is ENOENT and the
 * caller passed giveWarning = false.  Any other errno is always reported.
 */
1265 if (errno != ENOENT || giveWarning)
1267 (errcode_for_file_access(),
1268 errmsg("could not remove two-phase state file \"%s\": %m",
1273 * Recreates a state file. This is used in WAL replay.
1275 * Note: content and len don't include CRC.
/*
 * RecreateTwoPhaseFile
 *
 *   xid:     transaction whose state file path is built by TwoPhaseFilePath
 *   content: bytes to store in the file
 *   len:     number of bytes at content; the CRC is computed over exactly
 *            these bytes and written immediately after them
 *
 * The file is opened, written (content, then CRC), fsync'd and closed; each
 * of those steps has its own failure message.
 *
 * NOTE(review): the severity used for the failure reports and any cleanup
 * of fd on the error paths are not visible in this listing.
 */
1278 RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1280 char path[MAXPGPATH];
1281 pg_crc32 statefile_crc;
/* Checksum the caller's bytes before touching the file system. */
1285 INIT_CRC32(statefile_crc);
1286 COMP_CRC32(statefile_crc, content, len);
1287 FIN_CRC32(statefile_crc);
1289 TwoPhaseFilePath(path, xid);
/* O_CREAT | O_TRUNC: create the file, or empty it if it already exists. */
1291 fd = BasicOpenFile(path,
1292 O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
1296 (errcode_for_file_access(),
1297 errmsg("could not recreate two-phase state file \"%s\": %m",
1300 /* Write content and CRC */
/*
 * A short write is treated the same as a failed one.
 * NOTE(review): a short write() does not set errno, so the %m in the
 * message below may describe a stale error in that case.
 */
1301 if (write(fd, content, len) != len)
1305 (errcode_for_file_access(),
1306 errmsg("could not write two-phase state file: %m")));
/* The CRC trails the content; the same short-write caveat applies. */
1308 if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32))
1312 (errcode_for_file_access(),
1313 errmsg("could not write two-phase state file: %m")));
1317 * We must fsync the file because the end-of-replay checkpoint will not do
1318 * so, there being no GXACT in shared memory yet to tell it to.
1320 if (pg_fsync(fd) != 0)
1324 (errcode_for_file_access(),
1325 errmsg("could not fsync two-phase state file: %m")));
/* A failure to close the descriptor is reported as well. */
1330 (errcode_for_file_access(),
1331 errmsg("could not close two-phase state file: %m")));
1335 * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1337 * We must fsync the state file of any GXACT that is valid and has a PREPARE
1338 * LSN <= the checkpoint's redo horizon. (If the gxact isn't valid yet or
1339 * has a later LSN, this checkpoint is not responsible for fsyncing it.)
1341 * This is deliberately run as late as possible in the checkpoint sequence,
1342 * because GXACTs ordinarily have short lifespans, and so it is quite
1343 * possible that GXACTs that were valid at checkpoint start will no longer
1344 * exist if we wait a little bit.
1346 * If a GXACT remains valid across multiple checkpoints, it'll be fsynced
1347 * each time. This is considered unusual enough that we don't bother to
1348 * expend any extra code to avoid the redundant fsyncs. (They should be
1349 * reasonably cheap anyway, since they won't cause I/O.)
/*
 * CheckPointTwoPhase
 *
 *   redo_horizon: WAL position compared (via XLByteLE) against each gxact's
 *                 prepare_lsn to decide whether its state file is fsync'd
 *
 * Works in two passes: first collect the qualifying XIDs while holding
 * TwoPhaseStateLock, then open/fsync/close each state file with the lock
 * released.
 */
1352 CheckPointTwoPhase(XLogRecPtr redo_horizon)
1354 TransactionId *xids;
1356 char path[MAXPGPATH];
1360 * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
1361 * it just long enough to make a list of the XIDs that require fsyncing,
1362 * and then do the I/O afterwards.
1364 * This approach creates a race condition: someone else could delete a
1365 * GXACT between the time we release TwoPhaseStateLock and the time we try
1366 * to open its state file. We handle this by special-casing ENOENT
1367 * failures: if we see that, we verify that the GXACT is no longer valid,
1368 * and if so ignore the failure.
/* Returning here also avoids a zero-sized (or negative-sized) allocation. */
1370 if (max_prepared_xacts <= 0)
1371 return; /* nothing to do */
/* Sized for the worst case: one slot per configured prepared transaction. */
1372 xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
/* Pass 1: the shared array is only read here, hence LW_SHARED. */
1375 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1377 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
/*
 * NOTE(review): the first half of the condition that ends on the XLByteLE
 * line is not visible in this listing; per the function's header comment
 * it should test that the gxact is valid -- confirm.
 */
1379 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1382 XLByteLE(gxact->prepare_lsn, redo_horizon))
1383 xids[nxids++] = gxact->proc.xid;
1386 LWLockRelease(TwoPhaseStateLock);
/* Pass 2: file I/O driven from the private xids[] copy, lock not held. */
1388 for (i = 0; i < nxids; i++)
1390 TransactionId xid = xids[i];
1393 TwoPhaseFilePath(path, xid);
/* The file must already exist: no O_CREAT is passed. */
1395 fd = BasicOpenFile(path, O_RDWR | PG_BINARY, 0);
1398 if (errno == ENOENT)
1400 /* OK if gxact is no longer valid */
1401 if (!TransactionIdIsPrepared(xid))
1403 /* Restore errno in case it was changed */
1407 (errcode_for_file_access(),
1408 errmsg("could not open two-phase state file \"%s\": %m",
1412 if (pg_fsync(fd) != 0)
1416 (errcode_for_file_access(),
1417 errmsg("could not fsync two-phase state file \"%s\": %m",
1423 (errcode_for_file_access(),
1424 errmsg("could not close two-phase state file \"%s\": %m",
1432 * PrescanPreparedTransactions
1434 * Scan the pg_twophase directory and determine the range of valid XIDs
1435 * present. This is run during database startup, after we have completed
1436 * reading WAL. ShmemVariableCache->nextXid has been set to one more than
1437 * the highest XID for which evidence exists in WAL.
1439 * We throw away any prepared xacts with main XID beyond nextXid --- if any
1440 * are present, it suggests that the DBA has done a PITR recovery to an
1441 * earlier point in time without cleaning out pg_twophase. We dare not
1442 * try to recover such prepared xacts since they likely depend on database
1443 * state that doesn't exist now.
1445 * However, we will advance nextXid beyond any subxact XIDs belonging to
1446 * valid prepared xacts. We need to do this since subxact commit doesn't
1447 * write a WAL entry, and so there might be no evidence in WAL of those subxact XIDs.
1450 * Our other responsibility is to determine and return the oldest valid XID
1451 * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
1452 * This is needed to synchronize pg_subtrans startup properly.
/*
 * PrescanPreparedTransactions
 *
 * Takes no arguments.  The running result starts as the value of
 * ShmemVariableCache->nextXid sampled on entry and is compared against the
 * XID of every state file that passes validation.  As a side effect,
 * ShmemVariableCache->nextXid may be moved forward past subxact XIDs found
 * in valid files.
 *
 * NOTE(review): the return statement, the update of result, and the release
 * of the buffer and directory handle are not visible in this listing.
 */
1455 PrescanPreparedTransactions(void)
1457 TransactionId origNextXid = ShmemVariableCache->nextXid;
1458 TransactionId result = origNextXid;
1460 struct dirent *clde;
1462 cldir = AllocateDir(TWOPHASE_DIR);
1463 while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
/* Only names made of exactly 8 upper-case hex digits are considered. */
1465 if (strlen(clde->d_name) == 8 &&
1466 strspn(clde->d_name, "0123456789ABCDEF") == 8)
1470 TwoPhaseFileHeader *hdr;
1471 TransactionId *subxids;
/* The file name is the XID written in hexadecimal. */
1474 xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1476 /* Reject XID if too new */
/*
 * The comparison uses the value sampled on entry, not the live nextXid,
 * which the subxact loop below may already have advanced.
 */
1477 if (TransactionIdFollowsOrEquals(xid, origNextXid))
1480 (errmsg("removing future two-phase state file \"%s\"",
1482 RemoveTwoPhaseFile(xid, true);
1487 * Note: we can't check if already processed because clog
1488 * subsystem isn't up yet.
1491 /* Read and validate file */
1492 buf = ReadTwoPhaseFile(xid);
1496 (errmsg("removing corrupt two-phase state file \"%s\"",
1498 RemoveTwoPhaseFile(xid, true);
1502 /* Deconstruct header */
1503 hdr = (TwoPhaseFileHeader *) buf;
/* The XID stored in the header must match the one in the file name. */
1504 if (!TransactionIdEquals(hdr->xid, xid))
1507 (errmsg("removing corrupt two-phase state file \"%s\"",
1509 RemoveTwoPhaseFile(xid, true);
1515 * OK, we think this file is valid. Incorporate xid into the
1516 * running-minimum result.
1518 if (TransactionIdPrecedes(xid, result))
1522 * Examine subtransaction XIDs ... they should all follow main
1523 * XID, and they may force us to advance nextXid.
/* The subxact XID array starts at the MAXALIGN'd end of the header. */
1525 subxids = (TransactionId *)
1526 (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
1527 for (i = 0; i < hdr->nsubxacts; i++)
1529 TransactionId subxid = subxids[i];
1531 Assert(TransactionIdFollows(subxid, xid));
1532 if (TransactionIdFollowsOrEquals(subxid,
1533 ShmemVariableCache->nextXid))
/* Move nextXid to just past this subxact XID. */
1535 ShmemVariableCache->nextXid = subxid;
1536 TransactionIdAdvance(ShmemVariableCache->nextXid);
1549 * RecoverPreparedTransactions
1551 * Scan the pg_twophase directory and reload shared-memory state for each
1552 * prepared transaction (reacquire locks, etc). This is run during database startup.
/*
 * RecoverPreparedTransactions
 *
 * Takes no arguments.  For every state file in TWOPHASE_DIR that is neither
 * stale nor unreadable it: relinks the recorded subxact XIDs to the main
 * XID, rebuilds the GXACT (MarkAsPreparing / GXactLoadSubxactData /
 * MarkAsPrepared), and replays the remaining records through
 * twophase_recover_callbacks.
 *
 * NOTE(review): the log level of the messages, the test applied to the
 * result of ReadTwoPhaseFile, and the release of the buffer and directory
 * handle are not visible in this listing.
 */
1556 RecoverPreparedTransactions(void)
1558 char dir[MAXPGPATH];
1560 struct dirent *clde;
1562 snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
1564 cldir = AllocateDir(dir);
1565 while ((clde = ReadDir(cldir, dir)) != NULL)
/* Only names made of exactly 8 upper-case hex digits are considered. */
1567 if (strlen(clde->d_name) == 8 &&
1568 strspn(clde->d_name, "0123456789ABCDEF") == 8)
1573 TwoPhaseFileHeader *hdr;
1574 TransactionId *subxids;
1575 GlobalTransaction gxact;
/* The file name is the XID written in hexadecimal. */
1578 xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1580 /* Already processed? */
/* A file whose XID is already recorded as committed or aborted is stale. */
1581 if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
1584 (errmsg("removing stale two-phase state file \"%s\"",
1586 RemoveTwoPhaseFile(xid, true);
1590 /* Read and validate file */
1591 buf = ReadTwoPhaseFile(xid);
1595 (errmsg("removing corrupt two-phase state file \"%s\"",
1597 RemoveTwoPhaseFile(xid, true);
1602 (errmsg("recovering prepared transaction %u", xid)));
1604 /* Deconstruct header */
1605 hdr = (TwoPhaseFileHeader *) buf;
/* Here a header/file-name XID mismatch is only asserted, not handled. */
1606 Assert(TransactionIdEquals(hdr->xid, xid));
/*
 * Walk bufptr past the header and then past three MAXALIGN-padded arrays
 * (subxact XIDs, commit rels, abort rels).  Only the subxact array is
 * used in this function; bufptr ends up at the data handed to
 * ProcessRecords below.
 */
1607 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1608 subxids = (TransactionId *) bufptr;
1609 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1610 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1611 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1614 * Reconstruct subtrans state for the transaction --- needed
1615 * because pg_subtrans is not preserved over a restart. Note that
1616 * we are linking all the subtransactions directly to the
1617 * top-level XID; there may originally have been a more complex
1618 * hierarchy, but there's no need to restore that exactly.
1620 for (i = 0; i < hdr->nsubxacts; i++)
1621 SubTransSetParent(subxids[i], xid);
1624 * Recreate its GXACT and dummy PGPROC
1626 * Note: since we don't have the PREPARE record's WAL location at
1627 * hand, we leave prepare_lsn zeroes. This means the GXACT will
1628 * be fsync'd on every future checkpoint. We assume this
1629 * situation is infrequent enough that the performance cost is
1630 * negligible (especially since we know the state file has already
/*
 * NOTE(review): one argument line of the MarkAsPreparing call falls in a
 * gap of this listing; only xid, gid, owner and database are visible.
 */
1633 gxact = MarkAsPreparing(xid, hdr->gid,
1635 hdr->owner, hdr->database);
1636 GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
1637 MarkAsPrepared(gxact);
1640 * Recover other state (notably locks) using resource managers
1642 ProcessRecords(bufptr, xid, twophase_recover_callbacks);
1651 * RecordTransactionCommitPrepared
1653 * This is basically the same as RecordTransactionCommit: in particular,
1654 * we must set the inCommit flag to avoid a race condition.
1656 * We know the transaction made at least one XLOG entry (its PREPARE),
1657 * so it is never possible to optimize out the commit record.
/*
 * RecordTransactionCommitPrepared
 *
 *   xid:      XID marked committed via TransactionIdCommit
 *   children: array of child XIDs; copied into the WAL record and then
 *             marked committed via TransactionIdCommitTree
 *
 * The body also uses nchildren, nrels and rels (element counts and the
 * array of relations to delete).
 * NOTE(review): their declarations are in parameter lines that are not
 * visible in this listing -- confirm order and types against the full
 * source.
 *
 * Visible sequence: enter critical section, set MyProc->inCommit, build and
 * insert the commit record, mark the XID and then its children committed,
 * clear inCommit.
 */
1660 RecordTransactionCommitPrepared(TransactionId xid,
1662 TransactionId *children,
/* Up to three chained pieces: fixed part, rels array, child XID array. */
1666 XLogRecData rdata[3];
1668 xl_xact_commit_prepared xlrec;
1671 START_CRIT_SECTION();
1673 /* See notes in RecordTransactionCommit */
1674 MyProc->inCommit = true;
1676 /* Emit the XLOG commit record */
1678 xlrec.crec.xtime = time(NULL);
1679 xlrec.crec.nrels = nrels;
1680 xlrec.crec.nsubxacts = nchildren;
/* Piece 0: the fixed-size part of the record. */
1681 rdata[0].data = (char *) (&xlrec);
1682 rdata[0].len = MinSizeOfXactCommitPrepared;
1683 rdata[0].buffer = InvalidBuffer;
1684 /* dump rels to delete */
/*
 * NOTE(review): the conditions guarding pieces 1 and 2 and the updates of
 * lastrdata are not visible here; presumably each piece is added only
 * when its count is non-zero, with lastrdata naming the last piece linked
 * so far -- confirm.
 */
1687 rdata[0].next = &(rdata[1]);
1688 rdata[1].data = (char *) rels;
1689 rdata[1].len = nrels * sizeof(RelFileNode);
1690 rdata[1].buffer = InvalidBuffer;
1693 /* dump committed child Xids */
1696 rdata[lastrdata].next = &(rdata[2]);
1697 rdata[2].data = (char *) children;
1698 rdata[2].len = nchildren * sizeof(TransactionId);
1699 rdata[2].buffer = InvalidBuffer;
/* Terminate the chain at whichever piece was linked last. */
1702 rdata[lastrdata].next = NULL;
1704 recptr = XLogInsert(RM_XACT_ID,
1705 XLOG_XACT_COMMIT_PREPARED | XLOG_NO_TRAN,
1708 /* we don't currently try to sleep before flush here ... */
1710 /* Flush XLOG to disk */
1713 /* Mark the transaction committed in pg_clog */
1714 TransactionIdCommit(xid);
1715 /* to avoid race conditions, the parent must commit first */
1716 TransactionIdCommitTree(nchildren, children);
1718 /* Checkpoint can proceed now */
1719 MyProc->inCommit = false;
1725 * RecordTransactionAbortPrepared
1727 * This is basically the same as RecordTransactionAbort.
1729 * We know the transaction made at least one XLOG entry (its PREPARE),
1730 * so it is never possible to optimize out the abort record.
/*
 * RecordTransactionAbortPrepared
 *
 *   xid:      XID marked aborted via TransactionIdAbort
 *   children: array of child XIDs; copied into the WAL record and then
 *             marked aborted via TransactionIdAbortTree
 *
 * The body also uses nchildren, nrels and rels (element counts and the
 * array of relations to delete).
 * NOTE(review): their declarations are in parameter lines that are not
 * visible in this listing, and this block reaches the end of the visible
 * chunk, so anything after the last line shown is not described here.
 */
1733 RecordTransactionAbortPrepared(TransactionId xid,
1735 TransactionId *children,
/* Up to three chained pieces: fixed part, rels array, child XID array. */
1739 XLogRecData rdata[3];
1741 xl_xact_abort_prepared xlrec;
1745 * Catch the scenario where we aborted partway through
1746 * RecordTransactionCommitPrepared ...
/* Checked before entering the critical section and before any WAL write. */
1748 if (TransactionIdDidCommit(xid))
1749 elog(PANIC, "cannot abort transaction %u, it was already committed",
1752 START_CRIT_SECTION();
1754 /* Emit the XLOG abort record */
1756 xlrec.arec.xtime = time(NULL);
1757 xlrec.arec.nrels = nrels;
1758 xlrec.arec.nsubxacts = nchildren;
/* Piece 0: the fixed-size part of the record. */
1759 rdata[0].data = (char *) (&xlrec);
1760 rdata[0].len = MinSizeOfXactAbortPrepared;
1761 rdata[0].buffer = InvalidBuffer;
1762 /* dump rels to delete */
/*
 * NOTE(review): the conditions guarding pieces 1 and 2 and the updates of
 * lastrdata are not visible here; presumably each piece is added only
 * when its count is non-zero, with lastrdata naming the last piece linked
 * so far -- confirm.
 */
1765 rdata[0].next = &(rdata[1]);
1766 rdata[1].data = (char *) rels;
1767 rdata[1].len = nrels * sizeof(RelFileNode);
1768 rdata[1].buffer = InvalidBuffer;
1771 /* dump committed child Xids */
1774 rdata[lastrdata].next = &(rdata[2]);
1775 rdata[2].data = (char *) children;
1776 rdata[2].len = nchildren * sizeof(TransactionId);
1777 rdata[2].buffer = InvalidBuffer;
/* Terminate the chain at whichever piece was linked last. */
1780 rdata[lastrdata].next = NULL;
1782 recptr = XLogInsert(RM_XACT_ID,
1783 XLOG_XACT_ABORT_PREPARED | XLOG_NO_TRAN,
1786 /* Always flush, since we're about to remove the 2PC state file */
1790 * Mark the transaction aborted in clog. This is not absolutely necessary
1791 * but we may as well do it while we are here.
1793 TransactionIdAbort(xid);
1794 TransactionIdAbortTree(nchildren, children);