]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/twophase.c
Fix TransactionIdIsCurrentTransactionId() to use binary search instead of
[postgresql] / src / backend / access / transam / twophase.c
1 /*-------------------------------------------------------------------------
2  *
3  * twophase.c
4  *              Two-phase commit support functions.
5  *
6  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *              $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.40 2008/03/17 02:18:55 tgl Exp $
11  *
12  * NOTES
13  *              Each global transaction is associated with a global transaction
14  *              identifier (GID). The client assigns a GID to a postgres
15  *              transaction with the PREPARE TRANSACTION command.
16  *
17  *              We keep all active global transactions in a shared memory array.
18  *              When the PREPARE TRANSACTION command is issued, the GID is
19  *              reserved for the transaction in the array. This is done before
20  *              a WAL entry is made, because the reservation checks for duplicate
21  *              GIDs and aborts the transaction if there already is a global
22  *              transaction in prepared state with the same GID.
23  *
24  *              A global transaction (gxact) also has a dummy PGPROC that is entered
25  *              into the ProcArray array; this is what keeps the XID considered
26  *              running by TransactionIdIsInProgress.  It is also convenient as a
27  *              PGPROC to hook the gxact's locks to.
28  *
29  *              In order to survive crashes and shutdowns, all prepared
30  *              transactions must be stored in permanent storage. This includes
31  *              locking information, pending notifications etc. All that state
32  *              information is written to the per-transaction state file in
33  *              the pg_twophase directory.
34  *
35  *-------------------------------------------------------------------------
36  */
37 #include "postgres.h"
38
39 #include <fcntl.h>
40 #include <sys/stat.h>
41 #include <sys/types.h>
42 #include <time.h>
43 #include <unistd.h>
44
45 #include "access/heapam.h"
46 #include "access/subtrans.h"
47 #include "access/transam.h"
48 #include "access/twophase.h"
49 #include "access/twophase_rmgr.h"
50 #include "access/xact.h"
51 #include "catalog/pg_type.h"
52 #include "funcapi.h"
53 #include "miscadmin.h"
54 #include "pgstat.h"
55 #include "storage/fd.h"
56 #include "storage/procarray.h"
57 #include "storage/smgr.h"
58 #include "utils/builtins.h"
59
60
/*
 * Name of the subdirectory of PGDATA in which two-phase state files are kept.
 */
#define TWOPHASE_DIR "pg_twophase"

/*
 * GUC variable.  Its value sizes the shared prepared-transaction table, so
 * it cannot be changed after startup.
 */
int			max_prepared_xacts = 5;
68
69 /*
70  * This struct describes one global transaction that is in prepared state
71  * or attempting to become prepared.
72  *
73  * The first component of the struct is a dummy PGPROC that is inserted
74  * into the global ProcArray so that the transaction appears to still be
75  * running and holding locks.  It must be first because we cast pointers
76  * to PGPROC and pointers to GlobalTransactionData back and forth.
77  *
78  * The lifecycle of a global transaction is:
79  *
80  * 1. After checking that the requested GID is not in use, set up an
81  * entry in the TwoPhaseState->prepXacts array with the correct XID and GID,
82  * with locking_xid = my own XID and valid = false.
83  *
84  * 2. After successfully completing prepare, set valid = true and enter the
85  * contained PGPROC into the global ProcArray.
86  *
87  * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry
88  * is valid and its locking_xid is no longer active, then store my current
89  * XID into locking_xid.  This prevents concurrent attempts to commit or
90  * rollback the same prepared xact.
91  *
92  * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
93  * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
94  * the freelist.
95  *
96  * Note that if the preparing transaction fails between steps 1 and 2, the
97  * entry will remain in prepXacts until recycled.  We can detect recyclable
98  * entries by checking for valid = false and locking_xid no longer active.
99  *
100  * typedef struct GlobalTransactionData *GlobalTransaction appears in
101  * twophase.h
102  */
103 #define GIDSIZE 200
104
105 typedef struct GlobalTransactionData
106 {
107         PGPROC          proc;                   /* dummy proc */
108         TimestampTz prepared_at;        /* time of preparation */
109         XLogRecPtr      prepare_lsn;    /* XLOG offset of prepare record */
110         Oid                     owner;                  /* ID of user that executed the xact */
111         TransactionId locking_xid;      /* top-level XID of backend working on xact */
112         bool            valid;                  /* TRUE if fully prepared */
113         char            gid[GIDSIZE];   /* The GID assigned to the prepared xact */
114 } GlobalTransactionData;
115
116 /*
117  * Two Phase Commit shared state.  Access to this struct is protected
118  * by TwoPhaseStateLock.
119  */
120 typedef struct TwoPhaseStateData
121 {
122         /* Head of linked list of free GlobalTransactionData structs */
123         SHMEM_OFFSET freeGXacts;
124
125         /* Number of valid prepXacts entries. */
126         int                     numPrepXacts;
127
128         /*
129          * There are max_prepared_xacts items in this array, but C wants a
130          * fixed-size array.
131          */
132         GlobalTransaction prepXacts[1];         /* VARIABLE LENGTH ARRAY */
133 } TwoPhaseStateData;                    /* VARIABLE LENGTH STRUCT */
134
135 static TwoPhaseStateData *TwoPhaseState;
136
137
138 static void RecordTransactionCommitPrepared(TransactionId xid,
139                                                                 int nchildren,
140                                                                 TransactionId *children,
141                                                                 int nrels,
142                                                                 RelFileNode *rels);
143 static void RecordTransactionAbortPrepared(TransactionId xid,
144                                                            int nchildren,
145                                                            TransactionId *children,
146                                                            int nrels,
147                                                            RelFileNode *rels);
148 static void ProcessRecords(char *bufptr, TransactionId xid,
149                            const TwoPhaseCallback callbacks[]);
150
151
152 /*
153  * Initialization of shared memory
154  */
155 Size
156 TwoPhaseShmemSize(void)
157 {
158         Size            size;
159
160         /* Need the fixed struct, the array of pointers, and the GTD structs */
161         size = offsetof(TwoPhaseStateData, prepXacts);
162         size = add_size(size, mul_size(max_prepared_xacts,
163                                                                    sizeof(GlobalTransaction)));
164         size = MAXALIGN(size);
165         size = add_size(size, mul_size(max_prepared_xacts,
166                                                                    sizeof(GlobalTransactionData)));
167
168         return size;
169 }
170
171 void
172 TwoPhaseShmemInit(void)
173 {
174         bool            found;
175
176         TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
177                                                                         TwoPhaseShmemSize(),
178                                                                         &found);
179         if (!IsUnderPostmaster)
180         {
181                 GlobalTransaction gxacts;
182                 int                     i;
183
184                 Assert(!found);
185                 TwoPhaseState->freeGXacts = INVALID_OFFSET;
186                 TwoPhaseState->numPrepXacts = 0;
187
188                 /*
189                  * Initialize the linked list of free GlobalTransactionData structs
190                  */
191                 gxacts = (GlobalTransaction)
192                         ((char *) TwoPhaseState +
193                          MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
194                                           sizeof(GlobalTransaction) * max_prepared_xacts));
195                 for (i = 0; i < max_prepared_xacts; i++)
196                 {
197                         gxacts[i].proc.links.next = TwoPhaseState->freeGXacts;
198                         TwoPhaseState->freeGXacts = MAKE_OFFSET(&gxacts[i]);
199                 }
200         }
201         else
202                 Assert(found);
203 }
204
205
206 /*
207  * MarkAsPreparing
208  *              Reserve the GID for the given transaction.
209  *
210  * Internally, this creates a gxact struct and puts it into the active array.
211  * NOTE: this is also used when reloading a gxact after a crash; so avoid
212  * assuming that we can use very much backend context.
213  */
214 GlobalTransaction
215 MarkAsPreparing(TransactionId xid, const char *gid,
216                                 TimestampTz prepared_at, Oid owner, Oid databaseid)
217 {
218         GlobalTransaction gxact;
219         int                     i;
220
221         if (strlen(gid) >= GIDSIZE)
222                 ereport(ERROR,
223                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
224                                  errmsg("transaction identifier \"%s\" is too long",
225                                                 gid)));
226
227         LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
228
229         /*
230          * First, find and recycle any gxacts that failed during prepare. We do
231          * this partly to ensure we don't mistakenly say their GIDs are still
232          * reserved, and partly so we don't fail on out-of-slots unnecessarily.
233          */
234         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
235         {
236                 gxact = TwoPhaseState->prepXacts[i];
237                 if (!gxact->valid && !TransactionIdIsActive(gxact->locking_xid))
238                 {
239                         /* It's dead Jim ... remove from the active array */
240                         TwoPhaseState->numPrepXacts--;
241                         TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
242                         /* and put it back in the freelist */
243                         gxact->proc.links.next = TwoPhaseState->freeGXacts;
244                         TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);
245                         /* Back up index count too, so we don't miss scanning one */
246                         i--;
247                 }
248         }
249
250         /* Check for conflicting GID */
251         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
252         {
253                 gxact = TwoPhaseState->prepXacts[i];
254                 if (strcmp(gxact->gid, gid) == 0)
255                 {
256                         ereport(ERROR,
257                                         (errcode(ERRCODE_DUPLICATE_OBJECT),
258                                          errmsg("transaction identifier \"%s\" is already in use",
259                                                         gid)));
260                 }
261         }
262
263         /* Get a free gxact from the freelist */
264         if (TwoPhaseState->freeGXacts == INVALID_OFFSET)
265                 ereport(ERROR,
266                                 (errcode(ERRCODE_OUT_OF_MEMORY),
267                                  errmsg("maximum number of prepared transactions reached"),
268                                  errhint("Increase max_prepared_transactions (currently %d).",
269                                                  max_prepared_xacts)));
270         gxact = (GlobalTransaction) MAKE_PTR(TwoPhaseState->freeGXacts);
271         TwoPhaseState->freeGXacts = gxact->proc.links.next;
272
273         /* Initialize it */
274         MemSet(&gxact->proc, 0, sizeof(PGPROC));
275         SHMQueueElemInit(&(gxact->proc.links));
276         gxact->proc.waitStatus = STATUS_OK;
277         /* We set up the gxact's VXID as InvalidBackendId/XID */
278         gxact->proc.lxid = (LocalTransactionId) xid;
279         gxact->proc.xid = xid;
280         gxact->proc.xmin = InvalidTransactionId;
281         gxact->proc.pid = 0;
282         gxact->proc.backendId = InvalidBackendId;
283         gxact->proc.databaseId = databaseid;
284         gxact->proc.roleId = owner;
285         gxact->proc.inCommit = false;
286         gxact->proc.vacuumFlags = 0;
287         gxact->proc.lwWaiting = false;
288         gxact->proc.lwExclusive = false;
289         gxact->proc.lwWaitLink = NULL;
290         gxact->proc.waitLock = NULL;
291         gxact->proc.waitProcLock = NULL;
292         for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
293                 SHMQueueInit(&(gxact->proc.myProcLocks[i]));
294         /* subxid data must be filled later by GXactLoadSubxactData */
295         gxact->proc.subxids.overflowed = false;
296         gxact->proc.subxids.nxids = 0;
297
298         gxact->prepared_at = prepared_at;
299         /* initialize LSN to 0 (start of WAL) */
300         gxact->prepare_lsn.xlogid = 0;
301         gxact->prepare_lsn.xrecoff = 0;
302         gxact->owner = owner;
303         gxact->locking_xid = xid;
304         gxact->valid = false;
305         strcpy(gxact->gid, gid);
306
307         /* And insert it into the active array */
308         Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
309         TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
310
311         LWLockRelease(TwoPhaseStateLock);
312
313         return gxact;
314 }
315
316 /*
317  * GXactLoadSubxactData
318  *
319  * If the transaction being persisted had any subtransactions, this must
320  * be called before MarkAsPrepared() to load information into the dummy
321  * PGPROC.
322  */
323 static void
324 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
325                                          TransactionId *children)
326 {
327         /* We need no extra lock since the GXACT isn't valid yet */
328         if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
329         {
330                 gxact->proc.subxids.overflowed = true;
331                 nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
332         }
333         if (nsubxacts > 0)
334         {
335                 memcpy(gxact->proc.subxids.xids, children,
336                            nsubxacts * sizeof(TransactionId));
337                 gxact->proc.subxids.nxids = nsubxacts;
338         }
339 }
340
341 /*
342  * MarkAsPrepared
343  *              Mark the GXACT as fully valid, and enter it into the global ProcArray.
344  */
345 static void
346 MarkAsPrepared(GlobalTransaction gxact)
347 {
348         /* Lock here may be overkill, but I'm not convinced of that ... */
349         LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
350         Assert(!gxact->valid);
351         gxact->valid = true;
352         LWLockRelease(TwoPhaseStateLock);
353
354         /*
355          * Put it into the global ProcArray so TransactionIdIsInProgress considers
356          * the XID as still running.
357          */
358         ProcArrayAdd(&gxact->proc);
359 }
360
361 /*
362  * LockGXact
363  *              Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
364  */
365 static GlobalTransaction
366 LockGXact(const char *gid, Oid user)
367 {
368         int                     i;
369
370         LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
371
372         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
373         {
374                 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
375
376                 /* Ignore not-yet-valid GIDs */
377                 if (!gxact->valid)
378                         continue;
379                 if (strcmp(gxact->gid, gid) != 0)
380                         continue;
381
382                 /* Found it, but has someone else got it locked? */
383                 if (TransactionIdIsValid(gxact->locking_xid))
384                 {
385                         if (TransactionIdIsActive(gxact->locking_xid))
386                                 ereport(ERROR,
387                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
388                                 errmsg("prepared transaction with identifier \"%s\" is busy",
389                                            gid)));
390                         gxact->locking_xid = InvalidTransactionId;
391                 }
392
393                 if (user != gxact->owner && !superuser_arg(user))
394                         ereport(ERROR,
395                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
396                                   errmsg("permission denied to finish prepared transaction"),
397                                          errhint("Must be superuser or the user that prepared the transaction.")));
398
399                 /*
400                  * Note: it probably would be possible to allow committing from
401                  * another database; but at the moment NOTIFY is known not to work and
402                  * there may be some other issues as well.      Hence disallow until
403                  * someone gets motivated to make it work.
404                  */
405                 if (MyDatabaseId != gxact->proc.databaseId)
406                         ereport(ERROR,
407                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
408                                   errmsg("prepared transaction belongs to another database"),
409                                          errhint("Connect to the database where the transaction was prepared to finish it.")));
410
411                 /* OK for me to lock it */
412                 gxact->locking_xid = GetTopTransactionId();
413
414                 LWLockRelease(TwoPhaseStateLock);
415
416                 return gxact;
417         }
418
419         LWLockRelease(TwoPhaseStateLock);
420
421         ereport(ERROR,
422                         (errcode(ERRCODE_UNDEFINED_OBJECT),
423                  errmsg("prepared transaction with identifier \"%s\" does not exist",
424                                 gid)));
425
426         /* NOTREACHED */
427         return NULL;
428 }
429
430 /*
431  * RemoveGXact
432  *              Remove the prepared transaction from the shared memory array.
433  *
434  * NB: caller should have already removed it from ProcArray
435  */
436 static void
437 RemoveGXact(GlobalTransaction gxact)
438 {
439         int                     i;
440
441         LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
442
443         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
444         {
445                 if (gxact == TwoPhaseState->prepXacts[i])
446                 {
447                         /* remove from the active array */
448                         TwoPhaseState->numPrepXacts--;
449                         TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
450
451                         /* and put it back in the freelist */
452                         gxact->proc.links.next = TwoPhaseState->freeGXacts;
453                         TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);
454
455                         LWLockRelease(TwoPhaseStateLock);
456
457                         return;
458                 }
459         }
460
461         LWLockRelease(TwoPhaseStateLock);
462
463         elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
464 }
465
466 /*
467  * TransactionIdIsPrepared
468  *              True iff transaction associated with the identifier is prepared
469  *              for two-phase commit
470  *
471  * Note: only gxacts marked "valid" are considered; but notice we do not
472  * check the locking status.
473  *
474  * This is not currently exported, because it is only needed internally.
475  */
476 static bool
477 TransactionIdIsPrepared(TransactionId xid)
478 {
479         bool            result = false;
480         int                     i;
481
482         LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
483
484         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
485         {
486                 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
487
488                 if (gxact->valid && gxact->proc.xid == xid)
489                 {
490                         result = true;
491                         break;
492                 }
493         }
494
495         LWLockRelease(TwoPhaseStateLock);
496
497         return result;
498 }
499
500 /*
501  * Returns an array of all prepared transactions for the user-level
502  * function pg_prepared_xact.
503  *
504  * The returned array and all its elements are copies of internal data
505  * structures, to minimize the time we need to hold the TwoPhaseStateLock.
506  *
507  * WARNING -- we return even those transactions that are not fully prepared
508  * yet.  The caller should filter them out if he doesn't want them.
509  *
510  * The returned array is palloc'd.
511  */
512 static int
513 GetPreparedTransactionList(GlobalTransaction *gxacts)
514 {
515         GlobalTransaction array;
516         int                     num;
517         int                     i;
518
519         LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
520
521         if (TwoPhaseState->numPrepXacts == 0)
522         {
523                 LWLockRelease(TwoPhaseStateLock);
524
525                 *gxacts = NULL;
526                 return 0;
527         }
528
529         num = TwoPhaseState->numPrepXacts;
530         array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
531         *gxacts = array;
532         for (i = 0; i < num; i++)
533                 memcpy(array + i, TwoPhaseState->prepXacts[i],
534                            sizeof(GlobalTransactionData));
535
536         LWLockRelease(TwoPhaseStateLock);
537
538         return num;
539 }
540
541
542 /* Working status for pg_prepared_xact */
543 typedef struct
544 {
545         GlobalTransaction array;
546         int                     ngxacts;
547         int                     currIdx;
548 } Working_State;
549
550 /*
551  * pg_prepared_xact
552  *              Produce a view with one row per prepared transaction.
553  *
554  * This function is here so we don't have to export the
555  * GlobalTransactionData struct definition.
556  */
557 Datum
558 pg_prepared_xact(PG_FUNCTION_ARGS)
559 {
560         FuncCallContext *funcctx;
561         Working_State *status;
562
563         if (SRF_IS_FIRSTCALL())
564         {
565                 TupleDesc       tupdesc;
566                 MemoryContext oldcontext;
567
568                 /* create a function context for cross-call persistence */
569                 funcctx = SRF_FIRSTCALL_INIT();
570
571                 /*
572                  * Switch to memory context appropriate for multiple function calls
573                  */
574                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
575
576                 /* build tupdesc for result tuples */
577                 /* this had better match pg_prepared_xacts view in system_views.sql */
578                 tupdesc = CreateTemplateTupleDesc(5, false);
579                 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
580                                                    XIDOID, -1, 0);
581                 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
582                                                    TEXTOID, -1, 0);
583                 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
584                                                    TIMESTAMPTZOID, -1, 0);
585                 TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
586                                                    OIDOID, -1, 0);
587                 TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
588                                                    OIDOID, -1, 0);
589
590                 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
591
592                 /*
593                  * Collect all the 2PC status information that we will format and send
594                  * out as a result set.
595                  */
596                 status = (Working_State *) palloc(sizeof(Working_State));
597                 funcctx->user_fctx = (void *) status;
598
599                 status->ngxacts = GetPreparedTransactionList(&status->array);
600                 status->currIdx = 0;
601
602                 MemoryContextSwitchTo(oldcontext);
603         }
604
605         funcctx = SRF_PERCALL_SETUP();
606         status = (Working_State *) funcctx->user_fctx;
607
608         while (status->array != NULL && status->currIdx < status->ngxacts)
609         {
610                 GlobalTransaction gxact = &status->array[status->currIdx++];
611                 Datum           values[5];
612                 bool            nulls[5];
613                 HeapTuple       tuple;
614                 Datum           result;
615
616                 if (!gxact->valid)
617                         continue;
618
619                 /*
620                  * Form tuple with appropriate data.
621                  */
622                 MemSet(values, 0, sizeof(values));
623                 MemSet(nulls, 0, sizeof(nulls));
624
625                 values[0] = TransactionIdGetDatum(gxact->proc.xid);
626                 values[1] = DirectFunctionCall1(textin, CStringGetDatum(gxact->gid));
627                 values[2] = TimestampTzGetDatum(gxact->prepared_at);
628                 values[3] = ObjectIdGetDatum(gxact->owner);
629                 values[4] = ObjectIdGetDatum(gxact->proc.databaseId);
630
631                 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
632                 result = HeapTupleGetDatum(tuple);
633                 SRF_RETURN_NEXT(funcctx, result);
634         }
635
636         SRF_RETURN_DONE(funcctx);
637 }
638
639 /*
640  * TwoPhaseGetDummyProc
641  *              Get the PGPROC that represents a prepared transaction specified by XID
642  */
643 PGPROC *
644 TwoPhaseGetDummyProc(TransactionId xid)
645 {
646         PGPROC     *result = NULL;
647         int                     i;
648
649         static TransactionId cached_xid = InvalidTransactionId;
650         static PGPROC *cached_proc = NULL;
651
652         /*
653          * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
654          * repeatedly for the same XID.  We can save work with a simple cache.
655          */
656         if (xid == cached_xid)
657                 return cached_proc;
658
659         LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
660
661         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
662         {
663                 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
664
665                 if (gxact->proc.xid == xid)
666                 {
667                         result = &gxact->proc;
668                         break;
669                 }
670         }
671
672         LWLockRelease(TwoPhaseStateLock);
673
674         if (result == NULL)                     /* should not happen */
675                 elog(ERROR, "failed to find dummy PGPROC for xid %u", xid);
676
677         cached_xid = xid;
678         cached_proc = result;
679
680         return result;
681 }
682
683 /************************************************************************/
684 /* State file support                                                                                                   */
685 /************************************************************************/
686
/*
 * Format into "path" (a buffer of MAXPGPATH bytes) the state file name for
 * "xid": the XID as eight upper-case hex digits under TWOPHASE_DIR.
 */
#define TwoPhaseFilePath(path, xid) \
	snprintf((path), MAXPGPATH, TWOPHASE_DIR "/%08X", (xid))
689
690 /*
691  * 2PC state file format:
692  *
693  *      1. TwoPhaseFileHeader
694  *      2. TransactionId[] (subtransactions)
695  *      3. RelFileNode[] (files to be deleted at commit)
696  *      4. RelFileNode[] (files to be deleted at abort)
697  *      5. TwoPhaseRecordOnDisk
698  *      6. ...
699  *      7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
700  *      8. CRC32
701  *
702  * Each segment except the final CRC32 is MAXALIGN'd.
703  */
704
705 /*
706  * Header for a 2PC state file
707  */
708 #define TWOPHASE_MAGIC  0x57F94531              /* format identifier */
709
710 typedef struct TwoPhaseFileHeader
711 {
712         uint32          magic;                  /* format identifier */
713         uint32          total_len;              /* actual file length */
714         TransactionId xid;                      /* original transaction XID */
715         Oid                     database;               /* OID of database it was in */
716         TimestampTz prepared_at;        /* time of preparation */
717         Oid                     owner;                  /* user running the transaction */
718         int32           nsubxacts;              /* number of following subxact XIDs */
719         int32           ncommitrels;    /* number of delete-on-commit rels */
720         int32           nabortrels;             /* number of delete-on-abort rels */
721         char            gid[GIDSIZE];   /* GID for transaction */
722 } TwoPhaseFileHeader;
723
724 /*
725  * Header for each record in a state file
726  *
727  * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
728  * The rmgr data will be stored starting on a MAXALIGN boundary.
729  */
730 typedef struct TwoPhaseRecordOnDisk
731 {
732         uint32          len;                    /* length of rmgr data */
733         TwoPhaseRmgrId rmid;            /* resource manager for this record */
734         uint16          info;                   /* flag bits for use by rmgr */
735 } TwoPhaseRecordOnDisk;
736
737 /*
738  * During prepare, the state file is assembled in memory before writing it
739  * to WAL and the actual state file.  We use a chain of XLogRecData blocks
740  * so that we will be able to pass the state file contents directly to
741  * XLogInsert.
742  */
743 static struct xllist
744 {
745         XLogRecData *head;                      /* first data block in the chain */
746         XLogRecData *tail;                      /* last block in chain */
747         uint32          bytes_free;             /* free bytes left in tail block */
748         uint32          total_len;              /* total data bytes in chain */
749 }       records;
750
751
752 /*
753  * Append a block of data to records data structure.
754  *
755  * NB: each block is padded to a MAXALIGN multiple.  This must be
756  * accounted for when the file is later read!
757  *
758  * The data is copied, so the caller is free to modify it afterwards.
759  */
760 static void
761 save_state_data(const void *data, uint32 len)
762 {
763         uint32          padlen = MAXALIGN(len);
764
765         if (padlen > records.bytes_free)
766         {
767                 records.tail->next = palloc0(sizeof(XLogRecData));
768                 records.tail = records.tail->next;
769                 records.tail->buffer = InvalidBuffer;
770                 records.tail->len = 0;
771                 records.tail->next = NULL;
772
773                 records.bytes_free = Max(padlen, 512);
774                 records.tail->data = palloc(records.bytes_free);
775         }
776
777         memcpy(((char *) records.tail->data) + records.tail->len, data, len);
778         records.tail->len += padlen;
779         records.bytes_free -= padlen;
780         records.total_len += padlen;
781 }
782
783 /*
784  * Start preparing a state file.
785  *
786  * Initializes data structure and inserts the 2PC file header record.
787  */
788 void
789 StartPrepare(GlobalTransaction gxact)
790 {
791         TransactionId xid = gxact->proc.xid;
792         TwoPhaseFileHeader hdr;
793         TransactionId *children;
794         RelFileNode *commitrels;
795         RelFileNode *abortrels;
796
797         /* Initialize linked list */
798         records.head = palloc0(sizeof(XLogRecData));
799         records.head->buffer = InvalidBuffer;
800         records.head->len = 0;
801         records.head->next = NULL;
802
803         records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
804         records.head->data = palloc(records.bytes_free);
805
806         records.tail = records.head;
807
808         records.total_len = 0;
809
810         /* Create header */
811         hdr.magic = TWOPHASE_MAGIC;
812         hdr.total_len = 0;                      /* EndPrepare will fill this in */
813         hdr.xid = xid;
814         hdr.database = gxact->proc.databaseId;
815         hdr.prepared_at = gxact->prepared_at;
816         hdr.owner = gxact->owner;
817         hdr.nsubxacts = xactGetCommittedChildren(&children);
818         hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels, NULL);
819         hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels, NULL);
820         StrNCpy(hdr.gid, gxact->gid, GIDSIZE);
821
822         save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
823
824         /* Add the additional info about subxacts and deletable files */
825         if (hdr.nsubxacts > 0)
826         {
827                 save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
828                 /* While we have the child-xact data, stuff it in the gxact too */
829                 GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
830         }
831         if (hdr.ncommitrels > 0)
832         {
833                 save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
834                 pfree(commitrels);
835         }
836         if (hdr.nabortrels > 0)
837         {
838                 save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
839                 pfree(abortrels);
840         }
841 }
842
843 /*
844  * Finish preparing state file.
845  *
846  * Calculates CRC and writes state file to WAL and in pg_twophase directory.
847  */
848 void
849 EndPrepare(GlobalTransaction gxact)
850 {
851         TransactionId xid = gxact->proc.xid;
852         TwoPhaseFileHeader *hdr;
853         char            path[MAXPGPATH];
854         XLogRecData *record;
855         pg_crc32        statefile_crc;
856         pg_crc32        bogus_crc;
857         int                     fd;
858
859         /* Add the end sentinel to the list of 2PC records */
860         RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
861                                                    NULL, 0);
862
863         /* Go back and fill in total_len in the file header record */
864         hdr = (TwoPhaseFileHeader *) records.head->data;
865         Assert(hdr->magic == TWOPHASE_MAGIC);
866         hdr->total_len = records.total_len + sizeof(pg_crc32);
867
868         /*
869          * Create the 2PC state file.
870          *
871          * Note: because we use BasicOpenFile(), we are responsible for ensuring
872          * the FD gets closed in any error exit path.  Once we get into the
873          * critical section, though, it doesn't matter since any failure causes
874          * PANIC anyway.
875          */
876         TwoPhaseFilePath(path, xid);
877
878         fd = BasicOpenFile(path,
879                                            O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
880                                            S_IRUSR | S_IWUSR);
881         if (fd < 0)
882                 ereport(ERROR,
883                                 (errcode_for_file_access(),
884                                  errmsg("could not create two-phase state file \"%s\": %m",
885                                                 path)));
886
887         /* Write data to file, and calculate CRC as we pass over it */
888         INIT_CRC32(statefile_crc);
889
890         for (record = records.head; record != NULL; record = record->next)
891         {
892                 COMP_CRC32(statefile_crc, record->data, record->len);
893                 if ((write(fd, record->data, record->len)) != record->len)
894                 {
895                         close(fd);
896                         ereport(ERROR,
897                                         (errcode_for_file_access(),
898                                          errmsg("could not write two-phase state file: %m")));
899                 }
900         }
901
902         FIN_CRC32(statefile_crc);
903
904         /*
905          * Write a deliberately bogus CRC to the state file; this is just paranoia
906          * to catch the case where four more bytes will run us out of disk space.
907          */
908         bogus_crc = ~statefile_crc;
909
910         if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
911         {
912                 close(fd);
913                 ereport(ERROR,
914                                 (errcode_for_file_access(),
915                                  errmsg("could not write two-phase state file: %m")));
916         }
917
918         /* Back up to prepare for rewriting the CRC */
919         if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
920         {
921                 close(fd);
922                 ereport(ERROR,
923                                 (errcode_for_file_access(),
924                                  errmsg("could not seek in two-phase state file: %m")));
925         }
926
927         /*
928          * The state file isn't valid yet, because we haven't written the correct
929          * CRC yet.  Before we do that, insert entry in WAL and flush it to disk.
930          *
931          * Between the time we have written the WAL entry and the time we write
932          * out the correct state file CRC, we have an inconsistency: the xact is
933          * prepared according to WAL but not according to our on-disk state. We
934          * use a critical section to force a PANIC if we are unable to complete
935          * the write --- then, WAL replay should repair the inconsistency.      The
936          * odds of a PANIC actually occurring should be very tiny given that we
937          * were able to write the bogus CRC above.
938          *
939          * We have to set inCommit here, too; otherwise a checkpoint starting
940          * immediately after the WAL record is inserted could complete without
941          * fsync'ing our state file.  (This is essentially the same kind of race
942          * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
943          * uses inCommit for; see notes there.)
944          *
945          * We save the PREPARE record's location in the gxact for later use by
946          * CheckPointTwoPhase.
947          */
948         START_CRIT_SECTION();
949
950         MyProc->inCommit = true;
951
952         gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
953                                                                         records.head);
954         XLogFlush(gxact->prepare_lsn);
955
956         /* If we crash now, we have prepared: WAL replay will fix things */
957
958         /* write correct CRC and close file */
959         if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
960         {
961                 close(fd);
962                 ereport(ERROR,
963                                 (errcode_for_file_access(),
964                                  errmsg("could not write two-phase state file: %m")));
965         }
966
967         if (close(fd) != 0)
968                 ereport(ERROR,
969                                 (errcode_for_file_access(),
970                                  errmsg("could not close two-phase state file: %m")));
971
972         /*
973          * Mark the prepared transaction as valid.      As soon as xact.c marks MyProc
974          * as not running our XID (which it will do immediately after this
975          * function returns), others can commit/rollback the xact.
976          *
977          * NB: a side effect of this is to make a dummy ProcArray entry for the
978          * prepared XID.  This must happen before we clear the XID from MyProc,
979          * else there is a window where the XID is not running according to
980          * TransactionIdIsInProgress, and onlookers would be entitled to assume
981          * the xact crashed.  Instead we have a window where the same XID appears
982          * twice in ProcArray, which is OK.
983          */
984         MarkAsPrepared(gxact);
985
986         /*
987          * Now we can mark ourselves as out of the commit critical section: a
988          * checkpoint starting after this will certainly see the gxact as a
989          * candidate for fsyncing.
990          */
991         MyProc->inCommit = false;
992
993         END_CRIT_SECTION();
994
995         records.tail = records.head = NULL;
996 }
997
998 /*
999  * Register a 2PC record to be written to state file.
1000  */
1001 void
1002 RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1003                                            const void *data, uint32 len)
1004 {
1005         TwoPhaseRecordOnDisk record;
1006
1007         record.rmid = rmid;
1008         record.info = info;
1009         record.len = len;
1010         save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1011         if (len > 0)
1012                 save_state_data(data, len);
1013 }
1014
1015
1016 /*
1017  * Read and validate the state file for xid.
1018  *
1019  * If it looks OK (has a valid magic number and CRC), return the palloc'd
1020  * contents of the file.  Otherwise return NULL.
1021  */
1022 static char *
1023 ReadTwoPhaseFile(TransactionId xid)
1024 {
1025         char            path[MAXPGPATH];
1026         char       *buf;
1027         TwoPhaseFileHeader *hdr;
1028         int                     fd;
1029         struct stat stat;
1030         uint32          crc_offset;
1031         pg_crc32        calc_crc,
1032                                 file_crc;
1033
1034         TwoPhaseFilePath(path, xid);
1035
1036         fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
1037         if (fd < 0)
1038         {
1039                 ereport(WARNING,
1040                                 (errcode_for_file_access(),
1041                                  errmsg("could not open two-phase state file \"%s\": %m",
1042                                                 path)));
1043                 return NULL;
1044         }
1045
1046         /*
1047          * Check file length.  We can determine a lower bound pretty easily. We
1048          * set an upper bound mainly to avoid palloc() failure on a corrupt file.
1049          */
1050         if (fstat(fd, &stat))
1051         {
1052                 close(fd);
1053                 ereport(WARNING,
1054                                 (errcode_for_file_access(),
1055                                  errmsg("could not stat two-phase state file \"%s\": %m",
1056                                                 path)));
1057                 return NULL;
1058         }
1059
1060         if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1061                                                 MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1062                                                 sizeof(pg_crc32)) ||
1063                 stat.st_size > 10000000)
1064         {
1065                 close(fd);
1066                 return NULL;
1067         }
1068
1069         crc_offset = stat.st_size - sizeof(pg_crc32);
1070         if (crc_offset != MAXALIGN(crc_offset))
1071         {
1072                 close(fd);
1073                 return NULL;
1074         }
1075
1076         /*
1077          * OK, slurp in the file.
1078          */
1079         buf = (char *) palloc(stat.st_size);
1080
1081         if (read(fd, buf, stat.st_size) != stat.st_size)
1082         {
1083                 close(fd);
1084                 ereport(WARNING,
1085                                 (errcode_for_file_access(),
1086                                  errmsg("could not read two-phase state file \"%s\": %m",
1087                                                 path)));
1088                 pfree(buf);
1089                 return NULL;
1090         }
1091
1092         close(fd);
1093
1094         hdr = (TwoPhaseFileHeader *) buf;
1095         if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
1096         {
1097                 pfree(buf);
1098                 return NULL;
1099         }
1100
1101         INIT_CRC32(calc_crc);
1102         COMP_CRC32(calc_crc, buf, crc_offset);
1103         FIN_CRC32(calc_crc);
1104
1105         file_crc = *((pg_crc32 *) (buf + crc_offset));
1106
1107         if (!EQ_CRC32(calc_crc, file_crc))
1108         {
1109                 pfree(buf);
1110                 return NULL;
1111         }
1112
1113         return buf;
1114 }
1115
1116
1117 /*
1118  * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1119  */
1120 void
1121 FinishPreparedTransaction(const char *gid, bool isCommit)
1122 {
1123         GlobalTransaction gxact;
1124         TransactionId xid;
1125         char       *buf;
1126         char       *bufptr;
1127         TwoPhaseFileHeader *hdr;
1128         TransactionId latestXid;
1129         TransactionId *children;
1130         RelFileNode *commitrels;
1131         RelFileNode *abortrels;
1132         int                     i;
1133
1134         /*
1135          * Validate the GID, and lock the GXACT to ensure that two backends do not
1136          * try to commit the same GID at once.
1137          */
1138         gxact = LockGXact(gid, GetUserId());
1139         xid = gxact->proc.xid;
1140
1141         /*
1142          * Read and validate the state file
1143          */
1144         buf = ReadTwoPhaseFile(xid);
1145         if (buf == NULL)
1146                 ereport(ERROR,
1147                                 (errcode(ERRCODE_DATA_CORRUPTED),
1148                                  errmsg("two-phase state file for transaction %u is corrupt",
1149                                                 xid)));
1150
1151         /*
1152          * Disassemble the header area
1153          */
1154         hdr = (TwoPhaseFileHeader *) buf;
1155         Assert(TransactionIdEquals(hdr->xid, xid));
1156         bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1157         children = (TransactionId *) bufptr;
1158         bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1159         commitrels = (RelFileNode *) bufptr;
1160         bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1161         abortrels = (RelFileNode *) bufptr;
1162         bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1163
1164         /* compute latestXid among all children */
1165         latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
1166
1167         /*
1168          * The order of operations here is critical: make the XLOG entry for
1169          * commit or abort, then mark the transaction committed or aborted in
1170          * pg_clog, then remove its PGPROC from the global ProcArray (which means
1171          * TransactionIdIsInProgress will stop saying the prepared xact is in
1172          * progress), then run the post-commit or post-abort callbacks. The
1173          * callbacks will release the locks the transaction held.
1174          */
1175         if (isCommit)
1176                 RecordTransactionCommitPrepared(xid,
1177                                                                                 hdr->nsubxacts, children,
1178                                                                                 hdr->ncommitrels, commitrels);
1179         else
1180                 RecordTransactionAbortPrepared(xid,
1181                                                                            hdr->nsubxacts, children,
1182                                                                            hdr->nabortrels, abortrels);
1183
1184         ProcArrayRemove(&gxact->proc, latestXid);
1185
1186         /*
1187          * In case we fail while running the callbacks, mark the gxact invalid so
1188          * no one else will try to commit/rollback, and so it can be recycled
1189          * properly later.      It is still locked by our XID so it won't go away yet.
1190          *
1191          * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1192          */
1193         gxact->valid = false;
1194
1195         /*
1196          * We have to remove any files that were supposed to be dropped. For
1197          * consistency with the regular xact.c code paths, must do this before
1198          * releasing locks, so do it before running the callbacks.
1199          *
1200          * NB: this code knows that we couldn't be dropping any temp rels ...
1201          */
1202         if (isCommit)
1203         {
1204                 for (i = 0; i < hdr->ncommitrels; i++)
1205                         smgrdounlink(smgropen(commitrels[i]), false, false);
1206         }
1207         else
1208         {
1209                 for (i = 0; i < hdr->nabortrels; i++)
1210                         smgrdounlink(smgropen(abortrels[i]), false, false);
1211         }
1212
1213         /* And now do the callbacks */
1214         if (isCommit)
1215                 ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
1216         else
1217                 ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
1218
1219         /* Count the prepared xact as committed or aborted */
1220         AtEOXact_PgStat(isCommit);
1221
1222         /*
1223          * And now we can clean up our mess.
1224          */
1225         RemoveTwoPhaseFile(xid, true);
1226
1227         RemoveGXact(gxact);
1228
1229         pfree(buf);
1230 }
1231
1232 /*
1233  * Scan a 2PC state file (already read into memory by ReadTwoPhaseFile)
1234  * and call the indicated callbacks for each 2PC record.
1235  */
1236 static void
1237 ProcessRecords(char *bufptr, TransactionId xid,
1238                            const TwoPhaseCallback callbacks[])
1239 {
1240         for (;;)
1241         {
1242                 TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1243
1244                 Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1245                 if (record->rmid == TWOPHASE_RM_END_ID)
1246                         break;
1247
1248                 bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1249
1250                 if (callbacks[record->rmid] != NULL)
1251                         callbacks[record->rmid] (xid, record->info,
1252                                                                          (void *) bufptr, record->len);
1253
1254                 bufptr += MAXALIGN(record->len);
1255         }
1256 }
1257
1258 /*
1259  * Remove the 2PC file for the specified XID.
1260  *
1261  * If giveWarning is false, do not complain about file-not-present;
1262  * this is an expected case during WAL replay.
1263  */
1264 void
1265 RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1266 {
1267         char            path[MAXPGPATH];
1268
1269         TwoPhaseFilePath(path, xid);
1270         if (unlink(path))
1271                 if (errno != ENOENT || giveWarning)
1272                         ereport(WARNING,
1273                                         (errcode_for_file_access(),
1274                                    errmsg("could not remove two-phase state file \"%s\": %m",
1275                                                   path)));
1276 }
1277
1278 /*
1279  * Recreates a state file. This is used in WAL replay.
1280  *
1281  * Note: content and len don't include CRC.
1282  */
1283 void
1284 RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1285 {
1286         char            path[MAXPGPATH];
1287         pg_crc32        statefile_crc;
1288         int                     fd;
1289
1290         /* Recompute CRC */
1291         INIT_CRC32(statefile_crc);
1292         COMP_CRC32(statefile_crc, content, len);
1293         FIN_CRC32(statefile_crc);
1294
1295         TwoPhaseFilePath(path, xid);
1296
1297         fd = BasicOpenFile(path,
1298                                            O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
1299                                            S_IRUSR | S_IWUSR);
1300         if (fd < 0)
1301                 ereport(ERROR,
1302                                 (errcode_for_file_access(),
1303                                  errmsg("could not recreate two-phase state file \"%s\": %m",
1304                                                 path)));
1305
1306         /* Write content and CRC */
1307         if (write(fd, content, len) != len)
1308         {
1309                 close(fd);
1310                 ereport(ERROR,
1311                                 (errcode_for_file_access(),
1312                                  errmsg("could not write two-phase state file: %m")));
1313         }
1314         if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32))
1315         {
1316                 close(fd);
1317                 ereport(ERROR,
1318                                 (errcode_for_file_access(),
1319                                  errmsg("could not write two-phase state file: %m")));
1320         }
1321
1322         /*
1323          * We must fsync the file because the end-of-replay checkpoint will not do
1324          * so, there being no GXACT in shared memory yet to tell it to.
1325          */
1326         if (pg_fsync(fd) != 0)
1327         {
1328                 close(fd);
1329                 ereport(ERROR,
1330                                 (errcode_for_file_access(),
1331                                  errmsg("could not fsync two-phase state file: %m")));
1332         }
1333
1334         if (close(fd) != 0)
1335                 ereport(ERROR,
1336                                 (errcode_for_file_access(),
1337                                  errmsg("could not close two-phase state file: %m")));
1338 }
1339
1340 /*
1341  * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1342  *
1343  * We must fsync the state file of any GXACT that is valid and has a PREPARE
1344  * LSN <= the checkpoint's redo horizon.  (If the gxact isn't valid yet or
1345  * has a later LSN, this checkpoint is not responsible for fsyncing it.)
1346  *
1347  * This is deliberately run as late as possible in the checkpoint sequence,
1348  * because GXACTs ordinarily have short lifespans, and so it is quite
1349  * possible that GXACTs that were valid at checkpoint start will no longer
1350  * exist if we wait a little bit.
1351  *
1352  * If a GXACT remains valid across multiple checkpoints, it'll be fsynced
1353  * each time.  This is considered unusual enough that we don't bother to
1354  * expend any extra code to avoid the redundant fsyncs.  (They should be
1355  * reasonably cheap anyway, since they won't cause I/O.)
1356  */
1357 void
1358 CheckPointTwoPhase(XLogRecPtr redo_horizon)
1359 {
1360         TransactionId *xids;
1361         int                     nxids;
1362         char            path[MAXPGPATH];
1363         int                     i;
1364
1365         /*
1366          * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
1367          * it just long enough to make a list of the XIDs that require fsyncing,
1368          * and then do the I/O afterwards.
1369          *
1370          * This approach creates a race condition: someone else could delete a
1371          * GXACT between the time we release TwoPhaseStateLock and the time we try
1372          * to open its state file.      We handle this by special-casing ENOENT
1373          * failures: if we see that, we verify that the GXACT is no longer valid,
1374          * and if so ignore the failure.
1375          */
1376         if (max_prepared_xacts <= 0)
1377                 return;                                 /* nothing to do */
1378         xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
1379         nxids = 0;
1380
1381         LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1382
1383         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1384         {
1385                 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1386
1387                 if (gxact->valid &&
1388                         XLByteLE(gxact->prepare_lsn, redo_horizon))
1389                         xids[nxids++] = gxact->proc.xid;
1390         }
1391
1392         LWLockRelease(TwoPhaseStateLock);
1393
1394         for (i = 0; i < nxids; i++)
1395         {
1396                 TransactionId xid = xids[i];
1397                 int                     fd;
1398
1399                 TwoPhaseFilePath(path, xid);
1400
1401                 fd = BasicOpenFile(path, O_RDWR | PG_BINARY, 0);
1402                 if (fd < 0)
1403                 {
1404                         if (errno == ENOENT)
1405                         {
1406                                 /* OK if gxact is no longer valid */
1407                                 if (!TransactionIdIsPrepared(xid))
1408                                         continue;
1409                                 /* Restore errno in case it was changed */
1410                                 errno = ENOENT;
1411                         }
1412                         ereport(ERROR,
1413                                         (errcode_for_file_access(),
1414                                          errmsg("could not open two-phase state file \"%s\": %m",
1415                                                         path)));
1416                 }
1417
1418                 if (pg_fsync(fd) != 0)
1419                 {
1420                         close(fd);
1421                         ereport(ERROR,
1422                                         (errcode_for_file_access(),
1423                                          errmsg("could not fsync two-phase state file \"%s\": %m",
1424                                                         path)));
1425                 }
1426
1427                 if (close(fd) != 0)
1428                         ereport(ERROR,
1429                                         (errcode_for_file_access(),
1430                                          errmsg("could not close two-phase state file \"%s\": %m",
1431                                                         path)));
1432         }
1433
1434         pfree(xids);
1435 }
1436
1437 /*
1438  * PrescanPreparedTransactions
1439  *
1440  * Scan the pg_twophase directory and determine the range of valid XIDs
1441  * present.  This is run during database startup, after we have completed
1442  * reading WAL.  ShmemVariableCache->nextXid has been set to one more than
1443  * the highest XID for which evidence exists in WAL.
1444  *
1445  * We throw away any prepared xacts with main XID beyond nextXid --- if any
1446  * are present, it suggests that the DBA has done a PITR recovery to an
1447  * earlier point in time without cleaning out pg_twophase.      We dare not
1448  * try to recover such prepared xacts since they likely depend on database
1449  * state that doesn't exist now.
1450  *
1451  * However, we will advance nextXid beyond any subxact XIDs belonging to
1452  * valid prepared xacts.  We need to do this since subxact commit doesn't
1453  * write a WAL entry, and so there might be no evidence in WAL of those
1454  * subxact XIDs.
1455  *
1456  * Our other responsibility is to determine and return the oldest valid XID
1457  * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
1458  * This is needed to synchronize pg_subtrans startup properly.
1459  */
1460 TransactionId
1461 PrescanPreparedTransactions(void)
1462 {
1463         TransactionId origNextXid = ShmemVariableCache->nextXid;
1464         TransactionId result = origNextXid;
1465         DIR                *cldir;
1466         struct dirent *clde;
1467
1468         cldir = AllocateDir(TWOPHASE_DIR);
1469         while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1470         {
1471                 if (strlen(clde->d_name) == 8 &&
1472                         strspn(clde->d_name, "0123456789ABCDEF") == 8)
1473                 {
1474                         TransactionId xid;
1475                         char       *buf;
1476                         TwoPhaseFileHeader *hdr;
1477                         TransactionId *subxids;
1478                         int                     i;
1479
1480                         xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1481
1482                         /* Reject XID if too new */
1483                         if (TransactionIdFollowsOrEquals(xid, origNextXid))
1484                         {
1485                                 ereport(WARNING,
1486                                                 (errmsg("removing future two-phase state file \"%s\"",
1487                                                                 clde->d_name)));
1488                                 RemoveTwoPhaseFile(xid, true);
1489                                 continue;
1490                         }
1491
1492                         /*
1493                          * Note: we can't check if already processed because clog
1494                          * subsystem isn't up yet.
1495                          */
1496
1497                         /* Read and validate file */
1498                         buf = ReadTwoPhaseFile(xid);
1499                         if (buf == NULL)
1500                         {
1501                                 ereport(WARNING,
1502                                           (errmsg("removing corrupt two-phase state file \"%s\"",
1503                                                           clde->d_name)));
1504                                 RemoveTwoPhaseFile(xid, true);
1505                                 continue;
1506                         }
1507
1508                         /* Deconstruct header */
1509                         hdr = (TwoPhaseFileHeader *) buf;
1510                         if (!TransactionIdEquals(hdr->xid, xid))
1511                         {
1512                                 ereport(WARNING,
1513                                           (errmsg("removing corrupt two-phase state file \"%s\"",
1514                                                           clde->d_name)));
1515                                 RemoveTwoPhaseFile(xid, true);
1516                                 pfree(buf);
1517                                 continue;
1518                         }
1519
1520                         /*
1521                          * OK, we think this file is valid.  Incorporate xid into the
1522                          * running-minimum result.
1523                          */
1524                         if (TransactionIdPrecedes(xid, result))
1525                                 result = xid;
1526
1527                         /*
1528                          * Examine subtransaction XIDs ... they should all follow main
1529                          * XID, and they may force us to advance nextXid.
1530                          */
1531                         subxids = (TransactionId *)
1532                                 (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
1533                         for (i = 0; i < hdr->nsubxacts; i++)
1534                         {
1535                                 TransactionId subxid = subxids[i];
1536
1537                                 Assert(TransactionIdFollows(subxid, xid));
1538                                 if (TransactionIdFollowsOrEquals(subxid,
1539                                                                                                  ShmemVariableCache->nextXid))
1540                                 {
1541                                         ShmemVariableCache->nextXid = subxid;
1542                                         TransactionIdAdvance(ShmemVariableCache->nextXid);
1543                                 }
1544                         }
1545
1546                         pfree(buf);
1547                 }
1548         }
1549         FreeDir(cldir);
1550
1551         return result;
1552 }
1553
1554 /*
1555  * RecoverPreparedTransactions
1556  *
1557  * Scan the pg_twophase directory and reload shared-memory state for each
1558  * prepared transaction (reacquire locks, etc).  This is run during database
1559  * startup.
1560  */
1561 void
1562 RecoverPreparedTransactions(void)
1563 {
1564         char            dir[MAXPGPATH];
1565         DIR                *cldir;
1566         struct dirent *clde;
1567
1568         snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
1569
1570         cldir = AllocateDir(dir);
1571         while ((clde = ReadDir(cldir, dir)) != NULL)
1572         {
1573                 if (strlen(clde->d_name) == 8 &&
1574                         strspn(clde->d_name, "0123456789ABCDEF") == 8)
1575                 {
1576                         TransactionId xid;
1577                         char       *buf;
1578                         char       *bufptr;
1579                         TwoPhaseFileHeader *hdr;
1580                         TransactionId *subxids;
1581                         GlobalTransaction gxact;
1582                         int                     i;
1583
1584                         xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1585
1586                         /* Already processed? */
1587                         if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
1588                         {
1589                                 ereport(WARNING,
1590                                                 (errmsg("removing stale two-phase state file \"%s\"",
1591                                                                 clde->d_name)));
1592                                 RemoveTwoPhaseFile(xid, true);
1593                                 continue;
1594                         }
1595
1596                         /* Read and validate file */
1597                         buf = ReadTwoPhaseFile(xid);
1598                         if (buf == NULL)
1599                         {
1600                                 ereport(WARNING,
1601                                           (errmsg("removing corrupt two-phase state file \"%s\"",
1602                                                           clde->d_name)));
1603                                 RemoveTwoPhaseFile(xid, true);
1604                                 continue;
1605                         }
1606
1607                         ereport(LOG,
1608                                         (errmsg("recovering prepared transaction %u", xid)));
1609
1610                         /* Deconstruct header */
1611                         hdr = (TwoPhaseFileHeader *) buf;
1612                         Assert(TransactionIdEquals(hdr->xid, xid));
1613                         bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1614                         subxids = (TransactionId *) bufptr;
1615                         bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1616                         bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1617                         bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1618
1619                         /*
1620                          * Reconstruct subtrans state for the transaction --- needed
1621                          * because pg_subtrans is not preserved over a restart.  Note that
1622                          * we are linking all the subtransactions directly to the
1623                          * top-level XID; there may originally have been a more complex
1624                          * hierarchy, but there's no need to restore that exactly.
1625                          */
1626                         for (i = 0; i < hdr->nsubxacts; i++)
1627                                 SubTransSetParent(subxids[i], xid);
1628
1629                         /*
1630                          * Recreate its GXACT and dummy PGPROC
1631                          *
1632                          * Note: since we don't have the PREPARE record's WAL location at
1633                          * hand, we leave prepare_lsn zeroes.  This means the GXACT will
1634                          * be fsync'd on every future checkpoint.  We assume this
1635                          * situation is infrequent enough that the performance cost is
1636                          * negligible (especially since we know the state file has already
1637                          * been fsynced).
1638                          */
1639                         gxact = MarkAsPreparing(xid, hdr->gid,
1640                                                                         hdr->prepared_at,
1641                                                                         hdr->owner, hdr->database);
1642                         GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
1643                         MarkAsPrepared(gxact);
1644
1645                         /*
1646                          * Recover other state (notably locks) using resource managers
1647                          */
1648                         ProcessRecords(bufptr, xid, twophase_recover_callbacks);
1649
1650                         pfree(buf);
1651                 }
1652         }
1653         FreeDir(cldir);
1654 }
1655
1656 /*
1657  *      RecordTransactionCommitPrepared
1658  *
1659  * This is basically the same as RecordTransactionCommit: in particular,
1660  * we must set the inCommit flag to avoid a race condition.
1661  *
1662  * We know the transaction made at least one XLOG entry (its PREPARE),
1663  * so it is never possible to optimize out the commit record.
1664  */
1665 static void
1666 RecordTransactionCommitPrepared(TransactionId xid,
1667                                                                 int nchildren,
1668                                                                 TransactionId *children,
1669                                                                 int nrels,
1670                                                                 RelFileNode *rels)
1671 {
1672         XLogRecData rdata[3];
1673         int                     lastrdata = 0;
1674         xl_xact_commit_prepared xlrec;
1675         XLogRecPtr      recptr;
1676
1677         START_CRIT_SECTION();
1678
1679         /* See notes in RecordTransactionCommit */
1680         MyProc->inCommit = true;
1681
1682         /* Emit the XLOG commit record */
1683         xlrec.xid = xid;
1684         xlrec.crec.xact_time = GetCurrentTimestamp();
1685         xlrec.crec.nrels = nrels;
1686         xlrec.crec.nsubxacts = nchildren;
1687         rdata[0].data = (char *) (&xlrec);
1688         rdata[0].len = MinSizeOfXactCommitPrepared;
1689         rdata[0].buffer = InvalidBuffer;
1690         /* dump rels to delete */
1691         if (nrels > 0)
1692         {
1693                 rdata[0].next = &(rdata[1]);
1694                 rdata[1].data = (char *) rels;
1695                 rdata[1].len = nrels * sizeof(RelFileNode);
1696                 rdata[1].buffer = InvalidBuffer;
1697                 lastrdata = 1;
1698         }
1699         /* dump committed child Xids */
1700         if (nchildren > 0)
1701         {
1702                 rdata[lastrdata].next = &(rdata[2]);
1703                 rdata[2].data = (char *) children;
1704                 rdata[2].len = nchildren * sizeof(TransactionId);
1705                 rdata[2].buffer = InvalidBuffer;
1706                 lastrdata = 2;
1707         }
1708         rdata[lastrdata].next = NULL;
1709
1710         recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, rdata);
1711
1712         /*
1713          * We don't currently try to sleep before flush here ... nor is there any
1714          * support for async commit of a prepared xact (the very idea is probably
1715          * a contradiction)
1716          */
1717
1718         /* Flush XLOG to disk */
1719         XLogFlush(recptr);
1720
1721         /* Mark the transaction committed in pg_clog */
1722         TransactionIdCommit(xid);
1723         /* to avoid race conditions, the parent must commit first */
1724         TransactionIdCommitTree(nchildren, children);
1725
1726         /* Checkpoint can proceed now */
1727         MyProc->inCommit = false;
1728
1729         END_CRIT_SECTION();
1730 }
1731
1732 /*
1733  *      RecordTransactionAbortPrepared
1734  *
1735  * This is basically the same as RecordTransactionAbort.
1736  *
1737  * We know the transaction made at least one XLOG entry (its PREPARE),
1738  * so it is never possible to optimize out the abort record.
1739  */
1740 static void
1741 RecordTransactionAbortPrepared(TransactionId xid,
1742                                                            int nchildren,
1743                                                            TransactionId *children,
1744                                                            int nrels,
1745                                                            RelFileNode *rels)
1746 {
1747         XLogRecData rdata[3];
1748         int                     lastrdata = 0;
1749         xl_xact_abort_prepared xlrec;
1750         XLogRecPtr      recptr;
1751
1752         /*
1753          * Catch the scenario where we aborted partway through
1754          * RecordTransactionCommitPrepared ...
1755          */
1756         if (TransactionIdDidCommit(xid))
1757                 elog(PANIC, "cannot abort transaction %u, it was already committed",
1758                          xid);
1759
1760         START_CRIT_SECTION();
1761
1762         /* Emit the XLOG abort record */
1763         xlrec.xid = xid;
1764         xlrec.arec.xact_time = GetCurrentTimestamp();
1765         xlrec.arec.nrels = nrels;
1766         xlrec.arec.nsubxacts = nchildren;
1767         rdata[0].data = (char *) (&xlrec);
1768         rdata[0].len = MinSizeOfXactAbortPrepared;
1769         rdata[0].buffer = InvalidBuffer;
1770         /* dump rels to delete */
1771         if (nrels > 0)
1772         {
1773                 rdata[0].next = &(rdata[1]);
1774                 rdata[1].data = (char *) rels;
1775                 rdata[1].len = nrels * sizeof(RelFileNode);
1776                 rdata[1].buffer = InvalidBuffer;
1777                 lastrdata = 1;
1778         }
1779         /* dump committed child Xids */
1780         if (nchildren > 0)
1781         {
1782                 rdata[lastrdata].next = &(rdata[2]);
1783                 rdata[2].data = (char *) children;
1784                 rdata[2].len = nchildren * sizeof(TransactionId);
1785                 rdata[2].buffer = InvalidBuffer;
1786                 lastrdata = 2;
1787         }
1788         rdata[lastrdata].next = NULL;
1789
1790         recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED, rdata);
1791
1792         /* Always flush, since we're about to remove the 2PC state file */
1793         XLogFlush(recptr);
1794
1795         /*
1796          * Mark the transaction aborted in clog.  This is not absolutely necessary
1797          * but we may as well do it while we are here.
1798          */
1799         TransactionIdAbort(xid);
1800         TransactionIdAbortTree(nchildren, children);
1801
1802         END_CRIT_SECTION();
1803 }