]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/twophase.c
Remove the CheckpointStartLock in favor of having backends show whether they
[postgresql] / src / backend / access / transam / twophase.c
1 /*-------------------------------------------------------------------------
2  *
3  * twophase.c
4  *              Two-phase commit support functions.
5  *
6  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *              $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.29 2007/04/03 16:34:35 tgl Exp $
11  *
12  * NOTES
13  *              Each global transaction is associated with a global transaction
14  *              identifier (GID). The client assigns a GID to a postgres
15  *              transaction with the PREPARE TRANSACTION command.
16  *
17  *              We keep all active global transactions in a shared memory array.
18  *              When the PREPARE TRANSACTION command is issued, the GID is
19  *              reserved for the transaction in the array. This is done before
20  *              a WAL entry is made, because the reservation checks for duplicate
21  *              GIDs and aborts the transaction if there already is a global
22  *              transaction in prepared state with the same GID.
23  *
24  *              A global transaction (gxact) also has a dummy PGPROC that is entered
25  *              into the ProcArray array; this is what keeps the XID considered
26  *              running by TransactionIdIsInProgress.  It is also convenient as a
27  *              PGPROC to hook the gxact's locks to.
28  *
29  *              In order to survive crashes and shutdowns, all prepared
30  *              transactions must be stored in permanent storage. This includes
31  *              locking information, pending notifications etc. All that state
32  *              information is written to the per-transaction state file in
33  *              the pg_twophase directory.
34  *
35  *-------------------------------------------------------------------------
36  */
37 #include "postgres.h"
38
39 #include <fcntl.h>
40 #include <sys/stat.h>
41 #include <sys/types.h>
42 #include <time.h>
43 #include <unistd.h>
44
45 #include "access/heapam.h"
46 #include "access/subtrans.h"
47 #include "access/transam.h"
48 #include "access/twophase.h"
49 #include "access/twophase_rmgr.h"
50 #include "access/xact.h"
51 #include "catalog/pg_type.h"
52 #include "funcapi.h"
53 #include "miscadmin.h"
54 #include "pgstat.h"
55 #include "storage/fd.h"
56 #include "storage/procarray.h"
57 #include "storage/smgr.h"
58 #include "utils/builtins.h"
59
60
61 /*
62  * Directory where Two-phase commit files reside within PGDATA
63  */
64 #define TWOPHASE_DIR "pg_twophase"
65
66 /* GUC variable, can't be changed after startup */
67 int                     max_prepared_xacts = 5;
68
69 /*
70  * This struct describes one global transaction that is in prepared state
71  * or attempting to become prepared.
72  *
73  * The first component of the struct is a dummy PGPROC that is inserted
74  * into the global ProcArray so that the transaction appears to still be
75  * running and holding locks.  It must be first because we cast pointers
76  * to PGPROC and pointers to GlobalTransactionData back and forth.
77  *
78  * The lifecycle of a global transaction is:
79  *
80  * 1. After checking that the requested GID is not in use, set up an
81  * entry in the TwoPhaseState->prepXacts array with the correct XID and GID,
82  * with locking_xid = my own XID and valid = false.
83  *
84  * 2. After successfully completing prepare, set valid = true and enter the
85  * contained PGPROC into the global ProcArray.
86  *
87  * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry
88  * is valid and its locking_xid is no longer active, then store my current
89  * XID into locking_xid.  This prevents concurrent attempts to commit or
90  * rollback the same prepared xact.
91  *
92  * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
93  * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
94  * the freelist.
95  *
96  * Note that if the preparing transaction fails between steps 1 and 2, the
97  * entry will remain in prepXacts until recycled.  We can detect recyclable
98  * entries by checking for valid = false and locking_xid no longer active.
99  *
100  * typedef struct GlobalTransactionData *GlobalTransaction appears in
101  * twophase.h
102  */
103 #define GIDSIZE 200
104
105 typedef struct GlobalTransactionData
106 {
107         PGPROC          proc;                   /* dummy proc */
108         TimestampTz prepared_at;        /* time of preparation */
109         XLogRecPtr      prepare_lsn;    /* XLOG offset of prepare record */
110         Oid                     owner;                  /* ID of user that executed the xact */
111         TransactionId locking_xid;      /* top-level XID of backend working on xact */
112         bool            valid;                  /* TRUE if fully prepared */
113         char            gid[GIDSIZE];   /* The GID assigned to the prepared xact */
114 } GlobalTransactionData;
115
116 /*
117  * Two Phase Commit shared state.  Access to this struct is protected
118  * by TwoPhaseStateLock.
119  */
120 typedef struct TwoPhaseStateData
121 {
122         /* Head of linked list of free GlobalTransactionData structs */
123         SHMEM_OFFSET freeGXacts;
124
125         /* Number of valid prepXacts entries. */
126         int                     numPrepXacts;
127
128         /*
129          * There are max_prepared_xacts items in this array, but C wants a
130          * fixed-size array.
131          */
132         GlobalTransaction prepXacts[1];         /* VARIABLE LENGTH ARRAY */
133 } TwoPhaseStateData;                    /* VARIABLE LENGTH STRUCT */
134
135 static TwoPhaseStateData *TwoPhaseState;
136
137
138 static void RecordTransactionCommitPrepared(TransactionId xid,
139                                                                 int nchildren,
140                                                                 TransactionId *children,
141                                                                 int nrels,
142                                                                 RelFileNode *rels);
143 static void RecordTransactionAbortPrepared(TransactionId xid,
144                                                            int nchildren,
145                                                            TransactionId *children,
146                                                            int nrels,
147                                                            RelFileNode *rels);
148 static void ProcessRecords(char *bufptr, TransactionId xid,
149                            const TwoPhaseCallback callbacks[]);
150
151
152 /*
153  * Initialization of shared memory
154  */
155 Size
156 TwoPhaseShmemSize(void)
157 {
158         Size            size;
159
160         /* Need the fixed struct, the array of pointers, and the GTD structs */
161         size = offsetof(TwoPhaseStateData, prepXacts);
162         size = add_size(size, mul_size(max_prepared_xacts,
163                                                                    sizeof(GlobalTransaction)));
164         size = MAXALIGN(size);
165         size = add_size(size, mul_size(max_prepared_xacts,
166                                                                    sizeof(GlobalTransactionData)));
167
168         return size;
169 }
170
171 void
172 TwoPhaseShmemInit(void)
173 {
174         bool            found;
175
176         TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
177                                                                         TwoPhaseShmemSize(),
178                                                                         &found);
179         if (!IsUnderPostmaster)
180         {
181                 GlobalTransaction gxacts;
182                 int                     i;
183
184                 Assert(!found);
185                 TwoPhaseState->freeGXacts = INVALID_OFFSET;
186                 TwoPhaseState->numPrepXacts = 0;
187
188                 /*
189                  * Initialize the linked list of free GlobalTransactionData structs
190                  */
191                 gxacts = (GlobalTransaction)
192                         ((char *) TwoPhaseState +
193                          MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
194                                           sizeof(GlobalTransaction) * max_prepared_xacts));
195                 for (i = 0; i < max_prepared_xacts; i++)
196                 {
197                         gxacts[i].proc.links.next = TwoPhaseState->freeGXacts;
198                         TwoPhaseState->freeGXacts = MAKE_OFFSET(&gxacts[i]);
199                 }
200         }
201         else
202                 Assert(found);
203 }
204
205
206 /*
207  * MarkAsPreparing
208  *              Reserve the GID for the given transaction.
209  *
210  * Internally, this creates a gxact struct and puts it into the active array.
211  * NOTE: this is also used when reloading a gxact after a crash; so avoid
212  * assuming that we can use very much backend context.
213  */
214 GlobalTransaction
215 MarkAsPreparing(TransactionId xid, const char *gid,
216                                 TimestampTz prepared_at, Oid owner, Oid databaseid)
217 {
218         GlobalTransaction gxact;
219         int                     i;
220
221         if (strlen(gid) >= GIDSIZE)
222                 ereport(ERROR,
223                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
224                                  errmsg("transaction identifier \"%s\" is too long",
225                                                 gid)));
226
227         LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
228
229         /*
230          * First, find and recycle any gxacts that failed during prepare. We do
231          * this partly to ensure we don't mistakenly say their GIDs are still
232          * reserved, and partly so we don't fail on out-of-slots unnecessarily.
233          */
234         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
235         {
236                 gxact = TwoPhaseState->prepXacts[i];
237                 if (!gxact->valid && !TransactionIdIsActive(gxact->locking_xid))
238                 {
239                         /* It's dead Jim ... remove from the active array */
240                         TwoPhaseState->numPrepXacts--;
241                         TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
242                         /* and put it back in the freelist */
243                         gxact->proc.links.next = TwoPhaseState->freeGXacts;
244                         TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);
245                         /* Back up index count too, so we don't miss scanning one */
246                         i--;
247                 }
248         }
249
250         /* Check for conflicting GID */
251         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
252         {
253                 gxact = TwoPhaseState->prepXacts[i];
254                 if (strcmp(gxact->gid, gid) == 0)
255                 {
256                         ereport(ERROR,
257                                         (errcode(ERRCODE_DUPLICATE_OBJECT),
258                                          errmsg("transaction identifier \"%s\" is already in use",
259                                                         gid)));
260                 }
261         }
262
263         /* Get a free gxact from the freelist */
264         if (TwoPhaseState->freeGXacts == INVALID_OFFSET)
265                 ereport(ERROR,
266                                 (errcode(ERRCODE_OUT_OF_MEMORY),
267                                  errmsg("maximum number of prepared transactions reached"),
268                                  errhint("Increase max_prepared_transactions (currently %d).",
269                                                  max_prepared_xacts)));
270         gxact = (GlobalTransaction) MAKE_PTR(TwoPhaseState->freeGXacts);
271         TwoPhaseState->freeGXacts = gxact->proc.links.next;
272
273         /* Initialize it */
274         MemSet(&gxact->proc, 0, sizeof(PGPROC));
275         SHMQueueElemInit(&(gxact->proc.links));
276         gxact->proc.waitStatus = STATUS_OK;
277         gxact->proc.xid = xid;
278         gxact->proc.xmin = InvalidTransactionId;
279         gxact->proc.pid = 0;
280         gxact->proc.databaseId = databaseid;
281         gxact->proc.roleId = owner;
282         gxact->proc.inCommit = false;
283         gxact->proc.inVacuum = false;
284         gxact->proc.isAutovacuum = false;
285         gxact->proc.lwWaiting = false;
286         gxact->proc.lwExclusive = false;
287         gxact->proc.lwWaitLink = NULL;
288         gxact->proc.waitLock = NULL;
289         gxact->proc.waitProcLock = NULL;
290         for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
291                 SHMQueueInit(&(gxact->proc.myProcLocks[i]));
292         /* subxid data must be filled later by GXactLoadSubxactData */
293         gxact->proc.subxids.overflowed = false;
294         gxact->proc.subxids.nxids = 0;
295
296         gxact->prepared_at = prepared_at;
297         /* initialize LSN to 0 (start of WAL) */
298         gxact->prepare_lsn.xlogid = 0;
299         gxact->prepare_lsn.xrecoff = 0;
300         gxact->owner = owner;
301         gxact->locking_xid = xid;
302         gxact->valid = false;
303         strcpy(gxact->gid, gid);
304
305         /* And insert it into the active array */
306         Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
307         TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
308
309         LWLockRelease(TwoPhaseStateLock);
310
311         return gxact;
312 }
313
314 /*
315  * GXactLoadSubxactData
316  *
317  * If the transaction being persisted had any subtransactions, this must
318  * be called before MarkAsPrepared() to load information into the dummy
319  * PGPROC.
320  */
321 static void
322 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
323                                          TransactionId *children)
324 {
325         /* We need no extra lock since the GXACT isn't valid yet */
326         if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
327         {
328                 gxact->proc.subxids.overflowed = true;
329                 nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
330         }
331         if (nsubxacts > 0)
332         {
333                 memcpy(gxact->proc.subxids.xids, children,
334                            nsubxacts * sizeof(TransactionId));
335                 gxact->proc.subxids.nxids = nsubxacts;
336         }
337 }
338
339 /*
340  * MarkAsPrepared
341  *              Mark the GXACT as fully valid, and enter it into the global ProcArray.
342  */
343 static void
344 MarkAsPrepared(GlobalTransaction gxact)
345 {
346         /* Lock here may be overkill, but I'm not convinced of that ... */
347         LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
348         Assert(!gxact->valid);
349         gxact->valid = true;
350         LWLockRelease(TwoPhaseStateLock);
351
352         /*
353          * Put it into the global ProcArray so TransactionIdInProgress considers
354          * the XID as still running.
355          */
356         ProcArrayAdd(&gxact->proc);
357 }
358
359 /*
360  * LockGXact
361  *              Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
362  */
363 static GlobalTransaction
364 LockGXact(const char *gid, Oid user)
365 {
366         int                     i;
367
368         LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
369
370         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
371         {
372                 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
373
374                 /* Ignore not-yet-valid GIDs */
375                 if (!gxact->valid)
376                         continue;
377                 if (strcmp(gxact->gid, gid) != 0)
378                         continue;
379
380                 /* Found it, but has someone else got it locked? */
381                 if (TransactionIdIsValid(gxact->locking_xid))
382                 {
383                         if (TransactionIdIsActive(gxact->locking_xid))
384                                 ereport(ERROR,
385                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
386                                 errmsg("prepared transaction with identifier \"%s\" is busy",
387                                            gid)));
388                         gxact->locking_xid = InvalidTransactionId;
389                 }
390
391                 if (user != gxact->owner && !superuser_arg(user))
392                         ereport(ERROR,
393                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
394                                   errmsg("permission denied to finish prepared transaction"),
395                                          errhint("Must be superuser or the user that prepared the transaction.")));
396
397                 /*
398                  * Note: it probably would be possible to allow committing from another
399                  * database; but at the moment NOTIFY is known not to work and there
400                  * may be some other issues as well.  Hence disallow until someone
401                  * gets motivated to make it work.
402                  */
403                 if (MyDatabaseId != gxact->proc.databaseId)
404                         ereport(ERROR,
405                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
406                                          errmsg("prepared transaction belongs to another database"),
407                                          errhint("Connect to the database where the transaction was prepared to finish it.")));
408
409                 /* OK for me to lock it */
410                 gxact->locking_xid = GetTopTransactionId();
411
412                 LWLockRelease(TwoPhaseStateLock);
413
414                 return gxact;
415         }
416
417         LWLockRelease(TwoPhaseStateLock);
418
419         ereport(ERROR,
420                         (errcode(ERRCODE_UNDEFINED_OBJECT),
421                  errmsg("prepared transaction with identifier \"%s\" does not exist",
422                                 gid)));
423
424         /* NOTREACHED */
425         return NULL;
426 }
427
428 /*
429  * RemoveGXact
430  *              Remove the prepared transaction from the shared memory array.
431  *
432  * NB: caller should have already removed it from ProcArray
433  */
434 static void
435 RemoveGXact(GlobalTransaction gxact)
436 {
437         int                     i;
438
439         LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
440
441         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
442         {
443                 if (gxact == TwoPhaseState->prepXacts[i])
444                 {
445                         /* remove from the active array */
446                         TwoPhaseState->numPrepXacts--;
447                         TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
448
449                         /* and put it back in the freelist */
450                         gxact->proc.links.next = TwoPhaseState->freeGXacts;
451                         TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);
452
453                         LWLockRelease(TwoPhaseStateLock);
454
455                         return;
456                 }
457         }
458
459         LWLockRelease(TwoPhaseStateLock);
460
461         elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
462 }
463
464 /*
465  * TransactionIdIsPrepared
466  *              True iff transaction associated with the identifier is prepared
467  *              for two-phase commit
468  *
469  * Note: only gxacts marked "valid" are considered; but notice we do not
470  * check the locking status.
471  *
472  * This is not currently exported, because it is only needed internally.
473  */
474 static bool
475 TransactionIdIsPrepared(TransactionId xid)
476 {
477         bool            result = false;
478         int                     i;
479
480         LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
481
482         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
483         {
484                 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
485
486                 if (gxact->valid && gxact->proc.xid == xid)
487                 {
488                         result = true;
489                         break;
490                 }
491         }
492
493         LWLockRelease(TwoPhaseStateLock);
494
495         return result;
496 }
497
498 /*
499  * Returns an array of all prepared transactions for the user-level
500  * function pg_prepared_xact.
501  *
502  * The returned array and all its elements are copies of internal data
503  * structures, to minimize the time we need to hold the TwoPhaseStateLock.
504  *
505  * WARNING -- we return even those transactions that are not fully prepared
506  * yet.  The caller should filter them out if he doesn't want them.
507  *
508  * The returned array is palloc'd.
509  */
510 static int
511 GetPreparedTransactionList(GlobalTransaction *gxacts)
512 {
513         GlobalTransaction array;
514         int                     num;
515         int                     i;
516
517         LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
518
519         if (TwoPhaseState->numPrepXacts == 0)
520         {
521                 LWLockRelease(TwoPhaseStateLock);
522
523                 *gxacts = NULL;
524                 return 0;
525         }
526
527         num = TwoPhaseState->numPrepXacts;
528         array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
529         *gxacts = array;
530         for (i = 0; i < num; i++)
531                 memcpy(array + i, TwoPhaseState->prepXacts[i],
532                            sizeof(GlobalTransactionData));
533
534         LWLockRelease(TwoPhaseStateLock);
535
536         return num;
537 }
538
539
540 /* Working status for pg_prepared_xact */
541 typedef struct
542 {
543         GlobalTransaction array;
544         int                     ngxacts;
545         int                     currIdx;
546 } Working_State;
547
548 /*
549  * pg_prepared_xact
550  *              Produce a view with one row per prepared transaction.
551  *
552  * This function is here so we don't have to export the
553  * GlobalTransactionData struct definition.
554  */
555 Datum
556 pg_prepared_xact(PG_FUNCTION_ARGS)
557 {
558         FuncCallContext *funcctx;
559         Working_State *status;
560
561         if (SRF_IS_FIRSTCALL())
562         {
563                 TupleDesc       tupdesc;
564                 MemoryContext oldcontext;
565
566                 /* create a function context for cross-call persistence */
567                 funcctx = SRF_FIRSTCALL_INIT();
568
569                 /*
570                  * Switch to memory context appropriate for multiple function calls
571                  */
572                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
573
574                 /* build tupdesc for result tuples */
575                 /* this had better match pg_prepared_xacts view in system_views.sql */
576                 tupdesc = CreateTemplateTupleDesc(5, false);
577                 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
578                                                    XIDOID, -1, 0);
579                 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
580                                                    TEXTOID, -1, 0);
581                 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
582                                                    TIMESTAMPTZOID, -1, 0);
583                 TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
584                                                    OIDOID, -1, 0);
585                 TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
586                                                    OIDOID, -1, 0);
587
588                 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
589
590                 /*
591                  * Collect all the 2PC status information that we will format and send
592                  * out as a result set.
593                  */
594                 status = (Working_State *) palloc(sizeof(Working_State));
595                 funcctx->user_fctx = (void *) status;
596
597                 status->ngxacts = GetPreparedTransactionList(&status->array);
598                 status->currIdx = 0;
599
600                 MemoryContextSwitchTo(oldcontext);
601         }
602
603         funcctx = SRF_PERCALL_SETUP();
604         status = (Working_State *) funcctx->user_fctx;
605
606         while (status->array != NULL && status->currIdx < status->ngxacts)
607         {
608                 GlobalTransaction gxact = &status->array[status->currIdx++];
609                 Datum           values[5];
610                 bool            nulls[5];
611                 HeapTuple       tuple;
612                 Datum           result;
613
614                 if (!gxact->valid)
615                         continue;
616
617                 /*
618                  * Form tuple with appropriate data.
619                  */
620                 MemSet(values, 0, sizeof(values));
621                 MemSet(nulls, 0, sizeof(nulls));
622
623                 values[0] = TransactionIdGetDatum(gxact->proc.xid);
624                 values[1] = DirectFunctionCall1(textin, CStringGetDatum(gxact->gid));
625                 values[2] = TimestampTzGetDatum(gxact->prepared_at);
626                 values[3] = ObjectIdGetDatum(gxact->owner);
627                 values[4] = ObjectIdGetDatum(gxact->proc.databaseId);
628
629                 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
630                 result = HeapTupleGetDatum(tuple);
631                 SRF_RETURN_NEXT(funcctx, result);
632         }
633
634         SRF_RETURN_DONE(funcctx);
635 }
636
637 /*
638  * TwoPhaseGetDummyProc
639  *              Get the PGPROC that represents a prepared transaction specified by XID
640  */
641 PGPROC *
642 TwoPhaseGetDummyProc(TransactionId xid)
643 {
644         PGPROC     *result = NULL;
645         int                     i;
646
647         static TransactionId cached_xid = InvalidTransactionId;
648         static PGPROC *cached_proc = NULL;
649
650         /*
651          * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
652          * repeatedly for the same XID.  We can save work with a simple cache.
653          */
654         if (xid == cached_xid)
655                 return cached_proc;
656
657         LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
658
659         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
660         {
661                 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
662
663                 if (gxact->proc.xid == xid)
664                 {
665                         result = &gxact->proc;
666                         break;
667                 }
668         }
669
670         LWLockRelease(TwoPhaseStateLock);
671
672         if (result == NULL)                     /* should not happen */
673                 elog(ERROR, "failed to find dummy PGPROC for xid %u", xid);
674
675         cached_xid = xid;
676         cached_proc = result;
677
678         return result;
679 }
680
681 /************************************************************************/
682 /* State file support                                                                                                   */
683 /************************************************************************/
684
685 #define TwoPhaseFilePath(path, xid) \
686         snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
687
688 /*
689  * 2PC state file format:
690  *
691  *      1. TwoPhaseFileHeader
692  *      2. TransactionId[] (subtransactions)
693  *      3. RelFileNode[] (files to be deleted at commit)
694  *      4. RelFileNode[] (files to be deleted at abort)
695  *      5. TwoPhaseRecordOnDisk
696  *      6. ...
697  *      7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
698  *      8. CRC32
699  *
700  * Each segment except the final CRC32 is MAXALIGN'd.
701  */
702
703 /*
704  * Header for a 2PC state file
705  */
706 #define TWOPHASE_MAGIC  0x57F94531              /* format identifier */
707
708 typedef struct TwoPhaseFileHeader
709 {
710         uint32          magic;                  /* format identifier */
711         uint32          total_len;              /* actual file length */
712         TransactionId xid;                      /* original transaction XID */
713         Oid                     database;               /* OID of database it was in */
714         TimestampTz prepared_at;        /* time of preparation */
715         Oid                     owner;                  /* user running the transaction */
716         int32           nsubxacts;              /* number of following subxact XIDs */
717         int32           ncommitrels;    /* number of delete-on-commit rels */
718         int32           nabortrels;             /* number of delete-on-abort rels */
719         char            gid[GIDSIZE];   /* GID for transaction */
720 } TwoPhaseFileHeader;
721
722 /*
723  * Header for each record in a state file
724  *
725  * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
726  * The rmgr data will be stored starting on a MAXALIGN boundary.
727  */
728 typedef struct TwoPhaseRecordOnDisk
729 {
730         uint32          len;                    /* length of rmgr data */
731         TwoPhaseRmgrId rmid;            /* resource manager for this record */
732         uint16          info;                   /* flag bits for use by rmgr */
733 } TwoPhaseRecordOnDisk;
734
735 /*
736  * During prepare, the state file is assembled in memory before writing it
737  * to WAL and the actual state file.  We use a chain of XLogRecData blocks
738  * so that we will be able to pass the state file contents directly to
739  * XLogInsert.
740  */
741 static struct xllist
742 {
743         XLogRecData *head;                      /* first data block in the chain */
744         XLogRecData *tail;                      /* last block in chain */
745         uint32          bytes_free;             /* free bytes left in tail block */
746         uint32          total_len;              /* total data bytes in chain */
747 }       records;
748
749
750 /*
751  * Append a block of data to records data structure.
752  *
753  * NB: each block is padded to a MAXALIGN multiple.  This must be
754  * accounted for when the file is later read!
755  *
756  * The data is copied, so the caller is free to modify it afterwards.
757  */
758 static void
759 save_state_data(const void *data, uint32 len)
760 {
761         uint32          padlen = MAXALIGN(len);
762
763         if (padlen > records.bytes_free)
764         {
765                 records.tail->next = palloc0(sizeof(XLogRecData));
766                 records.tail = records.tail->next;
767                 records.tail->buffer = InvalidBuffer;
768                 records.tail->len = 0;
769                 records.tail->next = NULL;
770
771                 records.bytes_free = Max(padlen, 512);
772                 records.tail->data = palloc(records.bytes_free);
773         }
774
775         memcpy(((char *) records.tail->data) + records.tail->len, data, len);
776         records.tail->len += padlen;
777         records.bytes_free -= padlen;
778         records.total_len += padlen;
779 }
780
781 /*
782  * Start preparing a state file.
783  *
784  * Initializes data structure and inserts the 2PC file header record.
785  */
786 void
787 StartPrepare(GlobalTransaction gxact)
788 {
789         TransactionId xid = gxact->proc.xid;
790         TwoPhaseFileHeader hdr;
791         TransactionId *children;
792         RelFileNode *commitrels;
793         RelFileNode *abortrels;
794
795         /* Initialize linked list */
796         records.head = palloc0(sizeof(XLogRecData));
797         records.head->buffer = InvalidBuffer;
798         records.head->len = 0;
799         records.head->next = NULL;
800
801         records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
802         records.head->data = palloc(records.bytes_free);
803
804         records.tail = records.head;
805
806         records.total_len = 0;
807
808         /* Create header */
809         hdr.magic = TWOPHASE_MAGIC;
810         hdr.total_len = 0;                      /* EndPrepare will fill this in */
811         hdr.xid = xid;
812         hdr.database = gxact->proc.databaseId;
813         hdr.prepared_at = gxact->prepared_at;
814         hdr.owner = gxact->owner;
815         hdr.nsubxacts = xactGetCommittedChildren(&children);
816         hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
817         hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
818         StrNCpy(hdr.gid, gxact->gid, GIDSIZE);
819
820         save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
821
822         /* Add the additional info about subxacts and deletable files */
823         if (hdr.nsubxacts > 0)
824         {
825                 save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
826                 /* While we have the child-xact data, stuff it in the gxact too */
827                 GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
828                 pfree(children);
829         }
830         if (hdr.ncommitrels > 0)
831         {
832                 save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
833                 pfree(commitrels);
834         }
835         if (hdr.nabortrels > 0)
836         {
837                 save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
838                 pfree(abortrels);
839         }
840 }
841
842 /*
843  * Finish preparing state file.
844  *
845  * Calculates CRC and writes state file to WAL and in pg_twophase directory.
846  */
847 void
848 EndPrepare(GlobalTransaction gxact)
849 {
850         TransactionId xid = gxact->proc.xid;
851         TwoPhaseFileHeader *hdr;
852         char            path[MAXPGPATH];
853         XLogRecData *record;
854         pg_crc32        statefile_crc;
855         pg_crc32        bogus_crc;
856         int                     fd;
857
858         /* Add the end sentinel to the list of 2PC records */
859         RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
860                                                    NULL, 0);
861
862         /* Go back and fill in total_len in the file header record */
863         hdr = (TwoPhaseFileHeader *) records.head->data;
864         Assert(hdr->magic == TWOPHASE_MAGIC);
865         hdr->total_len = records.total_len + sizeof(pg_crc32);
866
867         /*
868          * Create the 2PC state file.
869          *
870          * Note: because we use BasicOpenFile(), we are responsible for ensuring
871          * the FD gets closed in any error exit path.  Once we get into the
872          * critical section, though, it doesn't matter since any failure causes
873          * PANIC anyway.
874          */
875         TwoPhaseFilePath(path, xid);
876
877         fd = BasicOpenFile(path,
878                                            O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
879                                            S_IRUSR | S_IWUSR);
880         if (fd < 0)
881                 ereport(ERROR,
882                                 (errcode_for_file_access(),
883                                  errmsg("could not create two-phase state file \"%s\": %m",
884                                                 path)));
885
886         /* Write data to file, and calculate CRC as we pass over it */
887         INIT_CRC32(statefile_crc);
888
889         for (record = records.head; record != NULL; record = record->next)
890         {
891                 COMP_CRC32(statefile_crc, record->data, record->len);
892                 if ((write(fd, record->data, record->len)) != record->len)
893                 {
894                         close(fd);
895                         ereport(ERROR,
896                                         (errcode_for_file_access(),
897                                          errmsg("could not write two-phase state file: %m")));
898                 }
899         }
900
901         FIN_CRC32(statefile_crc);
902
903         /*
904          * Write a deliberately bogus CRC to the state file; this is just paranoia
905          * to catch the case where four more bytes will run us out of disk space.
906          */
907         bogus_crc = ~statefile_crc;
908
909         if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
910         {
911                 close(fd);
912                 ereport(ERROR,
913                                 (errcode_for_file_access(),
914                                  errmsg("could not write two-phase state file: %m")));
915         }
916
917         /* Back up to prepare for rewriting the CRC */
918         if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
919         {
920                 close(fd);
921                 ereport(ERROR,
922                                 (errcode_for_file_access(),
923                                  errmsg("could not seek in two-phase state file: %m")));
924         }
925
926         /*
927          * The state file isn't valid yet, because we haven't written the correct
928          * CRC yet.  Before we do that, insert entry in WAL and flush it to disk.
929          *
930          * Between the time we have written the WAL entry and the time we write
931          * out the correct state file CRC, we have an inconsistency: the xact is
932          * prepared according to WAL but not according to our on-disk state. We
933          * use a critical section to force a PANIC if we are unable to complete
934          * the write --- then, WAL replay should repair the inconsistency.      The
935          * odds of a PANIC actually occurring should be very tiny given that we
936          * were able to write the bogus CRC above.
937          *
938          * We have to set inCommit here, too; otherwise a checkpoint
939          * starting immediately after the WAL record is inserted could complete
940          * without fsync'ing our state file.  (This is essentially the same kind
941          * of race condition as the COMMIT-to-clog-write case that
942          * RecordTransactionCommit uses inCommit for; see notes there.)
943          *
944          * We save the PREPARE record's location in the gxact for later use by
945          * CheckPointTwoPhase.
946          */
947         START_CRIT_SECTION();
948
949         MyProc->inCommit = true;
950
951         gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
952                                                                         records.head);
953         XLogFlush(gxact->prepare_lsn);
954
955         /* If we crash now, we have prepared: WAL replay will fix things */
956
957         /* write correct CRC and close file */
958         if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
959         {
960                 close(fd);
961                 ereport(ERROR,
962                                 (errcode_for_file_access(),
963                                  errmsg("could not write two-phase state file: %m")));
964         }
965
966         if (close(fd) != 0)
967                 ereport(ERROR,
968                                 (errcode_for_file_access(),
969                                  errmsg("could not close two-phase state file: %m")));
970
971         /*
972          * Mark the prepared transaction as valid.      As soon as xact.c marks MyProc
973          * as not running our XID (which it will do immediately after this
974          * function returns), others can commit/rollback the xact.
975          *
976          * NB: a side effect of this is to make a dummy ProcArray entry for the
977          * prepared XID.  This must happen before we clear the XID from MyProc,
978          * else there is a window where the XID is not running according to
979          * TransactionIdInProgress, and onlookers would be entitled to assume the
980          * xact crashed.  Instead we have a window where the same XID appears
981          * twice in ProcArray, which is OK.
982          */
983         MarkAsPrepared(gxact);
984
985         /*
986          * Now we can mark ourselves as out of the commit critical section:
987          * a checkpoint starting after this will certainly see the gxact as a
988          * candidate for fsyncing.
989          */
990         MyProc->inCommit = false;
991
992         END_CRIT_SECTION();
993
994         records.tail = records.head = NULL;
995 }
996
997 /*
998  * Register a 2PC record to be written to state file.
999  */
1000 void
1001 RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1002                                            const void *data, uint32 len)
1003 {
1004         TwoPhaseRecordOnDisk record;
1005
1006         record.rmid = rmid;
1007         record.info = info;
1008         record.len = len;
1009         save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1010         if (len > 0)
1011                 save_state_data(data, len);
1012 }
1013
1014
1015 /*
1016  * Read and validate the state file for xid.
1017  *
1018  * If it looks OK (has a valid magic number and CRC), return the palloc'd
1019  * contents of the file.  Otherwise return NULL.
1020  */
1021 static char *
1022 ReadTwoPhaseFile(TransactionId xid)
1023 {
1024         char            path[MAXPGPATH];
1025         char       *buf;
1026         TwoPhaseFileHeader *hdr;
1027         int                     fd;
1028         struct stat stat;
1029         uint32          crc_offset;
1030         pg_crc32        calc_crc,
1031                                 file_crc;
1032
1033         TwoPhaseFilePath(path, xid);
1034
1035         fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
1036         if (fd < 0)
1037         {
1038                 ereport(WARNING,
1039                                 (errcode_for_file_access(),
1040                                  errmsg("could not open two-phase state file \"%s\": %m",
1041                                                 path)));
1042                 return NULL;
1043         }
1044
1045         /*
1046          * Check file length.  We can determine a lower bound pretty easily. We
1047          * set an upper bound mainly to avoid palloc() failure on a corrupt file.
1048          */
1049         if (fstat(fd, &stat))
1050         {
1051                 close(fd);
1052                 ereport(WARNING,
1053                                 (errcode_for_file_access(),
1054                                  errmsg("could not stat two-phase state file \"%s\": %m",
1055                                                 path)));
1056                 return NULL;
1057         }
1058
1059         if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1060                                                 MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1061                                                 sizeof(pg_crc32)) ||
1062                 stat.st_size > 10000000)
1063         {
1064                 close(fd);
1065                 return NULL;
1066         }
1067
1068         crc_offset = stat.st_size - sizeof(pg_crc32);
1069         if (crc_offset != MAXALIGN(crc_offset))
1070         {
1071                 close(fd);
1072                 return NULL;
1073         }
1074
1075         /*
1076          * OK, slurp in the file.
1077          */
1078         buf = (char *) palloc(stat.st_size);
1079
1080         if (read(fd, buf, stat.st_size) != stat.st_size)
1081         {
1082                 close(fd);
1083                 ereport(WARNING,
1084                                 (errcode_for_file_access(),
1085                                  errmsg("could not read two-phase state file \"%s\": %m",
1086                                                 path)));
1087                 pfree(buf);
1088                 return NULL;
1089         }
1090
1091         close(fd);
1092
1093         hdr = (TwoPhaseFileHeader *) buf;
1094         if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
1095         {
1096                 pfree(buf);
1097                 return NULL;
1098         }
1099
1100         INIT_CRC32(calc_crc);
1101         COMP_CRC32(calc_crc, buf, crc_offset);
1102         FIN_CRC32(calc_crc);
1103
1104         file_crc = *((pg_crc32 *) (buf + crc_offset));
1105
1106         if (!EQ_CRC32(calc_crc, file_crc))
1107         {
1108                 pfree(buf);
1109                 return NULL;
1110         }
1111
1112         return buf;
1113 }
1114
1115
1116 /*
1117  * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1118  */
1119 void
1120 FinishPreparedTransaction(const char *gid, bool isCommit)
1121 {
1122         GlobalTransaction gxact;
1123         TransactionId xid;
1124         char       *buf;
1125         char       *bufptr;
1126         TwoPhaseFileHeader *hdr;
1127         TransactionId *children;
1128         RelFileNode *commitrels;
1129         RelFileNode *abortrels;
1130         int                     i;
1131
1132         /*
1133          * Validate the GID, and lock the GXACT to ensure that two backends do not
1134          * try to commit the same GID at once.
1135          */
1136         gxact = LockGXact(gid, GetUserId());
1137         xid = gxact->proc.xid;
1138
1139         /*
1140          * Read and validate the state file
1141          */
1142         buf = ReadTwoPhaseFile(xid);
1143         if (buf == NULL)
1144                 ereport(ERROR,
1145                                 (errcode(ERRCODE_DATA_CORRUPTED),
1146                                  errmsg("two-phase state file for transaction %u is corrupt",
1147                                                 xid)));
1148
1149         /*
1150          * Disassemble the header area
1151          */
1152         hdr = (TwoPhaseFileHeader *) buf;
1153         Assert(TransactionIdEquals(hdr->xid, xid));
1154         bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1155         children = (TransactionId *) bufptr;
1156         bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1157         commitrels = (RelFileNode *) bufptr;
1158         bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1159         abortrels = (RelFileNode *) bufptr;
1160         bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1161
1162         /*
1163          * The order of operations here is critical: make the XLOG entry for
1164          * commit or abort, then mark the transaction committed or aborted in
1165          * pg_clog, then remove its PGPROC from the global ProcArray (which means
1166          * TransactionIdIsInProgress will stop saying the prepared xact is in
1167          * progress), then run the post-commit or post-abort callbacks. The
1168          * callbacks will release the locks the transaction held.
1169          */
1170         if (isCommit)
1171                 RecordTransactionCommitPrepared(xid,
1172                                                                                 hdr->nsubxacts, children,
1173                                                                                 hdr->ncommitrels, commitrels);
1174         else
1175                 RecordTransactionAbortPrepared(xid,
1176                                                                            hdr->nsubxacts, children,
1177                                                                            hdr->nabortrels, abortrels);
1178
1179         ProcArrayRemove(&gxact->proc);
1180
1181         /*
1182          * In case we fail while running the callbacks, mark the gxact invalid so
1183          * no one else will try to commit/rollback, and so it can be recycled
1184          * properly later.      It is still locked by our XID so it won't go away yet.
1185          *
1186          * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1187          */
1188         gxact->valid = false;
1189
1190         /*
1191          * We have to remove any files that were supposed to be dropped. For
1192          * consistency with the regular xact.c code paths, must do this before
1193          * releasing locks, so do it before running the callbacks.
1194          *
1195          * NB: this code knows that we couldn't be dropping any temp rels ...
1196          */
1197         if (isCommit)
1198         {
1199                 for (i = 0; i < hdr->ncommitrels; i++)
1200                         smgrdounlink(smgropen(commitrels[i]), false, false);
1201         }
1202         else
1203         {
1204                 for (i = 0; i < hdr->nabortrels; i++)
1205                         smgrdounlink(smgropen(abortrels[i]), false, false);
1206         }
1207
1208         /* And now do the callbacks */
1209         if (isCommit)
1210                 ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
1211         else
1212                 ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
1213
1214         pgstat_count_xact_commit();
1215
1216         /*
1217          * And now we can clean up our mess.
1218          */
1219         RemoveTwoPhaseFile(xid, true);
1220
1221         RemoveGXact(gxact);
1222
1223         pfree(buf);
1224 }
1225
1226 /*
1227  * Scan a 2PC state file (already read into memory by ReadTwoPhaseFile)
1228  * and call the indicated callbacks for each 2PC record.
1229  */
1230 static void
1231 ProcessRecords(char *bufptr, TransactionId xid,
1232                            const TwoPhaseCallback callbacks[])
1233 {
1234         for (;;)
1235         {
1236                 TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1237
1238                 Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1239                 if (record->rmid == TWOPHASE_RM_END_ID)
1240                         break;
1241
1242                 bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1243
1244                 if (callbacks[record->rmid] != NULL)
1245                         callbacks[record->rmid] (xid, record->info,
1246                                                                          (void *) bufptr, record->len);
1247
1248                 bufptr += MAXALIGN(record->len);
1249         }
1250 }
1251
1252 /*
1253  * Remove the 2PC file for the specified XID.
1254  *
1255  * If giveWarning is false, do not complain about file-not-present;
1256  * this is an expected case during WAL replay.
1257  */
1258 void
1259 RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1260 {
1261         char            path[MAXPGPATH];
1262
1263         TwoPhaseFilePath(path, xid);
1264         if (unlink(path))
1265                 if (errno != ENOENT || giveWarning)
1266                         ereport(WARNING,
1267                                         (errcode_for_file_access(),
1268                                          errmsg("could not remove two-phase state file \"%s\": %m",
1269                                                         path)));
1270 }
1271
1272 /*
1273  * Recreates a state file. This is used in WAL replay.
1274  *
1275  * Note: content and len don't include CRC.
1276  */
1277 void
1278 RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1279 {
1280         char            path[MAXPGPATH];
1281         pg_crc32        statefile_crc;
1282         int                     fd;
1283
1284         /* Recompute CRC */
1285         INIT_CRC32(statefile_crc);
1286         COMP_CRC32(statefile_crc, content, len);
1287         FIN_CRC32(statefile_crc);
1288
1289         TwoPhaseFilePath(path, xid);
1290
1291         fd = BasicOpenFile(path,
1292                                            O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
1293                                            S_IRUSR | S_IWUSR);
1294         if (fd < 0)
1295                 ereport(ERROR,
1296                                 (errcode_for_file_access(),
1297                                  errmsg("could not recreate two-phase state file \"%s\": %m",
1298                                                 path)));
1299
1300         /* Write content and CRC */
1301         if (write(fd, content, len) != len)
1302         {
1303                 close(fd);
1304                 ereport(ERROR,
1305                                 (errcode_for_file_access(),
1306                                  errmsg("could not write two-phase state file: %m")));
1307         }
1308         if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32))
1309         {
1310                 close(fd);
1311                 ereport(ERROR,
1312                                 (errcode_for_file_access(),
1313                                  errmsg("could not write two-phase state file: %m")));
1314         }
1315
1316         /*
1317          * We must fsync the file because the end-of-replay checkpoint will not do
1318          * so, there being no GXACT in shared memory yet to tell it to.
1319          */
1320         if (pg_fsync(fd) != 0)
1321         {
1322                 close(fd);
1323                 ereport(ERROR,
1324                                 (errcode_for_file_access(),
1325                                  errmsg("could not fsync two-phase state file: %m")));
1326         }
1327
1328         if (close(fd) != 0)
1329                 ereport(ERROR,
1330                                 (errcode_for_file_access(),
1331                                  errmsg("could not close two-phase state file: %m")));
1332 }
1333
1334 /*
1335  * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1336  *
1337  * We must fsync the state file of any GXACT that is valid and has a PREPARE
1338  * LSN <= the checkpoint's redo horizon.  (If the gxact isn't valid yet or
1339  * has a later LSN, this checkpoint is not responsible for fsyncing it.)
1340  *
1341  * This is deliberately run as late as possible in the checkpoint sequence,
1342  * because GXACTs ordinarily have short lifespans, and so it is quite
1343  * possible that GXACTs that were valid at checkpoint start will no longer
1344  * exist if we wait a little bit.
1345  *
1346  * If a GXACT remains valid across multiple checkpoints, it'll be fsynced
1347  * each time.  This is considered unusual enough that we don't bother to
1348  * expend any extra code to avoid the redundant fsyncs.  (They should be
1349  * reasonably cheap anyway, since they won't cause I/O.)
1350  */
1351 void
1352 CheckPointTwoPhase(XLogRecPtr redo_horizon)
1353 {
1354         TransactionId *xids;
1355         int                     nxids;
1356         char            path[MAXPGPATH];
1357         int                     i;
1358
1359         /*
1360          * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
1361          * it just long enough to make a list of the XIDs that require fsyncing,
1362          * and then do the I/O afterwards.
1363          *
1364          * This approach creates a race condition: someone else could delete a
1365          * GXACT between the time we release TwoPhaseStateLock and the time we try
1366          * to open its state file.      We handle this by special-casing ENOENT
1367          * failures: if we see that, we verify that the GXACT is no longer valid,
1368          * and if so ignore the failure.
1369          */
1370         if (max_prepared_xacts <= 0)
1371                 return;                                 /* nothing to do */
1372         xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
1373         nxids = 0;
1374
1375         LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1376
1377         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1378         {
1379                 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1380
1381                 if (gxact->valid &&
1382                         XLByteLE(gxact->prepare_lsn, redo_horizon))
1383                         xids[nxids++] = gxact->proc.xid;
1384         }
1385
1386         LWLockRelease(TwoPhaseStateLock);
1387
1388         for (i = 0; i < nxids; i++)
1389         {
1390                 TransactionId xid = xids[i];
1391                 int                     fd;
1392
1393                 TwoPhaseFilePath(path, xid);
1394
1395                 fd = BasicOpenFile(path, O_RDWR | PG_BINARY, 0);
1396                 if (fd < 0)
1397                 {
1398                         if (errno == ENOENT)
1399                         {
1400                                 /* OK if gxact is no longer valid */
1401                                 if (!TransactionIdIsPrepared(xid))
1402                                         continue;
1403                                 /* Restore errno in case it was changed */
1404                                 errno = ENOENT;
1405                         }
1406                         ereport(ERROR,
1407                                         (errcode_for_file_access(),
1408                                          errmsg("could not open two-phase state file \"%s\": %m",
1409                                                         path)));
1410                 }
1411
1412                 if (pg_fsync(fd) != 0)
1413                 {
1414                         close(fd);
1415                         ereport(ERROR,
1416                                         (errcode_for_file_access(),
1417                                          errmsg("could not fsync two-phase state file \"%s\": %m",
1418                                                         path)));
1419                 }
1420
1421                 if (close(fd) != 0)
1422                         ereport(ERROR,
1423                                         (errcode_for_file_access(),
1424                                          errmsg("could not close two-phase state file \"%s\": %m",
1425                                                         path)));
1426         }
1427
1428         pfree(xids);
1429 }
1430
1431 /*
1432  * PrescanPreparedTransactions
1433  *
1434  * Scan the pg_twophase directory and determine the range of valid XIDs
1435  * present.  This is run during database startup, after we have completed
1436  * reading WAL.  ShmemVariableCache->nextXid has been set to one more than
1437  * the highest XID for which evidence exists in WAL.
1438  *
1439  * We throw away any prepared xacts with main XID beyond nextXid --- if any
1440  * are present, it suggests that the DBA has done a PITR recovery to an
1441  * earlier point in time without cleaning out pg_twophase.      We dare not
1442  * try to recover such prepared xacts since they likely depend on database
1443  * state that doesn't exist now.
1444  *
1445  * However, we will advance nextXid beyond any subxact XIDs belonging to
1446  * valid prepared xacts.  We need to do this since subxact commit doesn't
1447  * write a WAL entry, and so there might be no evidence in WAL of those
1448  * subxact XIDs.
1449  *
1450  * Our other responsibility is to determine and return the oldest valid XID
1451  * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
1452  * This is needed to synchronize pg_subtrans startup properly.
1453  */
1454 TransactionId
1455 PrescanPreparedTransactions(void)
1456 {
1457         TransactionId origNextXid = ShmemVariableCache->nextXid;
1458         TransactionId result = origNextXid;
1459         DIR                *cldir;
1460         struct dirent *clde;
1461
1462         cldir = AllocateDir(TWOPHASE_DIR);
1463         while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1464         {
1465                 if (strlen(clde->d_name) == 8 &&
1466                         strspn(clde->d_name, "0123456789ABCDEF") == 8)
1467                 {
1468                         TransactionId xid;
1469                         char       *buf;
1470                         TwoPhaseFileHeader *hdr;
1471                         TransactionId *subxids;
1472                         int                     i;
1473
1474                         xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1475
1476                         /* Reject XID if too new */
1477                         if (TransactionIdFollowsOrEquals(xid, origNextXid))
1478                         {
1479                                 ereport(WARNING,
1480                                                 (errmsg("removing future two-phase state file \"%s\"",
1481                                                                 clde->d_name)));
1482                                 RemoveTwoPhaseFile(xid, true);
1483                                 continue;
1484                         }
1485
1486                         /*
1487                          * Note: we can't check if already processed because clog
1488                          * subsystem isn't up yet.
1489                          */
1490
1491                         /* Read and validate file */
1492                         buf = ReadTwoPhaseFile(xid);
1493                         if (buf == NULL)
1494                         {
1495                                 ereport(WARNING,
1496                                                 (errmsg("removing corrupt two-phase state file \"%s\"",
1497                                                                 clde->d_name)));
1498                                 RemoveTwoPhaseFile(xid, true);
1499                                 continue;
1500                         }
1501
1502                         /* Deconstruct header */
1503                         hdr = (TwoPhaseFileHeader *) buf;
1504                         if (!TransactionIdEquals(hdr->xid, xid))
1505                         {
1506                                 ereport(WARNING,
1507                                                 (errmsg("removing corrupt two-phase state file \"%s\"",
1508                                                                 clde->d_name)));
1509                                 RemoveTwoPhaseFile(xid, true);
1510                                 pfree(buf);
1511                                 continue;
1512                         }
1513
1514                         /*
1515                          * OK, we think this file is valid.  Incorporate xid into the
1516                          * running-minimum result.
1517                          */
1518                         if (TransactionIdPrecedes(xid, result))
1519                                 result = xid;
1520
1521                         /*
1522                          * Examine subtransaction XIDs ... they should all follow main
1523                          * XID, and they may force us to advance nextXid.
1524                          */
1525                         subxids = (TransactionId *)
1526                                 (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
1527                         for (i = 0; i < hdr->nsubxacts; i++)
1528                         {
1529                                 TransactionId subxid = subxids[i];
1530
1531                                 Assert(TransactionIdFollows(subxid, xid));
1532                                 if (TransactionIdFollowsOrEquals(subxid,
1533                                                                                                  ShmemVariableCache->nextXid))
1534                                 {
1535                                         ShmemVariableCache->nextXid = subxid;
1536                                         TransactionIdAdvance(ShmemVariableCache->nextXid);
1537                                 }
1538                         }
1539
1540                         pfree(buf);
1541                 }
1542         }
1543         FreeDir(cldir);
1544
1545         return result;
1546 }
1547
1548 /*
1549  * RecoverPreparedTransactions
1550  *
1551  * Scan the pg_twophase directory and reload shared-memory state for each
1552  * prepared transaction (reacquire locks, etc).  This is run during database
1553  * startup.
1554  */
1555 void
1556 RecoverPreparedTransactions(void)
1557 {
1558         char            dir[MAXPGPATH];
1559         DIR                *cldir;
1560         struct dirent *clde;
1561
1562         snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
1563
1564         cldir = AllocateDir(dir);
1565         while ((clde = ReadDir(cldir, dir)) != NULL)
1566         {
1567                 if (strlen(clde->d_name) == 8 &&
1568                         strspn(clde->d_name, "0123456789ABCDEF") == 8)
1569                 {
1570                         TransactionId xid;
1571                         char       *buf;
1572                         char       *bufptr;
1573                         TwoPhaseFileHeader *hdr;
1574                         TransactionId *subxids;
1575                         GlobalTransaction gxact;
1576                         int                     i;
1577
1578                         xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1579
1580                         /* Already processed? */
1581                         if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
1582                         {
1583                                 ereport(WARNING,
1584                                                 (errmsg("removing stale two-phase state file \"%s\"",
1585                                                                 clde->d_name)));
1586                                 RemoveTwoPhaseFile(xid, true);
1587                                 continue;
1588                         }
1589
1590                         /* Read and validate file */
1591                         buf = ReadTwoPhaseFile(xid);
1592                         if (buf == NULL)
1593                         {
1594                                 ereport(WARNING,
1595                                                 (errmsg("removing corrupt two-phase state file \"%s\"",
1596                                                                 clde->d_name)));
1597                                 RemoveTwoPhaseFile(xid, true);
1598                                 continue;
1599                         }
1600
1601                         ereport(LOG,
1602                                         (errmsg("recovering prepared transaction %u", xid)));
1603
1604                         /* Deconstruct header */
1605                         hdr = (TwoPhaseFileHeader *) buf;
1606                         Assert(TransactionIdEquals(hdr->xid, xid));
1607                         bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1608                         subxids = (TransactionId *) bufptr;
1609                         bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1610                         bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1611                         bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1612
1613                         /*
1614                          * Reconstruct subtrans state for the transaction --- needed
1615                          * because pg_subtrans is not preserved over a restart.  Note that
1616                          * we are linking all the subtransactions directly to the
1617                          * top-level XID; there may originally have been a more complex
1618                          * hierarchy, but there's no need to restore that exactly.
1619                          */
1620                         for (i = 0; i < hdr->nsubxacts; i++)
1621                                 SubTransSetParent(subxids[i], xid);
1622
1623                         /*
1624                          * Recreate its GXACT and dummy PGPROC
1625                          *
1626                          * Note: since we don't have the PREPARE record's WAL location at
1627                          * hand, we leave prepare_lsn zeroes.  This means the GXACT will
1628                          * be fsync'd on every future checkpoint.  We assume this
1629                          * situation is infrequent enough that the performance cost is
1630                          * negligible (especially since we know the state file has already
1631                          * been fsynced).
1632                          */
1633                         gxact = MarkAsPreparing(xid, hdr->gid,
1634                                                                         hdr->prepared_at,
1635                                                                         hdr->owner, hdr->database);
1636                         GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
1637                         MarkAsPrepared(gxact);
1638
1639                         /*
1640                          * Recover other state (notably locks) using resource managers
1641                          */
1642                         ProcessRecords(bufptr, xid, twophase_recover_callbacks);
1643
1644                         pfree(buf);
1645                 }
1646         }
1647         FreeDir(cldir);
1648 }
1649
1650 /*
1651  *      RecordTransactionCommitPrepared
1652  *
1653  * This is basically the same as RecordTransactionCommit: in particular,
1654  * we must set the inCommit flag to avoid a race condition.
1655  *
1656  * We know the transaction made at least one XLOG entry (its PREPARE),
1657  * so it is never possible to optimize out the commit record.
1658  */
1659 static void
1660 RecordTransactionCommitPrepared(TransactionId xid,
1661                                                                 int nchildren,
1662                                                                 TransactionId *children,
1663                                                                 int nrels,
1664                                                                 RelFileNode *rels)
1665 {
1666         XLogRecData rdata[3];
1667         int                     lastrdata = 0;
1668         xl_xact_commit_prepared xlrec;
1669         XLogRecPtr      recptr;
1670
1671         START_CRIT_SECTION();
1672
1673         /* See notes in RecordTransactionCommit */
1674         MyProc->inCommit = true;
1675
1676         /* Emit the XLOG commit record */
1677         xlrec.xid = xid;
1678         xlrec.crec.xtime = time(NULL);
1679         xlrec.crec.nrels = nrels;
1680         xlrec.crec.nsubxacts = nchildren;
1681         rdata[0].data = (char *) (&xlrec);
1682         rdata[0].len = MinSizeOfXactCommitPrepared;
1683         rdata[0].buffer = InvalidBuffer;
1684         /* dump rels to delete */
1685         if (nrels > 0)
1686         {
1687                 rdata[0].next = &(rdata[1]);
1688                 rdata[1].data = (char *) rels;
1689                 rdata[1].len = nrels * sizeof(RelFileNode);
1690                 rdata[1].buffer = InvalidBuffer;
1691                 lastrdata = 1;
1692         }
1693         /* dump committed child Xids */
1694         if (nchildren > 0)
1695         {
1696                 rdata[lastrdata].next = &(rdata[2]);
1697                 rdata[2].data = (char *) children;
1698                 rdata[2].len = nchildren * sizeof(TransactionId);
1699                 rdata[2].buffer = InvalidBuffer;
1700                 lastrdata = 2;
1701         }
1702         rdata[lastrdata].next = NULL;
1703
1704         recptr = XLogInsert(RM_XACT_ID,
1705                                                 XLOG_XACT_COMMIT_PREPARED | XLOG_NO_TRAN,
1706                                                 rdata);
1707
1708         /* we don't currently try to sleep before flush here ... */
1709
1710         /* Flush XLOG to disk */
1711         XLogFlush(recptr);
1712
1713         /* Mark the transaction committed in pg_clog */
1714         TransactionIdCommit(xid);
1715         /* to avoid race conditions, the parent must commit first */
1716         TransactionIdCommitTree(nchildren, children);
1717
1718         /* Checkpoint can proceed now */
1719         MyProc->inCommit = false;
1720
1721         END_CRIT_SECTION();
1722 }
1723
1724 /*
1725  *      RecordTransactionAbortPrepared
1726  *
1727  * This is basically the same as RecordTransactionAbort.
1728  *
1729  * We know the transaction made at least one XLOG entry (its PREPARE),
1730  * so it is never possible to optimize out the abort record.
1731  */
1732 static void
1733 RecordTransactionAbortPrepared(TransactionId xid,
1734                                                            int nchildren,
1735                                                            TransactionId *children,
1736                                                            int nrels,
1737                                                            RelFileNode *rels)
1738 {
1739         XLogRecData rdata[3];
1740         int                     lastrdata = 0;
1741         xl_xact_abort_prepared xlrec;
1742         XLogRecPtr      recptr;
1743
1744         /*
1745          * Catch the scenario where we aborted partway through
1746          * RecordTransactionCommitPrepared ...
1747          */
1748         if (TransactionIdDidCommit(xid))
1749                 elog(PANIC, "cannot abort transaction %u, it was already committed",
1750                          xid);
1751
1752         START_CRIT_SECTION();
1753
1754         /* Emit the XLOG abort record */
1755         xlrec.xid = xid;
1756         xlrec.arec.xtime = time(NULL);
1757         xlrec.arec.nrels = nrels;
1758         xlrec.arec.nsubxacts = nchildren;
1759         rdata[0].data = (char *) (&xlrec);
1760         rdata[0].len = MinSizeOfXactAbortPrepared;
1761         rdata[0].buffer = InvalidBuffer;
1762         /* dump rels to delete */
1763         if (nrels > 0)
1764         {
1765                 rdata[0].next = &(rdata[1]);
1766                 rdata[1].data = (char *) rels;
1767                 rdata[1].len = nrels * sizeof(RelFileNode);
1768                 rdata[1].buffer = InvalidBuffer;
1769                 lastrdata = 1;
1770         }
1771         /* dump committed child Xids */
1772         if (nchildren > 0)
1773         {
1774                 rdata[lastrdata].next = &(rdata[2]);
1775                 rdata[2].data = (char *) children;
1776                 rdata[2].len = nchildren * sizeof(TransactionId);
1777                 rdata[2].buffer = InvalidBuffer;
1778                 lastrdata = 2;
1779         }
1780         rdata[lastrdata].next = NULL;
1781
1782         recptr = XLogInsert(RM_XACT_ID,
1783                                                 XLOG_XACT_ABORT_PREPARED | XLOG_NO_TRAN,
1784                                                 rdata);
1785
1786         /* Always flush, since we're about to remove the 2PC state file */
1787         XLogFlush(recptr);
1788
1789         /*
1790          * Mark the transaction aborted in clog.  This is not absolutely necessary
1791          * but we may as well do it while we are here.
1792          */
1793         TransactionIdAbort(xid);
1794         TransactionIdAbortTree(nchildren, children);
1795
1796         END_CRIT_SECTION();
1797 }