]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/lmgr.c
e688ba811703c8b8a3811f1a61f99fa018b60501
[postgresql] / src / backend / storage / lmgr / lmgr.c
1 /*-------------------------------------------------------------------------
2  *
3  * lmgr.c
4  *        POSTGRES lock manager code
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/storage/lmgr/lmgr.c
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/subtrans.h"
19 #include "access/transam.h"
20 #include "access/xact.h"
21 #include "catalog/catalog.h"
22 #include "miscadmin.h"
23 #include "storage/lmgr.h"
24 #include "storage/procarray.h"
25 #include "utils/inval.h"
26
27
28 /*
29  * Per-backend counter for generating speculative insertion tokens.
30  *
31  * This may wrap around, but that's OK as it's only used for the short
32  * duration between inserting a tuple and checking that there are no (unique)
33  * constraint violations.  It's theoretically possible that a backend sees a
34  * tuple that was speculatively inserted by another backend, but before it has
35  * started waiting on the token, the other backend completes its insertion,
36  * and then performs 2^32 unrelated insertions.  And after all that, the
37  * first backend finally calls SpeculativeInsertionLockAcquire(), with the
38  * intention of waiting for the first insertion to complete, but ends up
39  * waiting for the latest unrelated insertion instead.  Even then, nothing
40  * particularly bad happens: in the worst case they deadlock, causing one of
41  * the transactions to abort.
42  */
43 static uint32 speculativeInsertionToken = 0;
44
45
46 /*
47  * Struct to hold context info for transaction lock waits.
48  *
49  * 'oper' is the operation that needs to wait for the other transaction; 'rel'
50  * and 'ctid' specify the address of the tuple being waited for.
51  */
52 typedef struct XactLockTableWaitInfo
53 {
54         XLTW_Oper       oper;
55         Relation        rel;
56         ItemPointer ctid;
57 } XactLockTableWaitInfo;
58
59 static void XactLockTableWaitErrorCb(void *arg);
60
61 /*
62  * RelationInitLockInfo
63  *              Initializes the lock information in a relation descriptor.
64  *
65  *              relcache.c must call this during creation of any reldesc.
66  */
67 void
68 RelationInitLockInfo(Relation relation)
69 {
70         Assert(RelationIsValid(relation));
71         Assert(OidIsValid(RelationGetRelid(relation)));
72
73         relation->rd_lockInfo.lockRelId.relId = RelationGetRelid(relation);
74
75         if (relation->rd_rel->relisshared)
76                 relation->rd_lockInfo.lockRelId.dbId = InvalidOid;
77         else
78                 relation->rd_lockInfo.lockRelId.dbId = MyDatabaseId;
79 }
80
81 /*
82  * SetLocktagRelationOid
83  *              Set up a locktag for a relation, given only relation OID
84  */
85 static inline void
86 SetLocktagRelationOid(LOCKTAG *tag, Oid relid)
87 {
88         Oid                     dbid;
89
90         if (IsSharedRelation(relid))
91                 dbid = InvalidOid;
92         else
93                 dbid = MyDatabaseId;
94
95         SET_LOCKTAG_RELATION(*tag, dbid, relid);
96 }
97
98 /*
99  *              LockRelationOid
100  *
101  * Lock a relation given only its OID.  This should generally be used
102  * before attempting to open the relation's relcache entry.
103  */
104 void
105 LockRelationOid(Oid relid, LOCKMODE lockmode)
106 {
107         LOCKTAG         tag;
108         LOCALLOCK  *locallock;
109         LockAcquireResult res;
110
111         SetLocktagRelationOid(&tag, relid);
112
113         res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
114
115         /*
116          * Now that we have the lock, check for invalidation messages, so that we
117          * will update or flush any stale relcache entry before we try to use it.
118          * RangeVarGetRelid() specifically relies on us for this.  We can skip
119          * this in the not-uncommon case that we already had the same type of lock
120          * being requested, since then no one else could have modified the
121          * relcache entry in an undesirable way.  (In the case where our own xact
122          * modifies the rel, the relcache update happens via
123          * CommandCounterIncrement, not here.)
124          *
125          * However, in corner cases where code acts on tables (usually catalogs)
126          * recursively, we might get here while still processing invalidation
127          * messages in some outer execution of this function or a sibling.  The
128          * "cleared" status of the lock tells us whether we really are done
129          * absorbing relevant inval messages.
130          */
131         if (res != LOCKACQUIRE_ALREADY_CLEAR)
132         {
133                 AcceptInvalidationMessages();
134                 MarkLockClear(locallock);
135         }
136 }
137
138 /*
139  *              ConditionalLockRelationOid
140  *
141  * As above, but only lock if we can get the lock without blocking.
142  * Returns true iff the lock was acquired.
143  *
144  * NOTE: we do not currently need conditional versions of all the
145  * LockXXX routines in this file, but they could easily be added if needed.
146  */
147 bool
148 ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
149 {
150         LOCKTAG         tag;
151         LOCALLOCK  *locallock;
152         LockAcquireResult res;
153
154         SetLocktagRelationOid(&tag, relid);
155
156         res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
157
158         if (res == LOCKACQUIRE_NOT_AVAIL)
159                 return false;
160
161         /*
162          * Now that we have the lock, check for invalidation messages; see notes
163          * in LockRelationOid.
164          */
165         if (res != LOCKACQUIRE_ALREADY_CLEAR)
166         {
167                 AcceptInvalidationMessages();
168                 MarkLockClear(locallock);
169         }
170
171         return true;
172 }
173
174 /*
175  *              UnlockRelationId
176  *
177  * Unlock, given a LockRelId.  This is preferred over UnlockRelationOid
178  * for speed reasons.
179  */
180 void
181 UnlockRelationId(LockRelId *relid, LOCKMODE lockmode)
182 {
183         LOCKTAG         tag;
184
185         SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
186
187         LockRelease(&tag, lockmode, false);
188 }
189
190 /*
191  *              UnlockRelationOid
192  *
193  * Unlock, given only a relation Oid.  Use UnlockRelationId if you can.
194  */
195 void
196 UnlockRelationOid(Oid relid, LOCKMODE lockmode)
197 {
198         LOCKTAG         tag;
199
200         SetLocktagRelationOid(&tag, relid);
201
202         LockRelease(&tag, lockmode, false);
203 }
204
205 /*
206  *              LockRelation
207  *
208  * This is a convenience routine for acquiring an additional lock on an
209  * already-open relation.  Never try to do "relation_open(foo, NoLock)"
210  * and then lock with this.
211  */
212 void
213 LockRelation(Relation relation, LOCKMODE lockmode)
214 {
215         LOCKTAG         tag;
216         LOCALLOCK  *locallock;
217         LockAcquireResult res;
218
219         SET_LOCKTAG_RELATION(tag,
220                                                  relation->rd_lockInfo.lockRelId.dbId,
221                                                  relation->rd_lockInfo.lockRelId.relId);
222
223         res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
224
225         /*
226          * Now that we have the lock, check for invalidation messages; see notes
227          * in LockRelationOid.
228          */
229         if (res != LOCKACQUIRE_ALREADY_CLEAR)
230         {
231                 AcceptInvalidationMessages();
232                 MarkLockClear(locallock);
233         }
234 }
235
236 /*
237  *              ConditionalLockRelation
238  *
239  * This is a convenience routine for acquiring an additional lock on an
240  * already-open relation.  Never try to do "relation_open(foo, NoLock)"
241  * and then lock with this.
242  */
243 bool
244 ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
245 {
246         LOCKTAG         tag;
247         LOCALLOCK  *locallock;
248         LockAcquireResult res;
249
250         SET_LOCKTAG_RELATION(tag,
251                                                  relation->rd_lockInfo.lockRelId.dbId,
252                                                  relation->rd_lockInfo.lockRelId.relId);
253
254         res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
255
256         if (res == LOCKACQUIRE_NOT_AVAIL)
257                 return false;
258
259         /*
260          * Now that we have the lock, check for invalidation messages; see notes
261          * in LockRelationOid.
262          */
263         if (res != LOCKACQUIRE_ALREADY_CLEAR)
264         {
265                 AcceptInvalidationMessages();
266                 MarkLockClear(locallock);
267         }
268
269         return true;
270 }
271
272 /*
273  *              UnlockRelation
274  *
275  * This is a convenience routine for unlocking a relation without also
276  * closing it.
277  */
278 void
279 UnlockRelation(Relation relation, LOCKMODE lockmode)
280 {
281         LOCKTAG         tag;
282
283         SET_LOCKTAG_RELATION(tag,
284                                                  relation->rd_lockInfo.lockRelId.dbId,
285                                                  relation->rd_lockInfo.lockRelId.relId);
286
287         LockRelease(&tag, lockmode, false);
288 }
289
290 /*
291  *              CheckRelationLockedByMe
292  *
293  * Returns true if current transaction holds a lock on 'relation' of mode
294  * 'lockmode'.  If 'orstronger' is true, a stronger lockmode is also OK.
295  * ("Stronger" is defined as "numerically higher", which is a bit
296  * semantically dubious but is OK for the purposes we use this for.)
297  */
298 bool
299 CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode, bool orstronger)
300 {
301         LOCKTAG         tag;
302
303         SET_LOCKTAG_RELATION(tag,
304                                                  relation->rd_lockInfo.lockRelId.dbId,
305                                                  relation->rd_lockInfo.lockRelId.relId);
306
307         if (LockHeldByMe(&tag, lockmode))
308                 return true;
309
310         if (orstronger)
311         {
312                 LOCKMODE        slockmode;
313
314                 for (slockmode = lockmode + 1;
315                          slockmode <= MaxLockMode;
316                          slockmode++)
317                 {
318                         if (LockHeldByMe(&tag, slockmode))
319                         {
320 #ifdef NOT_USED
321                                 /* Sometimes this might be useful for debugging purposes */
322                                 elog(WARNING, "lock mode %s substituted for %s on relation %s",
323                                          GetLockmodeName(tag.locktag_lockmethodid, slockmode),
324                                          GetLockmodeName(tag.locktag_lockmethodid, lockmode),
325                                          RelationGetRelationName(relation));
326 #endif
327                                 return true;
328                         }
329                 }
330         }
331
332         return false;
333 }
334
335 /*
336  *              LockHasWaitersRelation
337  *
338  * This is a function to check whether someone else is waiting for a
339  * lock which we are currently holding.
340  */
341 bool
342 LockHasWaitersRelation(Relation relation, LOCKMODE lockmode)
343 {
344         LOCKTAG         tag;
345
346         SET_LOCKTAG_RELATION(tag,
347                                                  relation->rd_lockInfo.lockRelId.dbId,
348                                                  relation->rd_lockInfo.lockRelId.relId);
349
350         return LockHasWaiters(&tag, lockmode, false);
351 }
352
353 /*
354  *              LockRelationIdForSession
355  *
356  * This routine grabs a session-level lock on the target relation.  The
357  * session lock persists across transaction boundaries.  It will be removed
358  * when UnlockRelationIdForSession() is called, or if an ereport(ERROR) occurs,
359  * or if the backend exits.
360  *
361  * Note that one should also grab a transaction-level lock on the rel
362  * in any transaction that actually uses the rel, to ensure that the
363  * relcache entry is up to date.
364  */
365 void
366 LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode)
367 {
368         LOCKTAG         tag;
369
370         SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
371
372         (void) LockAcquire(&tag, lockmode, true, false);
373 }
374
375 /*
376  *              UnlockRelationIdForSession
377  */
378 void
379 UnlockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode)
380 {
381         LOCKTAG         tag;
382
383         SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
384
385         LockRelease(&tag, lockmode, true);
386 }
387
388 /*
389  *              LockRelationForExtension
390  *
391  * This lock tag is used to interlock addition of pages to relations.
392  * We need such locking because bufmgr/smgr definition of P_NEW is not
393  * race-condition-proof.
394  *
395  * We assume the caller is already holding some type of regular lock on
396  * the relation, so no AcceptInvalidationMessages call is needed here.
397  */
398 void
399 LockRelationForExtension(Relation relation, LOCKMODE lockmode)
400 {
401         LOCKTAG         tag;
402
403         SET_LOCKTAG_RELATION_EXTEND(tag,
404                                                                 relation->rd_lockInfo.lockRelId.dbId,
405                                                                 relation->rd_lockInfo.lockRelId.relId);
406
407         (void) LockAcquire(&tag, lockmode, false, false);
408 }
409
410 /*
411  *              ConditionalLockRelationForExtension
412  *
413  * As above, but only lock if we can get the lock without blocking.
414  * Returns true iff the lock was acquired.
415  */
416 bool
417 ConditionalLockRelationForExtension(Relation relation, LOCKMODE lockmode)
418 {
419         LOCKTAG         tag;
420
421         SET_LOCKTAG_RELATION_EXTEND(tag,
422                                                                 relation->rd_lockInfo.lockRelId.dbId,
423                                                                 relation->rd_lockInfo.lockRelId.relId);
424
425         return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
426 }
427
428 /*
429  *              RelationExtensionLockWaiterCount
430  *
431  * Count the number of processes waiting for the given relation extension lock.
432  */
433 int
434 RelationExtensionLockWaiterCount(Relation relation)
435 {
436         LOCKTAG         tag;
437
438         SET_LOCKTAG_RELATION_EXTEND(tag,
439                                                                 relation->rd_lockInfo.lockRelId.dbId,
440                                                                 relation->rd_lockInfo.lockRelId.relId);
441
442         return LockWaiterCount(&tag);
443 }
444
445 /*
446  *              UnlockRelationForExtension
447  */
448 void
449 UnlockRelationForExtension(Relation relation, LOCKMODE lockmode)
450 {
451         LOCKTAG         tag;
452
453         SET_LOCKTAG_RELATION_EXTEND(tag,
454                                                                 relation->rd_lockInfo.lockRelId.dbId,
455                                                                 relation->rd_lockInfo.lockRelId.relId);
456
457         LockRelease(&tag, lockmode, false);
458 }
459
460 /*
461  *              LockPage
462  *
463  * Obtain a page-level lock.  This is currently used by some index access
464  * methods to lock individual index pages.
465  */
466 void
467 LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
468 {
469         LOCKTAG         tag;
470
471         SET_LOCKTAG_PAGE(tag,
472                                          relation->rd_lockInfo.lockRelId.dbId,
473                                          relation->rd_lockInfo.lockRelId.relId,
474                                          blkno);
475
476         (void) LockAcquire(&tag, lockmode, false, false);
477 }
478
479 /*
480  *              ConditionalLockPage
481  *
482  * As above, but only lock if we can get the lock without blocking.
483  * Returns true iff the lock was acquired.
484  */
485 bool
486 ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
487 {
488         LOCKTAG         tag;
489
490         SET_LOCKTAG_PAGE(tag,
491                                          relation->rd_lockInfo.lockRelId.dbId,
492                                          relation->rd_lockInfo.lockRelId.relId,
493                                          blkno);
494
495         return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
496 }
497
498 /*
499  *              UnlockPage
500  */
501 void
502 UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
503 {
504         LOCKTAG         tag;
505
506         SET_LOCKTAG_PAGE(tag,
507                                          relation->rd_lockInfo.lockRelId.dbId,
508                                          relation->rd_lockInfo.lockRelId.relId,
509                                          blkno);
510
511         LockRelease(&tag, lockmode, false);
512 }
513
514 /*
515  *              LockTuple
516  *
517  * Obtain a tuple-level lock.  This is used in a less-than-intuitive fashion
518  * because we can't afford to keep a separate lock in shared memory for every
519  * tuple.  See heap_lock_tuple before using this!
520  */
521 void
522 LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
523 {
524         LOCKTAG         tag;
525
526         SET_LOCKTAG_TUPLE(tag,
527                                           relation->rd_lockInfo.lockRelId.dbId,
528                                           relation->rd_lockInfo.lockRelId.relId,
529                                           ItemPointerGetBlockNumber(tid),
530                                           ItemPointerGetOffsetNumber(tid));
531
532         (void) LockAcquire(&tag, lockmode, false, false);
533 }
534
535 /*
536  *              ConditionalLockTuple
537  *
538  * As above, but only lock if we can get the lock without blocking.
539  * Returns true iff the lock was acquired.
540  */
541 bool
542 ConditionalLockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
543 {
544         LOCKTAG         tag;
545
546         SET_LOCKTAG_TUPLE(tag,
547                                           relation->rd_lockInfo.lockRelId.dbId,
548                                           relation->rd_lockInfo.lockRelId.relId,
549                                           ItemPointerGetBlockNumber(tid),
550                                           ItemPointerGetOffsetNumber(tid));
551
552         return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
553 }
554
555 /*
556  *              UnlockTuple
557  */
558 void
559 UnlockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
560 {
561         LOCKTAG         tag;
562
563         SET_LOCKTAG_TUPLE(tag,
564                                           relation->rd_lockInfo.lockRelId.dbId,
565                                           relation->rd_lockInfo.lockRelId.relId,
566                                           ItemPointerGetBlockNumber(tid),
567                                           ItemPointerGetOffsetNumber(tid));
568
569         LockRelease(&tag, lockmode, false);
570 }
571
572 /*
573  *              XactLockTableInsert
574  *
575  * Insert a lock showing that the given transaction ID is running ---
576  * this is done when an XID is acquired by a transaction or subtransaction.
577  * The lock can then be used to wait for the transaction to finish.
578  */
579 void
580 XactLockTableInsert(TransactionId xid)
581 {
582         LOCKTAG         tag;
583
584         SET_LOCKTAG_TRANSACTION(tag, xid);
585
586         (void) LockAcquire(&tag, ExclusiveLock, false, false);
587 }
588
589 /*
590  *              XactLockTableDelete
591  *
592  * Delete the lock showing that the given transaction ID is running.
593  * (This is never used for main transaction IDs; those locks are only
594  * released implicitly at transaction end.  But we do use it for subtrans IDs.)
595  */
596 void
597 XactLockTableDelete(TransactionId xid)
598 {
599         LOCKTAG         tag;
600
601         SET_LOCKTAG_TRANSACTION(tag, xid);
602
603         LockRelease(&tag, ExclusiveLock, false);
604 }
605
606 /*
607  *              XactLockTableWait
608  *
609  * Wait for the specified transaction to commit or abort.  If an operation
610  * is specified, an error context callback is set up.  If 'oper' is passed as
611  * None, no error context callback is set up.
612  *
613  * Note that this does the right thing for subtransactions: if we wait on a
614  * subtransaction, we will exit as soon as it aborts or its top parent commits.
615  * It takes some extra work to ensure this, because to save on shared memory
616  * the XID lock of a subtransaction is released when it ends, whether
617  * successfully or unsuccessfully.  So we have to check if it's "still running"
618  * and if so wait for its parent.
619  */
620 void
621 XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid,
622                                   XLTW_Oper oper)
623 {
624         LOCKTAG         tag;
625         XactLockTableWaitInfo info;
626         ErrorContextCallback callback;
627         bool            first = true;
628
629         /*
630          * If an operation is specified, set up our verbose error context
631          * callback.
632          */
633         if (oper != XLTW_None)
634         {
635                 Assert(RelationIsValid(rel));
636                 Assert(ItemPointerIsValid(ctid));
637
638                 info.rel = rel;
639                 info.ctid = ctid;
640                 info.oper = oper;
641
642                 callback.callback = XactLockTableWaitErrorCb;
643                 callback.arg = &info;
644                 callback.previous = error_context_stack;
645                 error_context_stack = &callback;
646         }
647
648         for (;;)
649         {
650                 Assert(TransactionIdIsValid(xid));
651                 Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
652
653                 SET_LOCKTAG_TRANSACTION(tag, xid);
654
655                 (void) LockAcquire(&tag, ShareLock, false, false);
656
657                 LockRelease(&tag, ShareLock, false);
658
659                 if (!TransactionIdIsInProgress(xid))
660                         break;
661
662                 /*
663                  * If the Xid belonged to a subtransaction, then the lock would have
664                  * gone away as soon as it was finished; for correct tuple visibility,
665                  * the right action is to wait on its parent transaction to go away.
666                  * But instead of going levels up one by one, we can just wait for the
667                  * topmost transaction to finish with the same end result, which also
668                  * incurs less locktable traffic.
669                  *
670                  * Some uses of this function don't involve tuple visibility -- such
671                  * as when building snapshots for logical decoding.  It is possible to
672                  * see a transaction in ProcArray before it registers itself in the
673                  * locktable.  The topmost transaction in that case is the same xid,
674                  * so we try again after a short sleep.  (Don't sleep the first time
675                  * through, to avoid slowing down the normal case.)
676                  */
677                 if (!first)
678                         pg_usleep(1000L);
679                 first = false;
680                 xid = SubTransGetTopmostTransaction(xid);
681         }
682
683         if (oper != XLTW_None)
684                 error_context_stack = callback.previous;
685 }
686
687 /*
688  *              ConditionalXactLockTableWait
689  *
690  * As above, but only lock if we can get the lock without blocking.
691  * Returns true if the lock was acquired.
692  */
693 bool
694 ConditionalXactLockTableWait(TransactionId xid)
695 {
696         LOCKTAG         tag;
697         bool            first = true;
698
699         for (;;)
700         {
701                 Assert(TransactionIdIsValid(xid));
702                 Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
703
704                 SET_LOCKTAG_TRANSACTION(tag, xid);
705
706                 if (LockAcquire(&tag, ShareLock, false, true) == LOCKACQUIRE_NOT_AVAIL)
707                         return false;
708
709                 LockRelease(&tag, ShareLock, false);
710
711                 if (!TransactionIdIsInProgress(xid))
712                         break;
713
714                 /* See XactLockTableWait about this case */
715                 if (!first)
716                         pg_usleep(1000L);
717                 first = false;
718                 xid = SubTransGetTopmostTransaction(xid);
719         }
720
721         return true;
722 }
723
724 /*
725  *              SpeculativeInsertionLockAcquire
726  *
727  * Insert a lock showing that the given transaction ID is inserting a tuple,
728  * but hasn't yet decided whether it's going to keep it.  The lock can then be
729  * used to wait for the decision to go ahead with the insertion, or aborting
730  * it.
731  *
732  * The token is used to distinguish multiple insertions by the same
733  * transaction.  It is returned to caller.
734  */
735 uint32
736 SpeculativeInsertionLockAcquire(TransactionId xid)
737 {
738         LOCKTAG         tag;
739
740         speculativeInsertionToken++;
741
742         /*
743          * Check for wrap-around. Zero means no token is held, so don't use that.
744          */
745         if (speculativeInsertionToken == 0)
746                 speculativeInsertionToken = 1;
747
748         SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, speculativeInsertionToken);
749
750         (void) LockAcquire(&tag, ExclusiveLock, false, false);
751
752         return speculativeInsertionToken;
753 }
754
755 /*
756  *              SpeculativeInsertionLockRelease
757  *
758  * Delete the lock showing that the given transaction is speculatively
759  * inserting a tuple.
760  */
761 void
762 SpeculativeInsertionLockRelease(TransactionId xid)
763 {
764         LOCKTAG         tag;
765
766         SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, speculativeInsertionToken);
767
768         LockRelease(&tag, ExclusiveLock, false);
769 }
770
771 /*
772  *              SpeculativeInsertionWait
773  *
774  * Wait for the specified transaction to finish or abort the insertion of a
775  * tuple.
776  */
777 void
778 SpeculativeInsertionWait(TransactionId xid, uint32 token)
779 {
780         LOCKTAG         tag;
781
782         SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, token);
783
784         Assert(TransactionIdIsValid(xid));
785         Assert(token != 0);
786
787         (void) LockAcquire(&tag, ShareLock, false, false);
788         LockRelease(&tag, ShareLock, false);
789 }
790
791 /*
792  * XactLockTableWaitErrorContextCb
793  *              Error context callback for transaction lock waits.
794  */
795 static void
796 XactLockTableWaitErrorCb(void *arg)
797 {
798         XactLockTableWaitInfo *info = (XactLockTableWaitInfo *) arg;
799
800         /*
801          * We would like to print schema name too, but that would require a
802          * syscache lookup.
803          */
804         if (info->oper != XLTW_None &&
805                 ItemPointerIsValid(info->ctid) && RelationIsValid(info->rel))
806         {
807                 const char *cxt;
808
809                 switch (info->oper)
810                 {
811                         case XLTW_Update:
812                                 cxt = gettext_noop("while updating tuple (%u,%u) in relation \"%s\"");
813                                 break;
814                         case XLTW_Delete:
815                                 cxt = gettext_noop("while deleting tuple (%u,%u) in relation \"%s\"");
816                                 break;
817                         case XLTW_Lock:
818                                 cxt = gettext_noop("while locking tuple (%u,%u) in relation \"%s\"");
819                                 break;
820                         case XLTW_LockUpdated:
821                                 cxt = gettext_noop("while locking updated version (%u,%u) of tuple in relation \"%s\"");
822                                 break;
823                         case XLTW_InsertIndex:
824                                 cxt = gettext_noop("while inserting index tuple (%u,%u) in relation \"%s\"");
825                                 break;
826                         case XLTW_InsertIndexUnique:
827                                 cxt = gettext_noop("while checking uniqueness of tuple (%u,%u) in relation \"%s\"");
828                                 break;
829                         case XLTW_FetchUpdated:
830                                 cxt = gettext_noop("while rechecking updated tuple (%u,%u) in relation \"%s\"");
831                                 break;
832                         case XLTW_RecheckExclusionConstr:
833                                 cxt = gettext_noop("while checking exclusion constraint on tuple (%u,%u) in relation \"%s\"");
834                                 break;
835
836                         default:
837                                 return;
838                 }
839
840                 errcontext(cxt,
841                                    ItemPointerGetBlockNumber(info->ctid),
842                                    ItemPointerGetOffsetNumber(info->ctid),
843                                    RelationGetRelationName(info->rel));
844         }
845 }
846
847 /*
848  * WaitForLockersMultiple
849  *              Wait until no transaction holds locks that conflict with the given
850  *              locktags at the given lockmode.
851  *
852  * To do this, obtain the current list of lockers, and wait on their VXIDs
853  * until they are finished.
854  *
855  * Note we don't try to acquire the locks on the given locktags, only the VXIDs
856  * of its lock holders; if somebody grabs a conflicting lock on the objects
857  * after we obtained our initial list of lockers, we will not wait for them.
858  */
859 void
860 WaitForLockersMultiple(List *locktags, LOCKMODE lockmode)
861 {
862         List       *holders = NIL;
863         ListCell   *lc;
864
865         /* Done if no locks to wait for */
866         if (list_length(locktags) == 0)
867                 return;
868
869         /* Collect the transactions we need to wait on */
870         foreach(lc, locktags)
871         {
872                 LOCKTAG    *locktag = lfirst(lc);
873
874                 holders = lappend(holders, GetLockConflicts(locktag, lockmode));
875         }
876
877         /*
878          * Note: GetLockConflicts() never reports our own xid, hence we need not
879          * check for that.  Also, prepared xacts are not reported, which is fine
880          * since they certainly aren't going to do anything anymore.
881          */
882
883         /* Finally wait for each such transaction to complete */
884         foreach(lc, holders)
885         {
886                 VirtualTransactionId *lockholders = lfirst(lc);
887
888                 while (VirtualTransactionIdIsValid(*lockholders))
889                 {
890                         VirtualXactLock(*lockholders, true);
891                         lockholders++;
892                 }
893         }
894
895         list_free_deep(holders);
896 }
897
898 /*
899  * WaitForLockers
900  *
901  * Same as WaitForLockersMultiple, for a single lock tag.
902  */
903 void
904 WaitForLockers(LOCKTAG heaplocktag, LOCKMODE lockmode)
905 {
906         List       *l;
907
908         l = list_make1(&heaplocktag);
909         WaitForLockersMultiple(l, lockmode);
910         list_free(l);
911 }
912
913
914 /*
915  *              LockDatabaseObject
916  *
917  * Obtain a lock on a general object of the current database.  Don't use
918  * this for shared objects (such as tablespaces).  It's unwise to apply it
919  * to relations, also, since a lock taken this way will NOT conflict with
920  * locks taken via LockRelation and friends.
921  */
922 void
923 LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
924                                    LOCKMODE lockmode)
925 {
926         LOCKTAG         tag;
927
928         SET_LOCKTAG_OBJECT(tag,
929                                            MyDatabaseId,
930                                            classid,
931                                            objid,
932                                            objsubid);
933
934         (void) LockAcquire(&tag, lockmode, false, false);
935
936         /* Make sure syscaches are up-to-date with any changes we waited for */
937         AcceptInvalidationMessages();
938 }
939
940 /*
941  *              UnlockDatabaseObject
942  */
943 void
944 UnlockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
945                                          LOCKMODE lockmode)
946 {
947         LOCKTAG         tag;
948
949         SET_LOCKTAG_OBJECT(tag,
950                                            MyDatabaseId,
951                                            classid,
952                                            objid,
953                                            objsubid);
954
955         LockRelease(&tag, lockmode, false);
956 }
957
958 /*
959  *              LockSharedObject
960  *
961  * Obtain a lock on a shared-across-databases object.
962  */
963 void
964 LockSharedObject(Oid classid, Oid objid, uint16 objsubid,
965                                  LOCKMODE lockmode)
966 {
967         LOCKTAG         tag;
968
969         SET_LOCKTAG_OBJECT(tag,
970                                            InvalidOid,
971                                            classid,
972                                            objid,
973                                            objsubid);
974
975         (void) LockAcquire(&tag, lockmode, false, false);
976
977         /* Make sure syscaches are up-to-date with any changes we waited for */
978         AcceptInvalidationMessages();
979 }
980
981 /*
982  *              UnlockSharedObject
983  */
984 void
985 UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid,
986                                    LOCKMODE lockmode)
987 {
988         LOCKTAG         tag;
989
990         SET_LOCKTAG_OBJECT(tag,
991                                            InvalidOid,
992                                            classid,
993                                            objid,
994                                            objsubid);
995
996         LockRelease(&tag, lockmode, false);
997 }
998
999 /*
1000  *              LockSharedObjectForSession
1001  *
1002  * Obtain a session-level lock on a shared-across-databases object.
1003  * See LockRelationIdForSession for notes about session-level locks.
1004  */
1005 void
1006 LockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
1007                                                    LOCKMODE lockmode)
1008 {
1009         LOCKTAG         tag;
1010
1011         SET_LOCKTAG_OBJECT(tag,
1012                                            InvalidOid,
1013                                            classid,
1014                                            objid,
1015                                            objsubid);
1016
1017         (void) LockAcquire(&tag, lockmode, true, false);
1018 }
1019
1020 /*
1021  *              UnlockSharedObjectForSession
1022  */
1023 void
1024 UnlockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
1025                                                          LOCKMODE lockmode)
1026 {
1027         LOCKTAG         tag;
1028
1029         SET_LOCKTAG_OBJECT(tag,
1030                                            InvalidOid,
1031                                            classid,
1032                                            objid,
1033                                            objsubid);
1034
1035         LockRelease(&tag, lockmode, true);
1036 }
1037
1038
1039 /*
1040  * Append a description of a lockable object to buf.
1041  *
1042  * Ideally we would print names for the numeric values, but that requires
1043  * getting locks on system tables, which might cause problems since this is
1044  * typically used to report deadlock situations.
1045  */
1046 void
1047 DescribeLockTag(StringInfo buf, const LOCKTAG *tag)
1048 {
1049         switch ((LockTagType) tag->locktag_type)
1050         {
1051                 case LOCKTAG_RELATION:
1052                         appendStringInfo(buf,
1053                                                          _("relation %u of database %u"),
1054                                                          tag->locktag_field2,
1055                                                          tag->locktag_field1);
1056                         break;
1057                 case LOCKTAG_RELATION_EXTEND:
1058                         appendStringInfo(buf,
1059                                                          _("extension of relation %u of database %u"),
1060                                                          tag->locktag_field2,
1061                                                          tag->locktag_field1);
1062                         break;
1063                 case LOCKTAG_PAGE:
1064                         appendStringInfo(buf,
1065                                                          _("page %u of relation %u of database %u"),
1066                                                          tag->locktag_field3,
1067                                                          tag->locktag_field2,
1068                                                          tag->locktag_field1);
1069                         break;
1070                 case LOCKTAG_TUPLE:
1071                         appendStringInfo(buf,
1072                                                          _("tuple (%u,%u) of relation %u of database %u"),
1073                                                          tag->locktag_field3,
1074                                                          tag->locktag_field4,
1075                                                          tag->locktag_field2,
1076                                                          tag->locktag_field1);
1077                         break;
1078                 case LOCKTAG_TRANSACTION:
1079                         appendStringInfo(buf,
1080                                                          _("transaction %u"),
1081                                                          tag->locktag_field1);
1082                         break;
1083                 case LOCKTAG_VIRTUALTRANSACTION:
1084                         appendStringInfo(buf,
1085                                                          _("virtual transaction %d/%u"),
1086                                                          tag->locktag_field1,
1087                                                          tag->locktag_field2);
1088                         break;
1089                 case LOCKTAG_SPECULATIVE_TOKEN:
1090                         appendStringInfo(buf,
1091                                                          _("speculative token %u of transaction %u"),
1092                                                          tag->locktag_field2,
1093                                                          tag->locktag_field1);
1094                         break;
1095                 case LOCKTAG_OBJECT:
1096                         appendStringInfo(buf,
1097                                                          _("object %u of class %u of database %u"),
1098                                                          tag->locktag_field3,
1099                                                          tag->locktag_field2,
1100                                                          tag->locktag_field1);
1101                         break;
1102                 case LOCKTAG_USERLOCK:
1103                         /* reserved for old contrib code, now on pgfoundry */
1104                         appendStringInfo(buf,
1105                                                          _("user lock [%u,%u,%u]"),
1106                                                          tag->locktag_field1,
1107                                                          tag->locktag_field2,
1108                                                          tag->locktag_field3);
1109                         break;
1110                 case LOCKTAG_ADVISORY:
1111                         appendStringInfo(buf,
1112                                                          _("advisory lock [%u,%u,%u,%u]"),
1113                                                          tag->locktag_field1,
1114                                                          tag->locktag_field2,
1115                                                          tag->locktag_field3,
1116                                                          tag->locktag_field4);
1117                         break;
1118                 default:
1119                         appendStringInfo(buf,
1120                                                          _("unrecognized locktag type %d"),
1121                                                          (int) tag->locktag_type);
1122                         break;
1123         }
1124 }
1125
1126 /*
1127  * GetLockNameFromTagType
1128  *
1129  *      Given locktag type, return the corresponding lock name.
1130  */
1131 const char *
1132 GetLockNameFromTagType(uint16 locktag_type)
1133 {
1134         if (locktag_type > LOCKTAG_LAST_TYPE)
1135                 return "???";
1136         return LockTagTypeNames[locktag_type];
1137 }