]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/twophase.c
Rethink the way FSM truncation works. Instead of WAL-logging FSM
[postgresql] / src / backend / access / transam / twophase.c
1 /*-------------------------------------------------------------------------
2  *
3  * twophase.c
4  *              Two-phase commit support functions.
5  *
6  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *              $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.48 2008/11/19 10:34:50 heikki Exp $
11  *
12  * NOTES
13  *              Each global transaction is associated with a global transaction
14  *              identifier (GID). The client assigns a GID to a postgres
15  *              transaction with the PREPARE TRANSACTION command.
16  *
17  *              We keep all active global transactions in a shared memory array.
18  *              When the PREPARE TRANSACTION command is issued, the GID is
19  *              reserved for the transaction in the array. This is done before
20  *              a WAL entry is made, because the reservation checks for duplicate
21  *              GIDs and aborts the transaction if there already is a global
22  *              transaction in prepared state with the same GID.
23  *
24  *              A global transaction (gxact) also has a dummy PGPROC that is entered
25  *              into the ProcArray array; this is what keeps the XID considered
26  *              running by TransactionIdIsInProgress.  It is also convenient as a
27  *              PGPROC to hook the gxact's locks to.
28  *
29  *              In order to survive crashes and shutdowns, all prepared
30  *              transactions must be stored in permanent storage. This includes
31  *              locking information, pending notifications etc. All that state
32  *              information is written to the per-transaction state file in
33  *              the pg_twophase directory.
34  *
35  *-------------------------------------------------------------------------
36  */
37 #include "postgres.h"
38
39 #include <fcntl.h>
40 #include <sys/stat.h>
41 #include <sys/types.h>
42 #include <time.h>
43 #include <unistd.h>
44
45 #include "access/htup.h"
46 #include "access/subtrans.h"
47 #include "access/transam.h"
48 #include "access/twophase.h"
49 #include "access/twophase_rmgr.h"
50 #include "access/xact.h"
51 #include "access/xlogutils.h"
52 #include "catalog/pg_type.h"
53 #include "catalog/storage.h"
54 #include "funcapi.h"
55 #include "miscadmin.h"
56 #include "pg_trace.h"
57 #include "pgstat.h"
58 #include "storage/fd.h"
59 #include "storage/procarray.h"
60 #include "storage/smgr.h"
61 #include "utils/builtins.h"
62 #include "utils/memutils.h"
63
64
/*
 * Location of the per-transaction two-phase state files, relative to PGDATA.
 */
#define TWOPHASE_DIR "pg_twophase"

/*
 * GUC variable (max_prepared_transactions): how many prepared-transaction
 * slots exist in shared memory.  It can't be changed after startup.
 */
int			max_prepared_xacts = 5;
72
73 /*
74  * This struct describes one global transaction that is in prepared state
75  * or attempting to become prepared.
76  *
77  * The first component of the struct is a dummy PGPROC that is inserted
78  * into the global ProcArray so that the transaction appears to still be
79  * running and holding locks.  It must be first because we cast pointers
80  * to PGPROC and pointers to GlobalTransactionData back and forth.
81  *
82  * The lifecycle of a global transaction is:
83  *
84  * 1. After checking that the requested GID is not in use, set up an
85  * entry in the TwoPhaseState->prepXacts array with the correct XID and GID,
86  * with locking_xid = my own XID and valid = false.
87  *
88  * 2. After successfully completing prepare, set valid = true and enter the
89  * contained PGPROC into the global ProcArray.
90  *
91  * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry
92  * is valid and its locking_xid is no longer active, then store my current
93  * XID into locking_xid.  This prevents concurrent attempts to commit or
94  * rollback the same prepared xact.
95  *
96  * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
97  * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
98  * the freelist.
99  *
100  * Note that if the preparing transaction fails between steps 1 and 2, the
101  * entry will remain in prepXacts until recycled.  We can detect recyclable
102  * entries by checking for valid = false and locking_xid no longer active.
103  *
104  * typedef struct GlobalTransactionData *GlobalTransaction appears in
105  * twophase.h
106  */
107 #define GIDSIZE 200
108
109 typedef struct GlobalTransactionData
110 {
111         PGPROC          proc;                   /* dummy proc */
112         TimestampTz prepared_at;        /* time of preparation */
113         XLogRecPtr      prepare_lsn;    /* XLOG offset of prepare record */
114         Oid                     owner;                  /* ID of user that executed the xact */
115         TransactionId locking_xid;      /* top-level XID of backend working on xact */
116         bool            valid;                  /* TRUE if fully prepared */
117         char            gid[GIDSIZE];   /* The GID assigned to the prepared xact */
118 } GlobalTransactionData;
119
120 /*
121  * Two Phase Commit shared state.  Access to this struct is protected
122  * by TwoPhaseStateLock.
123  */
124 typedef struct TwoPhaseStateData
125 {
126         /* Head of linked list of free GlobalTransactionData structs */
127         GlobalTransaction freeGXacts;
128
129         /* Number of valid prepXacts entries. */
130         int                     numPrepXacts;
131
132         /*
133          * There are max_prepared_xacts items in this array, but C wants a
134          * fixed-size array.
135          */
136         GlobalTransaction prepXacts[1];         /* VARIABLE LENGTH ARRAY */
137 } TwoPhaseStateData;                    /* VARIABLE LENGTH STRUCT */
138
139 static TwoPhaseStateData *TwoPhaseState;
140
141
142 static void RecordTransactionCommitPrepared(TransactionId xid,
143                                                                 int nchildren,
144                                                                 TransactionId *children,
145                                                                 int nrels,
146                                                                 RelFileNode *rels);
147 static void RecordTransactionAbortPrepared(TransactionId xid,
148                                                            int nchildren,
149                                                            TransactionId *children,
150                                                            int nrels,
151                                                            RelFileNode *rels);
152 static void ProcessRecords(char *bufptr, TransactionId xid,
153                            const TwoPhaseCallback callbacks[]);
154
155
156 /*
157  * Initialization of shared memory
158  */
159 Size
160 TwoPhaseShmemSize(void)
161 {
162         Size            size;
163
164         /* Need the fixed struct, the array of pointers, and the GTD structs */
165         size = offsetof(TwoPhaseStateData, prepXacts);
166         size = add_size(size, mul_size(max_prepared_xacts,
167                                                                    sizeof(GlobalTransaction)));
168         size = MAXALIGN(size);
169         size = add_size(size, mul_size(max_prepared_xacts,
170                                                                    sizeof(GlobalTransactionData)));
171
172         return size;
173 }
174
175 void
176 TwoPhaseShmemInit(void)
177 {
178         bool            found;
179
180         TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
181                                                                         TwoPhaseShmemSize(),
182                                                                         &found);
183         if (!IsUnderPostmaster)
184         {
185                 GlobalTransaction gxacts;
186                 int                     i;
187
188                 Assert(!found);
189                 TwoPhaseState->freeGXacts = NULL;
190                 TwoPhaseState->numPrepXacts = 0;
191
192                 /*
193                  * Initialize the linked list of free GlobalTransactionData structs
194                  */
195                 gxacts = (GlobalTransaction)
196                         ((char *) TwoPhaseState +
197                          MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
198                                           sizeof(GlobalTransaction) * max_prepared_xacts));
199                 for (i = 0; i < max_prepared_xacts; i++)
200                 {
201                         gxacts[i].proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
202                         TwoPhaseState->freeGXacts = &gxacts[i];
203                 }
204         }
205         else
206                 Assert(found);
207 }
208
209
210 /*
211  * MarkAsPreparing
212  *              Reserve the GID for the given transaction.
213  *
214  * Internally, this creates a gxact struct and puts it into the active array.
215  * NOTE: this is also used when reloading a gxact after a crash; so avoid
216  * assuming that we can use very much backend context.
217  */
218 GlobalTransaction
219 MarkAsPreparing(TransactionId xid, const char *gid,
220                                 TimestampTz prepared_at, Oid owner, Oid databaseid)
221 {
222         GlobalTransaction gxact;
223         int                     i;
224
225         if (strlen(gid) >= GIDSIZE)
226                 ereport(ERROR,
227                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
228                                  errmsg("transaction identifier \"%s\" is too long",
229                                                 gid)));
230
231         LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
232
233         /*
234          * First, find and recycle any gxacts that failed during prepare. We do
235          * this partly to ensure we don't mistakenly say their GIDs are still
236          * reserved, and partly so we don't fail on out-of-slots unnecessarily.
237          */
238         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
239         {
240                 gxact = TwoPhaseState->prepXacts[i];
241                 if (!gxact->valid && !TransactionIdIsActive(gxact->locking_xid))
242                 {
243                         /* It's dead Jim ... remove from the active array */
244                         TwoPhaseState->numPrepXacts--;
245                         TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
246                         /* and put it back in the freelist */
247                         gxact->proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
248                         TwoPhaseState->freeGXacts = gxact;
249                         /* Back up index count too, so we don't miss scanning one */
250                         i--;
251                 }
252         }
253
254         /* Check for conflicting GID */
255         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
256         {
257                 gxact = TwoPhaseState->prepXacts[i];
258                 if (strcmp(gxact->gid, gid) == 0)
259                 {
260                         ereport(ERROR,
261                                         (errcode(ERRCODE_DUPLICATE_OBJECT),
262                                          errmsg("transaction identifier \"%s\" is already in use",
263                                                         gid)));
264                 }
265         }
266
267         /* Get a free gxact from the freelist */
268         if (TwoPhaseState->freeGXacts == NULL)
269                 ereport(ERROR,
270                                 (errcode(ERRCODE_OUT_OF_MEMORY),
271                                  errmsg("maximum number of prepared transactions reached"),
272                                  errhint("Increase max_prepared_transactions (currently %d).",
273                                                  max_prepared_xacts)));
274         gxact = TwoPhaseState->freeGXacts;
275         TwoPhaseState->freeGXacts = (GlobalTransaction) gxact->proc.links.next;
276
277         /* Initialize it */
278         MemSet(&gxact->proc, 0, sizeof(PGPROC));
279         SHMQueueElemInit(&(gxact->proc.links));
280         gxact->proc.waitStatus = STATUS_OK;
281         /* We set up the gxact's VXID as InvalidBackendId/XID */
282         gxact->proc.lxid = (LocalTransactionId) xid;
283         gxact->proc.xid = xid;
284         gxact->proc.xmin = InvalidTransactionId;
285         gxact->proc.pid = 0;
286         gxact->proc.backendId = InvalidBackendId;
287         gxact->proc.databaseId = databaseid;
288         gxact->proc.roleId = owner;
289         gxact->proc.inCommit = false;
290         gxact->proc.vacuumFlags = 0;
291         gxact->proc.lwWaiting = false;
292         gxact->proc.lwExclusive = false;
293         gxact->proc.lwWaitLink = NULL;
294         gxact->proc.waitLock = NULL;
295         gxact->proc.waitProcLock = NULL;
296         for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
297                 SHMQueueInit(&(gxact->proc.myProcLocks[i]));
298         /* subxid data must be filled later by GXactLoadSubxactData */
299         gxact->proc.subxids.overflowed = false;
300         gxact->proc.subxids.nxids = 0;
301
302         gxact->prepared_at = prepared_at;
303         /* initialize LSN to 0 (start of WAL) */
304         gxact->prepare_lsn.xlogid = 0;
305         gxact->prepare_lsn.xrecoff = 0;
306         gxact->owner = owner;
307         gxact->locking_xid = xid;
308         gxact->valid = false;
309         strcpy(gxact->gid, gid);
310
311         /* And insert it into the active array */
312         Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
313         TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
314
315         LWLockRelease(TwoPhaseStateLock);
316
317         return gxact;
318 }
319
320 /*
321  * GXactLoadSubxactData
322  *
323  * If the transaction being persisted had any subtransactions, this must
324  * be called before MarkAsPrepared() to load information into the dummy
325  * PGPROC.
326  */
327 static void
328 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
329                                          TransactionId *children)
330 {
331         /* We need no extra lock since the GXACT isn't valid yet */
332         if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
333         {
334                 gxact->proc.subxids.overflowed = true;
335                 nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
336         }
337         if (nsubxacts > 0)
338         {
339                 memcpy(gxact->proc.subxids.xids, children,
340                            nsubxacts * sizeof(TransactionId));
341                 gxact->proc.subxids.nxids = nsubxacts;
342         }
343 }
344
345 /*
346  * MarkAsPrepared
347  *              Mark the GXACT as fully valid, and enter it into the global ProcArray.
348  */
349 static void
350 MarkAsPrepared(GlobalTransaction gxact)
351 {
352         /* Lock here may be overkill, but I'm not convinced of that ... */
353         LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
354         Assert(!gxact->valid);
355         gxact->valid = true;
356         LWLockRelease(TwoPhaseStateLock);
357
358         /*
359          * Put it into the global ProcArray so TransactionIdIsInProgress considers
360          * the XID as still running.
361          */
362         ProcArrayAdd(&gxact->proc);
363 }
364
365 /*
366  * LockGXact
367  *              Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
368  */
369 static GlobalTransaction
370 LockGXact(const char *gid, Oid user)
371 {
372         int                     i;
373
374         LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
375
376         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
377         {
378                 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
379
380                 /* Ignore not-yet-valid GIDs */
381                 if (!gxact->valid)
382                         continue;
383                 if (strcmp(gxact->gid, gid) != 0)
384                         continue;
385
386                 /* Found it, but has someone else got it locked? */
387                 if (TransactionIdIsValid(gxact->locking_xid))
388                 {
389                         if (TransactionIdIsActive(gxact->locking_xid))
390                                 ereport(ERROR,
391                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
392                                 errmsg("prepared transaction with identifier \"%s\" is busy",
393                                            gid)));
394                         gxact->locking_xid = InvalidTransactionId;
395                 }
396
397                 if (user != gxact->owner && !superuser_arg(user))
398                         ereport(ERROR,
399                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
400                                   errmsg("permission denied to finish prepared transaction"),
401                                          errhint("Must be superuser or the user that prepared the transaction.")));
402
403                 /*
404                  * Note: it probably would be possible to allow committing from
405                  * another database; but at the moment NOTIFY is known not to work and
406                  * there may be some other issues as well.      Hence disallow until
407                  * someone gets motivated to make it work.
408                  */
409                 if (MyDatabaseId != gxact->proc.databaseId)
410                         ereport(ERROR,
411                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
412                                   errmsg("prepared transaction belongs to another database"),
413                                          errhint("Connect to the database where the transaction was prepared to finish it.")));
414
415                 /* OK for me to lock it */
416                 gxact->locking_xid = GetTopTransactionId();
417
418                 LWLockRelease(TwoPhaseStateLock);
419
420                 return gxact;
421         }
422
423         LWLockRelease(TwoPhaseStateLock);
424
425         ereport(ERROR,
426                         (errcode(ERRCODE_UNDEFINED_OBJECT),
427                  errmsg("prepared transaction with identifier \"%s\" does not exist",
428                                 gid)));
429
430         /* NOTREACHED */
431         return NULL;
432 }
433
434 /*
435  * RemoveGXact
436  *              Remove the prepared transaction from the shared memory array.
437  *
438  * NB: caller should have already removed it from ProcArray
439  */
440 static void
441 RemoveGXact(GlobalTransaction gxact)
442 {
443         int                     i;
444
445         LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
446
447         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
448         {
449                 if (gxact == TwoPhaseState->prepXacts[i])
450                 {
451                         /* remove from the active array */
452                         TwoPhaseState->numPrepXacts--;
453                         TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
454
455                         /* and put it back in the freelist */
456                         gxact->proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
457                         TwoPhaseState->freeGXacts = gxact;
458
459                         LWLockRelease(TwoPhaseStateLock);
460
461                         return;
462                 }
463         }
464
465         LWLockRelease(TwoPhaseStateLock);
466
467         elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
468 }
469
470 /*
471  * TransactionIdIsPrepared
472  *              True iff transaction associated with the identifier is prepared
473  *              for two-phase commit
474  *
475  * Note: only gxacts marked "valid" are considered; but notice we do not
476  * check the locking status.
477  *
478  * This is not currently exported, because it is only needed internally.
479  */
480 static bool
481 TransactionIdIsPrepared(TransactionId xid)
482 {
483         bool            result = false;
484         int                     i;
485
486         LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
487
488         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
489         {
490                 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
491
492                 if (gxact->valid && gxact->proc.xid == xid)
493                 {
494                         result = true;
495                         break;
496                 }
497         }
498
499         LWLockRelease(TwoPhaseStateLock);
500
501         return result;
502 }
503
504 /*
505  * Returns an array of all prepared transactions for the user-level
506  * function pg_prepared_xact.
507  *
508  * The returned array and all its elements are copies of internal data
509  * structures, to minimize the time we need to hold the TwoPhaseStateLock.
510  *
511  * WARNING -- we return even those transactions that are not fully prepared
512  * yet.  The caller should filter them out if he doesn't want them.
513  *
514  * The returned array is palloc'd.
515  */
516 static int
517 GetPreparedTransactionList(GlobalTransaction *gxacts)
518 {
519         GlobalTransaction array;
520         int                     num;
521         int                     i;
522
523         LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
524
525         if (TwoPhaseState->numPrepXacts == 0)
526         {
527                 LWLockRelease(TwoPhaseStateLock);
528
529                 *gxacts = NULL;
530                 return 0;
531         }
532
533         num = TwoPhaseState->numPrepXacts;
534         array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
535         *gxacts = array;
536         for (i = 0; i < num; i++)
537                 memcpy(array + i, TwoPhaseState->prepXacts[i],
538                            sizeof(GlobalTransactionData));
539
540         LWLockRelease(TwoPhaseStateLock);
541
542         return num;
543 }
544
545
546 /* Working status for pg_prepared_xact */
547 typedef struct
548 {
549         GlobalTransaction array;
550         int                     ngxacts;
551         int                     currIdx;
552 } Working_State;
553
554 /*
555  * pg_prepared_xact
556  *              Produce a view with one row per prepared transaction.
557  *
558  * This function is here so we don't have to export the
559  * GlobalTransactionData struct definition.
560  */
561 Datum
562 pg_prepared_xact(PG_FUNCTION_ARGS)
563 {
564         FuncCallContext *funcctx;
565         Working_State *status;
566
567         if (SRF_IS_FIRSTCALL())
568         {
569                 TupleDesc       tupdesc;
570                 MemoryContext oldcontext;
571
572                 /* create a function context for cross-call persistence */
573                 funcctx = SRF_FIRSTCALL_INIT();
574
575                 /*
576                  * Switch to memory context appropriate for multiple function calls
577                  */
578                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
579
580                 /* build tupdesc for result tuples */
581                 /* this had better match pg_prepared_xacts view in system_views.sql */
582                 tupdesc = CreateTemplateTupleDesc(5, false);
583                 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
584                                                    XIDOID, -1, 0);
585                 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
586                                                    TEXTOID, -1, 0);
587                 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
588                                                    TIMESTAMPTZOID, -1, 0);
589                 TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
590                                                    OIDOID, -1, 0);
591                 TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
592                                                    OIDOID, -1, 0);
593
594                 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
595
596                 /*
597                  * Collect all the 2PC status information that we will format and send
598                  * out as a result set.
599                  */
600                 status = (Working_State *) palloc(sizeof(Working_State));
601                 funcctx->user_fctx = (void *) status;
602
603                 status->ngxacts = GetPreparedTransactionList(&status->array);
604                 status->currIdx = 0;
605
606                 MemoryContextSwitchTo(oldcontext);
607         }
608
609         funcctx = SRF_PERCALL_SETUP();
610         status = (Working_State *) funcctx->user_fctx;
611
612         while (status->array != NULL && status->currIdx < status->ngxacts)
613         {
614                 GlobalTransaction gxact = &status->array[status->currIdx++];
615                 Datum           values[5];
616                 bool            nulls[5];
617                 HeapTuple       tuple;
618                 Datum           result;
619
620                 if (!gxact->valid)
621                         continue;
622
623                 /*
624                  * Form tuple with appropriate data.
625                  */
626                 MemSet(values, 0, sizeof(values));
627                 MemSet(nulls, 0, sizeof(nulls));
628
629                 values[0] = TransactionIdGetDatum(gxact->proc.xid);
630                 values[1] = CStringGetTextDatum(gxact->gid);
631                 values[2] = TimestampTzGetDatum(gxact->prepared_at);
632                 values[3] = ObjectIdGetDatum(gxact->owner);
633                 values[4] = ObjectIdGetDatum(gxact->proc.databaseId);
634
635                 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
636                 result = HeapTupleGetDatum(tuple);
637                 SRF_RETURN_NEXT(funcctx, result);
638         }
639
640         SRF_RETURN_DONE(funcctx);
641 }
642
643 /*
644  * TwoPhaseGetDummyProc
645  *              Get the PGPROC that represents a prepared transaction specified by XID
646  */
647 PGPROC *
648 TwoPhaseGetDummyProc(TransactionId xid)
649 {
650         PGPROC     *result = NULL;
651         int                     i;
652
653         static TransactionId cached_xid = InvalidTransactionId;
654         static PGPROC *cached_proc = NULL;
655
656         /*
657          * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
658          * repeatedly for the same XID.  We can save work with a simple cache.
659          */
660         if (xid == cached_xid)
661                 return cached_proc;
662
663         LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
664
665         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
666         {
667                 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
668
669                 if (gxact->proc.xid == xid)
670                 {
671                         result = &gxact->proc;
672                         break;
673                 }
674         }
675
676         LWLockRelease(TwoPhaseStateLock);
677
678         if (result == NULL)                     /* should not happen */
679                 elog(ERROR, "failed to find dummy PGPROC for xid %u", xid);
680
681         cached_xid = xid;
682         cached_proc = result;
683
684         return result;
685 }
686
687 /************************************************************************/
688 /* State file support                                                                                                   */
689 /************************************************************************/
690
/*
 * Write into "path" (a buffer of MAXPGPATH bytes) the name of the state file
 * for "xid": TWOPHASE_DIR, a slash, then the XID as 8 uppercase hex digits.
 */
#define TwoPhaseFilePath(path, xid) \
	snprintf((path), MAXPGPATH, TWOPHASE_DIR "/%08X", (xid))
693
694 /*
695  * 2PC state file format:
696  *
697  *      1. TwoPhaseFileHeader
698  *      2. TransactionId[] (subtransactions)
699  *      3. RelFileNode[] (files to be deleted at commit)
700  *      4. RelFileNode[] (files to be deleted at abort)
701  *      5. TwoPhaseRecordOnDisk
702  *      6. ...
703  *      7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
704  *      8. CRC32
705  *
706  * Each segment except the final CRC32 is MAXALIGN'd.
707  */
708
709 /*
710  * Header for a 2PC state file
711  */
712 #define TWOPHASE_MAGIC  0x57F94531              /* format identifier */
713
714 typedef struct TwoPhaseFileHeader
715 {
716         uint32          magic;                  /* format identifier */
717         uint32          total_len;              /* actual file length */
718         TransactionId xid;                      /* original transaction XID */
719         Oid                     database;               /* OID of database it was in */
720         TimestampTz prepared_at;        /* time of preparation */
721         Oid                     owner;                  /* user running the transaction */
722         int32           nsubxacts;              /* number of following subxact XIDs */
723         int32           ncommitrels;    /* number of delete-on-commit rels */
724         int32           nabortrels;             /* number of delete-on-abort rels */
725         char            gid[GIDSIZE];   /* GID for transaction */
726 } TwoPhaseFileHeader;
727
728 /*
729  * Header for each record in a state file
730  *
731  * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
732  * The rmgr data will be stored starting on a MAXALIGN boundary.
733  */
734 typedef struct TwoPhaseRecordOnDisk
735 {
736         uint32          len;                    /* length of rmgr data */
737         TwoPhaseRmgrId rmid;            /* resource manager for this record */
738         uint16          info;                   /* flag bits for use by rmgr */
739 } TwoPhaseRecordOnDisk;
740
741 /*
742  * During prepare, the state file is assembled in memory before writing it
743  * to WAL and the actual state file.  We use a chain of XLogRecData blocks
744  * so that we will be able to pass the state file contents directly to
745  * XLogInsert.
746  */
747 static struct xllist
748 {
749         XLogRecData *head;                      /* first data block in the chain */
750         XLogRecData *tail;                      /* last block in chain */
751         uint32          bytes_free;             /* free bytes left in tail block */
752         uint32          total_len;              /* total data bytes in chain */
753 }       records;
754
755
756 /*
757  * Append a block of data to records data structure.
758  *
759  * NB: each block is padded to a MAXALIGN multiple.  This must be
760  * accounted for when the file is later read!
761  *
762  * The data is copied, so the caller is free to modify it afterwards.
763  */
764 static void
765 save_state_data(const void *data, uint32 len)
766 {
767         uint32          padlen = MAXALIGN(len);
768
769         if (padlen > records.bytes_free)
770         {
771                 records.tail->next = palloc0(sizeof(XLogRecData));
772                 records.tail = records.tail->next;
773                 records.tail->buffer = InvalidBuffer;
774                 records.tail->len = 0;
775                 records.tail->next = NULL;
776
777                 records.bytes_free = Max(padlen, 512);
778                 records.tail->data = palloc(records.bytes_free);
779         }
780
781         memcpy(((char *) records.tail->data) + records.tail->len, data, len);
782         records.tail->len += padlen;
783         records.bytes_free -= padlen;
784         records.total_len += padlen;
785 }
786
787 /*
788  * Start preparing a state file.
789  *
790  * Initializes data structure and inserts the 2PC file header record.
791  */
792 void
793 StartPrepare(GlobalTransaction gxact)
794 {
795         TransactionId xid = gxact->proc.xid;
796         TwoPhaseFileHeader hdr;
797         TransactionId *children;
798         RelFileNode *commitrels;
799         RelFileNode *abortrels;
800
801         /* Initialize linked list */
802         records.head = palloc0(sizeof(XLogRecData));
803         records.head->buffer = InvalidBuffer;
804         records.head->len = 0;
805         records.head->next = NULL;
806
807         records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
808         records.head->data = palloc(records.bytes_free);
809
810         records.tail = records.head;
811
812         records.total_len = 0;
813
814         /* Create header */
815         hdr.magic = TWOPHASE_MAGIC;
816         hdr.total_len = 0;                      /* EndPrepare will fill this in */
817         hdr.xid = xid;
818         hdr.database = gxact->proc.databaseId;
819         hdr.prepared_at = gxact->prepared_at;
820         hdr.owner = gxact->owner;
821         hdr.nsubxacts = xactGetCommittedChildren(&children);
822         hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels, NULL);
823         hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels, NULL);
824         StrNCpy(hdr.gid, gxact->gid, GIDSIZE);
825
826         save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
827
828         /* Add the additional info about subxacts and deletable files */
829         if (hdr.nsubxacts > 0)
830         {
831                 save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
832                 /* While we have the child-xact data, stuff it in the gxact too */
833                 GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
834         }
835         if (hdr.ncommitrels > 0)
836         {
837                 save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
838                 pfree(commitrels);
839         }
840         if (hdr.nabortrels > 0)
841         {
842                 save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
843                 pfree(abortrels);
844         }
845 }
846
847 /*
848  * Finish preparing state file.
849  *
850  * Calculates CRC and writes state file to WAL and in pg_twophase directory.
851  */
852 void
853 EndPrepare(GlobalTransaction gxact)
854 {
855         TransactionId xid = gxact->proc.xid;
856         TwoPhaseFileHeader *hdr;
857         char            path[MAXPGPATH];
858         XLogRecData *record;
859         pg_crc32        statefile_crc;
860         pg_crc32        bogus_crc;
861         int                     fd;
862
863         /* Add the end sentinel to the list of 2PC records */
864         RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
865                                                    NULL, 0);
866
867         /* Go back and fill in total_len in the file header record */
868         hdr = (TwoPhaseFileHeader *) records.head->data;
869         Assert(hdr->magic == TWOPHASE_MAGIC);
870         hdr->total_len = records.total_len + sizeof(pg_crc32);
871
872         /*
873          * If the file size exceeds MaxAllocSize, we won't be able to read it in
874          * ReadTwoPhaseFile. Check for that now, rather than fail at commit time.
875          */
876         if (hdr->total_len > MaxAllocSize)
877                 ereport(ERROR,
878                                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
879                                  errmsg("two-phase state file maximum length exceeded")));
880
881         /*
882          * Create the 2PC state file.
883          *
884          * Note: because we use BasicOpenFile(), we are responsible for ensuring
885          * the FD gets closed in any error exit path.  Once we get into the
886          * critical section, though, it doesn't matter since any failure causes
887          * PANIC anyway.
888          */
889         TwoPhaseFilePath(path, xid);
890
891         fd = BasicOpenFile(path,
892                                            O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
893                                            S_IRUSR | S_IWUSR);
894         if (fd < 0)
895                 ereport(ERROR,
896                                 (errcode_for_file_access(),
897                                  errmsg("could not create two-phase state file \"%s\": %m",
898                                                 path)));
899
900         /* Write data to file, and calculate CRC as we pass over it */
901         INIT_CRC32(statefile_crc);
902
903         for (record = records.head; record != NULL; record = record->next)
904         {
905                 COMP_CRC32(statefile_crc, record->data, record->len);
906                 if ((write(fd, record->data, record->len)) != record->len)
907                 {
908                         close(fd);
909                         ereport(ERROR,
910                                         (errcode_for_file_access(),
911                                          errmsg("could not write two-phase state file: %m")));
912                 }
913         }
914
915         FIN_CRC32(statefile_crc);
916
917         /*
918          * Write a deliberately bogus CRC to the state file; this is just paranoia
919          * to catch the case where four more bytes will run us out of disk space.
920          */
921         bogus_crc = ~statefile_crc;
922
923         if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
924         {
925                 close(fd);
926                 ereport(ERROR,
927                                 (errcode_for_file_access(),
928                                  errmsg("could not write two-phase state file: %m")));
929         }
930
931         /* Back up to prepare for rewriting the CRC */
932         if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
933         {
934                 close(fd);
935                 ereport(ERROR,
936                                 (errcode_for_file_access(),
937                                  errmsg("could not seek in two-phase state file: %m")));
938         }
939
940         /*
941          * The state file isn't valid yet, because we haven't written the correct
942          * CRC yet.  Before we do that, insert entry in WAL and flush it to disk.
943          *
944          * Between the time we have written the WAL entry and the time we write
945          * out the correct state file CRC, we have an inconsistency: the xact is
946          * prepared according to WAL but not according to our on-disk state. We
947          * use a critical section to force a PANIC if we are unable to complete
948          * the write --- then, WAL replay should repair the inconsistency.      The
949          * odds of a PANIC actually occurring should be very tiny given that we
950          * were able to write the bogus CRC above.
951          *
952          * We have to set inCommit here, too; otherwise a checkpoint starting
953          * immediately after the WAL record is inserted could complete without
954          * fsync'ing our state file.  (This is essentially the same kind of race
955          * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
956          * uses inCommit for; see notes there.)
957          *
958          * We save the PREPARE record's location in the gxact for later use by
959          * CheckPointTwoPhase.
960          */
961         START_CRIT_SECTION();
962
963         MyProc->inCommit = true;
964
965         gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
966                                                                         records.head);
967         XLogFlush(gxact->prepare_lsn);
968
969         /* If we crash now, we have prepared: WAL replay will fix things */
970
971         /* write correct CRC and close file */
972         if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
973         {
974                 close(fd);
975                 ereport(ERROR,
976                                 (errcode_for_file_access(),
977                                  errmsg("could not write two-phase state file: %m")));
978         }
979
980         if (close(fd) != 0)
981                 ereport(ERROR,
982                                 (errcode_for_file_access(),
983                                  errmsg("could not close two-phase state file: %m")));
984
985         /*
986          * Mark the prepared transaction as valid.      As soon as xact.c marks MyProc
987          * as not running our XID (which it will do immediately after this
988          * function returns), others can commit/rollback the xact.
989          *
990          * NB: a side effect of this is to make a dummy ProcArray entry for the
991          * prepared XID.  This must happen before we clear the XID from MyProc,
992          * else there is a window where the XID is not running according to
993          * TransactionIdIsInProgress, and onlookers would be entitled to assume
994          * the xact crashed.  Instead we have a window where the same XID appears
995          * twice in ProcArray, which is OK.
996          */
997         MarkAsPrepared(gxact);
998
999         /*
1000          * Now we can mark ourselves as out of the commit critical section: a
1001          * checkpoint starting after this will certainly see the gxact as a
1002          * candidate for fsyncing.
1003          */
1004         MyProc->inCommit = false;
1005
1006         END_CRIT_SECTION();
1007
1008         records.tail = records.head = NULL;
1009 }
1010
1011 /*
1012  * Register a 2PC record to be written to state file.
1013  */
1014 void
1015 RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1016                                            const void *data, uint32 len)
1017 {
1018         TwoPhaseRecordOnDisk record;
1019
1020         record.rmid = rmid;
1021         record.info = info;
1022         record.len = len;
1023         save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1024         if (len > 0)
1025                 save_state_data(data, len);
1026 }
1027
1028
1029 /*
1030  * Read and validate the state file for xid.
1031  *
1032  * If it looks OK (has a valid magic number and CRC), return the palloc'd
1033  * contents of the file.  Otherwise return NULL.
1034  */
1035 static char *
1036 ReadTwoPhaseFile(TransactionId xid)
1037 {
1038         char            path[MAXPGPATH];
1039         char       *buf;
1040         TwoPhaseFileHeader *hdr;
1041         int                     fd;
1042         struct stat stat;
1043         uint32          crc_offset;
1044         pg_crc32        calc_crc,
1045                                 file_crc;
1046
1047         TwoPhaseFilePath(path, xid);
1048
1049         fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
1050         if (fd < 0)
1051         {
1052                 ereport(WARNING,
1053                                 (errcode_for_file_access(),
1054                                  errmsg("could not open two-phase state file \"%s\": %m",
1055                                                 path)));
1056                 return NULL;
1057         }
1058
1059         /*
1060          * Check file length.  We can determine a lower bound pretty easily. We
1061          * set an upper bound to avoid palloc() failure on a corrupt file, though
1062          * we can't guarantee that we won't get an out of memory error anyway,
1063          * even on a valid file.
1064          */
1065         if (fstat(fd, &stat))
1066         {
1067                 close(fd);
1068                 ereport(WARNING,
1069                                 (errcode_for_file_access(),
1070                                  errmsg("could not stat two-phase state file \"%s\": %m",
1071                                                 path)));
1072                 return NULL;
1073         }
1074
1075         if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1076                                                 MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1077                                                 sizeof(pg_crc32)) ||
1078                 stat.st_size > MaxAllocSize)
1079         {
1080                 close(fd);
1081                 return NULL;
1082         }
1083
1084         crc_offset = stat.st_size - sizeof(pg_crc32);
1085         if (crc_offset != MAXALIGN(crc_offset))
1086         {
1087                 close(fd);
1088                 return NULL;
1089         }
1090
1091         /*
1092          * OK, slurp in the file.
1093          */
1094         buf = (char *) palloc(stat.st_size);
1095
1096         if (read(fd, buf, stat.st_size) != stat.st_size)
1097         {
1098                 close(fd);
1099                 ereport(WARNING,
1100                                 (errcode_for_file_access(),
1101                                  errmsg("could not read two-phase state file \"%s\": %m",
1102                                                 path)));
1103                 pfree(buf);
1104                 return NULL;
1105         }
1106
1107         close(fd);
1108
1109         hdr = (TwoPhaseFileHeader *) buf;
1110         if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
1111         {
1112                 pfree(buf);
1113                 return NULL;
1114         }
1115
1116         INIT_CRC32(calc_crc);
1117         COMP_CRC32(calc_crc, buf, crc_offset);
1118         FIN_CRC32(calc_crc);
1119
1120         file_crc = *((pg_crc32 *) (buf + crc_offset));
1121
1122         if (!EQ_CRC32(calc_crc, file_crc))
1123         {
1124                 pfree(buf);
1125                 return NULL;
1126         }
1127
1128         return buf;
1129 }
1130
1131
/*
 * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
 *
 * gid:      global transaction identifier of the prepared transaction.
 * isCommit: true for COMMIT PREPARED, false for ROLLBACK PREPARED.
 *
 * Locks the gxact (LockGXact also validates the GID), reads and validates its
 * state file, records the commit or abort, removes the gxact's dummy PGPROC
 * from the ProcArray, unlinks every existing fork of the relations scheduled
 * for deletion, runs the rmgr post-commit/post-abort callbacks, and finally
 * removes the state file and the gxact.  Raises ERROR (DATA_CORRUPTED) if the
 * state file cannot be read or fails validation.
 */
void
FinishPreparedTransaction(const char *gid, bool isCommit)
{
	GlobalTransaction gxact;
	TransactionId xid;
	char	   *buf;
	char	   *bufptr;
	TwoPhaseFileHeader *hdr;
	TransactionId latestXid;
	TransactionId *children;
	RelFileNode *commitrels;
	RelFileNode *abortrels;
	RelFileNode *delrels;
	int			ndelrels;
	int			i;

	/*
	 * Validate the GID, and lock the GXACT to ensure that two backends do not
	 * try to commit the same GID at once.
	 */
	gxact = LockGXact(gid, GetUserId());
	xid = gxact->proc.xid;

	/*
	 * Read and validate the state file
	 */
	buf = ReadTwoPhaseFile(xid);
	if (buf == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("two-phase state file for transaction %u is corrupt",
						xid)));

	/*
	 * Disassemble the header area.  The layout written by StartPrepare is:
	 * header, subxact XIDs, commit-time deletable rels, abort-time deletable
	 * rels, each MAXALIGN-padded; bufptr ends up at the first 2PC record.
	 */
	hdr = (TwoPhaseFileHeader *) buf;
	Assert(TransactionIdEquals(hdr->xid, xid));
	bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
	children = (TransactionId *) bufptr;
	bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
	commitrels = (RelFileNode *) bufptr;
	bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
	abortrels = (RelFileNode *) bufptr;
	bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));

	/* compute latestXid among all children */
	latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);

	/*
	 * The order of operations here is critical: make the XLOG entry for
	 * commit or abort, then mark the transaction committed or aborted in
	 * pg_clog, then remove its PGPROC from the global ProcArray (which means
	 * TransactionIdIsInProgress will stop saying the prepared xact is in
	 * progress), then run the post-commit or post-abort callbacks. The
	 * callbacks will release the locks the transaction held.
	 */
	if (isCommit)
		RecordTransactionCommitPrepared(xid,
										hdr->nsubxacts, children,
										hdr->ncommitrels, commitrels);
	else
		RecordTransactionAbortPrepared(xid,
									   hdr->nsubxacts, children,
									   hdr->nabortrels, abortrels);

	ProcArrayRemove(&gxact->proc, latestXid);

	/*
	 * In case we fail while running the callbacks, mark the gxact invalid so
	 * no one else will try to commit/rollback, and so it can be recycled
	 * properly later.  It is still locked by our XID so it won't go away yet.
	 *
	 * (We assume it's safe to do this without taking TwoPhaseStateLock.)
	 */
	gxact->valid = false;

	/*
	 * We have to remove any files that were supposed to be dropped. For
	 * consistency with the regular xact.c code paths, must do this before
	 * releasing locks, so do it before running the callbacks.
	 *
	 * NB: this code knows that we couldn't be dropping any temp rels ...
	 */
	if (isCommit)
	{
		delrels = commitrels;
		ndelrels = hdr->ncommitrels;
	}
	else
	{
		delrels = abortrels;
		ndelrels = hdr->nabortrels;
	}
	for (i = 0; i < ndelrels; i++)
	{
		SMgrRelation srel = smgropen(delrels[i]);
		ForkNumber	fork;

		/* drop every fork of the relation that actually exists on disk */
		for (fork = 0; fork <= MAX_FORKNUM; fork++)
		{
			if (smgrexists(srel, fork))
			{
				XLogDropRelation(delrels[i], fork);
				smgrdounlink(srel, fork, false, true);
			}
		}
		smgrclose(srel);
	}

	/* And now do the callbacks */
	if (isCommit)
		ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
	else
		ProcessRecords(bufptr, xid, twophase_postabort_callbacks);

	/* Count the prepared xact as committed or aborted */
	AtEOXact_PgStat(isCommit);

	/*
	 * And now we can clean up our mess.
	 */
	RemoveTwoPhaseFile(xid, true);

	RemoveGXact(gxact);

	pfree(buf);
}
1263
1264 /*
1265  * Scan a 2PC state file (already read into memory by ReadTwoPhaseFile)
1266  * and call the indicated callbacks for each 2PC record.
1267  */
1268 static void
1269 ProcessRecords(char *bufptr, TransactionId xid,
1270                            const TwoPhaseCallback callbacks[])
1271 {
1272         for (;;)
1273         {
1274                 TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1275
1276                 Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1277                 if (record->rmid == TWOPHASE_RM_END_ID)
1278                         break;
1279
1280                 bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1281
1282                 if (callbacks[record->rmid] != NULL)
1283                         callbacks[record->rmid] (xid, record->info,
1284                                                                          (void *) bufptr, record->len);
1285
1286                 bufptr += MAXALIGN(record->len);
1287         }
1288 }
1289
1290 /*
1291  * Remove the 2PC file for the specified XID.
1292  *
1293  * If giveWarning is false, do not complain about file-not-present;
1294  * this is an expected case during WAL replay.
1295  */
1296 void
1297 RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1298 {
1299         char            path[MAXPGPATH];
1300
1301         TwoPhaseFilePath(path, xid);
1302         if (unlink(path))
1303                 if (errno != ENOENT || giveWarning)
1304                         ereport(WARNING,
1305                                         (errcode_for_file_access(),
1306                                    errmsg("could not remove two-phase state file \"%s\": %m",
1307                                                   path)));
1308 }
1309
1310 /*
1311  * Recreates a state file. This is used in WAL replay.
1312  *
1313  * Note: content and len don't include CRC.
1314  */
1315 void
1316 RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1317 {
1318         char            path[MAXPGPATH];
1319         pg_crc32        statefile_crc;
1320         int                     fd;
1321
1322         /* Recompute CRC */
1323         INIT_CRC32(statefile_crc);
1324         COMP_CRC32(statefile_crc, content, len);
1325         FIN_CRC32(statefile_crc);
1326
1327         TwoPhaseFilePath(path, xid);
1328
1329         fd = BasicOpenFile(path,
1330                                            O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
1331                                            S_IRUSR | S_IWUSR);
1332         if (fd < 0)
1333                 ereport(ERROR,
1334                                 (errcode_for_file_access(),
1335                                  errmsg("could not recreate two-phase state file \"%s\": %m",
1336                                                 path)));
1337
1338         /* Write content and CRC */
1339         if (write(fd, content, len) != len)
1340         {
1341                 close(fd);
1342                 ereport(ERROR,
1343                                 (errcode_for_file_access(),
1344                                  errmsg("could not write two-phase state file: %m")));
1345         }
1346         if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32))
1347         {
1348                 close(fd);
1349                 ereport(ERROR,
1350                                 (errcode_for_file_access(),
1351                                  errmsg("could not write two-phase state file: %m")));
1352         }
1353
1354         /*
1355          * We must fsync the file because the end-of-replay checkpoint will not do
1356          * so, there being no GXACT in shared memory yet to tell it to.
1357          */
1358         if (pg_fsync(fd) != 0)
1359         {
1360                 close(fd);
1361                 ereport(ERROR,
1362                                 (errcode_for_file_access(),
1363                                  errmsg("could not fsync two-phase state file: %m")));
1364         }
1365
1366         if (close(fd) != 0)
1367                 ereport(ERROR,
1368                                 (errcode_for_file_access(),
1369                                  errmsg("could not close two-phase state file: %m")));
1370 }
1371
1372 /*
1373  * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1374  *
1375  * We must fsync the state file of any GXACT that is valid and has a PREPARE
1376  * LSN <= the checkpoint's redo horizon.  (If the gxact isn't valid yet or
1377  * has a later LSN, this checkpoint is not responsible for fsyncing it.)
1378  *
1379  * This is deliberately run as late as possible in the checkpoint sequence,
1380  * because GXACTs ordinarily have short lifespans, and so it is quite
1381  * possible that GXACTs that were valid at checkpoint start will no longer
1382  * exist if we wait a little bit.
1383  *
1384  * If a GXACT remains valid across multiple checkpoints, it'll be fsynced
1385  * each time.  This is considered unusual enough that we don't bother to
1386  * expend any extra code to avoid the redundant fsyncs.  (They should be
1387  * reasonably cheap anyway, since they won't cause I/O.)
1388  */
1389 void
1390 CheckPointTwoPhase(XLogRecPtr redo_horizon)
1391 {
1392         TransactionId *xids;
1393         int                     nxids;
1394         char            path[MAXPGPATH];
1395         int                     i;
1396
1397         /*
1398          * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
1399          * it just long enough to make a list of the XIDs that require fsyncing,
1400          * and then do the I/O afterwards.
1401          *
1402          * This approach creates a race condition: someone else could delete a
1403          * GXACT between the time we release TwoPhaseStateLock and the time we try
1404          * to open its state file.      We handle this by special-casing ENOENT
1405          * failures: if we see that, we verify that the GXACT is no longer valid,
1406          * and if so ignore the failure.
1407          */
1408         if (max_prepared_xacts <= 0)
1409                 return;                                 /* nothing to do */
1410
1411         TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1412
1413         xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
1414         nxids = 0;
1415
1416         LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1417
1418         for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1419         {
1420                 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1421
1422                 if (gxact->valid &&
1423                         XLByteLE(gxact->prepare_lsn, redo_horizon))
1424                         xids[nxids++] = gxact->proc.xid;
1425         }
1426
1427         LWLockRelease(TwoPhaseStateLock);
1428
1429         for (i = 0; i < nxids; i++)
1430         {
1431                 TransactionId xid = xids[i];
1432                 int                     fd;
1433
1434                 TwoPhaseFilePath(path, xid);
1435
1436                 fd = BasicOpenFile(path, O_RDWR | PG_BINARY, 0);
1437                 if (fd < 0)
1438                 {
1439                         if (errno == ENOENT)
1440                         {
1441                                 /* OK if gxact is no longer valid */
1442                                 if (!TransactionIdIsPrepared(xid))
1443                                         continue;
1444                                 /* Restore errno in case it was changed */
1445                                 errno = ENOENT;
1446                         }
1447                         ereport(ERROR,
1448                                         (errcode_for_file_access(),
1449                                          errmsg("could not open two-phase state file \"%s\": %m",
1450                                                         path)));
1451                 }
1452
1453                 if (pg_fsync(fd) != 0)
1454                 {
1455                         close(fd);
1456                         ereport(ERROR,
1457                                         (errcode_for_file_access(),
1458                                          errmsg("could not fsync two-phase state file \"%s\": %m",
1459                                                         path)));
1460                 }
1461
1462                 if (close(fd) != 0)
1463                         ereport(ERROR,
1464                                         (errcode_for_file_access(),
1465                                          errmsg("could not close two-phase state file \"%s\": %m",
1466                                                         path)));
1467         }
1468
1469         pfree(xids);
1470
1471         TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1472 }
1473
1474 /*
1475  * PrescanPreparedTransactions
1476  *
1477  * Scan the pg_twophase directory and determine the range of valid XIDs
1478  * present.  This is run during database startup, after we have completed
1479  * reading WAL.  ShmemVariableCache->nextXid has been set to one more than
1480  * the highest XID for which evidence exists in WAL.
1481  *
1482  * We throw away any prepared xacts with main XID beyond nextXid --- if any
1483  * are present, it suggests that the DBA has done a PITR recovery to an
1484  * earlier point in time without cleaning out pg_twophase.      We dare not
1485  * try to recover such prepared xacts since they likely depend on database
1486  * state that doesn't exist now.
1487  *
1488  * However, we will advance nextXid beyond any subxact XIDs belonging to
1489  * valid prepared xacts.  We need to do this since subxact commit doesn't
1490  * write a WAL entry, and so there might be no evidence in WAL of those
1491  * subxact XIDs.
1492  *
1493  * Our other responsibility is to determine and return the oldest valid XID
1494  * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
1495  * This is needed to synchronize pg_subtrans startup properly.
1496  */
1497 TransactionId
1498 PrescanPreparedTransactions(void)
1499 {
1500         TransactionId origNextXid = ShmemVariableCache->nextXid;
1501         TransactionId result = origNextXid;
1502         DIR                *cldir;
1503         struct dirent *clde;
1504
1505         cldir = AllocateDir(TWOPHASE_DIR);
1506         while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1507         {
1508                 if (strlen(clde->d_name) == 8 &&
1509                         strspn(clde->d_name, "0123456789ABCDEF") == 8)
1510                 {
1511                         TransactionId xid;
1512                         char       *buf;
1513                         TwoPhaseFileHeader *hdr;
1514                         TransactionId *subxids;
1515                         int                     i;
1516
1517                         xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1518
1519                         /* Reject XID if too new */
1520                         if (TransactionIdFollowsOrEquals(xid, origNextXid))
1521                         {
1522                                 ereport(WARNING,
1523                                                 (errmsg("removing future two-phase state file \"%s\"",
1524                                                                 clde->d_name)));
1525                                 RemoveTwoPhaseFile(xid, true);
1526                                 continue;
1527                         }
1528
1529                         /*
1530                          * Note: we can't check if already processed because clog
1531                          * subsystem isn't up yet.
1532                          */
1533
1534                         /* Read and validate file */
1535                         buf = ReadTwoPhaseFile(xid);
1536                         if (buf == NULL)
1537                         {
1538                                 ereport(WARNING,
1539                                           (errmsg("removing corrupt two-phase state file \"%s\"",
1540                                                           clde->d_name)));
1541                                 RemoveTwoPhaseFile(xid, true);
1542                                 continue;
1543                         }
1544
1545                         /* Deconstruct header */
1546                         hdr = (TwoPhaseFileHeader *) buf;
1547                         if (!TransactionIdEquals(hdr->xid, xid))
1548                         {
1549                                 ereport(WARNING,
1550                                           (errmsg("removing corrupt two-phase state file \"%s\"",
1551                                                           clde->d_name)));
1552                                 RemoveTwoPhaseFile(xid, true);
1553                                 pfree(buf);
1554                                 continue;
1555                         }
1556
1557                         /*
1558                          * OK, we think this file is valid.  Incorporate xid into the
1559                          * running-minimum result.
1560                          */
1561                         if (TransactionIdPrecedes(xid, result))
1562                                 result = xid;
1563
1564                         /*
1565                          * Examine subtransaction XIDs ... they should all follow main
1566                          * XID, and they may force us to advance nextXid.
1567                          */
1568                         subxids = (TransactionId *)
1569                                 (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
1570                         for (i = 0; i < hdr->nsubxacts; i++)
1571                         {
1572                                 TransactionId subxid = subxids[i];
1573
1574                                 Assert(TransactionIdFollows(subxid, xid));
1575                                 if (TransactionIdFollowsOrEquals(subxid,
1576                                                                                                  ShmemVariableCache->nextXid))
1577                                 {
1578                                         ShmemVariableCache->nextXid = subxid;
1579                                         TransactionIdAdvance(ShmemVariableCache->nextXid);
1580                                 }
1581                         }
1582
1583                         pfree(buf);
1584                 }
1585         }
1586         FreeDir(cldir);
1587
1588         return result;
1589 }
1590
1591 /*
1592  * RecoverPreparedTransactions
1593  *
1594  * Scan the pg_twophase directory and reload shared-memory state for each
1595  * prepared transaction (reacquire locks, etc).  This is run during database
1596  * startup.
1597  */
1598 void
1599 RecoverPreparedTransactions(void)
1600 {
1601         char            dir[MAXPGPATH];
1602         DIR                *cldir;
1603         struct dirent *clde;
1604
1605         snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
1606
1607         cldir = AllocateDir(dir);
1608         while ((clde = ReadDir(cldir, dir)) != NULL)
1609         {
1610                 if (strlen(clde->d_name) == 8 &&
1611                         strspn(clde->d_name, "0123456789ABCDEF") == 8)
1612                 {
1613                         TransactionId xid;
1614                         char       *buf;
1615                         char       *bufptr;
1616                         TwoPhaseFileHeader *hdr;
1617                         TransactionId *subxids;
1618                         GlobalTransaction gxact;
1619                         int                     i;
1620
1621                         xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1622
1623                         /* Already processed? */
1624                         if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
1625                         {
1626                                 ereport(WARNING,
1627                                                 (errmsg("removing stale two-phase state file \"%s\"",
1628                                                                 clde->d_name)));
1629                                 RemoveTwoPhaseFile(xid, true);
1630                                 continue;
1631                         }
1632
1633                         /* Read and validate file */
1634                         buf = ReadTwoPhaseFile(xid);
1635                         if (buf == NULL)
1636                         {
1637                                 ereport(WARNING,
1638                                           (errmsg("removing corrupt two-phase state file \"%s\"",
1639                                                           clde->d_name)));
1640                                 RemoveTwoPhaseFile(xid, true);
1641                                 continue;
1642                         }
1643
1644                         ereport(LOG,
1645                                         (errmsg("recovering prepared transaction %u", xid)));
1646
1647                         /* Deconstruct header */
1648                         hdr = (TwoPhaseFileHeader *) buf;
1649                         Assert(TransactionIdEquals(hdr->xid, xid));
1650                         bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1651                         subxids = (TransactionId *) bufptr;
1652                         bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1653                         bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1654                         bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1655
1656                         /*
1657                          * Reconstruct subtrans state for the transaction --- needed
1658                          * because pg_subtrans is not preserved over a restart.  Note that
1659                          * we are linking all the subtransactions directly to the
1660                          * top-level XID; there may originally have been a more complex
1661                          * hierarchy, but there's no need to restore that exactly.
1662                          */
1663                         for (i = 0; i < hdr->nsubxacts; i++)
1664                                 SubTransSetParent(subxids[i], xid);
1665
1666                         /*
1667                          * Recreate its GXACT and dummy PGPROC
1668                          *
1669                          * Note: since we don't have the PREPARE record's WAL location at
1670                          * hand, we leave prepare_lsn zeroes.  This means the GXACT will
1671                          * be fsync'd on every future checkpoint.  We assume this
1672                          * situation is infrequent enough that the performance cost is
1673                          * negligible (especially since we know the state file has already
1674                          * been fsynced).
1675                          */
1676                         gxact = MarkAsPreparing(xid, hdr->gid,
1677                                                                         hdr->prepared_at,
1678                                                                         hdr->owner, hdr->database);
1679                         GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
1680                         MarkAsPrepared(gxact);
1681
1682                         /*
1683                          * Recover other state (notably locks) using resource managers
1684                          */
1685                         ProcessRecords(bufptr, xid, twophase_recover_callbacks);
1686
1687                         pfree(buf);
1688                 }
1689         }
1690         FreeDir(cldir);
1691 }
1692
1693 /*
1694  *      RecordTransactionCommitPrepared
1695  *
1696  * This is basically the same as RecordTransactionCommit: in particular,
1697  * we must set the inCommit flag to avoid a race condition.
1698  *
1699  * We know the transaction made at least one XLOG entry (its PREPARE),
1700  * so it is never possible to optimize out the commit record.
1701  */
1702 static void
1703 RecordTransactionCommitPrepared(TransactionId xid,
1704                                                                 int nchildren,
1705                                                                 TransactionId *children,
1706                                                                 int nrels,
1707                                                                 RelFileNode *rels)
1708 {
1709         XLogRecData rdata[3];
1710         int                     lastrdata = 0;
1711         xl_xact_commit_prepared xlrec;
1712         XLogRecPtr      recptr;
1713
1714         START_CRIT_SECTION();
1715
1716         /* See notes in RecordTransactionCommit */
1717         MyProc->inCommit = true;
1718
1719         /* Emit the XLOG commit record */
1720         xlrec.xid = xid;
1721         xlrec.crec.xact_time = GetCurrentTimestamp();
1722         xlrec.crec.nrels = nrels;
1723         xlrec.crec.nsubxacts = nchildren;
1724         rdata[0].data = (char *) (&xlrec);
1725         rdata[0].len = MinSizeOfXactCommitPrepared;
1726         rdata[0].buffer = InvalidBuffer;
1727         /* dump rels to delete */
1728         if (nrels > 0)
1729         {
1730                 rdata[0].next = &(rdata[1]);
1731                 rdata[1].data = (char *) rels;
1732                 rdata[1].len = nrels * sizeof(RelFileNode);
1733                 rdata[1].buffer = InvalidBuffer;
1734                 lastrdata = 1;
1735         }
1736         /* dump committed child Xids */
1737         if (nchildren > 0)
1738         {
1739                 rdata[lastrdata].next = &(rdata[2]);
1740                 rdata[2].data = (char *) children;
1741                 rdata[2].len = nchildren * sizeof(TransactionId);
1742                 rdata[2].buffer = InvalidBuffer;
1743                 lastrdata = 2;
1744         }
1745         rdata[lastrdata].next = NULL;
1746
1747         recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, rdata);
1748
1749         /*
1750          * We don't currently try to sleep before flush here ... nor is there any
1751          * support for async commit of a prepared xact (the very idea is probably
1752          * a contradiction)
1753          */
1754
1755         /* Flush XLOG to disk */
1756         XLogFlush(recptr);
1757
1758         /* Mark the transaction committed in pg_clog */
1759         TransactionIdCommitTree(xid, nchildren, children);
1760
1761         /* Checkpoint can proceed now */
1762         MyProc->inCommit = false;
1763
1764         END_CRIT_SECTION();
1765 }
1766
1767 /*
1768  *      RecordTransactionAbortPrepared
1769  *
1770  * This is basically the same as RecordTransactionAbort.
1771  *
1772  * We know the transaction made at least one XLOG entry (its PREPARE),
1773  * so it is never possible to optimize out the abort record.
1774  */
1775 static void
1776 RecordTransactionAbortPrepared(TransactionId xid,
1777                                                            int nchildren,
1778                                                            TransactionId *children,
1779                                                            int nrels,
1780                                                            RelFileNode *rels)
1781 {
1782         XLogRecData rdata[3];
1783         int                     lastrdata = 0;
1784         xl_xact_abort_prepared xlrec;
1785         XLogRecPtr      recptr;
1786
1787         /*
1788          * Catch the scenario where we aborted partway through
1789          * RecordTransactionCommitPrepared ...
1790          */
1791         if (TransactionIdDidCommit(xid))
1792                 elog(PANIC, "cannot abort transaction %u, it was already committed",
1793                          xid);
1794
1795         START_CRIT_SECTION();
1796
1797         /* Emit the XLOG abort record */
1798         xlrec.xid = xid;
1799         xlrec.arec.xact_time = GetCurrentTimestamp();
1800         xlrec.arec.nrels = nrels;
1801         xlrec.arec.nsubxacts = nchildren;
1802         rdata[0].data = (char *) (&xlrec);
1803         rdata[0].len = MinSizeOfXactAbortPrepared;
1804         rdata[0].buffer = InvalidBuffer;
1805         /* dump rels to delete */
1806         if (nrels > 0)
1807         {
1808                 rdata[0].next = &(rdata[1]);
1809                 rdata[1].data = (char *) rels;
1810                 rdata[1].len = nrels * sizeof(RelFileNode);
1811                 rdata[1].buffer = InvalidBuffer;
1812                 lastrdata = 1;
1813         }
1814         /* dump committed child Xids */
1815         if (nchildren > 0)
1816         {
1817                 rdata[lastrdata].next = &(rdata[2]);
1818                 rdata[2].data = (char *) children;
1819                 rdata[2].len = nchildren * sizeof(TransactionId);
1820                 rdata[2].buffer = InvalidBuffer;
1821                 lastrdata = 2;
1822         }
1823         rdata[lastrdata].next = NULL;
1824
1825         recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED, rdata);
1826
1827         /* Always flush, since we're about to remove the 2PC state file */
1828         XLogFlush(recptr);
1829
1830         /*
1831          * Mark the transaction aborted in clog.  This is not absolutely necessary
1832          * but we may as well do it while we are here.
1833          */
1834         TransactionIdAbortTree(xid, nchildren, children);
1835
1836         END_CRIT_SECTION();
1837 }