]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/lmgr.c
Add support for INSERT ... ON CONFLICT DO NOTHING/UPDATE.
[postgresql] / src / backend / storage / lmgr / lmgr.c
1 /*-------------------------------------------------------------------------
2  *
3  * lmgr.c
4  *        POSTGRES lock manager code
5  *
6  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/storage/lmgr/lmgr.c
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/subtrans.h"
19 #include "access/transam.h"
20 #include "access/xact.h"
21 #include "catalog/catalog.h"
22 #include "miscadmin.h"
23 #include "storage/lmgr.h"
24 #include "storage/procarray.h"
25 #include "utils/inval.h"
26
27
28 /*
29  * Per-backend counter for generating speculative insertion tokens.
30  *
31  * This may wrap around, but that's OK as it's only used for the short
32  * duration between inserting a tuple and checking that there are no (unique)
33  * constraint violations.  It's theoretically possible that a backend sees a
34  * tuple that was speculatively inserted by another backend, but before it has
35  * started waiting on the token, the other backend completes its insertion,
36  * and then then performs 2^32 unrelated insertions.  And after all that, the
37  * first backend finally calls SpeculativeInsertionLockAcquire(), with the
38  * intention of waiting for the first insertion to complete, but ends up
39  * waiting for the latest unrelated insertion instead.  Even then, nothing
40  * particularly bad happens: in the worst case they deadlock, causing one of
41  * the transactions to abort.
42  */
43 static uint32 speculativeInsertionToken = 0;
44
45
46 /*
47  * Struct to hold context info for transaction lock waits.
48  *
49  * 'oper' is the operation that needs to wait for the other transaction; 'rel'
50  * and 'ctid' specify the address of the tuple being waited for.
51  */
52 typedef struct XactLockTableWaitInfo
53 {
54         XLTW_Oper       oper;
55         Relation        rel;
56         ItemPointer ctid;
57 } XactLockTableWaitInfo;
58
59 static void XactLockTableWaitErrorCb(void *arg);
60
61 /*
62  * RelationInitLockInfo
63  *              Initializes the lock information in a relation descriptor.
64  *
65  *              relcache.c must call this during creation of any reldesc.
66  */
67 void
68 RelationInitLockInfo(Relation relation)
69 {
70         Assert(RelationIsValid(relation));
71         Assert(OidIsValid(RelationGetRelid(relation)));
72
73         relation->rd_lockInfo.lockRelId.relId = RelationGetRelid(relation);
74
75         if (relation->rd_rel->relisshared)
76                 relation->rd_lockInfo.lockRelId.dbId = InvalidOid;
77         else
78                 relation->rd_lockInfo.lockRelId.dbId = MyDatabaseId;
79 }
80
81 /*
82  * SetLocktagRelationOid
83  *              Set up a locktag for a relation, given only relation OID
84  */
85 static inline void
86 SetLocktagRelationOid(LOCKTAG *tag, Oid relid)
87 {
88         Oid                     dbid;
89
90         if (IsSharedRelation(relid))
91                 dbid = InvalidOid;
92         else
93                 dbid = MyDatabaseId;
94
95         SET_LOCKTAG_RELATION(*tag, dbid, relid);
96 }
97
98 /*
99  *              LockRelationOid
100  *
101  * Lock a relation given only its OID.  This should generally be used
102  * before attempting to open the relation's relcache entry.
103  */
104 void
105 LockRelationOid(Oid relid, LOCKMODE lockmode)
106 {
107         LOCKTAG         tag;
108         LockAcquireResult res;
109
110         SetLocktagRelationOid(&tag, relid);
111
112         res = LockAcquire(&tag, lockmode, false, false);
113
114         /*
115          * Now that we have the lock, check for invalidation messages, so that we
116          * will update or flush any stale relcache entry before we try to use it.
117          * RangeVarGetRelid() specifically relies on us for this.  We can skip
118          * this in the not-uncommon case that we already had the same type of lock
119          * being requested, since then no one else could have modified the
120          * relcache entry in an undesirable way.  (In the case where our own xact
121          * modifies the rel, the relcache update happens via
122          * CommandCounterIncrement, not here.)
123          */
124         if (res != LOCKACQUIRE_ALREADY_HELD)
125                 AcceptInvalidationMessages();
126 }
127
128 /*
129  *              ConditionalLockRelationOid
130  *
131  * As above, but only lock if we can get the lock without blocking.
132  * Returns TRUE iff the lock was acquired.
133  *
134  * NOTE: we do not currently need conditional versions of all the
135  * LockXXX routines in this file, but they could easily be added if needed.
136  */
137 bool
138 ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
139 {
140         LOCKTAG         tag;
141         LockAcquireResult res;
142
143         SetLocktagRelationOid(&tag, relid);
144
145         res = LockAcquire(&tag, lockmode, false, true);
146
147         if (res == LOCKACQUIRE_NOT_AVAIL)
148                 return false;
149
150         /*
151          * Now that we have the lock, check for invalidation messages; see notes
152          * in LockRelationOid.
153          */
154         if (res != LOCKACQUIRE_ALREADY_HELD)
155                 AcceptInvalidationMessages();
156
157         return true;
158 }
159
160 /*
161  *              UnlockRelationId
162  *
163  * Unlock, given a LockRelId.  This is preferred over UnlockRelationOid
164  * for speed reasons.
165  */
166 void
167 UnlockRelationId(LockRelId *relid, LOCKMODE lockmode)
168 {
169         LOCKTAG         tag;
170
171         SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
172
173         LockRelease(&tag, lockmode, false);
174 }
175
176 /*
177  *              UnlockRelationOid
178  *
179  * Unlock, given only a relation Oid.  Use UnlockRelationId if you can.
180  */
181 void
182 UnlockRelationOid(Oid relid, LOCKMODE lockmode)
183 {
184         LOCKTAG         tag;
185
186         SetLocktagRelationOid(&tag, relid);
187
188         LockRelease(&tag, lockmode, false);
189 }
190
191 /*
192  *              LockRelation
193  *
194  * This is a convenience routine for acquiring an additional lock on an
195  * already-open relation.  Never try to do "relation_open(foo, NoLock)"
196  * and then lock with this.
197  */
198 void
199 LockRelation(Relation relation, LOCKMODE lockmode)
200 {
201         LOCKTAG         tag;
202         LockAcquireResult res;
203
204         SET_LOCKTAG_RELATION(tag,
205                                                  relation->rd_lockInfo.lockRelId.dbId,
206                                                  relation->rd_lockInfo.lockRelId.relId);
207
208         res = LockAcquire(&tag, lockmode, false, false);
209
210         /*
211          * Now that we have the lock, check for invalidation messages; see notes
212          * in LockRelationOid.
213          */
214         if (res != LOCKACQUIRE_ALREADY_HELD)
215                 AcceptInvalidationMessages();
216 }
217
218 /*
219  *              ConditionalLockRelation
220  *
221  * This is a convenience routine for acquiring an additional lock on an
222  * already-open relation.  Never try to do "relation_open(foo, NoLock)"
223  * and then lock with this.
224  */
225 bool
226 ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
227 {
228         LOCKTAG         tag;
229         LockAcquireResult res;
230
231         SET_LOCKTAG_RELATION(tag,
232                                                  relation->rd_lockInfo.lockRelId.dbId,
233                                                  relation->rd_lockInfo.lockRelId.relId);
234
235         res = LockAcquire(&tag, lockmode, false, true);
236
237         if (res == LOCKACQUIRE_NOT_AVAIL)
238                 return false;
239
240         /*
241          * Now that we have the lock, check for invalidation messages; see notes
242          * in LockRelationOid.
243          */
244         if (res != LOCKACQUIRE_ALREADY_HELD)
245                 AcceptInvalidationMessages();
246
247         return true;
248 }
249
250 /*
251  *              UnlockRelation
252  *
253  * This is a convenience routine for unlocking a relation without also
254  * closing it.
255  */
256 void
257 UnlockRelation(Relation relation, LOCKMODE lockmode)
258 {
259         LOCKTAG         tag;
260
261         SET_LOCKTAG_RELATION(tag,
262                                                  relation->rd_lockInfo.lockRelId.dbId,
263                                                  relation->rd_lockInfo.lockRelId.relId);
264
265         LockRelease(&tag, lockmode, false);
266 }
267
268 /*
269  *              LockHasWaitersRelation
270  *
271  * This is a functiion to check if someone else is waiting on a
272  * lock, we are currently holding.
273  */
274 bool
275 LockHasWaitersRelation(Relation relation, LOCKMODE lockmode)
276 {
277         LOCKTAG         tag;
278
279         SET_LOCKTAG_RELATION(tag,
280                                                  relation->rd_lockInfo.lockRelId.dbId,
281                                                  relation->rd_lockInfo.lockRelId.relId);
282
283         return LockHasWaiters(&tag, lockmode, false);
284 }
285
286 /*
287  *              LockRelationIdForSession
288  *
289  * This routine grabs a session-level lock on the target relation.  The
290  * session lock persists across transaction boundaries.  It will be removed
291  * when UnlockRelationIdForSession() is called, or if an ereport(ERROR) occurs,
292  * or if the backend exits.
293  *
294  * Note that one should also grab a transaction-level lock on the rel
295  * in any transaction that actually uses the rel, to ensure that the
296  * relcache entry is up to date.
297  */
298 void
299 LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode)
300 {
301         LOCKTAG         tag;
302
303         SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
304
305         (void) LockAcquire(&tag, lockmode, true, false);
306 }
307
308 /*
309  *              UnlockRelationIdForSession
310  */
311 void
312 UnlockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode)
313 {
314         LOCKTAG         tag;
315
316         SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
317
318         LockRelease(&tag, lockmode, true);
319 }
320
321 /*
322  *              LockRelationForExtension
323  *
324  * This lock tag is used to interlock addition of pages to relations.
325  * We need such locking because bufmgr/smgr definition of P_NEW is not
326  * race-condition-proof.
327  *
328  * We assume the caller is already holding some type of regular lock on
329  * the relation, so no AcceptInvalidationMessages call is needed here.
330  */
331 void
332 LockRelationForExtension(Relation relation, LOCKMODE lockmode)
333 {
334         LOCKTAG         tag;
335
336         SET_LOCKTAG_RELATION_EXTEND(tag,
337                                                                 relation->rd_lockInfo.lockRelId.dbId,
338                                                                 relation->rd_lockInfo.lockRelId.relId);
339
340         (void) LockAcquire(&tag, lockmode, false, false);
341 }
342
343 /*
344  *              UnlockRelationForExtension
345  */
346 void
347 UnlockRelationForExtension(Relation relation, LOCKMODE lockmode)
348 {
349         LOCKTAG         tag;
350
351         SET_LOCKTAG_RELATION_EXTEND(tag,
352                                                                 relation->rd_lockInfo.lockRelId.dbId,
353                                                                 relation->rd_lockInfo.lockRelId.relId);
354
355         LockRelease(&tag, lockmode, false);
356 }
357
358 /*
359  *              LockPage
360  *
361  * Obtain a page-level lock.  This is currently used by some index access
362  * methods to lock individual index pages.
363  */
364 void
365 LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
366 {
367         LOCKTAG         tag;
368
369         SET_LOCKTAG_PAGE(tag,
370                                          relation->rd_lockInfo.lockRelId.dbId,
371                                          relation->rd_lockInfo.lockRelId.relId,
372                                          blkno);
373
374         (void) LockAcquire(&tag, lockmode, false, false);
375 }
376
377 /*
378  *              ConditionalLockPage
379  *
380  * As above, but only lock if we can get the lock without blocking.
381  * Returns TRUE iff the lock was acquired.
382  */
383 bool
384 ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
385 {
386         LOCKTAG         tag;
387
388         SET_LOCKTAG_PAGE(tag,
389                                          relation->rd_lockInfo.lockRelId.dbId,
390                                          relation->rd_lockInfo.lockRelId.relId,
391                                          blkno);
392
393         return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
394 }
395
396 /*
397  *              UnlockPage
398  */
399 void
400 UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
401 {
402         LOCKTAG         tag;
403
404         SET_LOCKTAG_PAGE(tag,
405                                          relation->rd_lockInfo.lockRelId.dbId,
406                                          relation->rd_lockInfo.lockRelId.relId,
407                                          blkno);
408
409         LockRelease(&tag, lockmode, false);
410 }
411
412 /*
413  *              LockTuple
414  *
415  * Obtain a tuple-level lock.  This is used in a less-than-intuitive fashion
416  * because we can't afford to keep a separate lock in shared memory for every
417  * tuple.  See heap_lock_tuple before using this!
418  */
419 void
420 LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
421 {
422         LOCKTAG         tag;
423
424         SET_LOCKTAG_TUPLE(tag,
425                                           relation->rd_lockInfo.lockRelId.dbId,
426                                           relation->rd_lockInfo.lockRelId.relId,
427                                           ItemPointerGetBlockNumber(tid),
428                                           ItemPointerGetOffsetNumber(tid));
429
430         (void) LockAcquire(&tag, lockmode, false, false);
431 }
432
433 /*
434  *              ConditionalLockTuple
435  *
436  * As above, but only lock if we can get the lock without blocking.
437  * Returns TRUE iff the lock was acquired.
438  */
439 bool
440 ConditionalLockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
441 {
442         LOCKTAG         tag;
443
444         SET_LOCKTAG_TUPLE(tag,
445                                           relation->rd_lockInfo.lockRelId.dbId,
446                                           relation->rd_lockInfo.lockRelId.relId,
447                                           ItemPointerGetBlockNumber(tid),
448                                           ItemPointerGetOffsetNumber(tid));
449
450         return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
451 }
452
453 /*
454  *              UnlockTuple
455  */
456 void
457 UnlockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
458 {
459         LOCKTAG         tag;
460
461         SET_LOCKTAG_TUPLE(tag,
462                                           relation->rd_lockInfo.lockRelId.dbId,
463                                           relation->rd_lockInfo.lockRelId.relId,
464                                           ItemPointerGetBlockNumber(tid),
465                                           ItemPointerGetOffsetNumber(tid));
466
467         LockRelease(&tag, lockmode, false);
468 }
469
470 /*
471  *              XactLockTableInsert
472  *
473  * Insert a lock showing that the given transaction ID is running ---
474  * this is done when an XID is acquired by a transaction or subtransaction.
475  * The lock can then be used to wait for the transaction to finish.
476  */
477 void
478 XactLockTableInsert(TransactionId xid)
479 {
480         LOCKTAG         tag;
481
482         SET_LOCKTAG_TRANSACTION(tag, xid);
483
484         (void) LockAcquire(&tag, ExclusiveLock, false, false);
485 }
486
487 /*
488  *              XactLockTableDelete
489  *
490  * Delete the lock showing that the given transaction ID is running.
491  * (This is never used for main transaction IDs; those locks are only
492  * released implicitly at transaction end.  But we do use it for subtrans IDs.)
493  */
494 void
495 XactLockTableDelete(TransactionId xid)
496 {
497         LOCKTAG         tag;
498
499         SET_LOCKTAG_TRANSACTION(tag, xid);
500
501         LockRelease(&tag, ExclusiveLock, false);
502 }
503
504 /*
505  *              XactLockTableWait
506  *
507  * Wait for the specified transaction to commit or abort.  If an operation
508  * is specified, an error context callback is set up.  If 'oper' is passed as
509  * None, no error context callback is set up.
510  *
511  * Note that this does the right thing for subtransactions: if we wait on a
512  * subtransaction, we will exit as soon as it aborts or its top parent commits.
513  * It takes some extra work to ensure this, because to save on shared memory
514  * the XID lock of a subtransaction is released when it ends, whether
515  * successfully or unsuccessfully.  So we have to check if it's "still running"
516  * and if so wait for its parent.
517  */
518 void
519 XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid,
520                                   XLTW_Oper oper)
521 {
522         LOCKTAG         tag;
523         XactLockTableWaitInfo info;
524         ErrorContextCallback callback;
525
526         /*
527          * If an operation is specified, set up our verbose error context
528          * callback.
529          */
530         if (oper != XLTW_None)
531         {
532                 Assert(RelationIsValid(rel));
533                 Assert(ItemPointerIsValid(ctid));
534
535                 info.rel = rel;
536                 info.ctid = ctid;
537                 info.oper = oper;
538
539                 callback.callback = XactLockTableWaitErrorCb;
540                 callback.arg = &info;
541                 callback.previous = error_context_stack;
542                 error_context_stack = &callback;
543         }
544
545         for (;;)
546         {
547                 Assert(TransactionIdIsValid(xid));
548                 Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
549
550                 SET_LOCKTAG_TRANSACTION(tag, xid);
551
552                 (void) LockAcquire(&tag, ShareLock, false, false);
553
554                 LockRelease(&tag, ShareLock, false);
555
556                 if (!TransactionIdIsInProgress(xid))
557                         break;
558                 xid = SubTransGetParent(xid);
559         }
560
561         if (oper != XLTW_None)
562                 error_context_stack = callback.previous;
563 }
564
565 /*
566  *              ConditionalXactLockTableWait
567  *
568  * As above, but only lock if we can get the lock without blocking.
569  * Returns TRUE if the lock was acquired.
570  */
571 bool
572 ConditionalXactLockTableWait(TransactionId xid)
573 {
574         LOCKTAG         tag;
575
576         for (;;)
577         {
578                 Assert(TransactionIdIsValid(xid));
579                 Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
580
581                 SET_LOCKTAG_TRANSACTION(tag, xid);
582
583                 if (LockAcquire(&tag, ShareLock, false, true) == LOCKACQUIRE_NOT_AVAIL)
584                         return false;
585
586                 LockRelease(&tag, ShareLock, false);
587
588                 if (!TransactionIdIsInProgress(xid))
589                         break;
590                 xid = SubTransGetParent(xid);
591         }
592
593         return true;
594 }
595
596 /*
597  *              SpeculativeInsertionLockAcquire
598  *
599  * Insert a lock showing that the given transaction ID is inserting a tuple,
600  * but hasn't yet decided whether it's going to keep it.  The lock can then be
601  * used to wait for the decision to go ahead with the insertion, or aborting
602  * it.
603  *
604  * The token is used to distinguish multiple insertions by the same
605  * transaction.  It is returned to caller.
606  */
607 uint32
608 SpeculativeInsertionLockAcquire(TransactionId xid)
609 {
610         LOCKTAG         tag;
611
612         speculativeInsertionToken++;
613
614         /*
615          * Check for wrap-around. Zero means no token is held, so don't use that.
616          */
617         if (speculativeInsertionToken == 0)
618                 speculativeInsertionToken = 1;
619
620         SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, speculativeInsertionToken);
621
622         (void) LockAcquire(&tag, ExclusiveLock, false, false);
623
624         return speculativeInsertionToken;
625 }
626
627 /*
628  *              SpeculativeInsertionLockRelease
629  *
630  * Delete the lock showing that the given transaction is speculatively
631  * inserting a tuple.
632  */
633 void
634 SpeculativeInsertionLockRelease(TransactionId xid)
635 {
636         LOCKTAG         tag;
637
638         SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, speculativeInsertionToken);
639
640         LockRelease(&tag, ExclusiveLock, false);
641 }
642
643 /*
644  *              SpeculativeInsertionWait
645  *
646  * Wait for the specified transaction to finish or abort the insertion of a
647  * tuple.
648  */
649 void
650 SpeculativeInsertionWait(TransactionId xid, uint32 token)
651 {
652         LOCKTAG         tag;
653
654         SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, token);
655
656         Assert(TransactionIdIsValid(xid));
657         Assert(token != 0);
658
659         (void) LockAcquire(&tag, ShareLock, false, false);
660         LockRelease(&tag, ShareLock, false);
661 }
662
663 /*
664  * XactLockTableWaitErrorContextCb
665  *              Error context callback for transaction lock waits.
666  */
667 static void
668 XactLockTableWaitErrorCb(void *arg)
669 {
670         XactLockTableWaitInfo *info = (XactLockTableWaitInfo *) arg;
671
672         /*
673          * We would like to print schema name too, but that would require a
674          * syscache lookup.
675          */
676         if (info->oper != XLTW_None &&
677                 ItemPointerIsValid(info->ctid) && RelationIsValid(info->rel))
678         {
679                 const char *cxt;
680
681                 switch (info->oper)
682                 {
683                         case XLTW_Update:
684                                 cxt = gettext_noop("while updating tuple (%u,%u) in relation \"%s\"");
685                                 break;
686                         case XLTW_Delete:
687                                 cxt = gettext_noop("while deleting tuple (%u,%u) in relation \"%s\"");
688                                 break;
689                         case XLTW_Lock:
690                                 cxt = gettext_noop("while locking tuple (%u,%u) in relation \"%s\"");
691                                 break;
692                         case XLTW_LockUpdated:
693                                 cxt = gettext_noop("while locking updated version (%u,%u) of tuple in relation \"%s\"");
694                                 break;
695                         case XLTW_InsertIndex:
696                                 cxt = gettext_noop("while inserting index tuple (%u,%u) in relation \"%s\"");
697                                 break;
698                         case XLTW_InsertIndexUnique:
699                                 cxt = gettext_noop("while checking uniqueness of tuple (%u,%u) in relation \"%s\"");
700                                 break;
701                         case XLTW_FetchUpdated:
702                                 cxt = gettext_noop("while rechecking updated tuple (%u,%u) in relation \"%s\"");
703                                 break;
704                         case XLTW_RecheckExclusionConstr:
705                                 cxt = gettext_noop("while checking exclusion constraint on tuple (%u,%u) in relation \"%s\"");
706                                 break;
707
708                         default:
709                                 return;
710                 }
711
712                 errcontext(cxt,
713                                    ItemPointerGetBlockNumber(info->ctid),
714                                    ItemPointerGetOffsetNumber(info->ctid),
715                                    RelationGetRelationName(info->rel));
716         }
717 }
718
719 /*
720  * WaitForLockersMultiple
721  *              Wait until no transaction holds locks that conflict with the given
722  *              locktags at the given lockmode.
723  *
724  * To do this, obtain the current list of lockers, and wait on their VXIDs
725  * until they are finished.
726  *
727  * Note we don't try to acquire the locks on the given locktags, only the VXIDs
728  * of its lock holders; if somebody grabs a conflicting lock on the objects
729  * after we obtained our initial list of lockers, we will not wait for them.
730  */
731 void
732 WaitForLockersMultiple(List *locktags, LOCKMODE lockmode)
733 {
734         List       *holders = NIL;
735         ListCell   *lc;
736
737         /* Done if no locks to wait for */
738         if (list_length(locktags) == 0)
739                 return;
740
741         /* Collect the transactions we need to wait on */
742         foreach(lc, locktags)
743         {
744                 LOCKTAG    *locktag = lfirst(lc);
745
746                 holders = lappend(holders, GetLockConflicts(locktag, lockmode));
747         }
748
749         /*
750          * Note: GetLockConflicts() never reports our own xid, hence we need not
751          * check for that.  Also, prepared xacts are not reported, which is fine
752          * since they certainly aren't going to do anything anymore.
753          */
754
755         /* Finally wait for each such transaction to complete */
756         foreach(lc, holders)
757         {
758                 VirtualTransactionId *lockholders = lfirst(lc);
759
760                 while (VirtualTransactionIdIsValid(*lockholders))
761                 {
762                         VirtualXactLock(*lockholders, true);
763                         lockholders++;
764                 }
765         }
766
767         list_free_deep(holders);
768 }
769
770 /*
771  * WaitForLockers
772  *
773  * Same as WaitForLockersMultiple, for a single lock tag.
774  */
775 void
776 WaitForLockers(LOCKTAG heaplocktag, LOCKMODE lockmode)
777 {
778         List       *l;
779
780         l = list_make1(&heaplocktag);
781         WaitForLockersMultiple(l, lockmode);
782         list_free(l);
783 }
784
785
786 /*
787  *              LockDatabaseObject
788  *
789  * Obtain a lock on a general object of the current database.  Don't use
790  * this for shared objects (such as tablespaces).  It's unwise to apply it
791  * to relations, also, since a lock taken this way will NOT conflict with
792  * locks taken via LockRelation and friends.
793  */
794 void
795 LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
796                                    LOCKMODE lockmode)
797 {
798         LOCKTAG         tag;
799
800         SET_LOCKTAG_OBJECT(tag,
801                                            MyDatabaseId,
802                                            classid,
803                                            objid,
804                                            objsubid);
805
806         (void) LockAcquire(&tag, lockmode, false, false);
807
808         /* Make sure syscaches are up-to-date with any changes we waited for */
809         AcceptInvalidationMessages();
810 }
811
812 /*
813  *              UnlockDatabaseObject
814  */
815 void
816 UnlockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
817                                          LOCKMODE lockmode)
818 {
819         LOCKTAG         tag;
820
821         SET_LOCKTAG_OBJECT(tag,
822                                            MyDatabaseId,
823                                            classid,
824                                            objid,
825                                            objsubid);
826
827         LockRelease(&tag, lockmode, false);
828 }
829
830 /*
831  *              LockSharedObject
832  *
833  * Obtain a lock on a shared-across-databases object.
834  */
835 void
836 LockSharedObject(Oid classid, Oid objid, uint16 objsubid,
837                                  LOCKMODE lockmode)
838 {
839         LOCKTAG         tag;
840
841         SET_LOCKTAG_OBJECT(tag,
842                                            InvalidOid,
843                                            classid,
844                                            objid,
845                                            objsubid);
846
847         (void) LockAcquire(&tag, lockmode, false, false);
848
849         /* Make sure syscaches are up-to-date with any changes we waited for */
850         AcceptInvalidationMessages();
851 }
852
853 /*
854  *              UnlockSharedObject
855  */
856 void
857 UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid,
858                                    LOCKMODE lockmode)
859 {
860         LOCKTAG         tag;
861
862         SET_LOCKTAG_OBJECT(tag,
863                                            InvalidOid,
864                                            classid,
865                                            objid,
866                                            objsubid);
867
868         LockRelease(&tag, lockmode, false);
869 }
870
871 /*
872  *              LockSharedObjectForSession
873  *
874  * Obtain a session-level lock on a shared-across-databases object.
875  * See LockRelationIdForSession for notes about session-level locks.
876  */
877 void
878 LockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
879                                                    LOCKMODE lockmode)
880 {
881         LOCKTAG         tag;
882
883         SET_LOCKTAG_OBJECT(tag,
884                                            InvalidOid,
885                                            classid,
886                                            objid,
887                                            objsubid);
888
889         (void) LockAcquire(&tag, lockmode, true, false);
890 }
891
892 /*
893  *              UnlockSharedObjectForSession
894  */
895 void
896 UnlockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
897                                                          LOCKMODE lockmode)
898 {
899         LOCKTAG         tag;
900
901         SET_LOCKTAG_OBJECT(tag,
902                                            InvalidOid,
903                                            classid,
904                                            objid,
905                                            objsubid);
906
907         LockRelease(&tag, lockmode, true);
908 }
909
910
911 /*
912  * Append a description of a lockable object to buf.
913  *
914  * Ideally we would print names for the numeric values, but that requires
915  * getting locks on system tables, which might cause problems since this is
916  * typically used to report deadlock situations.
917  */
918 void
919 DescribeLockTag(StringInfo buf, const LOCKTAG *tag)
920 {
921         switch ((LockTagType) tag->locktag_type)
922         {
923                 case LOCKTAG_RELATION:
924                         appendStringInfo(buf,
925                                                          _("relation %u of database %u"),
926                                                          tag->locktag_field2,
927                                                          tag->locktag_field1);
928                         break;
929                 case LOCKTAG_RELATION_EXTEND:
930                         appendStringInfo(buf,
931                                                          _("extension of relation %u of database %u"),
932                                                          tag->locktag_field2,
933                                                          tag->locktag_field1);
934                         break;
935                 case LOCKTAG_PAGE:
936                         appendStringInfo(buf,
937                                                          _("page %u of relation %u of database %u"),
938                                                          tag->locktag_field3,
939                                                          tag->locktag_field2,
940                                                          tag->locktag_field1);
941                         break;
942                 case LOCKTAG_TUPLE:
943                         appendStringInfo(buf,
944                                                          _("tuple (%u,%u) of relation %u of database %u"),
945                                                          tag->locktag_field3,
946                                                          tag->locktag_field4,
947                                                          tag->locktag_field2,
948                                                          tag->locktag_field1);
949                         break;
950                 case LOCKTAG_TRANSACTION:
951                         appendStringInfo(buf,
952                                                          _("transaction %u"),
953                                                          tag->locktag_field1);
954                         break;
955                 case LOCKTAG_VIRTUALTRANSACTION:
956                         appendStringInfo(buf,
957                                                          _("virtual transaction %d/%u"),
958                                                          tag->locktag_field1,
959                                                          tag->locktag_field2);
960                         break;
961                 case LOCKTAG_SPECULATIVE_TOKEN:
962                         appendStringInfo(buf,
963                                                          _("speculative token %u of transaction %u"),
964                                                          tag->locktag_field2,
965                                                          tag->locktag_field1);
966                         break;
967                 case LOCKTAG_OBJECT:
968                         appendStringInfo(buf,
969                                                          _("object %u of class %u of database %u"),
970                                                          tag->locktag_field3,
971                                                          tag->locktag_field2,
972                                                          tag->locktag_field1);
973                         break;
974                 case LOCKTAG_USERLOCK:
975                         /* reserved for old contrib code, now on pgfoundry */
976                         appendStringInfo(buf,
977                                                          _("user lock [%u,%u,%u]"),
978                                                          tag->locktag_field1,
979                                                          tag->locktag_field2,
980                                                          tag->locktag_field3);
981                         break;
982                 case LOCKTAG_ADVISORY:
983                         appendStringInfo(buf,
984                                                          _("advisory lock [%u,%u,%u,%u]"),
985                                                          tag->locktag_field1,
986                                                          tag->locktag_field2,
987                                                          tag->locktag_field3,
988                                                          tag->locktag_field4);
989                         break;
990                 default:
991                         appendStringInfo(buf,
992                                                          _("unrecognized locktag type %d"),
993                                                          (int) tag->locktag_type);
994                         break;
995         }
996 }