1 /*-------------------------------------------------------------------------
4 * Two-phase commit support functions.
6 * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.27 2007/01/16 13:28:56 alvherre Exp $
13 * Each global transaction is associated with a global transaction
14 * identifier (GID). The client assigns a GID to a postgres
15 * transaction with the PREPARE TRANSACTION command.
17 * We keep all active global transactions in a shared memory array.
18 * When the PREPARE TRANSACTION command is issued, the GID is
19 * reserved for the transaction in the array. This is done before
20 * a WAL entry is made, because the reservation checks for duplicate
21 * GIDs and aborts the transaction if there already is a global
22 * transaction in prepared state with the same GID.
24 * A global transaction (gxact) also has a dummy PGPROC that is entered
25 * into the ProcArray array; this is what keeps the XID considered
26 * running by TransactionIdIsInProgress. It is also convenient as a
27 * PGPROC to hook the gxact's locks to.
29 * In order to survive crashes and shutdowns, all prepared
30 * transactions must be stored in permanent storage. This includes
31 * locking information, pending notifications etc. All that state
32 * information is written to the per-transaction state file in
33 * the pg_twophase directory.
35 *-------------------------------------------------------------------------
41 #include <sys/types.h>
45 #include "access/heapam.h"
46 #include "access/subtrans.h"
47 #include "access/transam.h"
48 #include "access/twophase.h"
49 #include "access/twophase_rmgr.h"
50 #include "access/xact.h"
51 #include "catalog/pg_type.h"
53 #include "miscadmin.h"
55 #include "storage/fd.h"
56 #include "storage/procarray.h"
57 #include "storage/smgr.h"
58 #include "utils/builtins.h"
/*
 * Directory where Two-phase commit files reside within PGDATA
 */
#define TWOPHASE_DIR "pg_twophase"

/* GUC variable, can't be changed after startup */
int         max_prepared_xacts = 5;
70 * This struct describes one global transaction that is in prepared state
71 * or attempting to become prepared.
73 * The first component of the struct is a dummy PGPROC that is inserted
74 * into the global ProcArray so that the transaction appears to still be
75 * running and holding locks. It must be first because we cast pointers
76 * to PGPROC and pointers to GlobalTransactionData back and forth.
78 * The lifecycle of a global transaction is:
80 * 1. After checking that the requested GID is not in use, set up an
81 * entry in the TwoPhaseState->prepXacts array with the correct XID and GID,
82 * with locking_xid = my own XID and valid = false.
84 * 2. After successfully completing prepare, set valid = true and enter the
85 * contained PGPROC into the global ProcArray.
87 * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry
88 * is valid and its locking_xid is no longer active, then store my current
89 * XID into locking_xid. This prevents concurrent attempts to commit or
90 * rollback the same prepared xact.
 * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
 * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
 * the freelist.
96 * Note that if the preparing transaction fails between steps 1 and 2, the
97 * entry will remain in prepXacts until recycled. We can detect recyclable
98 * entries by checking for valid = false and locking_xid no longer active.
 * typedef struct GlobalTransactionData *GlobalTransaction appears in
 * twophase.h
/*
 * GlobalTransactionData: per-prepared-transaction record.  The dummy PGPROC
 * is the first member, followed by the preparation timestamp, the LSN of
 * the PREPARE record, the owning user, the XID of the backend currently
 * working on the entry, the "fully prepared" flag and the GID string.
 * NOTE(review): the embedded line numbers skip, so short lines (the opening
 * brace, and presumably the GIDSIZE definition) are not visible in this
 * listing -- confirm against the complete file.
 */
105 typedef struct GlobalTransactionData
107 PGPROC proc; /* dummy proc */
108 TimestampTz prepared_at; /* time of preparation */
109 XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */
110 Oid owner; /* ID of user that executed the xact */
111 TransactionId locking_xid; /* top-level XID of backend working on xact */
112 bool valid; /* TRUE if fully prepared */
113 char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
114 } GlobalTransactionData;
/*
 * TwoPhaseStateData: header of the two-phase state area.  It holds the
 * offset of the first free GlobalTransactionData (freeGXacts) and a
 * variable-length array of pointers to the active entries (prepXacts).
 * TwoPhaseState is the file-local pointer to the single instance.
 * NOTE(review): other functions in this file read and write
 * TwoPhaseState->numPrepXacts, but the member declaration is not visible
 * here; the embedded line numbers skip, so short lines appear to have been
 * dropped from this listing -- confirm against the complete file.
 */
117 * Two Phase Commit shared state. Access to this struct is protected
118 * by TwoPhaseStateLock.
120 typedef struct TwoPhaseStateData
122 /* Head of linked list of free GlobalTransactionData structs */
123 SHMEM_OFFSET freeGXacts;
125 /* Number of valid prepXacts entries. */
129 * There are max_prepared_xacts items in this array, but C wants a
132 GlobalTransaction prepXacts[1]; /* VARIABLE LENGTH ARRAY */
133 } TwoPhaseStateData; /* VARIABLE LENGTH STRUCT */
135 static TwoPhaseStateData *TwoPhaseState;
/*
 * Forward declarations of file-local helpers: the routines that record the
 * commit or abort of a prepared transaction, and ProcessRecords, which walks
 * the 2PC records of a state file and invokes per-resource-manager callbacks.
 * NOTE(review): some parameter lines of the first two prototypes are not
 * visible in this listing (the embedded line numbers skip) -- confirm the
 * full signatures against the complete file.
 */
138 static void RecordTransactionCommitPrepared(TransactionId xid,
140 TransactionId *children,
143 static void RecordTransactionAbortPrepared(TransactionId xid,
145 TransactionId *children,
148 static void ProcessRecords(char *bufptr, TransactionId xid,
149 const TwoPhaseCallback callbacks[]);
153 * Initialization of shared memory
156 TwoPhaseShmemSize(void)
160 /* Need the fixed struct, the array of pointers, and the GTD structs */
161 size = offsetof(TwoPhaseStateData, prepXacts);
162 size = add_size(size, mul_size(max_prepared_xacts,
163 sizeof(GlobalTransaction)));
164 size = MAXALIGN(size);
165 size = add_size(size, mul_size(max_prepared_xacts,
166 sizeof(GlobalTransactionData)));
172 TwoPhaseShmemInit(void)
176 TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
179 if (!IsUnderPostmaster)
181 GlobalTransaction gxacts;
185 TwoPhaseState->freeGXacts = INVALID_OFFSET;
186 TwoPhaseState->numPrepXacts = 0;
189 * Initialize the linked list of free GlobalTransactionData structs
191 gxacts = (GlobalTransaction)
192 ((char *) TwoPhaseState +
193 MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
194 sizeof(GlobalTransaction) * max_prepared_xacts));
195 for (i = 0; i < max_prepared_xacts; i++)
197 gxacts[i].proc.links.next = TwoPhaseState->freeGXacts;
198 TwoPhaseState->freeGXacts = MAKE_OFFSET(&gxacts[i]);
208 * Reserve the GID for the given transaction.
210 * Internally, this creates a gxact struct and puts it into the active array.
211 * NOTE: this is also used when reloading a gxact after a crash; so avoid
212 * assuming that we can use very much backend context.
215 MarkAsPreparing(TransactionId xid, const char *gid,
216 TimestampTz prepared_at, Oid owner, Oid databaseid)
218 GlobalTransaction gxact;
221 if (strlen(gid) >= GIDSIZE)
223 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
224 errmsg("transaction identifier \"%s\" is too long",
227 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
230 * First, find and recycle any gxacts that failed during prepare. We do
231 * this partly to ensure we don't mistakenly say their GIDs are still
232 * reserved, and partly so we don't fail on out-of-slots unnecessarily.
234 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
236 gxact = TwoPhaseState->prepXacts[i];
237 if (!gxact->valid && !TransactionIdIsActive(gxact->locking_xid))
239 /* It's dead Jim ... remove from the active array */
240 TwoPhaseState->numPrepXacts--;
241 TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
242 /* and put it back in the freelist */
243 gxact->proc.links.next = TwoPhaseState->freeGXacts;
244 TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);
245 /* Back up index count too, so we don't miss scanning one */
250 /* Check for conflicting GID */
251 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
253 gxact = TwoPhaseState->prepXacts[i];
254 if (strcmp(gxact->gid, gid) == 0)
257 (errcode(ERRCODE_DUPLICATE_OBJECT),
258 errmsg("transaction identifier \"%s\" is already in use",
263 /* Get a free gxact from the freelist */
264 if (TwoPhaseState->freeGXacts == INVALID_OFFSET)
266 (errcode(ERRCODE_OUT_OF_MEMORY),
267 errmsg("maximum number of prepared transactions reached"),
268 errhint("Increase max_prepared_transactions (currently %d).",
269 max_prepared_xacts)));
270 gxact = (GlobalTransaction) MAKE_PTR(TwoPhaseState->freeGXacts);
271 TwoPhaseState->freeGXacts = gxact->proc.links.next;
274 MemSet(&gxact->proc, 0, sizeof(PGPROC));
275 SHMQueueElemInit(&(gxact->proc.links));
276 gxact->proc.waitStatus = STATUS_OK;
277 gxact->proc.xid = xid;
278 gxact->proc.xmin = InvalidTransactionId;
280 gxact->proc.databaseId = databaseid;
281 gxact->proc.roleId = owner;
282 gxact->proc.inVacuum = false;
283 gxact->proc.isAutovacuum = false;
284 gxact->proc.lwWaiting = false;
285 gxact->proc.lwExclusive = false;
286 gxact->proc.lwWaitLink = NULL;
287 gxact->proc.waitLock = NULL;
288 gxact->proc.waitProcLock = NULL;
289 for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
290 SHMQueueInit(&(gxact->proc.myProcLocks[i]));
291 /* subxid data must be filled later by GXactLoadSubxactData */
292 gxact->proc.subxids.overflowed = false;
293 gxact->proc.subxids.nxids = 0;
295 gxact->prepared_at = prepared_at;
296 /* initialize LSN to 0 (start of WAL) */
297 gxact->prepare_lsn.xlogid = 0;
298 gxact->prepare_lsn.xrecoff = 0;
299 gxact->owner = owner;
300 gxact->locking_xid = xid;
301 gxact->valid = false;
302 strcpy(gxact->gid, gid);
304 /* And insert it into the active array */
305 Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
306 TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
308 LWLockRelease(TwoPhaseStateLock);
314 * GXactLoadSubxactData
316 * If the transaction being persisted had any subtransactions, this must
317 * be called before MarkAsPrepared() to load information into the dummy
321 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
322 TransactionId *children)
324 /* We need no extra lock since the GXACT isn't valid yet */
325 if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
327 gxact->proc.subxids.overflowed = true;
328 nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
332 memcpy(gxact->proc.subxids.xids, children,
333 nsubxacts * sizeof(TransactionId));
334 gxact->proc.subxids.nxids = nsubxacts;
340 * Mark the GXACT as fully valid, and enter it into the global ProcArray.
343 MarkAsPrepared(GlobalTransaction gxact)
345 /* Lock here may be overkill, but I'm not convinced of that ... */
346 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
347 Assert(!gxact->valid);
349 LWLockRelease(TwoPhaseStateLock);
352 * Put it into the global ProcArray so TransactionIdInProgress considers
353 * the XID as still running.
355 ProcArrayAdd(&gxact->proc);
360 * Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
362 static GlobalTransaction
363 LockGXact(const char *gid, Oid user)
367 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
369 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
371 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
373 /* Ignore not-yet-valid GIDs */
376 if (strcmp(gxact->gid, gid) != 0)
379 /* Found it, but has someone else got it locked? */
380 if (TransactionIdIsValid(gxact->locking_xid))
382 if (TransactionIdIsActive(gxact->locking_xid))
384 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
385 errmsg("prepared transaction with identifier \"%s\" is busy",
387 gxact->locking_xid = InvalidTransactionId;
390 if (user != gxact->owner && !superuser_arg(user))
392 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
393 errmsg("permission denied to finish prepared transaction"),
394 errhint("Must be superuser or the user that prepared the transaction.")));
396 /* OK for me to lock it */
397 gxact->locking_xid = GetTopTransactionId();
399 LWLockRelease(TwoPhaseStateLock);
404 LWLockRelease(TwoPhaseStateLock);
407 (errcode(ERRCODE_UNDEFINED_OBJECT),
408 errmsg("prepared transaction with identifier \"%s\" does not exist",
417 * Remove the prepared transaction from the shared memory array.
419 * NB: caller should have already removed it from ProcArray
422 RemoveGXact(GlobalTransaction gxact)
426 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
428 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
430 if (gxact == TwoPhaseState->prepXacts[i])
432 /* remove from the active array */
433 TwoPhaseState->numPrepXacts--;
434 TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
436 /* and put it back in the freelist */
437 gxact->proc.links.next = TwoPhaseState->freeGXacts;
438 TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);
440 LWLockRelease(TwoPhaseStateLock);
446 LWLockRelease(TwoPhaseStateLock);
448 elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
452 * TransactionIdIsPrepared
453 * True iff transaction associated with the identifier is prepared
454 * for two-phase commit
456 * Note: only gxacts marked "valid" are considered; but notice we do not
457 * check the locking status.
459 * This is not currently exported, because it is only needed internally.
462 TransactionIdIsPrepared(TransactionId xid)
467 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
469 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
471 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
473 if (gxact->valid && gxact->proc.xid == xid)
480 LWLockRelease(TwoPhaseStateLock);
486 * Returns an array of all prepared transactions for the user-level
487 * function pg_prepared_xact.
489 * The returned array and all its elements are copies of internal data
490 * structures, to minimize the time we need to hold the TwoPhaseStateLock.
492 * WARNING -- we return even those transactions that are not fully prepared
493 * yet. The caller should filter them out if he doesn't want them.
495 * The returned array is palloc'd.
498 GetPreparedTransactionList(GlobalTransaction *gxacts)
500 GlobalTransaction array;
504 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
506 if (TwoPhaseState->numPrepXacts == 0)
508 LWLockRelease(TwoPhaseStateLock);
514 num = TwoPhaseState->numPrepXacts;
515 array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
517 for (i = 0; i < num; i++)
518 memcpy(array + i, TwoPhaseState->prepXacts[i],
519 sizeof(GlobalTransactionData));
521 LWLockRelease(TwoPhaseStateLock);
/*
 * Cross-call state of pg_prepared_xact: the copied array of prepared
 * transactions, plus the ngxacts and currIdx members that pg_prepared_xact
 * reads through its Working_State pointer.
 * NOTE(review): only the array member is visible in this listing; the
 * embedded line numbers skip, so the remaining members and the typedef
 * framing appear to have been dropped -- confirm against the complete file.
 */
527 /* Working status for pg_prepared_xact */
530 GlobalTransaction array;
/*
 * pg_prepared_xact: set-returning function that yields one row per entry
 * obtained from GetPreparedTransactionList.
 *
 * First call: builds and blesses a five-column tuple descriptor
 * (transaction, gid, prepared, ownerid, dbid) and stores a Working_State
 * holding the copied array in the multi-call memory context.
 * Every call: advances currIdx over the array and returns a tuple formed
 * from the xid, gid, prepared_at, owner and databaseId of the entry; when
 * the array is exhausted (or NULL) the function reports completion.
 *
 * NOTE(review): the embedded line numbers skip, so short lines (local
 * declarations, type OID arguments of TupleDescInitEntry, braces, and
 * possibly a validity filter and the currIdx initialization) are not
 * visible in this listing -- confirm against the complete file.
 */
537 * Produce a view with one row per prepared transaction.
539 * This function is here so we don't have to export the
540 * GlobalTransactionData struct definition.
543 pg_prepared_xact(PG_FUNCTION_ARGS)
545 FuncCallContext *funcctx;
546 Working_State *status;
548 if (SRF_IS_FIRSTCALL())
551 MemoryContext oldcontext;
553 /* create a function context for cross-call persistence */
554 funcctx = SRF_FIRSTCALL_INIT();
557 * Switch to memory context appropriate for multiple function calls
559 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
561 /* build tupdesc for result tuples */
562 /* this had better match pg_prepared_xacts view in system_views.sql */
563 tupdesc = CreateTemplateTupleDesc(5, false);
564 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
566 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
568 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
569 TIMESTAMPTZOID, -1, 0);
570 TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
572 TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
575 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
578 * Collect all the 2PC status information that we will format and send
579 * out as a result set.
581 status = (Working_State *) palloc(sizeof(Working_State));
582 funcctx->user_fctx = (void *) status;
584 status->ngxacts = GetPreparedTransactionList(&status->array);
587 MemoryContextSwitchTo(oldcontext);
590 funcctx = SRF_PERCALL_SETUP();
591 status = (Working_State *) funcctx->user_fctx;
593 while (status->array != NULL && status->currIdx < status->ngxacts)
595 GlobalTransaction gxact = &status->array[status->currIdx++];
605 * Form tuple with appropriate data.
607 MemSet(values, 0, sizeof(values));
608 MemSet(nulls, 0, sizeof(nulls));
610 values[0] = TransactionIdGetDatum(gxact->proc.xid);
611 values[1] = DirectFunctionCall1(textin, CStringGetDatum(gxact->gid));
612 values[2] = TimestampTzGetDatum(gxact->prepared_at);
613 values[3] = ObjectIdGetDatum(gxact->owner);
614 values[4] = ObjectIdGetDatum(gxact->proc.databaseId);
616 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
617 result = HeapTupleGetDatum(tuple);
618 SRF_RETURN_NEXT(funcctx, result);
621 SRF_RETURN_DONE(funcctx);
625 * TwoPhaseGetDummyProc
626 * Get the PGPROC that represents a prepared transaction specified by XID
629 TwoPhaseGetDummyProc(TransactionId xid)
631 PGPROC *result = NULL;
634 static TransactionId cached_xid = InvalidTransactionId;
635 static PGPROC *cached_proc = NULL;
638 * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
639 * repeatedly for the same XID. We can save work with a simple cache.
641 if (xid == cached_xid)
644 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
646 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
648 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
650 if (gxact->proc.xid == xid)
652 result = &gxact->proc;
657 LWLockRelease(TwoPhaseStateLock);
659 if (result == NULL) /* should not happen */
660 elog(ERROR, "failed to find dummy PGPROC for xid %u", xid);
663 cached_proc = result;
668 /************************************************************************/
669 /* State file support */
670 /************************************************************************/
/*
 * TwoPhaseFilePath
 *      Format the state-file path for xid into path, a buffer of at least
 *      MAXPGPATH bytes: TWOPHASE_DIR, a slash, then xid as eight uppercase
 *      hexadecimal digits.
 */
#define TwoPhaseFilePath(path, xid) \
    snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
676 * 2PC state file format:
678 * 1. TwoPhaseFileHeader
679 * 2. TransactionId[] (subtransactions)
680 * 3. RelFileNode[] (files to be deleted at commit)
681 * 4. RelFileNode[] (files to be deleted at abort)
682 * 5. TwoPhaseRecordOnDisk
684 * 7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
687 * Each segment except the final CRC32 is MAXALIGN'd.
/*
 * TwoPhaseFileHeader: first segment of a 2PC state file.  Carries the
 * TWOPHASE_MAGIC format identifier, the total file length, the identity of
 * the prepared transaction (xid, database, owner, preparation time, GID)
 * and the element counts of the three arrays that follow the header
 * (subtransaction XIDs, delete-on-commit rels, delete-on-abort rels).
 * NOTE(review): the embedded line numbers skip, so comment delimiters and
 * the opening brace are not visible in this listing.
 */
691 * Header for a 2PC state file
693 #define TWOPHASE_MAGIC 0x57F94531 /* format identifier */
695 typedef struct TwoPhaseFileHeader
697 uint32 magic; /* format identifier */
698 uint32 total_len; /* actual file length */
699 TransactionId xid; /* original transaction XID */
700 Oid database; /* OID of database it was in */
701 TimestampTz prepared_at; /* time of preparation */
702 Oid owner; /* user running the transaction */
703 int32 nsubxacts; /* number of following subxact XIDs */
704 int32 ncommitrels; /* number of delete-on-commit rels */
705 int32 nabortrels; /* number of delete-on-abort rels */
706 char gid[GIDSIZE]; /* GID for transaction */
707 } TwoPhaseFileHeader;
/*
 * TwoPhaseRecordOnDisk: fixed header that precedes every resource-manager
 * record in a state file; it gives the payload length, the owning resource
 * manager and rmgr-specific flag bits.
 * NOTE(review): the embedded line numbers skip, so comment delimiters and
 * the opening brace are not visible in this listing.
 */
710 * Header for each record in a state file
712 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
713 * The rmgr data will be stored starting on a MAXALIGN boundary.
715 typedef struct TwoPhaseRecordOnDisk
717 uint32 len; /* length of rmgr data */
718 TwoPhaseRmgrId rmid; /* resource manager for this record */
719 uint16 info; /* flag bits for use by rmgr */
720 } TwoPhaseRecordOnDisk;
/*
 * In-memory accumulator for the state file under construction: a chain of
 * XLogRecData blocks (head/tail), the free space left in the tail block and
 * the total number of data bytes.  Other functions in this file access it
 * through the file-level variable named records.
 * NOTE(review): the embedded line numbers skip, so the end of the comment
 * and the struct framing/variable declaration are not visible in this
 * listing -- confirm against the complete file.
 */
723 * During prepare, the state file is assembled in memory before writing it
724 * to WAL and the actual state file. We use a chain of XLogRecData blocks
725 * so that we will be able to pass the state file contents directly to
730 XLogRecData *head; /* first data block in the chain */
731 XLogRecData *tail; /* last block in chain */
732 uint32 bytes_free; /* free bytes left in tail block */
733 uint32 total_len; /* total data bytes in chain */
738 * Append a block of data to records data structure.
740 * NB: each block is padded to a MAXALIGN multiple. This must be
741 * accounted for when the file is later read!
743 * The data is copied, so the caller is free to modify it afterwards.
746 save_state_data(const void *data, uint32 len)
748 uint32 padlen = MAXALIGN(len);
750 if (padlen > records.bytes_free)
752 records.tail->next = palloc0(sizeof(XLogRecData));
753 records.tail = records.tail->next;
754 records.tail->buffer = InvalidBuffer;
755 records.tail->len = 0;
756 records.tail->next = NULL;
758 records.bytes_free = Max(padlen, 512);
759 records.tail->data = palloc(records.bytes_free);
762 memcpy(((char *) records.tail->data) + records.tail->len, data, len);
763 records.tail->len += padlen;
764 records.bytes_free -= padlen;
765 records.total_len += padlen;
/*
 * StartPrepare: begin assembling the state file for gxact in memory.
 *
 * Resets the records chain to a single block of at least 512 bytes, builds
 * a TwoPhaseFileHeader (magic, database, preparation time, owner, GID, and
 * the counts returned by xactGetCommittedChildren and smgrGetPendingDeletes)
 * and appends it with save_state_data.  The subtransaction XIDs and the two
 * pending-delete arrays are appended when non-empty; the subtransaction XIDs
 * are also loaded into the gxact via GXactLoadSubxactData.  total_len is
 * left at zero for EndPrepare to fill in.
 *
 * NOTE(review): the local xid is read from the gxact, but no assignment to
 * hdr.xid is visible here; the embedded line numbers skip, so short lines
 * (that assignment, braces, any cleanup of the arrays) appear to have been
 * dropped from this listing -- confirm against the complete file.
 */
769 * Start preparing a state file.
771 * Initializes data structure and inserts the 2PC file header record.
774 StartPrepare(GlobalTransaction gxact)
776 TransactionId xid = gxact->proc.xid;
777 TwoPhaseFileHeader hdr;
778 TransactionId *children;
779 RelFileNode *commitrels;
780 RelFileNode *abortrels;
782 /* Initialize linked list */
783 records.head = palloc0(sizeof(XLogRecData));
784 records.head->buffer = InvalidBuffer;
785 records.head->len = 0;
786 records.head->next = NULL;
788 records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
789 records.head->data = palloc(records.bytes_free);
791 records.tail = records.head;
793 records.total_len = 0;
796 hdr.magic = TWOPHASE_MAGIC;
797 hdr.total_len = 0; /* EndPrepare will fill this in */
799 hdr.database = gxact->proc.databaseId;
800 hdr.prepared_at = gxact->prepared_at;
801 hdr.owner = gxact->owner;
802 hdr.nsubxacts = xactGetCommittedChildren(&children);
803 hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
804 hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
805 StrNCpy(hdr.gid, gxact->gid, GIDSIZE);
807 save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
809 /* Add the additional info about subxacts and deletable files */
810 if (hdr.nsubxacts > 0)
812 save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
813 /* While we have the child-xact data, stuff it in the gxact too */
814 GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
817 if (hdr.ncommitrels > 0)
819 save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
822 if (hdr.nabortrels > 0)
824 save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
/*
 * EndPrepare: finish the state file started by StartPrepare and make the
 * transaction durable as prepared.
 *
 * Visible sequence (order matters):
 *   1. append the end-sentinel record and patch total_len in the header
 *      (chain length plus the size of the trailing CRC);
 *   2. create the state file exclusively (O_CREAT | O_EXCL) and write every
 *      block of the chain while accumulating the CRC;
 *   3. write the bitwise complement of the CRC as a placeholder, then seek
 *      back over it;
 *   4. inside a critical section, holding CheckpointStartLock in shared
 *      mode: insert and flush the XLOG_XACT_PREPARE record built from the
 *      chain, overwrite the placeholder with the real CRC, and call
 *      MarkAsPrepared;
 *   5. release CheckpointStartLock and reset the records chain pointers.
 * The LSN of the PREPARE record is stored in gxact->prepare_lsn.
 *
 * NOTE(review): the embedded line numbers skip, so short lines (some local
 * declarations, the remaining arguments of multi-line calls, error-report
 * openers, close() calls, braces, the end of the critical section) are not
 * visible in this listing -- confirm against the complete file before
 * changing anything here.
 */
830 * Finish preparing state file.
832 * Calculates CRC and writes state file to WAL and in pg_twophase directory.
835 EndPrepare(GlobalTransaction gxact)
837 TransactionId xid = gxact->proc.xid;
838 TwoPhaseFileHeader *hdr;
839 char path[MAXPGPATH];
841 pg_crc32 statefile_crc;
845 /* Add the end sentinel to the list of 2PC records */
846 RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
849 /* Go back and fill in total_len in the file header record */
850 hdr = (TwoPhaseFileHeader *) records.head->data;
851 Assert(hdr->magic == TWOPHASE_MAGIC);
852 hdr->total_len = records.total_len + sizeof(pg_crc32);
855 * Create the 2PC state file.
857 * Note: because we use BasicOpenFile(), we are responsible for ensuring
858 * the FD gets closed in any error exit path. Once we get into the
859 * critical section, though, it doesn't matter since any failure causes
862 TwoPhaseFilePath(path, xid);
864 fd = BasicOpenFile(path,
865 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
869 (errcode_for_file_access(),
870 errmsg("could not create two-phase state file \"%s\": %m",
873 /* Write data to file, and calculate CRC as we pass over it */
874 INIT_CRC32(statefile_crc);
876 for (record = records.head; record != NULL; record = record->next)
878 COMP_CRC32(statefile_crc, record->data, record->len);
879 if ((write(fd, record->data, record->len)) != record->len)
883 (errcode_for_file_access(),
884 errmsg("could not write two-phase state file: %m")));
888 FIN_CRC32(statefile_crc);
891 * Write a deliberately bogus CRC to the state file; this is just paranoia
892 * to catch the case where four more bytes will run us out of disk space.
894 bogus_crc = ~statefile_crc;
896 if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
900 (errcode_for_file_access(),
901 errmsg("could not write two-phase state file: %m")));
904 /* Back up to prepare for rewriting the CRC */
905 if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
909 (errcode_for_file_access(),
910 errmsg("could not seek in two-phase state file: %m")));
914 * The state file isn't valid yet, because we haven't written the correct
915 * CRC yet. Before we do that, insert entry in WAL and flush it to disk.
917 * Between the time we have written the WAL entry and the time we write
918 * out the correct state file CRC, we have an inconsistency: the xact is
919 * prepared according to WAL but not according to our on-disk state. We
920 * use a critical section to force a PANIC if we are unable to complete
921 * the write --- then, WAL replay should repair the inconsistency. The
922 * odds of a PANIC actually occurring should be very tiny given that we
923 * were able to write the bogus CRC above.
925 * We have to lock out checkpoint start here, too; otherwise a checkpoint
926 * starting immediately after the WAL record is inserted could complete
927 * without fsync'ing our state file. (This is essentially the same kind
928 * of race condition as the COMMIT-to-clog-write case that
929 * RecordTransactionCommit uses CheckpointStartLock for; see notes there.)
931 * We save the PREPARE record's location in the gxact for later use by
932 * CheckPointTwoPhase.
934 START_CRIT_SECTION();
936 LWLockAcquire(CheckpointStartLock, LW_SHARED);
938 gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
940 XLogFlush(gxact->prepare_lsn);
942 /* If we crash now, we have prepared: WAL replay will fix things */
944 /* write correct CRC and close file */
945 if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
949 (errcode_for_file_access(),
950 errmsg("could not write two-phase state file: %m")));
955 (errcode_for_file_access(),
956 errmsg("could not close two-phase state file: %m")));
959 * Mark the prepared transaction as valid. As soon as xact.c marks MyProc
960 * as not running our XID (which it will do immediately after this
961 * function returns), others can commit/rollback the xact.
963 * NB: a side effect of this is to make a dummy ProcArray entry for the
964 * prepared XID. This must happen before we clear the XID from MyProc,
965 * else there is a window where the XID is not running according to
966 * TransactionIdInProgress, and onlookers would be entitled to assume the
967 * xact crashed. Instead we have a window where the same XID appears
968 * twice in ProcArray, which is OK.
970 MarkAsPrepared(gxact);
973 * Now we can release the checkpoint start lock: a checkpoint starting
974 * after this will certainly see the gxact as a candidate for fsyncing.
976 LWLockRelease(CheckpointStartLock);
980 records.tail = records.head = NULL;
984 * Register a 2PC record to be written to state file.
987 RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
988 const void *data, uint32 len)
990 TwoPhaseRecordOnDisk record;
995 save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
997 save_state_data(data, len);
/*
 * ReadTwoPhaseFile: load the state file of xid into memory and validate it.
 *
 * Visible checks, in order: the file opens read-only; fstat succeeds; the
 * size is at least a MAXALIGNed header plus a MAXALIGNed record header plus
 * the CRC, and at most 10000000 bytes; the CRC offset (size minus CRC size)
 * is MAXALIGNed; the whole file can be read; the header magic equals
 * TWOPHASE_MAGIC and total_len equals the file size; the CRC computed over
 * everything before the CRC offset equals the stored CRC.
 *
 * NOTE(review): the embedded line numbers skip, so short lines (local
 * declarations, failure-path statements such as close/pfree/return, braces
 * and the final return) are not visible in this listing -- confirm the
 * exact failure handling against the complete file.
 */
1002 * Read and validate the state file for xid.
1004 * If it looks OK (has a valid magic number and CRC), return the palloc'd
1005 * contents of the file. Otherwise return NULL.
1008 ReadTwoPhaseFile(TransactionId xid)
1010 char path[MAXPGPATH];
1012 TwoPhaseFileHeader *hdr;
1019 TwoPhaseFilePath(path, xid);
1021 fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
1025 (errcode_for_file_access(),
1026 errmsg("could not open two-phase state file \"%s\": %m",
1032 * Check file length. We can determine a lower bound pretty easily. We
1033 * set an upper bound mainly to avoid palloc() failure on a corrupt file.
1035 if (fstat(fd, &stat))
1039 (errcode_for_file_access(),
1040 errmsg("could not stat two-phase state file \"%s\": %m",
1045 if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1046 MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1047 sizeof(pg_crc32)) ||
1048 stat.st_size > 10000000)
1054 crc_offset = stat.st_size - sizeof(pg_crc32);
1055 if (crc_offset != MAXALIGN(crc_offset))
1062 * OK, slurp in the file.
1064 buf = (char *) palloc(stat.st_size);
1066 if (read(fd, buf, stat.st_size) != stat.st_size)
1070 (errcode_for_file_access(),
1071 errmsg("could not read two-phase state file \"%s\": %m",
1079 hdr = (TwoPhaseFileHeader *) buf;
1080 if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
1086 INIT_CRC32(calc_crc);
1087 COMP_CRC32(calc_crc, buf, crc_offset);
1088 FIN_CRC32(calc_crc);
1090 file_crc = *((pg_crc32 *) (buf + crc_offset));
1092 if (!EQ_CRC32(calc_crc, file_crc))
/*
 * FinishPreparedTransaction: carry out COMMIT PREPARED (isCommit true) or
 * ROLLBACK PREPARED (isCommit false) for the transaction named by gid.
 *
 * Visible sequence (order matters):
 *   1. LockGXact validates the GID for the current user and locks the entry;
 *   2. the state file is read and validated, and a corrupt-file error is
 *      reported on failure;
 *   3. the buffer is split into the subtransaction XIDs, the
 *      delete-on-commit rels and the delete-on-abort rels, each segment
 *      MAXALIGN-padded, leaving bufptr at the first 2PC record;
 *   4. the commit or abort is recorded, the dummy PGPROC is removed from
 *      the ProcArray and the gxact is marked invalid;
 *   5. the relations scheduled for deletion are unlinked;
 *   6. the post-commit or post-abort callbacks are run over the records;
 *   7. pgstat_count_xact_commit is called and the state file is removed.
 *
 * NOTE(review): the embedded line numbers skip, so short lines (local
 * declarations, the isCommit tests and else branches, the NULL check on the
 * buffer, braces, and any cleanup after RemoveTwoPhaseFile) are not visible
 * in this listing -- confirm against the complete file before editing.
 */
1103 * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1106 FinishPreparedTransaction(const char *gid, bool isCommit)
1108 GlobalTransaction gxact;
1112 TwoPhaseFileHeader *hdr;
1113 TransactionId *children;
1114 RelFileNode *commitrels;
1115 RelFileNode *abortrels;
1119 * Validate the GID, and lock the GXACT to ensure that two backends do not
1120 * try to commit the same GID at once.
1122 gxact = LockGXact(gid, GetUserId());
1123 xid = gxact->proc.xid;
1126 * Read and validate the state file
1128 buf = ReadTwoPhaseFile(xid);
1131 (errcode(ERRCODE_DATA_CORRUPTED),
1132 errmsg("two-phase state file for transaction %u is corrupt",
1136 * Disassemble the header area
1138 hdr = (TwoPhaseFileHeader *) buf;
1139 Assert(TransactionIdEquals(hdr->xid, xid));
1140 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1141 children = (TransactionId *) bufptr;
1142 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1143 commitrels = (RelFileNode *) bufptr;
1144 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1145 abortrels = (RelFileNode *) bufptr;
1146 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1149 * The order of operations here is critical: make the XLOG entry for
1150 * commit or abort, then mark the transaction committed or aborted in
1151 * pg_clog, then remove its PGPROC from the global ProcArray (which means
1152 * TransactionIdIsInProgress will stop saying the prepared xact is in
1153 * progress), then run the post-commit or post-abort callbacks. The
1154 * callbacks will release the locks the transaction held.
1157 RecordTransactionCommitPrepared(xid,
1158 hdr->nsubxacts, children,
1159 hdr->ncommitrels, commitrels);
1161 RecordTransactionAbortPrepared(xid,
1162 hdr->nsubxacts, children,
1163 hdr->nabortrels, abortrels);
1165 ProcArrayRemove(&gxact->proc);
1168 * In case we fail while running the callbacks, mark the gxact invalid so
1169 * no one else will try to commit/rollback, and so it can be recycled
1170 * properly later. It is still locked by our XID so it won't go away yet.
1172 * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1174 gxact->valid = false;
1177 * We have to remove any files that were supposed to be dropped. For
1178 * consistency with the regular xact.c code paths, must do this before
1179 * releasing locks, so do it before running the callbacks.
1181 * NB: this code knows that we couldn't be dropping any temp rels ...
1185 for (i = 0; i < hdr->ncommitrels; i++)
1186 smgrdounlink(smgropen(commitrels[i]), false, false);
1190 for (i = 0; i < hdr->nabortrels; i++)
1191 smgrdounlink(smgropen(abortrels[i]), false, false);
1194 /* And now do the callbacks */
1196 ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
1198 ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
1200 pgstat_count_xact_commit();
1203 * And now we can clean up our mess.
1205 RemoveTwoPhaseFile(xid, true);
1213 * Scan a 2PC state file (already read into memory by ReadTwoPhaseFile)
1214 * and call the indicated callbacks for each 2PC record.
1217 ProcessRecords(char *bufptr, TransactionId xid,
1218 const TwoPhaseCallback callbacks[])
1222 TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1224 Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1225 if (record->rmid == TWOPHASE_RM_END_ID)
1228 bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1230 if (callbacks[record->rmid] != NULL)
1231 callbacks[record->rmid] (xid, record->info,
1232 (void *) bufptr, record->len);
1234 bufptr += MAXALIGN(record->len);
1239 * Remove the 2PC file for the specified XID.
1241 * If giveWarning is false, do not complain about file-not-present;
1242 * this is an expected case during WAL replay.
1245 RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1247 char path[MAXPGPATH];
1249 TwoPhaseFilePath(path, xid);
1251 if (errno != ENOENT || giveWarning)
1253 (errcode_for_file_access(),
1254 errmsg("could not remove two-phase state file \"%s\": %m",
1259 * Recreates a state file. This is used in WAL replay.
1261 * Note: content and len don't include CRC.
1264 RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1266 char path[MAXPGPATH];
1267 pg_crc32 statefile_crc;
1271 INIT_CRC32(statefile_crc);
1272 COMP_CRC32(statefile_crc, content, len);
1273 FIN_CRC32(statefile_crc);
1275 TwoPhaseFilePath(path, xid);
1277 fd = BasicOpenFile(path,
1278 O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
1282 (errcode_for_file_access(),
1283 errmsg("could not recreate two-phase state file \"%s\": %m",
1286 /* Write content and CRC */
1287 if (write(fd, content, len) != len)
1291 (errcode_for_file_access(),
1292 errmsg("could not write two-phase state file: %m")));
1294 if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32))
1298 (errcode_for_file_access(),
1299 errmsg("could not write two-phase state file: %m")));
1303 * We must fsync the file because the end-of-replay checkpoint will not do
1304 * so, there being no GXACT in shared memory yet to tell it to.
1306 if (pg_fsync(fd) != 0)
1310 (errcode_for_file_access(),
1311 errmsg("could not fsync two-phase state file: %m")));
1316 (errcode_for_file_access(),
1317 errmsg("could not close two-phase state file: %m")));
1321 * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1323 * We must fsync the state file of any GXACT that is valid and has a PREPARE
1324 * LSN <= the checkpoint's redo horizon. (If the gxact isn't valid yet or
1325 * has a later LSN, this checkpoint is not responsible for fsyncing it.)
1327 * This is deliberately run as late as possible in the checkpoint sequence,
1328 * because GXACTs ordinarily have short lifespans, and so it is quite
1329 * possible that GXACTs that were valid at checkpoint start will no longer
1330 * exist if we wait a little bit.
1332 * If a GXACT remains valid across multiple checkpoints, it'll be fsynced
1333 * each time. This is considered unusual enough that we don't bother to
1334 * expend any extra code to avoid the redundant fsyncs. (They should be
1335 * reasonably cheap anyway, since they won't cause I/O.)
1338 CheckPointTwoPhase(XLogRecPtr redo_horizon)
1340 TransactionId *xids;
1342 char path[MAXPGPATH];
1346 * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
1347 * it just long enough to make a list of the XIDs that require fsyncing,
1348 * and then do the I/O afterwards.
1350 * This approach creates a race condition: someone else could delete a
1351 * GXACT between the time we release TwoPhaseStateLock and the time we try
1352 * to open its state file. We handle this by special-casing ENOENT
1353 * failures: if we see that, we verify that the GXACT is no longer valid,
1354 * and if so ignore the failure.
1356 if (max_prepared_xacts <= 0)
1357 return; /* nothing to do */
1358 xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
1361 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1363 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1365 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1368 XLByteLE(gxact->prepare_lsn, redo_horizon))
1369 xids[nxids++] = gxact->proc.xid;
1372 LWLockRelease(TwoPhaseStateLock);
1374 for (i = 0; i < nxids; i++)
1376 TransactionId xid = xids[i];
1379 TwoPhaseFilePath(path, xid);
1381 fd = BasicOpenFile(path, O_RDWR | PG_BINARY, 0);
1384 if (errno == ENOENT)
1386 /* OK if gxact is no longer valid */
1387 if (!TransactionIdIsPrepared(xid))
1389 /* Restore errno in case it was changed */
1393 (errcode_for_file_access(),
1394 errmsg("could not open two-phase state file \"%s\": %m",
1398 if (pg_fsync(fd) != 0)
1402 (errcode_for_file_access(),
1403 errmsg("could not fsync two-phase state file \"%s\": %m",
1409 (errcode_for_file_access(),
1410 errmsg("could not close two-phase state file \"%s\": %m",
1418 * PrescanPreparedTransactions
1420 * Scan the pg_twophase directory and determine the range of valid XIDs
1421 * present. This is run during database startup, after we have completed
1422 * reading WAL. ShmemVariableCache->nextXid has been set to one more than
1423 * the highest XID for which evidence exists in WAL.
1425 * We throw away any prepared xacts with main XID beyond nextXid --- if any
1426 * are present, it suggests that the DBA has done a PITR recovery to an
1427 * earlier point in time without cleaning out pg_twophase. We dare not
1428 * try to recover such prepared xacts since they likely depend on database
1429 * state that doesn't exist now.
1431 * However, we will advance nextXid beyond any subxact XIDs belonging to
1432 * valid prepared xacts. We need to do this since subxact commit doesn't
1433 * write a WAL entry, and so there might be no evidence in WAL of those
1436 * Our other responsibility is to determine and return the oldest valid XID
1437 * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
1438 * This is needed to synchronize pg_subtrans startup properly.
1441 PrescanPreparedTransactions(void)
1443 TransactionId origNextXid = ShmemVariableCache->nextXid;
1444 TransactionId result = origNextXid;
1446 struct dirent *clde;
1448 cldir = AllocateDir(TWOPHASE_DIR);
1449 while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1451 if (strlen(clde->d_name) == 8 &&
1452 strspn(clde->d_name, "0123456789ABCDEF") == 8)
1456 TwoPhaseFileHeader *hdr;
1457 TransactionId *subxids;
1460 xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1462 /* Reject XID if too new */
1463 if (TransactionIdFollowsOrEquals(xid, origNextXid))
1466 (errmsg("removing future two-phase state file \"%s\"",
1468 RemoveTwoPhaseFile(xid, true);
1473 * Note: we can't check if already processed because clog
1474 * subsystem isn't up yet.
1477 /* Read and validate file */
1478 buf = ReadTwoPhaseFile(xid);
1482 (errmsg("removing corrupt two-phase state file \"%s\"",
1484 RemoveTwoPhaseFile(xid, true);
1488 /* Deconstruct header */
1489 hdr = (TwoPhaseFileHeader *) buf;
1490 if (!TransactionIdEquals(hdr->xid, xid))
1493 (errmsg("removing corrupt two-phase state file \"%s\"",
1495 RemoveTwoPhaseFile(xid, true);
1501 * OK, we think this file is valid. Incorporate xid into the
1502 * running-minimum result.
1504 if (TransactionIdPrecedes(xid, result))
1508 * Examine subtransaction XIDs ... they should all follow main
1509 * XID, and they may force us to advance nextXid.
1511 subxids = (TransactionId *)
1512 (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
1513 for (i = 0; i < hdr->nsubxacts; i++)
1515 TransactionId subxid = subxids[i];
1517 Assert(TransactionIdFollows(subxid, xid));
1518 if (TransactionIdFollowsOrEquals(subxid,
1519 ShmemVariableCache->nextXid))
1521 ShmemVariableCache->nextXid = subxid;
1522 TransactionIdAdvance(ShmemVariableCache->nextXid);
1535 * RecoverPreparedTransactions
1537 * Scan the pg_twophase directory and reload shared-memory state for each
1538 * prepared transaction (reacquire locks, etc). This is run during database
1542 RecoverPreparedTransactions(void)
1544 char dir[MAXPGPATH];
1546 struct dirent *clde;
1548 snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
1550 cldir = AllocateDir(dir);
1551 while ((clde = ReadDir(cldir, dir)) != NULL)
1553 if (strlen(clde->d_name) == 8 &&
1554 strspn(clde->d_name, "0123456789ABCDEF") == 8)
1559 TwoPhaseFileHeader *hdr;
1560 TransactionId *subxids;
1561 GlobalTransaction gxact;
1564 xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1566 /* Already processed? */
1567 if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
1570 (errmsg("removing stale two-phase state file \"%s\"",
1572 RemoveTwoPhaseFile(xid, true);
1576 /* Read and validate file */
1577 buf = ReadTwoPhaseFile(xid);
1581 (errmsg("removing corrupt two-phase state file \"%s\"",
1583 RemoveTwoPhaseFile(xid, true);
1588 (errmsg("recovering prepared transaction %u", xid)));
1590 /* Deconstruct header */
1591 hdr = (TwoPhaseFileHeader *) buf;
1592 Assert(TransactionIdEquals(hdr->xid, xid));
1593 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1594 subxids = (TransactionId *) bufptr;
1595 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1596 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1597 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1600 * Reconstruct subtrans state for the transaction --- needed
1601 * because pg_subtrans is not preserved over a restart. Note that
1602 * we are linking all the subtransactions directly to the
1603 * top-level XID; there may originally have been a more complex
1604 * hierarchy, but there's no need to restore that exactly.
1606 for (i = 0; i < hdr->nsubxacts; i++)
1607 SubTransSetParent(subxids[i], xid);
1610 * Recreate its GXACT and dummy PGPROC
1612 * Note: since we don't have the PREPARE record's WAL location at
1613 * hand, we leave prepare_lsn zeroes. This means the GXACT will
1614 * be fsync'd on every future checkpoint. We assume this
1615 * situation is infrequent enough that the performance cost is
1616 * negligible (especially since we know the state file has already
1619 gxact = MarkAsPreparing(xid, hdr->gid,
1621 hdr->owner, hdr->database);
1622 GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
1623 MarkAsPrepared(gxact);
1626 * Recover other state (notably locks) using resource managers
1628 ProcessRecords(bufptr, xid, twophase_recover_callbacks);
1637 * RecordTransactionCommitPrepared
1639 * This is basically the same as RecordTransactionCommit: in particular,
1640 * we must take the CheckpointStartLock to avoid a race condition.
1642 * We know the transaction made at least one XLOG entry (its PREPARE),
1643 * so it is never possible to optimize out the commit record.
1646 RecordTransactionCommitPrepared(TransactionId xid,
1648 TransactionId *children,
1652 XLogRecData rdata[3];
1654 xl_xact_commit_prepared xlrec;
1657 START_CRIT_SECTION();
1659 /* See notes in RecordTransactionCommit */
1660 LWLockAcquire(CheckpointStartLock, LW_SHARED);
1662 /* Emit the XLOG commit record */
1664 xlrec.crec.xtime = time(NULL);
1665 xlrec.crec.nrels = nrels;
1666 xlrec.crec.nsubxacts = nchildren;
1667 rdata[0].data = (char *) (&xlrec);
1668 rdata[0].len = MinSizeOfXactCommitPrepared;
1669 rdata[0].buffer = InvalidBuffer;
1670 /* dump rels to delete */
1673 rdata[0].next = &(rdata[1]);
1674 rdata[1].data = (char *) rels;
1675 rdata[1].len = nrels * sizeof(RelFileNode);
1676 rdata[1].buffer = InvalidBuffer;
1679 /* dump committed child Xids */
1682 rdata[lastrdata].next = &(rdata[2]);
1683 rdata[2].data = (char *) children;
1684 rdata[2].len = nchildren * sizeof(TransactionId);
1685 rdata[2].buffer = InvalidBuffer;
1688 rdata[lastrdata].next = NULL;
1690 recptr = XLogInsert(RM_XACT_ID,
1691 XLOG_XACT_COMMIT_PREPARED | XLOG_NO_TRAN,
1694 /* we don't currently try to sleep before flush here ... */
1696 /* Flush XLOG to disk */
1699 /* Mark the transaction committed in pg_clog */
1700 TransactionIdCommit(xid);
1701 /* to avoid race conditions, the parent must commit first */
1702 TransactionIdCommitTree(nchildren, children);
1704 /* Checkpoint is allowed again */
1705 LWLockRelease(CheckpointStartLock);
1711 * RecordTransactionAbortPrepared
1713 * This is basically the same as RecordTransactionAbort.
1715 * We know the transaction made at least one XLOG entry (its PREPARE),
1716 * so it is never possible to optimize out the abort record.
1719 RecordTransactionAbortPrepared(TransactionId xid,
1721 TransactionId *children,
1725 XLogRecData rdata[3];
1727 xl_xact_abort_prepared xlrec;
1731 * Catch the scenario where we aborted partway through
1732 * RecordTransactionCommitPrepared ...
1734 if (TransactionIdDidCommit(xid))
1735 elog(PANIC, "cannot abort transaction %u, it was already committed",
1738 START_CRIT_SECTION();
1740 /* Emit the XLOG abort record */
1742 xlrec.arec.xtime = time(NULL);
1743 xlrec.arec.nrels = nrels;
1744 xlrec.arec.nsubxacts = nchildren;
1745 rdata[0].data = (char *) (&xlrec);
1746 rdata[0].len = MinSizeOfXactAbortPrepared;
1747 rdata[0].buffer = InvalidBuffer;
1748 /* dump rels to delete */
1751 rdata[0].next = &(rdata[1]);
1752 rdata[1].data = (char *) rels;
1753 rdata[1].len = nrels * sizeof(RelFileNode);
1754 rdata[1].buffer = InvalidBuffer;
1757 /* dump committed child Xids */
1760 rdata[lastrdata].next = &(rdata[2]);
1761 rdata[2].data = (char *) children;
1762 rdata[2].len = nchildren * sizeof(TransactionId);
1763 rdata[2].buffer = InvalidBuffer;
1766 rdata[lastrdata].next = NULL;
1768 recptr = XLogInsert(RM_XACT_ID,
1769 XLOG_XACT_ABORT_PREPARED | XLOG_NO_TRAN,
1772 /* Always flush, since we're about to remove the 2PC state file */
1776 * Mark the transaction aborted in clog. This is not absolutely necessary
1777 * but we may as well do it while we are here.
1779 TransactionIdAbort(xid);
1780 TransactionIdAbortTree(nchildren, children);