]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/predicate.c
28da729b6dd0250c5a956348326b8181d280f0cc
[postgresql] / src / backend / storage / lmgr / predicate.c
1 /*-------------------------------------------------------------------------
2  *
3  * predicate.c
4  *        POSTGRES predicate locking
5  *        to support full serializable transaction isolation
6  *
7  *
8  * The approach taken is to implement Serializable Snapshot Isolation (SSI)
9  * as initially described in this paper:
10  *
11  *      Michael J. Cahill, Uwe Röhm, and Alan D. Fekete. 2008.
12  *      Serializable isolation for snapshot databases.
13  *      In SIGMOD '08: Proceedings of the 2008 ACM SIGMOD
14  *      international conference on Management of data,
15  *      pages 729-738, New York, NY, USA. ACM.
16  *      http://doi.acm.org/10.1145/1376616.1376690
17  *
18  * and further elaborated in Cahill's doctoral thesis:
19  *
20  *      Michael James Cahill. 2009.
21  *      Serializable Isolation for Snapshot Databases.
22  *      Sydney Digital Theses.
23  *      University of Sydney, School of Information Technologies.
24  *      http://hdl.handle.net/2123/5353
25  *
26  *
27  * Predicate locks for Serializable Snapshot Isolation (SSI) are SIREAD
28  * locks, which are so different from normal locks that a distinct set of
29  * structures is required to handle them.  They are needed to detect
30  * rw-conflicts when the read happens before the write.  (When the write
31  * occurs first, the reading transaction can check for a conflict by
32  * examining the MVCC data.)
33  *
34  * (1)  Besides tuples actually read, they must cover ranges of tuples
35  *              which would have been read based on the predicate.      This will
36  *              require modelling the predicates through locks against database
37  *              objects such as pages, index ranges, or entire tables.
38  *
39  * (2)  They must be kept in RAM for quick access.      Because of this, it
40  *              isn't possible to always maintain tuple-level granularity -- when
41  *              the space allocated to store these approaches exhaustion, a
42  *              request for a lock may need to scan for situations where a single
43  *              transaction holds many fine-grained locks which can be coalesced
44  *              into a single coarser-grained lock.
45  *
46  * (3)  They never block anything; they are more like flags than locks
47  *              in that regard; although they refer to database objects and are
48  *              used to identify rw-conflicts with normal write locks.
49  *
50  * (4)  While they are associated with a transaction, they must survive
51  *              a successful COMMIT of that transaction, and remain until all
52  *              overlapping transactions complete.      This even means that they
53  *              must survive termination of the transaction's process.  If a
54  *              top level transaction is rolled back, however, it is immediately
55  *              flagged so that it can be ignored, and its SIREAD locks can be
56  *              released any time after that.
57  *
58  * (5)  The only transactions which create SIREAD locks or check for
59  *              conflicts with them are serializable transactions.
60  *
61  * (6)  When a write lock for a top level transaction is found to cover
62  *              an existing SIREAD lock for the same transaction, the SIREAD lock
63  *              can be deleted.
64  *
65  * (7)  A write from a serializable transaction must ensure that a xact
66  *              record exists for the transaction, with the same lifespan (until
67  *              all concurrent transactions complete or the transaction is rolled
68  *              back) so that rw-dependencies to that transaction can be
69  *              detected.
70  *
71  * We use an optimization for read-only transactions. Under certain
72  * circumstances, a read-only transaction's snapshot can be shown to
73  * never have conflicts with other transactions.  This is referred to
74  * as a "safe" snapshot (and one known not to be is "unsafe").
75  * However, it can't be determined whether a snapshot is safe until
76  * all concurrent read/write transactions complete.
77  *
78  * Once a read-only transaction is known to have a safe snapshot, it
79  * can release its predicate locks and exempt itself from further
80  * predicate lock tracking. READ ONLY DEFERRABLE transactions run only
81  * on safe snapshots, waiting as necessary for one to be available.
82  *
83  *
84  * Lightweight locks to manage access to the predicate locking shared
85  * memory objects must be taken in this order, and should be released in
86  * reverse order:
87  *
88  *      SerializableFinishedListLock
89  *              - Protects the list of transactions which have completed but which
90  *                      may yet matter because they overlap still-active transactions.
91  *
92  *      SerializablePredicateLockListLock
93  *              - Protects the linked list of locks held by a transaction.      Note
94  *                      that the locks themselves are also covered by the partition
95  *                      locks of their respective lock targets; this lock only affects
96  *                      the linked list connecting the locks related to a transaction.
97  *              - All transactions share this single lock (with no partitioning).
98  *              - There is never a need for a process other than the one running
99  *                      an active transaction to walk the list of locks held by that
100  *                      transaction.
101  *              - It is relatively infrequent that another process needs to
102  *                      modify the list for a transaction, but it does happen for such
103  *                      things as index page splits for pages with predicate locks and
104  *                      freeing of predicate locked pages by a vacuum process.  When
105  *                      removing a lock in such cases, the lock itself contains the
106  *                      pointers needed to remove it from the list.  When adding a
107  *                      lock in such cases, the lock can be added using the anchor in
108  *                      the transaction structure.      Neither requires walking the list.
109  *              - Cleaning up the list for a terminated transaction is sometimes
110  *                      not done on a retail basis, in which case no lock is required.
111  *              - Due to the above, a process accessing its active transaction's
112  *                      list always uses a shared lock, regardless of whether it is
113  *                      walking or maintaining the list.  This improves concurrency
114  *                      for the common access patterns.
115  *              - A process which needs to alter the list of a transaction other
116  *                      than its own active transaction must acquire an exclusive
117  *                      lock.
118  *
119  *      FirstPredicateLockMgrLock based partition locks
120  *              - The same lock protects a target, all locks on that target, and
121  *                      the linked list of locks on the target.
122  *              - When more than one is needed, acquire in ascending order.
123  *
124  *      SerializableXactHashLock
125  *              - Protects both PredXact and SerializableXidHash.
126  *
127  *
128  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
129  * Portions Copyright (c) 1994, Regents of the University of California
130  *
131  *
132  * IDENTIFICATION
133  *        src/backend/storage/lmgr/predicate.c
134  *
135  *-------------------------------------------------------------------------
136  */
137 /*
138  * INTERFACE ROUTINES
139  *
140  * housekeeping for setting up shared memory predicate lock structures
141  *              InitPredicateLocks(void)
142  *              PredicateLockShmemSize(void)
143  *
144  * predicate lock reporting
145  *              GetPredicateLockStatusData(void)
146  *              PageIsPredicateLocked(Relation relation, BlockNumber blkno)
147  *
148  * predicate lock maintenance
149  *              RegisterSerializableTransaction(Snapshot snapshot)
150  *              RegisterPredicateLockingXid(void)
151  *              PredicateLockRelation(Relation relation)
152  *              PredicateLockPage(Relation relation, BlockNumber blkno)
153  *              PredicateLockTuple(Relation relation, HeapTuple tuple)
154  *              PredicateLockPageSplit(Relation relation, BlockNumber oldblkno,
155  *                                                         BlockNumber newblkno);
156  *              PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
157  *                                                               BlockNumber newblkno);
158  *              TransferPredicateLocksToHeapRelation(Relation relation)
159  *              ReleasePredicateLocks(bool isCommit)
160  *
161  * conflict detection (may also trigger rollback)
162  *              CheckForSerializableConflictOut(bool visible, Relation relation,
163  *                                                                              HeapTupleData *tup, Buffer buffer)
164  *              CheckForSerializableConflictIn(Relation relation, HeapTupleData *tup,
165  *                                                                         Buffer buffer)
166  *              CheckTableForSerializableConflictIn(Relation relation)
167  *
168  * final rollback checking
169  *              PreCommit_CheckForSerializationFailure(void)
170  *
171  * two-phase commit support
172  *              AtPrepare_PredicateLocks(void);
173  *              PostPrepare_PredicateLocks(TransactionId xid);
174  *              PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit);
175  *              predicatelock_twophase_recover(TransactionId xid, uint16 info,
176  *                                                                         void *recdata, uint32 len);
177  */
178
179 #include "postgres.h"
180
181 #include "access/slru.h"
182 #include "access/subtrans.h"
183 #include "access/transam.h"
184 #include "access/twophase.h"
185 #include "access/twophase_rmgr.h"
186 #include "access/xact.h"
187 #include "miscadmin.h"
188 #include "storage/bufmgr.h"
189 #include "storage/predicate.h"
190 #include "storage/predicate_internals.h"
191 #include "storage/procarray.h"
192 #include "utils/rel.h"
193 #include "utils/snapmgr.h"
194 #include "utils/tqual.h"
195
196 /* Uncomment the next line to test the graceful degradation code. */
197 /* #define TEST_OLDSERXID */
198
/*
 * Test the most selective fields first, for performance.
 *
 * a is covered by b if all of the following hold:
 *	1) a.database = b.database
 *	2) a.relation = b.relation
 *	3) b.offset is invalid (b is page-granularity or higher)
 *	4) either of the following:
 *		4a) a.offset is valid (a is tuple-granularity) and a.page = b.page
 *	 or 4b) b.page is invalid and a.page is valid (b is
 *			relation-granularity and a is page-granularity or finer)
 *
 * The conditions are evaluated in the order (2), (3), (4), (1), as marked
 * in the macro body.  Each argument is expanded several times, so callers
 * should pass expressions without side effects.
 */
#define TargetTagIsCoveredBy(covered_target, covering_target)			\
	((GET_PREDICATELOCKTARGETTAG_RELATION(covered_target) == /* (2) */	\
	  GET_PREDICATELOCKTARGETTAG_RELATION(covering_target))				\
	 && (GET_PREDICATELOCKTARGETTAG_OFFSET(covering_target) ==			\
		 InvalidOffsetNumber)								 /* (3) */	\
	 && (((GET_PREDICATELOCKTARGETTAG_OFFSET(covered_target) !=			\
		   InvalidOffsetNumber)								 /* (4a) */ \
		  && (GET_PREDICATELOCKTARGETTAG_PAGE(covering_target) ==		\
			  GET_PREDICATELOCKTARGETTAG_PAGE(covered_target)))			\
		 || ((GET_PREDICATELOCKTARGETTAG_PAGE(covering_target) ==		\
			  InvalidBlockNumber)							 /* (4b) */ \
			 && (GET_PREDICATELOCKTARGETTAG_PAGE(covered_target)		\
				 != InvalidBlockNumber)))								\
	 && (GET_PREDICATELOCKTARGETTAG_DB(covered_target) ==	 /* (1) */	\
		 GET_PREDICATELOCKTARGETTAG_DB(covering_target)))
226
/*
 * The predicate locking target and lock shared hash tables are partitioned to
 * reduce contention.  To determine which partition a given target belongs to,
 * compute the tag's hash code with PredicateLockTargetTagHashCode(), then
 * apply one of these macros.
 * NB: NUM_PREDICATELOCK_PARTITIONS must be a power of 2!
 *
 * PredicateLockHashPartition gives the partition number, i.e. the hash code
 * modulo NUM_PREDICATELOCK_PARTITIONS.  PredicateLockHashPartitionLock gives
 * the LWLockId found that many positions after FirstPredicateLockMgrLock.
 */
#define PredicateLockHashPartition(hashcode) \
	((hashcode) % NUM_PREDICATELOCK_PARTITIONS)
#define PredicateLockHashPartitionLock(hashcode) \
	((LWLockId) (FirstPredicateLockMgrLock + PredicateLockHashPartition(hashcode)))
238
/*
 * max_predicate_locks_per_xact multiplied by (MaxBackends +
 * max_prepared_xacts), computed through the mul_size() and add_size()
 * helpers.  Going by the name, this is the number of predicate lock target
 * entries to provide for; the users of the macro are not visible in this
 * part of the file.
 */
#define NPREDICATELOCKTARGETENTS() \
	mul_size(max_predicate_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
241
/*
 * Tests on the state of a SERIALIZABLEXACT.
 *
 * SxactIsOnFinishedList is true when the transaction's finishedLink is not
 * detached, as reported by SHMQueueIsDetached().
 */
#define SxactIsOnFinishedList(sxact) (!SHMQueueIsDetached(&((sxact)->finishedLink)))

/* Each of the following tests a single SXACT_FLAG_* bit in sxact->flags. */
#define SxactIsPrepared(sxact) (((sxact)->flags & SXACT_FLAG_PREPARED) != 0)
#define SxactIsCommitted(sxact) (((sxact)->flags & SXACT_FLAG_COMMITTED) != 0)
#define SxactIsRolledBack(sxact) (((sxact)->flags & SXACT_FLAG_ROLLED_BACK) != 0)
#define SxactIsReadOnly(sxact) (((sxact)->flags & SXACT_FLAG_READ_ONLY) != 0)
#define SxactHasSummaryConflictIn(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_IN) != 0)
#define SxactHasSummaryConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_OUT) != 0)
/*
 * The following macro actually means that the specified transaction has a
 * conflict out *to a transaction which committed ahead of it*.  It's hard
 * to get that into a name of a reasonable length.
 */
#define SxactHasConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_CONFLICT_OUT) != 0)
#define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0)
#define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0)
#define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
#define SxactIsMarkedForDeath(sxact) (((sxact)->flags & SXACT_FLAG_MARKED_FOR_DEATH) != 0)
260
/*
 * Is this relation exempt from predicate locking? We don't do predicate
 * locking on system or temporary relations.
 *
 * Concretely, the relation is skipped when its rd_id is below
 * FirstBootstrapObjectId, or when RelationUsesLocalBuffers() is true for it.
 * The argument is expanded twice.
 */
#define SkipPredicateLocksForRelation(relation) \
	(((relation)->rd_id < FirstBootstrapObjectId) \
	|| RelationUsesLocalBuffers(relation))
268
/*
 * When a public interface method is called for serializing a relation within
 * the current transaction, this is the test to see if we should do a quick
 * return.
 *
 * The result is true if the isolation level is not serializable, if there is
 * no current serializable transaction (MySerializableXact is
 * InvalidSerializableXact), if ReleasePredicateLocksIfROSafe() returns true,
 * or if the relation is exempt per SkipPredicateLocksForRelation().  The
 * tests short-circuit in that order, so ReleasePredicateLocksIfROSafe() is
 * only called once the first two tests have passed.
 */
#define SkipSerialization(relation) \
	((!IsolationIsSerializable()) \
	|| ((MySerializableXact == InvalidSerializableXact)) \
	|| ReleasePredicateLocksIfROSafe() \
	|| SkipPredicateLocksForRelation(relation))
279
280
/*
 * Compute the hash code associated with a PREDICATELOCKTARGETTAG.
 *
 * To avoid unnecessary recomputations of the hash code, we try to do this
 * just once per function, and then pass it around as needed.  Aside from
 * passing the hashcode to hash_search_with_hash_value(), we can extract
 * the lock partition number from the hashcode.
 *
 * The argument is handed to tag_hash() unchanged, together with
 * sizeof(PREDICATELOCKTARGETTAG) as the key size.
 */
#define PredicateLockTargetTagHashCode(predicatelocktargettag) \
	(tag_hash((predicatelocktargettag), sizeof(PREDICATELOCKTARGETTAG)))
291
/*
 * Given a predicate lock tag, and the hash for its target,
 * compute the lock hash.
 *
 * To make the hash code also depend on the transaction, we xor the address
 * held in the tag's myXact field into the hash code, left-shifted by
 * LOG2_NUM_PREDICATELOCK_PARTITIONS so that the partition-number bits don't
 * change.  Since this is only a hash, we don't care if we lose high-order
 * bits of the address; the pointer goes through PointerGetDatum() before
 * being narrowed to uint32, which avoids a direct pointer-to-int cast.
 *
 * Note that << binds more tightly than ^, so only the address term is
 * shifted; the target hash is xor'ed in unshifted.
 */
#define PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash) \
	((targethash) ^ ((uint32) PointerGetDatum((predicatelocktag)->myXact)) \
	 << LOG2_NUM_PREDICATELOCK_PARTITIONS)
305
306
/*
 * The SLRU buffer area through which we access the old xids.
 */
static SlruCtlData OldSerXidSlruCtlData;

#define OldSerXidSlruCtl			(&OldSerXidSlruCtlData)

/*
 * Layout of the SLRU: each page is BLCKSZ bytes and holds an array of
 * SerCommitSeqNo entries; pages are numbered 0..OLDSERXID_MAX_PAGE.
 */
#define OLDSERXID_PAGESIZE			BLCKSZ
#define OLDSERXID_ENTRYSIZE			sizeof(SerCommitSeqNo)
#define OLDSERXID_ENTRIESPERPAGE	(OLDSERXID_PAGESIZE / OLDSERXID_ENTRYSIZE)
#define OLDSERXID_MAX_PAGE			(SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)

/* Page following "page", wrapping around to 0 after OLDSERXID_MAX_PAGE. */
#define OldSerXidNextPage(page) (((page) >= OLDSERXID_MAX_PAGE) ? 0 : (page) + 1)

/*
 * The SerCommitSeqNo entry for "xid" within the page held in buffer slot
 * "slotno".  This expands to a dereference, so it can be read or assigned.
 */
#define OldSerXidValue(slotno, xid) (*((SerCommitSeqNo *) \
	(OldSerXidSlruCtl->shared->page_buffer[slotno] + \
	((((uint32) (xid)) % OLDSERXID_ENTRIESPERPAGE) * OLDSERXID_ENTRYSIZE))))

/* Page number holding the entry for "xid", reduced to 0..OLDSERXID_MAX_PAGE. */
#define OldSerXidPage(xid)	((((uint32) (xid)) / OLDSERXID_ENTRIESPERPAGE) % (OLDSERXID_MAX_PAGE + 1))
/* Segment number containing "page". */
#define OldSerXidSegment(page)	((page) / SLRU_PAGES_PER_SEGMENT)
327
/*
 * Bookkeeping for the range of the old-serializable-xid SLRU that is
 * currently in use.
 */
typedef struct OldSerXidControlData
{
	int			headPage;		/* newest initialized page */
	TransactionId headXid;		/* newest valid Xid in the SLRU */
	TransactionId tailXid;		/* oldest xmin we might be interested in */
	bool		warningIssued;	/* NOTE(review): presumably remembers that a
								 * warning was already emitted; which warning
								 * is not visible here -- confirm */
}	OldSerXidControlData;

typedef struct OldSerXidControlData *OldSerXidControl;

/* Pointer to the control data above; set up outside this part of the file. */
static OldSerXidControl oldSerXidControl;
339
/*
 * When the oldest committed transaction on the "finished" list is moved to
 * SLRU, its predicate locks will be moved to this "dummy" transaction,
 * collapsing duplicate targets.  When a duplicate is found, the later
 * commitSeqNo is used.
 */
static SERIALIZABLEXACT *OldCommittedSxact;


/* This configuration variable is used to set the predicate lock table size */
int			max_predicate_locks_per_xact;		/* set by guc.c */

/*
 * This provides a list of objects in order to track transactions
 * participating in predicate locking.  Entries in the list are fixed size,
 * and reside in shared memory.  The memory address of an entry must remain
 * fixed during its lifetime.  The list will be protected from concurrent
 * update externally; no provision is made in this code to manage that.  The
 * number of entries in the list, and the size allowed for each entry is
 * fixed upon creation.
 */
static PredXactList PredXact;

/*
 * This provides a pool of RWConflict data elements to use in conflict lists
 * between transactions.
 */
static RWConflictPoolHeader RWConflictPool;

/*
 * The predicate locking hash tables are in shared memory.
 * Each backend keeps pointers to them.
 *
 * FinishedSerializableTransactions is a queue head rather than a hash
 * table; going by its name and the file header, it anchors the list of
 * completed transactions that may still matter.
 */
static HTAB *SerializableXidHash;
static HTAB *PredicateLockTargetHash;
static HTAB *PredicateLockHash;
static SHM_QUEUE *FinishedSerializableTransactions;

/*
 * Tag for a dummy entry in PredicateLockTargetHash. By temporarily removing
 * this entry, you can ensure that there's enough scratch space available for
 * inserting one entry in the hash table. This is an otherwise-invalid tag.
 *
 * NOTE(review): ScratchTargetTagHash and ScratchPartitionLock presumably
 * cache the hash code and partition lock of the scratch tag; they are
 * assigned outside this part of the file -- confirm.
 */
static const PREDICATELOCKTARGETTAG ScratchTargetTag = {0, 0, 0, 0, 0};
static uint32 ScratchTargetTagHash;
static int	ScratchPartitionLock;

/*
 * The local hash table used to determine when to combine multiple fine-
 * grained locks into a single coarser-grained lock.
 */
static HTAB *LocalPredicateLockHash = NULL;

/*
 * Keep a pointer to the currently-running serializable transaction (if any)
 * for quick reference.
 * TODO SSI: Remove volatile qualifier and the then-unnecessary casts?
 */
static volatile SERIALIZABLEXACT *MySerializableXact = InvalidSerializableXact;
399
400 /* local functions */
401
402 static SERIALIZABLEXACT *CreatePredXact(void);
403 static void ReleasePredXact(SERIALIZABLEXACT *sxact);
404 static SERIALIZABLEXACT *FirstPredXact(void);
405 static SERIALIZABLEXACT *NextPredXact(SERIALIZABLEXACT *sxact);
406
407 static bool RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer);
408 static void SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
409 static void SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact, SERIALIZABLEXACT *activeXact);
410 static void ReleaseRWConflict(RWConflict conflict);
411 static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact);
412
413 static bool OldSerXidPagePrecedesLogically(int p, int q);
414 static void OldSerXidInit(void);
415 static void OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo);
416 static SerCommitSeqNo OldSerXidGetMinConflictCommitSeqNo(TransactionId xid);
417 static void OldSerXidSetActiveSerXmin(TransactionId xid);
418
419 static uint32 predicatelock_hash(const void *key, Size keysize);
420 static void SummarizeOldestCommittedSxact(void);
421 static Snapshot GetSafeSnapshot(Snapshot snapshot);
422 static Snapshot RegisterSerializableTransactionInt(Snapshot snapshot);
423 static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
424 static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
425                                                   PREDICATELOCKTARGETTAG *parent);
426 static bool CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag);
427 static void RemoveScratchTarget(bool lockheld);
428 static void RestoreScratchTarget(bool lockheld);
429 static void RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target,
430                                                    uint32 targettaghash);
431 static void DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag);
432 static int      PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG *tag);
433 static bool CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag);
434 static void DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag);
435 static void CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
436                                         uint32 targettaghash,
437                                         SERIALIZABLEXACT *sxact);
438 static void DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash);
439 static bool TransferPredicateLocksToNewTarget(const PREDICATELOCKTARGETTAG oldtargettag,
440                                                                   const PREDICATELOCKTARGETTAG newtargettag,
441                                                                   bool removeOld);
442 static void PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag);
443 static void DropAllPredicateLocksFromTable(const Relation relation,
444                                                            bool transfer);
445 static void SetNewSxactGlobalXmin(void);
446 static bool ReleasePredicateLocksIfROSafe(void);
447 static void ClearOldPredicateLocks(void);
448 static void ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
449                                                    bool summarize);
450 static bool XidIsConcurrent(TransactionId xid);
451 static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag);
452 static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
453 static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
454                                                                                 SERIALIZABLEXACT *writer);
455
456 /*------------------------------------------------------------------------*/
457
458 /*
459  * These functions are a simple implementation of a list for this specific
460  * type of struct.      If there is ever a generalized shared memory list, we
461  * should probably switch to that.
462  */
463 static SERIALIZABLEXACT *
464 CreatePredXact(void)
465 {
466         PredXactListElement ptle;
467
468         ptle = (PredXactListElement)
469                 SHMQueueNext(&PredXact->availableList,
470                                          &PredXact->availableList,
471                                          offsetof(PredXactListElementData, link));
472         if (!ptle)
473                 return NULL;
474
475         SHMQueueDelete(&ptle->link);
476         SHMQueueInsertBefore(&PredXact->activeList, &ptle->link);
477         return &ptle->sxact;
478 }
479
480 static void
481 ReleasePredXact(SERIALIZABLEXACT *sxact)
482 {
483         PredXactListElement ptle;
484
485         Assert(ShmemAddrIsValid(sxact));
486
487         ptle = (PredXactListElement)
488                 (((char *) sxact)
489                  - offsetof(PredXactListElementData, sxact)
490                  + offsetof(PredXactListElementData, link));
491         SHMQueueDelete(&ptle->link);
492         SHMQueueInsertBefore(&PredXact->availableList, &ptle->link);
493 }
494
495 static SERIALIZABLEXACT *
496 FirstPredXact(void)
497 {
498         PredXactListElement ptle;
499
500         ptle = (PredXactListElement)
501                 SHMQueueNext(&PredXact->activeList,
502                                          &PredXact->activeList,
503                                          offsetof(PredXactListElementData, link));
504         if (!ptle)
505                 return NULL;
506
507         return &ptle->sxact;
508 }
509
510 static SERIALIZABLEXACT *
511 NextPredXact(SERIALIZABLEXACT *sxact)
512 {
513         PredXactListElement ptle;
514
515         Assert(ShmemAddrIsValid(sxact));
516
517         ptle = (PredXactListElement)
518                 (((char *) sxact)
519                  - offsetof(PredXactListElementData, sxact)
520                  + offsetof(PredXactListElementData, link));
521         ptle = (PredXactListElement)
522                 SHMQueueNext(&PredXact->activeList,
523                                          &ptle->link,
524                                          offsetof(PredXactListElementData, link));
525         if (!ptle)
526                 return NULL;
527
528         return &ptle->sxact;
529 }
530
531 /*------------------------------------------------------------------------*/
532
533 /*
534  * These functions manage primitive access to the RWConflict pool and lists.
535  */
536 static bool
537 RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
538 {
539         RWConflict      conflict;
540
541         Assert(reader != writer);
542
543         /* Check the ends of the purported conflict first. */
544         if (SxactIsRolledBack(reader)
545                 || SxactIsRolledBack(writer)
546                 || SHMQueueEmpty(&reader->outConflicts)
547                 || SHMQueueEmpty(&writer->inConflicts))
548                 return false;
549
550         /* A conflict is possible; walk the list to find out. */
551         conflict = (RWConflict)
552                 SHMQueueNext(&reader->outConflicts,
553                                          &reader->outConflicts,
554                                          offsetof(RWConflictData, outLink));
555         while (conflict)
556         {
557                 if (conflict->sxactIn == writer)
558                         return true;
559                 conflict = (RWConflict)
560                         SHMQueueNext(&reader->outConflicts,
561                                                  &conflict->outLink,
562                                                  offsetof(RWConflictData, outLink));
563         }
564
565         /* No conflict found. */
566         return false;
567 }
568
569 static void
570 SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
571 {
572         RWConflict      conflict;
573
574         Assert(reader != writer);
575         Assert(!RWConflictExists(reader, writer));
576
577         conflict = (RWConflict)
578                 SHMQueueNext(&RWConflictPool->availableList,
579                                          &RWConflictPool->availableList,
580                                          offsetof(RWConflictData, outLink));
581         if (!conflict)
582                 ereport(ERROR,
583                                 (errcode(ERRCODE_OUT_OF_MEMORY),
584                                  errmsg("not enough elements in RWConflictPool to record a rw-conflict"),
585                                  errhint("You might need to run fewer transactions at a time or increase max_connections.")));
586
587         SHMQueueDelete(&conflict->outLink);
588
589         conflict->sxactOut = reader;
590         conflict->sxactIn = writer;
591         SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink);
592         SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink);
593 }
594
595 static void
596 SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact,
597                                                   SERIALIZABLEXACT *activeXact)
598 {
599         RWConflict      conflict;
600
601         Assert(roXact != activeXact);
602         Assert(SxactIsReadOnly(roXact));
603         Assert(!SxactIsReadOnly(activeXact));
604
605         conflict = (RWConflict)
606                 SHMQueueNext(&RWConflictPool->availableList,
607                                          &RWConflictPool->availableList,
608                                          offsetof(RWConflictData, outLink));
609         if (!conflict)
610                 ereport(ERROR,
611                                 (errcode(ERRCODE_OUT_OF_MEMORY),
612                                  errmsg("not enough elements in RWConflictPool to record a potential rw-conflict"),
613                                  errhint("You might need to run fewer transactions at a time or increase max_connections.")));
614
615         SHMQueueDelete(&conflict->outLink);
616
617         conflict->sxactOut = activeXact;
618         conflict->sxactIn = roXact;
619         SHMQueueInsertBefore(&activeXact->possibleUnsafeConflicts,
620                                                  &conflict->outLink);
621         SHMQueueInsertBefore(&roXact->possibleUnsafeConflicts,
622                                                  &conflict->inLink);
623 }
624
/*
 * Unlink a conflict element from both lists it is on (via inLink and
 * outLink) and return it to RWConflictPool's available list.
 */
static void
ReleaseRWConflict(RWConflict conflict)
{
	SHMQueueDelete(&conflict->inLink);
	SHMQueueDelete(&conflict->outLink);
	/* Pooled elements are chained through outLink. */
	SHMQueueInsertBefore(&RWConflictPool->availableList, &conflict->outLink);
}
632
633 static void
634 FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
635 {
636         RWConflict      conflict,
637                                 nextConflict;
638
639         Assert(SxactIsReadOnly(sxact));
640         Assert(!SxactIsROSafe(sxact));
641
642         sxact->flags |= SXACT_FLAG_RO_UNSAFE;
643
644         /*
645          * We know this isn't a safe snapshot, so we can stop looking for other
646          * potential conflicts.
647          */
648         conflict = (RWConflict)
649                 SHMQueueNext(&sxact->possibleUnsafeConflicts,
650                                          &sxact->possibleUnsafeConflicts,
651                                          offsetof(RWConflictData, inLink));
652         while (conflict)
653         {
654                 nextConflict = (RWConflict)
655                         SHMQueueNext(&sxact->possibleUnsafeConflicts,
656                                                  &conflict->inLink,
657                                                  offsetof(RWConflictData, inLink));
658
659                 Assert(!SxactIsReadOnly(conflict->sxactOut));
660                 Assert(sxact == conflict->sxactIn);
661
662                 ReleaseRWConflict(conflict);
663
664                 conflict = nextConflict;
665         }
666 }
667
668 /*------------------------------------------------------------------------*/
669
670 /*
671  * We will work on the page range of 0..OLDSERXID_MAX_PAGE.
672  * Compares using wraparound logic, as is required by slru.c.
673  */
674 static bool
675 OldSerXidPagePrecedesLogically(int p, int q)
676 {
677         int                     diff;
678
679         /*
680          * We have to compare modulo (OLDSERXID_MAX_PAGE+1)/2.  Both inputs should
681          * be in the range 0..OLDSERXID_MAX_PAGE.
682          */
683         Assert(p >= 0 && p <= OLDSERXID_MAX_PAGE);
684         Assert(q >= 0 && q <= OLDSERXID_MAX_PAGE);
685
686         diff = p - q;
687         if (diff >= ((OLDSERXID_MAX_PAGE + 1) / 2))
688                 diff -= OLDSERXID_MAX_PAGE + 1;
689         else if (diff < -((OLDSERXID_MAX_PAGE + 1) / 2))
690                 diff += OLDSERXID_MAX_PAGE + 1;
691         return diff < 0;
692 }
693
694 /*
695  * Initialize for the tracking of old serializable committed xids.
696  */
697 static void
698 OldSerXidInit(void)
699 {
700         bool            found;
701
702         /*
703          * Set up SLRU management of the pg_serial data.
704          */
705         OldSerXidSlruCtl->PagePrecedes = OldSerXidPagePrecedesLogically;
706         SimpleLruInit(OldSerXidSlruCtl, "OldSerXid SLRU Ctl", NUM_OLDSERXID_BUFFERS, 0,
707                                   OldSerXidLock, "pg_serial");
708         /* Override default assumption that writes should be fsync'd */
709         OldSerXidSlruCtl->do_fsync = false;
710
711         /*
712          * Create or attach to the OldSerXidControl structure.
713          */
714         oldSerXidControl = (OldSerXidControl)
715                 ShmemInitStruct("OldSerXidControlData", sizeof(OldSerXidControlData), &found);
716
717         if (!found)
718         {
719                 /*
720                  * Set control information to reflect empty SLRU.
721                  */
722                 oldSerXidControl->headPage = -1;
723                 oldSerXidControl->headXid = InvalidTransactionId;
724                 oldSerXidControl->tailXid = InvalidTransactionId;
725                 oldSerXidControl->warningIssued = false;
726         }
727 }
728
729 /*
730  * Record a committed read write serializable xid and the minimum
731  * commitSeqNo of any transactions to which this xid had a rw-conflict out.
732  * A zero seqNo means that there were no conflicts out from xid.
733  */
734 static void
735 OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
736 {
737         TransactionId tailXid;
738         int                     targetPage;
739         int                     slotno;
740         int                     firstZeroPage;
741         int                     xidSpread;
742         bool            isNewPage;
743
744         Assert(TransactionIdIsValid(xid));
745
746         targetPage = OldSerXidPage(xid);
747
748         LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
749
750         /*
751          * If no serializable transactions are active, there shouldn't be anything
752          * to push out to the SLRU.  Hitting this assert would mean there's
753          * something wrong with the earlier cleanup logic.
754          */
755         tailXid = oldSerXidControl->tailXid;
756         Assert(TransactionIdIsValid(tailXid));
757
758         /*
759          * If the SLRU is currently unused, zero out the whole active region from
760          * tailXid to headXid before taking it into use. Otherwise zero out only
761          * any new pages that enter the tailXid-headXid range as we advance
762          * headXid.
763          */
764         if (oldSerXidControl->headPage < 0)
765         {
766                 firstZeroPage = OldSerXidPage(tailXid);
767                 isNewPage = true;
768         }
769         else
770         {
771                 firstZeroPage = OldSerXidNextPage(oldSerXidControl->headPage);
772                 isNewPage = OldSerXidPagePrecedesLogically(oldSerXidControl->headPage,
773                                                                                                    targetPage);
774         }
775
776         if (!TransactionIdIsValid(oldSerXidControl->headXid)
777                 || TransactionIdFollows(xid, oldSerXidControl->headXid))
778                 oldSerXidControl->headXid = xid;
779         if (isNewPage)
780                 oldSerXidControl->headPage = targetPage;
781
782         xidSpread = (((uint32) xid) - ((uint32) tailXid));
783         if (oldSerXidControl->warningIssued)
784         {
785                 if (xidSpread < 800000000)
786                         oldSerXidControl->warningIssued = false;
787         }
788         else if (xidSpread >= 1000000000)
789         {
790                 oldSerXidControl->warningIssued = true;
791                 ereport(WARNING,
792                                 (errmsg("memory for serializable conflict tracking is nearly exhausted"),
793                                  errhint("There may be an idle transaction or a forgotten prepared transaction causing this.")));
794         }
795
796         if (isNewPage)
797         {
798                 /* Initialize intervening pages. */
799                 while (firstZeroPage != targetPage)
800                 {
801                         (void) SimpleLruZeroPage(OldSerXidSlruCtl, firstZeroPage);
802                         firstZeroPage = OldSerXidNextPage(firstZeroPage);
803                 }
804                 slotno = SimpleLruZeroPage(OldSerXidSlruCtl, targetPage);
805         }
806         else
807                 slotno = SimpleLruReadPage(OldSerXidSlruCtl, targetPage, true, xid);
808
809         OldSerXidValue(slotno, xid) = minConflictCommitSeqNo;
810
811         LWLockRelease(OldSerXidLock);
812 }
813
814 /*
815  * Get the minimum commitSeqNo for any conflict out for the given xid.  For
816  * a transaction which exists but has no conflict out, InvalidSerCommitSeqNo
817  * will be returned.
818  */
819 static SerCommitSeqNo
820 OldSerXidGetMinConflictCommitSeqNo(TransactionId xid)
821 {
822         TransactionId headXid;
823         TransactionId tailXid;
824         SerCommitSeqNo val;
825         int                     slotno;
826
827         Assert(TransactionIdIsValid(xid));
828
829         LWLockAcquire(OldSerXidLock, LW_SHARED);
830         headXid = oldSerXidControl->headXid;
831         tailXid = oldSerXidControl->tailXid;
832         LWLockRelease(OldSerXidLock);
833
834         if (!TransactionIdIsValid(headXid))
835                 return 0;
836
837         Assert(TransactionIdIsValid(tailXid));
838
839         if (TransactionIdPrecedes(xid, tailXid)
840                 || TransactionIdFollows(xid, headXid))
841                 return 0;
842
843         /*
844          * The following function must be called without holding OldSerXidLock,
845          * but will return with that lock held, which must then be released.
846          */
847         slotno = SimpleLruReadPage_ReadOnly(OldSerXidSlruCtl,
848                                                                                 OldSerXidPage(xid), xid);
849         val = OldSerXidValue(slotno, xid);
850         LWLockRelease(OldSerXidLock);
851         return val;
852 }
853
854 /*
855  * Call this whenever there is a new xmin for active serializable
856  * transactions.  We don't need to keep information on transactions which
857  * precede that.  InvalidTransactionId means none active, so everything in
858  * the SLRU can be discarded.
859  */
860 static void
861 OldSerXidSetActiveSerXmin(TransactionId xid)
862 {
863         LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
864
865         /*
866          * When no sxacts are active, nothing overlaps, set the xid values to
867          * invalid to show that there are no valid entries.  Don't clear headPage,
868          * though.      A new xmin might still land on that page, and we don't want to
869          * repeatedly zero out the same page.
870          */
871         if (!TransactionIdIsValid(xid))
872         {
873                 oldSerXidControl->tailXid = InvalidTransactionId;
874                 oldSerXidControl->headXid = InvalidTransactionId;
875                 LWLockRelease(OldSerXidLock);
876                 return;
877         }
878
879         /*
880          * When we're recovering prepared transactions, the global xmin might move
881          * backwards depending on the order they're recovered. Normally that's not
882          * OK, but during recovery no serializable transactions will commit, so
883          * the SLRU is empty and we can get away with it.
884          */
885         if (RecoveryInProgress())
886         {
887                 Assert(oldSerXidControl->headPage < 0);
888                 if (!TransactionIdIsValid(oldSerXidControl->tailXid)
889                         || TransactionIdPrecedes(xid, oldSerXidControl->tailXid))
890                 {
891                         oldSerXidControl->tailXid = xid;
892                 }
893                 LWLockRelease(OldSerXidLock);
894                 return;
895         }
896
897         Assert(!TransactionIdIsValid(oldSerXidControl->tailXid)
898                    || TransactionIdFollows(xid, oldSerXidControl->tailXid));
899
900         oldSerXidControl->tailXid = xid;
901
902         LWLockRelease(OldSerXidLock);
903 }
904
905 /*
906  * Perform a checkpoint --- either during shutdown, or on-the-fly
907  *
908  * We don't have any data that needs to survive a restart, but this is a
909  * convenient place to truncate the SLRU.
910  */
911 void
912 CheckPointPredicate(void)
913 {
914         int                     tailPage;
915
916         LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
917
918         /* Exit quickly if the SLRU is currently not in use. */
919         if (oldSerXidControl->headPage < 0)
920         {
921                 LWLockRelease(OldSerXidLock);
922                 return;
923         }
924
925         if (TransactionIdIsValid(oldSerXidControl->tailXid))
926         {
927                 /* We can truncate the SLRU up to the page containing tailXid */
928                 tailPage = OldSerXidPage(oldSerXidControl->tailXid);
929         }
930         else
931         {
932                 /*
933                  * The SLRU is no longer needed. Truncate everything.  If we try to
934                  * leave the head page around to avoid re-zeroing it, we might not use
935                  * the SLRU again until we're past the wrap-around point, which makes
936                  * SLRU unhappy.
937                  *
938                  * While the API asks you to specify truncation by page, it silently
939                  * ignores the request unless the specified page is in a segment past
940                  * some allocated portion of the SLRU.  We don't care which page in a
941                  * later segment we hit, so just add the number of pages per segment
942                  * to the head page to land us *somewhere* in the next segment.
943                  */
944                 tailPage = oldSerXidControl->headPage + SLRU_PAGES_PER_SEGMENT;
945                 oldSerXidControl->headPage = -1;
946         }
947
948         LWLockRelease(OldSerXidLock);
949
950         /* Truncate away pages that are no longer required */
951         SimpleLruTruncate(OldSerXidSlruCtl, tailPage);
952
953         /*
954          * Flush dirty SLRU pages to disk
955          *
956          * This is not actually necessary from a correctness point of view. We do
957          * it merely as a debugging aid.
958          *
959          * We're doing this after the truncation to avoid writing pages right
960          * before deleting the file in which they sit, which would be completely
961          * pointless.
962          */
963         SimpleLruFlush(OldSerXidSlruCtl, true);
964 }
965
966 /*------------------------------------------------------------------------*/
967
968 /*
969  * InitPredicateLocks -- Initialize the predicate locking data structures.
970  *
971  * This is called from CreateSharedMemoryAndSemaphores(), which see for
972  * more comments.  In the normal postmaster case, the shared hash tables
973  * are created here.  Backends inherit the pointers
974  * to the shared tables via fork().  In the EXEC_BACKEND case, each
975  * backend re-executes this code to obtain pointers to the already existing
976  * shared hash tables.
977  */
978 void
979 InitPredicateLocks(void)
980 {
981         HASHCTL         info;
982         int                     hash_flags;
983         long            max_table_size;
984         Size            requestSize;
985         bool            found;
986
987         /*
988          * Compute size of predicate lock target hashtable. Note these
989          * calculations must agree with PredicateLockShmemSize!
990          */
991         max_table_size = NPREDICATELOCKTARGETENTS();
992
993         /*
994          * Allocate hash table for PREDICATELOCKTARGET structs.  This stores
995          * per-predicate-lock-target information.
996          */
997         MemSet(&info, 0, sizeof(info));
998         info.keysize = sizeof(PREDICATELOCKTARGETTAG);
999         info.entrysize = sizeof(PREDICATELOCKTARGET);
1000         info.hash = tag_hash;
1001         info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;
1002         hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION | HASH_FIXED_SIZE);
1003
1004         PredicateLockTargetHash = ShmemInitHash("PREDICATELOCKTARGET hash",
1005                                                                                         max_table_size,
1006                                                                                         max_table_size,
1007                                                                                         &info,
1008                                                                                         hash_flags);
1009
1010         /* Assume an average of 2 xacts per target */
1011         max_table_size *= 2;
1012
1013         /*
1014          * Reserve a dummy entry in the hash table; we use it to make sure there's
1015          * always one entry available when we need to split or combine a page,
1016          * because running out of space there could mean aborting a
1017          * non-serializable transaction.
1018          */
1019         hash_search(PredicateLockTargetHash, &ScratchTargetTag, HASH_ENTER, NULL);
1020
1021         /*
1022          * Allocate hash table for PREDICATELOCK structs.  This stores per
1023          * xact-lock-of-a-target information.
1024          */
1025         MemSet(&info, 0, sizeof(info));
1026         info.keysize = sizeof(PREDICATELOCKTAG);
1027         info.entrysize = sizeof(PREDICATELOCK);
1028         info.hash = predicatelock_hash;
1029         info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;
1030         hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION | HASH_FIXED_SIZE);
1031
1032         PredicateLockHash = ShmemInitHash("PREDICATELOCK hash",
1033                                                                           max_table_size,
1034                                                                           max_table_size,
1035                                                                           &info,
1036                                                                           hash_flags);
1037
1038         /*
1039          * Compute size for serializable transaction hashtable. Note these
1040          * calculations must agree with PredicateLockShmemSize!
1041          */
1042         max_table_size = (MaxBackends + max_prepared_xacts);
1043
1044         /*
1045          * Allocate a list to hold information on transactions participating in
1046          * predicate locking.
1047          *
1048          * Assume an average of 10 predicate locking transactions per backend.
1049          * This allows aggressive cleanup while detail is present before data must
1050          * be summarized for storage in SLRU and the "dummy" transaction.
1051          */
1052         max_table_size *= 10;
1053
1054         PredXact = ShmemInitStruct("PredXactList",
1055                                                            PredXactListDataSize,
1056                                                            &found);
1057         if (!found)
1058         {
1059                 int                     i;
1060
1061                 SHMQueueInit(&PredXact->availableList);
1062                 SHMQueueInit(&PredXact->activeList);
1063                 PredXact->SxactGlobalXmin = InvalidTransactionId;
1064                 PredXact->SxactGlobalXminCount = 0;
1065                 PredXact->WritableSxactCount = 0;
1066                 PredXact->LastSxactCommitSeqNo = FirstNormalSerCommitSeqNo - 1;
1067                 PredXact->CanPartialClearThrough = 0;
1068                 PredXact->HavePartialClearedThrough = 0;
1069                 requestSize = mul_size((Size) max_table_size,
1070                                                            PredXactListElementDataSize);
1071                 PredXact->element = ShmemAlloc(requestSize);
1072                 if (PredXact->element == NULL)
1073                         ereport(ERROR,
1074                                         (errcode(ERRCODE_OUT_OF_MEMORY),
1075                          errmsg("not enough shared memory for elements of data structure"
1076                                         " \"%s\" (%lu bytes requested)",
1077                                         "PredXactList", (unsigned long) requestSize)));
1078                 /* Add all elements to available list, clean. */
1079                 memset(PredXact->element, 0, requestSize);
1080                 for (i = 0; i < max_table_size; i++)
1081                 {
1082                         SHMQueueInsertBefore(&(PredXact->availableList),
1083                                                                  &(PredXact->element[i].link));
1084                 }
1085                 PredXact->OldCommittedSxact = CreatePredXact();
1086                 SetInvalidVirtualTransactionId(PredXact->OldCommittedSxact->vxid);
1087                 PredXact->OldCommittedSxact->commitSeqNo = 0;
1088                 PredXact->OldCommittedSxact->SeqNo.lastCommitBeforeSnapshot = 0;
1089                 SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts);
1090                 SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts);
1091                 SHMQueueInit(&PredXact->OldCommittedSxact->predicateLocks);
1092                 SHMQueueInit(&PredXact->OldCommittedSxact->finishedLink);
1093                 SHMQueueInit(&PredXact->OldCommittedSxact->possibleUnsafeConflicts);
1094                 PredXact->OldCommittedSxact->topXid = InvalidTransactionId;
1095                 PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId;
1096                 PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
1097                 PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED;
1098                 PredXact->OldCommittedSxact->pid = 0;
1099         }
1100         /* This never changes, so let's keep a local copy. */
1101         OldCommittedSxact = PredXact->OldCommittedSxact;
1102
1103         /*
1104          * Allocate hash table for SERIALIZABLEXID structs.  This stores per-xid
1105          * information for serializable transactions which have accessed data.
1106          */
1107         MemSet(&info, 0, sizeof(info));
1108         info.keysize = sizeof(SERIALIZABLEXIDTAG);
1109         info.entrysize = sizeof(SERIALIZABLEXID);
1110         info.hash = tag_hash;
1111         hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_FIXED_SIZE);
1112
1113         SerializableXidHash = ShmemInitHash("SERIALIZABLEXID hash",
1114                                                                                 max_table_size,
1115                                                                                 max_table_size,
1116                                                                                 &info,
1117                                                                                 hash_flags);
1118
1119         /*
1120          * Allocate space for tracking rw-conflicts in lists attached to the
1121          * transactions.
1122          *
1123          * Assume an average of 5 conflicts per transaction.  Calculations suggest
1124          * that this will prevent resource exhaustion in even the most pessimal
1125          * loads up to max_connections = 200 with all 200 connections pounding the
1126          * database with serializable transactions.  Beyond that, there may be
1127          * occassional transactions canceled when trying to flag conflicts. That's
1128          * probably OK.
1129          */
1130         max_table_size *= 5;
1131
1132         RWConflictPool = ShmemInitStruct("RWConflictPool",
1133                                                                          RWConflictPoolHeaderDataSize,
1134                                                                          &found);
1135         if (!found)
1136         {
1137                 int                     i;
1138
1139                 SHMQueueInit(&RWConflictPool->availableList);
1140                 requestSize = mul_size((Size) max_table_size,
1141                                                            RWConflictDataSize);
1142                 RWConflictPool->element = ShmemAlloc(requestSize);
1143                 if (RWConflictPool->element == NULL)
1144                         ereport(ERROR,
1145                                         (errcode(ERRCODE_OUT_OF_MEMORY),
1146                          errmsg("not enough shared memory for elements of data structure"
1147                                         " \"%s\" (%lu bytes requested)",
1148                                         "RWConflictPool", (unsigned long) requestSize)));
1149                 /* Add all elements to available list, clean. */
1150                 memset(RWConflictPool->element, 0, requestSize);
1151                 for (i = 0; i < max_table_size; i++)
1152                 {
1153                         SHMQueueInsertBefore(&(RWConflictPool->availableList),
1154                                                                  &(RWConflictPool->element[i].outLink));
1155                 }
1156         }
1157
1158         /*
1159          * Create or attach to the header for the list of finished serializable
1160          * transactions.
1161          */
1162         FinishedSerializableTransactions = (SHM_QUEUE *)
1163                 ShmemInitStruct("FinishedSerializableTransactions",
1164                                                 sizeof(SHM_QUEUE),
1165                                                 &found);
1166         if (!found)
1167                 SHMQueueInit(FinishedSerializableTransactions);
1168
1169         /*
1170          * Initialize the SLRU storage for old committed serializable
1171          * transactions.
1172          */
1173         OldSerXidInit();
1174
1175         /* Pre-calculate the hash and partition lock of the scratch entry */
1176         ScratchTargetTagHash = PredicateLockTargetTagHashCode(&ScratchTargetTag);
1177         ScratchPartitionLock = PredicateLockHashPartitionLock(ScratchTargetTagHash);
1178 }
1179
1180 /*
1181  * Estimate shared-memory space used for predicate lock table
1182  */
1183 Size
1184 PredicateLockShmemSize(void)
1185 {
1186         Size            size = 0;
1187         long            max_table_size;
1188
1189         /* predicate lock target hash table */
1190         max_table_size = NPREDICATELOCKTARGETENTS();
1191         size = add_size(size, hash_estimate_size(max_table_size,
1192                                                                                          sizeof(PREDICATELOCKTARGET)));
1193
1194         /* predicate lock hash table */
1195         max_table_size *= 2;
1196         size = add_size(size, hash_estimate_size(max_table_size,
1197                                                                                          sizeof(PREDICATELOCK)));
1198
1199         /*
1200          * Since NPREDICATELOCKTARGETENTS is only an estimate, add 10% safety
1201          * margin.
1202          */
1203         size = add_size(size, size / 10);
1204
1205         /* transaction list */
1206         max_table_size = MaxBackends + max_prepared_xacts;
1207         max_table_size *= 10;
1208         size = add_size(size, PredXactListDataSize);
1209         size = add_size(size, mul_size((Size) max_table_size,
1210                                                                    PredXactListElementDataSize));
1211
1212         /* transaction xid table */
1213         size = add_size(size, hash_estimate_size(max_table_size,
1214                                                                                          sizeof(SERIALIZABLEXID)));
1215
1216         /* rw-conflict pool */
1217         max_table_size *= 5;
1218         size = add_size(size, RWConflictPoolHeaderDataSize);
1219         size = add_size(size, mul_size((Size) max_table_size,
1220                                                                    RWConflictDataSize));
1221
1222         /* Head for list of finished serializable transactions. */
1223         size = add_size(size, sizeof(SHM_QUEUE));
1224
1225         /* Shared memory structures for SLRU tracking of old committed xids. */
1226         size = add_size(size, sizeof(OldSerXidControlData));
1227         size = add_size(size, SimpleLruShmemSize(NUM_OLDSERXID_BUFFERS, 0));
1228
1229         return size;
1230 }
1231
1232
1233 /*
1234  * Compute the hash code associated with a PREDICATELOCKTAG.
1235  *
1236  * Because we want to use just one set of partition locks for both the
1237  * PREDICATELOCKTARGET and PREDICATELOCK hash tables, we have to make sure
1238  * that PREDICATELOCKs fall into the same partition number as their
1239  * associated PREDICATELOCKTARGETs.  dynahash.c expects the partition number
1240  * to be the low-order bits of the hash code, and therefore a
1241  * PREDICATELOCKTAG's hash code must have the same low-order bits as the
1242  * associated PREDICATELOCKTARGETTAG's hash code.  We achieve this with this
1243  * specialized hash function.
1244  */
1245 static uint32
1246 predicatelock_hash(const void *key, Size keysize)
1247 {
1248         const PREDICATELOCKTAG *predicatelocktag = (const PREDICATELOCKTAG *) key;
1249         uint32          targethash;
1250
1251         Assert(keysize == sizeof(PREDICATELOCKTAG));
1252
1253         /* Look into the associated target object, and compute its hash code */
1254         targethash = PredicateLockTargetTagHashCode(&predicatelocktag->myTarget->tag);
1255
1256         return PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash);
1257 }
1258
1259
1260 /*
1261  * GetPredicateLockStatusData
1262  *              Return a table containing the internal state of the predicate
1263  *              lock manager for use in pg_lock_status.
1264  *
1265  * Like GetLockStatusData, this function tries to hold the partition LWLocks
1266  * for as short a time as possible by returning two arrays that simply
1267  * contain the PREDICATELOCKTARGETTAG and SERIALIZABLEXACT for each lock
1268  * table entry. Multiple copies of the same PREDICATELOCKTARGETTAG and
1269  * SERIALIZABLEXACT will likely appear.
1270  */
1271 PredicateLockData *
1272 GetPredicateLockStatusData(void)
1273 {
1274         PredicateLockData *data;
1275         int                     i;
1276         int                     els,
1277                                 el;
1278         HASH_SEQ_STATUS seqstat;
1279         PREDICATELOCK *predlock;
1280
1281         data = (PredicateLockData *) palloc(sizeof(PredicateLockData));
1282
1283         /*
1284          * To ensure consistency, take simultaneous locks on all partition locks
1285          * in ascending order, then SerializableXactHashLock.
1286          */
1287         for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
1288                 LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED);
1289         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
1290
1291         /* Get number of locks and allocate appropriately-sized arrays. */
1292         els = hash_get_num_entries(PredicateLockHash);
1293         data->nelements = els;
1294         data->locktags = (PREDICATELOCKTARGETTAG *)
1295                 palloc(sizeof(PREDICATELOCKTARGETTAG) * els);
1296         data->xacts = (SERIALIZABLEXACT *)
1297                 palloc(sizeof(SERIALIZABLEXACT) * els);
1298
1299
1300         /* Scan through PredicateLockHash and copy contents */
1301         hash_seq_init(&seqstat, PredicateLockHash);
1302
1303         el = 0;
1304
1305         while ((predlock = (PREDICATELOCK *) hash_seq_search(&seqstat)))
1306         {
1307                 data->locktags[el] = predlock->tag.myTarget->tag;
1308                 data->xacts[el] = *predlock->tag.myXact;
1309                 el++;
1310         }
1311
1312         Assert(el == els);
1313
1314         /* Release locks in reverse order */
1315         LWLockRelease(SerializableXactHashLock);
1316         for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
1317                 LWLockRelease(FirstPredicateLockMgrLock + i);
1318
1319         return data;
1320 }
1321
1322 /*
1323  * Free up shared memory structures by pushing the oldest sxact (the one at
1324  * the front of the SummarizeOldestCommittedSxact queue) into summary form.
1325  * Each call will free exactly one SERIALIZABLEXACT structure and may also
1326  * free one or more of these structures: SERIALIZABLEXID, PREDICATELOCK,
1327  * PREDICATELOCKTARGET, RWConflictData.
1328  */
1329 static void
1330 SummarizeOldestCommittedSxact(void)
1331 {
1332         SERIALIZABLEXACT *sxact;
1333
1334         LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
1335
1336         /*
1337          * This function is only called if there are no sxact slots available.
1338          * Some of them must belong to old, already-finished transactions, so
1339          * there should be something in FinishedSerializableTransactions list that
1340          * we can summarize. However, there's a race condition: while we were not
1341          * holding any locks, a transaction might have ended and cleaned up all
1342          * the finished sxact entries already, freeing up their sxact slots. In
1343          * that case, we have nothing to do here. The caller will find one of the
1344          * slots released by the other backend when it retries.
1345          */
1346         if (SHMQueueEmpty(FinishedSerializableTransactions))
1347         {
1348                 LWLockRelease(SerializableFinishedListLock);
1349                 return;
1350         }
1351
1352         /*
1353          * Grab the first sxact off the finished list -- this will be the earliest
1354          * commit.      Remove it from the list.
1355          */
1356         sxact = (SERIALIZABLEXACT *)
1357                 SHMQueueNext(FinishedSerializableTransactions,
1358                                          FinishedSerializableTransactions,
1359                                          offsetof(SERIALIZABLEXACT, finishedLink));
1360         SHMQueueDelete(&(sxact->finishedLink));
1361
1362         /* Add to SLRU summary information. */
1363         if (TransactionIdIsValid(sxact->topXid) && !SxactIsReadOnly(sxact))
1364                 OldSerXidAdd(sxact->topXid, SxactHasConflictOut(sxact)
1365                    ? sxact->SeqNo.earliestOutConflictCommit : InvalidSerCommitSeqNo);
1366
1367         /* Summarize and release the detail. */
1368         ReleaseOneSerializableXact(sxact, false, true);
1369
1370         LWLockRelease(SerializableFinishedListLock);
1371 }
1372
1373 /*
1374  * GetSafeSnapshot
1375  *              Obtain and register a snapshot for a READ ONLY DEFERRABLE
1376  *              transaction. Ensures that the snapshot is "safe", i.e. a
1377  *              read-only transaction running on it can execute serializably
1378  *              without further checks. This requires waiting for concurrent
1379  *              transactions to complete, and retrying with a new snapshot if
1380  *              one of them could possibly create a conflict.
1381  */
1382 static Snapshot
1383 GetSafeSnapshot(Snapshot origSnapshot)
1384 {
1385         Snapshot        snapshot;
1386
1387         Assert(XactReadOnly && XactDeferrable);
1388
1389         while (true)
1390         {
1391                 /*
1392                  * RegisterSerializableTransactionInt is going to call
1393                  * GetSnapshotData, so we need to provide it the static snapshot our
1394                  * caller passed to us. It returns a copy of that snapshot and
1395                  * registers it on TopTransactionResourceOwner.
1396                  */
1397                 snapshot = RegisterSerializableTransactionInt(origSnapshot);
1398
1399                 if (MySerializableXact == InvalidSerializableXact)
1400                         return snapshot;        /* no concurrent r/w xacts; it's safe */
1401
1402                 MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
1403
1404                 /*
1405                  * Wait for concurrent transactions to finish. Stop early if one of
1406                  * them marked us as conflicted.
1407                  */
1408                 while (!(SHMQueueEmpty((SHM_QUEUE *)
1409                                                          &MySerializableXact->possibleUnsafeConflicts) ||
1410                                  SxactIsROUnsafe(MySerializableXact)))
1411                         ProcWaitForSignal();
1412
1413                 MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
1414                 if (!SxactIsROUnsafe(MySerializableXact))
1415                         break;                          /* success */
1416
1417                 /* else, need to retry... */
1418                 ereport(DEBUG2,
1419                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
1420                                  errmsg("deferrable snapshot was unsafe; trying a new one")));
1421                 ReleasePredicateLocks(false);
1422                 UnregisterSnapshotFromOwner(snapshot,
1423                                                                         TopTransactionResourceOwner);
1424         }
1425
1426         /*
1427          * Now we have a safe snapshot, so we don't need to do any further checks.
1428          */
1429         Assert(SxactIsROSafe(MySerializableXact));
1430         ReleasePredicateLocks(false);
1431
1432         return snapshot;
1433 }
1434
1435 /*
1436  * Acquire and register a snapshot which can be used for this transaction..
1437  * Make sure we have a SERIALIZABLEXACT reference in MySerializableXact.
1438  * It should be current for this process and be contained in PredXact.
1439  */
1440 Snapshot
1441 RegisterSerializableTransaction(Snapshot snapshot)
1442 {
1443         Assert(IsolationIsSerializable());
1444
1445         /*
1446          * A special optimization is available for SERIALIZABLE READ ONLY
1447          * DEFERRABLE transactions -- we can wait for a suitable snapshot and
1448          * thereby avoid all SSI overhead once it's running..
1449          */
1450         if (XactReadOnly && XactDeferrable)
1451                 return GetSafeSnapshot(snapshot);
1452
1453         return RegisterSerializableTransactionInt(snapshot);
1454 }
1455
1456 static Snapshot
1457 RegisterSerializableTransactionInt(Snapshot snapshot)
1458 {
1459         PGPROC     *proc;
1460         VirtualTransactionId vxid;
1461         SERIALIZABLEXACT *sxact,
1462                            *othersxact;
1463         HASHCTL         hash_ctl;
1464
1465         /* We only do this for serializable transactions.  Once. */
1466         Assert(MySerializableXact == InvalidSerializableXact);
1467
1468         Assert(!RecoveryInProgress());
1469
1470         proc = MyProc;
1471         Assert(proc != NULL);
1472         GET_VXID_FROM_PGPROC(vxid, *proc);
1473
1474         /*
1475          * First we get the sxact structure, which may involve looping and access
1476          * to the "finished" list to free a structure for use.
1477          */
1478 #ifdef TEST_OLDSERXID
1479         SummarizeOldestCommittedSxact();
1480 #endif
1481         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
1482         do
1483         {
1484                 sxact = CreatePredXact();
1485                 /* If null, push out committed sxact to SLRU summary & retry. */
1486                 if (!sxact)
1487                 {
1488                         LWLockRelease(SerializableXactHashLock);
1489                         SummarizeOldestCommittedSxact();
1490                         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
1491                 }
1492         } while (!sxact);
1493
1494         /* Get and register a snapshot */
1495         snapshot = GetSnapshotData(snapshot);
1496         snapshot = RegisterSnapshotOnOwner(snapshot, TopTransactionResourceOwner);
1497
1498         /*
1499          * If there are no serializable transactions which are not read-only, we
1500          * can "opt out" of predicate locking and conflict checking for a
1501          * read-only transaction.
1502          *
1503          * The reason this is safe is that a read-only transaction can only become
1504          * part of a dangerous structure if it overlaps a writable transaction
1505          * which in turn overlaps a writable transaction which committed before
1506          * the read-only transaction started.  A new writable transaction can
1507          * overlap this one, but it can't meet the other condition of overlapping
1508          * a transaction which committed before this one started.
1509          */
1510         if (XactReadOnly && PredXact->WritableSxactCount == 0)
1511         {
1512                 ReleasePredXact(sxact);
1513                 LWLockRelease(SerializableXactHashLock);
1514                 return snapshot;
1515         }
1516
1517         /* Maintain serializable global xmin info. */
1518         if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
1519         {
1520                 Assert(PredXact->SxactGlobalXminCount == 0);
1521                 PredXact->SxactGlobalXmin = snapshot->xmin;
1522                 PredXact->SxactGlobalXminCount = 1;
1523                 OldSerXidSetActiveSerXmin(snapshot->xmin);
1524         }
1525         else if (TransactionIdEquals(snapshot->xmin, PredXact->SxactGlobalXmin))
1526         {
1527                 Assert(PredXact->SxactGlobalXminCount > 0);
1528                 PredXact->SxactGlobalXminCount++;
1529         }
1530         else
1531         {
1532                 Assert(TransactionIdFollows(snapshot->xmin, PredXact->SxactGlobalXmin));
1533         }
1534
1535         /* Initialize the structure. */
1536         sxact->vxid = vxid;
1537         sxact->SeqNo.lastCommitBeforeSnapshot = PredXact->LastSxactCommitSeqNo;
1538         sxact->commitSeqNo = InvalidSerCommitSeqNo;
1539         SHMQueueInit(&(sxact->outConflicts));
1540         SHMQueueInit(&(sxact->inConflicts));
1541         SHMQueueInit(&(sxact->possibleUnsafeConflicts));
1542         sxact->topXid = GetTopTransactionIdIfAny();
1543         sxact->finishedBefore = InvalidTransactionId;
1544         sxact->xmin = snapshot->xmin;
1545         sxact->pid = MyProcPid;
1546         SHMQueueInit(&(sxact->predicateLocks));
1547         SHMQueueElemInit(&(sxact->finishedLink));
1548         sxact->flags = 0;
1549         if (XactReadOnly)
1550         {
1551                 sxact->flags |= SXACT_FLAG_READ_ONLY;
1552
1553                 /*
1554                  * Register all concurrent r/w transactions as possible conflicts; if
1555                  * all of them commit without any outgoing conflicts to earlier
1556                  * transactions then this snapshot can be deemed safe (and we can run
1557                  * without tracking predicate locks).
1558                  */
1559                 for (othersxact = FirstPredXact();
1560                          othersxact != NULL;
1561                          othersxact = NextPredXact(othersxact))
1562                 {
1563                         if (!SxactIsOnFinishedList(othersxact) &&
1564                                 !SxactIsReadOnly(othersxact))
1565                         {
1566                                 SetPossibleUnsafeConflict(sxact, othersxact);
1567                         }
1568                 }
1569         }
1570         else
1571         {
1572                 ++(PredXact->WritableSxactCount);
1573                 Assert(PredXact->WritableSxactCount <=
1574                            (MaxBackends + max_prepared_xacts));
1575         }
1576
1577         MySerializableXact = sxact;
1578
1579         LWLockRelease(SerializableXactHashLock);
1580
1581         /* Initialize the backend-local hash table of parent locks */
1582         Assert(LocalPredicateLockHash == NULL);
1583         MemSet(&hash_ctl, 0, sizeof(hash_ctl));
1584         hash_ctl.keysize = sizeof(PREDICATELOCKTARGETTAG);
1585         hash_ctl.entrysize = sizeof(LOCALPREDICATELOCK);
1586         hash_ctl.hash = tag_hash;
1587         LocalPredicateLockHash = hash_create("Local predicate lock",
1588                                                                                  max_predicate_locks_per_xact,
1589                                                                                  &hash_ctl,
1590                                                                                  HASH_ELEM | HASH_FUNCTION);
1591
1592         return snapshot;
1593 }
1594
1595 /*
1596  * Register the top level XID in SerializableXidHash.
1597  * Also store it for easy reference in MySerializableXact.
1598  */
1599 void
1600 RegisterPredicateLockingXid(const TransactionId xid)
1601 {
1602         SERIALIZABLEXIDTAG sxidtag;
1603         SERIALIZABLEXID *sxid;
1604         bool            found;
1605
1606         /*
1607          * If we're not tracking predicate lock data for this transaction, we
1608          * should ignore the request and return quickly.
1609          */
1610         if (MySerializableXact == InvalidSerializableXact)
1611                 return;
1612
1613         /* This should only be done once per transaction. */
1614         Assert(MySerializableXact->topXid == InvalidTransactionId);
1615
1616         /* We should have a valid XID and be at the top level. */
1617         Assert(TransactionIdIsValid(xid));
1618
1619         MySerializableXact->topXid = xid;
1620
1621         sxidtag.xid = xid;
1622         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
1623         sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
1624                                                                                    &sxidtag,
1625                                                                                    HASH_ENTER, &found);
1626         Assert(sxid != NULL);
1627         Assert(!found);
1628
1629         /* Initialize the structure. */
1630         sxid->myXact = (SERIALIZABLEXACT *) MySerializableXact;
1631         LWLockRelease(SerializableXactHashLock);
1632 }
1633
1634
1635 /*
1636  * Check whether there are any predicate locks held by any transaction
1637  * for the page at the given block number.
1638  *
1639  * Note that the transaction may be completed but not yet subject to
1640  * cleanup due to overlapping serializable transactions.  This must
1641  * return valid information regardless of transaction isolation level.
1642  *
1643  * Also note that this doesn't check for a conflicting relation lock,
1644  * just a lock specifically on the given page.
1645  *
1646  * One use is to support proper behavior during GiST index vacuum.
1647  */
1648 bool
1649 PageIsPredicateLocked(const Relation relation, const BlockNumber blkno)
1650 {
1651         PREDICATELOCKTARGETTAG targettag;
1652         uint32          targettaghash;
1653         LWLockId        partitionLock;
1654         PREDICATELOCKTARGET *target;
1655
1656         SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
1657                                                                         relation->rd_node.dbNode,
1658                                                                         relation->rd_id,
1659                                                                         blkno);
1660
1661         targettaghash = PredicateLockTargetTagHashCode(&targettag);
1662         partitionLock = PredicateLockHashPartitionLock(targettaghash);
1663         LWLockAcquire(partitionLock, LW_SHARED);
1664         target = (PREDICATELOCKTARGET *)
1665                 hash_search_with_hash_value(PredicateLockTargetHash,
1666                                                                         &targettag, targettaghash,
1667                                                                         HASH_FIND, NULL);
1668         LWLockRelease(partitionLock);
1669
1670         return (target != NULL);
1671 }
1672
1673
1674 /*
1675  * Check whether a particular lock is held by this transaction.
1676  *
1677  * Important note: this function may return false even if the lock is
1678  * being held, because it uses the local lock table which is not
1679  * updated if another transaction modifies our lock list (e.g. to
1680  * split an index page). It can also return true when a coarser
1681  * granularity lock that covers this target is being held. Be careful
1682  * to only use this function in circumstances where such errors are
1683  * acceptable!
1684  */
1685 static bool
1686 PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag)
1687 {
1688         LOCALPREDICATELOCK *lock;
1689
1690         /* check local hash table */
1691         lock = (LOCALPREDICATELOCK *) hash_search(LocalPredicateLockHash,
1692                                                                                           targettag,
1693                                                                                           HASH_FIND, NULL);
1694
1695         if (!lock)
1696                 return false;
1697
1698         /*
1699          * Found entry in the table, but still need to check whether it's actually
1700          * held -- it could just be a parent of some held lock.
1701          */
1702         return lock->held;
1703 }
1704
1705 /*
1706  * Return the parent lock tag in the lock hierarchy: the next coarser
1707  * lock that covers the provided tag.
1708  *
1709  * Returns true and sets *parent to the parent tag if one exists,
1710  * returns false if none exists.
1711  */
1712 static bool
1713 GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
1714                                                   PREDICATELOCKTARGETTAG *parent)
1715 {
1716         switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag))
1717         {
1718                 case PREDLOCKTAG_RELATION:
1719                         /* relation locks have no parent lock */
1720                         return false;
1721
1722                 case PREDLOCKTAG_PAGE:
1723                         /* parent lock is relation lock */
1724                         SET_PREDICATELOCKTARGETTAG_RELATION(*parent,
1725                                                                                  GET_PREDICATELOCKTARGETTAG_DB(*tag),
1726                                                                   GET_PREDICATELOCKTARGETTAG_RELATION(*tag));
1727
1728                         return true;
1729
1730                 case PREDLOCKTAG_TUPLE:
1731                         /* parent lock is page lock */
1732                         SET_PREDICATELOCKTARGETTAG_PAGE(*parent,
1733                                                                                  GET_PREDICATELOCKTARGETTAG_DB(*tag),
1734                                                                    GET_PREDICATELOCKTARGETTAG_RELATION(*tag),
1735                                                                           GET_PREDICATELOCKTARGETTAG_PAGE(*tag));
1736                         return true;
1737         }
1738
1739         /* not reachable */
1740         Assert(false);
1741         return false;
1742 }
1743
1744 /*
1745  * Check whether the lock we are considering is already covered by a
1746  * coarser lock for our transaction.
1747  *
1748  * Like PredicateLockExists, this function might return a false
1749  * negative, but it will never return a false positive.
1750  */
1751 static bool
1752 CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag)
1753 {
1754         PREDICATELOCKTARGETTAG targettag,
1755                                 parenttag;
1756
1757         targettag = *newtargettag;
1758
1759         /* check parents iteratively until no more */
1760         while (GetParentPredicateLockTag(&targettag, &parenttag))
1761         {
1762                 targettag = parenttag;
1763                 if (PredicateLockExists(&targettag))
1764                         return true;
1765         }
1766
1767         /* no more parents to check; lock is not covered */
1768         return false;
1769 }
1770
1771 /*
1772  * Remove the dummy entry from the predicate lock target hash, to free up some
1773  * scratch space. The caller must be holding SerializablePredicateLockListLock,
1774  * and must restore the entry with RestoreScratchTarget() before releasing the
1775  * lock.
1776  *
1777  * If lockheld is true, the caller is already holding the partition lock
1778  * of the partition containing the scratch entry.
1779  */
1780 static void
1781 RemoveScratchTarget(bool lockheld)
1782 {
1783         bool            found;
1784
1785         Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
1786
1787         if (!lockheld)
1788                 LWLockAcquire(ScratchPartitionLock, LW_EXCLUSIVE);
1789         hash_search_with_hash_value(PredicateLockTargetHash,
1790                                                                 &ScratchTargetTag,
1791                                                                 ScratchTargetTagHash,
1792                                                                 HASH_REMOVE, &found);
1793         Assert(found);
1794         if (!lockheld)
1795                 LWLockRelease(ScratchPartitionLock);
1796 }
1797
1798 /*
1799  * Re-insert the dummy entry in predicate lock target hash.
1800  */
1801 static void
1802 RestoreScratchTarget(bool lockheld)
1803 {
1804         bool            found;
1805
1806         Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
1807
1808         if (!lockheld)
1809                 LWLockAcquire(ScratchPartitionLock, LW_EXCLUSIVE);
1810         hash_search_with_hash_value(PredicateLockTargetHash,
1811                                                                 &ScratchTargetTag,
1812                                                                 ScratchTargetTagHash,
1813                                                                 HASH_ENTER, &found);
1814         Assert(!found);
1815         if (!lockheld)
1816                 LWLockRelease(ScratchPartitionLock);
1817 }
1818
1819 /*
1820  * Check whether the list of related predicate locks is empty for a
1821  * predicate lock target, and remove the target if it is.
1822  */
1823 static void
1824 RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash)
1825 {
1826         PREDICATELOCKTARGET *rmtarget;
1827
1828         Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
1829
1830         /* Can't remove it until no locks at this target. */
1831         if (!SHMQueueEmpty(&target->predicateLocks))
1832                 return;
1833
1834         /* Actually remove the target. */
1835         rmtarget = hash_search_with_hash_value(PredicateLockTargetHash,
1836                                                                                    &target->tag,
1837                                                                                    targettaghash,
1838                                                                                    HASH_REMOVE, NULL);
1839         Assert(rmtarget == target);
1840 }
1841
1842 /*
1843  * Delete child target locks owned by this process.
1844  * This implementation is assuming that the usage of each target tag field
1845  * is uniform.  No need to make this hard if we don't have to.
1846  *
1847  * We aren't acquiring lightweight locks for the predicate lock or lock
1848  * target structures associated with this transaction unless we're going
1849  * to modify them, because no other process is permitted to modify our
1850  * locks.
1851  */
1852 static void
1853 DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
1854 {
1855         SERIALIZABLEXACT *sxact;
1856         PREDICATELOCK *predlock;
1857
1858         LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
1859         sxact = (SERIALIZABLEXACT *) MySerializableXact;
1860         predlock = (PREDICATELOCK *)
1861                 SHMQueueNext(&(sxact->predicateLocks),
1862                                          &(sxact->predicateLocks),
1863                                          offsetof(PREDICATELOCK, xactLink));
1864         while (predlock)
1865         {
1866                 SHM_QUEUE  *predlocksxactlink;
1867                 PREDICATELOCK *nextpredlock;
1868                 PREDICATELOCKTAG oldlocktag;
1869                 PREDICATELOCKTARGET *oldtarget;
1870                 PREDICATELOCKTARGETTAG oldtargettag;
1871
1872                 predlocksxactlink = &(predlock->xactLink);
1873                 nextpredlock = (PREDICATELOCK *)
1874                         SHMQueueNext(&(sxact->predicateLocks),
1875                                                  predlocksxactlink,
1876                                                  offsetof(PREDICATELOCK, xactLink));
1877
1878                 oldlocktag = predlock->tag;
1879                 Assert(oldlocktag.myXact == sxact);
1880                 oldtarget = oldlocktag.myTarget;
1881                 oldtargettag = oldtarget->tag;
1882
1883                 if (TargetTagIsCoveredBy(oldtargettag, *newtargettag))
1884                 {
1885                         uint32          oldtargettaghash;
1886                         LWLockId        partitionLock;
1887                         PREDICATELOCK *rmpredlock;
1888
1889                         oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
1890                         partitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
1891
1892                         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
1893
1894                         SHMQueueDelete(predlocksxactlink);
1895                         SHMQueueDelete(&(predlock->targetLink));
1896                         rmpredlock = hash_search_with_hash_value
1897                                 (PredicateLockHash,
1898                                  &oldlocktag,
1899                                  PredicateLockHashCodeFromTargetHashCode(&oldlocktag,
1900                                                                                                                  oldtargettaghash),
1901                                  HASH_REMOVE, NULL);
1902                         Assert(rmpredlock == predlock);
1903
1904                         RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash);
1905
1906                         LWLockRelease(partitionLock);
1907
1908                         DecrementParentLocks(&oldtargettag);
1909                 }
1910
1911                 predlock = nextpredlock;
1912         }
1913         LWLockRelease(SerializablePredicateLockListLock);
1914 }
1915
1916 /*
1917  * Returns the promotion threshold for a given predicate lock
1918  * target. This is the number of descendant locks required to promote
1919  * to the specified tag. Note that the threshold includes non-direct
1920  * descendants, e.g. both tuples and pages for a relation lock.
1921  *
1922  * TODO SSI: We should do something more intelligent about what the
1923  * thresholds are, either making it proportional to the number of
1924  * tuples in a page & pages in a relation, or at least making it a
1925  * GUC. Currently the threshold is 3 for a page lock, and
1926  * max_pred_locks_per_transaction/2 for a relation lock, chosen
1927  * entirely arbitrarily (and without benchmarking).
1928  */
1929 static int
1930 PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG *tag)
1931 {
1932         switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag))
1933         {
1934                 case PREDLOCKTAG_RELATION:
1935                         return max_predicate_locks_per_xact / 2;
1936
1937                 case PREDLOCKTAG_PAGE:
1938                         return 3;
1939
1940                 case PREDLOCKTAG_TUPLE:
1941
1942                         /*
1943                          * not reachable: nothing is finer-granularity than a tuple, so we
1944                          * should never try to promote to it.
1945                          */
1946                         Assert(false);
1947                         return 0;
1948         }
1949
1950         /* not reachable */
1951         Assert(false);
1952         return 0;
1953 }
1954
1955 /*
1956  * For all ancestors of a newly-acquired predicate lock, increment
1957  * their child count in the parent hash table. If any of them have
1958  * more descendants than their promotion threshold, acquire the
1959  * coarsest such lock.
1960  *
1961  * Returns true if a parent lock was acquired and false otherwise.
1962  */
1963 static bool
1964 CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag)
1965 {
1966         PREDICATELOCKTARGETTAG targettag,
1967                                 nexttag,
1968                                 promotiontag;
1969         LOCALPREDICATELOCK *parentlock;
1970         bool            found,
1971                                 promote;
1972
1973         promote = false;
1974
1975         targettag = *reqtag;
1976
1977         /* check parents iteratively */
1978         while (GetParentPredicateLockTag(&targettag, &nexttag))
1979         {
1980                 targettag = nexttag;
1981                 parentlock = (LOCALPREDICATELOCK *) hash_search(LocalPredicateLockHash,
1982                                                                                                                 &targettag,
1983                                                                                                                 HASH_ENTER,
1984                                                                                                                 &found);
1985                 if (!found)
1986                 {
1987                         parentlock->held = false;
1988                         parentlock->childLocks = 1;
1989                 }
1990                 else
1991                         parentlock->childLocks++;
1992
1993                 if (parentlock->childLocks >=
1994                         PredicateLockPromotionThreshold(&targettag))
1995                 {
1996                         /*
1997                          * We should promote to this parent lock. Continue to check its
1998                          * ancestors, however, both to get their child counts right and to
1999                          * check whether we should just go ahead and promote to one of
2000                          * them.
2001                          */
2002                         promotiontag = targettag;
2003                         promote = true;
2004                 }
2005         }
2006
2007         if (promote)
2008         {
2009                 /* acquire coarsest ancestor eligible for promotion */
2010                 PredicateLockAcquire(&promotiontag);
2011                 return true;
2012         }
2013         else
2014                 return false;
2015 }
2016
2017 /*
2018  * When releasing a lock, decrement the child count on all ancestor
2019  * locks.
2020  *
2021  * This is called only when releasing a lock via
2022  * DeleteChildTargetLocks (i.e. when a lock becomes redundant because
2023  * we've acquired its parent, possibly due to promotion) or when a new
2024  * MVCC write lock makes the predicate lock unnecessary. There's no
2025  * point in calling it when locks are released at transaction end, as
2026  * this information is no longer needed.
2027  */
2028 static void
2029 DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag)
2030 {
2031         PREDICATELOCKTARGETTAG parenttag,
2032                                 nexttag;
2033
2034         parenttag = *targettag;
2035
2036         while (GetParentPredicateLockTag(&parenttag, &nexttag))
2037         {
2038                 uint32          targettaghash;
2039                 LOCALPREDICATELOCK *parentlock,
2040                                    *rmlock;
2041
2042                 parenttag = nexttag;
2043                 targettaghash = PredicateLockTargetTagHashCode(&parenttag);
2044                 parentlock = (LOCALPREDICATELOCK *)
2045                         hash_search_with_hash_value(LocalPredicateLockHash,
2046                                                                                 &parenttag, targettaghash,
2047                                                                                 HASH_FIND, NULL);
2048
2049                 /*
2050                  * There's a small chance the parent lock doesn't exist in the lock
2051                  * table. This can happen if we prematurely removed it because an
2052                  * index split caused the child refcount to be off.
2053                  */
2054                 if (parentlock == NULL)
2055                         continue;
2056
2057                 parentlock->childLocks--;
2058
2059                 /*
2060                  * Under similar circumstances the parent lock's refcount might be
2061                  * zero. This only happens if we're holding that lock (otherwise we
2062                  * would have removed the entry).
2063                  */
2064                 if (parentlock->childLocks < 0)
2065                 {
2066                         Assert(parentlock->held);
2067                         parentlock->childLocks = 0;
2068                 }
2069
2070                 if ((parentlock->childLocks == 0) && (!parentlock->held))
2071                 {
2072                         rmlock = (LOCALPREDICATELOCK *)
2073                                 hash_search_with_hash_value(LocalPredicateLockHash,
2074                                                                                         &parenttag, targettaghash,
2075                                                                                         HASH_REMOVE, NULL);
2076                         Assert(rmlock == parentlock);
2077                 }
2078         }
2079 }
2080
2081 /*
2082  * Indicate that a predicate lock on the given target is held by the
2083  * specified transaction. Has no effect if the lock is already held.
2084  *
2085  * This updates the lock table and the sxact's lock list, and creates
2086  * the lock target if necessary, but does *not* do anything related to
2087  * granularity promotion or the local lock table. See
2088  * PredicateLockAcquire for that.
2089  */
2090 static void
2091 CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
2092                                         uint32 targettaghash,
2093                                         SERIALIZABLEXACT *sxact)
2094 {
2095         PREDICATELOCKTARGET *target;
2096         PREDICATELOCKTAG locktag;
2097         PREDICATELOCK *lock;
2098         LWLockId        partitionLock;
2099         bool            found;
2100
2101         partitionLock = PredicateLockHashPartitionLock(targettaghash);
2102
2103         LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
2104         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2105
2106         /* Make sure that the target is represented. */
2107         target = (PREDICATELOCKTARGET *)
2108                 hash_search_with_hash_value(PredicateLockTargetHash,
2109                                                                         targettag, targettaghash,
2110                                                                         HASH_ENTER_NULL, &found);
2111         if (!target)
2112                 ereport(ERROR,
2113                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2114                                  errmsg("out of shared memory"),
2115                                  errhint("You might need to increase max_pred_locks_per_transaction.")));
2116         if (!found)
2117                 SHMQueueInit(&(target->predicateLocks));
2118
2119         /* We've got the sxact and target, make sure they're joined. */
2120         locktag.myTarget = target;
2121         locktag.myXact = sxact;
2122         lock = (PREDICATELOCK *)
2123                 hash_search_with_hash_value(PredicateLockHash, &locktag,
2124                         PredicateLockHashCodeFromTargetHashCode(&locktag, targettaghash),
2125                                                                         HASH_ENTER_NULL, &found);
2126         if (!lock)
2127                 ereport(ERROR,
2128                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2129                                  errmsg("out of shared memory"),
2130                                  errhint("You might need to increase max_pred_locks_per_transaction.")));
2131
2132         if (!found)
2133         {
2134                 SHMQueueInsertBefore(&(target->predicateLocks), &(lock->targetLink));
2135                 SHMQueueInsertBefore(&(sxact->predicateLocks),
2136                                                          &(lock->xactLink));
2137                 lock->commitSeqNo = InvalidSerCommitSeqNo;
2138         }
2139
2140         LWLockRelease(partitionLock);
2141         LWLockRelease(SerializablePredicateLockListLock);
2142 }
2143
2144 /*
2145  * Acquire a predicate lock on the specified target for the current
2146  * connection if not already held. This updates the local lock table
2147  * and uses it to implement granularity promotion. It will consolidate
2148  * multiple locks into a coarser lock if warranted, and will release
2149  * any finer-grained locks covered by the new one.
2150  */
2151 static void
2152 PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag)
2153 {
2154         uint32          targettaghash;
2155         bool            found;
2156         LOCALPREDICATELOCK *locallock;
2157
2158         /* Do we have the lock already, or a covering lock? */
2159         if (PredicateLockExists(targettag))
2160                 return;
2161
2162         if (CoarserLockCovers(targettag))
2163                 return;
2164
2165         /* the same hash and LW lock apply to the lock target and the local lock. */
2166         targettaghash = PredicateLockTargetTagHashCode(targettag);
2167
2168         /* Acquire lock in local table */
2169         locallock = (LOCALPREDICATELOCK *)
2170                 hash_search_with_hash_value(LocalPredicateLockHash,
2171                                                                         targettag, targettaghash,
2172                                                                         HASH_ENTER, &found);
2173         locallock->held = true;
2174         if (!found)
2175                 locallock->childLocks = 0;
2176
2177         /* Actually create the lock */
2178         CreatePredicateLock(targettag, targettaghash,
2179                                                 (SERIALIZABLEXACT *) MySerializableXact);
2180
2181         /*
2182          * Lock has been acquired. Check whether it should be promoted to a
2183          * coarser granularity, or whether there are finer-granularity locks to
2184          * clean up.
2185          */
2186         if (CheckAndPromotePredicateLockRequest(targettag))
2187         {
2188                 /*
2189                  * Lock request was promoted to a coarser-granularity lock, and that
2190                  * lock was acquired. It will delete this lock and any of its
2191                  * children, so we're done.
2192                  */
2193         }
2194         else
2195         {
2196                 /* Clean up any finer-granularity locks */
2197                 if (GET_PREDICATELOCKTARGETTAG_TYPE(*targettag) != PREDLOCKTAG_TUPLE)
2198                         DeleteChildTargetLocks(targettag);
2199         }
2200 }
2201
2202
2203 /*
2204  *              PredicateLockRelation
2205  *
2206  * Gets a predicate lock at the relation level.
2207  * Skip if not in full serializable transaction isolation level.
2208  * Skip if this is a temporary table.
2209  * Clear any finer-grained predicate locks this session has on the relation.
2210  */
2211 void
2212 PredicateLockRelation(const Relation relation)
2213 {
2214         PREDICATELOCKTARGETTAG tag;
2215
2216         if (SkipSerialization(relation))
2217                 return;
2218
2219         SET_PREDICATELOCKTARGETTAG_RELATION(tag,
2220                                                                                 relation->rd_node.dbNode,
2221                                                                                 relation->rd_id);
2222         PredicateLockAcquire(&tag);
2223 }
2224
2225 /*
2226  *              PredicateLockPage
2227  *
2228  * Gets a predicate lock at the page level.
2229  * Skip if not in full serializable transaction isolation level.
2230  * Skip if this is a temporary table.
2231  * Skip if a coarser predicate lock already covers this page.
2232  * Clear any finer-grained predicate locks this session has on the relation.
2233  */
2234 void
2235 PredicateLockPage(const Relation relation, const BlockNumber blkno)
2236 {
2237         PREDICATELOCKTARGETTAG tag;
2238
2239         if (SkipSerialization(relation))
2240                 return;
2241
2242         SET_PREDICATELOCKTARGETTAG_PAGE(tag,
2243                                                                         relation->rd_node.dbNode,
2244                                                                         relation->rd_id,
2245                                                                         blkno);
2246         PredicateLockAcquire(&tag);
2247 }
2248
2249 /*
2250  *              PredicateLockTuple
2251  *
2252  * Gets a predicate lock at the tuple level.
2253  * Skip if not in full serializable transaction isolation level.
2254  * Skip if this is a temporary table.
2255  */
2256 void
2257 PredicateLockTuple(const Relation relation, const HeapTuple tuple)
2258 {
2259         PREDICATELOCKTARGETTAG tag;
2260         ItemPointer tid;
2261         TransactionId targetxmin;
2262
2263         if (SkipSerialization(relation))
2264                 return;
2265
2266         /*
2267          * If it's a heap tuple, return if this xact wrote it.
2268          */
2269         if (relation->rd_index == NULL)
2270         {
2271                 TransactionId myxid;
2272
2273                 targetxmin = HeapTupleHeaderGetXmin(tuple->t_data);
2274
2275                 myxid = GetTopTransactionIdIfAny();
2276                 if (TransactionIdIsValid(myxid))
2277                 {
2278                         if (TransactionIdFollowsOrEquals(targetxmin, TransactionXmin))
2279                         {
2280                                 TransactionId xid = SubTransGetTopmostTransaction(targetxmin);
2281
2282                                 if (TransactionIdEquals(xid, myxid))
2283                                 {
2284                                         /* We wrote it; we already have a write lock. */
2285                                         return;
2286                                 }
2287                         }
2288                 }
2289         }
2290         else
2291                 targetxmin = InvalidTransactionId;
2292
2293         /*
2294          * Do quick-but-not-definitive test for a relation lock first.  This will
2295          * never cause a return when the relation is *not* locked, but will
2296          * occasionally let the check continue when there really *is* a relation
2297          * level lock.
2298          */
2299         SET_PREDICATELOCKTARGETTAG_RELATION(tag,
2300                                                                                 relation->rd_node.dbNode,
2301                                                                                 relation->rd_id);
2302         if (PredicateLockExists(&tag))
2303                 return;
2304
2305         tid = &(tuple->t_self);
2306         SET_PREDICATELOCKTARGETTAG_TUPLE(tag,
2307                                                                          relation->rd_node.dbNode,
2308                                                                          relation->rd_id,
2309                                                                          ItemPointerGetBlockNumber(tid),
2310                                                                          ItemPointerGetOffsetNumber(tid),
2311                                                                          targetxmin);
2312         PredicateLockAcquire(&tag);
2313 }
2314
2315
2316 /*
2317  *              DeleteLockTarget
2318  *
2319  * Remove a predicate lock target along with any locks held for it.
2320  *
2321  * Caller must hold SerializablePredicateLockListLock and the
2322  * appropriate hash partition lock for the target.
2323  */
2324 static void
2325 DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
2326 {
2327         PREDICATELOCK *predlock;
2328         SHM_QUEUE  *predlocktargetlink;
2329         PREDICATELOCK *nextpredlock;
2330         bool            found;
2331
2332         Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
2333         Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash)));
2334
2335         predlock = (PREDICATELOCK *)
2336                 SHMQueueNext(&(target->predicateLocks),
2337                                          &(target->predicateLocks),
2338                                          offsetof(PREDICATELOCK, targetLink));
2339         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
2340         while (predlock)
2341         {
2342                 predlocktargetlink = &(predlock->targetLink);
2343                 nextpredlock = (PREDICATELOCK *)
2344                         SHMQueueNext(&(target->predicateLocks),
2345                                                  predlocktargetlink,
2346                                                  offsetof(PREDICATELOCK, targetLink));
2347
2348                 SHMQueueDelete(&(predlock->xactLink));
2349                 SHMQueueDelete(&(predlock->targetLink));
2350
2351                 hash_search_with_hash_value
2352                         (PredicateLockHash,
2353                          &predlock->tag,
2354                          PredicateLockHashCodeFromTargetHashCode(&predlock->tag,
2355                                                                                                          targettaghash),
2356                          HASH_REMOVE, &found);
2357                 Assert(found);
2358
2359                 predlock = nextpredlock;
2360         }
2361         LWLockRelease(SerializableXactHashLock);
2362
2363         /* Remove the target itself, if possible. */
2364         RemoveTargetIfNoLongerUsed(target, targettaghash);
2365 }
2366
2367
2368 /*
2369  *              TransferPredicateLocksToNewTarget
2370  *
2371  * Move or copy all the predicate locks for a lock target, for use by
2372  * index page splits/combines and other things that create or replace
2373  * lock targets. If 'removeOld' is true, the old locks and the target
2374  * will be removed.
2375  *
2376  * Returns true on success, or false if we ran out of shared memory to
2377  * allocate the new target or locks. Guaranteed to always succeed if
2378  * removeOld is set (by using the scratch entry in PredicateLockTargetHash
2379  * for scratch space).
2380  *
2381  * Warning: the "removeOld" option should be used only with care,
2382  * because this function does not (indeed, can not) update other
2383  * backends' LocalPredicateLockHash. If we are only adding new
2384  * entries, this is not a problem: the local lock table is used only
2385  * as a hint, so missing entries for locks that are held are
2386  * OK. Having entries for locks that are no longer held, as can happen
2387  * when using "removeOld", is not in general OK. We can only use it
2388  * safely when replacing a lock with a coarser-granularity lock that
2389  * covers it, or if we are absolutely certain that no one will need to
2390  * refer to that lock in the future.
2391  *
2392  * Caller must hold SerializablePredicateLockListLock.
2393  */
2394 static bool
2395 TransferPredicateLocksToNewTarget(const PREDICATELOCKTARGETTAG oldtargettag,
2396                                                                   const PREDICATELOCKTARGETTAG newtargettag,
2397                                                                   bool removeOld)
2398 {
2399         uint32          oldtargettaghash;
2400         LWLockId        oldpartitionLock;
2401         PREDICATELOCKTARGET *oldtarget;
2402         uint32          newtargettaghash;
2403         LWLockId        newpartitionLock;
2404         bool            found;
2405         bool            outOfShmem = false;
2406
2407         Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
2408
2409         oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
2410         newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag);
2411         oldpartitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
2412         newpartitionLock = PredicateLockHashPartitionLock(newtargettaghash);
2413
2414         if (removeOld)
2415         {
2416                 /*
2417                  * Remove the dummy entry to give us scratch space, so we know we'll
2418                  * be able to create the new lock target.
2419                  */
2420                 RemoveScratchTarget(false);
2421         }
2422
2423         /*
2424          * We must get the partition locks in ascending sequence to avoid
2425          * deadlocks. If old and new partitions are the same, we must request the
2426          * lock only once.
2427          */
2428         if (oldpartitionLock < newpartitionLock)
2429         {
2430                 LWLockAcquire(oldpartitionLock,
2431                                           (removeOld ? LW_EXCLUSIVE : LW_SHARED));
2432                 LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
2433         }
2434         else if (oldpartitionLock > newpartitionLock)
2435         {
2436                 LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
2437                 LWLockAcquire(oldpartitionLock,
2438                                           (removeOld ? LW_EXCLUSIVE : LW_SHARED));
2439         }
2440         else
2441                 LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
2442
2443         /*
2444          * Look for the old target.  If not found, that's OK; no predicate locks
2445          * are affected, so we can just clean up and return. If it does exist,
2446          * walk its list of predicate locks and move or copy them to the new
2447          * target.
2448          */
2449         oldtarget = hash_search_with_hash_value(PredicateLockTargetHash,
2450                                                                                         &oldtargettag,
2451                                                                                         oldtargettaghash,
2452                                                                                         HASH_FIND, NULL);
2453
2454         if (oldtarget)
2455         {
2456                 PREDICATELOCKTARGET *newtarget;
2457                 PREDICATELOCK *oldpredlock;
2458                 PREDICATELOCKTAG newpredlocktag;
2459
2460                 newtarget = hash_search_with_hash_value(PredicateLockTargetHash,
2461                                                                                                 &newtargettag,
2462                                                                                                 newtargettaghash,
2463                                                                                                 HASH_ENTER_NULL, &found);
2464
2465                 if (!newtarget)
2466                 {
2467                         /* Failed to allocate due to insufficient shmem */
2468                         outOfShmem = true;
2469                         goto exit;
2470                 }
2471
2472                 /* If we created a new entry, initialize it */
2473                 if (!found)
2474                         SHMQueueInit(&(newtarget->predicateLocks));
2475
2476                 newpredlocktag.myTarget = newtarget;
2477
2478                 /*
2479                  * Loop through all the locks on the old target, replacing them with
2480                  * locks on the new target.
2481                  */
2482                 oldpredlock = (PREDICATELOCK *)
2483                         SHMQueueNext(&(oldtarget->predicateLocks),
2484                                                  &(oldtarget->predicateLocks),
2485                                                  offsetof(PREDICATELOCK, targetLink));
2486                 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
2487                 while (oldpredlock)
2488                 {
2489                         SHM_QUEUE  *predlocktargetlink;
2490                         PREDICATELOCK *nextpredlock;
2491                         PREDICATELOCK *newpredlock;
2492                         SerCommitSeqNo oldCommitSeqNo = oldpredlock->commitSeqNo;
2493
2494                         predlocktargetlink = &(oldpredlock->targetLink);
2495                         nextpredlock = (PREDICATELOCK *)
2496                                 SHMQueueNext(&(oldtarget->predicateLocks),
2497                                                          predlocktargetlink,
2498                                                          offsetof(PREDICATELOCK, targetLink));
2499                         newpredlocktag.myXact = oldpredlock->tag.myXact;
2500
2501                         if (removeOld)
2502                         {
2503                                 SHMQueueDelete(&(oldpredlock->xactLink));
2504                                 SHMQueueDelete(&(oldpredlock->targetLink));
2505
2506                                 hash_search_with_hash_value
2507                                         (PredicateLockHash,
2508                                          &oldpredlock->tag,
2509                                    PredicateLockHashCodeFromTargetHashCode(&oldpredlock->tag,
2510                                                                                                                    oldtargettaghash),
2511                                          HASH_REMOVE, &found);
2512                                 Assert(found);
2513                         }
2514
2515
2516                         newpredlock = (PREDICATELOCK *)
2517                                 hash_search_with_hash_value
2518                                 (PredicateLockHash,
2519                                  &newpredlocktag,
2520                                  PredicateLockHashCodeFromTargetHashCode(&newpredlocktag,
2521                                                                                                                  newtargettaghash),
2522                                  HASH_ENTER_NULL, &found);
2523                         if (!newpredlock)
2524                         {
2525                                 /* Out of shared memory. Undo what we've done so far. */
2526                                 LWLockRelease(SerializableXactHashLock);
2527                                 DeleteLockTarget(newtarget, newtargettaghash);
2528                                 outOfShmem = true;
2529                                 goto exit;
2530                         }
2531                         if (!found)
2532                         {
2533                                 SHMQueueInsertBefore(&(newtarget->predicateLocks),
2534                                                                          &(newpredlock->targetLink));
2535                                 SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks),
2536                                                                          &(newpredlock->xactLink));
2537                                 newpredlock->commitSeqNo = oldCommitSeqNo;
2538                         }
2539                         else
2540                         {
2541                                 if (newpredlock->commitSeqNo < oldCommitSeqNo)
2542                                         newpredlock->commitSeqNo = oldCommitSeqNo;
2543                         }
2544
2545                         Assert(newpredlock->commitSeqNo != 0);
2546                         Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo)
2547                                    || (newpredlock->tag.myXact == OldCommittedSxact));
2548
2549                         oldpredlock = nextpredlock;
2550                 }
2551                 LWLockRelease(SerializableXactHashLock);
2552
2553                 if (removeOld)
2554                 {
2555                         Assert(SHMQueueEmpty(&oldtarget->predicateLocks));
2556                         RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash);
2557                 }
2558         }
2559
2560
2561 exit:
2562         /* Release partition locks in reverse order of acquisition. */
2563         if (oldpartitionLock < newpartitionLock)
2564         {
2565                 LWLockRelease(newpartitionLock);
2566                 LWLockRelease(oldpartitionLock);
2567         }
2568         else if (oldpartitionLock > newpartitionLock)
2569         {
2570                 LWLockRelease(oldpartitionLock);
2571                 LWLockRelease(newpartitionLock);
2572         }
2573         else
2574                 LWLockRelease(newpartitionLock);
2575
2576         if (removeOld)
2577         {
2578                 /* We shouldn't run out of memory if we're moving locks */
2579                 Assert(!outOfShmem);
2580
2581                 /* Put the scrach entry back */
2582                 RestoreScratchTarget(false);
2583         }
2584
2585         return !outOfShmem;
2586 }
2587
2588 /*
2589  * Drop all predicate locks of any granularity from the specified relation,
2590  * which can be a heap relation or an index relation.  If 'transfer' is true,
2591  * acquire a relation lock on the heap for any transactions with any lock(s)
2592  * on the specified relation.
2593  *
2594  * This requires grabbing a lot of LW locks and scanning the entire lock
2595  * target table for matches.  That makes this more expensive than most
2596  * predicate lock management functions, but it will only be called for DDL
2597  * type commands that are expensive anyway, and there are fast returns when
2598  * no serializable transactions are active or the relation is temporary.
2599  *
2600  * We don't use the TransferPredicateLocksToNewTarget function because it
2601  * acquires its own locks on the partitions of the two targets involved,
2602  * and we'll already be holding all partition locks.
2603  *
2604  * We can't throw an error from here, because the call could be from a
2605  * transaction which is not serializable.
2606  *
2607  * NOTE: This is currently only called with transfer set to true, but that may
2608  * change.      If we decide to clean up the locks from a table on commit of a
2609  * transaction which executed DROP TABLE, the false condition will be useful.
2610  */
2611 static void
2612 DropAllPredicateLocksFromTable(const Relation relation, bool transfer)
2613 {
2614         HASH_SEQ_STATUS seqstat;
2615         PREDICATELOCKTARGET *oldtarget;
2616         PREDICATELOCKTARGET *heaptarget;
2617         Oid                     dbId;
2618         Oid                     relId;
2619         Oid                     heapId;
2620         int                     i;
2621         bool            isIndex;
2622         bool            found;
2623         uint32          heaptargettaghash;
2624
2625         /*
2626          * Bail out quickly if there are no serializable transactions running.
2627          * It's safe to check this without taking locks because the caller is
2628          * holding an ACCESS EXCLUSIVE lock on the relation.  No new locks which
2629          * would matter here can be acquired while that is held.
2630          */
2631         if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
2632                 return;
2633
2634         if (SkipPredicateLocksForRelation(relation))
2635                 return;
2636
2637         dbId = relation->rd_node.dbNode;
2638         relId = relation->rd_id;
2639         if (relation->rd_index == NULL)
2640         {
2641                 isIndex = false;
2642                 heapId = relId;
2643         }
2644         else
2645         {
2646                 isIndex = true;
2647                 heapId = relation->rd_index->indrelid;
2648         }
2649         Assert(heapId != InvalidOid);
2650         Assert(transfer || !isIndex);           /* index OID only makes sense with
2651                                                                                  * transfer */
2652
2653         /* Retrieve first time needed, then keep. */
2654         heaptargettaghash = 0;
2655         heaptarget = NULL;
2656
2657         /* Acquire locks on all lock partitions */
2658         LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
2659         for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
2660                 LWLockAcquire(FirstPredicateLockMgrLock + i, LW_EXCLUSIVE);
2661         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
2662
2663         /*
2664          * Remove the dummy entry to give us scratch space, so we know we'll be
2665          * able to create the new lock target.
2666          */
2667         if (transfer)
2668                 RemoveScratchTarget(true);
2669
2670         /* Scan through target map */
2671         hash_seq_init(&seqstat, PredicateLockTargetHash);
2672
2673         while ((oldtarget = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat)))
2674         {
2675                 PREDICATELOCK *oldpredlock;
2676
2677                 /*
2678                  * Check whether this is a target which needs attention.
2679                  */
2680                 if (GET_PREDICATELOCKTARGETTAG_RELATION(oldtarget->tag) != relId)
2681                         continue;                       /* wrong relation id */
2682                 if (GET_PREDICATELOCKTARGETTAG_DB(oldtarget->tag) != dbId)
2683                         continue;                       /* wrong database id */
2684                 if (transfer && !isIndex
2685                         && GET_PREDICATELOCKTARGETTAG_TYPE(oldtarget->tag) == PREDLOCKTAG_RELATION)
2686                         continue;                       /* already the right lock */
2687
2688                 /*
2689                  * If we made it here, we have work to do.      We make sure the heap
2690                  * relation lock exists, then we walk the list of predicate locks for
2691                  * the old target we found, moving all locks to the heap relation lock
2692                  * -- unless they already hold that.
2693                  */
2694
2695                 /*
2696                  * First make sure we have the heap relation target.  We only need to
2697                  * do this once.
2698                  */
2699                 if (transfer && heaptarget == NULL)
2700                 {
2701                         PREDICATELOCKTARGETTAG heaptargettag;
2702
2703                         SET_PREDICATELOCKTARGETTAG_RELATION(heaptargettag, dbId, heapId);
2704                         heaptargettaghash = PredicateLockTargetTagHashCode(&heaptargettag);
2705                         heaptarget = hash_search_with_hash_value(PredicateLockTargetHash,
2706                                                                                                          &heaptargettag,
2707                                                                                                          heaptargettaghash,
2708                                                                                                          HASH_ENTER, &found);
2709                         if (!found)
2710                                 SHMQueueInit(&heaptarget->predicateLocks);
2711                 }
2712
2713                 /*
2714                  * Loop through all the locks on the old target, replacing them with
2715                  * locks on the new target.
2716                  */
2717                 oldpredlock = (PREDICATELOCK *)
2718                         SHMQueueNext(&(oldtarget->predicateLocks),
2719                                                  &(oldtarget->predicateLocks),
2720                                                  offsetof(PREDICATELOCK, targetLink));
2721                 while (oldpredlock)
2722                 {
2723                         PREDICATELOCK *nextpredlock;
2724                         PREDICATELOCK *newpredlock;
2725                         SerCommitSeqNo oldCommitSeqNo;
2726                         SERIALIZABLEXACT *oldXact;
2727
2728                         nextpredlock = (PREDICATELOCK *)
2729                                 SHMQueueNext(&(oldtarget->predicateLocks),
2730                                                          &(oldpredlock->targetLink),
2731                                                          offsetof(PREDICATELOCK, targetLink));
2732
2733                         /*
2734                          * Remove the old lock first. This avoids the chance of running
2735                          * out of lock structure entries for the hash table.
2736                          */
2737                         oldCommitSeqNo = oldpredlock->commitSeqNo;
2738                         oldXact = oldpredlock->tag.myXact;
2739
2740                         SHMQueueDelete(&(oldpredlock->xactLink));
2741
2742                         /*
2743                          * No need for retail delete from oldtarget list, we're removing
2744                          * the whole target anyway.
2745                          */
2746                         hash_search(PredicateLockHash,
2747                                                 &oldpredlock->tag,
2748                                                 HASH_REMOVE, &found);
2749                         Assert(found);
2750
2751                         if (transfer)
2752                         {
2753                                 PREDICATELOCKTAG newpredlocktag;
2754
2755                                 newpredlocktag.myTarget = heaptarget;
2756                                 newpredlocktag.myXact = oldXact;
2757                                 newpredlock = (PREDICATELOCK *)
2758                                         hash_search_with_hash_value
2759                                         (PredicateLockHash,
2760                                          &newpredlocktag,
2761                                          PredicateLockHashCodeFromTargetHashCode(&newpredlocktag,
2762                                                                                                                   heaptargettaghash),
2763                                          HASH_ENTER, &found);
2764                                 if (!found)
2765                                 {
2766                                         SHMQueueInsertBefore(&(heaptarget->predicateLocks),
2767                                                                                  &(newpredlock->targetLink));
2768                                         SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks),
2769                                                                                  &(newpredlock->xactLink));
2770                                         newpredlock->commitSeqNo = oldCommitSeqNo;
2771                                 }
2772                                 else
2773                                 {
2774                                         if (newpredlock->commitSeqNo < oldCommitSeqNo)
2775                                                 newpredlock->commitSeqNo = oldCommitSeqNo;
2776                                 }
2777
2778                                 Assert(newpredlock->commitSeqNo != 0);
2779                                 Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo)
2780                                            || (newpredlock->tag.myXact == OldCommittedSxact));
2781                         }
2782
2783                         oldpredlock = nextpredlock;
2784                 }
2785
2786                 hash_search(PredicateLockTargetHash, &oldtarget->tag, HASH_REMOVE,
2787                                         &found);
2788                 Assert(found);
2789         }
2790
2791         /* Put the scratch entry back */
2792         if (transfer)
2793                 RestoreScratchTarget(true);
2794
2795         /* Release locks in reverse order */
2796         LWLockRelease(SerializableXactHashLock);
2797         for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
2798                 LWLockRelease(FirstPredicateLockMgrLock + i);
2799         LWLockRelease(SerializablePredicateLockListLock);
2800 }
2801
2802 /*
2803  * TransferPredicateLocksToHeapRelation
2804  *              For all transactions, transfer all predicate locks for the given
2805  *              relation to a single relation lock on the heap.
2806  */
2807 void
2808 TransferPredicateLocksToHeapRelation(const Relation relation)
2809 {
2810         DropAllPredicateLocksFromTable(relation, true);
2811 }
2812
2813
2814 /*
2815  *              PredicateLockPageSplit
2816  *
2817  * Copies any predicate locks for the old page to the new page.
2818  * Skip if this is a temporary table or toast table.
2819  *
2820  * NOTE: A page split (or overflow) affects all serializable transactions,
2821  * even if it occurs in the context of another transaction isolation level.
2822  *
2823  * NOTE: This currently leaves the local copy of the locks without
2824  * information on the new lock which is in shared memory.  This could cause
2825  * problems if enough page splits occur on locked pages without the processes
2826  * which hold the locks getting in and noticing.
2827  */
2828 void
2829 PredicateLockPageSplit(const Relation relation, const BlockNumber oldblkno,
2830                                            const BlockNumber newblkno)
2831 {
2832         PREDICATELOCKTARGETTAG oldtargettag;
2833         PREDICATELOCKTARGETTAG newtargettag;
2834         bool            success;
2835
2836         /*
2837          * Bail out quickly if there are no serializable transactions running.
2838          *
2839          * It's safe to do this check without taking any additional locks. Even if
2840          * a serializable transaction starts concurrently, we know it can't take
2841          * any SIREAD locks on the page being split because the caller is holding
2842          * the associated buffer page lock. Memory reordering isn't an issue; the
2843          * memory barrier in the LWLock acquisition guarantees that this read
2844          * occurs while the buffer page lock is held.
2845          */
2846         if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
2847                 return;
2848
2849         if (SkipPredicateLocksForRelation(relation))
2850                 return;
2851
2852         Assert(oldblkno != newblkno);
2853         Assert(BlockNumberIsValid(oldblkno));
2854         Assert(BlockNumberIsValid(newblkno));
2855
2856         SET_PREDICATELOCKTARGETTAG_PAGE(oldtargettag,
2857                                                                         relation->rd_node.dbNode,
2858                                                                         relation->rd_id,
2859                                                                         oldblkno);
2860         SET_PREDICATELOCKTARGETTAG_PAGE(newtargettag,
2861                                                                         relation->rd_node.dbNode,
2862                                                                         relation->rd_id,
2863                                                                         newblkno);
2864
2865         LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
2866
2867         /*
2868          * Try copying the locks over to the new page's tag, creating it if
2869          * necessary.
2870          */
2871         success = TransferPredicateLocksToNewTarget(oldtargettag,
2872                                                                                                 newtargettag,
2873                                                                                                 false);
2874
2875         if (!success)
2876         {
2877                 /*
2878                  * No more predicate lock entries are available. Failure isn't an
2879                  * option here, so promote the page lock to a relation lock.
2880                  */
2881
2882                 /* Get the parent relation lock's lock tag */
2883                 success = GetParentPredicateLockTag(&oldtargettag,
2884                                                                                         &newtargettag);
2885                 Assert(success);
2886
2887                 /*
2888                  * Move the locks to the parent. This shouldn't fail.
2889                  *
2890                  * Note that here we are removing locks held by other backends,
2891                  * leading to a possible inconsistency in their local lock hash table.
2892                  * This is OK because we're replacing it with a lock that covers the
2893                  * old one.
2894                  */
2895                 success = TransferPredicateLocksToNewTarget(oldtargettag,
2896                                                                                                         newtargettag,
2897                                                                                                         true);
2898                 Assert(success);
2899         }
2900
2901         LWLockRelease(SerializablePredicateLockListLock);
2902 }
2903
2904 /*
2905  *              PredicateLockPageCombine
2906  *
2907  * Combines predicate locks for two existing pages.
2908  * Skip if this is a temporary table or toast table.
2909  *
2910  * NOTE: A page combine affects all serializable transactions, even if it
2911  * occurs in the context of another transaction isolation level.
2912  */
2913 void
2914 PredicateLockPageCombine(const Relation relation, const BlockNumber oldblkno,
2915                                                  const BlockNumber newblkno)
2916 {
2917         /*
2918          * Page combines differ from page splits in that we ought to be able to
2919          * remove the locks on the old page after transferring them to the new
2920          * page, instead of duplicating them. However, because we can't edit other
2921          * backends' local lock tables, removing the old lock would leave them
2922          * with an entry in their LocalPredicateLockHash for a lock they're not
2923          * holding, which isn't acceptable. So we wind up having to do the same
2924          * work as a page split, acquiring a lock on the new page and keeping the
2925          * old page locked too. That can lead to some false positives, but should
2926          * be rare in practice.
2927          */
2928         PredicateLockPageSplit(relation, oldblkno, newblkno);
2929 }
2930
2931 /*
2932  * Walk the hash table and find the new xmin.
2933  */
2934 static void
2935 SetNewSxactGlobalXmin(void)
2936 {
2937         SERIALIZABLEXACT *sxact;
2938
2939         Assert(LWLockHeldByMe(SerializableXactHashLock));
2940
2941         PredXact->SxactGlobalXmin = InvalidTransactionId;
2942         PredXact->SxactGlobalXminCount = 0;
2943
2944         for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
2945         {
2946                 if (!SxactIsRolledBack(sxact)
2947                         && !SxactIsCommitted(sxact)
2948                         && sxact != OldCommittedSxact)
2949                 {
2950                         Assert(sxact->xmin != InvalidTransactionId);
2951                         if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)
2952                                 || TransactionIdPrecedes(sxact->xmin,
2953                                                                                  PredXact->SxactGlobalXmin))
2954                         {
2955                                 PredXact->SxactGlobalXmin = sxact->xmin;
2956                                 PredXact->SxactGlobalXminCount = 1;
2957                         }
2958                         else if (TransactionIdEquals(sxact->xmin,
2959                                                                                  PredXact->SxactGlobalXmin))
2960                                 PredXact->SxactGlobalXminCount++;
2961                 }
2962         }
2963
2964         OldSerXidSetActiveSerXmin(PredXact->SxactGlobalXmin);
2965 }
2966
2967 /*
2968  *              ReleasePredicateLocks
2969  *
2970  * Releases predicate locks based on completion of the current transaction,
2971  * whether committed (isCommit true) or rolled back (isCommit false).  It can
2972  * also be called for a read only transaction when it becomes impossible for
2973  * the transaction to become part of a dangerous structure.
2974  *
2975  * We do nothing unless this is a serializable transaction.
2976  *
2977  * This method must ensure that shared memory hash tables are cleaned
2978  * up in some relatively timely fashion.
2979  *
2980  * If this transaction is committing, it is added to the list of finished
2981  * serializable transactions for later cleanup; on rollback, its state is
2982  * released right away through ReleaseOneSerializableXact().
2983  */
2984 void
2985 ReleasePredicateLocks(const bool isCommit)
2986 {
2987         bool            needToClear;
2988         RWConflict      conflict,
2989                                 nextConflict,
2990                                 possibleUnsafeConflict;
2991         SERIALIZABLEXACT *roXact;
2992
2993         /*
2994          * We can't trust XactReadOnly here, because a transaction which started
2995          * as READ WRITE can show as READ ONLY later, e.g., within
2996          * subtransactions.  We want to flag a transaction as READ ONLY if it
2997          * commits without writing so that de facto READ ONLY transactions get the
2998          * benefit of some RO optimizations, so we will use this local variable to
2999          * get some cleanup logic right which is based on whether the transaction
3000          * was declared READ ONLY at the top level.
3001          */
3002         bool            topLevelIsDeclaredReadOnly;
3003
3004         if (MySerializableXact == InvalidSerializableXact)
3005         {
3006                 Assert(LocalPredicateLockHash == NULL);
3007                 return;
3008         }
3009
3010         Assert(!isCommit || SxactIsPrepared(MySerializableXact));
3011         Assert(!SxactIsRolledBack(MySerializableXact));
3012         Assert(!SxactIsCommitted(MySerializableXact));
3013
3014         /* may not be serializable during COMMIT/ROLLBACK PREPARED */
3015         if (MySerializableXact->pid != 0)
3016                 Assert(IsolationIsSerializable());
3017
3018         /* We'd better not already be on the cleanup list. */
3019         Assert(!SxactIsOnFinishedList((SERIALIZABLEXACT *) MySerializableXact));
3020
3021         topLevelIsDeclaredReadOnly = SxactIsReadOnly(MySerializableXact);
3022
3023         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
3024
3025         /*
3026          * We don't hold XidGenLock here, assuming that TransactionId is
3027          * atomic!
3028          *
3029          * If this value is changing, we don't care that much whether we get the
3030          * old or new value -- it is just used to determine how far
3031          * SxactGlobalXmin must advance before this transaction can be fully
3032          * cleaned up.  The worst that could happen is we wait for one more
3033          * transaction to complete before freeing some RAM; correctness of visible
3034          * behavior is not affected.
3035          */
3036         MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;
3037
3038         /*
3039          * Record the outcome.  If it's not a commit it's a rollback, and we can
3040          * clear our locks immediately (done near the end of this function).
3041          */
3042         if (isCommit)
3043         {
3044                 MySerializableXact->flags |= SXACT_FLAG_COMMITTED;
3045                 MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo);
3046                 /* Recognize implicit read-only transaction (commit without write). */
3047                 if (!(MySerializableXact->flags & SXACT_FLAG_DID_WRITE))
3048                         MySerializableXact->flags |= SXACT_FLAG_READ_ONLY;
3049         }
3050         else
3051         {
3052                 MySerializableXact->flags |= SXACT_FLAG_ROLLED_BACK;
3053         }
3054
3055         if (!topLevelIsDeclaredReadOnly)
3056         {
3057                 Assert(PredXact->WritableSxactCount > 0);
3058                 if (--(PredXact->WritableSxactCount) == 0)
3059                 {
3060                         /*
3061                          * Release predicate locks and rw-conflicts in for all committed
3062                          * transactions.  There are no longer any transactions which might
3063                          * conflict with the locks and no chance for new transactions to
3064                          * overlap.  Similarly, existing conflicts in can't cause pivots,
3065                          * and any conflicts in which could have completed a dangerous
3066                          * structure would already have caused a rollback, so any
3067                          * remaining ones must be benign.
3068                          */
3069                         PredXact->CanPartialClearThrough = PredXact->LastSxactCommitSeqNo;
3070                 }
3071         }
3072         else
3073         {
3074                 /*
3075                  * Read-only transactions: clear the list of transactions that might
3076                  * make us unsafe. Note that we use 'inLink' for the iteration as
3077                  * opposed to 'outLink' for the r/w xacts.
3078                  */
3079                 possibleUnsafeConflict = (RWConflict)
3080                         SHMQueueNext((SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
3081                                   (SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
3082                                                  offsetof(RWConflictData, inLink));
3083                 while (possibleUnsafeConflict)
3084                 {
3085                         nextConflict = (RWConflict)
3086                                 SHMQueueNext((SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
3087                                                          &possibleUnsafeConflict->inLink,
3088                                                          offsetof(RWConflictData, inLink));
3089
3090                         Assert(!SxactIsReadOnly(possibleUnsafeConflict->sxactOut));
3091                         Assert(MySerializableXact == possibleUnsafeConflict->sxactIn);
3092
3093                         ReleaseRWConflict(possibleUnsafeConflict);
3094
3095                         possibleUnsafeConflict = nextConflict;
3096                 }
3097         }
3098
3099         /* Check for conflict out to old committed transactions. */
3100         if (isCommit
3101                 && !SxactIsReadOnly(MySerializableXact)
3102                 && SxactHasSummaryConflictOut(MySerializableXact))
3103         {
3104                 MySerializableXact->SeqNo.earliestOutConflictCommit =
3105                         FirstNormalSerCommitSeqNo;
3106                 MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
3107         }
3108
3109         /*
3110          * Release all outConflicts to committed transactions.  If we're rolling
3111          * back clear them all.  Set SXACT_FLAG_CONFLICT_OUT if any point to
3112          * previously committed transactions.
3113          */
3114         conflict = (RWConflict)
3115                 SHMQueueNext((SHM_QUEUE *) &MySerializableXact->outConflicts,
3116                                          (SHM_QUEUE *) &MySerializableXact->outConflicts,
3117                                          offsetof(RWConflictData, outLink));
3118         while (conflict)
3119         {
3120                 nextConflict = (RWConflict)
3121                         SHMQueueNext((SHM_QUEUE *) &MySerializableXact->outConflicts,
3122                                                  &conflict->outLink,
3123                                                  offsetof(RWConflictData, outLink));
3124
3125                 if (isCommit
3126                         && !SxactIsReadOnly(MySerializableXact)
3127                         && SxactIsCommitted(conflict->sxactIn))
3128                 {
3129                         if ((MySerializableXact->flags & SXACT_FLAG_CONFLICT_OUT) == 0
3130                                 || conflict->sxactIn->commitSeqNo < MySerializableXact->SeqNo.earliestOutConflictCommit)
3131                                 MySerializableXact->SeqNo.earliestOutConflictCommit = conflict->sxactIn->commitSeqNo;
3132                         MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
3133                 }
3134
3135                 if (!isCommit
3136                         || SxactIsCommitted(conflict->sxactIn)
3137                         || (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo))
3138                         ReleaseRWConflict(conflict);
3139
3140                 conflict = nextConflict;
3141         }
3142
3143         /*
3144          * Release all inConflicts from committed and read-only transactions. If
3145          * we're rolling back, clear them all.
3146          */
3147         conflict = (RWConflict)
3148                 SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
3149                                          (SHM_QUEUE *) &MySerializableXact->inConflicts,
3150                                          offsetof(RWConflictData, inLink));
3151         while (conflict)
3152         {
3153                 nextConflict = (RWConflict)
3154                         SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
3155                                                  &conflict->inLink,
3156                                                  offsetof(RWConflictData, inLink));
3157
3158                 if (!isCommit
3159                         || SxactIsCommitted(conflict->sxactOut)
3160                         || SxactIsReadOnly(conflict->sxactOut))
3161                         ReleaseRWConflict(conflict);
3162
3163                 conflict = nextConflict;
3164         }
3165
3166         if (!topLevelIsDeclaredReadOnly)
3167         {
3168                 /*
3169                  * Remove ourselves from the list of possible conflicts for concurrent
3170                  * READ ONLY transactions, flagging them as unsafe if we have a
3171                  * conflict out. If any are waiting DEFERRABLE transactions, wake them
3172                  * up if they are known safe or known unsafe.
3173                  */
3174                 possibleUnsafeConflict = (RWConflict)
3175                         SHMQueueNext((SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
3176                                   (SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
3177                                                  offsetof(RWConflictData, outLink));
3178                 while (possibleUnsafeConflict)
3179                 {
3180                         nextConflict = (RWConflict)
3181                                 SHMQueueNext((SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
3182                                                          &possibleUnsafeConflict->outLink,
3183                                                          offsetof(RWConflictData, outLink));
3184
3185                         roXact = possibleUnsafeConflict->sxactIn;
3186                         Assert(MySerializableXact == possibleUnsafeConflict->sxactOut);
3187                         Assert(SxactIsReadOnly(roXact));
3188
3189                         /* Mark conflicted if necessary. */
3190                         if (isCommit
3191                                 && (MySerializableXact->flags & SXACT_FLAG_DID_WRITE)
3192                                 && SxactHasConflictOut(MySerializableXact)
3193                                 && (MySerializableXact->SeqNo.earliestOutConflictCommit
3194                                         <= roXact->SeqNo.lastCommitBeforeSnapshot))
3195                         {
3196                                 /*
3197                                  * This releases possibleUnsafeConflict (as well as all other
3198                                  * possible conflicts for roXact)
3199                                  */
3200                                 FlagSxactUnsafe(roXact);
3201                         }
3202                         else
3203                         {
3204                                 ReleaseRWConflict(possibleUnsafeConflict);
3205
3206                                 /*
3207                                  * If we were the last possible conflict, flag it safe. The
3208                                  * transaction can now safely release its predicate locks (but
3209                                  * that transaction's backend has to do that itself).
3210                                  */
3211                                 if (SHMQueueEmpty(&roXact->possibleUnsafeConflicts))
3212                                         roXact->flags |= SXACT_FLAG_RO_SAFE;
3213                         }
3214
3215                         /*
3216                          * Wake up the process for a waiting DEFERRABLE transaction if we
3217                          * now know it's either safe or conflicted.
3218                          */
3219                         if (SxactIsDeferrableWaiting(roXact) &&
3220                                 (SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
3221                                 ProcSendSignal(roXact->pid);
3222
3223                         possibleUnsafeConflict = nextConflict;
3224                 }
3225         }
3226
3227         /*
3228          * Check whether it's time to clean up old transactions. This can only be
3229          * done when the last serializable transaction with the oldest xmin among
3230          * serializable transactions completes.  We then find the "new oldest"
3231          * xmin and purge any transactions which finished before this transaction
3232          * was launched.
3233          */
3234         needToClear = false;
3235         if (TransactionIdEquals(MySerializableXact->xmin, PredXact->SxactGlobalXmin))
3236         {
3237                 Assert(PredXact->SxactGlobalXminCount > 0);
3238                 if (--(PredXact->SxactGlobalXminCount) == 0)
3239                 {
3240                         SetNewSxactGlobalXmin();
3241                         needToClear = true;
3242                 }
3243         }
3244
3245         LWLockRelease(SerializableXactHashLock);
3246
3247         LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
3248
3249         /* Add this to the list of transactions to check for later cleanup. */
3250         if (isCommit)
3251                 SHMQueueInsertBefore(FinishedSerializableTransactions,
3252                                                   (SHM_QUEUE *) &(MySerializableXact->finishedLink));
3253
3254         if (!isCommit)
3255                 ReleaseOneSerializableXact((SERIALIZABLEXACT *) MySerializableXact,
3256                                                                    false, false);
3257
3258         LWLockRelease(SerializableFinishedListLock);
3259
3260         if (needToClear)
3261                 ClearOldPredicateLocks();
3262
3263         MySerializableXact = InvalidSerializableXact;
3264
3265         /* Delete per-transaction lock table */
3266         if (LocalPredicateLockHash != NULL)
3267         {
3268                 hash_destroy(LocalPredicateLockHash);
3269                 LocalPredicateLockHash = NULL;
3270         }
3271 }
3272
3273 /*
3274  * ReleasePredicateLocksIfROSafe
3275  *              Check if the current transaction is read only and operating on
3276  *              a safe snapshot. If so, release predicate locks and return
3277  *              true.
3278  *
3279  * A transaction is flagged as RO_SAFE if all concurrent R/W
3280  * transactions commit without having conflicts out to an earlier
3281  * snapshot, thus ensuring that no conflicts are possible for this
3282  * transaction. Thus, we call this function as part of the
3283  * SkipSerialization check on all public interface methods.
3284  */
3285 static bool
3286 ReleasePredicateLocksIfROSafe(void)
3287 {
3288         if (SxactIsROSafe(MySerializableXact))
3289         {
3290                 ReleasePredicateLocks(false);
3291                 return true;
3292         }
3293         else
3294                 return false;
3295 }
3296
3297 /*
3298  * Clear old predicate locks.
3299  */
3300 static void
3301 ClearOldPredicateLocks(void)
3302 {
3303         SERIALIZABLEXACT *finishedSxact;
3304         PREDICATELOCK *predlock;
3305
3306         LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
3307         finishedSxact = (SERIALIZABLEXACT *)
3308                 SHMQueueNext(FinishedSerializableTransactions,
3309                                          FinishedSerializableTransactions,
3310                                          offsetof(SERIALIZABLEXACT, finishedLink));
3311         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3312         while (finishedSxact)
3313         {
3314                 SERIALIZABLEXACT *nextSxact;
3315
3316                 nextSxact = (SERIALIZABLEXACT *)
3317                         SHMQueueNext(FinishedSerializableTransactions,
3318                                                  &(finishedSxact->finishedLink),
3319                                                  offsetof(SERIALIZABLEXACT, finishedLink));
3320                 if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)
3321                         || TransactionIdPrecedesOrEquals(finishedSxact->finishedBefore,
3322                                                                                          PredXact->SxactGlobalXmin))
3323                 {
3324                         LWLockRelease(SerializableXactHashLock);
3325                         SHMQueueDelete(&(finishedSxact->finishedLink));
3326                         ReleaseOneSerializableXact(finishedSxact, false, false);
3327                         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3328                 }
3329                 else if (finishedSxact->commitSeqNo > PredXact->HavePartialClearedThrough
3330                    && finishedSxact->commitSeqNo <= PredXact->CanPartialClearThrough)
3331                 {
3332                         LWLockRelease(SerializableXactHashLock);
3333                         ReleaseOneSerializableXact(finishedSxact,
3334                                                                            !SxactIsReadOnly(finishedSxact),
3335                                                                            false);
3336                         PredXact->HavePartialClearedThrough = finishedSxact->commitSeqNo;
3337                         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3338                 }
3339                 else
3340                         break;
3341                 finishedSxact = nextSxact;
3342         }
3343         LWLockRelease(SerializableXactHashLock);
3344
3345         /*
3346          * Loop through predicate locks on dummy transaction for summarized data.
3347          */
3348         LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
3349         predlock = (PREDICATELOCK *)
3350                 SHMQueueNext(&OldCommittedSxact->predicateLocks,
3351                                          &OldCommittedSxact->predicateLocks,
3352                                          offsetof(PREDICATELOCK, xactLink));
3353         while (predlock)
3354         {
3355                 PREDICATELOCK *nextpredlock;
3356                 bool            canDoPartialCleanup;
3357
3358                 nextpredlock = (PREDICATELOCK *)
3359                         SHMQueueNext(&OldCommittedSxact->predicateLocks,
3360                                                  &predlock->xactLink,
3361                                                  offsetof(PREDICATELOCK, xactLink));
3362
3363                 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3364                 Assert(predlock->commitSeqNo != 0);
3365                 Assert(predlock->commitSeqNo != InvalidSerCommitSeqNo);
3366                 canDoPartialCleanup = (predlock->commitSeqNo <= PredXact->CanPartialClearThrough);
3367                 LWLockRelease(SerializableXactHashLock);
3368
3369                 if (canDoPartialCleanup)
3370                 {
3371                         PREDICATELOCKTAG tag;
3372                         SHM_QUEUE  *targetLink;
3373                         PREDICATELOCKTARGET *target;
3374                         PREDICATELOCKTARGETTAG targettag;
3375                         uint32          targettaghash;
3376                         LWLockId        partitionLock;
3377
3378                         tag = predlock->tag;
3379                         targetLink = &(predlock->targetLink);
3380                         target = tag.myTarget;
3381                         targettag = target->tag;
3382                         targettaghash = PredicateLockTargetTagHashCode(&targettag);
3383                         partitionLock = PredicateLockHashPartitionLock(targettaghash);
3384
3385                         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
3386
3387                         SHMQueueDelete(targetLink);
3388                         SHMQueueDelete(&(predlock->xactLink));
3389
3390                         hash_search_with_hash_value(PredicateLockHash, &tag,
3391                                                                 PredicateLockHashCodeFromTargetHashCode(&tag,
3392                                                                                                                           targettaghash),
3393                                                                                 HASH_REMOVE, NULL);
3394                         RemoveTargetIfNoLongerUsed(target, targettaghash);
3395
3396                         LWLockRelease(partitionLock);
3397                 }
3398
3399                 predlock = nextpredlock;
3400         }
3401
3402         LWLockRelease(SerializablePredicateLockListLock);
3403         LWLockRelease(SerializableFinishedListLock);
3404 }
3405
3406 /*
3407  * This is the normal way to delete anything from any of the predicate
3408  * locking hash tables.  Given a transaction which we know can be deleted:
3409  * delete all predicate locks held by that transaction and any predicate
3410  * lock targets which are now unreferenced by a lock; delete all conflicts
3411  * for the transaction; delete all xid values for the transaction; then
3412  * delete the transaction.
3413  *
3414  * When the partial flag is set, we can release all predicate locks and
3415  * out-conflict information -- we've established that there are no longer
3416  * any overlapping read write transactions for which this transaction could
3417  * matter.
3418  *
3419  * When the summarize flag is set, we've run short of room for sxact data
3420  * and must summarize to the SLRU.      Predicate locks are transferred to a
3421  * dummy "old" transaction, with duplicate locks on a single target
3422  * collapsing to a single lock with the "latest" commitSeqNo from among
3423  * the conflicting locks..
3424  */
3425 static void
3426 ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
3427                                                    bool summarize)
3428 {
3429         PREDICATELOCK *predlock;
3430         SERIALIZABLEXIDTAG sxidtag;
3431         RWConflict      conflict,
3432                                 nextConflict;
3433
3434         Assert(sxact != NULL);
3435         Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact));
3436         Assert(LWLockHeldByMe(SerializableFinishedListLock));
3437
3438         LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
3439         predlock = (PREDICATELOCK *)
3440                 SHMQueueNext(&(sxact->predicateLocks),
3441                                          &(sxact->predicateLocks),
3442                                          offsetof(PREDICATELOCK, xactLink));
3443         while (predlock)
3444         {
3445                 PREDICATELOCK *nextpredlock;
3446                 PREDICATELOCKTAG tag;
3447                 SHM_QUEUE  *targetLink;
3448                 PREDICATELOCKTARGET *target;
3449                 PREDICATELOCKTARGETTAG targettag;
3450                 uint32          targettaghash;
3451                 LWLockId        partitionLock;
3452
3453                 nextpredlock = (PREDICATELOCK *)
3454                         SHMQueueNext(&(sxact->predicateLocks),
3455                                                  &(predlock->xactLink),
3456                                                  offsetof(PREDICATELOCK, xactLink));
3457
3458                 tag = predlock->tag;
3459                 targetLink = &(predlock->targetLink);
3460                 target = tag.myTarget;
3461                 targettag = target->tag;
3462                 targettaghash = PredicateLockTargetTagHashCode(&targettag);
3463                 partitionLock = PredicateLockHashPartitionLock(targettaghash);
3464
3465                 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
3466
3467                 SHMQueueDelete(targetLink);
3468
3469                 hash_search_with_hash_value(PredicateLockHash, &tag,
3470                                                                 PredicateLockHashCodeFromTargetHashCode(&tag,
3471                                                                                                                           targettaghash),
3472                                                                         HASH_REMOVE, NULL);
3473                 if (summarize)
3474                 {
3475                         bool            found;
3476
3477                         /* Fold into dummy transaction list. */
3478                         tag.myXact = OldCommittedSxact;
3479                         predlock = hash_search_with_hash_value(PredicateLockHash, &tag,
3480                                                                 PredicateLockHashCodeFromTargetHashCode(&tag,
3481                                                                                                                           targettaghash),
3482                                                                                                    HASH_ENTER_NULL, &found);
3483                         if (!predlock)
3484                                 ereport(ERROR,
3485                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
3486                                                  errmsg("out of shared memory"),
3487                                                  errhint("You might need to increase max_pred_locks_per_transaction.")));
3488                         if (found)
3489                         {
3490                                 Assert(predlock->commitSeqNo != 0);
3491                                 Assert(predlock->commitSeqNo != InvalidSerCommitSeqNo);
3492                                 if (predlock->commitSeqNo < sxact->commitSeqNo)
3493                                         predlock->commitSeqNo = sxact->commitSeqNo;
3494                         }
3495                         else
3496                         {
3497                                 SHMQueueInsertBefore(&(target->predicateLocks),
3498                                                                          &(predlock->targetLink));
3499                                 SHMQueueInsertBefore(&(OldCommittedSxact->predicateLocks),
3500                                                                          &(predlock->xactLink));
3501                                 predlock->commitSeqNo = sxact->commitSeqNo;
3502                         }
3503                 }
3504                 else
3505                         RemoveTargetIfNoLongerUsed(target, targettaghash);
3506
3507                 LWLockRelease(partitionLock);
3508
3509                 predlock = nextpredlock;
3510         }
3511
3512         /*
3513          * Rather than retail removal, just re-init the head after we've run
3514          * through the list.
3515          */
3516         SHMQueueInit(&sxact->predicateLocks);
3517
3518         LWLockRelease(SerializablePredicateLockListLock);
3519
3520         sxidtag.xid = sxact->topXid;
3521         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
3522
3523         if (!partial)
3524         {
3525                 /* Release all outConflicts. */
3526                 conflict = (RWConflict)
3527                         SHMQueueNext(&sxact->outConflicts,
3528                                                  &sxact->outConflicts,
3529                                                  offsetof(RWConflictData, outLink));
3530                 while (conflict)
3531                 {
3532                         nextConflict = (RWConflict)
3533                                 SHMQueueNext(&sxact->outConflicts,
3534                                                          &conflict->outLink,
3535                                                          offsetof(RWConflictData, outLink));
3536                         if (summarize)
3537                                 conflict->sxactIn->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
3538                         ReleaseRWConflict(conflict);
3539                         conflict = nextConflict;
3540                 }
3541         }
3542
3543         /* Release all inConflicts. */
3544         conflict = (RWConflict)
3545                 SHMQueueNext(&sxact->inConflicts,
3546                                          &sxact->inConflicts,
3547                                          offsetof(RWConflictData, inLink));
3548         while (conflict)
3549         {
3550                 nextConflict = (RWConflict)
3551                         SHMQueueNext(&sxact->inConflicts,
3552                                                  &conflict->inLink,
3553                                                  offsetof(RWConflictData, inLink));
3554                 if (summarize)
3555                         conflict->sxactOut->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
3556                 ReleaseRWConflict(conflict);
3557                 conflict = nextConflict;
3558         }
3559
3560         if (!partial)
3561         {
3562                 /* Get rid of the xid and the record of the transaction itself. */
3563                 if (sxidtag.xid != InvalidTransactionId)
3564                         hash_search(SerializableXidHash, &sxidtag, HASH_REMOVE, NULL);
3565                 ReleasePredXact(sxact);
3566         }
3567
3568         LWLockRelease(SerializableXactHashLock);
3569 }
3570
3571 /*
3572  * Tests whether the given top level transaction is concurrent with
3573  * (overlaps) our current transaction.
3574  *
3575  * We need to identify the top level transaction for SSI, anyway, so pass
3576  * that to this function to save the overhead of checking the snapshot's
3577  * subxip array.
3578  */
3579 static bool
3580 XidIsConcurrent(TransactionId xid)
3581 {
3582         Snapshot        snap;
3583         uint32          i;
3584
3585         Assert(TransactionIdIsValid(xid));
3586         Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
3587
3588         snap = GetTransactionSnapshot();
3589
3590         if (TransactionIdPrecedes(xid, snap->xmin))
3591                 return false;
3592
3593         if (TransactionIdFollowsOrEquals(xid, snap->xmax))
3594                 return true;
3595
3596         for (i = 0; i < snap->xcnt; i++)
3597         {
3598                 if (xid == snap->xip[i])
3599                         return true;
3600         }
3601
3602         return false;
3603 }
3604
3605 /*
3606  * CheckForSerializableConflictOut
3607  *              We are reading a tuple which has been modified.  If it is visible to
3608  *              us but has been deleted, that indicates a rw-conflict out.      If it's
3609  *              not visible and was created by a concurrent (overlapping)
3610  *              serializable transaction, that is also a rw-conflict out,
3611  *
3612  * We will determine the top level xid of the writing transaction with which
3613  * we may be in conflict, and check for overlap with our own transaction.
3614  * If the transactions overlap (i.e., they cannot see each other's writes),
3615  * then we have a conflict out.
3616  *
3617  * This function should be called just about anywhere in heapam.c where a
3618  * tuple has been read. The caller must hold at least a shared lock on the
3619  * buffer, because this function might set hint bits on the tuple. There is
3620  * currently no known reason to call this function from an index AM.
3621  */
3622 void
3623 CheckForSerializableConflictOut(const bool visible, const Relation relation,
3624                                                                 const HeapTuple tuple, const Buffer buffer)
3625 {
3626         TransactionId xid;
3627         SERIALIZABLEXIDTAG sxidtag;
3628         SERIALIZABLEXID *sxid;
3629         SERIALIZABLEXACT *sxact;
3630         HTSV_Result htsvResult;
3631
3632         if (SkipSerialization(relation))
3633                 return;
3634
3635         if (SxactIsMarkedForDeath(MySerializableXact))
3636         {
3637                 ereport(ERROR,
3638                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
3639                                  errmsg("could not serialize access due to read/write dependencies among transactions"),
3640                                  errdetail("Cancelled on identification as a pivot, during conflict out checking."),
3641                                  errhint("The transaction might succeed if retried.")));
3642         }
3643
3644         /*
3645          * Check to see whether the tuple has been written to by a concurrent
3646          * transaction, either to create it not visible to us, or to delete it
3647          * while it is visible to us.  The "visible" bool indicates whether the
3648          * tuple is visible to us, while HeapTupleSatisfiesVacuum checks what else
3649          * is going on with it.
3650          */
3651         htsvResult = HeapTupleSatisfiesVacuum(tuple->t_data, TransactionXmin, buffer);
3652         switch (htsvResult)
3653         {
3654                 case HEAPTUPLE_LIVE:
3655                         if (visible)
3656                                 return;
3657                         xid = HeapTupleHeaderGetXmin(tuple->t_data);
3658                         break;
3659                 case HEAPTUPLE_RECENTLY_DEAD:
3660                         if (!visible)
3661                                 return;
3662                         xid = HeapTupleHeaderGetXmax(tuple->t_data);
3663                         break;
3664                 case HEAPTUPLE_DELETE_IN_PROGRESS:
3665                         xid = HeapTupleHeaderGetXmax(tuple->t_data);
3666                         break;
3667                 case HEAPTUPLE_INSERT_IN_PROGRESS:
3668                         xid = HeapTupleHeaderGetXmin(tuple->t_data);
3669                         break;
3670                 case HEAPTUPLE_DEAD:
3671                         return;
3672                 default:
3673
3674                         /*
3675                          * The only way to get to this default clause is if a new value is
3676                          * added to the enum type without adding it to this switch
3677                          * statement.  That's a bug, so elog.
3678                          */
3679                         elog(ERROR, "unrecognized return value from HeapTupleSatisfiesVacuum: %u", htsvResult);
3680
3681                         /*
3682                          * In spite of having all enum values covered and calling elog on
3683                          * this default, some compilers think this is a code path which
3684                          * allows xid to be used below without initialization. Silence
3685                          * that warning.
3686                          */
3687                         xid = InvalidTransactionId;
3688         }
3689         Assert(TransactionIdIsValid(xid));
3690         Assert(TransactionIdFollowsOrEquals(xid, TransactionXmin));
3691
3692         /*
3693          * Find top level xid.  Bail out if xid is too early to be a conflict, or
3694          * if it's our own xid.
3695          */
3696         if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
3697                 return;
3698         xid = SubTransGetTopmostTransaction(xid);
3699         if (TransactionIdPrecedes(xid, TransactionXmin))
3700                 return;
3701         if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
3702                 return;
3703
3704         /*
3705          * Find sxact or summarized info for the top level xid.
3706          */
3707         sxidtag.xid = xid;
3708         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
3709         sxid = (SERIALIZABLEXID *)
3710                 hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
3711         if (!sxid)
3712         {
3713                 /*
3714                  * Transaction not found in "normal" SSI structures.  Check whether it
3715                  * got pushed out to SLRU storage for "old committed" transactions.
3716                  */
3717                 SerCommitSeqNo conflictCommitSeqNo;
3718
3719                 conflictCommitSeqNo = OldSerXidGetMinConflictCommitSeqNo(xid);
3720                 if (conflictCommitSeqNo != 0)
3721                 {
3722                         if (conflictCommitSeqNo != InvalidSerCommitSeqNo
3723                                 && (!SxactIsReadOnly(MySerializableXact)
3724                                         || conflictCommitSeqNo
3725                                         <= MySerializableXact->SeqNo.lastCommitBeforeSnapshot))
3726                                 ereport(ERROR,
3727                                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
3728                                                  errmsg("could not serialize access due to read/write dependencies among transactions"),
3729                                 errdetail("Cancelled on conflict out to old pivot %u.", xid),
3730                                           errhint("The transaction might succeed if retried.")));
3731
3732                         if (SxactHasSummaryConflictIn(MySerializableXact)
3733                         || !SHMQueueEmpty((SHM_QUEUE *) &MySerializableXact->inConflicts))
3734                                 ereport(ERROR,
3735                                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
3736                                                  errmsg("could not serialize access due to read/write dependencies among transactions"),
3737                                                  errdetail("Cancelled on identification as a pivot, with conflict out to old committed transaction %u.", xid),
3738                                           errhint("The transaction might succeed if retried.")));
3739
3740                         MySerializableXact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
3741                 }
3742
3743                 /* It's not serializable or otherwise not important. */
3744                 LWLockRelease(SerializableXactHashLock);
3745                 return;
3746         }
3747         sxact = sxid->myXact;
3748         Assert(TransactionIdEquals(sxact->topXid, xid));
3749         if (sxact == MySerializableXact
3750                 || SxactIsRolledBack(sxact)
3751                 || SxactIsMarkedForDeath(sxact))
3752         {
3753                 /* We can't conflict with our own transaction or one rolled back. */
3754                 LWLockRelease(SerializableXactHashLock);
3755                 return;
3756         }
3757
3758         /*
3759          * We have a conflict out to a transaction which has a conflict out to a
3760          * summarized transaction.      That summarized transaction must have
3761          * committed first, and we can't tell when it committed in relation to our
3762          * snapshot acquisition, so something needs to be cancelled.
3763          */
3764         if (SxactHasSummaryConflictOut(sxact))
3765         {
3766                 if (!SxactIsPrepared(sxact))
3767                 {
3768                         sxact->flags |= SXACT_FLAG_MARKED_FOR_DEATH;
3769                         LWLockRelease(SerializableXactHashLock);
3770                         return;
3771                 }
3772                 else
3773                 {
3774                         LWLockRelease(SerializableXactHashLock);
3775                         ereport(ERROR,
3776                                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
3777                                          errmsg("could not serialize access due to read/write dependencies among transactions"),
3778                                          errdetail("Cancelled on conflict out to old pivot."),
3779                                          errhint("The transaction might succeed if retried.")));
3780                 }
3781         }
3782
3783         /*
3784          * If this is a read-only transaction and the writing transaction has
3785          * committed, and it doesn't have a rw-conflict to a transaction which
3786          * committed before it, no conflict.
3787          */
3788         if (SxactIsReadOnly(MySerializableXact)
3789                 && SxactIsCommitted(sxact)
3790                 && !SxactHasSummaryConflictOut(sxact)
3791                 && (!SxactHasConflictOut(sxact)
3792                         || MySerializableXact->SeqNo.lastCommitBeforeSnapshot < sxact->SeqNo.earliestOutConflictCommit))
3793         {
3794                 /* Read-only transaction will appear to run first.      No conflict. */
3795                 LWLockRelease(SerializableXactHashLock);
3796                 return;
3797         }
3798
3799         if (!XidIsConcurrent(xid))
3800         {
3801                 /* This write was already in our snapshot; no conflict. */
3802                 LWLockRelease(SerializableXactHashLock);
3803                 return;
3804         }
3805
3806         if (RWConflictExists((SERIALIZABLEXACT *) MySerializableXact, sxact))
3807         {
3808                 /* We don't want duplicate conflict records in the list. */
3809                 LWLockRelease(SerializableXactHashLock);
3810                 return;
3811         }
3812
3813         /*
3814          * Flag the conflict.  But first, if this conflict creates a dangerous
3815          * structure, ereport an error.
3816          */
3817         FlagRWConflict((SERIALIZABLEXACT *) MySerializableXact, sxact);
3818         LWLockRelease(SerializableXactHashLock);
3819 }
3820
3821 /*
3822  * Check a particular target for rw-dependency conflict in.
3823  */
3824 static void
3825 CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
3826 {
3827         uint32          targettaghash;
3828         LWLockId        partitionLock;
3829         PREDICATELOCKTARGET *target;
3830         PREDICATELOCK *predlock;
3831         PREDICATELOCK *mypredlock = NULL;
3832         PREDICATELOCKTAG mypredlocktag;
3833
3834         Assert(MySerializableXact != InvalidSerializableXact);
3835
3836         /*
3837          * The same hash and LW lock apply to the lock target and the lock itself.
3838          */
3839         targettaghash = PredicateLockTargetTagHashCode(targettag);
3840         partitionLock = PredicateLockHashPartitionLock(targettaghash);
3841         LWLockAcquire(partitionLock, LW_SHARED);
3842         target = (PREDICATELOCKTARGET *)
3843                 hash_search_with_hash_value(PredicateLockTargetHash,
3844                                                                         targettag, targettaghash,
3845                                                                         HASH_FIND, NULL);
3846         if (!target)
3847         {
3848                 /* Nothing has this target locked; we're done here. */
3849                 LWLockRelease(partitionLock);
3850                 return;
3851         }
3852
3853         /*
3854          * Each lock for an overlapping transaction represents a conflict: a
3855          * rw-dependency in to this transaction.
3856          */
3857         predlock = (PREDICATELOCK *)
3858                 SHMQueueNext(&(target->predicateLocks),
3859                                          &(target->predicateLocks),
3860                                          offsetof(PREDICATELOCK, targetLink));
3861         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3862         while (predlock)
3863         {
3864                 SHM_QUEUE  *predlocktargetlink;
3865                 PREDICATELOCK *nextpredlock;
3866                 SERIALIZABLEXACT *sxact;
3867
3868                 predlocktargetlink = &(predlock->targetLink);
3869                 nextpredlock = (PREDICATELOCK *)
3870                         SHMQueueNext(&(target->predicateLocks),
3871                                                  predlocktargetlink,
3872                                                  offsetof(PREDICATELOCK, targetLink));
3873
3874                 sxact = predlock->tag.myXact;
3875                 if (sxact == MySerializableXact)
3876                 {
3877                         /*
3878                          * If we're getting a write lock on a tuple, we don't need a
3879                          * predicate (SIREAD) lock on the same tuple. We can safely remove
3880                          * our SIREAD lock, but we'll defer doing so until after the loop
3881                          * because that requires upgrading to an exclusive partition lock.
3882                          *
3883                          * We can't use this optimization within a subtransaction because
3884                          * the subtransaction could roll back, and we would be left
3885                          * without any lock at the top level.
3886                          */
3887                         if (!IsSubTransaction()
3888                                 && GET_PREDICATELOCKTARGETTAG_OFFSET(*targettag))
3889                         {
3890                                 mypredlock = predlock;
3891                                 mypredlocktag = predlock->tag;
3892                         }
3893                 }
3894                 else if (!SxactIsRolledBack(sxact)
3895                                  && (!SxactIsCommitted(sxact)
3896                                          || TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
3897                                                                                           sxact->finishedBefore))
3898                 && !RWConflictExists(sxact, (SERIALIZABLEXACT *) MySerializableXact))
3899                 {
3900                         LWLockRelease(SerializableXactHashLock);
3901                         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
3902
3903                         /*
3904                          * Re-check after getting exclusive lock because the other
3905                          * transaction may have flagged a conflict.
3906                          */
3907                         if (!SxactIsRolledBack(sxact)
3908                                 && (!SxactIsCommitted(sxact)
3909                                         || TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
3910                                                                                          sxact->finishedBefore))
3911                                 && !RWConflictExists(sxact,
3912                                                                          (SERIALIZABLEXACT *) MySerializableXact))
3913                         {
3914                                 FlagRWConflict(sxact, (SERIALIZABLEXACT *) MySerializableXact);
3915                         }
3916
3917                         LWLockRelease(SerializableXactHashLock);
3918                         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3919                 }
3920
3921                 predlock = nextpredlock;
3922         }
3923         LWLockRelease(SerializableXactHashLock);
3924         LWLockRelease(partitionLock);
3925
3926         /*
3927          * If we found one of our own SIREAD locks to remove, remove it now.
3928          *
3929          * At this point our transaction already has an ExclusiveRowLock on the
3930          * relation, so we are OK to drop the predicate lock on the tuple, if
3931          * found, without fearing that another write against the tuple will occur
3932          * before the MVCC information makes it to the buffer.
3933          */
3934         if (mypredlock != NULL)
3935         {
3936                 uint32          predlockhashcode;
3937                 PREDICATELOCK *rmpredlock;
3938
3939                 LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
3940                 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
3941                 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
3942
3943                 /*
3944                  * Remove the predicate lock from shared memory, if it wasn't removed
3945                  * while the locks were released.  One way that could happen is from
3946                  * autovacuum cleaning up an index.
3947                  */
3948                 predlockhashcode = PredicateLockHashCodeFromTargetHashCode
3949                         (&mypredlocktag, targettaghash);
3950                 rmpredlock = (PREDICATELOCK *)
3951                         hash_search_with_hash_value(PredicateLockHash,
3952                                                                                 &mypredlocktag,
3953                                                                                 predlockhashcode,
3954                                                                                 HASH_FIND, NULL);
3955                 if (rmpredlock != NULL)
3956                 {
3957                         Assert(rmpredlock == mypredlock);
3958
3959                         SHMQueueDelete(&(mypredlock->targetLink));
3960                         SHMQueueDelete(&(mypredlock->xactLink));
3961
3962                         rmpredlock = (PREDICATELOCK *)
3963                                 hash_search_with_hash_value(PredicateLockHash,
3964                                                                                         &mypredlocktag,
3965                                                                                         predlockhashcode,
3966                                                                                         HASH_REMOVE, NULL);
3967                         Assert(rmpredlock == mypredlock);
3968
3969                         RemoveTargetIfNoLongerUsed(target, targettaghash);
3970                 }
3971
3972                 LWLockRelease(SerializableXactHashLock);
3973                 LWLockRelease(partitionLock);
3974                 LWLockRelease(SerializablePredicateLockListLock);
3975
3976                 if (rmpredlock != NULL)
3977                 {
3978                         /*
3979                          * Remove entry in local lock table if it exists. It's OK if it
3980                          * doesn't exist; that means the lock was transferred to a new
3981                          * target by a different backend.
3982                          */
3983                         hash_search_with_hash_value(LocalPredicateLockHash,
3984                                                                                 targettag, targettaghash,
3985                                                                                 HASH_REMOVE, NULL);
3986
3987                         DecrementParentLocks(targettag);
3988                 }
3989         }
3990 }
3991
3992 /*
3993  * CheckForSerializableConflictIn
3994  *              We are writing the given tuple.  If that indicates a rw-conflict
3995  *              in from another serializable transaction, take appropriate action.
3996  *
3997  * Skip checking for any granularity for which a parameter is missing.
3998  *
3999  * A tuple update or delete is in conflict if we have a predicate lock
4000  * against the relation or page in which the tuple exists, or against the
4001  * tuple itself.
4002  */
4003 void
4004 CheckForSerializableConflictIn(const Relation relation, const HeapTuple tuple,
4005                                                            const Buffer buffer)
4006 {
4007         PREDICATELOCKTARGETTAG targettag;
4008
4009         if (SkipSerialization(relation))
4010                 return;
4011
4012         if (SxactIsMarkedForDeath(MySerializableXact))
4013                 ereport(ERROR,
4014                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
4015                                  errmsg("could not serialize access due to read/write dependencies among transactions"),
4016                                  errdetail("Cancelled on identification as a pivot, during conflict in checking."),
4017                                  errhint("The transaction might succeed if retried.")));
4018
4019         MySerializableXact->flags |= SXACT_FLAG_DID_WRITE;
4020
4021         /*
4022          * It is important that we check for locks from the finest granularity to
4023          * the coarsest granularity, so that granularity promotion doesn't cause
4024          * us to miss a lock.  The new (coarser) lock will be acquired before the
4025          * old (finer) locks are released.
4026          *
4027          * It is not possible to take and hold a lock across the checks for all
4028          * granularities because each target could be in a separate partition.
4029          */
4030         if (tuple != NULL)
4031         {
4032                 SET_PREDICATELOCKTARGETTAG_TUPLE(targettag,
4033                                                                                  relation->rd_node.dbNode,
4034                                                                                  relation->rd_id,
4035                                                  ItemPointerGetBlockNumber(&(tuple->t_data->t_ctid)),
4036                                                 ItemPointerGetOffsetNumber(&(tuple->t_data->t_ctid)),
4037                                                                           HeapTupleHeaderGetXmin(tuple->t_data));
4038                 CheckTargetForConflictsIn(&targettag);
4039         }
4040
4041         if (BufferIsValid(buffer))
4042         {
4043                 SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
4044                                                                                 relation->rd_node.dbNode,
4045                                                                                 relation->rd_id,
4046                                                                                 BufferGetBlockNumber(buffer));
4047                 CheckTargetForConflictsIn(&targettag);
4048         }
4049
4050         SET_PREDICATELOCKTARGETTAG_RELATION(targettag,
4051                                                                                 relation->rd_node.dbNode,
4052                                                                                 relation->rd_id);
4053         CheckTargetForConflictsIn(&targettag);
4054 }
4055
4056 /*
4057  * CheckTableForSerializableConflictIn
4058  *              The entire table is going through a DDL-style logical mass delete
4059  *              like TRUNCATE or DROP TABLE.  If that causes a rw-conflict in from
4060  *              another serializable transaction, take appropriate action.
4061  *
4062  * While these operations do not operate entirely within the bounds of
4063  * snapshot isolation, they can occur inside a serializable transaction, and
4064  * will logically occur after any reads which saw rows which were destroyed
4065  * by these operations, so we do what we can to serialize properly under
4066  * SSI.
4067  *
4068  * The relation passed in must be a heap relation. Any predicate lock of any
4069  * granularity on the heap will cause a rw-conflict in to this transaction.
4070  * Predicate locks on indexes do not matter because they only exist to guard
4071  * against conflicting inserts into the index, and this is a mass *delete*.
4072  * When a table is truncated or dropped, the index will also be truncated
4073  * or dropped, and we'll deal with locks on the index when that happens.
4074  *
4075  * Dropping or truncating a table also needs to drop any existing predicate
4076  * locks on heap tuples or pages, because they're about to go away. This
4077  * should be done before altering the predicate locks because the transaction
4078  * could be rolled back because of a conflict, in which case the lock changes
4079  * are not needed. (At the moment, we don't actually bother to drop the
4080  * existing locks on a dropped or truncated table at the moment. That might
4081  * lead to some false positives, but it doesn't seem worth the trouble.)
4082  */
4083 void
4084 CheckTableForSerializableConflictIn(const Relation relation)
4085 {
4086         HASH_SEQ_STATUS seqstat;
4087         PREDICATELOCKTARGET *target;
4088         Oid                     dbId;
4089         Oid                     heapId;
4090         int                     i;
4091
4092         /*
4093          * Bail out quickly if there are no serializable transactions running.
4094          * It's safe to check this without taking locks because the caller is
4095          * holding an ACCESS EXCLUSIVE lock on the relation.  No new locks which
4096          * would matter here can be acquired while that is held.
4097          */
4098         if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
4099                 return;
4100
4101         if (SkipSerialization(relation))
4102                 return;
4103
4104         Assert(relation->rd_index == NULL); /* not an index relation */
4105
4106         dbId = relation->rd_node.dbNode;
4107         heapId = relation->rd_id;
4108
4109         LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
4110         for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
4111                 LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED);
4112         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
4113
4114         /* Scan through target list */
4115         hash_seq_init(&seqstat, PredicateLockTargetHash);
4116
4117         while ((target = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat)))
4118         {
4119                 PREDICATELOCK *predlock;
4120
4121                 /*
4122                  * Check whether this is a target which needs attention.
4123                  */
4124                 if (GET_PREDICATELOCKTARGETTAG_RELATION(target->tag) != heapId)
4125                         continue;                       /* wrong relation id */
4126                 if (GET_PREDICATELOCKTARGETTAG_DB(target->tag) != dbId)
4127                         continue;                       /* wrong database id */
4128
4129                 /*
4130                  * Loop through locks for this target and flag conflicts.
4131                  */
4132                 predlock = (PREDICATELOCK *)
4133                         SHMQueueNext(&(target->predicateLocks),
4134                                                  &(target->predicateLocks),
4135                                                  offsetof(PREDICATELOCK, targetLink));
4136                 while (predlock)
4137                 {
4138                         PREDICATELOCK *nextpredlock;
4139
4140                         nextpredlock = (PREDICATELOCK *)
4141                                 SHMQueueNext(&(target->predicateLocks),
4142                                                          &(predlock->targetLink),
4143                                                          offsetof(PREDICATELOCK, targetLink));
4144
4145                         if (predlock->tag.myXact != MySerializableXact
4146                                 && !RWConflictExists(predlock->tag.myXact,
4147                                                                          (SERIALIZABLEXACT *) MySerializableXact))
4148                                 FlagRWConflict(predlock->tag.myXact,
4149                                                            (SERIALIZABLEXACT *) MySerializableXact);
4150
4151                         predlock = nextpredlock;
4152                 }
4153         }
4154
4155         /* Release locks in reverse order */
4156         LWLockRelease(SerializableXactHashLock);
4157         for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
4158                 LWLockRelease(FirstPredicateLockMgrLock + i);
4159         LWLockRelease(SerializablePredicateLockListLock);
4160 }
4161
4162
4163 /*
4164  * Flag a rw-dependency between two serializable transactions.
4165  *
4166  * The caller is responsible for ensuring that we have a LW lock on
4167  * the transaction hash table.
4168  */
4169 static void
4170 FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
4171 {
4172         Assert(reader != writer);
4173
4174         /* First, see if this conflict causes failure. */
4175         OnConflict_CheckForSerializationFailure(reader, writer);
4176
4177         /* Actually do the conflict flagging. */
4178         if (reader == OldCommittedSxact)
4179                 writer->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
4180         else if (writer == OldCommittedSxact)
4181                 reader->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
4182         else
4183                 SetRWConflict(reader, writer);
4184 }
4185
4186 /*----------------------------------------------------------------------------
4187  * We are about to add a RW-edge to the dependency graph - check that we don't
4188  * introduce a dangerous structure by doing so, and abort one of the
4189  * transactions if so.
4190  *
4191  * A serialization failure can only occur if there is a dangerous structure
4192  * in the dependency graph:
4193  *
4194  *              Tin ------> Tpivot ------> Tout
4195  *                        rw                     rw
4196  *
4197  * Furthermore, Tout must commit first.
4198  *
4199  * One more optimization is that if Tin is declared READ ONLY (or commits
4200  * without writing), we can only have a problem if Tout committed before Tin
4201  * acquired its snapshot.
4202  *----------------------------------------------------------------------------
4203  */
4204 static void
4205 OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
4206                                                                                 SERIALIZABLEXACT *writer)
4207 {
4208         bool            failure;
4209         RWConflict      conflict;
4210
4211         Assert(LWLockHeldByMe(SerializableXactHashLock));
4212
4213         failure = false;
4214
4215         /*------------------------------------------------------------------------
4216          * Check for already-committed writer with rw-conflict out flagged
4217          * (conflict-flag on W means that T2 committed before W):
4218          *
4219          *              R ------> W ------> T2
4220          *                      rw                rw
4221          *
4222          * That is a dangerous structure, so we must abort. (Since the writer
4223          * has already committed, we must be the reader)
4224          *------------------------------------------------------------------------
4225          */
4226         if (SxactIsCommitted(writer)
4227           && (SxactHasConflictOut(writer) || SxactHasSummaryConflictOut(writer)))
4228                 failure = true;
4229
4230         /*------------------------------------------------------------------------
4231          * Check whether the writer has become a pivot with an out-conflict
4232          * committed transaction (T2), and T2 committed first:
4233          *
4234          *              R ------> W ------> T2
4235          *                      rw                rw
4236          *
4237          * Because T2 must've committed first, there is no anomaly if:
4238          * - the reader committed before T2
4239          * - the writer committed before T2
4240          * - the reader is a READ ONLY transaction and the reader was concurrent
4241          *       with T2 (= reader acquired its snapshot before T2 committed)
4242          *------------------------------------------------------------------------
4243          */
4244         if (!failure)
4245         {
4246                 if (SxactHasSummaryConflictOut(writer))
4247                 {
4248                         failure = true;
4249                         conflict = NULL;
4250                 }
4251                 else
4252                         conflict = (RWConflict)
4253                                 SHMQueueNext(&writer->outConflicts,
4254                                                          &writer->outConflicts,
4255                                                          offsetof(RWConflictData, outLink));
4256                 while (conflict)
4257                 {
4258                         SERIALIZABLEXACT *t2 = conflict->sxactIn;
4259
4260                         if (SxactIsCommitted(t2)
4261                                 && (!SxactIsCommitted(reader)
4262                                         || t2->commitSeqNo <= reader->commitSeqNo)
4263                                 && (!SxactIsCommitted(writer)
4264                                         || t2->commitSeqNo <= writer->commitSeqNo)
4265                                 && (!SxactIsReadOnly(reader)
4266                            || t2->commitSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot))
4267                         {
4268                                 failure = true;
4269                                 break;
4270                         }
4271                         conflict = (RWConflict)
4272                                 SHMQueueNext(&writer->outConflicts,
4273                                                          &conflict->outLink,
4274                                                          offsetof(RWConflictData, outLink));
4275                 }
4276         }
4277
4278         /*------------------------------------------------------------------------
4279          * Check whether the reader has become a pivot with a committed writer:
4280          *
4281          *              T0 ------> R ------> W
4282          *                       rw                rw
4283          *
4284          * Because W must've committed first for an anomaly to occur, there is no
4285          * anomaly if:
4286          * - T0 committed before the writer
4287          * - T0 is READ ONLY, and overlaps the writer
4288          *------------------------------------------------------------------------
4289          */
4290         if (!failure && SxactIsCommitted(writer) && !SxactIsReadOnly(reader))
4291         {
4292                 if (SxactHasSummaryConflictIn(reader))
4293                 {
4294                         failure = true;
4295                         conflict = NULL;
4296                 }
4297                 else
4298                         conflict = (RWConflict)
4299                                 SHMQueueNext(&reader->inConflicts,
4300                                                          &reader->inConflicts,
4301                                                          offsetof(RWConflictData, inLink));
4302                 while (conflict)
4303                 {
4304                         SERIALIZABLEXACT *t0 = conflict->sxactOut;
4305
4306                         if (!SxactIsRolledBack(t0)
4307                                 && (!SxactIsCommitted(t0)
4308                                         || t0->commitSeqNo >= writer->commitSeqNo)
4309                                 && (!SxactIsReadOnly(t0)
4310                            || t0->SeqNo.lastCommitBeforeSnapshot >= writer->commitSeqNo))
4311                         {
4312                                 failure = true;
4313                                 break;
4314                         }
4315                         conflict = (RWConflict)
4316                                 SHMQueueNext(&reader->inConflicts,
4317                                                          &conflict->inLink,
4318                                                          offsetof(RWConflictData, inLink));
4319                 }
4320         }
4321
4322         if (failure)
4323         {
4324                 /*
4325                  * We have to kill a transaction to avoid a possible anomaly from
4326                  * occurring. If the writer is us, we can just ereport() to cause a
4327                  * transaction abort. Otherwise we flag the writer for termination,
4328                  * causing it to abort when it tries to commit. However, if the writer
4329                  * is a prepared transaction, already prepared, we can't abort it
4330                  * anymore, so we have to kill the reader instead.
4331                  */
4332                 if (MySerializableXact == writer)
4333                 {
4334                         LWLockRelease(SerializableXactHashLock);
4335                         ereport(ERROR,
4336                                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
4337                                          errmsg("could not serialize access due to read/write dependencies among transactions"),
4338                                          errdetail("Cancelled on identification as a pivot, during write."),
4339                                          errhint("The transaction might succeed if retried.")));
4340                 }
4341                 else if (SxactIsPrepared(writer))
4342                 {
4343                         LWLockRelease(SerializableXactHashLock);
4344
4345                         /* if we're not the writer, we have to be the reader */
4346                         Assert(MySerializableXact == reader);
4347                         ereport(ERROR,
4348                                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
4349                                          errmsg("could not serialize access due to read/write dependencies among transactions"),
4350                                          errdetail("Cancelled on conflict out to pivot %u, during read.", writer->topXid),
4351                                          errhint("The transaction might succeed if retried.")));
4352                 }
4353                 writer->flags |= SXACT_FLAG_MARKED_FOR_DEATH;
4354         }
4355 }
4356
4357 /*
4358  * PreCommit_CheckForSerializableConflicts
4359  *              Check for dangerous structures in a serializable transaction
4360  *              at commit.
4361  *
4362  * We're checking for a dangerous structure as each conflict is recorded.
4363  * The only way we could have a problem at commit is if this is the "out"
4364  * side of a pivot, and neither the "in" side nor the pivot has yet
4365  * committed.
4366  *
4367  * If a dangerous structure is found, the pivot (the near conflict) is
4368  * marked for death, because rolling back another transaction might mean
4369  * that we flail without ever making progress.  This transaction is
4370  * committing writes, so letting it commit ensures progress.  If we
4371  * cancelled the far conflict, it might immediately fail again on retry.
4372  */
4373 void
4374 PreCommit_CheckForSerializationFailure(void)
4375 {
4376         RWConflict      nearConflict;
4377
4378         if (MySerializableXact == InvalidSerializableXact)
4379                 return;
4380
4381         Assert(IsolationIsSerializable());
4382
4383         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
4384
4385         if (SxactIsMarkedForDeath(MySerializableXact))
4386         {
4387                 LWLockRelease(SerializableXactHashLock);
4388                 ereport(ERROR,
4389                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
4390                                  errmsg("could not serialize access due to read/write dependencies among transactions"),
4391                                  errdetail("Cancelled on identification as a pivot, during commit attempt."),
4392                                  errhint("The transaction might succeed if retried.")));
4393         }
4394
4395         nearConflict = (RWConflict)
4396                 SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
4397                                          (SHM_QUEUE *) &MySerializableXact->inConflicts,
4398                                          offsetof(RWConflictData, inLink));
4399         while (nearConflict)
4400         {
4401                 if (!SxactIsCommitted(nearConflict->sxactOut)
4402                         && !SxactIsRolledBack(nearConflict->sxactOut)
4403                         && !SxactIsMarkedForDeath(nearConflict->sxactOut))
4404                 {
4405                         RWConflict      farConflict;
4406
4407                         farConflict = (RWConflict)
4408                                 SHMQueueNext(&nearConflict->sxactOut->inConflicts,
4409                                                          &nearConflict->sxactOut->inConflicts,
4410                                                          offsetof(RWConflictData, inLink));
4411                         while (farConflict)
4412                         {
4413                                 if (farConflict->sxactOut == MySerializableXact
4414                                         || (!SxactIsCommitted(farConflict->sxactOut)
4415                                                 && !SxactIsReadOnly(farConflict->sxactOut)
4416                                                 && !SxactIsRolledBack(farConflict->sxactOut)
4417                                                 && !SxactIsMarkedForDeath(farConflict->sxactOut)))
4418                                 {
4419                                         nearConflict->sxactOut->flags |= SXACT_FLAG_MARKED_FOR_DEATH;
4420                                         break;
4421                                 }
4422                                 farConflict = (RWConflict)
4423                                         SHMQueueNext(&nearConflict->sxactOut->inConflicts,
4424                                                                  &farConflict->inLink,
4425                                                                  offsetof(RWConflictData, inLink));
4426                         }
4427                 }
4428
4429                 nearConflict = (RWConflict)
4430                         SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
4431                                                  &nearConflict->inLink,
4432                                                  offsetof(RWConflictData, inLink));
4433         }
4434
4435         MySerializableXact->flags |= SXACT_FLAG_PREPARED;
4436
4437         LWLockRelease(SerializableXactHashLock);
4438 }
4439
4440 /*------------------------------------------------------------------------*/
4441
4442 /*
4443  * Two-phase commit support
4444  */
4445
4446 /*
4447  * AtPrepare_Locks
4448  *              Do the preparatory work for a PREPARE: make 2PC state file
4449  *              records for all predicate locks currently held.
4450  */
4451 void
4452 AtPrepare_PredicateLocks(void)
4453 {
4454         PREDICATELOCK *predlock;
4455         SERIALIZABLEXACT *sxact;
4456         TwoPhasePredicateRecord record;
4457         TwoPhasePredicateXactRecord *xactRecord;
4458         TwoPhasePredicateLockRecord *lockRecord;
4459
4460         sxact = (SERIALIZABLEXACT *) MySerializableXact;
4461         xactRecord = &(record.data.xactRecord);
4462         lockRecord = &(record.data.lockRecord);
4463
4464         if (MySerializableXact == InvalidSerializableXact)
4465                 return;
4466
4467         /* Generate a xact record for our SERIALIZABLEXACT */
4468         record.type = TWOPHASEPREDICATERECORD_XACT;
4469         xactRecord->xmin = MySerializableXact->xmin;
4470         xactRecord->flags = MySerializableXact->flags;
4471
4472         /*
4473          * Tweak the flags. Since we're not going to output the inConflicts and
4474          * outConflicts lists, if they're non-empty we'll represent that by
4475          * setting the appropriate summary conflict flags.
4476          */
4477         if (!SHMQueueEmpty((SHM_QUEUE *) &MySerializableXact->inConflicts))
4478                 xactRecord->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
4479         if (!SHMQueueEmpty((SHM_QUEUE *) &MySerializableXact->outConflicts))
4480                 xactRecord->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
4481
4482         RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0,
4483                                                    &record, sizeof(record));
4484
4485         /*
4486          * Generate a lock record for each lock.
4487          *
4488          * To do this, we need to walk the predicate lock list in our sxact rather
4489          * than using the local predicate lock table because the latter is not
4490          * guaranteed to be accurate.
4491          */
4492         LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
4493
4494         predlock = (PREDICATELOCK *)
4495                 SHMQueueNext(&(sxact->predicateLocks),
4496                                          &(sxact->predicateLocks),
4497                                          offsetof(PREDICATELOCK, xactLink));
4498
4499         while (predlock != NULL)
4500         {
4501                 record.type = TWOPHASEPREDICATERECORD_LOCK;
4502                 lockRecord->target = predlock->tag.myTarget->tag;
4503
4504                 RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0,
4505                                                            &record, sizeof(record));
4506
4507                 predlock = (PREDICATELOCK *)
4508                         SHMQueueNext(&(sxact->predicateLocks),
4509                                                  &(predlock->xactLink),
4510                                                  offsetof(PREDICATELOCK, xactLink));
4511         }
4512
4513         LWLockRelease(SerializablePredicateLockListLock);
4514 }
4515
4516 /*
4517  * PostPrepare_Locks
4518  *              Clean up after successful PREPARE. Unlike the non-predicate
4519  *              lock manager, we do not need to transfer locks to a dummy
4520  *              PGPROC because our SERIALIZABLEXACT will stay around
4521  *              anyway. We only need to clean up our local state.
4522  */
4523 void
4524 PostPrepare_PredicateLocks(TransactionId xid)
4525 {
4526         if (MySerializableXact == InvalidSerializableXact)
4527                 return;
4528
4529         Assert(SxactIsPrepared(MySerializableXact));
4530
4531         MySerializableXact->pid = 0;
4532
4533         hash_destroy(LocalPredicateLockHash);
4534         LocalPredicateLockHash = NULL;
4535
4536         MySerializableXact = InvalidSerializableXact;
4537 }
4538
4539 /*
4540  * PredicateLockTwoPhaseFinish
4541  *              Release a prepared transaction's predicate locks once it
4542  *              commits or aborts.
4543  */
4544 void
4545 PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
4546 {
4547         SERIALIZABLEXID *sxid;
4548         SERIALIZABLEXIDTAG sxidtag;
4549
4550         sxidtag.xid = xid;
4551
4552         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
4553         sxid = (SERIALIZABLEXID *)
4554                 hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
4555         LWLockRelease(SerializableXactHashLock);
4556
4557         /* xid will not be found if it wasn't a serializable transaction */
4558         if (sxid == NULL)
4559                 return;
4560
4561         /* Release its locks */
4562         MySerializableXact = sxid->myXact;
4563         ReleasePredicateLocks(isCommit);
4564 }
4565
4566 /*
4567  * Re-acquire a predicate lock belonging to a transaction that was prepared.
4568  */
4569 void
4570 predicatelock_twophase_recover(TransactionId xid, uint16 info,
4571                                                            void *recdata, uint32 len)
4572 {
4573         TwoPhasePredicateRecord *record;
4574
4575         Assert(len == sizeof(TwoPhasePredicateRecord));
4576
4577         record = (TwoPhasePredicateRecord *) recdata;
4578
4579         Assert((record->type == TWOPHASEPREDICATERECORD_XACT) ||
4580                    (record->type == TWOPHASEPREDICATERECORD_LOCK));
4581
4582         if (record->type == TWOPHASEPREDICATERECORD_XACT)
4583         {
4584                 /* Per-transaction record. Set up a SERIALIZABLEXACT. */
4585                 TwoPhasePredicateXactRecord *xactRecord;
4586                 SERIALIZABLEXACT *sxact;
4587                 SERIALIZABLEXID *sxid;
4588                 SERIALIZABLEXIDTAG sxidtag;
4589                 bool            found;
4590
4591                 xactRecord = (TwoPhasePredicateXactRecord *) &record->data.xactRecord;
4592
4593                 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
4594                 sxact = CreatePredXact();
4595                 if (!sxact)
4596                         ereport(ERROR,
4597                                         (errcode(ERRCODE_OUT_OF_MEMORY),
4598                                          errmsg("out of shared memory")));
4599
4600                 /* vxid for a prepared xact is InvalidBackendId/xid; no pid */
4601                 sxact->vxid.backendId = InvalidBackendId;
4602                 sxact->vxid.localTransactionId = (LocalTransactionId) xid;
4603                 sxact->pid = 0;
4604
4605                 /* a prepared xact hasn't committed yet */
4606                 sxact->commitSeqNo = InvalidSerCommitSeqNo;
4607                 sxact->finishedBefore = InvalidTransactionId;
4608
4609                 sxact->SeqNo.lastCommitBeforeSnapshot = RecoverySerCommitSeqNo;
4610
4611
4612                 /*
4613                  * We don't need the details of a prepared transaction's conflicts,
4614                  * just whether it had conflicts in or out (which we get from the
4615                  * flags)
4616                  */
4617                 SHMQueueInit(&(sxact->outConflicts));
4618                 SHMQueueInit(&(sxact->inConflicts));
4619
4620                 /*
4621                  * Don't need to track this; no transactions running at the time the
4622                  * recovered xact started are still active, except possibly other
4623                  * prepared xacts and we don't care whether those are RO_SAFE or not.
4624                  */
4625                 SHMQueueInit(&(sxact->possibleUnsafeConflicts));
4626
4627                 SHMQueueInit(&(sxact->predicateLocks));
4628                 SHMQueueElemInit(&(sxact->finishedLink));
4629
4630                 sxact->topXid = xid;
4631                 sxact->xmin = xactRecord->xmin;
4632                 sxact->flags = xactRecord->flags;
4633                 Assert(SxactIsPrepared(sxact));
4634                 if (!SxactIsReadOnly(sxact))
4635                 {
4636                         ++(PredXact->WritableSxactCount);
4637                         Assert(PredXact->WritableSxactCount <=
4638                                    (MaxBackends + max_prepared_xacts));
4639                 }
4640
4641                 /* Register the transaction's xid */
4642                 sxidtag.xid = xid;
4643                 sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
4644                                                                                            &sxidtag,
4645                                                                                            HASH_ENTER, &found);
4646                 Assert(sxid != NULL);
4647                 Assert(!found);
4648                 sxid->myXact = (SERIALIZABLEXACT *) sxact;
4649
4650                 /*
4651                  * Update global xmin. Note that this is a special case compared to
4652                  * registering a normal transaction, because the global xmin might go
4653                  * backwards. That's OK, because until recovery is over we're not
4654                  * going to complete any transactions or create any non-prepared
4655                  * transactions, so there's no danger of throwing away.
4656                  */
4657                 if ((!TransactionIdIsValid(PredXact->SxactGlobalXmin)) ||
4658                         (TransactionIdFollows(PredXact->SxactGlobalXmin, sxact->xmin)))
4659                 {
4660                         PredXact->SxactGlobalXmin = sxact->xmin;
4661                         PredXact->SxactGlobalXminCount = 1;
4662                         OldSerXidSetActiveSerXmin(sxact->xmin);
4663                 }
4664                 else if (TransactionIdEquals(sxact->xmin, PredXact->SxactGlobalXmin))
4665                 {
4666                         Assert(PredXact->SxactGlobalXminCount > 0);
4667                         PredXact->SxactGlobalXminCount++;
4668                 }
4669
4670                 LWLockRelease(SerializableXactHashLock);
4671         }
4672         else if (record->type == TWOPHASEPREDICATERECORD_LOCK)
4673         {
4674                 /* Lock record. Recreate the PREDICATELOCK */
4675                 TwoPhasePredicateLockRecord *lockRecord;
4676                 SERIALIZABLEXID *sxid;
4677                 SERIALIZABLEXACT *sxact;
4678                 SERIALIZABLEXIDTAG sxidtag;
4679                 uint32          targettaghash;
4680
4681                 lockRecord = (TwoPhasePredicateLockRecord *) &record->data.lockRecord;
4682                 targettaghash = PredicateLockTargetTagHashCode(&lockRecord->target);
4683
4684                 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
4685                 sxidtag.xid = xid;
4686                 sxid = (SERIALIZABLEXID *)
4687                         hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
4688                 LWLockRelease(SerializableXactHashLock);
4689
4690                 Assert(sxid != NULL);
4691                 sxact = sxid->myXact;
4692                 Assert(sxact != InvalidSerializableXact);
4693
4694                 CreatePredicateLock(&lockRecord->target, targettaghash, sxact);
4695         }
4696 }