X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;ds=sidebyside;f=src%2Fbackend%2Fstorage%2Flmgr%2Fpredicate.c;h=3b355641c250198f331935dcaf28363940073083;hb=c7b8998ebbf310a156aa38022555a24d98fdbfb4;hp=e6c2c4b9b00d858c4f6e1d0236c8a1dc8114c897;hpb=5da417f7c4b2adb5b2aa4d6c86354f8de87dcde9;p=postgresql diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index e6c2c4b9b0..3b355641c2 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -32,11 +32,11 @@ * examining the MVCC data.) * * (1) Besides tuples actually read, they must cover ranges of tuples - * which would have been read based on the predicate. This will + * which would have been read based on the predicate. This will * require modelling the predicates through locks against database * objects such as pages, index ranges, or entire tables. * - * (2) They must be kept in RAM for quick access. Because of this, it + * (2) They must be kept in RAM for quick access. Because of this, it * isn't possible to always maintain tuple-level granularity -- when * the space allocated to store these approaches exhaustion, a * request for a lock may need to scan for situations where a single @@ -49,7 +49,7 @@ * * (4) While they are associated with a transaction, they must survive * a successful COMMIT of that transaction, and remain until all - * overlapping transactions complete. This even means that they + * overlapping transactions complete. This even means that they * must survive termination of the transaction's process. If a * top level transaction is rolled back, however, it is immediately * flagged so that it can be ignored, and its SIREAD locks can be @@ -62,7 +62,7 @@ * an existing SIREAD lock for the same transaction, the SIREAD lock * can be deleted. * - * (7) A write from a serializable transaction must ensure that a xact + * (7) A write from a serializable transaction must ensure that an xact * record exists for the transaction, with the same lifespan (until * all concurrent transaction complete or the transaction is rolled * back) so that rw-dependencies to that transaction can be @@ -90,7 +90,7 @@ * may yet matter because they overlap still-active transactions. * * SerializablePredicateLockListLock - * - Protects the linked list of locks held by a transaction. Note + * - Protects the linked list of locks held by a transaction. Note * that the locks themselves are also covered by the partition * locks of their respective lock targets; this lock only affects * the linked list connecting the locks related to a transaction. @@ -101,11 +101,11 @@ * - It is relatively infrequent that another process needs to * modify the list for a transaction, but it does happen for such * things as index page splits for pages with predicate locks and - * freeing of predicate locked pages by a vacuum process. When + * freeing of predicate locked pages by a vacuum process. When * removing a lock in such cases, the lock itself contains the * pointers needed to remove it from the list. When adding a * lock in such cases, the lock can be added using the anchor in - * the transaction structure. Neither requires walking the list. + * the transaction structure. Neither requires walking the list. * - Cleaning up the list for a terminated transaction is sometimes * not done on a retail basis, in which case no lock is required. * - Due to the above, a process accessing its active transaction's @@ -125,7 +125,7 @@ * - Protects both PredXact and SerializableXidHash. * * - * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * @@ -146,7 +146,9 @@ * PageIsPredicateLocked(Relation relation, BlockNumber blkno) * * predicate lock maintenance - * RegisterSerializableTransaction(Snapshot snapshot) + * GetSerializableTransactionSnapshot(Snapshot snapshot) + * SetSerializableTransactionSnapshot(Snapshot snapshot, + * VirtualTransactionId *sourcevxid) * RegisterPredicateLockingXid(void) * PredicateLockRelation(Relation relation, Snapshot snapshot) * PredicateLockPage(Relation relation, BlockNumber blkno, @@ -154,9 +156,9 @@ * PredicateLockTuple(Relation relation, HeapTuple tuple, * Snapshot snapshot) * PredicateLockPageSplit(Relation relation, BlockNumber oldblkno, - * BlockNumber newblkno); + * BlockNumber newblkno) * PredicateLockPageCombine(Relation relation, BlockNumber oldblkno, - * BlockNumber newblkno); + * BlockNumber newblkno) * TransferPredicateLocksToHeapRelation(Relation relation) * ReleasePredicateLocks(bool isCommit) * @@ -181,16 +183,20 @@ #include "postgres.h" +#include "access/htup_details.h" #include "access/slru.h" #include "access/subtrans.h" #include "access/transam.h" #include "access/twophase.h" #include "access/twophase_rmgr.h" #include "access/xact.h" +#include "access/xlog.h" #include "miscadmin.h" +#include "pgstat.h" #include "storage/bufmgr.h" #include "storage/predicate.h" #include "storage/predicate_internals.h" +#include "storage/proc.h" #include "storage/procarray.h" #include "utils/rel.h" #include "utils/snapmgr.h" @@ -237,13 +243,24 @@ #define PredicateLockHashPartition(hashcode) \ ((hashcode) % NUM_PREDICATELOCK_PARTITIONS) #define PredicateLockHashPartitionLock(hashcode) \ - ((LWLockId) (FirstPredicateLockMgrLock + PredicateLockHashPartition(hashcode))) + (&MainLWLockArray[PREDICATELOCK_MANAGER_LWLOCK_OFFSET + \ + PredicateLockHashPartition(hashcode)].lock) +#define PredicateLockHashPartitionLockByIndex(i) \ + (&MainLWLockArray[PREDICATELOCK_MANAGER_LWLOCK_OFFSET + (i)].lock) #define NPREDICATELOCKTARGETENTS() \ mul_size(max_predicate_locks_per_xact, add_size(MaxBackends, max_prepared_xacts)) #define SxactIsOnFinishedList(sxact) (!SHMQueueIsDetached(&((sxact)->finishedLink))) +/* + * Note that a sxact is marked "prepared" once it has passed + * PreCommit_CheckForSerializationFailure, even if it isn't using + * 2PC. This is the point at which it can no longer be aborted. + * + * The PREPARED flag remains set after commit, so SxactIsCommitted + * implies SxactIsPrepared. + */ #define SxactIsCommitted(sxact) (((sxact)->flags & SXACT_FLAG_COMMITTED) != 0) #define SxactIsPrepared(sxact) (((sxact)->flags & SXACT_FLAG_PREPARED) != 0) #define SxactIsRolledBack(sxact) (((sxact)->flags & SXACT_FLAG_ROLLED_BACK) != 0) @@ -270,7 +287,7 @@ * the lock partition number from the hashcode. */ #define PredicateLockTargetTagHashCode(predicatelocktargettag) \ - (tag_hash((predicatelocktargettag), sizeof(PREDICATELOCKTARGETTAG))) + get_hash_value(PredicateLockTargetHash, predicatelocktargettag) /* * Given a predicate lock tag, and the hash for its target, @@ -297,7 +314,13 @@ static SlruCtlData OldSerXidSlruCtlData; #define OLDSERXID_PAGESIZE BLCKSZ #define OLDSERXID_ENTRYSIZE sizeof(SerCommitSeqNo) #define OLDSERXID_ENTRIESPERPAGE (OLDSERXID_PAGESIZE / OLDSERXID_ENTRYSIZE) -#define OLDSERXID_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1) + +/* + * Set maximum pages based on the lesser of the number needed to track all + * transactions and the maximum that SLRU supports. + */ +#define OLDSERXID_MAX_PAGE Min(SLRU_PAGES_PER_SEGMENT * 0x10000 - 1, \ + (MaxTransactionId) / OLDSERXID_ENTRIESPERPAGE) #define OldSerXidNextPage(page) (((page) >= OLDSERXID_MAX_PAGE) ? 0 : (page) + 1) @@ -314,7 +337,7 @@ typedef struct OldSerXidControlData TransactionId headXid; /* newest valid Xid in the SLRU */ TransactionId tailXid; /* oldest xmin we might be interested in */ bool warningIssued; /* have we issued SLRU wrap-around warning? */ -} OldSerXidControlData; +} OldSerXidControlData; typedef struct OldSerXidControlData *OldSerXidControl; @@ -329,12 +352,19 @@ static OldSerXidControl oldSerXidControl; static SERIALIZABLEXACT *OldCommittedSxact; -/* This configuration variable is used to set the predicate lock table size */ -int max_predicate_locks_per_xact; /* set by guc.c */ +/* + * These configuration variables are used to set the predicate lock table size + * and to control promotion of predicate locks to coarser granularity in an + * attempt to degrade performance (mostly as false positive serialization + * failure) gracefully in the face of memory pressurel + */ +int max_predicate_locks_per_xact; /* set by guc.c */ +int max_predicate_locks_per_relation; /* set by guc.c */ +int max_predicate_locks_per_page; /* set by guc.c */ /* * This provides a list of objects in order to track transactions - * participating in predicate locking. Entries in the list are fixed size, + * participating in predicate locking. Entries in the list are fixed size, * and reside in shared memory. The memory address of an entry must remain * fixed during its lifetime. The list will be protected from concurrent * update externally; no provision is made in this code to manage that. The @@ -363,9 +393,9 @@ static SHM_QUEUE *FinishedSerializableTransactions; * this entry, you can ensure that there's enough scratch space available for * inserting one entry in the hash table. This is an otherwise-invalid tag. */ -static const PREDICATELOCKTARGETTAG ScratchTargetTag = {0, 0, 0, 0, 0}; +static const PREDICATELOCKTARGETTAG ScratchTargetTag = {0, 0, 0, 0}; static uint32 ScratchTargetTagHash; -static int ScratchPartitionLock; +static LWLock *ScratchPartitionLock; /* * The local hash table used to determine when to combine multiple fine- @@ -388,7 +418,7 @@ static void ReleasePredXact(SERIALIZABLEXACT *sxact); static SERIALIZABLEXACT *FirstPredXact(void); static SERIALIZABLEXACT *NextPredXact(SERIALIZABLEXACT *sxact); -static bool RWConflictExists(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer); +static bool RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer); static void SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer); static void SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact, SERIALIZABLEXACT *activeXact); static void ReleaseRWConflict(RWConflict conflict); @@ -403,27 +433,29 @@ static void OldSerXidSetActiveSerXmin(TransactionId xid); static uint32 predicatelock_hash(const void *key, Size keysize); static void SummarizeOldestCommittedSxact(void); static Snapshot GetSafeSnapshot(Snapshot snapshot); -static Snapshot RegisterSerializableTransactionInt(Snapshot snapshot); -static bool PredicateLockExists(PREDICATELOCKTARGETTAG *targettag); -static bool GetParentPredicateLockTag(PREDICATELOCKTARGETTAG *tag, +static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot, + VirtualTransactionId *sourcevxid, + int sourcepid); +static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag); +static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag, PREDICATELOCKTARGETTAG *parent); -static bool CoarserLockCovers(PREDICATELOCKTARGETTAG *newtargettag); +static bool CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag); static void RemoveScratchTarget(bool lockheld); static void RestoreScratchTarget(bool lockheld); static void RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash); -static void DeleteChildTargetLocks(PREDICATELOCKTARGETTAG *newtargettag); -static int PredicateLockPromotionThreshold(PREDICATELOCKTARGETTAG *tag); -static bool CheckAndPromotePredicateLockRequest(PREDICATELOCKTARGETTAG *reqtag); -static void DecrementParentLocks(PREDICATELOCKTARGETTAG *targettag); -static void CreatePredicateLock(PREDICATELOCKTARGETTAG *targettag, +static void DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag); +static int MaxPredicateChildLocks(const PREDICATELOCKTARGETTAG *tag); +static bool CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag); +static void DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag); +static void CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag, uint32 targettaghash, SERIALIZABLEXACT *sxact); static void DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash); static bool TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, PREDICATELOCKTARGETTAG newtargettag, bool removeOld); -static void PredicateLockAcquire(PREDICATELOCKTARGETTAG *targettag); +static void PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag); static void DropAllPredicateLocksFromTable(Relation relation, bool transfer); static void SetNewSxactGlobalXmin(void); @@ -433,7 +465,7 @@ static void ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, static bool XidIsConcurrent(TransactionId xid); static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag); static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer); -static void OnConflict_CheckForSerializationFailure(SERIALIZABLEXACT *reader, +static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer); @@ -441,13 +473,14 @@ static void OnConflict_CheckForSerializationFailure(SERIALIZABLEXACT *reader, /* * Does this relation participate in predicate locking? Temporary and system - * relations are exempt. + * relations are exempt, as are materialized views. */ static inline bool PredicateLockingNeededForRelation(Relation relation) { return !(relation->rd_id < FirstBootstrapObjectId || - RelationUsesLocalBuffers(relation)); + RelationUsesLocalBuffers(relation) || + relation->rd_rel->relkind == RELKIND_MATVIEW); } /* @@ -472,8 +505,8 @@ SerializationNeededForRead(Relation relation, Snapshot snapshot) * Don't acquire locks or conflict when scanning with a special snapshot. * This excludes things like CLUSTER and REINDEX. They use the wholesale * functions TransferPredicateLocksToHeapRelation() and - * CheckTableForSerializableConflictIn() to participate serialization, but - * the scans involved don't need serialization. + * CheckTableForSerializableConflictIn() to participate in serialization, + * but the scans involved don't need serialization. */ if (!IsMVCCSnapshot(snapshot)) return false; @@ -524,7 +557,7 @@ SerializationNeededForWrite(Relation relation) /* * These functions are a simple implementation of a list for this specific - * type of struct. If there is ever a generalized shared memory list, we + * type of struct. If there is ever a generalized shared memory list, we * should probably switch to that. */ static SERIALIZABLEXACT * @@ -601,7 +634,7 @@ NextPredXact(SERIALIZABLEXACT *sxact) * These functions manage primitive access to the RWConflict pool and lists. */ static bool -RWConflictExists(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer) +RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer) { RWConflict conflict; @@ -648,7 +681,7 @@ SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer) if (!conflict) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("not enough elements in RWConflictPool to record a rw-conflict"), + errmsg("not enough elements in RWConflictPool to record a read/write conflict"), errhint("You might need to run fewer transactions at a time or increase max_connections."))); SHMQueueDelete(&conflict->outLink); @@ -676,7 +709,7 @@ SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact, if (!conflict) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("not enough elements in RWConflictPool to record a potential rw-conflict"), + errmsg("not enough elements in RWConflictPool to record a potential read/write conflict"), errhint("You might need to run fewer transactions at a time or increase max_connections."))); SHMQueueDelete(&conflict->outLink); @@ -744,7 +777,7 @@ OldSerXidPagePrecedesLogically(int p, int q) int diff; /* - * We have to compare modulo (OLDSERXID_MAX_PAGE+1)/2. Both inputs should + * We have to compare modulo (OLDSERXID_MAX_PAGE+1)/2. Both inputs should * be in the range 0..OLDSERXID_MAX_PAGE. */ Assert(p >= 0 && p <= OLDSERXID_MAX_PAGE); @@ -753,7 +786,7 @@ OldSerXidPagePrecedesLogically(int p, int q) diff = p - q; if (diff >= ((OLDSERXID_MAX_PAGE + 1) / 2)) diff -= OLDSERXID_MAX_PAGE + 1; - else if (diff < -((OLDSERXID_MAX_PAGE + 1) / 2)) + else if (diff < -((int) (OLDSERXID_MAX_PAGE + 1) / 2)) diff += OLDSERXID_MAX_PAGE + 1; return diff < 0; } @@ -770,8 +803,9 @@ OldSerXidInit(void) * Set up SLRU management of the pg_serial data. */ OldSerXidSlruCtl->PagePrecedes = OldSerXidPagePrecedesLogically; - SimpleLruInit(OldSerXidSlruCtl, "OldSerXid SLRU Ctl", - NUM_OLDSERXID_BUFFERS, 0, OldSerXidLock, "pg_serial"); + SimpleLruInit(OldSerXidSlruCtl, "oldserxid", + NUM_OLDSERXID_BUFFERS, 0, OldSerXidLock, "pg_serial", + LWTRANCHE_OLDSERXID_BUFFERS); /* Override default assumption that writes should be fsync'd */ OldSerXidSlruCtl->do_fsync = false; @@ -882,7 +916,7 @@ OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo) oldSerXidControl->warningIssued = true; ereport(WARNING, (errmsg("memory for serializable conflict tracking is nearly exhausted"), - errhint("There may be an idle transaction or a forgotten prepared transaction causing this."))); + errhint("There might be an idle transaction or a forgotten prepared transaction causing this."))); } } @@ -906,7 +940,7 @@ OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo) } /* - * Get the minimum commitSeqNo for any conflict out for the given xid. For + * Get the minimum commitSeqNo for any conflict out for the given xid. For * a transaction which exists but has no conflict out, InvalidSerCommitSeqNo * will be returned. */ @@ -959,7 +993,7 @@ OldSerXidSetActiveSerXmin(TransactionId xid) /* * When no sxacts are active, nothing overlaps, set the xid values to * invalid to show that there are no valid entries. Don't clear headPage, - * though. A new xmin might still land on that page, and we don't want to + * though. A new xmin might still land on that page, and we don't want to * repeatedly zero out the same page. */ if (!TransactionIdIsValid(xid)) @@ -1071,7 +1105,6 @@ void InitPredicateLocks(void) { HASHCTL info; - int hash_flags; long max_table_size; Size requestSize; bool found; @@ -1089,15 +1122,14 @@ InitPredicateLocks(void) MemSet(&info, 0, sizeof(info)); info.keysize = sizeof(PREDICATELOCKTARGETTAG); info.entrysize = sizeof(PREDICATELOCKTARGET); - info.hash = tag_hash; info.num_partitions = NUM_PREDICATELOCK_PARTITIONS; - hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION | HASH_FIXED_SIZE); PredicateLockTargetHash = ShmemInitHash("PREDICATELOCKTARGET hash", max_table_size, max_table_size, &info, - hash_flags); + HASH_ELEM | HASH_BLOBS | + HASH_PARTITION | HASH_FIXED_SIZE); /* Assume an average of 2 xacts per target */ max_table_size *= 2; @@ -1119,13 +1151,13 @@ InitPredicateLocks(void) info.entrysize = sizeof(PREDICATELOCK); info.hash = predicatelock_hash; info.num_partitions = NUM_PREDICATELOCK_PARTITIONS; - hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION | HASH_FIXED_SIZE); PredicateLockHash = ShmemInitHash("PREDICATELOCK hash", max_table_size, max_table_size, &info, - hash_flags); + HASH_ELEM | HASH_FUNCTION | + HASH_PARTITION | HASH_FIXED_SIZE); /* * Compute size for serializable transaction hashtable. Note these @@ -1161,12 +1193,6 @@ InitPredicateLocks(void) requestSize = mul_size((Size) max_table_size, PredXactListElementDataSize); PredXact->element = ShmemAlloc(requestSize); - if (PredXact->element == NULL) - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("not enough shared memory for elements of data structure" - " \"%s\" (%lu bytes requested)", - "PredXactList", (unsigned long) requestSize))); /* Add all elements to available list, clean. */ memset(PredXact->element, 0, requestSize); for (i = 0; i < max_table_size; i++) @@ -1176,6 +1202,7 @@ InitPredicateLocks(void) } PredXact->OldCommittedSxact = CreatePredXact(); SetInvalidVirtualTransactionId(PredXact->OldCommittedSxact->vxid); + PredXact->OldCommittedSxact->prepareSeqNo = 0; PredXact->OldCommittedSxact->commitSeqNo = 0; PredXact->OldCommittedSxact->SeqNo.lastCommitBeforeSnapshot = 0; SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts); @@ -1199,14 +1226,13 @@ InitPredicateLocks(void) MemSet(&info, 0, sizeof(info)); info.keysize = sizeof(SERIALIZABLEXIDTAG); info.entrysize = sizeof(SERIALIZABLEXID); - info.hash = tag_hash; - hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_FIXED_SIZE); SerializableXidHash = ShmemInitHash("SERIALIZABLEXID hash", max_table_size, max_table_size, &info, - hash_flags); + HASH_ELEM | HASH_BLOBS | + HASH_FIXED_SIZE); /* * Allocate space for tracking rw-conflicts in lists attached to the @@ -1216,7 +1242,7 @@ InitPredicateLocks(void) * that this will prevent resource exhaustion in even the most pessimal * loads up to max_connections = 200 with all 200 connections pounding the * database with serializable transactions. Beyond that, there may be - * occassional transactions canceled when trying to flag conflicts. That's + * occasional transactions canceled when trying to flag conflicts. That's * probably OK. */ max_table_size *= 5; @@ -1232,12 +1258,6 @@ InitPredicateLocks(void) requestSize = mul_size((Size) max_table_size, RWConflictDataSize); RWConflictPool->element = ShmemAlloc(requestSize); - if (RWConflictPool->element == NULL) - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("not enough shared memory for elements of data structure" - " \"%s\" (%lu bytes requested)", - "RWConflictPool", (unsigned long) requestSize))); /* Add all elements to available list, clean. */ memset(RWConflictPool->element, 0, requestSize); for (i = 0; i < max_table_size; i++) @@ -1337,7 +1357,7 @@ PredicateLockShmemSize(void) static uint32 predicatelock_hash(const void *key, Size keysize) { - PREDICATELOCKTAG *predicatelocktag = (PREDICATELOCKTAG *) key; + const PREDICATELOCKTAG *predicatelocktag = (const PREDICATELOCKTAG *) key; uint32 targethash; Assert(keysize == sizeof(PREDICATELOCKTAG)); @@ -1377,7 +1397,7 @@ GetPredicateLockStatusData(void) * in ascending order, then SerializableXactHashLock. */ for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++) - LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED); + LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_SHARED); LWLockAcquire(SerializableXactHashLock, LW_SHARED); /* Get number of locks and allocate appropriately-sized arrays. */ @@ -1406,7 +1426,7 @@ GetPredicateLockStatusData(void) /* Release locks in reverse order */ LWLockRelease(SerializableXactHashLock); for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--) - LWLockRelease(FirstPredicateLockMgrLock + i); + LWLockRelease(PredicateLockHashPartitionLockByIndex(i)); return data; } @@ -1443,7 +1463,7 @@ SummarizeOldestCommittedSxact(void) /* * Grab the first sxact off the finished list -- this will be the earliest - * commit. Remove it from the list. + * commit. Remove it from the list. */ sxact = (SERIALIZABLEXACT *) SHMQueueNext(FinishedSerializableTransactions, @@ -1470,6 +1490,10 @@ SummarizeOldestCommittedSxact(void) * without further checks. This requires waiting for concurrent * transactions to complete, and retrying with a new snapshot if * one of them could possibly create a conflict. + * + * As with GetSerializableTransactionSnapshot (which this is a subroutine + * for), the passed-in Snapshot pointer should reference a static data + * area that can safely be passed to GetSnapshotData. */ static Snapshot GetSafeSnapshot(Snapshot origSnapshot) @@ -1481,12 +1505,13 @@ GetSafeSnapshot(Snapshot origSnapshot) while (true) { /* - * RegisterSerializableTransactionInt is going to call - * GetSnapshotData, so we need to provide it the static snapshot our - * caller passed to us. It returns a copy of that snapshot and - * registers it on TopTransactionResourceOwner. + * GetSerializableTransactionSnapshotInt is going to call + * GetSnapshotData, so we need to provide it the static snapshot area + * our caller passed to us. The pointer returned is actually the same + * one passed to it, but we avoid assuming that here. */ - snapshot = RegisterSerializableTransactionInt(origSnapshot); + snapshot = GetSerializableTransactionSnapshotInt(origSnapshot, + NULL, InvalidPid); if (MySerializableXact == InvalidSerializableXact) return snapshot; /* no concurrent r/w xacts; it's safe */ @@ -1502,7 +1527,7 @@ GetSafeSnapshot(Snapshot origSnapshot) SxactIsROUnsafe(MySerializableXact))) { LWLockRelease(SerializableXactHashLock); - ProcWaitForSignal(); + ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT); LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); } MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING; @@ -1520,8 +1545,6 @@ GetSafeSnapshot(Snapshot origSnapshot) (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("deferrable snapshot was unsafe; trying a new one"))); ReleasePredicateLocks(false); - UnregisterSnapshotFromOwner(snapshot, - TopTransactionResourceOwner); } /* @@ -1534,28 +1557,141 @@ GetSafeSnapshot(Snapshot origSnapshot) } /* - * Acquire and register a snapshot which can be used for this transaction.. + * GetSafeSnapshotBlockingPids + * If the specified process is currently blocked in GetSafeSnapshot, + * write the process IDs of all processes that it is blocked by + * into the caller-supplied buffer output[]. The list is truncated at + * output_size, and the number of PIDs written into the buffer is + * returned. Returns zero if the given PID is not currently blocked + * in GetSafeSnapshot. + */ +int +GetSafeSnapshotBlockingPids(int blocked_pid, int *output, int output_size) +{ + int num_written = 0; + SERIALIZABLEXACT *sxact; + + LWLockAcquire(SerializableXactHashLock, LW_SHARED); + + /* Find blocked_pid's SERIALIZABLEXACT by linear search. */ + for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact)) + { + if (sxact->pid == blocked_pid) + break; + } + + /* Did we find it, and is it currently waiting in GetSafeSnapshot? */ + if (sxact != NULL && SxactIsDeferrableWaiting(sxact)) + { + RWConflict possibleUnsafeConflict; + + /* Traverse the list of possible unsafe conflicts collecting PIDs. */ + possibleUnsafeConflict = (RWConflict) + SHMQueueNext(&sxact->possibleUnsafeConflicts, + &sxact->possibleUnsafeConflicts, + offsetof(RWConflictData, inLink)); + + while (possibleUnsafeConflict != NULL && num_written < output_size) + { + output[num_written++] = possibleUnsafeConflict->sxactOut->pid; + possibleUnsafeConflict = (RWConflict) + SHMQueueNext(&sxact->possibleUnsafeConflicts, + &possibleUnsafeConflict->inLink, + offsetof(RWConflictData, inLink)); + } + } + + LWLockRelease(SerializableXactHashLock); + + return num_written; +} + +/* + * Acquire a snapshot that can be used for the current transaction. + * * Make sure we have a SERIALIZABLEXACT reference in MySerializableXact. * It should be current for this process and be contained in PredXact. + * + * The passed-in Snapshot pointer should reference a static data area that + * can safely be passed to GetSnapshotData. The return value is actually + * always this same pointer; no new snapshot data structure is allocated + * within this function. */ Snapshot -RegisterSerializableTransaction(Snapshot snapshot) +GetSerializableTransactionSnapshot(Snapshot snapshot) { Assert(IsolationIsSerializable()); + /* + * Can't use serializable mode while recovery is still active, as it is, + * for example, on a hot standby. We could get here despite the check in + * check_XactIsoLevel() if default_transaction_isolation is set to + * serializable, so phrase the hint accordingly. + */ + if (RecoveryInProgress()) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use serializable mode in a hot standby"), + errdetail("\"default_transaction_isolation\" is set to \"serializable\"."), + errhint("You can use \"SET default_transaction_isolation = 'repeatable read'\" to change the default."))); + /* * A special optimization is available for SERIALIZABLE READ ONLY * DEFERRABLE transactions -- we can wait for a suitable snapshot and - * thereby avoid all SSI overhead once it's running.. + * thereby avoid all SSI overhead once it's running. */ if (XactReadOnly && XactDeferrable) return GetSafeSnapshot(snapshot); - return RegisterSerializableTransactionInt(snapshot); + return GetSerializableTransactionSnapshotInt(snapshot, + NULL, InvalidPid); } +/* + * Import a snapshot to be used for the current transaction. + * + * This is nearly the same as GetSerializableTransactionSnapshot, except that + * we don't take a new snapshot, but rather use the data we're handed. + * + * The caller must have verified that the snapshot came from a serializable + * transaction; and if we're read-write, the source transaction must not be + * read-only. + */ +void +SetSerializableTransactionSnapshot(Snapshot snapshot, + VirtualTransactionId *sourcevxid, + int sourcepid) +{ + Assert(IsolationIsSerializable()); + + /* + * We do not allow SERIALIZABLE READ ONLY DEFERRABLE transactions to + * import snapshots, since there's no way to wait for a safe snapshot when + * we're using the snap we're told to. (XXX instead of throwing an error, + * we could just ignore the XactDeferrable flag?) + */ + if (XactReadOnly && XactDeferrable) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE"))); + + (void) GetSerializableTransactionSnapshotInt(snapshot, sourcevxid, + sourcepid); +} + +/* + * Guts of GetSerializableTransactionSnapshot + * + * If sourcexid is valid, this is actually an import operation and we should + * skip calling GetSnapshotData, because the snapshot contents are already + * loaded up. HOWEVER: to avoid race conditions, we must check that the + * source xact is still running after we acquire SerializableXactHashLock. + * We do that by calling ProcArrayInstallImportedXmin. + */ static Snapshot -RegisterSerializableTransactionInt(Snapshot snapshot) +GetSerializableTransactionSnapshotInt(Snapshot snapshot, + VirtualTransactionId *sourcevxid, + int sourcepid) { PGPROC *proc; VirtualTransactionId vxid; @@ -1568,6 +1704,14 @@ RegisterSerializableTransactionInt(Snapshot snapshot) Assert(!RecoveryInProgress()); + /* + * Since all parts of a serializable transaction must use the same + * snapshot, it is too late to establish one after a parallel operation + * has begun. + */ + if (IsInParallelMode()) + elog(ERROR, "cannot establish serializable snapshot during a parallel operation"); + proc = MyProc; Assert(proc != NULL); GET_VXID_FROM_PGPROC(vxid, *proc); @@ -1575,6 +1719,14 @@ RegisterSerializableTransactionInt(Snapshot snapshot) /* * First we get the sxact structure, which may involve looping and access * to the "finished" list to free a structure for use. + * + * We must hold SerializableXactHashLock when taking/checking the snapshot + * to avoid race conditions, for much the same reasons that + * GetSnapshotData takes the ProcArrayLock. Since we might have to + * release SerializableXactHashLock to call SummarizeOldestCommittedSxact, + * this means we have to create the sxact first, which is a bit annoying + * (in particular, an elog(ERROR) in procarray.c would cause us to leak + * the sxact). Consider refactoring to avoid this. */ #ifdef TEST_OLDSERXID SummarizeOldestCommittedSxact(); @@ -1592,9 +1744,19 @@ RegisterSerializableTransactionInt(Snapshot snapshot) } } while (!sxact); - /* Get and register a snapshot */ - snapshot = GetSnapshotData(snapshot); - snapshot = RegisterSnapshotOnOwner(snapshot, TopTransactionResourceOwner); + /* Get the snapshot, or check that it's safe to use */ + if (!sourcevxid) + snapshot = GetSnapshotData(snapshot); + else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcevxid)) + { + ReleasePredXact(sxact); + LWLockRelease(SerializableXactHashLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not import the requested snapshot"), + errdetail("The source process with pid %d is not running anymore.", + sourcepid))); + } /* * If there are no serializable transactions which are not read-only, we @@ -1636,6 +1798,7 @@ RegisterSerializableTransactionInt(Snapshot snapshot) /* Initialize the structure. */ sxact->vxid = vxid; sxact->SeqNo.lastCommitBeforeSnapshot = PredXact->LastSxactCommitSeqNo; + sxact->prepareSeqNo = InvalidSerCommitSeqNo; sxact->commitSeqNo = InvalidSerCommitSeqNo; SHMQueueInit(&(sxact->outConflicts)); SHMQueueInit(&(sxact->inConflicts)); @@ -1661,8 +1824,9 @@ RegisterSerializableTransactionInt(Snapshot snapshot) othersxact != NULL; othersxact = NextPredXact(othersxact)) { - if (!SxactIsOnFinishedList(othersxact) && - !SxactIsReadOnly(othersxact)) + if (!SxactIsCommitted(othersxact) + && !SxactIsDoomed(othersxact) + && !SxactIsReadOnly(othersxact)) { SetPossibleUnsafeConflict(sxact, othersxact); } @@ -1685,11 +1849,10 @@ RegisterSerializableTransactionInt(Snapshot snapshot) MemSet(&hash_ctl, 0, sizeof(hash_ctl)); hash_ctl.keysize = sizeof(PREDICATELOCKTARGETTAG); hash_ctl.entrysize = sizeof(LOCALPREDICATELOCK); - hash_ctl.hash = tag_hash; LocalPredicateLockHash = hash_create("Local predicate lock", max_predicate_locks_per_xact, &hash_ctl, - HASH_ELEM | HASH_FUNCTION); + HASH_ELEM | HASH_BLOBS); return snapshot; } @@ -1752,7 +1915,7 @@ PageIsPredicateLocked(Relation relation, BlockNumber blkno) { PREDICATELOCKTARGETTAG targettag; uint32 targettaghash; - LWLockId partitionLock; + LWLock *partitionLock; PREDICATELOCKTARGET *target; SET_PREDICATELOCKTARGETTAG_PAGE(targettag, @@ -1785,7 +1948,7 @@ PageIsPredicateLocked(Relation relation, BlockNumber blkno) * acceptable! */ static bool -PredicateLockExists(PREDICATELOCKTARGETTAG *targettag) +PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag) { LOCALPREDICATELOCK *lock; @@ -1812,7 +1975,7 @@ PredicateLockExists(PREDICATELOCKTARGETTAG *targettag) * returns false if none exists. */ static bool -GetParentPredicateLockTag(PREDICATELOCKTARGETTAG *tag, +GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag, PREDICATELOCKTARGETTAG *parent) { switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag)) @@ -1851,7 +2014,7 @@ GetParentPredicateLockTag(PREDICATELOCKTARGETTAG *tag, * negative, but it will never return a false positive. */ static bool -CoarserLockCovers(PREDICATELOCKTARGETTAG *newtargettag) +CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag) { PREDICATELOCKTARGETTAG targettag, parenttag; @@ -1925,7 +2088,7 @@ RestoreScratchTarget(bool lockheld) static void RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash) { - PREDICATELOCKTARGET *rmtarget; + PREDICATELOCKTARGET *rmtarget PG_USED_FOR_ASSERTS_ONLY; Assert(LWLockHeldByMe(SerializablePredicateLockListLock)); @@ -1944,7 +2107,7 @@ RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash) /* * Delete child target locks owned by this process. * This implementation is assuming that the usage of each target tag field - * is uniform. No need to make this hard if we don't have to. + * is uniform. No need to make this hard if we don't have to. * * We aren't acquiring lightweight locks for the predicate lock or lock * target structures associated with this transaction unless we're going @@ -1952,7 +2115,7 @@ RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash) * locks. */ static void -DeleteChildTargetLocks(PREDICATELOCKTARGETTAG *newtargettag) +DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag) { SERIALIZABLEXACT *sxact; PREDICATELOCK *predlock; @@ -1985,8 +2148,8 @@ DeleteChildTargetLocks(PREDICATELOCKTARGETTAG *newtargettag) if (TargetTagIsCoveredBy(oldtargettag, *newtargettag)) { uint32 oldtargettaghash; - LWLockId partitionLock; - PREDICATELOCK *rmpredlock; + LWLock *partitionLock; + PREDICATELOCK *rmpredlock PG_USED_FOR_ASSERTS_ONLY; oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag); partitionLock = PredicateLockHashPartitionLock(oldtargettaghash); @@ -2016,28 +2179,35 @@ DeleteChildTargetLocks(PREDICATELOCKTARGETTAG *newtargettag) } /* - * Returns the promotion threshold for a given predicate lock - * target. This is the number of descendant locks required to promote - * to the specified tag. Note that the threshold includes non-direct - * descendants, e.g. both tuples and pages for a relation lock. + * Returns the promotion limit for a given predicate lock target. This is the + * max number of descendant locks allowed before promoting to the specified + * tag. Note that the limit includes non-direct descendants (e.g., both tuples + * and pages for a relation lock). * - * TODO SSI: We should do something more intelligent about what the - * thresholds are, either making it proportional to the number of - * tuples in a page & pages in a relation, or at least making it a - * GUC. Currently the threshold is 3 for a page lock, and - * max_pred_locks_per_transaction/2 for a relation lock, chosen - * entirely arbitrarily (and without benchmarking). + * Currently the default limit is 2 for a page lock, and half of the value of + * max_pred_locks_per_transaction - 1 for a relation lock, to match behavior + * of earlier releases when upgrading. + * + * TODO SSI: We should probably add additional GUCs to allow a maximum ratio + * of page and tuple locks based on the pages in a relation, and the maximum + * ratio of tuple locks to tuples in a page. This would provide more + * generally "balanced" allocation of locks to where they are most useful, + * while still allowing the absolute numbers to prevent one relation from + * tying up all predicate lock resources. */ static int -PredicateLockPromotionThreshold(PREDICATELOCKTARGETTAG *tag) +MaxPredicateChildLocks(const PREDICATELOCKTARGETTAG *tag) { switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag)) { case PREDLOCKTAG_RELATION: - return max_predicate_locks_per_xact / 2; + return max_predicate_locks_per_relation < 0 + ? (max_predicate_locks_per_xact + / (-max_predicate_locks_per_relation)) - 1 + : max_predicate_locks_per_relation; case PREDLOCKTAG_PAGE: - return 3; + return max_predicate_locks_per_page; case PREDLOCKTAG_TUPLE: @@ -2063,7 +2233,7 @@ PredicateLockPromotionThreshold(PREDICATELOCKTARGETTAG *tag) * Returns true if a parent lock was acquired and false otherwise. */ static bool -CheckAndPromotePredicateLockRequest(PREDICATELOCKTARGETTAG *reqtag) +CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag) { PREDICATELOCKTARGETTAG targettag, nexttag, @@ -2092,8 +2262,8 @@ CheckAndPromotePredicateLockRequest(PREDICATELOCKTARGETTAG *reqtag) else parentlock->childLocks++; - if (parentlock->childLocks >= - PredicateLockPromotionThreshold(&targettag)) + if (parentlock->childLocks > + MaxPredicateChildLocks(&targettag)) { /* * We should promote to this parent lock. Continue to check its @@ -2128,7 +2298,7 @@ CheckAndPromotePredicateLockRequest(PREDICATELOCKTARGETTAG *reqtag) * this information is no longer needed. */ static void -DecrementParentLocks(PREDICATELOCKTARGETTAG *targettag) +DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag) { PREDICATELOCKTARGETTAG parenttag, nexttag; @@ -2139,7 +2309,7 @@ DecrementParentLocks(PREDICATELOCKTARGETTAG *targettag) { uint32 targettaghash; LOCALPREDICATELOCK *parentlock, - *rmlock; + *rmlock PG_USED_FOR_ASSERTS_ONLY; parenttag = nexttag; targettaghash = PredicateLockTargetTagHashCode(&parenttag); @@ -2190,14 +2360,14 @@ DecrementParentLocks(PREDICATELOCKTARGETTAG *targettag) * PredicateLockAcquire for that. */ static void -CreatePredicateLock(PREDICATELOCKTARGETTAG *targettag, +CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag, uint32 targettaghash, SERIALIZABLEXACT *sxact) { PREDICATELOCKTARGET *target; PREDICATELOCKTAG locktag; PREDICATELOCK *lock; - LWLockId partitionLock; + LWLock *partitionLock; bool found; partitionLock = PredicateLockHashPartitionLock(targettaghash); @@ -2251,7 +2421,7 @@ CreatePredicateLock(PREDICATELOCKTARGETTAG *targettag, * any finer-grained locks covered by the new one. */ static void -PredicateLockAcquire(PREDICATELOCKTARGETTAG *targettag) +PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag) { uint32 targettaghash; bool found; @@ -2388,11 +2558,9 @@ PredicateLockTuple(Relation relation, HeapTuple tuple, Snapshot snapshot) } } } - else - targetxmin = InvalidTransactionId; /* - * Do quick-but-not-definitive test for a relation lock first. This will + * Do quick-but-not-definitive test for a relation lock first. This will * never cause a return when the relation is *not* locked, but will * occasionally let the check continue when there really *is* a relation * level lock. @@ -2408,8 +2576,7 @@ PredicateLockTuple(Relation relation, HeapTuple tuple, Snapshot snapshot) relation->rd_node.dbNode, relation->rd_id, ItemPointerGetBlockNumber(tid), - ItemPointerGetOffsetNumber(tid), - targetxmin); + ItemPointerGetOffsetNumber(tid)); PredicateLockAcquire(&tag); } @@ -2498,10 +2665,10 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, bool removeOld) { uint32 oldtargettaghash; - LWLockId oldpartitionLock; + LWLock *oldpartitionLock; PREDICATELOCKTARGET *oldtarget; uint32 newtargettaghash; - LWLockId newpartitionLock; + LWLock *newpartitionLock; bool found; bool outOfShmem = false; @@ -2613,14 +2780,13 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, Assert(found); } - newpredlock = (PREDICATELOCK *) - hash_search_with_hash_value - (PredicateLockHash, - &newpredlocktag, - PredicateLockHashCodeFromTargetHashCode(&newpredlocktag, - newtargettaghash), - HASH_ENTER_NULL, &found); + hash_search_with_hash_value(PredicateLockHash, + &newpredlocktag, + PredicateLockHashCodeFromTargetHashCode(&newpredlocktag, + newtargettaghash), + HASH_ENTER_NULL, + &found); if (!newpredlock) { /* Out of shared memory. Undo what we've done so far. */ @@ -2679,7 +2845,7 @@ exit: /* We shouldn't run out of memory if we're moving locks */ Assert(!outOfShmem); - /* Put the scrach entry back */ + /* Put the scratch entry back */ RestoreScratchTarget(false); } @@ -2706,7 +2872,7 @@ exit: * transaction which is not serializable. * * NOTE: This is currently only called with transfer set to true, but that may - * change. If we decide to clean up the locks from a table on commit of a + * change. If we decide to clean up the locks from a table on commit of a * transaction which executed DROP TABLE, the false condition will be useful. */ static void @@ -2748,8 +2914,8 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) heapId = relation->rd_index->indrelid; } Assert(heapId != InvalidOid); - Assert(transfer || !isIndex); /* index OID only makes sense with - * transfer */ + Assert(transfer || !isIndex); /* index OID only makes sense with + * transfer */ /* Retrieve first time needed, then keep. */ heaptargettaghash = 0; @@ -2758,7 +2924,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) /* Acquire locks on all lock partitions */ LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE); for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++) - LWLockAcquire(FirstPredicateLockMgrLock + i, LW_EXCLUSIVE); + LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_EXCLUSIVE); LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); /* @@ -2787,7 +2953,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) continue; /* already the right lock */ /* - * If we made it here, we have work to do. We make sure the heap + * If we made it here, we have work to do. We make sure the heap * relation lock exists, then we walk the list of predicate locks for * the old target we found, moving all locks to the heap relation lock * -- unless they already hold that. @@ -2856,12 +3022,12 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) newpredlocktag.myTarget = heaptarget; newpredlocktag.myXact = oldXact; newpredlock = (PREDICATELOCK *) - hash_search_with_hash_value - (PredicateLockHash, - &newpredlocktag, + hash_search_with_hash_value(PredicateLockHash, + &newpredlocktag, PredicateLockHashCodeFromTargetHashCode(&newpredlocktag, heaptargettaghash), - HASH_ENTER, &found); + HASH_ENTER, + &found); if (!found) { SHMQueueInsertBefore(&(heaptarget->predicateLocks), @@ -2896,7 +3062,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) /* Release locks in reverse order */ LWLockRelease(SerializableXactHashLock); for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--) - LWLockRelease(FirstPredicateLockMgrLock + i); + LWLockRelease(PredicateLockHashPartitionLockByIndex(i)); LWLockRelease(SerializablePredicateLockListLock); } @@ -3095,7 +3261,7 @@ ReleasePredicateLocks(bool isCommit) /* * We can't trust XactReadOnly here, because a transaction which started * as READ WRITE can show as READ ONLY later, e.g., within - * substransactions. We want to flag a transaction as READ ONLY if it + * subtransactions. We want to flag a transaction as READ ONLY if it * commits without writing so that de facto READ ONLY transactions get the * benefit of some RO optimizations, so we will use this local variable to * get some cleanup logic right which is based on whether the transaction @@ -3109,30 +3275,29 @@ ReleasePredicateLocks(bool isCommit) return; } + LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); + Assert(!isCommit || SxactIsPrepared(MySerializableXact)); Assert(!isCommit || !SxactIsDoomed(MySerializableXact)); Assert(!SxactIsCommitted(MySerializableXact)); Assert(!SxactIsRolledBack(MySerializableXact)); /* may not be serializable during COMMIT/ROLLBACK PREPARED */ - if (MySerializableXact->pid != 0) - Assert(IsolationIsSerializable()); + Assert(MySerializableXact->pid == 0 || IsolationIsSerializable()); /* We'd better not already be on the cleanup list. */ Assert(!SxactIsOnFinishedList(MySerializableXact)); topLevelIsDeclaredReadOnly = SxactIsReadOnly(MySerializableXact); - LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); - /* * We don't hold XidGenLock lock here, assuming that TransactionId is * atomic! * * If this value is changing, we don't care that much whether we get the * old or new value -- it is just used to determine how far - * GlobalSerizableXmin must advance before this transaction can be fully - * cleaned up. The worst that could happen is we wait for one more + * GlobalSerializableXmin must advance before this transaction can be + * fully cleaned up. The worst that could happen is we wait for one more * transaction to complete before freeing some RAM; correctness of visible * behavior is not affected. */ @@ -3166,6 +3331,14 @@ ReleasePredicateLocks(bool isCommit) */ MySerializableXact->flags |= SXACT_FLAG_DOOMED; MySerializableXact->flags |= SXACT_FLAG_ROLLED_BACK; + + /* + * If the transaction was previously prepared, but is now failing due + * to a ROLLBACK PREPARED or (hopefully very rare) error after the + * prepare, clear the prepared flag. This simplifies conflict + * checking. + */ + MySerializableXact->flags &= ~SXACT_FLAG_PREPARED; } if (!topLevelIsDeclaredReadOnly) @@ -3227,7 +3400,7 @@ ReleasePredicateLocks(bool isCommit) } /* - * Release all outConflicts to committed transactions. If we're rolling + * Release all outConflicts to committed transactions. If we're rolling * back clear them all. Set SXACT_FLAG_CONFLICT_OUT if any point to * previously committed transactions. */ @@ -3247,8 +3420,8 @@ ReleasePredicateLocks(bool isCommit) && SxactIsCommitted(conflict->sxactIn)) { if ((MySerializableXact->flags & SXACT_FLAG_CONFLICT_OUT) == 0 - || conflict->sxactIn->commitSeqNo < MySerializableXact->SeqNo.earliestOutConflictCommit) - MySerializableXact->SeqNo.earliestOutConflictCommit = conflict->sxactIn->commitSeqNo; + || conflict->sxactIn->prepareSeqNo < MySerializableXact->SeqNo.earliestOutConflictCommit) + MySerializableXact->SeqNo.earliestOutConflictCommit = conflict->sxactIn->prepareSeqNo; MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT; } @@ -3434,10 +3607,29 @@ ClearOldPredicateLocks(void) else if (finishedSxact->commitSeqNo > PredXact->HavePartialClearedThrough && finishedSxact->commitSeqNo <= PredXact->CanPartialClearThrough) { + /* + * Any active transactions that took their snapshot before this + * transaction committed are read-only, so we can clear part of + * its state. + */ LWLockRelease(SerializableXactHashLock); - ReleaseOneSerializableXact(finishedSxact, - !SxactIsReadOnly(finishedSxact), - false); + + if (SxactIsReadOnly(finishedSxact)) + { + /* A read-only transaction can be removed entirely */ + SHMQueueDelete(&(finishedSxact->finishedLink)); + ReleaseOneSerializableXact(finishedSxact, false, false); + } + else + { + /* + * A read-write transaction can only be partially cleared. We + * need to keep the SERIALIZABLEXACT but can release the + * SIREAD locks and conflicts in. + */ + ReleaseOneSerializableXact(finishedSxact, true, false); + } + PredXact->HavePartialClearedThrough = finishedSxact->commitSeqNo; LWLockAcquire(SerializableXactHashLock, LW_SHARED); } @@ -3484,7 +3676,7 @@ ClearOldPredicateLocks(void) PREDICATELOCKTARGET *target; PREDICATELOCKTARGETTAG targettag; uint32 targettaghash; - LWLockId partitionLock; + LWLock *partitionLock; tag = predlock->tag; target = tag.myTarget; @@ -3527,7 +3719,7 @@ ClearOldPredicateLocks(void) * matter -- but keep the transaction entry itself and any outConflicts. * * When the summarize flag is set, we've run short of room for sxact data - * and must summarize to the SLRU. Predicate locks are transferred to a + * and must summarize to the SLRU. Predicate locks are transferred to a * dummy "old" transaction, with duplicate locks on a single target * collapsing to a single lock with the "latest" commitSeqNo from among * the conflicting locks.. @@ -3543,6 +3735,7 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, Assert(sxact != NULL); Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact)); + Assert(partial || !SxactIsOnFinishedList(sxact)); Assert(LWLockHeldByMe(SerializableFinishedListLock)); /* @@ -3562,7 +3755,7 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, PREDICATELOCKTARGET *target; PREDICATELOCKTARGETTAG targettag; uint32 targettaghash; - LWLockId partitionLock; + LWLock *partitionLock; nextpredlock = (PREDICATELOCK *) SHMQueueNext(&(sxact->predicateLocks), @@ -3719,7 +3912,7 @@ XidIsConcurrent(TransactionId xid) /* * CheckForSerializableConflictOut * We are reading a tuple which has been modified. If it is visible to - * us but has been deleted, that indicates a rw-conflict out. If it's + * us but has been deleted, that indicates a rw-conflict out. If it's * not visible and was created by a concurrent (overlapping) * serializable transaction, that is also a rw-conflict out, * @@ -3753,7 +3946,7 @@ CheckForSerializableConflictOut(bool visible, Relation relation, ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to read/write dependencies among transactions"), - errdetail("Cancelled on identification as a pivot, during conflict out checking."), + errdetail_internal("Reason code: Canceled on identification as a pivot, during conflict out checking."), errhint("The transaction might succeed if retried."))); } @@ -3764,7 +3957,7 @@ CheckForSerializableConflictOut(bool visible, Relation relation, * tuple is visible to us, while HeapTupleSatisfiesVacuum checks what else * is going on with it. */ - htsvResult = HeapTupleSatisfiesVacuum(tuple->t_data, TransactionXmin, buffer); + htsvResult = HeapTupleSatisfiesVacuum(tuple, TransactionXmin, buffer); switch (htsvResult) { case HEAPTUPLE_LIVE: @@ -3775,10 +3968,10 @@ CheckForSerializableConflictOut(bool visible, Relation relation, case HEAPTUPLE_RECENTLY_DEAD: if (!visible) return; - xid = HeapTupleHeaderGetXmax(tuple->t_data); + xid = HeapTupleHeaderGetUpdateXid(tuple->t_data); break; case HEAPTUPLE_DELETE_IN_PROGRESS: - xid = HeapTupleHeaderGetXmax(tuple->t_data); + xid = HeapTupleHeaderGetUpdateXid(tuple->t_data); break; case HEAPTUPLE_INSERT_IN_PROGRESS: xid = HeapTupleHeaderGetXmin(tuple->t_data); @@ -3806,7 +3999,7 @@ CheckForSerializableConflictOut(bool visible, Relation relation, Assert(TransactionIdFollowsOrEquals(xid, TransactionXmin)); /* - * Find top level xid. Bail out if xid is too early to be a conflict, or + * Find top level xid. Bail out if xid is too early to be a conflict, or * if it's our own xid. */ if (TransactionIdEquals(xid, GetTopTransactionIdIfAny())) @@ -3842,7 +4035,7 @@ CheckForSerializableConflictOut(bool visible, Relation relation, ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to read/write dependencies among transactions"), - errdetail("Cancelled on conflict out to old pivot %u.", xid), + errdetail_internal("Reason code: Canceled on conflict out to old pivot %u.", xid), errhint("The transaction might succeed if retried."))); if (SxactHasSummaryConflictIn(MySerializableXact) @@ -3850,7 +4043,7 @@ CheckForSerializableConflictOut(bool visible, Relation relation, ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to read/write dependencies among transactions"), - errdetail("Cancelled on identification as a pivot, with conflict out to old committed transaction %u.", xid), + errdetail_internal("Reason code: Canceled on identification as a pivot, with conflict out to old committed transaction %u.", xid), errhint("The transaction might succeed if retried."))); MySerializableXact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT; @@ -3871,9 +4064,9 @@ CheckForSerializableConflictOut(bool visible, Relation relation, /* * We have a conflict out to a transaction which has a conflict out to a - * summarized transaction. That summarized transaction must have + * summarized transaction. That summarized transaction must have * committed first, and we can't tell when it committed in relation to our - * snapshot acquisition, so something needs to be cancelled. + * snapshot acquisition, so something needs to be canceled. */ if (SxactHasSummaryConflictOut(sxact)) { @@ -3889,7 +4082,7 @@ CheckForSerializableConflictOut(bool visible, Relation relation, ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to read/write dependencies among transactions"), - errdetail("Cancelled on conflict out to old pivot."), + errdetail_internal("Reason code: Canceled on conflict out to old pivot."), errhint("The transaction might succeed if retried."))); } } @@ -3905,7 +4098,7 @@ CheckForSerializableConflictOut(bool visible, Relation relation, && (!SxactHasConflictOut(sxact) || MySerializableXact->SeqNo.lastCommitBeforeSnapshot < sxact->SeqNo.earliestOutConflictCommit)) { - /* Read-only transaction will appear to run first. No conflict. */ + /* Read-only transaction will appear to run first. No conflict. */ LWLockRelease(SerializableXactHashLock); return; } @@ -3940,7 +4133,7 @@ static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag) { uint32 targettaghash; - LWLockId partitionLock; + LWLock *partitionLock; PREDICATELOCKTARGET *target; PREDICATELOCK *predlock; PREDICATELOCK *mypredlock = NULL; @@ -4128,7 +4321,7 @@ CheckForSerializableConflictIn(Relation relation, HeapTuple tuple, ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to read/write dependencies among transactions"), - errdetail("Cancelled on identification as a pivot, during conflict in checking."), + errdetail_internal("Reason code: Canceled on identification as a pivot, during conflict in checking."), errhint("The transaction might succeed if retried."))); /* @@ -4151,9 +4344,8 @@ CheckForSerializableConflictIn(Relation relation, HeapTuple tuple, SET_PREDICATELOCKTARGETTAG_TUPLE(targettag, relation->rd_node.dbNode, relation->rd_id, - ItemPointerGetBlockNumber(&(tuple->t_data->t_ctid)), - ItemPointerGetOffsetNumber(&(tuple->t_data->t_ctid)), - HeapTupleHeaderGetXmin(tuple->t_data)); + ItemPointerGetBlockNumber(&(tuple->t_self)), + ItemPointerGetOffsetNumber(&(tuple->t_self))); CheckTargetForConflictsIn(&targettag); } @@ -4233,8 +4425,8 @@ CheckTableForSerializableConflictIn(Relation relation) LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE); for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++) - LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED); - LWLockAcquire(SerializableXactHashLock, LW_SHARED); + LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_SHARED); + LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); /* Scan through target list */ hash_seq_init(&seqstat, PredicateLockTargetHash); @@ -4280,7 +4472,7 @@ CheckTableForSerializableConflictIn(Relation relation) /* Release locks in reverse order */ LWLockRelease(SerializableXactHashLock); for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--) - LWLockRelease(FirstPredicateLockMgrLock + i); + LWLockRelease(PredicateLockHashPartitionLockByIndex(i)); LWLockRelease(SerializablePredicateLockListLock); } @@ -4327,7 +4519,7 @@ FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer) *---------------------------------------------------------------------------- */ static void -OnConflict_CheckForSerializationFailure(SERIALIZABLEXACT *reader, +OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer) { bool failure; @@ -4364,6 +4556,11 @@ OnConflict_CheckForSerializationFailure(SERIALIZABLEXACT *reader, * - the writer committed before T2 * - the reader is a READ ONLY transaction and the reader was concurrent * with T2 (= reader acquired its snapshot before T2 committed) + * + * We also handle the case that T2 is prepared but not yet committed + * here. In that case T2 has already checked for conflicts, so if it + * commits first, making the above conflict real, it's too late for it + * to abort. *------------------------------------------------------------------------ */ if (!failure) @@ -4382,13 +4579,13 @@ OnConflict_CheckForSerializationFailure(SERIALIZABLEXACT *reader, { SERIALIZABLEXACT *t2 = conflict->sxactIn; - if (SxactIsCommitted(t2) + if (SxactIsPrepared(t2) && (!SxactIsCommitted(reader) - || t2->commitSeqNo <= reader->commitSeqNo) + || t2->prepareSeqNo <= reader->commitSeqNo) && (!SxactIsCommitted(writer) - || t2->commitSeqNo <= writer->commitSeqNo) + || t2->prepareSeqNo <= writer->commitSeqNo) && (!SxactIsReadOnly(reader) - || t2->commitSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot)) + || t2->prepareSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot)) { failure = true; break; @@ -4401,7 +4598,8 @@ OnConflict_CheckForSerializationFailure(SERIALIZABLEXACT *reader, } /*------------------------------------------------------------------------ - * Check whether the reader has become a pivot with a committed writer: + * Check whether the reader has become a pivot with a writer + * that's committed (or prepared): * * T0 ------> R ------> W * rw rw @@ -4412,7 +4610,7 @@ OnConflict_CheckForSerializationFailure(SERIALIZABLEXACT *reader, * - T0 is READ ONLY, and overlaps the writer *------------------------------------------------------------------------ */ - if (!failure && SxactIsCommitted(writer) && !SxactIsReadOnly(reader)) + if (!failure && SxactIsPrepared(writer) && !SxactIsReadOnly(reader)) { if (SxactHasSummaryConflictIn(reader)) { @@ -4430,9 +4628,9 @@ OnConflict_CheckForSerializationFailure(SERIALIZABLEXACT *reader, if (!SxactIsDoomed(t0) && (!SxactIsCommitted(t0) - || t0->commitSeqNo >= writer->commitSeqNo) + || t0->commitSeqNo >= writer->prepareSeqNo) && (!SxactIsReadOnly(t0) - || t0->SeqNo.lastCommitBeforeSnapshot >= writer->commitSeqNo)) + || t0->SeqNo.lastCommitBeforeSnapshot >= writer->prepareSeqNo)) { failure = true; break; @@ -4460,7 +4658,7 @@ OnConflict_CheckForSerializationFailure(SERIALIZABLEXACT *reader, ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to read/write dependencies among transactions"), - errdetail("Cancelled on identification as a pivot, during write."), + errdetail_internal("Reason code: Canceled on identification as a pivot, during write."), errhint("The transaction might succeed if retried."))); } else if (SxactIsPrepared(writer)) @@ -4472,7 +4670,7 @@ OnConflict_CheckForSerializationFailure(SERIALIZABLEXACT *reader, ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to read/write dependencies among transactions"), - errdetail("Cancelled on conflict out to pivot %u, during read.", writer->topXid), + errdetail_internal("Reason code: Canceled on conflict out to pivot %u, during read.", writer->topXid), errhint("The transaction might succeed if retried."))); } writer->flags |= SXACT_FLAG_DOOMED; @@ -4491,9 +4689,9 @@ OnConflict_CheckForSerializationFailure(SERIALIZABLEXACT *reader, * * If a dangerous structure is found, the pivot (the near conflict) is * marked for death, because rolling back another transaction might mean - * that we flail without ever making progress. This transaction is + * that we flail without ever making progress. This transaction is * committing writes, so letting it commit ensures progress. If we - * cancelled the far conflict, it might immediately fail again on retry. + * canceled the far conflict, it might immediately fail again on retry. */ void PreCommit_CheckForSerializationFailure(void) @@ -4514,7 +4712,7 @@ PreCommit_CheckForSerializationFailure(void) ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to read/write dependencies among transactions"), - errdetail("Cancelled on identification as a pivot, during commit attempt."), + errdetail_internal("Reason code: Canceled on identification as a pivot, during commit attempt."), errhint("The transaction might succeed if retried."))); } @@ -4552,7 +4750,7 @@ PreCommit_CheckForSerializationFailure(void) ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to read/write dependencies among transactions"), - errdetail("Cancelled on commit attempt with conflict in from prepared pivot."), + errdetail_internal("Reason code: Canceled on commit attempt with conflict in from prepared pivot."), errhint("The transaction might succeed if retried."))); } nearConflict->sxactOut->flags |= SXACT_FLAG_DOOMED; @@ -4571,6 +4769,7 @@ PreCommit_CheckForSerializationFailure(void) offsetof(RWConflictData, inLink)); } + MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo); MySerializableXact->flags |= SXACT_FLAG_PREPARED; LWLockRelease(SerializableXactHashLock); @@ -4603,20 +4802,17 @@ AtPrepare_PredicateLocks(void) if (MySerializableXact == InvalidSerializableXact) return; - /* Generate a xact record for our SERIALIZABLEXACT */ + /* Generate an xact record for our SERIALIZABLEXACT */ record.type = TWOPHASEPREDICATERECORD_XACT; xactRecord->xmin = MySerializableXact->xmin; xactRecord->flags = MySerializableXact->flags; /* - * Tweak the flags. Since we're not going to output the inConflicts and - * outConflicts lists, if they're non-empty we'll represent that by - * setting the appropriate summary conflict flags. + * Note that we don't include the list of conflicts in our out in the + * statefile, because new conflicts can be added even after the + * transaction prepares. We'll just make a conservative assumption during + * recovery instead. */ - if (!SHMQueueEmpty(&MySerializableXact->inConflicts)) - xactRecord->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN; - if (!SHMQueueEmpty(&MySerializableXact->outConflicts)) - xactRecord->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT; RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0, &record, sizeof(record)); @@ -4745,20 +4941,12 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info, sxact->pid = 0; /* a prepared xact hasn't committed yet */ + sxact->prepareSeqNo = RecoverySerCommitSeqNo; sxact->commitSeqNo = InvalidSerCommitSeqNo; sxact->finishedBefore = InvalidTransactionId; sxact->SeqNo.lastCommitBeforeSnapshot = RecoverySerCommitSeqNo; - - /* - * We don't need the details of a prepared transaction's conflicts, - * just whether it had conflicts in or out (which we get from the - * flags) - */ - SHMQueueInit(&(sxact->outConflicts)); - SHMQueueInit(&(sxact->inConflicts)); - /* * Don't need to track this; no transactions running at the time the * recovered xact started are still active, except possibly other @@ -4780,6 +4968,16 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info, (MaxBackends + max_prepared_xacts)); } + /* + * We don't know whether the transaction had any conflicts or not, so + * we'll conservatively assume that it had both a conflict in and a + * conflict out, and represent that with the summary conflict flags. + */ + SHMQueueInit(&(sxact->outConflicts)); + SHMQueueInit(&(sxact->inConflicts)); + sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN; + sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT; + /* Register the transaction's xid */ sxidtag.xid = xid; sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,