]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/predicate.c
Make ascii-art in comments pgindent-safe, and some other formatting changes.
[postgresql] / src / backend / storage / lmgr / predicate.c
1 /*-------------------------------------------------------------------------
2  *
3  * predicate.c
4  *        POSTGRES predicate locking
5  *        to support full serializable transaction isolation
6  *
7  *
8  * The approach taken is to implement Serializable Snapshot Isolation (SSI)
9  * as initially described in this paper:
10  *
11  *      Michael J. Cahill, Uwe Röhm, and Alan D. Fekete. 2008.
12  *      Serializable isolation for snapshot databases.
13  *      In SIGMOD '08: Proceedings of the 2008 ACM SIGMOD
14  *      international conference on Management of data,
15  *      pages 729-738, New York, NY, USA. ACM.
16  *      http://doi.acm.org/10.1145/1376616.1376690
17  *
18  * and further elaborated in Cahill's doctoral thesis:
19  *
20  *      Michael James Cahill. 2009.
21  *      Serializable Isolation for Snapshot Databases.
22  *      Sydney Digital Theses.
23  *      University of Sydney, School of Information Technologies.
24  *      http://hdl.handle.net/2123/5353
25  *
26  *
27  * Predicate locks for Serializable Snapshot Isolation (SSI) are SIREAD
28  * locks, which are so different from normal locks that a distinct set of
29  * structures is required to handle them.  They are needed to detect
30  * rw-conflicts when the read happens before the write.  (When the write
31  * occurs first, the reading transaction can check for a conflict by
32  * examining the MVCC data.)
33  *
34  * (1)  Besides tuples actually read, they must cover ranges of tuples
35  *              which would have been read based on the predicate.      This will
36  *              require modelling the predicates through locks against database
37  *              objects such as pages, index ranges, or entire tables.
38  *
39  * (2)  They must be kept in RAM for quick access.      Because of this, it
40  *              isn't possible to always maintain tuple-level granularity -- when
41  *              the space allocated to store these approaches exhaustion, a
42  *              request for a lock may need to scan for situations where a single
43  *              transaction holds many fine-grained locks which can be coalesced
44  *              into a single coarser-grained lock.
45  *
46  * (3)  They never block anything; they are more like flags than locks
47  *              in that regard; although they refer to database objects and are
48  *              used to identify rw-conflicts with normal write locks.
49  *
50  * (4)  While they are associated with a transaction, they must survive
51  *              a successful COMMIT of that transaction, and remain until all
52  *              overlapping transactions complete.      This even means that they
53  *              must survive termination of the transaction's process.  If a
54  *              top level transaction is rolled back, however, it is immediately
55  *              flagged so that it can be ignored, and its SIREAD locks can be
56  *              released any time after that.
57  *
58  * (5)  The only transactions which create SIREAD locks or check for
59  *              conflicts with them are serializable transactions.
60  *
61  * (6)  When a write lock for a top level transaction is found to cover
62  *              an existing SIREAD lock for the same transaction, the SIREAD lock
63  *              can be deleted.
64  *
65  * (7)  A write from a serializable transaction must ensure that a xact
66  *              record exists for the transaction, with the same lifespan (until
67  *              all concurrent transactions complete or the transaction is rolled
68  *              back) so that rw-dependencies to that transaction can be
69  *              detected.
70  *
71  * We use an optimization for read-only transactions. Under certain
72  * circumstances, a read-only transaction's snapshot can be shown to
73  * never have conflicts with other transactions.  This is referred to
74  * as a "safe" snapshot (and one known not to be is "unsafe").
75  * However, it can't be determined whether a snapshot is safe until
76  * all concurrent read/write transactions complete.
77  *
78  * Once a read-only transaction is known to have a safe snapshot, it
79  * can release its predicate locks and exempt itself from further
80  * predicate lock tracking. READ ONLY DEFERRABLE transactions run only
81  * on safe snapshots, waiting as necessary for one to be available.
82  *
83  *
84  * Lightweight locks to manage access to the predicate locking shared
85  * memory objects must be taken in this order, and should be released in
86  * reverse order:
87  *
88  *      SerializableFinishedListLock
89  *              - Protects the list of transactions which have completed but which
90  *                      may yet matter because they overlap still-active transactions.
91  *
92  *      SerializablePredicateLockListLock
93  *              - Protects the linked list of locks held by a transaction.      Note
94  *                      that the locks themselves are also covered by the partition
95  *                      locks of their respective lock targets; this lock only affects
96  *                      the linked list connecting the locks related to a transaction.
97  *              - All transactions share this single lock (with no partitioning).
98  *              - There is never a need for a process other than the one running
99  *                      an active transaction to walk the list of locks held by that
100  *                      transaction.
101  *              - It is relatively infrequent that another process needs to
102  *                      modify the list for a transaction, but it does happen for such
103  *                      things as index page splits for pages with predicate locks and
104  *                      freeing of predicate locked pages by a vacuum process.  When
105  *                      removing a lock in such cases, the lock itself contains the
106  *                      pointers needed to remove it from the list.  When adding a
107  *                      lock in such cases, the lock can be added using the anchor in
108  *                      the transaction structure.      Neither requires walking the list.
109  *              - Cleaning up the list for a terminated transaction is sometimes
110  *                      not done on a retail basis, in which case no lock is required.
111  *              - Due to the above, a process accessing its active transaction's
112  *                      list always uses a shared lock, regardless of whether it is
113  *                      walking or maintaining the list.  This improves concurrency
114  *                      for the common access patterns.
115  *              - A process which needs to alter the list of a transaction other
116  *                      than its own active transaction must acquire an exclusive
117  *                      lock.
118  *
119  *      FirstPredicateLockMgrLock based partition locks
120  *              - The same lock protects a target, all locks on that target, and
121  *                      the linked list of locks on the target.
122  *              - When more than one is needed, acquire in ascending order.
123  *
124  *      SerializableXactHashLock
125  *              - Protects both PredXact and SerializableXidHash.
126  *
127  *
128  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
129  * Portions Copyright (c) 1994, Regents of the University of California
130  *
131  *
132  * IDENTIFICATION
133  *        src/backend/storage/lmgr/predicate.c
134  *
135  *-------------------------------------------------------------------------
136  */
137 /*
138  * INTERFACE ROUTINES
139  *
140  * housekeeping for setting up shared memory predicate lock structures
141  *              InitPredicateLocks(void)
142  *              PredicateLockShmemSize(void)
143  *
144  * predicate lock reporting
145  *              GetPredicateLockStatusData(void)
146  *              PageIsPredicateLocked(Relation relation, BlockNumber blkno)
147  *
148  * predicate lock maintenance
149  *              RegisterSerializableTransaction(Snapshot snapshot)
150  *              RegisterPredicateLockingXid(void)
151  *              PredicateLockRelation(Relation relation)
152  *              PredicateLockPage(Relation relation, BlockNumber blkno)
153  *              PredicateLockTuple(Relation relation, HeapTuple tuple)
154  *              PredicateLockPageSplit(Relation relation, BlockNumber oldblkno,
155  *                                                         BlockNumber newblkno);
156  *              PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
157  *                                                               BlockNumber newblkno);
158  *              ReleasePredicateLocks(bool isCommit)
159  *
160  * conflict detection (may also trigger rollback)
161  *              CheckForSerializableConflictOut(bool visible, Relation relation,
162  *                                                                              HeapTupleData *tup, Buffer buffer)
163  *              CheckForSerializableConflictIn(Relation relation, HeapTupleData *tup,
164  *                                                                         Buffer buffer)
165  *
166  * final rollback checking
167  *              PreCommit_CheckForSerializationFailure(void)
168  *
169  * two-phase commit support
170  *              AtPrepare_PredicateLocks(void);
171  *              PostPrepare_PredicateLocks(TransactionId xid);
172  *              PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit);
173  *              predicatelock_twophase_recover(TransactionId xid, uint16 info,
174  *                                                                         void *recdata, uint32 len);
175  */
176
177 #include "postgres.h"
178
179 #include "access/slru.h"
180 #include "access/subtrans.h"
181 #include "access/transam.h"
182 #include "access/twophase.h"
183 #include "access/twophase_rmgr.h"
184 #include "access/xact.h"
185 #include "miscadmin.h"
186 #include "storage/bufmgr.h"
187 #include "storage/predicate.h"
188 #include "storage/predicate_internals.h"
189 #include "storage/procarray.h"
190 #include "utils/rel.h"
191 #include "utils/snapmgr.h"
192 #include "utils/tqual.h"
193
194 /* Uncomment the next line to test the graceful degradation code. */
195 /* #define TEST_OLDSERXID */
196
/*
 * TargetTagIsCoveredBy(covered_target, covering_target)
 *
 * Evaluates to true when a lock on covering_target (b) also covers
 * covered_target (a).
 *
 * Test the most selective fields first, for performance.
 *
 * a is covered by b if all of the following hold:
 *	1) a.database = b.database
 *	2) a.relation = b.relation
 *	3) b.offset is invalid (b is page-granularity or higher)
 *	4) either of the following:
 *		4a) a.offset is valid (a is tuple-granularity) and a.page = b.page
 *	 or 4b) b.page is invalid and a.page is valid (b is
 *			relation-granularity and a is of some finer granularity)
 *
 * The numbered comments inside the macro refer to the conditions above.
 * Both arguments are expanded more than once, so they should be free of
 * side effects.
 */
#define TargetTagIsCoveredBy(covered_target, covering_target)			\
	((GET_PREDICATELOCKTARGETTAG_RELATION(covered_target) == /* (2) */	\
	  GET_PREDICATELOCKTARGETTAG_RELATION(covering_target))				\
	 && (GET_PREDICATELOCKTARGETTAG_OFFSET(covering_target) ==			\
		 InvalidOffsetNumber)								 /* (3) */	\
	 && (((GET_PREDICATELOCKTARGETTAG_OFFSET(covered_target) !=			\
		   InvalidOffsetNumber)								 /* (4a) */ \
		  && (GET_PREDICATELOCKTARGETTAG_PAGE(covering_target) ==		\
			  GET_PREDICATELOCKTARGETTAG_PAGE(covered_target)))			\
		 || ((GET_PREDICATELOCKTARGETTAG_PAGE(covering_target) ==		\
			  InvalidBlockNumber)								 /* (4b) */ \
			 && (GET_PREDICATELOCKTARGETTAG_PAGE(covered_target)		\
				 != InvalidBlockNumber)))								\
	 && (GET_PREDICATELOCKTARGETTAG_DB(covered_target) ==	 /* (1) */	\
		 GET_PREDICATELOCKTARGETTAG_DB(covering_target)))
224
/*
 * The predicate locking target and lock shared hash tables are partitioned to
 * reduce contention.  To determine which partition a given target belongs to,
 * compute the tag's hash code with PredicateLockTargetTagHashCode(), then
 * apply one of these macros.
 * NB: NUM_PREDICATELOCK_PARTITIONS must be a power of 2!
 *
 * PredicateLockHashPartition(hashcode) yields the partition number, i.e. the
 * hash code reduced modulo NUM_PREDICATELOCK_PARTITIONS.
 *
 * PredicateLockHashPartitionLock(hashcode) yields the LWLockId for that
 * partition, counted upward from FirstPredicateLockMgrLock.
 */
#define PredicateLockHashPartition(hashcode) \
	((hashcode) % NUM_PREDICATELOCK_PARTITIONS)
#define PredicateLockHashPartitionLock(hashcode) \
	((LWLockId) (FirstPredicateLockMgrLock + PredicateLockHashPartition(hashcode)))
236
/*
 * Number of predicate lock target entries:
 * max_predicate_locks_per_xact * (MaxBackends + max_prepared_xacts),
 * computed through mul_size()/add_size().
 * NOTE(review): those helpers presumably guard against size_t overflow --
 * confirm against their definitions.
 */
#define NPREDICATELOCKTARGETENTS() \
	mul_size(max_predicate_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
239
/*
 * True unless the sxact's finishedLink is reported as detached by
 * SHMQueueIsDetached().
 */
#define SxactIsOnFinishedList(sxact) (!SHMQueueIsDetached(&((sxact)->finishedLink)))

/*
 * Flag tests: each of the following evaluates to true exactly when the
 * correspondingly named SXACT_FLAG_* bit is set in (sxact)->flags.
 */
#define SxactIsPrepared(sxact) (((sxact)->flags & SXACT_FLAG_PREPARED) != 0)
#define SxactIsCommitted(sxact) (((sxact)->flags & SXACT_FLAG_COMMITTED) != 0)
#define SxactIsRolledBack(sxact) (((sxact)->flags & SXACT_FLAG_ROLLED_BACK) != 0)
#define SxactIsReadOnly(sxact) (((sxact)->flags & SXACT_FLAG_READ_ONLY) != 0)
#define SxactHasSummaryConflictIn(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_IN) != 0)
#define SxactHasSummaryConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_OUT) != 0)
/*
 * The following macro actually means that the specified transaction has a
 * conflict out *to a transaction which committed ahead of it*.  It's hard
 * to get that into a name of a reasonable length.
 */
#define SxactHasConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_CONFLICT_OUT) != 0)
#define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0)
#define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0)
#define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
#define SxactIsMarkedForDeath(sxact) (((sxact)->flags & SXACT_FLAG_MARKED_FOR_DEATH) != 0)
258
/*
 * When a public interface method is called for a split on an index relation,
 * this is the test to see if we should do a quick return.
 *
 * True when the relation's rd_id is below FirstBootstrapObjectId, or when
 * RelationUsesLocalBuffers() holds for it.  The argument is expanded twice.
 */
#define SkipSplitTracking(relation) \
	(((relation)->rd_id < FirstBootstrapObjectId) \
	|| RelationUsesLocalBuffers(relation))
266
/*
 * When a public interface method is called for serializing a relation within
 * the current transaction, this is the test to see if we should do a quick
 * return.
 *
 * The operands are evaluated left to right with short-circuiting, so
 * ReleasePredicateLocksIfROSafe() is called only when the isolation level is
 * serializable and MySerializableXact is set; SkipSplitTracking() is
 * consulted only if that call returns false.
 */
#define SkipSerialization(relation) \
	((!IsolationIsSerializable()) \
	|| ((MySerializableXact == InvalidSerializableXact)) \
	|| ReleasePredicateLocksIfROSafe() \
	|| SkipSplitTracking(relation))
277
278
/*
 * Compute the hash code associated with a PREDICATELOCKTARGETTAG.
 *
 * To avoid unnecessary recomputations of the hash code, we try to do this
 * just once per function, and then pass it around as needed.  Aside from
 * passing the hashcode to hash_search_with_hash_value(), we can extract
 * the lock partition number from the hashcode.
 *
 * The argument is handed to tag_hash() as the key, with
 * sizeof(PREDICATELOCKTARGETTAG) as the key size, so it is expected to be a
 * pointer to the tag rather than the tag itself.
 */
#define PredicateLockTargetTagHashCode(predicatelocktargettag) \
	(tag_hash((predicatelocktargettag), sizeof(PREDICATELOCKTARGETTAG)))
289
/*
 * Derive the hash code of a predicate lock from its tag and the (already
 * computed) hash code of the lock's target.
 *
 * The lock hash must also depend on the owning transaction, so the address
 * held in the tag's myXact field is xor'ed into the target hash.  The
 * address is first shifted left by LOG2_NUM_PREDICATELOCK_PARTITIONS so that
 * the partition-number bits of the result are those of the target hash.
 * Since this is only a hash, losing high-order bits of the address in the
 * conversion to uint32 is harmless.
 *
 * The shift is parenthesized explicitly; "<<" binds more tightly than "^"
 * in any case.
 */
#define PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash) \
	((targethash) ^ \
	 (((uint32) PointerGetDatum((predicatelocktag)->myXact)) \
	  << LOG2_NUM_PREDICATELOCK_PARTITIONS))
303
304
/*
 * The SLRU buffer area through which we access the old xids.
 */
static SlruCtlData OldSerXidSlruCtlData;

/* Pointer form of the control data, as used throughout this file. */
#define OldSerXidSlruCtl			(&OldSerXidSlruCtlData)

/* One page is BLCKSZ bytes and holds an array of SerCommitSeqNo entries. */
#define OLDSERXID_PAGESIZE			BLCKSZ
#define OLDSERXID_ENTRYSIZE			sizeof(SerCommitSeqNo)
#define OLDSERXID_ENTRIESPERPAGE	(OLDSERXID_PAGESIZE / OLDSERXID_ENTRYSIZE)
/* Highest page number in use; page numbers run 0..OLDSERXID_MAX_PAGE. */
#define OLDSERXID_MAX_PAGE			(SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)

/* Successor of a page number, wrapping to 0 after OLDSERXID_MAX_PAGE. */
#define OldSerXidNextPage(page) (((page) >= OLDSERXID_MAX_PAGE) ? 0 : (page) + 1)

/*
 * The SerCommitSeqNo entry for xid within the page held in buffer slot
 * slotno.  Expands to a dereference, so it can be read or assigned.
 */
#define OldSerXidValue(slotno, xid) (*((SerCommitSeqNo *) \
	(OldSerXidSlruCtl->shared->page_buffer[slotno] + \
	((((uint32) (xid)) % OLDSERXID_ENTRIESPERPAGE) * OLDSERXID_ENTRYSIZE))))

/* Page number holding the entry for xid, and segment number of a page. */
#define OldSerXidPage(xid)	((((uint32) (xid)) / OLDSERXID_ENTRIESPERPAGE) % (OLDSERXID_MAX_PAGE + 1))
#define OldSerXidSegment(page)	((page) / SLRU_PAGES_PER_SEGMENT)
325
/*
 * Control information for the old-serializable-xid SLRU area.
 */
typedef struct OldSerXidControlData
{
	int			headPage;		/* newest initialized page */
	TransactionId headXid;		/* newest valid Xid in the SLRU */
	TransactionId tailXid;		/* oldest xmin we might be interested in */
	bool		warningIssued;	/* NOTE(review): presumably set once a
								 * warning has been reported -- confirm
								 * against the code that uses it */
}	OldSerXidControlData;

typedef struct OldSerXidControlData *OldSerXidControl;

/* This file's pointer to the control structure. */
static OldSerXidControl oldSerXidControl;
337
338 /*
339  * When the oldest committed transaction on the "finished" list is moved to
340  * SLRU, its predicate locks will be moved to this "dummy" transaction,
341  * collapsing duplicate targets.  When a duplicate is found, the later
342  * commitSeqNo is used.
343  */
344 static SERIALIZABLEXACT *OldCommittedSxact;
345
346
347 /* This configuration variable is used to set the predicate lock table size */
348 int                     max_predicate_locks_per_xact;           /* set by guc.c */
349
350 /*
351  * This provides a list of objects in order to track transactions
352  * participating in predicate locking.  Entries in the list are fixed size,
353  * and reside in shared memory.  The memory address of an entry must remain
354  * fixed during its lifetime.  The list will be protected from concurrent
355  * update externally; no provision is made in this code to manage that.  The
356  * number of entries in the list, and the size allowed for each entry is
357  * fixed upon creation.
358  */
359 static PredXactList PredXact;
360
361 /*
362  * This provides a pool of RWConflict data elements to use in conflict lists
363  * between transactions.
364  */
365 static RWConflictPoolHeader RWConflictPool;
366
367 /*
368  * The predicate locking hash tables are in shared memory.
369  * Each backend keeps pointers to them.
370  */
371 static HTAB *SerializableXidHash;
372 static HTAB *PredicateLockTargetHash;
373 static HTAB *PredicateLockHash;
374 static SHM_QUEUE *FinishedSerializableTransactions;
375
376 /*
377  * Tag for a reserved entry in PredicateLockTargetHash; used to ensure
378  * there's an element available for scratch space if we need it,
379  * e.g. in PredicateLockPageSplit. This is an otherwise-invalid tag.
380  */
381 static const PREDICATELOCKTARGETTAG ReservedTargetTag = {0, 0, 0, 0, 0};
382
383 /*
384  * The local hash table used to determine when to combine multiple fine-
385  * grained locks into a single coarser-grained lock.
386  */
387 static HTAB *LocalPredicateLockHash = NULL;
388
389 /*
390  * Keep a pointer to the currently-running serializable transaction (if any)
391  * for quick reference.
392  * TODO SSI: Remove volatile qualifier and the then-unnecessary casts?
393  */
394 static volatile SERIALIZABLEXACT *MySerializableXact = InvalidSerializableXact;
395
396 /* local functions */
397
398 static SERIALIZABLEXACT *CreatePredXact(void);
399 static void ReleasePredXact(SERIALIZABLEXACT *sxact);
400 static SERIALIZABLEXACT *FirstPredXact(void);
401 static SERIALIZABLEXACT *NextPredXact(SERIALIZABLEXACT *sxact);
402
403 static bool RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer);
404 static void SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
405 static void SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact, SERIALIZABLEXACT *activeXact);
406 static void ReleaseRWConflict(RWConflict conflict);
407 static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact);
408
409 static bool OldSerXidPagePrecedesLogically(int p, int q);
410 static void OldSerXidInit(void);
411 static void OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo);
412 static SerCommitSeqNo OldSerXidGetMinConflictCommitSeqNo(TransactionId xid);
413 static void OldSerXidSetActiveSerXmin(TransactionId xid);
414
415 static uint32 predicatelock_hash(const void *key, Size keysize);
416 static void SummarizeOldestCommittedSxact(void);
417 static Snapshot GetSafeSnapshot(Snapshot snapshot);
418 static Snapshot RegisterSerializableTransactionInt(Snapshot snapshot);
419 static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
420 static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
421                                                   PREDICATELOCKTARGETTAG *parent);
422 static bool CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag);
423 static void RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target,
424                                                    uint32 targettaghash);
425 static void DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag);
426 static int      PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG *tag);
427 static bool CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag);
428 static void DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag);
429 static void CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
430                                         uint32 targettaghash,
431                                         SERIALIZABLEXACT *sxact);
432 static void DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash);
433 static bool TransferPredicateLocksToNewTarget(const PREDICATELOCKTARGETTAG oldtargettag,
434                                                                   const PREDICATELOCKTARGETTAG newtargettag,
435                                                                   bool removeOld);
436 static void PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag);
437 static void SetNewSxactGlobalXmin(void);
438 static bool ReleasePredicateLocksIfROSafe(void);
439 static void ClearOldPredicateLocks(void);
440 static void ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
441                                                    bool summarize);
442 static bool XidIsConcurrent(TransactionId xid);
443 static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag);
444 static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
445 static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
446                                                                                 SERIALIZABLEXACT *writer);
447
448 /*------------------------------------------------------------------------*/
449
450 /*
451  * These functions are a simple implementation of a list for this specific
452  * type of struct.      If there is ever a generalized shared memory list, we
453  * should probably switch to that.
454  */
455 static SERIALIZABLEXACT *
456 CreatePredXact(void)
457 {
458         PredXactListElement ptle;
459
460         ptle = (PredXactListElement)
461                 SHMQueueNext(&PredXact->availableList,
462                                          &PredXact->availableList,
463                                          offsetof(PredXactListElementData, link));
464         if (!ptle)
465                 return NULL;
466
467         SHMQueueDelete(&ptle->link);
468         SHMQueueInsertBefore(&PredXact->activeList, &ptle->link);
469         return &ptle->sxact;
470 }
471
472 static void
473 ReleasePredXact(SERIALIZABLEXACT *sxact)
474 {
475         PredXactListElement ptle;
476
477         Assert(ShmemAddrIsValid(sxact));
478
479         ptle = (PredXactListElement)
480                 (((char *) sxact)
481                  - offsetof(PredXactListElementData, sxact)
482                  + offsetof(PredXactListElementData, link));
483         SHMQueueDelete(&ptle->link);
484         SHMQueueInsertBefore(&PredXact->availableList, &ptle->link);
485 }
486
487 static SERIALIZABLEXACT *
488 FirstPredXact(void)
489 {
490         PredXactListElement ptle;
491
492         ptle = (PredXactListElement)
493                 SHMQueueNext(&PredXact->activeList,
494                                          &PredXact->activeList,
495                                          offsetof(PredXactListElementData, link));
496         if (!ptle)
497                 return NULL;
498
499         return &ptle->sxact;
500 }
501
502 static SERIALIZABLEXACT *
503 NextPredXact(SERIALIZABLEXACT *sxact)
504 {
505         PredXactListElement ptle;
506
507         Assert(ShmemAddrIsValid(sxact));
508
509         ptle = (PredXactListElement)
510                 (((char *) sxact)
511                  - offsetof(PredXactListElementData, sxact)
512                  + offsetof(PredXactListElementData, link));
513         ptle = (PredXactListElement)
514                 SHMQueueNext(&PredXact->activeList,
515                                          &ptle->link,
516                                          offsetof(PredXactListElementData, link));
517         if (!ptle)
518                 return NULL;
519
520         return &ptle->sxact;
521 }
522
523 /*------------------------------------------------------------------------*/
524
525 /*
526  * These functions manage primitive access to the RWConflict pool and lists.
527  */
528 static bool
529 RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
530 {
531         RWConflict      conflict;
532
533         Assert(reader != writer);
534
535         /* Check the ends of the purported conflict first. */
536         if (SxactIsRolledBack(reader)
537                 || SxactIsRolledBack(writer)
538                 || SHMQueueEmpty(&reader->outConflicts)
539                 || SHMQueueEmpty(&writer->inConflicts))
540                 return false;
541
542         /* A conflict is possible; walk the list to find out. */
543         conflict = (RWConflict)
544                 SHMQueueNext(&reader->outConflicts,
545                                          &reader->outConflicts,
546                                          offsetof(RWConflictData, outLink));
547         while (conflict)
548         {
549                 if (conflict->sxactIn == writer)
550                         return true;
551                 conflict = (RWConflict)
552                         SHMQueueNext(&reader->outConflicts,
553                                                  &conflict->outLink,
554                                                  offsetof(RWConflictData, outLink));
555         }
556
557         /* No conflict found. */
558         return false;
559 }
560
561 static void
562 SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
563 {
564         RWConflict      conflict;
565
566         Assert(reader != writer);
567         Assert(!RWConflictExists(reader, writer));
568
569         conflict = (RWConflict)
570                 SHMQueueNext(&RWConflictPool->availableList,
571                                          &RWConflictPool->availableList,
572                                          offsetof(RWConflictData, outLink));
573         if (!conflict)
574                 ereport(ERROR,
575                                 (errcode(ERRCODE_OUT_OF_MEMORY),
576                                  errmsg("not enough elements in RWConflictPool to record a rw-conflict"),
577                                  errhint("You might need to run fewer transactions at a time or increase max_connections.")));
578
579         SHMQueueDelete(&conflict->outLink);
580
581         conflict->sxactOut = reader;
582         conflict->sxactIn = writer;
583         SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink);
584         SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink);
585 }
586
587 static void
588 SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact,
589                                                   SERIALIZABLEXACT *activeXact)
590 {
591         RWConflict      conflict;
592
593         Assert(roXact != activeXact);
594         Assert(SxactIsReadOnly(roXact));
595         Assert(!SxactIsReadOnly(activeXact));
596
597         conflict = (RWConflict)
598                 SHMQueueNext(&RWConflictPool->availableList,
599                                          &RWConflictPool->availableList,
600                                          offsetof(RWConflictData, outLink));
601         if (!conflict)
602                 ereport(ERROR,
603                                 (errcode(ERRCODE_OUT_OF_MEMORY),
604                                  errmsg("not enough elements in RWConflictPool to record a potential rw-conflict"),
605                                  errhint("You might need to run fewer transactions at a time or increase max_connections.")));
606
607         SHMQueueDelete(&conflict->outLink);
608
609         conflict->sxactOut = activeXact;
610         conflict->sxactIn = roXact;
611         SHMQueueInsertBefore(&activeXact->possibleUnsafeConflicts,
612                                                  &conflict->outLink);
613         SHMQueueInsertBefore(&roXact->possibleUnsafeConflicts,
614                                                  &conflict->inLink);
615 }
616
617 static void
618 ReleaseRWConflict(RWConflict conflict)
619 {
620         SHMQueueDelete(&conflict->inLink);
621         SHMQueueDelete(&conflict->outLink);
622         SHMQueueInsertBefore(&RWConflictPool->availableList, &conflict->outLink);
623 }
624
625 static void
626 FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
627 {
628         RWConflict      conflict,
629                                 nextConflict;
630
631         Assert(SxactIsReadOnly(sxact));
632         Assert(!SxactIsROSafe(sxact));
633
634         sxact->flags |= SXACT_FLAG_RO_UNSAFE;
635
636         /*
637          * We know this isn't a safe snapshot, so we can stop looking for other
638          * potential conflicts.
639          */
640         conflict = (RWConflict)
641                 SHMQueueNext(&sxact->possibleUnsafeConflicts,
642                                          &sxact->possibleUnsafeConflicts,
643                                          offsetof(RWConflictData, inLink));
644         while (conflict)
645         {
646                 nextConflict = (RWConflict)
647                         SHMQueueNext(&sxact->possibleUnsafeConflicts,
648                                                  &conflict->inLink,
649                                                  offsetof(RWConflictData, inLink));
650
651                 Assert(!SxactIsReadOnly(conflict->sxactOut));
652                 Assert(sxact == conflict->sxactIn);
653
654                 ReleaseRWConflict(conflict);
655
656                 conflict = nextConflict;
657         }
658 }
659
660 /*------------------------------------------------------------------------*/
661
662 /*
663  * We will work on the page range of 0..OLDSERXID_MAX_PAGE.
664  * Compares using wraparound logic, as is required by slru.c.
665  */
666 static bool
667 OldSerXidPagePrecedesLogically(int p, int q)
668 {
669         int                     diff;
670
671         /*
672          * We have to compare modulo (OLDSERXID_MAX_PAGE+1)/2.  Both inputs should
673          * be in the range 0..OLDSERXID_MAX_PAGE.
674          */
675         Assert(p >= 0 && p <= OLDSERXID_MAX_PAGE);
676         Assert(q >= 0 && q <= OLDSERXID_MAX_PAGE);
677
678         diff = p - q;
679         if (diff >= ((OLDSERXID_MAX_PAGE + 1) / 2))
680                 diff -= OLDSERXID_MAX_PAGE + 1;
681         else if (diff < -((OLDSERXID_MAX_PAGE + 1) / 2))
682                 diff += OLDSERXID_MAX_PAGE + 1;
683         return diff < 0;
684 }
685
686 /*
687  * Initialize for the tracking of old serializable committed xids.
688  */
689 static void
690 OldSerXidInit(void)
691 {
692         bool            found;
693
694         /*
695          * Set up SLRU management of the pg_serial data.
696          */
697         OldSerXidSlruCtl->PagePrecedes = OldSerXidPagePrecedesLogically;
698         SimpleLruInit(OldSerXidSlruCtl, "OldSerXid SLRU Ctl", NUM_OLDSERXID_BUFFERS, 0,
699                                   OldSerXidLock, "pg_serial");
700         /* Override default assumption that writes should be fsync'd */
701         OldSerXidSlruCtl->do_fsync = false;
702
703         /*
704          * Create or attach to the OldSerXidControl structure.
705          */
706         oldSerXidControl = (OldSerXidControl)
707                 ShmemInitStruct("OldSerXidControlData", sizeof(OldSerXidControlData), &found);
708
709         if (!found)
710         {
711                 /*
712                  * Set control information to reflect empty SLRU.
713                  */
714                 oldSerXidControl->headPage = -1;
715                 oldSerXidControl->headXid = InvalidTransactionId;
716                 oldSerXidControl->tailXid = InvalidTransactionId;
717                 oldSerXidControl->warningIssued = false;
718         }
719 }
720
721 /*
722  * Record a committed read write serializable xid and the minimum
723  * commitSeqNo of any transactions to which this xid had a rw-conflict out.
724  * A zero seqNo means that there were no conflicts out from xid.
725  */
726 static void
727 OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
728 {
729         TransactionId tailXid;
730         int                     targetPage;
731         int                     slotno;
732         int                     firstZeroPage;
733         int                     xidSpread;
734         bool            isNewPage;
735
736         Assert(TransactionIdIsValid(xid));
737
738         targetPage = OldSerXidPage(xid);
739
740         LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
741
742         /*
743          * If no serializable transactions are active, there shouldn't be anything
744          * to push out to the SLRU.  Hitting this assert would mean there's
745          * something wrong with the earlier cleanup logic.
746          */
747         tailXid = oldSerXidControl->tailXid;
748         Assert(TransactionIdIsValid(tailXid));
749
750         /*
751          * If the SLRU is currently unused, zero out the whole active region from
752          * tailXid to headXid before taking it into use. Otherwise zero out only
753          * any new pages that enter the tailXid-headXid range as we advance
754          * headXid.
755          */
756         if (oldSerXidControl->headPage < 0)
757         {
758                 firstZeroPage = OldSerXidPage(tailXid);
759                 isNewPage = true;
760         }
761         else
762         {
763                 firstZeroPage = OldSerXidNextPage(oldSerXidControl->headPage);
764                 isNewPage = OldSerXidPagePrecedesLogically(oldSerXidControl->headPage,
765                                                                                                    targetPage);
766         }
767
768         if (!TransactionIdIsValid(oldSerXidControl->headXid)
769                 || TransactionIdFollows(xid, oldSerXidControl->headXid))
770                 oldSerXidControl->headXid = xid;
771         if (isNewPage)
772                 oldSerXidControl->headPage = targetPage;
773
774         xidSpread = (((uint32) xid) - ((uint32) tailXid));
775         if (oldSerXidControl->warningIssued)
776         {
777                 if (xidSpread < 800000000)
778                         oldSerXidControl->warningIssued = false;
779         }
780         else if (xidSpread >= 1000000000)
781         {
782                 oldSerXidControl->warningIssued = true;
783                 ereport(WARNING,
784                                 (errmsg("memory for serializable conflict tracking is nearly exhausted"),
785                                  errhint("There may be an idle transaction or a forgotten prepared transaction causing this.")));
786         }
787
788         if (isNewPage)
789         {
790                 /* Initialize intervening pages. */
791                 while (firstZeroPage != targetPage)
792                 {
793                         (void) SimpleLruZeroPage(OldSerXidSlruCtl, firstZeroPage);
794                         firstZeroPage = OldSerXidNextPage(firstZeroPage);
795                 }
796                 slotno = SimpleLruZeroPage(OldSerXidSlruCtl, targetPage);
797         }
798         else
799                 slotno = SimpleLruReadPage(OldSerXidSlruCtl, targetPage, true, xid);
800
801         OldSerXidValue(slotno, xid) = minConflictCommitSeqNo;
802
803         LWLockRelease(OldSerXidLock);
804 }
805
806 /*
807  * Get the minimum commitSeqNo for any conflict out for the given xid.  For
808  * a transaction which exists but has no conflict out, InvalidSerCommitSeqNo
809  * will be returned.
810  */
811 static SerCommitSeqNo
812 OldSerXidGetMinConflictCommitSeqNo(TransactionId xid)
813 {
814         TransactionId headXid;
815         TransactionId tailXid;
816         SerCommitSeqNo val;
817         int                     slotno;
818
819         Assert(TransactionIdIsValid(xid));
820
821         LWLockAcquire(OldSerXidLock, LW_SHARED);
822         headXid = oldSerXidControl->headXid;
823         tailXid = oldSerXidControl->tailXid;
824         LWLockRelease(OldSerXidLock);
825
826         if (!TransactionIdIsValid(headXid))
827                 return 0;
828
829         Assert(TransactionIdIsValid(tailXid));
830
831         if (TransactionIdPrecedes(xid, tailXid)
832                 || TransactionIdFollows(xid, headXid))
833                 return 0;
834
835         /*
836          * The following function must be called without holding OldSerXidLock,
837          * but will return with that lock held, which must then be released.
838          */
839         slotno = SimpleLruReadPage_ReadOnly(OldSerXidSlruCtl,
840                                                                                 OldSerXidPage(xid), xid);
841         val = OldSerXidValue(slotno, xid);
842         LWLockRelease(OldSerXidLock);
843         return val;
844 }
845
846 /*
847  * Call this whenever there is a new xmin for active serializable
848  * transactions.  We don't need to keep information on transactions which
849  * precede that.  InvalidTransactionId means none active, so everything in
850  * the SLRU can be discarded.
851  */
852 static void
853 OldSerXidSetActiveSerXmin(TransactionId xid)
854 {
855         LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
856
857         /*
858          * When no sxacts are active, nothing overlaps, set the xid values to
859          * invalid to show that there are no valid entries.  Don't clear headPage,
860          * though.      A new xmin might still land on that page, and we don't want to
861          * repeatedly zero out the same page.
862          */
863         if (!TransactionIdIsValid(xid))
864         {
865                 oldSerXidControl->tailXid = InvalidTransactionId;
866                 oldSerXidControl->headXid = InvalidTransactionId;
867                 LWLockRelease(OldSerXidLock);
868                 return;
869         }
870
871         /*
872          * When we're recovering prepared transactions, the global xmin might move
873          * backwards depending on the order they're recovered. Normally that's not
874          * OK, but during recovery no serializable transactions will commit, so
875          * the SLRU is empty and we can get away with it.
876          */
877         if (RecoveryInProgress())
878         {
879                 Assert(oldSerXidControl->headPage < 0);
880                 if (!TransactionIdIsValid(oldSerXidControl->tailXid)
881                         || TransactionIdPrecedes(xid, oldSerXidControl->tailXid))
882                 {
883                         oldSerXidControl->tailXid = xid;
884                 }
885                 LWLockRelease(OldSerXidLock);
886                 return;
887         }
888
889         Assert(!TransactionIdIsValid(oldSerXidControl->tailXid)
890                    || TransactionIdFollows(xid, oldSerXidControl->tailXid));
891
892         oldSerXidControl->tailXid = xid;
893
894         LWLockRelease(OldSerXidLock);
895 }
896
897 /*
898  * Perform a checkpoint --- either during shutdown, or on-the-fly
899  *
900  * We don't have any data that needs to survive a restart, but this is a
901  * convenient place to truncate the SLRU.
902  */
903 void
904 CheckPointPredicate(void)
905 {
906         int                     tailPage;
907
908         LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
909
910         /* Exit quickly if the SLRU is currently not in use. */
911         if (oldSerXidControl->headPage < 0)
912         {
913                 LWLockRelease(OldSerXidLock);
914                 return;
915         }
916
917         if (TransactionIdIsValid(oldSerXidControl->tailXid))
918         {
919                 /* We can truncate the SLRU up to the page containing tailXid */
920                 tailPage = OldSerXidPage(oldSerXidControl->tailXid);
921         }
922         else
923         {
924                 /*
925                  * The SLRU is no longer needed. Truncate everything.  If we try to
926                  * leave the head page around to avoid re-zeroing it, we might not use
927                  * the SLRU again until we're past the wrap-around point, which makes
928                  * SLRU unhappy.
929                  *
930                  * While the API asks you to specify truncation by page, it silently
931                  * ignores the request unless the specified page is in a segment past
932                  * some allocated portion of the SLRU.  We don't care which page in a
933                  * later segment we hit, so just add the number of pages per segment
934                  * to the head page to land us *somewhere* in the next segment.
935                  */
936                 tailPage = oldSerXidControl->headPage + SLRU_PAGES_PER_SEGMENT;
937                 oldSerXidControl->headPage = -1;
938         }
939
940         LWLockRelease(OldSerXidLock);
941
942         /* Truncate away pages that are no longer required */
943         SimpleLruTruncate(OldSerXidSlruCtl, tailPage);
944
945         /*
946          * Flush dirty SLRU pages to disk
947          *
948          * This is not actually necessary from a correctness point of view. We do
949          * it merely as a debugging aid.
950          *
951          * We're doing this after the truncation to avoid writing pages right
952          * before deleting the file in which they sit, which would be completely
953          * pointless.
954          */
955         SimpleLruFlush(OldSerXidSlruCtl, true);
956 }
957
958 /*------------------------------------------------------------------------*/
959
960 /*
961  * InitPredicateLocks -- Initialize the predicate locking data structures.
962  *
963  * This is called from CreateSharedMemoryAndSemaphores(), which see for
964  * more comments.  In the normal postmaster case, the shared hash tables
965  * are created here.  Backends inherit the pointers
966  * to the shared tables via fork().  In the EXEC_BACKEND case, each
967  * backend re-executes this code to obtain pointers to the already existing
968  * shared hash tables.
969  */
970 void
971 InitPredicateLocks(void)
972 {
973         HASHCTL         info;
974         int                     hash_flags;
975         long            max_table_size;
976         Size            requestSize;
977         bool            found;
978
979         /*
980          * Compute size of predicate lock target hashtable.
981          * Note these calculations must agree with PredicateLockShmemSize!
982          */
983         max_table_size = NPREDICATELOCKTARGETENTS();
984
985         /*
986          * Allocate hash table for PREDICATELOCKTARGET structs.  This stores
987          * per-predicate-lock-target information.
988          */
989         MemSet(&info, 0, sizeof(info));
990         info.keysize = sizeof(PREDICATELOCKTARGETTAG);
991         info.entrysize = sizeof(PREDICATELOCKTARGET);
992         info.hash = tag_hash;
993         info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;
994         hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION | HASH_FIXED_SIZE);
995
996         PredicateLockTargetHash = ShmemInitHash("PREDICATELOCKTARGET hash",
997                                                                                         max_table_size,
998                                                                                         max_table_size,
999                                                                                         &info,
1000                                                                                         hash_flags);
1001
1002         /* Assume an average of 2 xacts per target */
1003         max_table_size *= 2;
1004
1005         /*
1006          * Reserve an entry in the hash table; we use it to make sure there's
1007          * always one entry available when we need to split or combine a page,
1008          * because running out of space there could mean aborting a
1009          * non-serializable transaction.
1010          */
1011         hash_search(PredicateLockTargetHash, &ReservedTargetTag,
1012                                 HASH_ENTER, NULL);
1013
1014
1015         /*
1016          * Allocate hash table for PREDICATELOCK structs.  This stores per
1017          * xact-lock-of-a-target information.
1018          */
1019         MemSet(&info, 0, sizeof(info));
1020         info.keysize = sizeof(PREDICATELOCKTAG);
1021         info.entrysize = sizeof(PREDICATELOCK);
1022         info.hash = predicatelock_hash;
1023         info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;
1024         hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION | HASH_FIXED_SIZE);
1025
1026         PredicateLockHash = ShmemInitHash("PREDICATELOCK hash",
1027                                                                           max_table_size,
1028                                                                           max_table_size,
1029                                                                           &info,
1030                                                                           hash_flags);
1031
1032         /*
1033          * Compute size for serializable transaction hashtable.
1034          * Note these calculations must agree with PredicateLockShmemSize!
1035          */
1036         max_table_size = (MaxBackends + max_prepared_xacts);
1037
1038         /*
1039          * Allocate a list to hold information on transactions participating in
1040          * predicate locking.
1041          *
1042          * Assume an average of 10 predicate locking transactions per backend.
1043          * This allows aggressive cleanup while detail is present before data must
1044          * be summarized for storage in SLRU and the "dummy" transaction.
1045          */
1046         max_table_size *= 10;
1047
1048         PredXact = ShmemInitStruct("PredXactList",
1049                                                            PredXactListDataSize,
1050                                                            &found);
1051         if (!found)
1052         {
1053                 int                     i;
1054
1055                 SHMQueueInit(&PredXact->availableList);
1056                 SHMQueueInit(&PredXact->activeList);
1057                 PredXact->SxactGlobalXmin = InvalidTransactionId;
1058                 PredXact->SxactGlobalXminCount = 0;
1059                 PredXact->WritableSxactCount = 0;
1060                 PredXact->LastSxactCommitSeqNo = FirstNormalSerCommitSeqNo - 1;
1061                 PredXact->CanPartialClearThrough = 0;
1062                 PredXact->HavePartialClearedThrough = 0;
1063                 requestSize = mul_size((Size) max_table_size,
1064                                                            PredXactListElementDataSize);
1065                 PredXact->element = ShmemAlloc(requestSize);
1066                 if (PredXact->element == NULL)
1067                         ereport(ERROR,
1068                                         (errcode(ERRCODE_OUT_OF_MEMORY),
1069                          errmsg("not enough shared memory for elements of data structure"
1070                                         " \"%s\" (%lu bytes requested)",
1071                                         "PredXactList", (unsigned long) requestSize)));
1072                 /* Add all elements to available list, clean. */
1073                 memset(PredXact->element, 0, requestSize);
1074                 for (i = 0; i < max_table_size; i++)
1075                 {
1076                         SHMQueueInsertBefore(&(PredXact->availableList),
1077                                                                  &(PredXact->element[i].link));
1078                 }
1079                 PredXact->OldCommittedSxact = CreatePredXact();
1080                 SetInvalidVirtualTransactionId(PredXact->OldCommittedSxact->vxid);
1081                 PredXact->OldCommittedSxact->commitSeqNo = 0;
1082                 PredXact->OldCommittedSxact->SeqNo.lastCommitBeforeSnapshot = 0;
1083                 SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts);
1084                 SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts);
1085                 SHMQueueInit(&PredXact->OldCommittedSxact->predicateLocks);
1086                 SHMQueueInit(&PredXact->OldCommittedSxact->finishedLink);
1087                 SHMQueueInit(&PredXact->OldCommittedSxact->possibleUnsafeConflicts);
1088                 PredXact->OldCommittedSxact->topXid = InvalidTransactionId;
1089                 PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId;
1090                 PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
1091                 PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED;
1092                 PredXact->OldCommittedSxact->pid = 0;
1093         }
1094         /* This never changes, so let's keep a local copy. */
1095         OldCommittedSxact = PredXact->OldCommittedSxact;
1096
1097         /*
1098          * Allocate hash table for SERIALIZABLEXID structs.  This stores per-xid
1099          * information for serializable transactions which have accessed data.
1100          */
1101         MemSet(&info, 0, sizeof(info));
1102         info.keysize = sizeof(SERIALIZABLEXIDTAG);
1103         info.entrysize = sizeof(SERIALIZABLEXID);
1104         info.hash = tag_hash;
1105         hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_FIXED_SIZE);
1106
1107         SerializableXidHash = ShmemInitHash("SERIALIZABLEXID hash",
1108                                                                                 max_table_size,
1109                                                                                 max_table_size,
1110                                                                                 &info,
1111                                                                                 hash_flags);
1112
1113         /*
1114          * Allocate space for tracking rw-conflicts in lists attached to the
1115          * transactions.
1116          *
1117          * Assume an average of 5 conflicts per transaction.  Calculations suggest
1118          * that this will prevent resource exhaustion in even the most pessimal
1119          * loads up to max_connections = 200 with all 200 connections pounding the
1120          * database with serializable transactions.  Beyond that, there may be
1121          * occassional transactions canceled when trying to flag conflicts. That's
1122          * probably OK.
1123          */
1124         max_table_size *= 5;
1125
1126         RWConflictPool = ShmemInitStruct("RWConflictPool",
1127                                                                          RWConflictPoolHeaderDataSize,
1128                                                                          &found);
1129         if (!found)
1130         {
1131                 int                     i;
1132
1133                 SHMQueueInit(&RWConflictPool->availableList);
1134                 requestSize = mul_size((Size) max_table_size,
1135                                                            RWConflictDataSize);
1136                 RWConflictPool->element = ShmemAlloc(requestSize);
1137                 if (RWConflictPool->element == NULL)
1138                         ereport(ERROR,
1139                                         (errcode(ERRCODE_OUT_OF_MEMORY),
1140                          errmsg("not enough shared memory for elements of data structure"
1141                                         " \"%s\" (%lu bytes requested)",
1142                                         "RWConflictPool", (unsigned long) requestSize)));
1143                 /* Add all elements to available list, clean. */
1144                 memset(RWConflictPool->element, 0, requestSize);
1145                 for (i = 0; i < max_table_size; i++)
1146                 {
1147                         SHMQueueInsertBefore(&(RWConflictPool->availableList),
1148                                                                  &(RWConflictPool->element[i].outLink));
1149                 }
1150         }
1151
1152         /*
1153          * Create or attach to the header for the list of finished serializable
1154          * transactions.
1155          */
1156         FinishedSerializableTransactions = (SHM_QUEUE *)
1157                 ShmemInitStruct("FinishedSerializableTransactions",
1158                                                 sizeof(SHM_QUEUE),
1159                                                 &found);
1160         if (!found)
1161                 SHMQueueInit(FinishedSerializableTransactions);
1162
1163         /*
1164          * Initialize the SLRU storage for old committed serializable
1165          * transactions.
1166          */
1167         OldSerXidInit();
1168 }
1169
1170 /*
1171  * Estimate shared-memory space used for predicate lock table
1172  */
1173 Size
1174 PredicateLockShmemSize(void)
1175 {
1176         Size            size = 0;
1177         long            max_table_size;
1178
1179         /* predicate lock target hash table */
1180         max_table_size = NPREDICATELOCKTARGETENTS();
1181         size = add_size(size, hash_estimate_size(max_table_size,
1182                                                                                          sizeof(PREDICATELOCKTARGET)));
1183
1184         /* predicate lock hash table */
1185         max_table_size *= 2;
1186         size = add_size(size, hash_estimate_size(max_table_size,
1187                                                                                          sizeof(PREDICATELOCK)));
1188
1189         /*
1190          * Since NPREDICATELOCKTARGETENTS is only an estimate, add 10% safety
1191          * margin.
1192          */
1193         size = add_size(size, size / 10);
1194
1195         /* transaction list */
1196         max_table_size = MaxBackends + max_prepared_xacts;
1197         max_table_size *= 10;
1198         size = add_size(size, PredXactListDataSize);
1199         size = add_size(size, mul_size((Size) max_table_size,
1200                                                                    PredXactListElementDataSize));
1201
1202         /* transaction xid table */
1203         size = add_size(size, hash_estimate_size(max_table_size,
1204                                                                                          sizeof(SERIALIZABLEXID)));
1205
1206         /* rw-conflict pool */
1207         max_table_size *= 5;
1208         size = add_size(size, RWConflictPoolHeaderDataSize);
1209         size = add_size(size, mul_size((Size) max_table_size,
1210                                                                    RWConflictDataSize));
1211
1212         /* Head for list of finished serializable transactions. */
1213         size = add_size(size, sizeof(SHM_QUEUE));
1214
1215         /* Shared memory structures for SLRU tracking of old committed xids. */
1216         size = add_size(size, sizeof(OldSerXidControlData));
1217         size = add_size(size, SimpleLruShmemSize(NUM_OLDSERXID_BUFFERS, 0));
1218
1219         return size;
1220 }
1221
1222
1223 /*
1224  * Compute the hash code associated with a PREDICATELOCKTAG.
1225  *
1226  * Because we want to use just one set of partition locks for both the
1227  * PREDICATELOCKTARGET and PREDICATELOCK hash tables, we have to make sure
1228  * that PREDICATELOCKs fall into the same partition number as their
1229  * associated PREDICATELOCKTARGETs.  dynahash.c expects the partition number
1230  * to be the low-order bits of the hash code, and therefore a
1231  * PREDICATELOCKTAG's hash code must have the same low-order bits as the
1232  * associated PREDICATELOCKTARGETTAG's hash code.  We achieve this with this
1233  * specialized hash function.
1234  */
1235 static uint32
1236 predicatelock_hash(const void *key, Size keysize)
1237 {
1238         const PREDICATELOCKTAG *predicatelocktag = (const PREDICATELOCKTAG *) key;
1239         uint32          targethash;
1240
1241         Assert(keysize == sizeof(PREDICATELOCKTAG));
1242
1243         /* Look into the associated target object, and compute its hash code */
1244         targethash = PredicateLockTargetTagHashCode(&predicatelocktag->myTarget->tag);
1245
1246         return PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash);
1247 }
1248
1249
1250 /*
1251  * GetPredicateLockStatusData
1252  *              Return a table containing the internal state of the predicate
1253  *              lock manager for use in pg_lock_status.
1254  *
1255  * Like GetLockStatusData, this function tries to hold the partition LWLocks
1256  * for as short a time as possible by returning two arrays that simply
1257  * contain the PREDICATELOCKTARGETTAG and SERIALIZABLEXACT for each lock
1258  * table entry. Multiple copies of the same PREDICATELOCKTARGETTAG and
1259  * SERIALIZABLEXACT will likely appear.
1260  */
1261 PredicateLockData *
1262 GetPredicateLockStatusData(void)
1263 {
1264         PredicateLockData *data;
1265         int                     i;
1266         int                     els,
1267                                 el;
1268         HASH_SEQ_STATUS seqstat;
1269         PREDICATELOCK *predlock;
1270
1271         data = (PredicateLockData *) palloc(sizeof(PredicateLockData));
1272
1273         /*
1274          * To ensure consistency, take simultaneous locks on all partition locks
1275          * in ascending order, then SerializableXactHashLock.
1276          */
1277         for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
1278                 LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED);
1279         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
1280
1281         /* Get number of locks and allocate appropriately-sized arrays. */
1282         els = hash_get_num_entries(PredicateLockHash);
1283         data->nelements = els;
1284         data->locktags = (PREDICATELOCKTARGETTAG *)
1285                 palloc(sizeof(PREDICATELOCKTARGETTAG) * els);
1286         data->xacts = (SERIALIZABLEXACT *)
1287                 palloc(sizeof(SERIALIZABLEXACT) * els);
1288
1289
1290         /* Scan through PredicateLockHash and copy contents */
1291         hash_seq_init(&seqstat, PredicateLockHash);
1292
1293         el = 0;
1294
1295         while ((predlock = (PREDICATELOCK *) hash_seq_search(&seqstat)))
1296         {
1297                 data->locktags[el] = predlock->tag.myTarget->tag;
1298                 data->xacts[el] = *predlock->tag.myXact;
1299                 el++;
1300         }
1301
1302         Assert(el == els);
1303
1304         /* Release locks in reverse order */
1305         LWLockRelease(SerializableXactHashLock);
1306         for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
1307                 LWLockRelease(FirstPredicateLockMgrLock + i);
1308
1309         return data;
1310 }
1311
1312 /*
1313  * Free up shared memory structures by pushing the oldest sxact (the one at
1314  * the front of the SummarizeOldestCommittedSxact queue) into summary form.
1315  * Each call will free exactly one SERIALIZABLEXACT structure and may also
1316  * free one or more of these structures: SERIALIZABLEXID, PREDICATELOCK,
1317  * PREDICATELOCKTARGET, RWConflictData.
1318  */
1319 static void
1320 SummarizeOldestCommittedSxact(void)
1321 {
1322         SERIALIZABLEXACT *sxact;
1323
1324         LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
1325
1326         /*
1327          * This function is only called if there are no sxact slots available.
1328          * Some of them must belong to old, already-finished transactions, so
1329          * there should be something in FinishedSerializableTransactions list that
1330          * we can summarize. However, there's a race condition: while we were not
1331          * holding any locks, a transaction might have ended and cleaned up all
1332          * the finished sxact entries already, freeing up their sxact slots. In
1333          * that case, we have nothing to do here. The caller will find one of the
1334          * slots released by the other backend when it retries.
1335          */
1336         if (SHMQueueEmpty(FinishedSerializableTransactions))
1337         {
1338                 LWLockRelease(SerializableFinishedListLock);
1339                 return;
1340         }
1341
1342         /*
1343          * Grab the first sxact off the finished list -- this will be the earliest
1344          * commit.      Remove it from the list.
1345          */
1346         sxact = (SERIALIZABLEXACT *)
1347                 SHMQueueNext(FinishedSerializableTransactions,
1348                                          FinishedSerializableTransactions,
1349                                          offsetof(SERIALIZABLEXACT, finishedLink));
1350         SHMQueueDelete(&(sxact->finishedLink));
1351
1352         /* Add to SLRU summary information. */
1353         if (TransactionIdIsValid(sxact->topXid) && !SxactIsReadOnly(sxact))
1354                 OldSerXidAdd(sxact->topXid, SxactHasConflictOut(sxact)
1355                    ? sxact->SeqNo.earliestOutConflictCommit : InvalidSerCommitSeqNo);
1356
1357         /* Summarize and release the detail. */
1358         ReleaseOneSerializableXact(sxact, false, true);
1359
1360         LWLockRelease(SerializableFinishedListLock);
1361 }
1362
1363 /*
1364  * GetSafeSnapshot
1365  *              Obtain and register a snapshot for a READ ONLY DEFERRABLE
1366  *              transaction. Ensures that the snapshot is "safe", i.e. a
1367  *              read-only transaction running on it can execute serializably
1368  *              without further checks. This requires waiting for concurrent
1369  *              transactions to complete, and retrying with a new snapshot if
1370  *              one of them could possibly create a conflict.
1371  */
1372 static Snapshot
1373 GetSafeSnapshot(Snapshot origSnapshot)
1374 {
1375         Snapshot        snapshot;
1376
1377         Assert(XactReadOnly && XactDeferrable);
1378
1379         while (true)
1380         {
1381                 /*
1382                  * RegisterSerializableTransactionInt is going to call
1383                  * GetSnapshotData, so we need to provide it the static snapshot our
1384                  * caller passed to us. It returns a copy of that snapshot and
1385                  * registers it on TopTransactionResourceOwner.
1386                  */
1387                 snapshot = RegisterSerializableTransactionInt(origSnapshot);
1388
1389                 if (MySerializableXact == InvalidSerializableXact)
1390                         return snapshot;        /* no concurrent r/w xacts; it's safe */
1391
1392                 MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
1393
1394                 /*
1395                  * Wait for concurrent transactions to finish. Stop early if one of
1396                  * them marked us as conflicted.
1397                  */
1398                 while (!(SHMQueueEmpty((SHM_QUEUE *)
1399                                                          &MySerializableXact->possibleUnsafeConflicts) ||
1400                                  SxactIsROUnsafe(MySerializableXact)))
1401                         ProcWaitForSignal();
1402
1403                 MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
1404                 if (!SxactIsROUnsafe(MySerializableXact))
1405                         break;                          /* success */
1406
1407                 /* else, need to retry... */
1408                 ereport(DEBUG2,
1409                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
1410                                  errmsg("deferrable snapshot was unsafe; trying a new one")));
1411                 ReleasePredicateLocks(false);
1412                 UnregisterSnapshotFromOwner(snapshot,
1413                                                                         TopTransactionResourceOwner);
1414         }
1415
1416         /*
1417          * Now we have a safe snapshot, so we don't need to do any further checks.
1418          */
1419         Assert(SxactIsROSafe(MySerializableXact));
1420         ReleasePredicateLocks(false);
1421
1422         return snapshot;
1423 }
1424
1425 /*
1426  * Acquire and register a snapshot which can be used for this transaction..
1427  * Make sure we have a SERIALIZABLEXACT reference in MySerializableXact.
1428  * It should be current for this process and be contained in PredXact.
1429  */
1430 Snapshot
1431 RegisterSerializableTransaction(Snapshot snapshot)
1432 {
1433         Assert(IsolationIsSerializable());
1434
1435         /*
1436          * A special optimization is available for SERIALIZABLE READ ONLY
1437          * DEFERRABLE transactions -- we can wait for a suitable snapshot and
1438          * thereby avoid all SSI overhead once it's running..
1439          */
1440         if (XactReadOnly && XactDeferrable)
1441                 return GetSafeSnapshot(snapshot);
1442
1443         return RegisterSerializableTransactionInt(snapshot);
1444 }
1445
1446 static Snapshot
1447 RegisterSerializableTransactionInt(Snapshot snapshot)
1448 {
1449         PGPROC     *proc;
1450         VirtualTransactionId vxid;
1451         SERIALIZABLEXACT *sxact,
1452                            *othersxact;
1453         HASHCTL         hash_ctl;
1454
1455         /* We only do this for serializable transactions.  Once. */
1456         Assert(MySerializableXact == InvalidSerializableXact);
1457
1458         Assert(!RecoveryInProgress());
1459
1460         proc = MyProc;
1461         Assert(proc != NULL);
1462         GET_VXID_FROM_PGPROC(vxid, *proc);
1463
1464         /*
1465          * First we get the sxact structure, which may involve looping and access
1466          * to the "finished" list to free a structure for use.
1467          */
1468 #ifdef TEST_OLDSERXID
1469         SummarizeOldestCommittedSxact();
1470 #endif
1471         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
1472         do
1473         {
1474                 sxact = CreatePredXact();
1475                 /* If null, push out committed sxact to SLRU summary & retry. */
1476                 if (!sxact)
1477                 {
1478                         LWLockRelease(SerializableXactHashLock);
1479                         SummarizeOldestCommittedSxact();
1480                         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
1481                 }
1482         } while (!sxact);
1483
1484         /* Get and register a snapshot */
1485         snapshot = GetSnapshotData(snapshot);
1486         snapshot = RegisterSnapshotOnOwner(snapshot, TopTransactionResourceOwner);
1487
1488         /*
1489          * If there are no serializable transactions which are not read-only, we
1490          * can "opt out" of predicate locking and conflict checking for a
1491          * read-only transaction.
1492          *
1493          * The reason this is safe is that a read-only transaction can only become
1494          * part of a dangerous structure if it overlaps a writable transaction
1495          * which in turn overlaps a writable transaction which committed before
1496          * the read-only transaction started.  A new writable transaction can
1497          * overlap this one, but it can't meet the other condition of overlapping
1498          * a transaction which committed before this one started.
1499          */
1500         if (XactReadOnly && PredXact->WritableSxactCount == 0)
1501         {
1502                 ReleasePredXact(sxact);
1503                 LWLockRelease(SerializableXactHashLock);
1504                 return snapshot;
1505         }
1506
1507         /* Maintain serializable global xmin info. */
1508         if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
1509         {
1510                 Assert(PredXact->SxactGlobalXminCount == 0);
1511                 PredXact->SxactGlobalXmin = snapshot->xmin;
1512                 PredXact->SxactGlobalXminCount = 1;
1513                 OldSerXidSetActiveSerXmin(snapshot->xmin);
1514         }
1515         else if (TransactionIdEquals(snapshot->xmin, PredXact->SxactGlobalXmin))
1516         {
1517                 Assert(PredXact->SxactGlobalXminCount > 0);
1518                 PredXact->SxactGlobalXminCount++;
1519         }
1520         else
1521         {
1522                 Assert(TransactionIdFollows(snapshot->xmin, PredXact->SxactGlobalXmin));
1523         }
1524
1525         /* Initialize the structure. */
1526         sxact->vxid = vxid;
1527         sxact->SeqNo.lastCommitBeforeSnapshot = PredXact->LastSxactCommitSeqNo;
1528         sxact->commitSeqNo = InvalidSerCommitSeqNo;
1529         SHMQueueInit(&(sxact->outConflicts));
1530         SHMQueueInit(&(sxact->inConflicts));
1531         SHMQueueInit(&(sxact->possibleUnsafeConflicts));
1532         sxact->topXid = GetTopTransactionIdIfAny();
1533         sxact->finishedBefore = InvalidTransactionId;
1534         sxact->xmin = snapshot->xmin;
1535         sxact->pid = MyProcPid;
1536         SHMQueueInit(&(sxact->predicateLocks));
1537         SHMQueueElemInit(&(sxact->finishedLink));
1538         sxact->flags = 0;
1539         if (XactReadOnly)
1540         {
1541                 sxact->flags |= SXACT_FLAG_READ_ONLY;
1542
1543                 /*
1544                  * Register all concurrent r/w transactions as possible conflicts; if
1545                  * all of them commit without any outgoing conflicts to earlier
1546                  * transactions then this snapshot can be deemed safe (and we can run
1547                  * without tracking predicate locks).
1548                  */
1549                 for (othersxact = FirstPredXact();
1550                          othersxact != NULL;
1551                          othersxact = NextPredXact(othersxact))
1552                 {
1553                         if (!SxactIsOnFinishedList(othersxact) &&
1554                                 !SxactIsReadOnly(othersxact))
1555                         {
1556                                 SetPossibleUnsafeConflict(sxact, othersxact);
1557                         }
1558                 }
1559         }
1560         else
1561         {
1562                 ++(PredXact->WritableSxactCount);
1563                 Assert(PredXact->WritableSxactCount <=
1564                            (MaxBackends + max_prepared_xacts));
1565         }
1566
1567         MySerializableXact = sxact;
1568
1569         LWLockRelease(SerializableXactHashLock);
1570
1571         /* Initialize the backend-local hash table of parent locks */
1572         Assert(LocalPredicateLockHash == NULL);
1573         MemSet(&hash_ctl, 0, sizeof(hash_ctl));
1574         hash_ctl.keysize = sizeof(PREDICATELOCKTARGETTAG);
1575         hash_ctl.entrysize = sizeof(LOCALPREDICATELOCK);
1576         hash_ctl.hash = tag_hash;
1577         LocalPredicateLockHash = hash_create("Local predicate lock",
1578                                                                                  max_predicate_locks_per_xact,
1579                                                                                  &hash_ctl,
1580                                                                                  HASH_ELEM | HASH_FUNCTION);
1581
1582         return snapshot;
1583 }
1584
1585 /*
1586  * Register the top level XID in SerializableXidHash.
1587  * Also store it for easy reference in MySerializableXact.
1588  */
1589 void
1590 RegisterPredicateLockingXid(const TransactionId xid)
1591 {
1592         SERIALIZABLEXIDTAG sxidtag;
1593         SERIALIZABLEXID *sxid;
1594         bool            found;
1595
1596         /*
1597          * If we're not tracking predicate lock data for this transaction, we
1598          * should ignore the request and return quickly.
1599          */
1600         if (MySerializableXact == InvalidSerializableXact)
1601                 return;
1602
1603         /* This should only be done once per transaction. */
1604         Assert(MySerializableXact->topXid == InvalidTransactionId);
1605
1606         /* We should have a valid XID and be at the top level. */
1607         Assert(TransactionIdIsValid(xid));
1608
1609         MySerializableXact->topXid = xid;
1610
1611         sxidtag.xid = xid;
1612         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
1613         sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
1614                                                                                    &sxidtag,
1615                                                                                    HASH_ENTER, &found);
1616         Assert(sxid != NULL);
1617         Assert(!found);
1618
1619         /* Initialize the structure. */
1620         sxid->myXact = (SERIALIZABLEXACT *) MySerializableXact;
1621         LWLockRelease(SerializableXactHashLock);
1622 }
1623
1624
1625 /*
1626  * Check whether there are any predicate locks held by any transaction
1627  * for the page at the given block number.
1628  *
1629  * Note that the transaction may be completed but not yet subject to
1630  * cleanup due to overlapping serializable transactions.  This must
1631  * return valid information regardless of transaction isolation level.
1632  *
1633  * Also note that this doesn't check for a conflicting relation lock,
1634  * just a lock specifically on the given page.
1635  *
1636  * One use is to support proper behavior during GiST index vacuum.
1637  */
1638 bool
1639 PageIsPredicateLocked(const Relation relation, const BlockNumber blkno)
1640 {
1641         PREDICATELOCKTARGETTAG targettag;
1642         uint32          targettaghash;
1643         LWLockId        partitionLock;
1644         PREDICATELOCKTARGET *target;
1645
1646         SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
1647                                                                         relation->rd_node.dbNode,
1648                                                                         relation->rd_id,
1649                                                                         blkno);
1650
1651         targettaghash = PredicateLockTargetTagHashCode(&targettag);
1652         partitionLock = PredicateLockHashPartitionLock(targettaghash);
1653         LWLockAcquire(partitionLock, LW_SHARED);
1654         target = (PREDICATELOCKTARGET *)
1655                 hash_search_with_hash_value(PredicateLockTargetHash,
1656                                                                         &targettag, targettaghash,
1657                                                                         HASH_FIND, NULL);
1658         LWLockRelease(partitionLock);
1659
1660         return (target != NULL);
1661 }
1662
1663
1664 /*
1665  * Check whether a particular lock is held by this transaction.
1666  *
1667  * Important note: this function may return false even if the lock is
1668  * being held, because it uses the local lock table which is not
1669  * updated if another transaction modifies our lock list (e.g. to
1670  * split an index page). It can also return true when a coarser
1671  * granularity lock that covers this target is being held. Be careful
1672  * to only use this function in circumstances where such errors are
1673  * acceptable!
1674  */
1675 static bool
1676 PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag)
1677 {
1678         LOCALPREDICATELOCK *lock;
1679
1680         /* check local hash table */
1681         lock = (LOCALPREDICATELOCK *) hash_search(LocalPredicateLockHash,
1682                                                                                           targettag,
1683                                                                                           HASH_FIND, NULL);
1684
1685         if (!lock)
1686                 return false;
1687
1688         /*
1689          * Found entry in the table, but still need to check whether it's actually
1690          * held -- it could just be a parent of some held lock.
1691          */
1692         return lock->held;
1693 }
1694
1695 /*
1696  * Return the parent lock tag in the lock hierarchy: the next coarser
1697  * lock that covers the provided tag.
1698  *
1699  * Returns true and sets *parent to the parent tag if one exists,
1700  * returns false if none exists.
1701  */
1702 static bool
1703 GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
1704                                                   PREDICATELOCKTARGETTAG *parent)
1705 {
1706         switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag))
1707         {
1708                 case PREDLOCKTAG_RELATION:
1709                         /* relation locks have no parent lock */
1710                         return false;
1711
1712                 case PREDLOCKTAG_PAGE:
1713                         /* parent lock is relation lock */
1714                         SET_PREDICATELOCKTARGETTAG_RELATION(*parent,
1715                                                                                  GET_PREDICATELOCKTARGETTAG_DB(*tag),
1716                                                                   GET_PREDICATELOCKTARGETTAG_RELATION(*tag));
1717
1718                         return true;
1719
1720                 case PREDLOCKTAG_TUPLE:
1721                         /* parent lock is page lock */
1722                         SET_PREDICATELOCKTARGETTAG_PAGE(*parent,
1723                                                                                  GET_PREDICATELOCKTARGETTAG_DB(*tag),
1724                                                                    GET_PREDICATELOCKTARGETTAG_RELATION(*tag),
1725                                                                           GET_PREDICATELOCKTARGETTAG_PAGE(*tag));
1726                         return true;
1727         }
1728
1729         /* not reachable */
1730         Assert(false);
1731         return false;
1732 }
1733
1734 /*
1735  * Check whether the lock we are considering is already covered by a
1736  * coarser lock for our transaction.
1737  *
1738  * Like PredicateLockExists, this function might return a false
1739  * negative, but it will never return a false positive.
1740  */
1741 static bool
1742 CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag)
1743 {
1744         PREDICATELOCKTARGETTAG targettag,
1745                                 parenttag;
1746
1747         targettag = *newtargettag;
1748
1749         /* check parents iteratively until no more */
1750         while (GetParentPredicateLockTag(&targettag, &parenttag))
1751         {
1752                 targettag = parenttag;
1753                 if (PredicateLockExists(&targettag))
1754                         return true;
1755         }
1756
1757         /* no more parents to check; lock is not covered */
1758         return false;
1759 }
1760
1761 /*
1762  * Check whether the list of related predicate locks is empty for a
1763  * predicate lock target, and remove the target if it is.
1764  */
1765 static void
1766 RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash)
1767 {
1768         PREDICATELOCKTARGET *rmtarget;
1769
1770         Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
1771
1772         /* Can't remove it until no locks at this target. */
1773         if (!SHMQueueEmpty(&target->predicateLocks))
1774                 return;
1775
1776         /* Actually remove the target. */
1777         rmtarget = hash_search_with_hash_value(PredicateLockTargetHash,
1778                                                                                    &target->tag,
1779                                                                                    targettaghash,
1780                                                                                    HASH_REMOVE, NULL);
1781         Assert(rmtarget == target);
1782 }
1783
1784 /*
1785  * Delete child target locks owned by this process.
1786  * This implementation is assuming that the usage of each target tag field
1787  * is uniform.  No need to make this hard if we don't have to.
1788  *
1789  * We aren't acquiring lightweight locks for the predicate lock or lock
1790  * target structures associated with this transaction unless we're going
1791  * to modify them, because no other process is permitted to modify our
1792  * locks.
1793  */
1794 static void
1795 DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
1796 {
1797         SERIALIZABLEXACT *sxact;
1798         PREDICATELOCK *predlock;
1799
1800         LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
1801         sxact = (SERIALIZABLEXACT *) MySerializableXact;
1802         predlock = (PREDICATELOCK *)
1803                 SHMQueueNext(&(sxact->predicateLocks),
1804                                          &(sxact->predicateLocks),
1805                                          offsetof(PREDICATELOCK, xactLink));
1806         while (predlock)
1807         {
1808                 SHM_QUEUE  *predlocksxactlink;
1809                 PREDICATELOCK *nextpredlock;
1810                 PREDICATELOCKTAG oldlocktag;
1811                 PREDICATELOCKTARGET *oldtarget;
1812                 PREDICATELOCKTARGETTAG oldtargettag;
1813
1814                 predlocksxactlink = &(predlock->xactLink);
1815                 nextpredlock = (PREDICATELOCK *)
1816                         SHMQueueNext(&(sxact->predicateLocks),
1817                                                  predlocksxactlink,
1818                                                  offsetof(PREDICATELOCK, xactLink));
1819
1820                 oldlocktag = predlock->tag;
1821                 Assert(oldlocktag.myXact == sxact);
1822                 oldtarget = oldlocktag.myTarget;
1823                 oldtargettag = oldtarget->tag;
1824
1825                 if (TargetTagIsCoveredBy(oldtargettag, *newtargettag))
1826                 {
1827                         uint32          oldtargettaghash;
1828                         LWLockId        partitionLock;
1829                         PREDICATELOCK *rmpredlock;
1830
1831                         oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
1832                         partitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
1833
1834                         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
1835
1836                         SHMQueueDelete(predlocksxactlink);
1837                         SHMQueueDelete(&(predlock->targetLink));
1838                         rmpredlock = hash_search_with_hash_value
1839                                 (PredicateLockHash,
1840                                  &oldlocktag,
1841                                  PredicateLockHashCodeFromTargetHashCode(&oldlocktag,
1842                                                                                                                  oldtargettaghash),
1843                                  HASH_REMOVE, NULL);
1844                         Assert(rmpredlock == predlock);
1845
1846                         RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash);
1847
1848                         LWLockRelease(partitionLock);
1849
1850                         DecrementParentLocks(&oldtargettag);
1851                 }
1852
1853                 predlock = nextpredlock;
1854         }
1855         LWLockRelease(SerializablePredicateLockListLock);
1856 }
1857
1858 /*
1859  * Returns the promotion threshold for a given predicate lock
1860  * target. This is the number of descendant locks required to promote
1861  * to the specified tag. Note that the threshold includes non-direct
1862  * descendants, e.g. both tuples and pages for a relation lock.
1863  *
1864  * TODO SSI: We should do something more intelligent about what the
1865  * thresholds are, either making it proportional to the number of
1866  * tuples in a page & pages in a relation, or at least making it a
1867  * GUC. Currently the threshold is 3 for a page lock, and
1868  * max_pred_locks_per_transaction/2 for a relation lock, chosen
1869  * entirely arbitrarily (and without benchmarking).
1870  */
1871 static int
1872 PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG *tag)
1873 {
1874         switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag))
1875         {
1876                 case PREDLOCKTAG_RELATION:
1877                         return max_predicate_locks_per_xact / 2;
1878
1879                 case PREDLOCKTAG_PAGE:
1880                         return 3;
1881
1882                 case PREDLOCKTAG_TUPLE:
1883
1884                         /*
1885                          * not reachable: nothing is finer-granularity than a tuple, so we
1886                          * should never try to promote to it.
1887                          */
1888                         Assert(false);
1889                         return 0;
1890         }
1891
1892         /* not reachable */
1893         Assert(false);
1894         return 0;
1895 }
1896
1897 /*
1898  * For all ancestors of a newly-acquired predicate lock, increment
1899  * their child count in the parent hash table. If any of them have
1900  * more descendants than their promotion threshold, acquire the
1901  * coarsest such lock.
1902  *
1903  * Returns true if a parent lock was acquired and false otherwise.
1904  */
1905 static bool
1906 CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag)
1907 {
1908         PREDICATELOCKTARGETTAG targettag,
1909                                 nexttag,
1910                                 promotiontag;
1911         LOCALPREDICATELOCK *parentlock;
1912         bool            found,
1913                                 promote;
1914
1915         promote = false;
1916
1917         targettag = *reqtag;
1918
1919         /* check parents iteratively */
1920         while (GetParentPredicateLockTag(&targettag, &nexttag))
1921         {
1922                 targettag = nexttag;
1923                 parentlock = (LOCALPREDICATELOCK *) hash_search(LocalPredicateLockHash,
1924                                                                                                                 &targettag,
1925                                                                                                                 HASH_ENTER,
1926                                                                                                                 &found);
1927                 if (!found)
1928                 {
1929                         parentlock->held = false;
1930                         parentlock->childLocks = 1;
1931                 }
1932                 else
1933                         parentlock->childLocks++;
1934
1935                 if (parentlock->childLocks >=
1936                         PredicateLockPromotionThreshold(&targettag))
1937                 {
1938                         /*
1939                          * We should promote to this parent lock. Continue to check its
1940                          * ancestors, however, both to get their child counts right and to
1941                          * check whether we should just go ahead and promote to one of
1942                          * them.
1943                          */
1944                         promotiontag = targettag;
1945                         promote = true;
1946                 }
1947         }
1948
1949         if (promote)
1950         {
1951                 /* acquire coarsest ancestor eligible for promotion */
1952                 PredicateLockAcquire(&promotiontag);
1953                 return true;
1954         }
1955         else
1956                 return false;
1957 }
1958
1959 /*
1960  * When releasing a lock, decrement the child count on all ancestor
1961  * locks.
1962  *
1963  * This is called only when releasing a lock via
1964  * DeleteChildTargetLocks (i.e. when a lock becomes redundant because
1965  * we've acquired its parent, possibly due to promotion) or when a new
1966  * MVCC write lock makes the predicate lock unnecessary. There's no
1967  * point in calling it when locks are released at transaction end, as
1968  * this information is no longer needed.
1969  */
1970 static void
1971 DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag)
1972 {
1973         PREDICATELOCKTARGETTAG parenttag,
1974                                 nexttag;
1975
1976         parenttag = *targettag;
1977
1978         while (GetParentPredicateLockTag(&parenttag, &nexttag))
1979         {
1980                 uint32          targettaghash;
1981                 LOCALPREDICATELOCK *parentlock,
1982                                    *rmlock;
1983
1984                 parenttag = nexttag;
1985                 targettaghash = PredicateLockTargetTagHashCode(&parenttag);
1986                 parentlock = (LOCALPREDICATELOCK *)
1987                         hash_search_with_hash_value(LocalPredicateLockHash,
1988                                                                                 &parenttag, targettaghash,
1989                                                                                 HASH_FIND, NULL);
1990
1991                 /*
1992                  * There's a small chance the parent lock doesn't exist in the lock
1993                  * table. This can happen if we prematurely removed it because an
1994                  * index split caused the child refcount to be off.
1995                  */
1996                 if (parentlock == NULL)
1997                         continue;
1998
1999                 parentlock->childLocks--;
2000
2001                 /*
2002                  * Under similar circumstances the parent lock's refcount might be
2003                  * zero. This only happens if we're holding that lock (otherwise we
2004                  * would have removed the entry).
2005                  */
2006                 if (parentlock->childLocks < 0)
2007                 {
2008                         Assert(parentlock->held);
2009                         parentlock->childLocks = 0;
2010                 }
2011
2012                 if ((parentlock->childLocks == 0) && (!parentlock->held))
2013                 {
2014                         rmlock = (LOCALPREDICATELOCK *)
2015                                 hash_search_with_hash_value(LocalPredicateLockHash,
2016                                                                                         &parenttag, targettaghash,
2017                                                                                         HASH_REMOVE, NULL);
2018                         Assert(rmlock == parentlock);
2019                 }
2020         }
2021 }
2022
2023 /*
2024  * Indicate that a predicate lock on the given target is held by the
2025  * specified transaction. Has no effect if the lock is already held.
2026  *
2027  * This updates the lock table and the sxact's lock list, and creates
2028  * the lock target if necessary, but does *not* do anything related to
2029  * granularity promotion or the local lock table. See
2030  * PredicateLockAcquire for that.
2031  */
2032 static void
2033 CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
2034                                         uint32 targettaghash,
2035                                         SERIALIZABLEXACT *sxact)
2036 {
2037         PREDICATELOCKTARGET *target;
2038         PREDICATELOCKTAG locktag;
2039         PREDICATELOCK *lock;
2040         LWLockId        partitionLock;
2041         bool            found;
2042
2043         partitionLock = PredicateLockHashPartitionLock(targettaghash);
2044
2045         LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
2046         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2047
2048         /* Make sure that the target is represented. */
2049         target = (PREDICATELOCKTARGET *)
2050                 hash_search_with_hash_value(PredicateLockTargetHash,
2051                                                                         targettag, targettaghash,
2052                                                                         HASH_ENTER_NULL, &found);
2053         if (!target)
2054                 ereport(ERROR,
2055                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2056                                  errmsg("out of shared memory"),
2057                                  errhint("You might need to increase max_pred_locks_per_transaction.")));
2058         if (!found)
2059                 SHMQueueInit(&(target->predicateLocks));
2060
2061         /* We've got the sxact and target, make sure they're joined. */
2062         locktag.myTarget = target;
2063         locktag.myXact = sxact;
2064         lock = (PREDICATELOCK *)
2065                 hash_search_with_hash_value(PredicateLockHash, &locktag,
2066                         PredicateLockHashCodeFromTargetHashCode(&locktag, targettaghash),
2067                                                                         HASH_ENTER_NULL, &found);
2068         if (!lock)
2069                 ereport(ERROR,
2070                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2071                                  errmsg("out of shared memory"),
2072                                  errhint("You might need to increase max_pred_locks_per_transaction.")));
2073
2074         if (!found)
2075         {
2076                 SHMQueueInsertBefore(&(target->predicateLocks), &(lock->targetLink));
2077                 SHMQueueInsertBefore(&(sxact->predicateLocks),
2078                                                          &(lock->xactLink));
2079                 lock->commitSeqNo = InvalidSerCommitSeqNo;
2080         }
2081
2082         LWLockRelease(partitionLock);
2083         LWLockRelease(SerializablePredicateLockListLock);
2084 }
2085
2086 /*
2087  * Acquire a predicate lock on the specified target for the current
2088  * connection if not already held. This updates the local lock table
2089  * and uses it to implement granularity promotion. It will consolidate
2090  * multiple locks into a coarser lock if warranted, and will release
2091  * any finer-grained locks covered by the new one.
2092  */
2093 static void
2094 PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag)
2095 {
2096         uint32          targettaghash;
2097         bool            found;
2098         LOCALPREDICATELOCK *locallock;
2099
2100         /* Do we have the lock already, or a covering lock? */
2101         if (PredicateLockExists(targettag))
2102                 return;
2103
2104         if (CoarserLockCovers(targettag))
2105                 return;
2106
2107         /* the same hash and LW lock apply to the lock target and the local lock. */
2108         targettaghash = PredicateLockTargetTagHashCode(targettag);
2109
2110         /* Acquire lock in local table */
2111         locallock = (LOCALPREDICATELOCK *)
2112                 hash_search_with_hash_value(LocalPredicateLockHash,
2113                                                                         targettag, targettaghash,
2114                                                                         HASH_ENTER, &found);
2115         locallock->held = true;
2116         if (!found)
2117                 locallock->childLocks = 0;
2118
2119         /* Actually create the lock */
2120         CreatePredicateLock(targettag, targettaghash,
2121                                                 (SERIALIZABLEXACT *) MySerializableXact);
2122
2123         /*
2124          * Lock has been acquired. Check whether it should be promoted to a
2125          * coarser granularity, or whether there are finer-granularity locks to
2126          * clean up.
2127          */
2128         if (CheckAndPromotePredicateLockRequest(targettag))
2129         {
2130                 /*
2131                  * Lock request was promoted to a coarser-granularity lock, and that
2132                  * lock was acquired. It will delete this lock and any of its
2133                  * children, so we're done.
2134                  */
2135         }
2136         else
2137         {
2138                 /* Clean up any finer-granularity locks */
2139                 if (GET_PREDICATELOCKTARGETTAG_TYPE(*targettag) != PREDLOCKTAG_TUPLE)
2140                         DeleteChildTargetLocks(targettag);
2141         }
2142 }
2143
2144
2145 /*
2146  *              PredicateLockRelation
2147  *
2148  * Gets a predicate lock at the relation level.
2149  * Skip if not in full serializable transaction isolation level.
2150  * Skip if this is a temporary table.
2151  * Clear any finer-grained predicate locks this session has on the relation.
2152  */
2153 void
2154 PredicateLockRelation(const Relation relation)
2155 {
2156         PREDICATELOCKTARGETTAG tag;
2157
2158         if (SkipSerialization(relation))
2159                 return;
2160
2161         SET_PREDICATELOCKTARGETTAG_RELATION(tag,
2162                                                                                 relation->rd_node.dbNode,
2163                                                                                 relation->rd_id);
2164         PredicateLockAcquire(&tag);
2165 }
2166
2167 /*
2168  *              PredicateLockPage
2169  *
2170  * Gets a predicate lock at the page level.
2171  * Skip if not in full serializable transaction isolation level.
2172  * Skip if this is a temporary table.
2173  * Skip if a coarser predicate lock already covers this page.
2174  * Clear any finer-grained predicate locks this session has on the relation.
2175  */
2176 void
2177 PredicateLockPage(const Relation relation, const BlockNumber blkno)
2178 {
2179         PREDICATELOCKTARGETTAG tag;
2180
2181         if (SkipSerialization(relation))
2182                 return;
2183
2184         SET_PREDICATELOCKTARGETTAG_PAGE(tag,
2185                                                                         relation->rd_node.dbNode,
2186                                                                         relation->rd_id,
2187                                                                         blkno);
2188         PredicateLockAcquire(&tag);
2189 }
2190
2191 /*
2192  *              PredicateLockTuple
2193  *
2194  * Gets a predicate lock at the tuple level.
2195  * Skip if not in full serializable transaction isolation level.
2196  * Skip if this is a temporary table.
2197  */
2198 void
2199 PredicateLockTuple(const Relation relation, const HeapTuple tuple)
2200 {
2201         PREDICATELOCKTARGETTAG tag;
2202         ItemPointer tid;
2203         TransactionId targetxmin;
2204
2205         if (SkipSerialization(relation))
2206                 return;
2207
2208         /*
2209          * If it's a heap tuple, return if this xact wrote it.
2210          */
2211         if (relation->rd_index == NULL)
2212         {
2213                 TransactionId myxid;
2214
2215                 targetxmin = HeapTupleHeaderGetXmin(tuple->t_data);
2216
2217                 myxid = GetTopTransactionIdIfAny();
2218                 if (TransactionIdIsValid(myxid))
2219                 {
2220                         if (TransactionIdFollowsOrEquals(targetxmin, TransactionXmin))
2221                         {
2222                                 TransactionId xid = SubTransGetTopmostTransaction(targetxmin);
2223
2224                                 if (TransactionIdEquals(xid, myxid))
2225                                 {
2226                                         /* We wrote it; we already have a write lock. */
2227                                         return;
2228                                 }
2229                         }
2230                 }
2231         }
2232         else
2233                 targetxmin = InvalidTransactionId;
2234
2235         /*
2236          * Do quick-but-not-definitive test for a relation lock first.  This will
2237          * never cause a return when the relation is *not* locked, but will
2238          * occasionally let the check continue when there really *is* a relation
2239          * level lock.
2240          */
2241         SET_PREDICATELOCKTARGETTAG_RELATION(tag,
2242                                                                                 relation->rd_node.dbNode,
2243                                                                                 relation->rd_id);
2244         if (PredicateLockExists(&tag))
2245                 return;
2246
2247         tid = &(tuple->t_self);
2248         SET_PREDICATELOCKTARGETTAG_TUPLE(tag,
2249                                                                          relation->rd_node.dbNode,
2250                                                                          relation->rd_id,
2251                                                                          ItemPointerGetBlockNumber(tid),
2252                                                                          ItemPointerGetOffsetNumber(tid),
2253                                                                          targetxmin);
2254         PredicateLockAcquire(&tag);
2255 }
2256
2257
2258 /*
2259  *              DeleteLockTarget
2260  *
2261  * Remove a predicate lock target along with any locks held for it.
2262  *
2263  * Caller must hold SerializablePredicateLockListLock and the
2264  * appropriate hash partition lock for the target.
2265  */
2266 static void
2267 DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
2268 {
2269         PREDICATELOCK *predlock;
2270         SHM_QUEUE  *predlocktargetlink;
2271         PREDICATELOCK *nextpredlock;
2272         bool            found;
2273
2274         Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
2275         Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash)));
2276
2277         predlock = (PREDICATELOCK *)
2278                 SHMQueueNext(&(target->predicateLocks),
2279                                          &(target->predicateLocks),
2280                                          offsetof(PREDICATELOCK, targetLink));
2281         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
2282         while (predlock)
2283         {
2284                 predlocktargetlink = &(predlock->targetLink);
2285                 nextpredlock = (PREDICATELOCK *)
2286                         SHMQueueNext(&(target->predicateLocks),
2287                                                  predlocktargetlink,
2288                                                  offsetof(PREDICATELOCK, targetLink));
2289
2290                 SHMQueueDelete(&(predlock->xactLink));
2291                 SHMQueueDelete(&(predlock->targetLink));
2292
2293                 hash_search_with_hash_value
2294                         (PredicateLockHash,
2295                          &predlock->tag,
2296                          PredicateLockHashCodeFromTargetHashCode(&predlock->tag,
2297                                                                                                          targettaghash),
2298                          HASH_REMOVE, &found);
2299                 Assert(found);
2300
2301                 predlock = nextpredlock;
2302         }
2303         LWLockRelease(SerializableXactHashLock);
2304
2305         /* Remove the target itself, if possible. */
2306         RemoveTargetIfNoLongerUsed(target, targettaghash);
2307 }
2308
2309
2310 /*
2311  *              TransferPredicateLocksToNewTarget
2312  *
2313  * Move or copy all the predicate locks for a lock target, for use by
2314  * index page splits/combines and other things that create or replace
2315  * lock targets. If 'removeOld' is true, the old locks and the target
2316  * will be removed.
2317  *
2318  * Returns true on success, or false if we ran out of shared memory to
2319  * allocate the new target or locks. Guaranteed to always succeed if
2320  * removeOld is set (by using the reserved entry in
2321  * PredicateLockTargetHash for scratch space).
2322  *
2323  * Warning: the "removeOld" option should be used only with care,
2324  * because this function does not (indeed, can not) update other
2325  * backends' LocalPredicateLockHash. If we are only adding new
2326  * entries, this is not a problem: the local lock table is used only
2327  * as a hint, so missing entries for locks that are held are
2328  * OK. Having entries for locks that are no longer held, as can happen
2329  * when using "removeOld", is not in general OK. We can only use it
2330  * safely when replacing a lock with a coarser-granularity lock that
2331  * covers it, or if we are absolutely certain that no one will need to
2332  * refer to that lock in the future.
2333  *
2334  * Caller must hold SerializablePredicateLockListLock.
      *
      * Locks taken here: the reserved entry's partition lock (briefly, and
      * only when removeOld is set), the partition locks of the old and new
      * targets (in ascending order; the old one in shared mode unless
      * removeOld is set), and SerializableXactHashLock in exclusive mode
      * while the old target's lock list is walked.  All of them are released
      * again on every return path.
      *
      * If the old target does not exist, nothing is changed and true is
      * returned.
2335  */
2336 static bool
2337 TransferPredicateLocksToNewTarget(const PREDICATELOCKTARGETTAG oldtargettag,
2338                                                                   const PREDICATELOCKTARGETTAG newtargettag,
2339                                                                   bool removeOld)
2340 {
2341         uint32          oldtargettaghash;
2342         LWLockId        oldpartitionLock;
2343         PREDICATELOCKTARGET *oldtarget;
2344         uint32          newtargettaghash;
2345         LWLockId        newpartitionLock;
2346         bool            found;
2347         bool            outOfShmem = false;
2348         uint32          reservedtargettaghash;
2349         LWLockId        reservedpartitionLock;
2350
2351
2352         Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
2353
2354         oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
2355         newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag);
2356         oldpartitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
2357         newpartitionLock = PredicateLockHashPartitionLock(newtargettaghash);
2358
2359         reservedtargettaghash = 0;      /* Quiet compiler warnings. */
2360         reservedpartitionLock = 0;      /* Quiet compiler warnings. */
2361
2362         if (removeOld)
2363         {
2364                 /*
2365                  * Remove the reserved entry to give us scratch space, so we know
2366                  * we'll be able to create the new lock target.
                      *
                      * The entry is re-inserted at the bottom of this function, after
                      * the partition locks have been released.
2367                  */
2368                 reservedtargettaghash = PredicateLockTargetTagHashCode(&ReservedTargetTag);
2369                 reservedpartitionLock = PredicateLockHashPartitionLock(reservedtargettaghash);
2370                 LWLockAcquire(reservedpartitionLock, LW_EXCLUSIVE);
2371                 hash_search_with_hash_value(PredicateLockTargetHash,
2372                                                                         &ReservedTargetTag,
2373                                                                         reservedtargettaghash,
2374                                                                         HASH_REMOVE, &found);
2375                 Assert(found);
2376                 LWLockRelease(reservedpartitionLock);
2377         }
2378
2379         /*
2380          * We must get the partition locks in ascending sequence to avoid
2381          * deadlocks. If old and new partitions are the same, we must request the
2382          * lock only once.
              *
              * The old partition is only read when copying, so shared mode is
              * enough there; removing the old locks requires exclusive mode.
2383          */
2384         if (oldpartitionLock < newpartitionLock)
2385         {
2386                 LWLockAcquire(oldpartitionLock,
2387                                           (removeOld ? LW_EXCLUSIVE : LW_SHARED));
2388                 LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
2389         }
2390         else if (oldpartitionLock > newpartitionLock)
2391         {
2392                 LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
2393                 LWLockAcquire(oldpartitionLock,
2394                                           (removeOld ? LW_EXCLUSIVE : LW_SHARED));
2395         }
2396         else
2397                 LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
2398
2399         /*
2400          * Look for the old target.  If not found, that's OK; no predicate locks
2401          * are affected, so we can just clean up and return. If it does exist,
2402          * walk its list of predicate locks and move or copy them to the new
2403          * target.
2404          */
2405         oldtarget = hash_search_with_hash_value(PredicateLockTargetHash,
2406                                                                                         &oldtargettag,
2407                                                                                         oldtargettaghash,
2408                                                                                         HASH_FIND, NULL);
2409
2410         if (oldtarget)
2411         {
2412                 PREDICATELOCKTARGET *newtarget;
2413                 PREDICATELOCK *oldpredlock;
2414                 PREDICATELOCKTAG newpredlocktag;
2415
                     /* Find the new target, creating it if it does not exist yet. */
2416                 newtarget = hash_search_with_hash_value(PredicateLockTargetHash,
2417                                                                                                 &newtargettag,
2418                                                                                                 newtargettaghash,
2419                                                                                                 HASH_ENTER_NULL, &found);
2420
2421                 if (!newtarget)
2422                 {
2423                         /* Failed to allocate due to insufficient shmem */
                             /* Nothing has been linked yet, so there is nothing to undo. */
2424                         outOfShmem = true;
2425                         goto exit;
2426                 }
2427
2428                 /* If we created a new entry, initialize it */
2429                 if (!found)
2430                         SHMQueueInit(&(newtarget->predicateLocks));
2431
                     /* The target half of the new lock tag is the same for every lock. */
2432                 newpredlocktag.myTarget = newtarget;
2433
2434                 oldpredlock = (PREDICATELOCK *)
2435                         SHMQueueNext(&(oldtarget->predicateLocks),
2436                                                  &(oldtarget->predicateLocks),
2437                                                  offsetof(PREDICATELOCK, targetLink));
2438                 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
2439                 while (oldpredlock)
2440                 {
2441                         SHM_QUEUE  *predlocktargetlink;
2442                         PREDICATELOCK *nextpredlock;
2443                         PREDICATELOCK *newpredlock;
                             /* Saved up front: oldpredlock may be removed from the hash below. */
2444                         SerCommitSeqNo oldCommitSeqNo = oldpredlock->commitSeqNo;
2445
                             /* Fetch the successor and the owner before oldpredlock can go away. */
2446                         predlocktargetlink = &(oldpredlock->targetLink);
2447                         nextpredlock = (PREDICATELOCK *)
2448                                 SHMQueueNext(&(oldtarget->predicateLocks),
2449                                                          predlocktargetlink,
2450                                                          offsetof(PREDICATELOCK, targetLink));
2451                         newpredlocktag.myXact = oldpredlock->tag.myXact;
2452
2453                         if (removeOld)
2454                         {
                                     /*
                                      * Moving rather than copying: unlink the old lock from both
                                      * of its lists and drop its hash entry before the replacement
                                      * is entered.
                                      */
2455                                 SHMQueueDelete(&(oldpredlock->xactLink));
2456                                 SHMQueueDelete(&(oldpredlock->targetLink));
2457
2458                                 hash_search_with_hash_value
2459                                         (PredicateLockHash,
2460                                          &oldpredlock->tag,
2461                                    PredicateLockHashCodeFromTargetHashCode(&oldpredlock->tag,
2462                                                                                                                    oldtargettaghash),
2463                                          HASH_REMOVE, &found);
2464                                 Assert(found);
2465                         }
2466
2467
                             /* Find or create the same transaction's lock on the new target. */
2468                         newpredlock = (PREDICATELOCK *)
2469                                 hash_search_with_hash_value
2470                                 (PredicateLockHash,
2471                                  &newpredlocktag,
2472                                  PredicateLockHashCodeFromTargetHashCode(&newpredlocktag,
2473                                                                                                                  newtargettaghash),
2474                                  HASH_ENTER_NULL, &found);
2475                         if (!newpredlock)
2476                         {
2477                                 /* Out of shared memory. Undo what we've done so far. */
                                     /*
                                      * DeleteLockTarget acquires SerializableXactHashLock itself,
                                      * so it has to be released first.
                                      *
                                      * NOTE(review): DeleteLockTarget removes every lock attached
                                      * to newtarget, not only those added by this loop -- confirm
                                      * that callers never reach this path with locks already
                                      * present on the new target.
                                      */
2478                                 LWLockRelease(SerializableXactHashLock);
2479                                 DeleteLockTarget(newtarget, newtargettaghash);
2480                                 outOfShmem = true;
2481                                 goto exit;
2482                         }
2483                         if (!found)
2484                         {
                                     /* Fresh entry: link it in and inherit the old commitSeqNo. */
2485                                 SHMQueueInsertBefore(&(newtarget->predicateLocks),
2486                                                                          &(newpredlock->targetLink));
2487                                 SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks),
2488                                                                          &(newpredlock->xactLink));
2489                                 newpredlock->commitSeqNo = oldCommitSeqNo;
2490                         }
2491                         else
2492                         {
                                     /*
                                      * The transaction already had a lock on the new target: keep
                                      * the larger of the two commitSeqNo values.
                                      */
2493                                 if (newpredlock->commitSeqNo < oldCommitSeqNo)
2494                                         newpredlock->commitSeqNo = oldCommitSeqNo;
2495                         }
2496
2497                         Assert(newpredlock->commitSeqNo != 0);
2498                         Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo)
2499                                    || (newpredlock->tag.myXact == OldCommittedSxact));
2500
2501                         oldpredlock = nextpredlock;
2502                 }
2503                 LWLockRelease(SerializableXactHashLock);
2504
2505                 if (removeOld)
2506                 {
                             /* Every lock was moved away above, so the old target can go too. */
2507                         Assert(SHMQueueEmpty(&oldtarget->predicateLocks));
2508                         RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash);
2509                 }
2510         }
2511
2512
2513 exit:
2514         /* Release partition locks in reverse order of acquisition. */
2515         if (oldpartitionLock < newpartitionLock)
2516         {
2517                 LWLockRelease(newpartitionLock);
2518                 LWLockRelease(oldpartitionLock);
2519         }
2520         else if (oldpartitionLock > newpartitionLock)
2521         {
2522                 LWLockRelease(oldpartitionLock);
2523                 LWLockRelease(newpartitionLock);
2524         }
2525         else
2526                 LWLockRelease(newpartitionLock);
2527
2528         if (removeOld)
2529         {
2530                 /* We shouldn't run out of memory if we're moving locks */
2531                 Assert(!outOfShmem);
2532
2533                 /* Put the reserved entry back */
2534                 LWLockAcquire(reservedpartitionLock, LW_EXCLUSIVE);
2535                 hash_search_with_hash_value(PredicateLockTargetHash,
2536                                                                         &ReservedTargetTag,
2537                                                                         reservedtargettaghash,
2538                                                                         HASH_ENTER, &found);
2539                 Assert(!found);
2540                 LWLockRelease(reservedpartitionLock);
2541         }
2542
2543         return !outOfShmem;
2544 }
2545
2546
2547 /*
2548  *              PredicateLockPageSplit
2549  *
2550  * Copies any predicate locks for the old page to the new page.
2551  * Skip if this is a temporary table or toast table.
2552  *
2553  * NOTE: A page split (or overflow) affects all serializable transactions,
2554  * even if it occurs in the context of another transaction isolation level.
2555  *
2556  * NOTE: This currently leaves the local copy of the locks without
2557  * information on the new lock which is in shared memory.  This could cause
2558  * problems if enough page splits occur on locked pages without the processes
2559  * which hold the locks getting in and noticing.
2560  */
2561 void
2562 PredicateLockPageSplit(const Relation relation, const BlockNumber oldblkno,
2563                                            const BlockNumber newblkno)
2564 {
2565         PREDICATELOCKTARGETTAG oldtargettag;
2566         PREDICATELOCKTARGETTAG newtargettag;
2567         bool            success;
2568
2569         /*
2570          * Bail out quickly if there are no serializable transactions
2571          * running.
2572          *
2573          * It's safe to do this check without taking any additional
2574          * locks. Even if a serializable transaction starts concurrently,
2575          * we know it can't take any SIREAD locks on the page being split
2576          * because the caller is holding the associated buffer page lock.
2577          * Memory reordering isn't an issue; the memory barrier in the
2578          * LWLock acquisition guarantees that this read occurs while the
2579          * buffer page lock is held.
2580          */
2581         if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
2582                 return;
2583
2584         if (SkipSplitTracking(relation))
2585                 return;
2586
2587         Assert(oldblkno != newblkno);
2588         Assert(BlockNumberIsValid(oldblkno));
2589         Assert(BlockNumberIsValid(newblkno));
2590
2591         SET_PREDICATELOCKTARGETTAG_PAGE(oldtargettag,
2592                                                                         relation->rd_node.dbNode,
2593                                                                         relation->rd_id,
2594                                                                         oldblkno);
2595         SET_PREDICATELOCKTARGETTAG_PAGE(newtargettag,
2596                                                                         relation->rd_node.dbNode,
2597                                                                         relation->rd_id,
2598                                                                         newblkno);
2599
2600         LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
2601
2602         /*
2603          * Try copying the locks over to the new page's tag, creating it if
2604          * necessary.
2605          */
2606         success = TransferPredicateLocksToNewTarget(oldtargettag,
2607                                                                                                 newtargettag,
2608                                                                                                 false);
2609
2610         if (!success)
2611         {
2612                 /*
2613                  * No more predicate lock entries are available. Failure isn't an
2614                  * option here, so promote the page lock to a relation lock.
2615                  */
2616
2617                 /* Get the parent relation lock's lock tag */
2618                 success = GetParentPredicateLockTag(&oldtargettag,
2619                                                                                         &newtargettag);
2620                 Assert(success);
2621
2622                 /*
2623                  * Move the locks to the parent. This shouldn't fail.
2624                  *
2625                  * Note that here we are removing locks held by other backends,
2626                  * leading to a possible inconsistency in their local lock hash table.
2627                  * This is OK because we're replacing it with a lock that covers the
2628                  * old one.
2629                  */
2630                 success = TransferPredicateLocksToNewTarget(oldtargettag,
2631                                                                                                         newtargettag,
2632                                                                                                         true);
2633                 Assert(success);
2634         }
2635
2636         LWLockRelease(SerializablePredicateLockListLock);
2637 }
2638
2639 /*
2640  *              PredicateLockPageCombine
2641  *
2642  * Combines predicate locks for two existing pages.
2643  * Skip if this is a temporary table or toast table.
2644  *
2645  * NOTE: A page combine affects all serializable transactions, even if it
2646  * occurs in the context of another transaction isolation level.
2647  */
2648 void
2649 PredicateLockPageCombine(const Relation relation, const BlockNumber oldblkno,
2650                                                  const BlockNumber newblkno)
2651 {
2652         /*
2653          * Page combines differ from page splits in that we ought to be able to
2654          * remove the locks on the old page after transferring them to the new
2655          * page, instead of duplicating them. However, because we can't edit other
2656          * backends' local lock tables, removing the old lock would leave them
2657          * with an entry in their LocalPredicateLockHash for a lock they're not
2658          * holding, which isn't acceptable. So we wind up having to do the same
2659          * work as a page split, acquiring a lock on the new page and keeping the
2660          * old page locked too. That can lead to some false positives, but should
2661          * be rare in practice.
2662          */
2663         PredicateLockPageSplit(relation, oldblkno, newblkno);
2664 }
2665
2666 /*
2667  * Walk the hash table and find the new xmin.
2668  */
2669 static void
2670 SetNewSxactGlobalXmin(void)
2671 {
2672         SERIALIZABLEXACT *sxact;
2673
2674         Assert(LWLockHeldByMe(SerializableXactHashLock));
2675
2676         PredXact->SxactGlobalXmin = InvalidTransactionId;
2677         PredXact->SxactGlobalXminCount = 0;
2678
2679         for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
2680         {
2681                 if (!SxactIsRolledBack(sxact)
2682                         && !SxactIsCommitted(sxact)
2683                         && sxact != OldCommittedSxact)
2684                 {
2685                         Assert(sxact->xmin != InvalidTransactionId);
2686                         if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)
2687                                 || TransactionIdPrecedes(sxact->xmin,
2688                                                                                  PredXact->SxactGlobalXmin))
2689                         {
2690                                 PredXact->SxactGlobalXmin = sxact->xmin;
2691                                 PredXact->SxactGlobalXminCount = 1;
2692                         }
2693                         else if (TransactionIdEquals(sxact->xmin,
2694                                                                                  PredXact->SxactGlobalXmin))
2695                                 PredXact->SxactGlobalXminCount++;
2696                 }
2697         }
2698
2699         OldSerXidSetActiveSerXmin(PredXact->SxactGlobalXmin);
2700 }
2701
2702 /*
2703  *              ReleasePredicateLocks
2704  *
2705  * Releases predicate locks based on completion of the current transaction,
2706  * whether committed or rolled back.  It can also be called for a read only
2707  * transaction when it becomes impossible for the transaction to become
2708  * part of a dangerous structure.
2709  *
2710  * We do nothing unless this is a serializable transaction.
2711  *
2712  * This method must ensure that shared memory hash tables are cleaned
2713  * up in some relatively timely fashion.
2714  *
2715  * If this transaction is committing and is holding any predicate locks,
2716  * it must be added to a list of completed serializable transactions still
2717  * holding locks.
2718  */
2719 void
2720 ReleasePredicateLocks(const bool isCommit)
2721 {
2722         bool            needToClear;
2723         RWConflict      conflict,
2724                                 nextConflict,
2725                                 possibleUnsafeConflict;
2726         SERIALIZABLEXACT *roXact;
2727
2728         /*
2729          * We can't trust XactReadOnly here, because a transaction which started
2730          * as READ WRITE can show as READ ONLY later, e.g., within
2731          * subtransactions.  We want to flag a transaction as READ ONLY if it
2732          * commits without writing so that de facto READ ONLY transactions get the
2733          * benefit of some RO optimizations, so we will use this local variable to
2734          * get some cleanup logic right which is based on whether the transaction
2735          * was declared READ ONLY at the top level.
2736          */
2737         bool            topLevelIsDeclaredReadOnly;
2738
2739         if (MySerializableXact == InvalidSerializableXact)
2740         {
2741                 Assert(LocalPredicateLockHash == NULL);
2742                 return;
2743         }
2744
2745         Assert(!isCommit || SxactIsPrepared(MySerializableXact));
2746         Assert(!SxactIsRolledBack(MySerializableXact));
2747         Assert(!SxactIsCommitted(MySerializableXact));
2748
2749         /* may not be serializable during COMMIT/ROLLBACK PREPARED */
2750         if (MySerializableXact->pid != 0)
2751                 Assert(IsolationIsSerializable());
2752
2753         /* We'd better not already be on the cleanup list. */
2754         Assert(!SxactIsOnFinishedList((SERIALIZABLEXACT *) MySerializableXact));
2755
2756         topLevelIsDeclaredReadOnly = SxactIsReadOnly(MySerializableXact);
2757
2758         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
2759
2760         /*
2761          * We don't hold XidGenLock lock here, assuming that TransactionId is
2762          * atomic!
2763          *
2764          * If this value is changing, we don't care that much whether we get the
2765          * old or new value -- it is just used to determine how far
2766          * GlobalSerizableXmin must advance before this transaction can be fully
2767          * cleaned up.  The worst that could happen is we wait for one more
2768          * transaction to complete before freeing some RAM; correctness of visible
2769          * behavior is not affected.
2770          */
2771         MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;
2772
2773         /*
2774          * If it's not a commit it's a rollback, and we can clear our locks
2775          * immediately.
2776          */
2777         if (isCommit)
2778         {
2779                 MySerializableXact->flags |= SXACT_FLAG_COMMITTED;
2780                 MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo);
2781                 /* Recognize implicit read-only transaction (commit without write). */
2782                 if (!(MySerializableXact->flags & SXACT_FLAG_DID_WRITE))
2783                         MySerializableXact->flags |= SXACT_FLAG_READ_ONLY;
2784         }
2785         else
2786         {
2787                 MySerializableXact->flags |= SXACT_FLAG_ROLLED_BACK;
2788         }
2789
2790         if (!topLevelIsDeclaredReadOnly)
2791         {
2792                 Assert(PredXact->WritableSxactCount > 0);
2793                 if (--(PredXact->WritableSxactCount) == 0)
2794                 {
2795                         /*
2796                          * Release predicate locks and rw-conflicts in for all committed
2797                          * transactions.  There are no longer any transactions which might
2798                          * conflict with the locks and no chance for new transactions to
2799                          * overlap.  Similarly, existing conflicts in can't cause pivots,
2800                          * and any conflicts in which could have completed a dangerous
2801                          * structure would already have caused a rollback, so any
2802                          * remaining ones must be benign.
2803                          */
2804                         PredXact->CanPartialClearThrough = PredXact->LastSxactCommitSeqNo;
2805                 }
2806         }
2807         else
2808         {
2809                 /*
2810                  * Read-only transactions: clear the list of transactions that might
2811                  * make us unsafe. Note that we use 'inLink' for the iteration as
2812                  * opposed to 'outLink' for the r/w xacts.
2813                  */
2814                 possibleUnsafeConflict = (RWConflict)
2815                         SHMQueueNext((SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
2816                                   (SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
2817                                                  offsetof(RWConflictData, inLink));
2818                 while (possibleUnsafeConflict)
2819                 {
2820                         nextConflict = (RWConflict)
2821                                 SHMQueueNext((SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
2822                                                          &possibleUnsafeConflict->inLink,
2823                                                          offsetof(RWConflictData, inLink));
2824
2825                         Assert(!SxactIsReadOnly(possibleUnsafeConflict->sxactOut));
2826                         Assert(MySerializableXact == possibleUnsafeConflict->sxactIn);
2827
2828                         ReleaseRWConflict(possibleUnsafeConflict);
2829
2830                         possibleUnsafeConflict = nextConflict;
2831                 }
2832         }
2833
2834         /* Check for conflict out to old committed transactions. */
2835         if (isCommit
2836                 && !SxactIsReadOnly(MySerializableXact)
2837                 && SxactHasSummaryConflictOut(MySerializableXact))
2838         {
2839                 MySerializableXact->SeqNo.earliestOutConflictCommit =
2840                         FirstNormalSerCommitSeqNo;
2841                 MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
2842         }
2843
2844         /*
2845          * Release all outConflicts to committed transactions.  If we're rolling
2846          * back clear them all.  Set SXACT_FLAG_CONFLICT_OUT if any point to
2847          * previously committed transactions.
2848          */
2849         conflict = (RWConflict)
2850                 SHMQueueNext((SHM_QUEUE *) &MySerializableXact->outConflicts,
2851                                          (SHM_QUEUE *) &MySerializableXact->outConflicts,
2852                                          offsetof(RWConflictData, outLink));
2853         while (conflict)
2854         {
2855                 nextConflict = (RWConflict)
2856                         SHMQueueNext((SHM_QUEUE *) &MySerializableXact->outConflicts,
2857                                                  &conflict->outLink,
2858                                                  offsetof(RWConflictData, outLink));
2859
2860                 if (isCommit
2861                         && !SxactIsReadOnly(MySerializableXact)
2862                         && SxactIsCommitted(conflict->sxactIn))
2863                 {
2864                         if ((MySerializableXact->flags & SXACT_FLAG_CONFLICT_OUT) == 0
2865                                 || conflict->sxactIn->commitSeqNo < MySerializableXact->SeqNo.earliestOutConflictCommit)
2866                                 MySerializableXact->SeqNo.earliestOutConflictCommit = conflict->sxactIn->commitSeqNo;
2867                         MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
2868                 }
2869
2870                 if (!isCommit
2871                         || SxactIsCommitted(conflict->sxactIn)
2872                         || (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo))
2873                         ReleaseRWConflict(conflict);
2874
2875                 conflict = nextConflict;
2876         }
2877
2878         /*
2879          * Release all inConflicts from committed and read-only transactions. If
2880          * we're rolling back, clear them all.
2881          */
2882         conflict = (RWConflict)
2883                 SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
2884                                          (SHM_QUEUE *) &MySerializableXact->inConflicts,
2885                                          offsetof(RWConflictData, inLink));
2886         while (conflict)
2887         {
2888                 nextConflict = (RWConflict)
2889                         SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
2890                                                  &conflict->inLink,
2891                                                  offsetof(RWConflictData, inLink));
2892
2893                 if (!isCommit
2894                         || SxactIsCommitted(conflict->sxactOut)
2895                         || SxactIsReadOnly(conflict->sxactOut))
2896                         ReleaseRWConflict(conflict);
2897
2898                 conflict = nextConflict;
2899         }
2900
2901         if (!topLevelIsDeclaredReadOnly)
2902         {
2903                 /*
2904                  * Remove ourselves from the list of possible conflicts for concurrent
2905                  * READ ONLY transactions, flagging them as unsafe if we have a
2906                  * conflict out. If any are waiting DEFERRABLE transactions, wake them
2907                  * up if they are known safe or known unsafe.
2908                  */
2909                 possibleUnsafeConflict = (RWConflict)
2910                         SHMQueueNext((SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
2911                                   (SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
2912                                                  offsetof(RWConflictData, outLink));
2913                 while (possibleUnsafeConflict)
2914                 {
2915                         nextConflict = (RWConflict)
2916                                 SHMQueueNext((SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
2917                                                          &possibleUnsafeConflict->outLink,
2918                                                          offsetof(RWConflictData, outLink));
2919
2920                         roXact = possibleUnsafeConflict->sxactIn;
2921                         Assert(MySerializableXact == possibleUnsafeConflict->sxactOut);
2922                         Assert(SxactIsReadOnly(roXact));
2923
2924                         /* Mark conflicted if necessary. */
2925                         if (isCommit
2926                                 && (MySerializableXact->flags & SXACT_FLAG_DID_WRITE)
2927                                 && SxactHasConflictOut(MySerializableXact)
2928                                 && (MySerializableXact->SeqNo.earliestOutConflictCommit
2929                                         <= roXact->SeqNo.lastCommitBeforeSnapshot))
2930                         {
2931                                 /*
2932                                  * This releases possibleUnsafeConflict (as well as all other
2933                                  * possible conflicts for roXact)
2934                                  */
2935                                 FlagSxactUnsafe(roXact);
2936                         }
2937                         else
2938                         {
2939                                 ReleaseRWConflict(possibleUnsafeConflict);
2940
2941                                 /*
2942                                  * If we were the last possible conflict, flag it safe. The
2943                                  * transaction can now safely release its predicate locks (but
2944                                  * that transaction's backend has to do that itself).
2945                                  */
2946                                 if (SHMQueueEmpty(&roXact->possibleUnsafeConflicts))
2947                                         roXact->flags |= SXACT_FLAG_RO_SAFE;
2948                         }
2949
2950                         /*
2951                          * Wake up the process for a waiting DEFERRABLE transaction if we
2952                          * now know it's either safe or conflicted.
2953                          */
2954                         if (SxactIsDeferrableWaiting(roXact) &&
2955                                 (SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
2956                                 ProcSendSignal(roXact->pid);
2957
2958                         possibleUnsafeConflict = nextConflict;
2959                 }
2960         }
2961
2962         /*
2963          * Check whether it's time to clean up old transactions. This can only be
2964          * done when the last serializable transaction with the oldest xmin among
2965          * serializable transactions completes.  We then find the "new oldest"
2966          * xmin and purge any transactions which finished before this transaction
2967          * was launched.
2968          */
2969         needToClear = false;
2970         if (TransactionIdEquals(MySerializableXact->xmin, PredXact->SxactGlobalXmin))
2971         {
2972                 Assert(PredXact->SxactGlobalXminCount > 0);
2973                 if (--(PredXact->SxactGlobalXminCount) == 0)
2974                 {
2975                         SetNewSxactGlobalXmin();
2976                         needToClear = true;
2977                 }
2978         }
2979
2980         LWLockRelease(SerializableXactHashLock);
2981
2982         LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
2983
2984         /* Add this to the list of transactions to check for later cleanup. */
2985         if (isCommit)
2986                 SHMQueueInsertBefore(FinishedSerializableTransactions,
2987                                                   (SHM_QUEUE *) &(MySerializableXact->finishedLink));
2988
2989         if (!isCommit)
2990                 ReleaseOneSerializableXact((SERIALIZABLEXACT *) MySerializableXact,
2991                                                                    false, false);
2992
2993         LWLockRelease(SerializableFinishedListLock);
2994
2995         if (needToClear)
2996                 ClearOldPredicateLocks();
2997
2998         MySerializableXact = InvalidSerializableXact;
2999
3000         /* Delete per-transaction lock table */
3001         if (LocalPredicateLockHash != NULL)
3002         {
3003                 hash_destroy(LocalPredicateLockHash);
3004                 LocalPredicateLockHash = NULL;
3005         }
3006 }
3007
3008 /*
3009  * ReleasePredicateLocksIfROSafe
3010  *              Check if the current transaction is read only and operating on
3011  *              a safe snapshot. If so, release predicate locks and return
3012  *              true.
3013  *
3014  * A transaction is flagged as RO_SAFE if all concurrent R/W
3015  * transactions commit without having conflicts out to an earlier
3016  * snapshot, thus ensuring that no conflicts are possible for this
3017  * transaction. Thus, we call this function as part of the
3018  * SkipSerialization check on all public interface methods.
3019  */
3020 static bool
3021 ReleasePredicateLocksIfROSafe(void)
3022 {
3023         if (SxactIsROSafe(MySerializableXact))
3024         {
3025                 ReleasePredicateLocks(false);
3026                 return true;
3027         }
3028         else
3029                 return false;
3030 }
3031
3032 /*
3033  * Clear old predicate locks.
3034  */
3035 static void
3036 ClearOldPredicateLocks(void)
3037 {
3038         SERIALIZABLEXACT *finishedSxact;
3039         PREDICATELOCK *predlock;
3040
3041         LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
3042         finishedSxact = (SERIALIZABLEXACT *)
3043                 SHMQueueNext(FinishedSerializableTransactions,
3044                                          FinishedSerializableTransactions,
3045                                          offsetof(SERIALIZABLEXACT, finishedLink));
3046         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3047         while (finishedSxact)
3048         {
3049                 SERIALIZABLEXACT *nextSxact;
3050
3051                 nextSxact = (SERIALIZABLEXACT *)
3052                         SHMQueueNext(FinishedSerializableTransactions,
3053                                                  &(finishedSxact->finishedLink),
3054                                                  offsetof(SERIALIZABLEXACT, finishedLink));
3055                 if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)
3056                         || TransactionIdPrecedesOrEquals(finishedSxact->finishedBefore,
3057                                                                                          PredXact->SxactGlobalXmin))
3058                 {
3059                         LWLockRelease(SerializableXactHashLock);
3060                         SHMQueueDelete(&(finishedSxact->finishedLink));
3061                         ReleaseOneSerializableXact(finishedSxact, false, false);
3062                         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3063                 }
3064                 else if (finishedSxact->commitSeqNo > PredXact->HavePartialClearedThrough
3065                    && finishedSxact->commitSeqNo <= PredXact->CanPartialClearThrough)
3066                 {
3067                         LWLockRelease(SerializableXactHashLock);
3068                         ReleaseOneSerializableXact(finishedSxact,
3069                                                                            !SxactIsReadOnly(finishedSxact),
3070                                                                            false);
3071                         PredXact->HavePartialClearedThrough = finishedSxact->commitSeqNo;
3072                         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3073                 }
3074                 else
3075                         break;
3076                 finishedSxact = nextSxact;
3077         }
3078         LWLockRelease(SerializableXactHashLock);
3079
3080         /*
3081          * Loop through predicate locks on dummy transaction for summarized data.
3082          */
3083         LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
3084         predlock = (PREDICATELOCK *)
3085                 SHMQueueNext(&OldCommittedSxact->predicateLocks,
3086                                          &OldCommittedSxact->predicateLocks,
3087                                          offsetof(PREDICATELOCK, xactLink));
3088         while (predlock)
3089         {
3090                 PREDICATELOCK *nextpredlock;
3091                 bool            canDoPartialCleanup;
3092
3093                 nextpredlock = (PREDICATELOCK *)
3094                         SHMQueueNext(&OldCommittedSxact->predicateLocks,
3095                                                  &predlock->xactLink,
3096                                                  offsetof(PREDICATELOCK, xactLink));
3097
3098                 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3099                 Assert(predlock->commitSeqNo != 0);
3100                 Assert(predlock->commitSeqNo != InvalidSerCommitSeqNo);
3101                 canDoPartialCleanup = (predlock->commitSeqNo <= PredXact->CanPartialClearThrough);
3102                 LWLockRelease(SerializableXactHashLock);
3103
3104                 if (canDoPartialCleanup)
3105                 {
3106                         PREDICATELOCKTAG tag;
3107                         SHM_QUEUE  *targetLink;
3108                         PREDICATELOCKTARGET *target;
3109                         PREDICATELOCKTARGETTAG targettag;
3110                         uint32          targettaghash;
3111                         LWLockId        partitionLock;
3112
3113                         tag = predlock->tag;
3114                         targetLink = &(predlock->targetLink);
3115                         target = tag.myTarget;
3116                         targettag = target->tag;
3117                         targettaghash = PredicateLockTargetTagHashCode(&targettag);
3118                         partitionLock = PredicateLockHashPartitionLock(targettaghash);
3119
3120                         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
3121
3122                         SHMQueueDelete(targetLink);
3123                         SHMQueueDelete(&(predlock->xactLink));
3124
3125                         hash_search_with_hash_value(PredicateLockHash, &tag,
3126                                                                 PredicateLockHashCodeFromTargetHashCode(&tag,
3127                                                                                                                           targettaghash),
3128                                                                                 HASH_REMOVE, NULL);
3129                         RemoveTargetIfNoLongerUsed(target, targettaghash);
3130
3131                         LWLockRelease(partitionLock);
3132                 }
3133
3134                 predlock = nextpredlock;
3135         }
3136
3137         LWLockRelease(SerializablePredicateLockListLock);
3138         LWLockRelease(SerializableFinishedListLock);
3139 }
3140
3141 /*
3142  * This is the normal way to delete anything from any of the predicate
3143  * locking hash tables.  Given a transaction which we know can be deleted:
3144  * delete all predicate locks held by that transaction and any predicate
3145  * lock targets which are now unreferenced by a lock; delete all conflicts
3146  * for the transaction; delete all xid values for the transaction; then
3147  * delete the transaction.
3148  *
3149  * When the partial flag is set, we can release all predicate locks and
3150  * out-conflict information -- we've established that there are no longer
3151  * any overlapping read write transactions for which this transaction could
3152  * matter.
3153  *
3154  * When the summarize flag is set, we've run short of room for sxact data
3155  * and must summarize to the SLRU.      Predicate locks are transferred to a
3156  * dummy "old" transaction, with duplicate locks on a single target
3157  * collapsing to a single lock with the "latest" commitSeqNo from among
3158  * the conflicting locks..
3159  */
3160 static void
3161 ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
3162                                                    bool summarize)
3163 {
3164         PREDICATELOCK *predlock;
3165         SERIALIZABLEXIDTAG sxidtag;
3166         RWConflict      conflict,
3167                                 nextConflict;
3168
3169         Assert(sxact != NULL);
3170         Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact));
3171         Assert(LWLockHeldByMe(SerializableFinishedListLock));
3172
3173         LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
3174         predlock = (PREDICATELOCK *)
3175                 SHMQueueNext(&(sxact->predicateLocks),
3176                                          &(sxact->predicateLocks),
3177                                          offsetof(PREDICATELOCK, xactLink));
3178         while (predlock)
3179         {
3180                 PREDICATELOCK *nextpredlock;
3181                 PREDICATELOCKTAG tag;
3182                 SHM_QUEUE  *targetLink;
3183                 PREDICATELOCKTARGET *target;
3184                 PREDICATELOCKTARGETTAG targettag;
3185                 uint32          targettaghash;
3186                 LWLockId        partitionLock;
3187
3188                 nextpredlock = (PREDICATELOCK *)
3189                         SHMQueueNext(&(sxact->predicateLocks),
3190                                                  &(predlock->xactLink),
3191                                                  offsetof(PREDICATELOCK, xactLink));
3192
3193                 tag = predlock->tag;
3194                 targetLink = &(predlock->targetLink);
3195                 target = tag.myTarget;
3196                 targettag = target->tag;
3197                 targettaghash = PredicateLockTargetTagHashCode(&targettag);
3198                 partitionLock = PredicateLockHashPartitionLock(targettaghash);
3199
3200                 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
3201
3202                 SHMQueueDelete(targetLink);
3203
3204                 hash_search_with_hash_value(PredicateLockHash, &tag,
3205                                                                 PredicateLockHashCodeFromTargetHashCode(&tag,
3206                                                                                                                           targettaghash),
3207                                                                         HASH_REMOVE, NULL);
3208                 if (summarize)
3209                 {
3210                         bool            found;
3211
3212                         /* Fold into dummy transaction list. */
3213                         tag.myXact = OldCommittedSxact;
3214                         predlock = hash_search_with_hash_value(PredicateLockHash, &tag,
3215                                                                 PredicateLockHashCodeFromTargetHashCode(&tag,
3216                                                                                                                           targettaghash),
3217                                                                                                    HASH_ENTER_NULL, &found);
3218                         if (!predlock)
3219                                 ereport(ERROR,
3220                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
3221                                                  errmsg("out of shared memory"),
3222                                                  errhint("You might need to increase max_pred_locks_per_transaction.")));
3223                         if (found)
3224                         {
3225                                 Assert(predlock->commitSeqNo != 0);
3226                                 Assert(predlock->commitSeqNo != InvalidSerCommitSeqNo);
3227                                 if (predlock->commitSeqNo < sxact->commitSeqNo)
3228                                         predlock->commitSeqNo = sxact->commitSeqNo;
3229                         }
3230                         else
3231                         {
3232                                 SHMQueueInsertBefore(&(target->predicateLocks),
3233                                                                          &(predlock->targetLink));
3234                                 SHMQueueInsertBefore(&(OldCommittedSxact->predicateLocks),
3235                                                                          &(predlock->xactLink));
3236                                 predlock->commitSeqNo = sxact->commitSeqNo;
3237                         }
3238                 }
3239                 else
3240                         RemoveTargetIfNoLongerUsed(target, targettaghash);
3241
3242                 LWLockRelease(partitionLock);
3243
3244                 predlock = nextpredlock;
3245         }
3246
3247         /*
3248          * Rather than retail removal, just re-init the head after we've run
3249          * through the list.
3250          */
3251         SHMQueueInit(&sxact->predicateLocks);
3252
3253         LWLockRelease(SerializablePredicateLockListLock);
3254
3255         sxidtag.xid = sxact->topXid;
3256         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
3257
3258         if (!partial)
3259         {
3260                 /* Release all outConflicts. */
3261                 conflict = (RWConflict)
3262                         SHMQueueNext(&sxact->outConflicts,
3263                                                  &sxact->outConflicts,
3264                                                  offsetof(RWConflictData, outLink));
3265                 while (conflict)
3266                 {
3267                         nextConflict = (RWConflict)
3268                                 SHMQueueNext(&sxact->outConflicts,
3269                                                          &conflict->outLink,
3270                                                          offsetof(RWConflictData, outLink));
3271                         if (summarize)
3272                                 conflict->sxactIn->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
3273                         ReleaseRWConflict(conflict);
3274                         conflict = nextConflict;
3275                 }
3276         }
3277
3278         /* Release all inConflicts. */
3279         conflict = (RWConflict)
3280                 SHMQueueNext(&sxact->inConflicts,
3281                                          &sxact->inConflicts,
3282                                          offsetof(RWConflictData, inLink));
3283         while (conflict)
3284         {
3285                 nextConflict = (RWConflict)
3286                         SHMQueueNext(&sxact->inConflicts,
3287                                                  &conflict->inLink,
3288                                                  offsetof(RWConflictData, inLink));
3289                 if (summarize)
3290                         conflict->sxactOut->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
3291                 ReleaseRWConflict(conflict);
3292                 conflict = nextConflict;
3293         }
3294
3295         if (!partial)
3296         {
3297                 /* Get rid of the xid and the record of the transaction itself. */
3298                 if (sxidtag.xid != InvalidTransactionId)
3299                         hash_search(SerializableXidHash, &sxidtag, HASH_REMOVE, NULL);
3300                 ReleasePredXact(sxact);
3301         }
3302
3303         LWLockRelease(SerializableXactHashLock);
3304 }
3305
3306 /*
3307  * Tests whether the given top level transaction is concurrent with
3308  * (overlaps) our current transaction.
3309  *
3310  * We need to identify the top level transaction for SSI, anyway, so pass
3311  * that to this function to save the overhead of checking the snapshot's
3312  * subxip array.
3313  */
3314 static bool
3315 XidIsConcurrent(TransactionId xid)
3316 {
3317         Snapshot        snap;
3318         uint32          i;
3319
3320         Assert(TransactionIdIsValid(xid));
3321         Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
3322
3323         snap = GetTransactionSnapshot();
3324
3325         if (TransactionIdPrecedes(xid, snap->xmin))
3326                 return false;
3327
3328         if (TransactionIdFollowsOrEquals(xid, snap->xmax))
3329                 return true;
3330
3331         for (i = 0; i < snap->xcnt; i++)
3332         {
3333                 if (xid == snap->xip[i])
3334                         return true;
3335         }
3336
3337         return false;
3338 }
3339
3340 /*
3341  * CheckForSerializableConflictOut
3342  *              We are reading a tuple which has been modified.  If it is visible to
3343  *              us but has been deleted, that indicates a rw-conflict out.      If it's
3344  *              not visible and was created by a concurrent (overlapping)
3345  *              serializable transaction, that is also a rw-conflict out.
3346  *
3347  * We will determine the top level xid of the writing transaction with which
3348  * we may be in conflict, and check for overlap with our own transaction.
3349  * If the transactions overlap (i.e., they cannot see each other's writes),
3350  * then we have a conflict out.
3351  *
3352  * This function should be called just about anywhere in heapam.c where a
3353  * tuple has been read. The caller must hold at least a shared lock on the
3354  * buffer, because this function might set hint bits on the tuple. There is
3355  * currently no known reason to call this function from an index AM.
3356  */
3357 void
3358 CheckForSerializableConflictOut(const bool visible, const Relation relation,
3359                                                                 const HeapTuple tuple, const Buffer buffer)
3360 {
3361         TransactionId xid;
3362         SERIALIZABLEXIDTAG sxidtag;
3363         SERIALIZABLEXID *sxid;
3364         SERIALIZABLEXACT *sxact;
3365         HTSV_Result htsvResult;
3366
3367         if (SkipSerialization(relation))
3368                 return;
3369
3370         if (SxactIsMarkedForDeath(MySerializableXact))
3371         {
3372                 ereport(ERROR,
3373                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
3374                                  errmsg("could not serialize access due to read/write dependencies among transactions"),
3375                                  errdetail("Cancelled on identification as a pivot, during conflict out checking."),
3376                                  errhint("The transaction might succeed if retried.")));
3377         }
3378
3379         /*
3380          * Check to see whether the tuple has been written to by a concurrent
3381          * transaction, either to create it not visible to us, or to delete it
3382          * while it is visible to us.  The "visible" bool indicates whether the
3383          * tuple is visible to us, while HeapTupleSatisfiesVacuum checks what else
3384          * is going on with it.
3385          */
3386         htsvResult = HeapTupleSatisfiesVacuum(tuple->t_data, TransactionXmin, buffer);
3387         switch (htsvResult)
3388         {
3389                 case HEAPTUPLE_LIVE:
3390                         if (visible)
3391                                 return;
3392                         xid = HeapTupleHeaderGetXmin(tuple->t_data);
3393                         break;
3394                 case HEAPTUPLE_RECENTLY_DEAD:
3395                         if (!visible)
3396                                 return;
3397                         xid = HeapTupleHeaderGetXmax(tuple->t_data);
3398                         break;
3399                 case HEAPTUPLE_DELETE_IN_PROGRESS:
3400                         xid = HeapTupleHeaderGetXmax(tuple->t_data);
3401                         break;
3402                 case HEAPTUPLE_INSERT_IN_PROGRESS:
3403                         xid = HeapTupleHeaderGetXmin(tuple->t_data);
3404                         break;
3405                 case HEAPTUPLE_DEAD:
3406                         return;
3407                 default:
3408
3409                         /*
3410                          * The only way to get to this default clause is if a new value is
3411                          * added to the enum type without adding it to this switch
3412                          * statement.  That's a bug, so elog.
3413                          */
3414                         elog(ERROR, "unrecognized return value from HeapTupleSatisfiesVacuum: %u", htsvResult);
3415
3416                         /*
3417                          * In spite of having all enum values covered and calling elog on
3418                          * this default, some compilers think this is a code path which
3419                          * allows xid to be used below without initialization. Silence
3420                          * that warning.
3421                          */
3422                         xid = InvalidTransactionId;
3423         }
3424         Assert(TransactionIdIsValid(xid));
3425         Assert(TransactionIdFollowsOrEquals(xid, TransactionXmin));
3426
3427         /*
3428          * Find top level xid.  Bail out if xid is too early to be a conflict, or
3429          * if it's our own xid.
3430          */
3431         if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
3432                 return;
3433         xid = SubTransGetTopmostTransaction(xid);
3434         if (TransactionIdPrecedes(xid, TransactionXmin))
3435                 return;
3436         if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
3437                 return;
3438
3439         /*
3440          * Find sxact or summarized info for the top level xid.
3441          */
3442         sxidtag.xid = xid;
3443         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
3444         sxid = (SERIALIZABLEXID *)
3445                 hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
3446         if (!sxid)
3447         {
3448                 /*
3449                  * Transaction not found in "normal" SSI structures.  Check whether it
3450                  * got pushed out to SLRU storage for "old committed" transactions.
3451                  */
3452                 SerCommitSeqNo conflictCommitSeqNo;
3453
3454                 conflictCommitSeqNo = OldSerXidGetMinConflictCommitSeqNo(xid);
3455                 if (conflictCommitSeqNo != 0)
3456                 {
3457                         if (conflictCommitSeqNo != InvalidSerCommitSeqNo
3458                                 && (!SxactIsReadOnly(MySerializableXact)
3459                                         || conflictCommitSeqNo
3460                                         <= MySerializableXact->SeqNo.lastCommitBeforeSnapshot))
3461                                 ereport(ERROR,
3462                                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
3463                                                  errmsg("could not serialize access due to read/write dependencies among transactions"),
3464                                 errdetail("Cancelled on conflict out to old pivot %u.", xid),
3465                                           errhint("The transaction might succeed if retried.")));
3466
3467                         if (SxactHasSummaryConflictIn(MySerializableXact)
3468                         || !SHMQueueEmpty((SHM_QUEUE *) &MySerializableXact->inConflicts))
3469                                 ereport(ERROR,
3470                                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
3471                                                  errmsg("could not serialize access due to read/write dependencies among transactions"),
3472                                                  errdetail("Cancelled on identification as a pivot, with conflict out to old committed transaction %u.", xid),
3473                                           errhint("The transaction might succeed if retried.")));
3474
3475                         MySerializableXact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
3476                 }
3477
3478                 /* It's not serializable or otherwise not important. */
3479                 LWLockRelease(SerializableXactHashLock);
3480                 return;
3481         }
3482         sxact = sxid->myXact;
3483         Assert(TransactionIdEquals(sxact->topXid, xid));
3484         if (sxact == MySerializableXact
3485                 || SxactIsRolledBack(sxact)
3486                 || SxactIsMarkedForDeath(sxact))
3487         {
3488                 /* We can't conflict with our own transaction or one rolled back. */
3489                 LWLockRelease(SerializableXactHashLock);
3490                 return;
3491         }
3492
3493         /*
3494          * We have a conflict out to a transaction which has a conflict out to a
3495          * summarized transaction.      That summarized transaction must have
3496          * committed first, and we can't tell when it committed in relation to our
3497          * snapshot acquisition, so something needs to be cancelled.
3498          */
3499         if (SxactHasSummaryConflictOut(sxact))
3500         {
3501                 if (!SxactIsPrepared(sxact))
3502                 {
3503                         sxact->flags |= SXACT_FLAG_MARKED_FOR_DEATH;
3504                         LWLockRelease(SerializableXactHashLock);
3505                         return;
3506                 }
3507                 else
3508                 {
3509                         LWLockRelease(SerializableXactHashLock);
3510                         ereport(ERROR,
3511                                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
3512                                          errmsg("could not serialize access due to read/write dependencies among transactions"),
3513                                          errdetail("Cancelled on conflict out to old pivot."),
3514                                          errhint("The transaction might succeed if retried.")));
3515                 }
3516         }
3517
3518         /*
3519          * If this is a read-only transaction and the writing transaction has
3520          * committed, and it doesn't have a rw-conflict to a transaction which
3521          * committed before it, no conflict.
3522          */
3523         if (SxactIsReadOnly(MySerializableXact)
3524                 && SxactIsCommitted(sxact)
3525                 && !SxactHasSummaryConflictOut(sxact)
3526                 && (!SxactHasConflictOut(sxact)
3527                         || MySerializableXact->SeqNo.lastCommitBeforeSnapshot < sxact->SeqNo.earliestOutConflictCommit))
3528         {
3529                 /* Read-only transaction will appear to run first.      No conflict. */
3530                 LWLockRelease(SerializableXactHashLock);
3531                 return;
3532         }
3533
3534         if (!XidIsConcurrent(xid))
3535         {
3536                 /* This write was already in our snapshot; no conflict. */
3537                 LWLockRelease(SerializableXactHashLock);
3538                 return;
3539         }
3540
3541         if (RWConflictExists((SERIALIZABLEXACT *) MySerializableXact, sxact))
3542         {
3543                 /* We don't want duplicate conflict records in the list. */
3544                 LWLockRelease(SerializableXactHashLock);
3545                 return;
3546         }
3547
3548         /*
3549          * Flag the conflict.  But first, if this conflict creates a dangerous
3550          * structure, ereport an error.
3551          */
3552         FlagRWConflict((SERIALIZABLEXACT *) MySerializableXact, sxact);
3553         LWLockRelease(SerializableXactHashLock);
3554 }
3555
3556 /*
3557  * Check a particular target for rw-dependency conflict in.
3558  */
3559 static void
3560 CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
3561 {
3562         uint32          targettaghash;
3563         LWLockId        partitionLock;
3564         PREDICATELOCKTARGET *target;
3565         PREDICATELOCK *predlock;
3566         PREDICATELOCK *mypredlock = NULL;
3567         PREDICATELOCKTAG mypredlocktag;
3568
3569         Assert(MySerializableXact != InvalidSerializableXact);
3570
3571         /*
3572          * The same hash and LW lock apply to the lock target and the lock itself.
3573          */
3574         targettaghash = PredicateLockTargetTagHashCode(targettag);
3575         partitionLock = PredicateLockHashPartitionLock(targettaghash);
3576         LWLockAcquire(partitionLock, LW_SHARED);
3577         target = (PREDICATELOCKTARGET *)
3578                 hash_search_with_hash_value(PredicateLockTargetHash,
3579                                                                         targettag, targettaghash,
3580                                                                         HASH_FIND, NULL);
3581         if (!target)
3582         {
3583                 /* Nothing has this target locked; we're done here. */
3584                 LWLockRelease(partitionLock);
3585                 return;
3586         }
3587
3588         /*
3589          * Each lock for an overlapping transaction represents a conflict: a
3590          * rw-dependency in to this transaction.
3591          */
3592         predlock = (PREDICATELOCK *)
3593                 SHMQueueNext(&(target->predicateLocks),
3594                                          &(target->predicateLocks),
3595                                          offsetof(PREDICATELOCK, targetLink));
3596         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3597         while (predlock)
3598         {
3599                 SHM_QUEUE  *predlocktargetlink;
3600                 PREDICATELOCK *nextpredlock;
3601                 SERIALIZABLEXACT *sxact;
3602
3603                 predlocktargetlink = &(predlock->targetLink);
3604                 nextpredlock = (PREDICATELOCK *)
3605                         SHMQueueNext(&(target->predicateLocks),
3606                                                  predlocktargetlink,
3607                                                  offsetof(PREDICATELOCK, targetLink));
3608
3609                 sxact = predlock->tag.myXact;
3610                 if (sxact == MySerializableXact)
3611                 {
3612                         /*
3613                          * If we're getting a write lock on a tuple, we don't need
3614                          * a predicate (SIREAD) lock on the same tuple. We can
3615                          * safely remove our SIREAD lock, but we'll defer doing so
3616                          * until after the loop because that requires upgrading to
3617                          * an exclusive partition lock.
3618                          *
3619                          * We can't use this optimization within a subtransaction
3620                          * because the subtransaction could roll back, and we
3621                          * would be left without any lock at the top level.
3622                          */
3623                         if (!IsSubTransaction()
3624                                 && GET_PREDICATELOCKTARGETTAG_OFFSET(*targettag))
3625                         {
3626                                 mypredlock = predlock;
3627                                 mypredlocktag = predlock->tag;
3628                         }
3629                 }
3630                 else if (!SxactIsRolledBack(sxact)
3631                                  && (!SxactIsCommitted(sxact)
3632                                          || TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
3633                                                                                           sxact->finishedBefore))
3634                 && !RWConflictExists(sxact, (SERIALIZABLEXACT *) MySerializableXact))
3635                 {
3636                         LWLockRelease(SerializableXactHashLock);
3637                         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
3638
3639                         /*
3640                          * Re-check after getting exclusive lock because the other
3641                          * transaction may have flagged a conflict.
3642                          */
3643                         if (!SxactIsRolledBack(sxact)
3644                                 && (!SxactIsCommitted(sxact)
3645                                         || TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
3646                                                                                          sxact->finishedBefore))
3647                                 && !RWConflictExists(sxact,
3648                                                                          (SERIALIZABLEXACT *) MySerializableXact))
3649                         {
3650                                 FlagRWConflict(sxact, (SERIALIZABLEXACT *) MySerializableXact);
3651                         }
3652
3653                         LWLockRelease(SerializableXactHashLock);
3654                         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3655                 }
3656
3657                 predlock = nextpredlock;
3658         }
3659         LWLockRelease(SerializableXactHashLock);
3660         LWLockRelease(partitionLock);
3661
3662         /*
3663          * If we found one of our own SIREAD locks to remove, remove it
3664          * now.
3665          *
3666          * At this point our transaction already has an ExclusiveRowLock
3667          * on the relation, so we are OK to drop the predicate lock on the
3668          * tuple, if found, without fearing that another write against the
3669          * tuple will occur before the MVCC information makes it to the
3670          * buffer.
3671          */
3672         if (mypredlock != NULL)
3673         {
3674                 uint32          predlockhashcode;
3675                 PREDICATELOCK *rmpredlock;
3676
3677                 LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
3678                 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
3679                 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
3680
3681                 /*
3682                  * Remove the predicate lock from shared memory, if it wasn't
3683                  * removed while the locks were released.  One way that could
3684                  * happen is from autovacuum cleaning up an index.
3685                  */
3686                 predlockhashcode = PredicateLockHashCodeFromTargetHashCode
3687                         (&mypredlocktag, targettaghash);
3688                 rmpredlock = (PREDICATELOCK *)
3689                         hash_search_with_hash_value(PredicateLockHash,
3690                                                                                 &mypredlocktag,
3691                                                                                 predlockhashcode,
3692                                                                                 HASH_FIND, NULL);
3693                 if (rmpredlock != NULL)
3694                 {
3695                         Assert(rmpredlock == mypredlock);
3696
3697                         SHMQueueDelete(&(mypredlock->targetLink));
3698                         SHMQueueDelete(&(mypredlock->xactLink));
3699
3700                         rmpredlock = (PREDICATELOCK *)
3701                                 hash_search_with_hash_value(PredicateLockHash,
3702                                                                                         &mypredlocktag,
3703                                                                                         predlockhashcode,
3704                                                                                         HASH_REMOVE, NULL);
3705                         Assert(rmpredlock == mypredlock);
3706
3707                         RemoveTargetIfNoLongerUsed(target, targettaghash);
3708                 }
3709
3710                 LWLockRelease(SerializableXactHashLock);
3711                 LWLockRelease(partitionLock);
3712                 LWLockRelease(SerializablePredicateLockListLock);
3713                 
3714                 if (rmpredlock != NULL)
3715                 {
3716                         /*
3717                          * Remove entry in local lock table if it exists. It's OK
3718                          * if it doesn't exist; that means the lock was
3719                          * transferred to a new target by a different backend.
3720                          */
3721                         hash_search_with_hash_value(LocalPredicateLockHash,
3722                                                                                 targettag, targettaghash,
3723                                                                                 HASH_REMOVE, NULL);
3724
3725                         DecrementParentLocks(targettag);
3726                 }
3727         }
3728 }
3729
3730 /*
3731  * CheckForSerializableConflictIn
3732  *              We are writing the given tuple.  If that indicates a rw-conflict
3733  *              in from another serializable transaction, take appropriate action.
3734  *
3735  * Skip checking for any granularity for which a parameter is missing.
3736  *
3737  * A tuple update or delete is in conflict if we have a predicate lock
3738  * against the relation or page in which the tuple exists, or against the
3739  * tuple itself.
3740  */
3741 void
3742 CheckForSerializableConflictIn(const Relation relation, const HeapTuple tuple,
3743                                                            const Buffer buffer)
3744 {
3745         PREDICATELOCKTARGETTAG targettag;
3746
3747         if (SkipSerialization(relation))
3748                 return;
3749
3750         if (SxactIsMarkedForDeath(MySerializableXact))
3751                 ereport(ERROR,
3752                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
3753                                  errmsg("could not serialize access due to read/write dependencies among transactions"),
3754                                  errdetail("Cancelled on identification as a pivot, during conflict in checking."),
3755                                  errhint("The transaction might succeed if retried.")));
3756
3757         MySerializableXact->flags |= SXACT_FLAG_DID_WRITE;
3758
3759         /*
3760          * It is important that we check for locks from the finest granularity to
3761          * the coarsest granularity, so that granularity promotion doesn't cause
3762          * us to miss a lock.  The new (coarser) lock will be acquired before the
3763          * old (finer) locks are released.
3764          *
3765          * It is not possible to take and hold a lock across the checks for all
3766          * granularities because each target could be in a separate partition.
3767          */
3768         if (tuple != NULL)
3769         {
3770                 SET_PREDICATELOCKTARGETTAG_TUPLE(targettag,
3771                                                                                  relation->rd_node.dbNode,
3772                                                                                  relation->rd_id,
3773                                                  ItemPointerGetBlockNumber(&(tuple->t_data->t_ctid)),
3774                                                 ItemPointerGetOffsetNumber(&(tuple->t_data->t_ctid)),
3775                                                                           HeapTupleHeaderGetXmin(tuple->t_data));
3776                 CheckTargetForConflictsIn(&targettag);
3777         }
3778
3779         if (BufferIsValid(buffer))
3780         {
3781                 SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
3782                                                                                 relation->rd_node.dbNode,
3783                                                                                 relation->rd_id,
3784                                                                                 BufferGetBlockNumber(buffer));
3785                 CheckTargetForConflictsIn(&targettag);
3786         }
3787
3788         SET_PREDICATELOCKTARGETTAG_RELATION(targettag,
3789                                                                                 relation->rd_node.dbNode,
3790                                                                                 relation->rd_id);
3791         CheckTargetForConflictsIn(&targettag);
3792 }
3793
3794 /*
3795  * Flag a rw-dependency between two serializable transactions.
3796  *
3797  * The caller is responsible for ensuring that we have a LW lock on
3798  * the transaction hash table.
3799  */
3800 static void
3801 FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
3802 {
3803         Assert(reader != writer);
3804
3805         /* First, see if this conflict causes failure. */
3806         OnConflict_CheckForSerializationFailure(reader, writer);
3807
3808         /* Actually do the conflict flagging. */
3809         if (reader == OldCommittedSxact)
3810                 writer->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
3811         else if (writer == OldCommittedSxact)
3812                 reader->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
3813         else
3814                 SetRWConflict(reader, writer);
3815 }
3816
3817 /*----------------------------------------------------------------------------
3818  * We are about to add a RW-edge to the dependency graph - check that we don't
3819  * introduce a dangerous structure by doing so, and abort one of the
3820  * transactions if so.
3821  *
3822  * A serialization failure can only occur if there is a dangerous structure
3823  * in the dependency graph:
3824  *
3825  *              Tin ------> Tpivot ------> Tout
3826  *                        rw                     rw
3827  *
3828  * Furthermore, Tout must commit first.
3829  *
3830  * One more optimization is that if Tin is declared READ ONLY (or commits
3831  * without writing), we can only have a problem if Tout committed before Tin
3832  * acquired its snapshot.
3833  *----------------------------------------------------------------------------
3834  */
3835 static void
3836 OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
3837                                                                                 SERIALIZABLEXACT *writer)
3838 {
3839         bool            failure;
3840         RWConflict      conflict;
3841
3842         Assert(LWLockHeldByMe(SerializableXactHashLock));
3843
3844         failure = false;
3845
3846         /*------------------------------------------------------------------------
3847          * Check for already-committed writer with rw-conflict out flagged
3848          * (conflict-flag on W means that T2 committed before W):
3849          *
3850          *              R ------> W ------> T2
3851          *                      rw                rw
3852          *
3853          * That is a dangerous structure, so we must abort. (Since the writer
3854          * has already committed, we must be the reader)
3855          *------------------------------------------------------------------------
3856          */
3857         if (SxactIsCommitted(writer)
3858           && (SxactHasConflictOut(writer) || SxactHasSummaryConflictOut(writer)))
3859                 failure = true;
3860
3861         /*------------------------------------------------------------------------
3862          * Check whether the writer has become a pivot with an out-conflict
3863          * committed transaction (T2), and T2 committed first:
3864          *
3865          *              R ------> W ------> T2
3866          *                      rw                rw
3867          *
3868          * Because T2 must've committed first, there is no anomaly if:
3869          * - the reader committed before T2
3870          * - the writer committed before T2
3871          * - the reader is a READ ONLY transaction and the reader was concurrent
3872          *       with T2 (= reader acquired its snapshot before T2 committed)
3873          *------------------------------------------------------------------------
3874          */
3875         if (!failure)
3876         {
3877                 if (SxactHasSummaryConflictOut(writer))
3878                 {
3879                         failure = true;
3880                         conflict = NULL;
3881                 }
3882                 else
3883                         conflict = (RWConflict)
3884                                 SHMQueueNext(&writer->outConflicts,
3885                                                          &writer->outConflicts,
3886                                                          offsetof(RWConflictData, outLink));
3887                 while (conflict)
3888                 {
3889                         SERIALIZABLEXACT *t2 = conflict->sxactIn;
3890
3891                         if (SxactIsCommitted(t2)
3892                                 && (!SxactIsCommitted(reader)
3893                                         || t2->commitSeqNo <= reader->commitSeqNo)
3894                                 && (!SxactIsCommitted(writer)
3895                                         || t2->commitSeqNo <= writer->commitSeqNo)
3896                                 && (!SxactIsReadOnly(reader)
3897                            || t2->commitSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot))
3898                         {
3899                                 failure = true;
3900                                 break;
3901                         }
3902                         conflict = (RWConflict)
3903                                 SHMQueueNext(&writer->outConflicts,
3904                                                          &conflict->outLink,
3905                                                          offsetof(RWConflictData, outLink));
3906                 }
3907         }
3908
3909         /*------------------------------------------------------------------------
3910          * Check whether the reader has become a pivot with a committed writer:
3911          *
3912          *              T0 ------> R ------> W
3913          *                       rw                rw
3914          *
3915          * Because W must've committed first for an anomaly to occur, there is no
3916          * anomaly if:
3917          * - T0 committed before the writer
3918          * - T0 is READ ONLY, and overlaps the writer
3919          *------------------------------------------------------------------------
3920          */
3921         if (!failure && SxactIsCommitted(writer) && !SxactIsReadOnly(reader))
3922         {
3923                 if (SxactHasSummaryConflictIn(reader))
3924                 {
3925                         failure = true;
3926                         conflict = NULL;
3927                 }
3928                 else
3929                         conflict = (RWConflict)
3930                                 SHMQueueNext(&reader->inConflicts,
3931                                                          &reader->inConflicts,
3932                                                          offsetof(RWConflictData, inLink));
3933                 while (conflict)
3934                 {
3935                         SERIALIZABLEXACT *t0 = conflict->sxactOut;
3936
3937                         if (!SxactIsRolledBack(t0)
3938                                 && (!SxactIsCommitted(t0)
3939                                         || t0->commitSeqNo >= writer->commitSeqNo)
3940                                 && (!SxactIsReadOnly(t0)
3941                            || t0->SeqNo.lastCommitBeforeSnapshot >= writer->commitSeqNo))
3942                         {
3943                                 failure = true;
3944                                 break;
3945                         }
3946                         conflict = (RWConflict)
3947                                 SHMQueueNext(&reader->inConflicts,
3948                                                          &conflict->inLink,
3949                                                          offsetof(RWConflictData, inLink));
3950                 }
3951         }
3952
3953         if (failure)
3954         {
3955                 /*
3956                  * We have to kill a transaction to avoid a possible anomaly from
3957                  * occurring. If the writer is us, we can just ereport() to cause a
3958                  * transaction abort. Otherwise we flag the writer for termination,
3959                  * causing it to abort when it tries to commit. However, if the writer
3960                  * is a prepared transaction, already prepared, we can't abort it
3961                  * anymore, so we have to kill the reader instead.
3962                  */
3963                 if (MySerializableXact == writer)
3964                 {
3965                         LWLockRelease(SerializableXactHashLock);
3966                         ereport(ERROR,
3967                                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
3968                                          errmsg("could not serialize access due to read/write dependencies among transactions"),
3969                                          errdetail("Cancelled on identification as a pivot, during write."),
3970                                          errhint("The transaction might succeed if retried.")));
3971                 }
3972                 else if (SxactIsPrepared(writer))
3973                 {
3974                         LWLockRelease(SerializableXactHashLock);
3975
3976                         /* if we're not the writer, we have to be the reader */
3977                         Assert(MySerializableXact == reader);
3978                         ereport(ERROR,
3979                                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
3980                                          errmsg("could not serialize access due to read/write dependencies among transactions"),
3981                                          errdetail("Cancelled on conflict out to pivot %u, during read.", writer->topXid),
3982                                          errhint("The transaction might succeed if retried.")));
3983                 }
3984                 writer->flags |= SXACT_FLAG_MARKED_FOR_DEATH;
3985         }
3986 }
3987
3988 /*
3989  * PreCommit_CheckForSerializationFailure
3990  *              Check for dangerous structures in a serializable transaction
3991  *              at commit.
3992  *
3993  * We're checking for a dangerous structure as each conflict is recorded.
3994  * The only way we could have a problem at commit is if this is the "out"
3995  * side of a pivot, and neither the "in" side nor the pivot has yet
3996  * committed.
3997  *
3998  * If a dangerous structure is found, the pivot (the near conflict) is
3999  * marked for death, because rolling back another transaction might mean
4000  * that we flail without ever making progress.  This transaction is
4001  * committing writes, so letting it commit ensures progress.  If we
4002  * cancelled the far conflict, it might immediately fail again on retry.
4003  */
4004 void
4005 PreCommit_CheckForSerializationFailure(void)
4006 {
4007         RWConflict      nearConflict;
4008
4009         if (MySerializableXact == InvalidSerializableXact)
4010                 return;
4011
4012         Assert(IsolationIsSerializable());
4013
4014         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
4015
4016         if (SxactIsMarkedForDeath(MySerializableXact))
4017         {
4018                 LWLockRelease(SerializableXactHashLock);
4019                 ereport(ERROR,
4020                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
4021                                  errmsg("could not serialize access due to read/write dependencies among transactions"),
4022                                  errdetail("Cancelled on identification as a pivot, during commit attempt."),
4023                                  errhint("The transaction might succeed if retried.")));
4024         }
4025
4026         nearConflict = (RWConflict)
4027                 SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
4028                                          (SHM_QUEUE *) &MySerializableXact->inConflicts,
4029                                          offsetof(RWConflictData, inLink));
4030         while (nearConflict)
4031         {
4032                 if (!SxactIsCommitted(nearConflict->sxactOut)
4033                         && !SxactIsRolledBack(nearConflict->sxactOut)
4034                         && !SxactIsMarkedForDeath(nearConflict->sxactOut))
4035                 {
4036                         RWConflict      farConflict;
4037
4038                         farConflict = (RWConflict)
4039                                 SHMQueueNext(&nearConflict->sxactOut->inConflicts,
4040                                                          &nearConflict->sxactOut->inConflicts,
4041                                                          offsetof(RWConflictData, inLink));
4042                         while (farConflict)
4043                         {
4044                                 if (farConflict->sxactOut == MySerializableXact
4045                                         || (!SxactIsCommitted(farConflict->sxactOut)
4046                                                 && !SxactIsReadOnly(farConflict->sxactOut)
4047                                                 && !SxactIsRolledBack(farConflict->sxactOut)
4048                                                 && !SxactIsMarkedForDeath(farConflict->sxactOut)))
4049                                 {
4050                                         nearConflict->sxactOut->flags |= SXACT_FLAG_MARKED_FOR_DEATH;
4051                                         break;
4052                                 }
4053                                 farConflict = (RWConflict)
4054                                         SHMQueueNext(&nearConflict->sxactOut->inConflicts,
4055                                                                  &farConflict->inLink,
4056                                                                  offsetof(RWConflictData, inLink));
4057                         }
4058                 }
4059
4060                 nearConflict = (RWConflict)
4061                         SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
4062                                                  &nearConflict->inLink,
4063                                                  offsetof(RWConflictData, inLink));
4064         }
4065
4066         MySerializableXact->flags |= SXACT_FLAG_PREPARED;
4067
4068         LWLockRelease(SerializableXactHashLock);
4069 }
4070
4071 /*------------------------------------------------------------------------*/
4072
4073 /*
4074  * Two-phase commit support
4075  */
4076
4077 /*
4078  * AtPrepare_Locks
4079  *              Do the preparatory work for a PREPARE: make 2PC state file
4080  *              records for all predicate locks currently held.
4081  */
4082 void
4083 AtPrepare_PredicateLocks(void)
4084 {
4085         PREDICATELOCK *predlock;
4086         SERIALIZABLEXACT *sxact;
4087         TwoPhasePredicateRecord record;
4088         TwoPhasePredicateXactRecord *xactRecord;
4089         TwoPhasePredicateLockRecord *lockRecord;
4090
4091         sxact = (SERIALIZABLEXACT *) MySerializableXact;
4092         xactRecord = &(record.data.xactRecord);
4093         lockRecord = &(record.data.lockRecord);
4094
4095         if (MySerializableXact == InvalidSerializableXact)
4096                 return;
4097
4098         /* Generate a xact record for our SERIALIZABLEXACT */
4099         record.type = TWOPHASEPREDICATERECORD_XACT;
4100         xactRecord->xmin = MySerializableXact->xmin;
4101         xactRecord->flags = MySerializableXact->flags;
4102
4103         /*
4104          * Tweak the flags. Since we're not going to output the inConflicts and
4105          * outConflicts lists, if they're non-empty we'll represent that by
4106          * setting the appropriate summary conflict flags.
4107          */
4108         if (!SHMQueueEmpty((SHM_QUEUE *) &MySerializableXact->inConflicts))
4109                 xactRecord->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
4110         if (!SHMQueueEmpty((SHM_QUEUE *) &MySerializableXact->outConflicts))
4111                 xactRecord->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
4112
4113         RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0,
4114                                                    &record, sizeof(record));
4115
4116         /*
4117          * Generate a lock record for each lock.
4118          *
4119          * To do this, we need to walk the predicate lock list in our sxact rather
4120          * than using the local predicate lock table because the latter is not
4121          * guaranteed to be accurate.
4122          */
4123         LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
4124
4125         predlock = (PREDICATELOCK *)
4126                 SHMQueueNext(&(sxact->predicateLocks),
4127                                          &(sxact->predicateLocks),
4128                                          offsetof(PREDICATELOCK, xactLink));
4129
4130         while (predlock != NULL)
4131         {
4132                 record.type = TWOPHASEPREDICATERECORD_LOCK;
4133                 lockRecord->target = predlock->tag.myTarget->tag;
4134
4135                 RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0,
4136                                                            &record, sizeof(record));
4137
4138                 predlock = (PREDICATELOCK *)
4139                         SHMQueueNext(&(sxact->predicateLocks),
4140                                                  &(predlock->xactLink),
4141                                                  offsetof(PREDICATELOCK, xactLink));
4142         }
4143
4144         LWLockRelease(SerializablePredicateLockListLock);
4145 }
4146
4147 /*
4148  * PostPrepare_Locks
4149  *              Clean up after successful PREPARE. Unlike the non-predicate
4150  *              lock manager, we do not need to transfer locks to a dummy
4151  *              PGPROC because our SERIALIZABLEXACT will stay around
4152  *              anyway. We only need to clean up our local state.
4153  */
4154 void
4155 PostPrepare_PredicateLocks(TransactionId xid)
4156 {
4157         if (MySerializableXact == InvalidSerializableXact)
4158                 return;
4159
4160         Assert(SxactIsPrepared(MySerializableXact));
4161
4162         MySerializableXact->pid = 0;
4163
4164         hash_destroy(LocalPredicateLockHash);
4165         LocalPredicateLockHash = NULL;
4166
4167         MySerializableXact = InvalidSerializableXact;
4168 }
4169
4170 /*
4171  * PredicateLockTwoPhaseFinish
4172  *              Release a prepared transaction's predicate locks once it
4173  *              commits or aborts.
4174  */
4175 void
4176 PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
4177 {
4178         SERIALIZABLEXID *sxid;
4179         SERIALIZABLEXIDTAG sxidtag;
4180
4181         sxidtag.xid = xid;
4182
4183         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
4184         sxid = (SERIALIZABLEXID *)
4185                 hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
4186         LWLockRelease(SerializableXactHashLock);
4187
4188         /* xid will not be found if it wasn't a serializable transaction */
4189         if (sxid == NULL)
4190                 return;
4191
4192         /* Release its locks */
4193         MySerializableXact = sxid->myXact;
4194         ReleasePredicateLocks(isCommit);
4195 }
4196
4197 /*
4198  * Re-acquire a predicate lock belonging to a transaction that was prepared.
4199  */
4200 void
4201 predicatelock_twophase_recover(TransactionId xid, uint16 info,
4202                                                            void *recdata, uint32 len)
4203 {
4204         TwoPhasePredicateRecord *record;
4205
4206         Assert(len == sizeof(TwoPhasePredicateRecord));
4207
4208         record = (TwoPhasePredicateRecord *) recdata;
4209
4210         Assert((record->type == TWOPHASEPREDICATERECORD_XACT) ||
4211                    (record->type == TWOPHASEPREDICATERECORD_LOCK));
4212
4213         if (record->type == TWOPHASEPREDICATERECORD_XACT)
4214         {
4215                 /* Per-transaction record. Set up a SERIALIZABLEXACT. */
4216                 TwoPhasePredicateXactRecord *xactRecord;
4217                 SERIALIZABLEXACT *sxact;
4218                 SERIALIZABLEXID *sxid;
4219                 SERIALIZABLEXIDTAG sxidtag;
4220                 bool            found;
4221
4222                 xactRecord = (TwoPhasePredicateXactRecord *) &record->data.xactRecord;
4223
4224                 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
4225                 sxact = CreatePredXact();
4226                 if (!sxact)
4227                         ereport(ERROR,
4228                                         (errcode(ERRCODE_OUT_OF_MEMORY),
4229                                          errmsg("out of shared memory")));
4230
4231                 /* vxid for a prepared xact is InvalidBackendId/xid; no pid */
4232                 sxact->vxid.backendId = InvalidBackendId;
4233                 sxact->vxid.localTransactionId = (LocalTransactionId) xid;
4234                 sxact->pid = 0;
4235
4236                 /* a prepared xact hasn't committed yet */
4237                 sxact->commitSeqNo = InvalidSerCommitSeqNo;
4238                 sxact->finishedBefore = InvalidTransactionId;
4239
4240                 sxact->SeqNo.lastCommitBeforeSnapshot = RecoverySerCommitSeqNo;
4241
4242
4243                 /*
4244                  * We don't need the details of a prepared transaction's conflicts,
4245                  * just whether it had conflicts in or out (which we get from the
4246                  * flags)
4247                  */
4248                 SHMQueueInit(&(sxact->outConflicts));
4249                 SHMQueueInit(&(sxact->inConflicts));
4250
4251                 /*
4252                  * Don't need to track this; no transactions running at the time the
4253                  * recovered xact started are still active, except possibly other
4254                  * prepared xacts and we don't care whether those are RO_SAFE or not.
4255                  */
4256                 SHMQueueInit(&(sxact->possibleUnsafeConflicts));
4257
4258                 SHMQueueInit(&(sxact->predicateLocks));
4259                 SHMQueueElemInit(&(sxact->finishedLink));
4260
4261                 sxact->topXid = xid;
4262                 sxact->xmin = xactRecord->xmin;
4263                 sxact->flags = xactRecord->flags;
4264                 Assert(SxactIsPrepared(sxact));
4265                 if (!SxactIsReadOnly(sxact))
4266                 {
4267                         ++(PredXact->WritableSxactCount);
4268                         Assert(PredXact->WritableSxactCount <=
4269                                    (MaxBackends + max_prepared_xacts));
4270                 }
4271
4272                 /* Register the transaction's xid */
4273                 sxidtag.xid = xid;
4274                 sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
4275                                                                                            &sxidtag,
4276                                                                                            HASH_ENTER, &found);
4277                 Assert(sxid != NULL);
4278                 Assert(!found);
4279                 sxid->myXact = (SERIALIZABLEXACT *) sxact;
4280
4281                 /*
4282                  * Update global xmin. Note that this is a special case compared to
4283                  * registering a normal transaction, because the global xmin might go
4284                  * backwards. That's OK, because until recovery is over we're not
4285                  * going to complete any transactions or create any non-prepared
4286                  * transactions, so there's no danger of throwing away.
4287                  */
4288                 if ((!TransactionIdIsValid(PredXact->SxactGlobalXmin)) ||
4289                         (TransactionIdFollows(PredXact->SxactGlobalXmin, sxact->xmin)))
4290                 {
4291                         PredXact->SxactGlobalXmin = sxact->xmin;
4292                         PredXact->SxactGlobalXminCount = 1;
4293                         OldSerXidSetActiveSerXmin(sxact->xmin);
4294                 }
4295                 else if (TransactionIdEquals(sxact->xmin, PredXact->SxactGlobalXmin))
4296                 {
4297                         Assert(PredXact->SxactGlobalXminCount > 0);
4298                         PredXact->SxactGlobalXminCount++;
4299                 }
4300
4301                 LWLockRelease(SerializableXactHashLock);
4302         }
4303         else if (record->type == TWOPHASEPREDICATERECORD_LOCK)
4304         {
4305                 /* Lock record. Recreate the PREDICATELOCK */
4306                 TwoPhasePredicateLockRecord *lockRecord;
4307                 SERIALIZABLEXID *sxid;
4308                 SERIALIZABLEXACT *sxact;
4309                 SERIALIZABLEXIDTAG sxidtag;
4310                 uint32          targettaghash;
4311
4312                 lockRecord = (TwoPhasePredicateLockRecord *) &record->data.lockRecord;
4313                 targettaghash = PredicateLockTargetTagHashCode(&lockRecord->target);
4314
4315                 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
4316                 sxidtag.xid = xid;
4317                 sxid = (SERIALIZABLEXID *)
4318                         hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
4319                 LWLockRelease(SerializableXactHashLock);
4320
4321                 Assert(sxid != NULL);
4322                 sxact = sxid->myXact;
4323                 Assert(sxact != InvalidSerializableXact);
4324
4325                 CreatePredicateLock(&lockRecord->target, targettaghash, sxact);
4326         }
4327 }