]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/twophase.c
Divide the lock manager's shared state into 'partitions', so as to
[postgresql] / src / backend / access / transam / twophase.c
1 /*-------------------------------------------------------------------------
2  *
3  * twophase.c
4  *              Two-phase commit support functions.
5  *
6  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *              $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.18 2005/12/11 21:02:17 tgl Exp $
11  *
12  * NOTES
13  *              Each global transaction is associated with a global transaction
14  *              identifier (GID). The client assigns a GID to a postgres
15  *              transaction with the PREPARE TRANSACTION command.
16  *
17  *              We keep all active global transactions in a shared memory array.
18  *              When the PREPARE TRANSACTION command is issued, the GID is
19  *              reserved for the transaction in the array. This is done before
20  *              a WAL entry is made, because the reservation checks for duplicate
21  *              GIDs and aborts the transaction if there already is a global
22  *              transaction in prepared state with the same GID.
23  *
24  *              A global transaction (gxact) also has a dummy PGPROC that is entered
25  *              into the ProcArray array; this is what keeps the XID considered
26  *              running by TransactionIdIsInProgress.  It is also convenient as a
27  *              PGPROC to hook the gxact's locks to.
28  *
29  *              In order to survive crashes and shutdowns, all prepared
30  *              transactions must be stored in permanent storage. This includes
31  *              locking information, pending notifications etc. All that state
32  *              information is written to the per-transaction state file in
33  *              the pg_twophase directory.
34  *
35  *-------------------------------------------------------------------------
36  */
37 #include "postgres.h"
38
39 #include <fcntl.h>
40 #include <sys/stat.h>
41 #include <sys/types.h>
42 #include <time.h>
43 #include <unistd.h>
44
45 #include "access/heapam.h"
46 #include "access/subtrans.h"
47 #include "access/twophase.h"
48 #include "access/twophase_rmgr.h"
49 #include "access/xact.h"
50 #include "catalog/pg_type.h"
51 #include "funcapi.h"
52 #include "miscadmin.h"
53 #include "pgstat.h"
54 #include "storage/fd.h"
55 #include "storage/proc.h"
56 #include "storage/procarray.h"
57 #include "storage/smgr.h"
58 #include "utils/builtins.h"
59
60
61 /*
62  * Directory where Two-phase commit files reside within PGDATA
63  */
64 #define TWOPHASE_DIR "pg_twophase"
65
66 /* GUC variable, can't be changed after startup */
67 int                     max_prepared_xacts = 5;
68
69 /*
70  * This struct describes one global transaction that is in prepared state
71  * or attempting to become prepared.
72  *
73  * The first component of the struct is a dummy PGPROC that is inserted
74  * into the global ProcArray so that the transaction appears to still be
75  * running and holding locks.  It must be first because we cast pointers
76  * to PGPROC and pointers to GlobalTransactionData back and forth.
77  *
78  * The lifecycle of a global transaction is:
79  *
80  * 1. After checking that the requested GID is not in use, set up an
81  * entry in the TwoPhaseState->prepXacts array with the correct XID and GID,
82  * with locking_xid = my own XID and valid = false.
83  *
84  * 2. After successfully completing prepare, set valid = true and enter the
85  * contained PGPROC into the global ProcArray.
86  *
87  * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry
88  * is valid and its locking_xid is no longer active, then store my current
89  * XID into locking_xid.  This prevents concurrent attempts to commit or
90  * rollback the same prepared xact.
91  *
92  * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
93  * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
94  * the freelist.
95  *
96  * Note that if the preparing transaction fails between steps 1 and 2, the
97  * entry will remain in prepXacts until recycled.  We can detect recyclable
98  * entries by checking for valid = false and locking_xid no longer active.
99  *
100  * typedef struct GlobalTransactionData *GlobalTransaction appears in
101  * twophase.h
102  */
103 #define GIDSIZE 200
104
105 typedef struct GlobalTransactionData
106 {
107         PGPROC          proc;                   /* dummy proc */
108         TimestampTz prepared_at;        /* time of preparation */
109         XLogRecPtr      prepare_lsn;    /* XLOG offset of prepare record */
110         Oid                     owner;                  /* ID of user that executed the xact */
111         TransactionId locking_xid;      /* top-level XID of backend working on xact */
112         bool            valid;                  /* TRUE if fully prepared */
113         char            gid[GIDSIZE];   /* The GID assigned to the prepared xact */
114 } GlobalTransactionData;
115
116 /*
117  * Two Phase Commit shared state.  Access to this struct is protected
118  * by TwoPhaseStateLock.
119  */
120 typedef struct TwoPhaseStateData
121 {
122         /* Head of linked list of free GlobalTransactionData structs */
123         SHMEM_OFFSET freeGXacts;
124
125         /* Number of valid prepXacts entries. */
126         int                     numPrepXacts;
127
128         /*
129          * There are max_prepared_xacts items in this array, but C wants a
130          * fixed-size array.
131          */
132         GlobalTransaction prepXacts[1];         /* VARIABLE LENGTH ARRAY */
133 } TwoPhaseStateData;                    /* VARIABLE LENGTH STRUCT */
134
135 static TwoPhaseStateData *TwoPhaseState;
136
137
138 static void RecordTransactionCommitPrepared(TransactionId xid,
139                                                                 int nchildren,
140                                                                 TransactionId *children,
141                                                                 int nrels,
142                                                                 RelFileNode *rels);
143 static void RecordTransactionAbortPrepared(TransactionId xid,
144                                                            int nchildren,
145                                                            TransactionId *children,
146                                                            int nrels,
147                                                            RelFileNode *rels);
148 static void ProcessRecords(char *bufptr, TransactionId xid,
149                            const TwoPhaseCallback callbacks[]);
150
151
152 /*
153  * Initialization of shared memory
154  */
155 Size
156 TwoPhaseShmemSize(void)
157 {
158         Size            size;
159
160         /* Need the fixed struct, the array of pointers, and the GTD structs */
161         size = offsetof(TwoPhaseStateData, prepXacts);
162         size = add_size(size, mul_size(max_prepared_xacts,
163                                                                    sizeof(GlobalTransaction)));
164         size = MAXALIGN(size);
165         size = add_size(size, mul_size(max_prepared_xacts,
166                                                                    sizeof(GlobalTransactionData)));
167
168         return size;
169 }
170
171 void
172 TwoPhaseShmemInit(void)
173 {
174         bool            found;
175
176         TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
177                                                                         TwoPhaseShmemSize(),
178                                                                         &found);
179         if (!IsUnderPostmaster)
180         {
181                 GlobalTransaction gxacts;
182                 int                     i;
183
184                 Assert(!found);
185                 TwoPhaseState->freeGXacts = INVALID_OFFSET;
186                 TwoPhaseState->numPrepXacts = 0;
187
188                 /*
189                  * Initialize the linked list of free GlobalTransactionData structs
190                  */
191                 gxacts = (GlobalTransaction)
192                         ((char *) TwoPhaseState +
193                          MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
194                                           sizeof(GlobalTransaction) * max_prepared_xacts));
195                 for (i = 0; i < max_prepared_xacts; i++)
196                 {
197                         gxacts[i].proc.links.next = TwoPhaseState->freeGXacts;
198                         TwoPhaseState->freeGXacts = MAKE_OFFSET(&gxacts[i]);
199                 }
200         }
201         else
202                 Assert(found);
203 }
204
205
206 /*
207  * MarkAsPreparing
208  *              Reserve the GID for the given transaction.
209  *
210  * Internally, this creates a gxact struct and puts it into the active array.
211  * NOTE: this is also used when reloading a gxact after a crash; so avoid
212  * assuming that we can use very much backend context.
213  */
214 GlobalTransaction
215 MarkAsPreparing(TransactionId xid, const char *gid,
216                                 TimestampTz prepared_at, Oid owner, Oid databaseid)
217 {
218         GlobalTransaction gxact;
219         int                     i;
220
221         if (strlen(gid) >= GIDSIZE)
222                 ereport(ERROR,
223                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
224                                  errmsg("transaction identifier \"%s\" is too long",
225                                                 gid)));
226
227         LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
228
229         /*
230          * First, find and recycle any gxacts that failed during prepare. We do
231          * this partly to ensure we don't mistakenly say their GIDs are still
232          * reserved, and partly so we don't fail on out-of-slots unnecessarily.
233          */
234         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
235         {
236                 gxact = TwoPhaseState->prepXacts[i];
237                 if (!gxact->valid && !TransactionIdIsActive(gxact->locking_xid))
238                 {
239                         /* It's dead Jim ... remove from the active array */
240                         TwoPhaseState->numPrepXacts--;
241                         TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
242                         /* and put it back in the freelist */
243                         gxact->proc.links.next = TwoPhaseState->freeGXacts;
244                         TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);
245                         /* Back up index count too, so we don't miss scanning one */
246                         i--;
247                 }
248         }
249
250         /* Check for conflicting GID */
251         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
252         {
253                 gxact = TwoPhaseState->prepXacts[i];
254                 if (strcmp(gxact->gid, gid) == 0)
255                 {
256                         ereport(ERROR,
257                                         (errcode(ERRCODE_DUPLICATE_OBJECT),
258                                          errmsg("transaction identifier \"%s\" is already in use",
259                                                         gid)));
260                 }
261         }
262
263         /* Get a free gxact from the freelist */
264         if (TwoPhaseState->freeGXacts == INVALID_OFFSET)
265                 ereport(ERROR,
266                                 (errcode(ERRCODE_OUT_OF_MEMORY),
267                                  errmsg("maximum number of prepared transactions reached"),
268                                  errhint("Increase max_prepared_transactions (currently %d).",
269                                                  max_prepared_xacts)));
270         gxact = (GlobalTransaction) MAKE_PTR(TwoPhaseState->freeGXacts);
271         TwoPhaseState->freeGXacts = gxact->proc.links.next;
272
273         /* Initialize it */
274         MemSet(&gxact->proc, 0, sizeof(PGPROC));
275         SHMQueueElemInit(&(gxact->proc.links));
276         gxact->proc.waitStatus = STATUS_OK;
277         gxact->proc.xid = xid;
278         gxact->proc.xmin = InvalidTransactionId;
279         gxact->proc.pid = 0;
280         gxact->proc.databaseId = databaseid;
281         gxact->proc.roleId = owner;
282         gxact->proc.lwWaiting = false;
283         gxact->proc.lwExclusive = false;
284         gxact->proc.lwWaitLink = NULL;
285         gxact->proc.waitLock = NULL;
286         gxact->proc.waitProcLock = NULL;
287         for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
288                 SHMQueueInit(&(gxact->proc.myProcLocks[i]));
289         /* subxid data must be filled later by GXactLoadSubxactData */
290         gxact->proc.subxids.overflowed = false;
291         gxact->proc.subxids.nxids = 0;
292
293         gxact->prepared_at = prepared_at;
294         /* initialize LSN to 0 (start of WAL) */
295         gxact->prepare_lsn.xlogid = 0;
296         gxact->prepare_lsn.xrecoff = 0;
297         gxact->owner = owner;
298         gxact->locking_xid = xid;
299         gxact->valid = false;
300         strcpy(gxact->gid, gid);
301
302         /* And insert it into the active array */
303         Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
304         TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
305
306         LWLockRelease(TwoPhaseStateLock);
307
308         return gxact;
309 }
310
311 /*
312  * GXactLoadSubxactData
313  *
314  * If the transaction being persisted had any subtransactions, this must
315  * be called before MarkAsPrepared() to load information into the dummy
316  * PGPROC.
317  */
318 static void
319 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
320                                          TransactionId *children)
321 {
322         /* We need no extra lock since the GXACT isn't valid yet */
323         if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
324         {
325                 gxact->proc.subxids.overflowed = true;
326                 nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
327         }
328         if (nsubxacts > 0)
329         {
330                 memcpy(gxact->proc.subxids.xids, children,
331                            nsubxacts * sizeof(TransactionId));
332                 gxact->proc.subxids.nxids = nsubxacts;
333         }
334 }
335
336 /*
337  * MarkAsPrepared
338  *              Mark the GXACT as fully valid, and enter it into the global ProcArray.
339  */
340 static void
341 MarkAsPrepared(GlobalTransaction gxact)
342 {
343         /* Lock here may be overkill, but I'm not convinced of that ... */
344         LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
345         Assert(!gxact->valid);
346         gxact->valid = true;
347         LWLockRelease(TwoPhaseStateLock);
348
349         /*
350          * Put it into the global ProcArray so TransactionIdInProgress considers
351          * the XID as still running.
352          */
353         ProcArrayAdd(&gxact->proc);
354 }
355
356 /*
357  * LockGXact
358  *              Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
359  */
360 static GlobalTransaction
361 LockGXact(const char *gid, Oid user)
362 {
363         int                     i;
364
365         LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
366
367         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
368         {
369                 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
370
371                 /* Ignore not-yet-valid GIDs */
372                 if (!gxact->valid)
373                         continue;
374                 if (strcmp(gxact->gid, gid) != 0)
375                         continue;
376
377                 /* Found it, but has someone else got it locked? */
378                 if (TransactionIdIsValid(gxact->locking_xid))
379                 {
380                         if (TransactionIdIsActive(gxact->locking_xid))
381                                 ereport(ERROR,
382                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
383                                 errmsg("prepared transaction with identifier \"%s\" is busy",
384                                            gid)));
385                         gxact->locking_xid = InvalidTransactionId;
386                 }
387
388                 if (user != gxact->owner && !superuser_arg(user))
389                         ereport(ERROR,
390                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
391                                   errmsg("permission denied to finish prepared transaction"),
392                                          errhint("Must be superuser or the user that prepared the transaction.")));
393
394                 /* OK for me to lock it */
395                 gxact->locking_xid = GetTopTransactionId();
396
397                 LWLockRelease(TwoPhaseStateLock);
398
399                 return gxact;
400         }
401
402         LWLockRelease(TwoPhaseStateLock);
403
404         ereport(ERROR,
405                         (errcode(ERRCODE_UNDEFINED_OBJECT),
406                  errmsg("prepared transaction with identifier \"%s\" does not exist",
407                                 gid)));
408
409         /* NOTREACHED */
410         return NULL;
411 }
412
413 /*
414  * RemoveGXact
415  *              Remove the prepared transaction from the shared memory array.
416  *
417  * NB: caller should have already removed it from ProcArray
418  */
419 static void
420 RemoveGXact(GlobalTransaction gxact)
421 {
422         int                     i;
423
424         LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
425
426         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
427         {
428                 if (gxact == TwoPhaseState->prepXacts[i])
429                 {
430                         /* remove from the active array */
431                         TwoPhaseState->numPrepXacts--;
432                         TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
433
434                         /* and put it back in the freelist */
435                         gxact->proc.links.next = TwoPhaseState->freeGXacts;
436                         TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);
437
438                         LWLockRelease(TwoPhaseStateLock);
439
440                         return;
441                 }
442         }
443
444         LWLockRelease(TwoPhaseStateLock);
445
446         elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
447 }
448
449 /*
450  * TransactionIdIsPrepared
451  *              True iff transaction associated with the identifier is prepared
452  *              for two-phase commit
453  *
454  * Note: only gxacts marked "valid" are considered; but notice we do not
455  * check the locking status.
456  *
457  * This is not currently exported, because it is only needed internally.
458  */
459 static bool
460 TransactionIdIsPrepared(TransactionId xid)
461 {
462         bool            result = false;
463         int                     i;
464
465         LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
466
467         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
468         {
469                 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
470
471                 if (gxact->valid && gxact->proc.xid == xid)
472                 {
473                         result = true;
474                         break;
475                 }
476         }
477
478         LWLockRelease(TwoPhaseStateLock);
479
480         return result;
481 }
482
483 /*
484  * Returns an array of all prepared transactions for the user-level
485  * function pg_prepared_xact.
486  *
487  * The returned array and all its elements are copies of internal data
488  * structures, to minimize the time we need to hold the TwoPhaseStateLock.
489  *
490  * WARNING -- we return even those transactions that are not fully prepared
491  * yet.  The caller should filter them out if he doesn't want them.
492  *
493  * The returned array is palloc'd.
494  */
495 static int
496 GetPreparedTransactionList(GlobalTransaction *gxacts)
497 {
498         GlobalTransaction array;
499         int                     num;
500         int                     i;
501
502         LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
503
504         if (TwoPhaseState->numPrepXacts == 0)
505         {
506                 LWLockRelease(TwoPhaseStateLock);
507
508                 *gxacts = NULL;
509                 return 0;
510         }
511
512         num = TwoPhaseState->numPrepXacts;
513         array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
514         *gxacts = array;
515         for (i = 0; i < num; i++)
516                 memcpy(array + i, TwoPhaseState->prepXacts[i],
517                            sizeof(GlobalTransactionData));
518
519         LWLockRelease(TwoPhaseStateLock);
520
521         return num;
522 }
523
524
525 /* Working status for pg_prepared_xact */
526 typedef struct
527 {
528         GlobalTransaction array;
529         int                     ngxacts;
530         int                     currIdx;
531 } Working_State;
532
533 /*
534  * pg_prepared_xact
535  *              Produce a view with one row per prepared transaction.
536  *
537  * This function is here so we don't have to export the
538  * GlobalTransactionData struct definition.
539  */
540 Datum
541 pg_prepared_xact(PG_FUNCTION_ARGS)
542 {
543         FuncCallContext *funcctx;
544         Working_State *status;
545
546         if (SRF_IS_FIRSTCALL())
547         {
548                 TupleDesc       tupdesc;
549                 MemoryContext oldcontext;
550
551                 /* create a function context for cross-call persistence */
552                 funcctx = SRF_FIRSTCALL_INIT();
553
554                 /*
555                  * Switch to memory context appropriate for multiple function calls
556                  */
557                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
558
559                 /* build tupdesc for result tuples */
560                 /* this had better match pg_prepared_xacts view in system_views.sql */
561                 tupdesc = CreateTemplateTupleDesc(5, false);
562                 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
563                                                    XIDOID, -1, 0);
564                 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
565                                                    TEXTOID, -1, 0);
566                 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
567                                                    TIMESTAMPTZOID, -1, 0);
568                 TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
569                                                    OIDOID, -1, 0);
570                 TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
571                                                    OIDOID, -1, 0);
572
573                 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
574
575                 /*
576                  * Collect all the 2PC status information that we will format and send
577                  * out as a result set.
578                  */
579                 status = (Working_State *) palloc(sizeof(Working_State));
580                 funcctx->user_fctx = (void *) status;
581
582                 status->ngxacts = GetPreparedTransactionList(&status->array);
583                 status->currIdx = 0;
584
585                 MemoryContextSwitchTo(oldcontext);
586         }
587
588         funcctx = SRF_PERCALL_SETUP();
589         status = (Working_State *) funcctx->user_fctx;
590
591         while (status->array != NULL && status->currIdx < status->ngxacts)
592         {
593                 GlobalTransaction gxact = &status->array[status->currIdx++];
594                 Datum           values[5];
595                 bool            nulls[5];
596                 HeapTuple       tuple;
597                 Datum           result;
598
599                 if (!gxact->valid)
600                         continue;
601
602                 /*
603                  * Form tuple with appropriate data.
604                  */
605                 MemSet(values, 0, sizeof(values));
606                 MemSet(nulls, 0, sizeof(nulls));
607
608                 values[0] = TransactionIdGetDatum(gxact->proc.xid);
609                 values[1] = DirectFunctionCall1(textin, CStringGetDatum(gxact->gid));
610                 values[2] = TimestampTzGetDatum(gxact->prepared_at);
611                 values[3] = ObjectIdGetDatum(gxact->owner);
612                 values[4] = ObjectIdGetDatum(gxact->proc.databaseId);
613
614                 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
615                 result = HeapTupleGetDatum(tuple);
616                 SRF_RETURN_NEXT(funcctx, result);
617         }
618
619         SRF_RETURN_DONE(funcctx);
620 }
621
622 /*
623  * TwoPhaseGetDummyProc
624  *              Get the PGPROC that represents a prepared transaction specified by XID
625  */
626 PGPROC *
627 TwoPhaseGetDummyProc(TransactionId xid)
628 {
629         PGPROC     *result = NULL;
630         int                     i;
631
632         static TransactionId cached_xid = InvalidTransactionId;
633         static PGPROC *cached_proc = NULL;
634
635         /*
636          * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
637          * repeatedly for the same XID.  We can save work with a simple cache.
638          */
639         if (xid == cached_xid)
640                 return cached_proc;
641
642         LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
643
644         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
645         {
646                 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
647
648                 if (gxact->proc.xid == xid)
649                 {
650                         result = &gxact->proc;
651                         break;
652                 }
653         }
654
655         LWLockRelease(TwoPhaseStateLock);
656
657         if (result == NULL)                     /* should not happen */
658                 elog(ERROR, "failed to find dummy PGPROC for xid %u", xid);
659
660         cached_xid = xid;
661         cached_proc = result;
662
663         return result;
664 }
665
666 /************************************************************************/
667 /* State file support                                                                                                   */
668 /************************************************************************/
669
670 #define TwoPhaseFilePath(path, xid) \
671         snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
672
673 /*
674  * 2PC state file format:
675  *
676  *      1. TwoPhaseFileHeader
677  *      2. TransactionId[] (subtransactions)
678  *      3. RelFileNode[] (files to be deleted at commit)
679  *      4. RelFileNode[] (files to be deleted at abort)
680  *      5. TwoPhaseRecordOnDisk
681  *      6. ...
682  *      7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
683  *      8. CRC32
684  *
685  * Each segment except the final CRC32 is MAXALIGN'd.
686  */
687
688 /*
689  * Header for a 2PC state file
690  */
691 #define TWOPHASE_MAGIC  0x57F94531              /* format identifier */
692
693 typedef struct TwoPhaseFileHeader
694 {
695         uint32          magic;                  /* format identifier */
696         uint32          total_len;              /* actual file length */
697         TransactionId xid;                      /* original transaction XID */
698         Oid                     database;               /* OID of database it was in */
699         TimestampTz prepared_at;        /* time of preparation */
700         Oid                     owner;                  /* user running the transaction */
701         int32           nsubxacts;              /* number of following subxact XIDs */
702         int32           ncommitrels;    /* number of delete-on-commit rels */
703         int32           nabortrels;             /* number of delete-on-abort rels */
704         char            gid[GIDSIZE];   /* GID for transaction */
705 } TwoPhaseFileHeader;
706
707 /*
708  * Header for each record in a state file
709  *
710  * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
711  * The rmgr data will be stored starting on a MAXALIGN boundary.
712  */
713 typedef struct TwoPhaseRecordOnDisk
714 {
715         uint32          len;                    /* length of rmgr data */
716         TwoPhaseRmgrId rmid;            /* resource manager for this record */
717         uint16          info;                   /* flag bits for use by rmgr */
718 } TwoPhaseRecordOnDisk;
719
720 /*
721  * During prepare, the state file is assembled in memory before writing it
722  * to WAL and the actual state file.  We use a chain of XLogRecData blocks
723  * so that we will be able to pass the state file contents directly to
724  * XLogInsert.
725  */
726 static struct xllist
727 {
728         XLogRecData *head;                      /* first data block in the chain */
729         XLogRecData *tail;                      /* last block in chain */
730         uint32          bytes_free;             /* free bytes left in tail block */
731         uint32          total_len;              /* total data bytes in chain */
732 }       records;
733
734
735 /*
736  * Append a block of data to records data structure.
737  *
738  * NB: each block is padded to a MAXALIGN multiple.  This must be
739  * accounted for when the file is later read!
740  *
741  * The data is copied, so the caller is free to modify it afterwards.
742  */
743 static void
744 save_state_data(const void *data, uint32 len)
745 {
746         uint32          padlen = MAXALIGN(len);
747
748         if (padlen > records.bytes_free)
749         {
750                 records.tail->next = palloc0(sizeof(XLogRecData));
751                 records.tail = records.tail->next;
752                 records.tail->buffer = InvalidBuffer;
753                 records.tail->len = 0;
754                 records.tail->next = NULL;
755
756                 records.bytes_free = Max(padlen, 512);
757                 records.tail->data = palloc(records.bytes_free);
758         }
759
760         memcpy(((char *) records.tail->data) + records.tail->len, data, len);
761         records.tail->len += padlen;
762         records.bytes_free -= padlen;
763         records.total_len += padlen;
764 }
765
766 /*
767  * Start preparing a state file.
768  *
769  * Initializes data structure and inserts the 2PC file header record.
770  */
771 void
772 StartPrepare(GlobalTransaction gxact)
773 {
774         TransactionId xid = gxact->proc.xid;
775         TwoPhaseFileHeader hdr;
776         TransactionId *children;
777         RelFileNode *commitrels;
778         RelFileNode *abortrels;
779
780         /* Initialize linked list */
781         records.head = palloc0(sizeof(XLogRecData));
782         records.head->buffer = InvalidBuffer;
783         records.head->len = 0;
784         records.head->next = NULL;
785
786         records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
787         records.head->data = palloc(records.bytes_free);
788
789         records.tail = records.head;
790
791         records.total_len = 0;
792
793         /* Create header */
794         hdr.magic = TWOPHASE_MAGIC;
795         hdr.total_len = 0;                      /* EndPrepare will fill this in */
796         hdr.xid = xid;
797         hdr.database = gxact->proc.databaseId;
798         hdr.prepared_at = gxact->prepared_at;
799         hdr.owner = gxact->owner;
800         hdr.nsubxacts = xactGetCommittedChildren(&children);
801         hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
802         hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
803         StrNCpy(hdr.gid, gxact->gid, GIDSIZE);
804
805         save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
806
807         /* Add the additional info about subxacts and deletable files */
808         if (hdr.nsubxacts > 0)
809         {
810                 save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
811                 /* While we have the child-xact data, stuff it in the gxact too */
812                 GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
813                 pfree(children);
814         }
815         if (hdr.ncommitrels > 0)
816         {
817                 save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
818                 pfree(commitrels);
819         }
820         if (hdr.nabortrels > 0)
821         {
822                 save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
823                 pfree(abortrels);
824         }
825 }
826
827 /*
828  * Finish preparing state file.
829  *
830  * Calculates CRC and writes state file to WAL and in pg_twophase directory.
831  */
832 void
833 EndPrepare(GlobalTransaction gxact)
834 {
835         TransactionId xid = gxact->proc.xid;
836         TwoPhaseFileHeader *hdr;
837         char            path[MAXPGPATH];
838         XLogRecData *record;
839         pg_crc32        statefile_crc;
840         pg_crc32        bogus_crc;
841         int                     fd;
842
843         /* Add the end sentinel to the list of 2PC records */
844         RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
845                                                    NULL, 0);
846
847         /* Go back and fill in total_len in the file header record */
848         hdr = (TwoPhaseFileHeader *) records.head->data;
849         Assert(hdr->magic == TWOPHASE_MAGIC);
850         hdr->total_len = records.total_len + sizeof(pg_crc32);
851
852         /*
853          * Create the 2PC state file.
854          *
855          * Note: because we use BasicOpenFile(), we are responsible for ensuring
856          * the FD gets closed in any error exit path.  Once we get into the
857          * critical section, though, it doesn't matter since any failure causes
858          * PANIC anyway.
859          */
860         TwoPhaseFilePath(path, xid);
861
862         fd = BasicOpenFile(path,
863                                            O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
864                                            S_IRUSR | S_IWUSR);
865         if (fd < 0)
866                 ereport(ERROR,
867                                 (errcode_for_file_access(),
868                                  errmsg("could not create twophase state file \"%s\": %m",
869                                                 path)));
870
871         /* Write data to file, and calculate CRC as we pass over it */
872         INIT_CRC32(statefile_crc);
873
874         for (record = records.head; record != NULL; record = record->next)
875         {
876                 COMP_CRC32(statefile_crc, record->data, record->len);
877                 if ((write(fd, record->data, record->len)) != record->len)
878                 {
879                         close(fd);
880                         ereport(ERROR,
881                                         (errcode_for_file_access(),
882                                          errmsg("could not write twophase state file: %m")));
883                 }
884         }
885
886         FIN_CRC32(statefile_crc);
887
888         /*
889          * Write a deliberately bogus CRC to the state file; this is just paranoia
890          * to catch the case where four more bytes will run us out of disk space.
891          */
892         bogus_crc = ~statefile_crc;
893
894         if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
895         {
896                 close(fd);
897                 ereport(ERROR,
898                                 (errcode_for_file_access(),
899                                  errmsg("could not write twophase state file: %m")));
900         }
901
902         /* Back up to prepare for rewriting the CRC */
903         if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
904         {
905                 close(fd);
906                 ereport(ERROR,
907                                 (errcode_for_file_access(),
908                                  errmsg("could not seek in twophase state file: %m")));
909         }
910
911         /*
912          * The state file isn't valid yet, because we haven't written the correct
913          * CRC yet.  Before we do that, insert entry in WAL and flush it to disk.
914          *
915          * Between the time we have written the WAL entry and the time we write
916          * out the correct state file CRC, we have an inconsistency: the xact is
917          * prepared according to WAL but not according to our on-disk state. We
918          * use a critical section to force a PANIC if we are unable to complete
919          * the write --- then, WAL replay should repair the inconsistency.      The
920          * odds of a PANIC actually occurring should be very tiny given that we
921          * were able to write the bogus CRC above.
922          *
923          * We have to lock out checkpoint start here, too; otherwise a checkpoint
924          * starting immediately after the WAL record is inserted could complete
925          * without fsync'ing our state file.  (This is essentially the same kind
926          * of race condition as the COMMIT-to-clog-write case that
927          * RecordTransactionCommit uses CheckpointStartLock for; see notes there.)
928          *
929          * We save the PREPARE record's location in the gxact for later use by
930          * CheckPointTwoPhase.
931          */
932         START_CRIT_SECTION();
933
934         LWLockAcquire(CheckpointStartLock, LW_SHARED);
935
936         gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
937                                                                         records.head);
938         XLogFlush(gxact->prepare_lsn);
939
940         /* If we crash now, we have prepared: WAL replay will fix things */
941
942         /* write correct CRC and close file */
943         if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
944         {
945                 close(fd);
946                 ereport(ERROR,
947                                 (errcode_for_file_access(),
948                                  errmsg("could not write twophase state file: %m")));
949         }
950
951         if (close(fd) != 0)
952                 ereport(ERROR,
953                                 (errcode_for_file_access(),
954                                  errmsg("could not close twophase state file: %m")));
955
956         /*
957          * Mark the prepared transaction as valid.      As soon as xact.c marks MyProc
958          * as not running our XID (which it will do immediately after this
959          * function returns), others can commit/rollback the xact.
960          *
961          * NB: a side effect of this is to make a dummy ProcArray entry for the
962          * prepared XID.  This must happen before we clear the XID from MyProc,
963          * else there is a window where the XID is not running according to
964          * TransactionIdInProgress, and onlookers would be entitled to assume the
965          * xact crashed.  Instead we have a window where the same XID appears
966          * twice in ProcArray, which is OK.
967          */
968         MarkAsPrepared(gxact);
969
970         /*
971          * Now we can release the checkpoint start lock: a checkpoint starting
972          * after this will certainly see the gxact as a candidate for fsyncing.
973          */
974         LWLockRelease(CheckpointStartLock);
975
976         END_CRIT_SECTION();
977
978         records.tail = records.head = NULL;
979 }
980
981 /*
982  * Register a 2PC record to be written to state file.
983  */
984 void
985 RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
986                                            const void *data, uint32 len)
987 {
988         TwoPhaseRecordOnDisk record;
989
990         record.rmid = rmid;
991         record.info = info;
992         record.len = len;
993         save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
994         if (len > 0)
995                 save_state_data(data, len);
996 }
997
998
999 /*
1000  * Read and validate the state file for xid.
1001  *
1002  * If it looks OK (has a valid magic number and CRC), return the palloc'd
1003  * contents of the file.  Otherwise return NULL.
1004  */
1005 static char *
1006 ReadTwoPhaseFile(TransactionId xid)
1007 {
1008         char            path[MAXPGPATH];
1009         char       *buf;
1010         TwoPhaseFileHeader *hdr;
1011         int                     fd;
1012         struct stat stat;
1013         uint32          crc_offset;
1014         pg_crc32        calc_crc,
1015                                 file_crc;
1016
1017         TwoPhaseFilePath(path, xid);
1018
1019         fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
1020         if (fd < 0)
1021         {
1022                 ereport(WARNING,
1023                                 (errcode_for_file_access(),
1024                                  errmsg("could not open twophase state file \"%s\": %m",
1025                                                 path)));
1026                 return NULL;
1027         }
1028
1029         /*
1030          * Check file length.  We can determine a lower bound pretty easily. We
1031          * set an upper bound mainly to avoid palloc() failure on a corrupt file.
1032          */
1033         if (fstat(fd, &stat))
1034         {
1035                 close(fd);
1036                 ereport(WARNING,
1037                                 (errcode_for_file_access(),
1038                                  errmsg("could not stat twophase state file \"%s\": %m",
1039                                                 path)));
1040                 return NULL;
1041         }
1042
1043         if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1044                                                 MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1045                                                 sizeof(pg_crc32)) ||
1046                 stat.st_size > 10000000)
1047         {
1048                 close(fd);
1049                 return NULL;
1050         }
1051
1052         crc_offset = stat.st_size - sizeof(pg_crc32);
1053         if (crc_offset != MAXALIGN(crc_offset))
1054         {
1055                 close(fd);
1056                 return NULL;
1057         }
1058
1059         /*
1060          * OK, slurp in the file.
1061          */
1062         buf = (char *) palloc(stat.st_size);
1063
1064         if (read(fd, buf, stat.st_size) != stat.st_size)
1065         {
1066                 close(fd);
1067                 ereport(WARNING,
1068                                 (errcode_for_file_access(),
1069                                  errmsg("could not read twophase state file \"%s\": %m",
1070                                                 path)));
1071                 pfree(buf);
1072                 return NULL;
1073         }
1074
1075         close(fd);
1076
1077         hdr = (TwoPhaseFileHeader *) buf;
1078         if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
1079         {
1080                 pfree(buf);
1081                 return NULL;
1082         }
1083
1084         INIT_CRC32(calc_crc);
1085         COMP_CRC32(calc_crc, buf, crc_offset);
1086         FIN_CRC32(calc_crc);
1087
1088         file_crc = *((pg_crc32 *) (buf + crc_offset));
1089
1090         if (!EQ_CRC32(calc_crc, file_crc))
1091         {
1092                 pfree(buf);
1093                 return NULL;
1094         }
1095
1096         return buf;
1097 }
1098
1099
1100 /*
1101  * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1102  */
1103 void
1104 FinishPreparedTransaction(const char *gid, bool isCommit)
1105 {
1106         GlobalTransaction gxact;
1107         TransactionId xid;
1108         char       *buf;
1109         char       *bufptr;
1110         TwoPhaseFileHeader *hdr;
1111         TransactionId *children;
1112         RelFileNode *commitrels;
1113         RelFileNode *abortrels;
1114         int                     i;
1115
1116         /*
1117          * Validate the GID, and lock the GXACT to ensure that two backends do not
1118          * try to commit the same GID at once.
1119          */
1120         gxact = LockGXact(gid, GetUserId());
1121         xid = gxact->proc.xid;
1122
1123         /*
1124          * Read and validate the state file
1125          */
1126         buf = ReadTwoPhaseFile(xid);
1127         if (buf == NULL)
1128                 ereport(ERROR,
1129                                 (errcode(ERRCODE_DATA_CORRUPTED),
1130                                  errmsg("twophase state file for transaction %u is corrupt",
1131                                                 xid)));
1132
1133         /*
1134          * Disassemble the header area
1135          */
1136         hdr = (TwoPhaseFileHeader *) buf;
1137         Assert(TransactionIdEquals(hdr->xid, xid));
1138         bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1139         children = (TransactionId *) bufptr;
1140         bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1141         commitrels = (RelFileNode *) bufptr;
1142         bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1143         abortrels = (RelFileNode *) bufptr;
1144         bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1145
1146         /*
1147          * The order of operations here is critical: make the XLOG entry for
1148          * commit or abort, then mark the transaction committed or aborted in
1149          * pg_clog, then remove its PGPROC from the global ProcArray (which means
1150          * TransactionIdIsInProgress will stop saying the prepared xact is in
1151          * progress), then run the post-commit or post-abort callbacks. The
1152          * callbacks will release the locks the transaction held.
1153          */
1154         if (isCommit)
1155                 RecordTransactionCommitPrepared(xid,
1156                                                                                 hdr->nsubxacts, children,
1157                                                                                 hdr->ncommitrels, commitrels);
1158         else
1159                 RecordTransactionAbortPrepared(xid,
1160                                                                            hdr->nsubxacts, children,
1161                                                                            hdr->nabortrels, abortrels);
1162
1163         ProcArrayRemove(&gxact->proc);
1164
1165         /*
1166          * In case we fail while running the callbacks, mark the gxact invalid so
1167          * no one else will try to commit/rollback, and so it can be recycled
1168          * properly later.      It is still locked by our XID so it won't go away yet.
1169          *
1170          * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1171          */
1172         gxact->valid = false;
1173
1174         /*
1175          * We have to remove any files that were supposed to be dropped. For
1176          * consistency with the regular xact.c code paths, must do this before
1177          * releasing locks, so do it before running the callbacks.
1178          *
1179          * NB: this code knows that we couldn't be dropping any temp rels ...
1180          */
1181         if (isCommit)
1182         {
1183                 for (i = 0; i < hdr->ncommitrels; i++)
1184                         smgrdounlink(smgropen(commitrels[i]), false, false);
1185         }
1186         else
1187         {
1188                 for (i = 0; i < hdr->nabortrels; i++)
1189                         smgrdounlink(smgropen(abortrels[i]), false, false);
1190         }
1191
1192         /* And now do the callbacks */
1193         if (isCommit)
1194                 ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
1195         else
1196                 ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
1197
1198         pgstat_count_xact_commit();
1199
1200         /*
1201          * And now we can clean up our mess.
1202          */
1203         RemoveTwoPhaseFile(xid, true);
1204
1205         RemoveGXact(gxact);
1206
1207         pfree(buf);
1208 }
1209
1210 /*
1211  * Scan a 2PC state file (already read into memory by ReadTwoPhaseFile)
1212  * and call the indicated callbacks for each 2PC record.
1213  */
1214 static void
1215 ProcessRecords(char *bufptr, TransactionId xid,
1216                            const TwoPhaseCallback callbacks[])
1217 {
1218         for (;;)
1219         {
1220                 TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1221
1222                 Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1223                 if (record->rmid == TWOPHASE_RM_END_ID)
1224                         break;
1225
1226                 bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1227
1228                 if (callbacks[record->rmid] != NULL)
1229                         callbacks[record->rmid] (xid, record->info,
1230                                                                          (void *) bufptr, record->len);
1231
1232                 bufptr += MAXALIGN(record->len);
1233         }
1234 }
1235
1236 /*
1237  * Remove the 2PC file for the specified XID.
1238  *
1239  * If giveWarning is false, do not complain about file-not-present;
1240  * this is an expected case during WAL replay.
1241  */
1242 void
1243 RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1244 {
1245         char            path[MAXPGPATH];
1246
1247         TwoPhaseFilePath(path, xid);
1248         if (unlink(path))
1249                 if (errno != ENOENT || giveWarning)
1250                         ereport(WARNING,
1251                                         (errcode_for_file_access(),
1252                                    errmsg("could not remove two-phase state file \"%s\": %m",
1253                                                   path)));
1254 }
1255
1256 /*
1257  * Recreates a state file. This is used in WAL replay.
1258  *
1259  * Note: content and len don't include CRC.
1260  */
1261 void
1262 RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1263 {
1264         char            path[MAXPGPATH];
1265         pg_crc32        statefile_crc;
1266         int                     fd;
1267
1268         /* Recompute CRC */
1269         INIT_CRC32(statefile_crc);
1270         COMP_CRC32(statefile_crc, content, len);
1271         FIN_CRC32(statefile_crc);
1272
1273         TwoPhaseFilePath(path, xid);
1274
1275         fd = BasicOpenFile(path,
1276                                            O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
1277                                            S_IRUSR | S_IWUSR);
1278         if (fd < 0)
1279                 ereport(ERROR,
1280                                 (errcode_for_file_access(),
1281                                  errmsg("could not recreate twophase state file \"%s\": %m",
1282                                                 path)));
1283
1284         /* Write content and CRC */
1285         if (write(fd, content, len) != len)
1286         {
1287                 close(fd);
1288                 ereport(ERROR,
1289                                 (errcode_for_file_access(),
1290                                  errmsg("could not write twophase state file: %m")));
1291         }
1292         if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32))
1293         {
1294                 close(fd);
1295                 ereport(ERROR,
1296                                 (errcode_for_file_access(),
1297                                  errmsg("could not write twophase state file: %m")));
1298         }
1299
1300         /*
1301          * We must fsync the file because the end-of-replay checkpoint will not do
1302          * so, there being no GXACT in shared memory yet to tell it to.
1303          */
1304         if (pg_fsync(fd) != 0)
1305         {
1306                 close(fd);
1307                 ereport(ERROR,
1308                                 (errcode_for_file_access(),
1309                                  errmsg("could not fsync twophase state file: %m")));
1310         }
1311
1312         if (close(fd) != 0)
1313                 ereport(ERROR,
1314                                 (errcode_for_file_access(),
1315                                  errmsg("could not close twophase state file: %m")));
1316 }
1317
1318 /*
1319  * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1320  *
1321  * We must fsync the state file of any GXACT that is valid and has a PREPARE
1322  * LSN <= the checkpoint's redo horizon.  (If the gxact isn't valid yet or
1323  * has a later LSN, this checkpoint is not responsible for fsyncing it.)
1324  *
1325  * This is deliberately run as late as possible in the checkpoint sequence,
1326  * because GXACTs ordinarily have short lifespans, and so it is quite
1327  * possible that GXACTs that were valid at checkpoint start will no longer
1328  * exist if we wait a little bit.
1329  *
1330  * If a GXACT remains valid across multiple checkpoints, it'll be fsynced
1331  * each time.  This is considered unusual enough that we don't bother to
1332  * expend any extra code to avoid the redundant fsyncs.  (They should be
1333  * reasonably cheap anyway, since they won't cause I/O.)
1334  */
1335 void
1336 CheckPointTwoPhase(XLogRecPtr redo_horizon)
1337 {
1338         TransactionId *xids;
1339         int                     nxids;
1340         char            path[MAXPGPATH];
1341         int                     i;
1342
1343         /*
1344          * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
1345          * it just long enough to make a list of the XIDs that require fsyncing,
1346          * and then do the I/O afterwards.
1347          *
1348          * This approach creates a race condition: someone else could delete a
1349          * GXACT between the time we release TwoPhaseStateLock and the time we try
1350          * to open its state file.      We handle this by special-casing ENOENT
1351          * failures: if we see that, we verify that the GXACT is no longer valid,
1352          * and if so ignore the failure.
1353          */
1354         if (max_prepared_xacts <= 0)
1355                 return;                                 /* nothing to do */
1356         xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
1357         nxids = 0;
1358
1359         LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1360
1361         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1362         {
1363                 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1364
1365                 if (gxact->valid &&
1366                         XLByteLE(gxact->prepare_lsn, redo_horizon))
1367                         xids[nxids++] = gxact->proc.xid;
1368         }
1369
1370         LWLockRelease(TwoPhaseStateLock);
1371
1372         for (i = 0; i < nxids; i++)
1373         {
1374                 TransactionId xid = xids[i];
1375                 int                     fd;
1376
1377                 TwoPhaseFilePath(path, xid);
1378
1379                 fd = BasicOpenFile(path, O_RDWR | PG_BINARY, 0);
1380                 if (fd < 0)
1381                 {
1382                         if (errno == ENOENT)
1383                         {
1384                                 /* OK if gxact is no longer valid */
1385                                 if (!TransactionIdIsPrepared(xid))
1386                                         continue;
1387                                 /* Restore errno in case it was changed */
1388                                 errno = ENOENT;
1389                         }
1390                         ereport(ERROR,
1391                                         (errcode_for_file_access(),
1392                                          errmsg("could not open twophase state file \"%s\": %m",
1393                                                         path)));
1394                 }
1395
1396                 if (pg_fsync(fd) != 0)
1397                 {
1398                         close(fd);
1399                         ereport(ERROR,
1400                                         (errcode_for_file_access(),
1401                                          errmsg("could not fsync twophase state file \"%s\": %m",
1402                                                         path)));
1403                 }
1404
1405                 if (close(fd) != 0)
1406                         ereport(ERROR,
1407                                         (errcode_for_file_access(),
1408                                          errmsg("could not close twophase state file \"%s\": %m",
1409                                                         path)));
1410         }
1411
1412         pfree(xids);
1413 }
1414
1415 /*
1416  * PrescanPreparedTransactions
1417  *
1418  * Scan the pg_twophase directory and determine the range of valid XIDs
1419  * present.  This is run during database startup, after we have completed
1420  * reading WAL.  ShmemVariableCache->nextXid has been set to one more than
1421  * the highest XID for which evidence exists in WAL.
1422  *
1423  * We throw away any prepared xacts with main XID beyond nextXid --- if any
1424  * are present, it suggests that the DBA has done a PITR recovery to an
1425  * earlier point in time without cleaning out pg_twophase.      We dare not
1426  * try to recover such prepared xacts since they likely depend on database
1427  * state that doesn't exist now.
1428  *
1429  * However, we will advance nextXid beyond any subxact XIDs belonging to
1430  * valid prepared xacts.  We need to do this since subxact commit doesn't
1431  * write a WAL entry, and so there might be no evidence in WAL of those
1432  * subxact XIDs.
1433  *
1434  * Our other responsibility is to determine and return the oldest valid XID
1435  * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
1436  * This is needed to synchronize pg_subtrans startup properly.
1437  */
1438 TransactionId
1439 PrescanPreparedTransactions(void)
1440 {
1441         TransactionId origNextXid = ShmemVariableCache->nextXid;
1442         TransactionId result = origNextXid;
1443         DIR                *cldir;
1444         struct dirent *clde;
1445
1446         cldir = AllocateDir(TWOPHASE_DIR);
1447         while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1448         {
1449                 if (strlen(clde->d_name) == 8 &&
1450                         strspn(clde->d_name, "0123456789ABCDEF") == 8)
1451                 {
1452                         TransactionId xid;
1453                         char       *buf;
1454                         TwoPhaseFileHeader *hdr;
1455                         TransactionId *subxids;
1456                         int                     i;
1457
1458                         xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1459
1460                         /* Reject XID if too new */
1461                         if (TransactionIdFollowsOrEquals(xid, origNextXid))
1462                         {
1463                                 ereport(WARNING,
1464                                                 (errmsg("removing future twophase state file \"%s\"",
1465                                                                 clde->d_name)));
1466                                 RemoveTwoPhaseFile(xid, true);
1467                                 continue;
1468                         }
1469
1470                         /*
1471                          * Note: we can't check if already processed because clog
1472                          * subsystem isn't up yet.
1473                          */
1474
1475                         /* Read and validate file */
1476                         buf = ReadTwoPhaseFile(xid);
1477                         if (buf == NULL)
1478                         {
1479                                 ereport(WARNING,
1480                                                 (errmsg("removing corrupt twophase state file \"%s\"",
1481                                                                 clde->d_name)));
1482                                 RemoveTwoPhaseFile(xid, true);
1483                                 continue;
1484                         }
1485
1486                         /* Deconstruct header */
1487                         hdr = (TwoPhaseFileHeader *) buf;
1488                         if (!TransactionIdEquals(hdr->xid, xid))
1489                         {
1490                                 ereport(WARNING,
1491                                                 (errmsg("removing corrupt twophase state file \"%s\"",
1492                                                                 clde->d_name)));
1493                                 RemoveTwoPhaseFile(xid, true);
1494                                 pfree(buf);
1495                                 continue;
1496                         }
1497
1498                         /*
1499                          * OK, we think this file is valid.  Incorporate xid into the
1500                          * running-minimum result.
1501                          */
1502                         if (TransactionIdPrecedes(xid, result))
1503                                 result = xid;
1504
1505                         /*
1506                          * Examine subtransaction XIDs ... they should all follow main
1507                          * XID, and they may force us to advance nextXid.
1508                          */
1509                         subxids = (TransactionId *)
1510                                 (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
1511                         for (i = 0; i < hdr->nsubxacts; i++)
1512                         {
1513                                 TransactionId subxid = subxids[i];
1514
1515                                 Assert(TransactionIdFollows(subxid, xid));
1516                                 if (TransactionIdFollowsOrEquals(subxid,
1517                                                                                                  ShmemVariableCache->nextXid))
1518                                 {
1519                                         ShmemVariableCache->nextXid = subxid;
1520                                         TransactionIdAdvance(ShmemVariableCache->nextXid);
1521                                 }
1522                         }
1523
1524                         pfree(buf);
1525                 }
1526         }
1527         FreeDir(cldir);
1528
1529         return result;
1530 }
1531
1532 /*
1533  * RecoverPreparedTransactions
1534  *
1535  * Scan the pg_twophase directory and reload shared-memory state for each
1536  * prepared transaction (reacquire locks, etc).  This is run during database
1537  * startup.
1538  */
1539 void
1540 RecoverPreparedTransactions(void)
1541 {
1542         char            dir[MAXPGPATH];
1543         DIR                *cldir;
1544         struct dirent *clde;
1545
1546         snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
1547
1548         cldir = AllocateDir(dir);
1549         while ((clde = ReadDir(cldir, dir)) != NULL)
1550         {
1551                 if (strlen(clde->d_name) == 8 &&
1552                         strspn(clde->d_name, "0123456789ABCDEF") == 8)
1553                 {
1554                         TransactionId xid;
1555                         char       *buf;
1556                         char       *bufptr;
1557                         TwoPhaseFileHeader *hdr;
1558                         TransactionId *subxids;
1559                         GlobalTransaction gxact;
1560                         int                     i;
1561
1562                         xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1563
1564                         /* Already processed? */
1565                         if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
1566                         {
1567                                 ereport(WARNING,
1568                                                 (errmsg("removing stale twophase state file \"%s\"",
1569                                                                 clde->d_name)));
1570                                 RemoveTwoPhaseFile(xid, true);
1571                                 continue;
1572                         }
1573
1574                         /* Read and validate file */
1575                         buf = ReadTwoPhaseFile(xid);
1576                         if (buf == NULL)
1577                         {
1578                                 ereport(WARNING,
1579                                                 (errmsg("removing corrupt twophase state file \"%s\"",
1580                                                                 clde->d_name)));
1581                                 RemoveTwoPhaseFile(xid, true);
1582                                 continue;
1583                         }
1584
1585                         ereport(LOG,
1586                                         (errmsg("recovering prepared transaction %u", xid)));
1587
1588                         /* Deconstruct header */
1589                         hdr = (TwoPhaseFileHeader *) buf;
1590                         Assert(TransactionIdEquals(hdr->xid, xid));
1591                         bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1592                         subxids = (TransactionId *) bufptr;
1593                         bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1594                         bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1595                         bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1596
1597                         /*
1598                          * Reconstruct subtrans state for the transaction --- needed
1599                          * because pg_subtrans is not preserved over a restart.  Note that
1600                          * we are linking all the subtransactions directly to the
1601                          * top-level XID; there may originally have been a more complex
1602                          * hierarchy, but there's no need to restore that exactly.
1603                          */
1604                         for (i = 0; i < hdr->nsubxacts; i++)
1605                                 SubTransSetParent(subxids[i], xid);
1606
1607                         /*
1608                          * Recreate its GXACT and dummy PGPROC
1609                          *
1610                          * Note: since we don't have the PREPARE record's WAL location at
1611                          * hand, we leave prepare_lsn zeroes.  This means the GXACT will
1612                          * be fsync'd on every future checkpoint.  We assume this
1613                          * situation is infrequent enough that the performance cost is
1614                          * negligible (especially since we know the state file has already
1615                          * been fsynced).
1616                          */
1617                         gxact = MarkAsPreparing(xid, hdr->gid,
1618                                                                         hdr->prepared_at,
1619                                                                         hdr->owner, hdr->database);
1620                         GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
1621                         MarkAsPrepared(gxact);
1622
1623                         /*
1624                          * Recover other state (notably locks) using resource managers
1625                          */
1626                         ProcessRecords(bufptr, xid, twophase_recover_callbacks);
1627
1628                         pfree(buf);
1629                 }
1630         }
1631         FreeDir(cldir);
1632 }
1633
1634 /*
1635  *      RecordTransactionCommitPrepared
1636  *
1637  * This is basically the same as RecordTransactionCommit: in particular,
1638  * we must take the CheckpointStartLock to avoid a race condition.
1639  *
1640  * We know the transaction made at least one XLOG entry (its PREPARE),
1641  * so it is never possible to optimize out the commit record.
1642  */
1643 static void
1644 RecordTransactionCommitPrepared(TransactionId xid,
1645                                                                 int nchildren,
1646                                                                 TransactionId *children,
1647                                                                 int nrels,
1648                                                                 RelFileNode *rels)
1649 {
1650         XLogRecData rdata[3];
1651         int                     lastrdata = 0;
1652         xl_xact_commit_prepared xlrec;
1653         XLogRecPtr      recptr;
1654
1655         START_CRIT_SECTION();
1656
1657         /* See notes in RecordTransactionCommit */
1658         LWLockAcquire(CheckpointStartLock, LW_SHARED);
1659
1660         /* Emit the XLOG commit record */
1661         xlrec.xid = xid;
1662         xlrec.crec.xtime = time(NULL);
1663         xlrec.crec.nrels = nrels;
1664         xlrec.crec.nsubxacts = nchildren;
1665         rdata[0].data = (char *) (&xlrec);
1666         rdata[0].len = MinSizeOfXactCommitPrepared;
1667         rdata[0].buffer = InvalidBuffer;
1668         /* dump rels to delete */
1669         if (nrels > 0)
1670         {
1671                 rdata[0].next = &(rdata[1]);
1672                 rdata[1].data = (char *) rels;
1673                 rdata[1].len = nrels * sizeof(RelFileNode);
1674                 rdata[1].buffer = InvalidBuffer;
1675                 lastrdata = 1;
1676         }
1677         /* dump committed child Xids */
1678         if (nchildren > 0)
1679         {
1680                 rdata[lastrdata].next = &(rdata[2]);
1681                 rdata[2].data = (char *) children;
1682                 rdata[2].len = nchildren * sizeof(TransactionId);
1683                 rdata[2].buffer = InvalidBuffer;
1684                 lastrdata = 2;
1685         }
1686         rdata[lastrdata].next = NULL;
1687
1688         recptr = XLogInsert(RM_XACT_ID,
1689                                                 XLOG_XACT_COMMIT_PREPARED | XLOG_NO_TRAN,
1690                                                 rdata);
1691
1692         /* we don't currently try to sleep before flush here ... */
1693
1694         /* Flush XLOG to disk */
1695         XLogFlush(recptr);
1696
1697         /* Mark the transaction committed in pg_clog */
1698         TransactionIdCommit(xid);
1699         /* to avoid race conditions, the parent must commit first */
1700         TransactionIdCommitTree(nchildren, children);
1701
1702         /* Checkpoint is allowed again */
1703         LWLockRelease(CheckpointStartLock);
1704
1705         END_CRIT_SECTION();
1706 }
1707
1708 /*
1709  *      RecordTransactionAbortPrepared
1710  *
1711  * This is basically the same as RecordTransactionAbort.
1712  *
1713  * We know the transaction made at least one XLOG entry (its PREPARE),
1714  * so it is never possible to optimize out the abort record.
1715  */
1716 static void
1717 RecordTransactionAbortPrepared(TransactionId xid,
1718                                                            int nchildren,
1719                                                            TransactionId *children,
1720                                                            int nrels,
1721                                                            RelFileNode *rels)
1722 {
1723         XLogRecData rdata[3];
1724         int                     lastrdata = 0;
1725         xl_xact_abort_prepared xlrec;
1726         XLogRecPtr      recptr;
1727
1728         /*
1729          * Catch the scenario where we aborted partway through
1730          * RecordTransactionCommitPrepared ...
1731          */
1732         if (TransactionIdDidCommit(xid))
1733                 elog(PANIC, "cannot abort transaction %u, it was already committed",
1734                          xid);
1735
1736         START_CRIT_SECTION();
1737
1738         /* Emit the XLOG abort record */
1739         xlrec.xid = xid;
1740         xlrec.arec.xtime = time(NULL);
1741         xlrec.arec.nrels = nrels;
1742         xlrec.arec.nsubxacts = nchildren;
1743         rdata[0].data = (char *) (&xlrec);
1744         rdata[0].len = MinSizeOfXactAbortPrepared;
1745         rdata[0].buffer = InvalidBuffer;
1746         /* dump rels to delete */
1747         if (nrels > 0)
1748         {
1749                 rdata[0].next = &(rdata[1]);
1750                 rdata[1].data = (char *) rels;
1751                 rdata[1].len = nrels * sizeof(RelFileNode);
1752                 rdata[1].buffer = InvalidBuffer;
1753                 lastrdata = 1;
1754         }
1755         /* dump committed child Xids */
1756         if (nchildren > 0)
1757         {
1758                 rdata[lastrdata].next = &(rdata[2]);
1759                 rdata[2].data = (char *) children;
1760                 rdata[2].len = nchildren * sizeof(TransactionId);
1761                 rdata[2].buffer = InvalidBuffer;
1762                 lastrdata = 2;
1763         }
1764         rdata[lastrdata].next = NULL;
1765
1766         recptr = XLogInsert(RM_XACT_ID,
1767                                                 XLOG_XACT_ABORT_PREPARED | XLOG_NO_TRAN,
1768                                                 rdata);
1769
1770         /* Always flush, since we're about to remove the 2PC state file */
1771         XLogFlush(recptr);
1772
1773         /*
1774          * Mark the transaction aborted in clog.  This is not absolutely necessary
1775          * but we may as well do it while we are here.
1776          */
1777         TransactionIdAbort(xid);
1778         TransactionIdAbortTree(nchildren, children);
1779
1780         END_CRIT_SECTION();
1781 }