1 /*-------------------------------------------------------------------------
2  *
3  * heapam.c
4  *        heap access method code
5  *
6  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.189 2005/04/30 19:03:32 tgl Exp $
12  *
13  *
14  * INTERFACE ROUTINES
15  *              relation_open   - open any relation by relation OID
16  *              relation_openrv - open any relation specified by a RangeVar
17  *              relation_close  - close any relation
18  *              heap_open               - open a heap relation by relation OID
19  *              heap_openrv             - open a heap relation specified by a RangeVar
20  *              heap_close              - (now just a macro for relation_close)
21  *              heap_beginscan  - begin relation scan
22  *              heap_rescan             - restart a relation scan
23  *              heap_endscan    - end relation scan
24  *              heap_getnext    - retrieve next tuple in scan
25  *              heap_fetch              - retrieve tuple with tid
26  *              heap_insert             - insert tuple into a relation
27  *              heap_delete             - delete a tuple from a relation
28  *              heap_update             - replace a tuple in a relation with another tuple
29  *              heap_markpos    - mark scan position
30  *              heap_restrpos   - restore position to marked location
31  *
32  * NOTES
33  *        This file contains the heap_ routines which implement
34  *        the POSTGRES heap access method used for all POSTGRES
35  *        relations.
36  *
37  *-------------------------------------------------------------------------
38  */
39 #include "postgres.h"
40
41 #include "access/heapam.h"
42 #include "access/hio.h"
43 #include "access/multixact.h"
44 #include "access/tuptoaster.h"
45 #include "access/valid.h"
46 #include "access/xlogutils.h"
47 #include "catalog/catalog.h"
48 #include "catalog/namespace.h"
49 #include "miscadmin.h"
50 #include "storage/sinval.h"
51 #include "utils/inval.h"
52 #include "utils/relcache.h"
53 #include "pgstat.h"
54
55
56 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
57            ItemPointerData from, Buffer newbuf, HeapTuple newtup, bool move);
58
59
60 /* ----------------------------------------------------------------
61  *                                               heap support routines
62  * ----------------------------------------------------------------
63  */
64
65 /* ----------------
66  *              initscan - scan code common to heap_beginscan and heap_rescan
67  * ----------------
68  */
69 static void
70 initscan(HeapScanDesc scan, ScanKey key)
71 {
72         /*
73          * Determine the number of blocks we have to scan.
74          *
75          * It is sufficient to do this once at scan start, since any tuples added
76          * while the scan is in progress will be invisible to my transaction
77          * anyway...
78          */
79         scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
80
81         scan->rs_ctup.t_datamcxt = NULL;
82         scan->rs_ctup.t_data = NULL;
83         scan->rs_cbuf = InvalidBuffer;
84
85         /* we don't have a marked position... */
86         ItemPointerSetInvalid(&(scan->rs_mctid));
87
88         /*
89          * copy the scan key, if appropriate
90          */
91         if (key != NULL)
92                 memcpy(scan->rs_key, key, scan->rs_nkeys * sizeof(ScanKeyData));
93 }
94
95 /* ----------------
96  *              heapgettup - fetch next heap tuple
97  *
98  *              routine used by heap_getnext() which does most of the
99  *              real work in scanning tuples.
100  *
101  *              The passed-in *buffer must be either InvalidBuffer or the pinned
102  *              current page of the scan.  If we have to move to another page,
103  *              we will unpin this buffer (if valid).  On return, *buffer is either
104  *              InvalidBuffer or the ID of a pinned buffer.
105  * ----------------
106  */
107 static void
108 heapgettup(Relation relation,
109                    int dir,
110                    HeapTuple tuple,
111                    Buffer *buffer,
112                    Snapshot snapshot,
113                    int nkeys,
114                    ScanKey key,
115                    BlockNumber pages)
116 {
117         ItemId          lpp;
118         Page            dp;
119         BlockNumber page;
120         int                     lines;
121         OffsetNumber lineoff;
122         int                     linesleft;
123         ItemPointer tid;
124
125         tid = (tuple->t_data == NULL) ? NULL : &(tuple->t_self);
126
127         /*
128          * debugging stuff
129          *
130          * check validity of arguments, here and for other functions too.  Note: no
131          * locking manipulations needed--this is a local function.
132          */
133 #ifdef  HEAPDEBUGALL
134         if (ItemPointerIsValid(tid))
135                 elog(DEBUG2, "heapgettup(%s, tid=0x%x[%d,%d], dir=%d, ...)",
136                          RelationGetRelationName(relation), tid, tid->ip_blkid,
137                          tid->ip_posid, dir);
138         else
139                 elog(DEBUG2, "heapgettup(%s, tid=0x%x, dir=%d, ...)",
140                          RelationGetRelationName(relation), tid, dir);
141
142         elog(DEBUG2, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x)", buffer, nkeys, key);
143
144         elog(DEBUG2, "heapgettup: relation(%c)=`%s', %p",
145                  relation->rd_rel->relkind, RelationGetRelationName(relation),
146                  snapshot);
147 #endif   /* HEAPDEBUGALL */
148
149         if (!ItemPointerIsValid(tid))
150         {
151                 Assert(!PointerIsValid(tid));
152                 tid = NULL;
153         }
154
155         tuple->t_tableOid = relation->rd_id;
156
157         /*
158          * return null immediately if relation is empty
159          */
160         if (pages == 0)
161         {
162                 if (BufferIsValid(*buffer))
163                         ReleaseBuffer(*buffer);
164                 *buffer = InvalidBuffer;
165                 tuple->t_datamcxt = NULL;
166                 tuple->t_data = NULL;
167                 return;
168         }
169
170         /*
171          * calculate next starting lineoff, given scan direction
172          */
173         if (dir == 0)
174         {
175                 /*
176                  * ``no movement'' scan direction: refetch same tuple
177                  */
178                 if (tid == NULL)
179                 {
180                         if (BufferIsValid(*buffer))
181                                 ReleaseBuffer(*buffer);
182                         *buffer = InvalidBuffer;
183                         tuple->t_datamcxt = NULL;
184                         tuple->t_data = NULL;
185                         return;
186                 }
187
188                 *buffer = ReleaseAndReadBuffer(*buffer,
189                                                                            relation,
190                                                                            ItemPointerGetBlockNumber(tid));
191
192                 LockBuffer(*buffer, BUFFER_LOCK_SHARE);
193
194                 dp = (Page) BufferGetPage(*buffer);
195                 lineoff = ItemPointerGetOffsetNumber(tid);
196                 lpp = PageGetItemId(dp, lineoff);
197
198                 tuple->t_datamcxt = NULL;
199                 tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
200                 tuple->t_len = ItemIdGetLength(lpp);
201                 LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
202
203                 return;
204         }
205         else if (dir < 0)
206         {
207                 /*
208                  * reverse scan direction
209                  */
210                 if (tid == NULL)
211                 {
212                         page = pages - 1;       /* final page */
213                 }
214                 else
215                 {
216                         page = ItemPointerGetBlockNumber(tid);          /* current page */
217                 }
218
219                 Assert(page < pages);
220
221                 *buffer = ReleaseAndReadBuffer(*buffer,
222                                                                            relation,
223                                                                            page);
224
225                 LockBuffer(*buffer, BUFFER_LOCK_SHARE);
226
227                 dp = (Page) BufferGetPage(*buffer);
228                 lines = PageGetMaxOffsetNumber(dp);
229                 if (tid == NULL)
230                 {
231                         lineoff = lines;        /* final offnum */
232                 }
233                 else
234                 {
235                         lineoff =                       /* previous offnum */
236                                 OffsetNumberPrev(ItemPointerGetOffsetNumber(tid));
237                 }
238                 /* page and lineoff now reference the physically previous tid */
239         }
240         else
241         {
242                 /*
243                  * forward scan direction
244                  */
245                 if (tid == NULL)
246                 {
247                         page = 0;                       /* first page */
248                         lineoff = FirstOffsetNumber;            /* first offnum */
249                 }
250                 else
251                 {
252                         page = ItemPointerGetBlockNumber(tid);          /* current page */
253                         lineoff =                       /* next offnum */
254                                 OffsetNumberNext(ItemPointerGetOffsetNumber(tid));
255                 }
256
257                 Assert(page < pages);
258
259                 *buffer = ReleaseAndReadBuffer(*buffer,
260                                                                            relation,
261                                                                            page);
262
263                 LockBuffer(*buffer, BUFFER_LOCK_SHARE);
264
265                 dp = (Page) BufferGetPage(*buffer);
266                 lines = PageGetMaxOffsetNumber(dp);
267                 /* page and lineoff now reference the physically next tid */
268         }
269
270         /* 'dir' is now non-zero */
271
272         /*
273          * calculate line pointer and number of remaining items to check on
274          * this page.
275          */
276         lpp = PageGetItemId(dp, lineoff);
277         if (dir < 0)
278                 linesleft = lineoff - 1;
279         else
280                 linesleft = lines - lineoff;
281
282         /*
283          * advance the scan until we find a qualifying tuple or run out of
284          * stuff to scan
285          */
286         for (;;)
287         {
288                 while (linesleft >= 0)
289                 {
290                         if (ItemIdIsUsed(lpp))
291                         {
292                                 bool            valid;
293
294                                 tuple->t_datamcxt = NULL;
295                                 tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
296                                 tuple->t_len = ItemIdGetLength(lpp);
297                                 ItemPointerSet(&(tuple->t_self), page, lineoff);
298
299                                 /*
300                                  * if current tuple qualifies, return it.
301                                  */
302                                 HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp,
303                                                                    snapshot, nkeys, key, valid);
304                                 if (valid)
305                                 {
306                                         LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
307                                         return;
308                                 }
309                         }
310
311                         /*
312                          * otherwise move to the next item on the page
313                          */
314                         --linesleft;
315                         if (dir < 0)
316                         {
317                                 --lpp;                  /* move back in this page's ItemId array */
318                                 --lineoff;
319                         }
320                         else
321                         {
322                                 ++lpp;                  /* move forward in this page's ItemId
323                                                                  * array */
324                                 ++lineoff;
325                         }
326                 }
327
328                 /*
329                  * if we get here, it means we've exhausted the items on this page
330                  * and it's time to move to the next.
331                  */
332                 LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
333
334                 /*
335                  * return NULL if we've exhausted all the pages
336                  */
337                 if ((dir < 0) ? (page == 0) : (page + 1 >= pages))
338                 {
339                         if (BufferIsValid(*buffer))
340                                 ReleaseBuffer(*buffer);
341                         *buffer = InvalidBuffer;
342                         tuple->t_datamcxt = NULL;
343                         tuple->t_data = NULL;
344                         return;
345                 }
346
347                 page = (dir < 0) ? (page - 1) : (page + 1);
348
349                 Assert(page < pages);
350
351                 *buffer = ReleaseAndReadBuffer(*buffer,
352                                                                            relation,
353                                                                            page);
354
355                 LockBuffer(*buffer, BUFFER_LOCK_SHARE);
356                 dp = (Page) BufferGetPage(*buffer);
357                 lines = PageGetMaxOffsetNumber((Page) dp);
358                 linesleft = lines - 1;
359                 if (dir < 0)
360                 {
361                         lineoff = lines;
362                         lpp = PageGetItemId(dp, lines);
363                 }
364                 else
365                 {
366                         lineoff = FirstOffsetNumber;
367                         lpp = PageGetItemId(dp, FirstOffsetNumber);
368                 }
369         }
370 }
371
372
373 #if defined(DISABLE_COMPLEX_MACRO)
374 /*
375  * This is formatted so oddly so that the correspondence to the macro
376  * definition in access/heapam.h is maintained.
377  */
378 Datum
379 fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc,
380                         bool *isnull)
381 {
382         return (
383                         (attnum) > 0 ?
384                         (
385                          ((isnull) ? (*(isnull) = false) : (dummyret) NULL),
386                          HeapTupleNoNulls(tup) ?
387                          (
388                           (tupleDesc)->attrs[(attnum) - 1]->attcacheoff >= 0 ?
389                           (
390                            fetchatt((tupleDesc)->attrs[(attnum) - 1],
391                                                 (char *) (tup)->t_data + (tup)->t_data->t_hoff +
392                                                 (tupleDesc)->attrs[(attnum) - 1]->attcacheoff)
393                            )
394                           :
395                           nocachegetattr((tup), (attnum), (tupleDesc), (isnull))
396                           )
397                          :
398                          (
399                           att_isnull((attnum) - 1, (tup)->t_data->t_bits) ?
400                           (
401                            ((isnull) ? (*(isnull) = true) : (dummyret) NULL),
402                            (Datum) NULL
403                            )
404                           :
405                           (
406                            nocachegetattr((tup), (attnum), (tupleDesc), (isnull))
407                            )
408                           )
409                          )
410                         :
411                         (
412                          (Datum) NULL
413                          )
414                 );
415 }
416 #endif   /* defined(DISABLE_COMPLEX_MACRO) */
417
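/*
 * Illustrative sketch (not part of this file): pulling one user attribute
 * out of a heap tuple.  Callers normally reach fastgetattr through the
 * heap_getattr macro, which also copes with system attributes and all-null
 * tuples.  The function name and the assumption that attribute 1 is an int4
 * are hypothetical.
 */
static int32
example_get_first_int4(HeapTuple tup, TupleDesc tupdesc)
{
	bool		isnull;
	Datum		d;

	d = heap_getattr(tup, 1, tupdesc, &isnull);
	if (isnull)
		return 0;				/* caller-defined default for NULL */
	return DatumGetInt32(d);
}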
418
419 /* ----------------------------------------------------------------
420  *                                       heap access method interface
421  * ----------------------------------------------------------------
422  */
423
424 /* ----------------
425  *              relation_open - open any relation by relation OID
426  *
427  *              If lockmode is not "NoLock", the specified kind of lock is
428  *              obtained on the relation.  (Generally, NoLock should only be
429  *              used if the caller knows it has some appropriate lock on the
430  *              relation already.)
431  *
432  *              An error is raised if the relation does not exist.
433  *
434  *              NB: a "relation" is anything with a pg_class entry.  The caller is
435  *              expected to check whether the relkind is something it can handle.
436  * ----------------
437  */
438 Relation
439 relation_open(Oid relationId, LOCKMODE lockmode)
440 {
441         Relation        r;
442
443         Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
444
445         /* The relcache does all the real work... */
446         r = RelationIdGetRelation(relationId);
447
448         if (!RelationIsValid(r))
449                 elog(ERROR, "could not open relation with OID %u", relationId);
450
451         if (lockmode != NoLock)
452                 LockRelation(r, lockmode);
453
454         return r;
455 }
456
457 /* ----------------
458  *              conditional_relation_open - open with option not to wait
459  *
460  *              As above, but if nowait is true, then throw an error rather than
461  *              waiting when the lock is not immediately obtainable.
462  * ----------------
463  */
464 Relation
465 conditional_relation_open(Oid relationId, LOCKMODE lockmode, bool nowait)
466 {
467         Relation        r;
468
469         Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
470
471         /* The relcache does all the real work... */
472         r = RelationIdGetRelation(relationId);
473
474         if (!RelationIsValid(r))
475                 elog(ERROR, "could not open relation with OID %u", relationId);
476
477         if (lockmode != NoLock)
478         {
479                 if (nowait)
480                 {
481                         if (!ConditionalLockRelation(r, lockmode))
482                                 ereport(ERROR,
483                                                 (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
484                                                  errmsg("could not obtain lock on relation \"%s\"",
485                                                                 RelationGetRelationName(r))));
486                 }
487                 else
488                         LockRelation(r, lockmode);
489         }
490
491         return r;
492 }
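/*
 * Illustrative sketch (not part of this file): taking a relation lock
 * without blocking.  With nowait = true the call either returns with the
 * lock held or ereport()s ERRCODE_LOCK_NOT_AVAILABLE; it never sleeps.
 * The function name and the choice of AccessExclusiveLock are hypothetical.
 */
static Relation
example_open_nowait(Oid relid)
{
	/* errors out immediately if another backend holds a conflicting lock */
	return conditional_relation_open(relid, AccessExclusiveLock, true);
}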
493
494 /* ----------------
495  *              relation_openrv - open any relation specified by a RangeVar
496  *
497  *              As above, but the relation is specified by a RangeVar.
498  * ----------------
499  */
500 Relation
501 relation_openrv(const RangeVar *relation, LOCKMODE lockmode)
502 {
503         Oid                     relOid;
504
505         /*
506          * Check for shared-cache-inval messages before trying to open the
507          * relation.  This is needed to cover the case where the name
508          * identifies a rel that has been dropped and recreated since the
509          * start of our transaction: if we don't flush the old syscache entry
510          * then we'll latch onto that entry and suffer an error when we do
511          * LockRelation. Note that relation_open does not need to do this,
512          * since a relation's OID never changes.
513          *
514          * We skip this if asked for NoLock, on the assumption that the caller
515          * has already ensured some appropriate lock is held.
516          */
517         if (lockmode != NoLock)
518                 AcceptInvalidationMessages();
519
520         /* Look up the appropriate relation using namespace search */
521         relOid = RangeVarGetRelid(relation, false);
522
523         /* Let relation_open do the rest */
524         return relation_open(relOid, lockmode);
525 }
526
527 /* ----------------
528  *              relation_close - close any relation
529  *
530  *              If lockmode is not "NoLock", we first release the specified lock.
531  *
532  *              Note that it is often sensible to hold a lock beyond relation_close;
533  *              in that case, the lock is released automatically at xact end.
534  * ----------------
535  */
536 void
537 relation_close(Relation relation, LOCKMODE lockmode)
538 {
539         Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
540
541         if (lockmode != NoLock)
542                 UnlockRelation(relation, lockmode);
543
544         /* The relcache does the real work... */
545         RelationClose(relation);
546 }
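/*
 * Illustrative sketch (not part of this file): the usual open/close pairing.
 * Opening with a real lock mode and closing with NoLock keeps the lock until
 * transaction end, which is the normal idiom.  The function name is
 * hypothetical.
 */
static void
example_touch_relation(Oid relid)
{
	Relation	rel;

	rel = relation_open(relid, AccessShareLock);

	/* ... inspect rel->rd_rel, rel->rd_att, etc. ... */

	relation_close(rel, NoLock);	/* lock is released at xact end */
}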
547
548
549 /* ----------------
550  *              heap_open - open a heap relation by relation OID
551  *
552  *              This is essentially relation_open plus a check that the relation
553  *              is not an index or special relation.  (The caller should also check
554  *              that it's not a view before assuming it has storage.)
555  * ----------------
556  */
557 Relation
558 heap_open(Oid relationId, LOCKMODE lockmode)
559 {
560         Relation        r;
561
562         r = relation_open(relationId, lockmode);
563
564         if (r->rd_rel->relkind == RELKIND_INDEX)
565                 ereport(ERROR,
566                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
567                                  errmsg("\"%s\" is an index",
568                                                 RelationGetRelationName(r))));
569         else if (r->rd_rel->relkind == RELKIND_SPECIAL)
570                 ereport(ERROR,
571                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
572                                  errmsg("\"%s\" is a special relation",
573                                                 RelationGetRelationName(r))));
574         else if (r->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
575                 ereport(ERROR,
576                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
577                                  errmsg("\"%s\" is a composite type",
578                                                 RelationGetRelationName(r))));
579
580         pgstat_initstats(&r->pgstat_info, r);
581
582         return r;
583 }
584
585 /* ----------------
586  *              heap_openrv - open a heap relation specified
587  *              by a RangeVar node
588  *
589  *              As above, but relation is specified by a RangeVar.
590  * ----------------
591  */
592 Relation
593 heap_openrv(const RangeVar *relation, LOCKMODE lockmode)
594 {
595         Relation        r;
596
597         r = relation_openrv(relation, lockmode);
598
599         if (r->rd_rel->relkind == RELKIND_INDEX)
600                 ereport(ERROR,
601                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
602                                  errmsg("\"%s\" is an index",
603                                                 RelationGetRelationName(r))));
604         else if (r->rd_rel->relkind == RELKIND_SPECIAL)
605                 ereport(ERROR,
606                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
607                                  errmsg("\"%s\" is a special relation",
608                                                 RelationGetRelationName(r))));
609         else if (r->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
610                 ereport(ERROR,
611                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
612                                  errmsg("\"%s\" is a composite type",
613                                                 RelationGetRelationName(r))));
614
615         pgstat_initstats(&r->pgstat_info, r);
616
617         return r;
618 }
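/*
 * Illustrative sketch (not part of this file): opening a table by name.
 * Assumes the two-argument makeRangeVar() from nodes/makefuncs.h of this
 * vintage; a NULL schema name means "search the namespace path".  The
 * function name is hypothetical.
 */
static Relation
example_open_by_name(const char *relname)
{
	RangeVar   *rv = makeRangeVar(NULL, (char *) relname);

	return heap_openrv(rv, AccessShareLock);
}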
619
620
621 /* ----------------
622  *              heap_beginscan  - begin relation scan
623  * ----------------
624  */
625 HeapScanDesc
626 heap_beginscan(Relation relation, Snapshot snapshot,
627                            int nkeys, ScanKey key)
628 {
629         HeapScanDesc scan;
630
631         /*
632          * increment relation ref count while scanning relation
633          *
634          * This is just to make really sure the relcache entry won't go away
635          * while the scan has a pointer to it.  Caller should be holding the
636          * rel open anyway, so this is redundant in all normal scenarios...
637          */
638         RelationIncrementReferenceCount(relation);
639
640         /*
641          * allocate and initialize scan descriptor
642          */
643         scan = (HeapScanDesc) palloc(sizeof(HeapScanDescData));
644
645         scan->rs_rd = relation;
646         scan->rs_snapshot = snapshot;
647         scan->rs_nkeys = nkeys;
648
649         /*
650          * we do this here instead of in initscan() because heap_rescan also
651          * calls initscan() and we don't want to allocate memory again
652          */
653         if (nkeys > 0)
654                 scan->rs_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys);
655         else
656                 scan->rs_key = NULL;
657
658         pgstat_initstats(&scan->rs_pgstat_info, relation);
659
660         initscan(scan, key);
661
662         return scan;
663 }
664
665 /* ----------------
666  *              heap_rescan             - restart a relation scan
667  * ----------------
668  */
669 void
670 heap_rescan(HeapScanDesc scan,
671                         ScanKey key)
672 {
673         /*
674          * unpin scan buffers
675          */
676         if (BufferIsValid(scan->rs_cbuf))
677                 ReleaseBuffer(scan->rs_cbuf);
678
679         /*
680          * reinitialize scan descriptor
681          */
682         initscan(scan, key);
683
684         pgstat_reset_heap_scan(&scan->rs_pgstat_info);
685 }
686
687 /* ----------------
688  *              heap_endscan    - end relation scan
689  *
690  *              See how to integrate with index scans.
691  *              Check handling of reldesc caching.
692  * ----------------
693  */
694 void
695 heap_endscan(HeapScanDesc scan)
696 {
697         /* Note: no locking manipulations needed */
698
699         /*
700          * unpin scan buffers
701          */
702         if (BufferIsValid(scan->rs_cbuf))
703                 ReleaseBuffer(scan->rs_cbuf);
704
705         /*
706          * decrement relation reference count and free scan descriptor storage
707          */
708         RelationDecrementReferenceCount(scan->rs_rd);
709
710         if (scan->rs_key)
711                 pfree(scan->rs_key);
712
713         pfree(scan);
714 }
715
716 /* ----------------
717  *              heap_getnext    - retrieve next tuple in scan
718  *
719  *              Fix to work with index relations.
720  *              We don't return the buffer anymore, but you can get it from the
721  *              returned HeapTuple.
722  * ----------------
723  */
724
725 #ifdef HEAPDEBUGALL
726 #define HEAPDEBUG_1 \
727         elog(DEBUG2, "heap_getnext([%s,nkeys=%d],dir=%d) called", \
728                  RelationGetRelationName(scan->rs_rd), scan->rs_nkeys, (int) direction)
729 #define HEAPDEBUG_2 \
730         elog(DEBUG2, "heap_getnext returning EOS")
731 #define HEAPDEBUG_3 \
732         elog(DEBUG2, "heap_getnext returning tuple")
733 #else
734 #define HEAPDEBUG_1
735 #define HEAPDEBUG_2
736 #define HEAPDEBUG_3
737 #endif   /* !defined(HEAPDEBUGALL) */
738
739
740 HeapTuple
741 heap_getnext(HeapScanDesc scan, ScanDirection direction)
742 {
743         /* Note: no locking manipulations needed */
744
745         HEAPDEBUG_1;                            /* heap_getnext( info ) */
746
747         /*
748          * Note: we depend here on the -1/0/1 encoding of ScanDirection.
749          */
750         heapgettup(scan->rs_rd,
751                            (int) direction,
752                            &(scan->rs_ctup),
753                            &(scan->rs_cbuf),
754                            scan->rs_snapshot,
755                            scan->rs_nkeys,
756                            scan->rs_key,
757                            scan->rs_nblocks);
758
759         if (scan->rs_ctup.t_data == NULL && !BufferIsValid(scan->rs_cbuf))
760         {
761                 HEAPDEBUG_2;                    /* heap_getnext returning EOS */
762                 return NULL;
763         }
764
765         pgstat_count_heap_scan(&scan->rs_pgstat_info);
766
767         /*
768          * if we get here it means we have a new current scan tuple, so point
769          * to the proper return buffer and return the tuple.
770          */
771
772         HEAPDEBUG_3;                            /* heap_getnext returning tuple */
773
774         if (scan->rs_ctup.t_data != NULL)
775                 pgstat_count_heap_getnext(&scan->rs_pgstat_info);
776
777         return ((scan->rs_ctup.t_data == NULL) ? NULL : &(scan->rs_ctup));
778 }
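/*
 * Illustrative sketch (not part of this file): a complete sequential scan
 * using the interface routines above.  The function name and the use of
 * SnapshotNow are the only choices not dictated by that interface.
 */
static long
example_count_tuples(Oid relid)
{
	Relation	rel = heap_open(relid, AccessShareLock);
	HeapScanDesc scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
	HeapTuple	tup;
	long		ntuples = 0;

	while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
		ntuples++;				/* tup points into scan->rs_cbuf; do not pfree */

	heap_endscan(scan);
	heap_close(rel, AccessShareLock);
	return ntuples;
}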
779
780 /*
781  *      heap_fetch              - retrieve tuple with given tid
782  *
783  * On entry, tuple->t_self is the TID to fetch.  We pin the buffer holding
784  * the tuple, fill in the remaining fields of *tuple, and check the tuple
785  * against the specified snapshot.
786  *
787  * If successful (tuple found and passes snapshot time qual), then *userbuf
788  * is set to the buffer holding the tuple and TRUE is returned.  The caller
789  * must unpin the buffer when done with the tuple.
790  *
791  * If the tuple is not found (ie, item number references a deleted slot),
792  * then tuple->t_data is set to NULL and FALSE is returned.
793  *
794  * If the tuple is found but fails the time qual check, then FALSE is returned
795  * but tuple->t_data is left pointing to the tuple.
796  *
797  * keep_buf determines what is done with the buffer in the FALSE-result cases.
798  * When the caller specifies keep_buf = true, we retain the pin on the buffer
799  * and return it in *userbuf (so the caller must eventually unpin it); when
800  * keep_buf = false, the pin is released and *userbuf is set to InvalidBuffer.
801  *
802  * It is somewhat inconsistent that we ereport() on invalid block number but
803  * return false on invalid item number.  This is historical.  The only
804  * justification I can see is that the caller can relatively easily check the
805  * block number for validity, but cannot check the item number without reading
806  * the page himself.
807  */
808 bool
809 heap_fetch(Relation relation,
810                    Snapshot snapshot,
811                    HeapTuple tuple,
812                    Buffer *userbuf,
813                    bool keep_buf,
814                    PgStat_Info *pgstat_info)
815 {
816         /* Assume *userbuf is undefined on entry */
817         *userbuf = InvalidBuffer;
818         return heap_release_fetch(relation, snapshot, tuple,
819                                                           userbuf, keep_buf, pgstat_info);
820 }
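/*
 * Illustrative sketch (not part of this file): fetching one tuple by TID.
 * The caller fills in t_self, checks the result, and must release the buffer
 * pin on success; with keep_buf = false there is nothing to release on
 * failure.  SnapshotNow and the function name are hypothetical choices.
 */
static bool
example_fetch_by_tid(Relation rel, ItemPointer tid)
{
	HeapTupleData tuple;
	Buffer		buf;

	tuple.t_self = *tid;
	if (!heap_fetch(rel, SnapshotNow, &tuple, &buf, false, NULL))
		return false;			/* deleted slot, or failed the snapshot test */

	/* ... examine tuple.t_data / tuple.t_len while the pin is held ... */

	ReleaseBuffer(buf);
	return true;
}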
821
822 /*
823  *      heap_release_fetch              - retrieve tuple with given tid
824  *
825  * This has the same API as heap_fetch except that if *userbuf is not
826  * InvalidBuffer on entry, that buffer will be released before reading
827  * the new page.  This saves a separate ReleaseBuffer step and hence
828  * one entry into the bufmgr when looping through multiple fetches.
829  * Also, if *userbuf is the same buffer that holds the target tuple,
830  * we avoid bufmgr manipulation altogether.
831  */
832 bool
833 heap_release_fetch(Relation relation,
834                                    Snapshot snapshot,
835                                    HeapTuple tuple,
836                                    Buffer *userbuf,
837                                    bool keep_buf,
838                                    PgStat_Info *pgstat_info)
839 {
840         ItemPointer tid = &(tuple->t_self);
841         ItemId          lp;
842         Buffer          buffer;
843         PageHeader      dp;
844         OffsetNumber offnum;
845         bool            valid;
846
847         /*
848          * get the buffer from the relation descriptor. Note that this does a
849          * buffer pin, and releases the old *userbuf if not InvalidBuffer.
850          */
851         buffer = ReleaseAndReadBuffer(*userbuf, relation,
852                                                                   ItemPointerGetBlockNumber(tid));
853
854         /*
855          * Need share lock on buffer to examine tuple commit status.
856          */
857         LockBuffer(buffer, BUFFER_LOCK_SHARE);
858         dp = (PageHeader) BufferGetPage(buffer);
859
860         /*
861          * We'd better check for an out-of-range offnum, in case VACUUM has
862          * run since the TID was obtained.
863          */
864         offnum = ItemPointerGetOffsetNumber(tid);
865         if (offnum < FirstOffsetNumber || offnum > PageGetMaxOffsetNumber(dp))
866         {
867                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
868                 if (keep_buf)
869                         *userbuf = buffer;
870                 else
871                 {
872                         ReleaseBuffer(buffer);
873                         *userbuf = InvalidBuffer;
874                 }
875                 tuple->t_datamcxt = NULL;
876                 tuple->t_data = NULL;
877                 return false;
878         }
879
880         /*
881          * get the item line pointer corresponding to the requested tid
882          */
883         lp = PageGetItemId(dp, offnum);
884
885         /*
886          * Must check for deleted tuple.
887          */
888         if (!ItemIdIsUsed(lp))
889         {
890                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
891                 if (keep_buf)
892                         *userbuf = buffer;
893                 else
894                 {
895                         ReleaseBuffer(buffer);
896                         *userbuf = InvalidBuffer;
897                 }
898                 tuple->t_datamcxt = NULL;
899                 tuple->t_data = NULL;
900                 return false;
901         }
902
903         /*
904          * fill in *tuple fields
905          */
906         tuple->t_datamcxt = NULL;
907         tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
908         tuple->t_len = ItemIdGetLength(lp);
909         tuple->t_tableOid = relation->rd_id;
910
911         /*
912          * check time qualification of tuple, then release lock
913          */
914         HeapTupleSatisfies(tuple, relation, buffer, dp,
915                                            snapshot, 0, NULL, valid);
916
917         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
918
919         if (valid)
920         {
921                 /*
922                  * All checks passed, so return the tuple as valid. Caller is now
923                  * responsible for releasing the buffer.
924                  */
925                 *userbuf = buffer;
926
927                 /*
928                  * Count the successful fetch in *pgstat_info if given, otherwise
929                  * in the relation's default statistics area.
930                  */
931                 if (pgstat_info != NULL)
932                         pgstat_count_heap_fetch(pgstat_info);
933                 else
934                         pgstat_count_heap_fetch(&relation->pgstat_info);
935
936                 return true;
937         }
938
939         /* Tuple failed time qual, but maybe caller wants to see it anyway. */
940         if (keep_buf)
941                 *userbuf = buffer;
942         else
943         {
944                 ReleaseBuffer(buffer);
945                 *userbuf = InvalidBuffer;
946         }
947
948         return false;
949 }
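/*
 * Illustrative sketch (not part of this file): the looping pattern that
 * heap_release_fetch is designed for.  Consecutive TIDs on the same page
 * reuse the pinned buffer instead of going back to the bufmgr each time.
 * The function name and the tids/nitems parameters are hypothetical.
 */
static void
example_fetch_many(Relation rel, ItemPointer tids, int nitems)
{
	Buffer		buf = InvalidBuffer;
	HeapTupleData tuple;
	int			i;

	for (i = 0; i < nitems; i++)
	{
		tuple.t_self = tids[i];
		if (heap_release_fetch(rel, SnapshotNow, &tuple, &buf, true, NULL))
		{
			/* ... use the visible tuple while the pin is held ... */
		}
		/* with keep_buf = true the pin survives failures, so buf stays usable */
	}

	if (BufferIsValid(buf))
		ReleaseBuffer(buf);
}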
950
951 /*
952  *      heap_get_latest_tid -  get the latest tid of a specified tuple
953  */
954 ItemPointer
955 heap_get_latest_tid(Relation relation,
956                                         Snapshot snapshot,
957                                         ItemPointer tid)
958 {
959         ItemId          lp = NULL;
960         Buffer          buffer;
961         PageHeader      dp;
962         OffsetNumber offnum;
963         HeapTupleData tp;
964         HeapTupleHeader t_data;
965         ItemPointerData ctid;
966         bool            invalidBlock,
967                                 linkend,
968                                 valid;
969
970         /*
971          * get the buffer from the relation descriptor.  Note that this does a
972          * buffer pin.
973          */
974         buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
975         LockBuffer(buffer, BUFFER_LOCK_SHARE);
976
977         /*
978          * get the item line pointer corresponding to the requested tid
979          */
980         dp = (PageHeader) BufferGetPage(buffer);
981         offnum = ItemPointerGetOffsetNumber(tid);
982         invalidBlock = true;
983         if (!PageIsNew(dp))
984         {
985                 lp = PageGetItemId(dp, offnum);
986                 if (ItemIdIsUsed(lp))
987                         invalidBlock = false;
988         }
989         if (invalidBlock)
990         {
991                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
992                 ReleaseBuffer(buffer);
993                 return NULL;
994         }
995
996         /*
997          * more sanity checks
998          */
999
1000         tp.t_datamcxt = NULL;
1001         t_data = tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
1002         tp.t_len = ItemIdGetLength(lp);
1003         tp.t_self = *tid;
1004         ctid = tp.t_data->t_ctid;
1005
1006         /*
1007          * check time qualification of tid
1008          */
1009
1010         HeapTupleSatisfies(&tp, relation, buffer, dp,
1011                                            snapshot, 0, NULL, valid);
1012
1013         linkend = true;
1014         if ((t_data->t_infomask & HEAP_XMIN_COMMITTED) != 0 &&
1015                 !ItemPointerEquals(tid, &ctid))
1016                 linkend = false;
1017
1018         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1019         ReleaseBuffer(buffer);
1020
1021         if (!valid)
1022         {
1023                 if (linkend)
1024                         return NULL;
1025                 heap_get_latest_tid(relation, snapshot, &ctid);
1026                 *tid = ctid;
1027         }
1028
1029         return tid;
1030 }
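/*
 * Illustrative sketch (not part of this file): chasing a row to its newest
 * version.  heap_get_latest_tid updates *tid in place by following t_ctid
 * links; SnapshotNow and the function name are hypothetical choices.
 */
static ItemPointerData
example_latest_version(Relation rel, ItemPointerData tid)
{
	heap_get_latest_tid(rel, SnapshotNow, &tid);
	return tid;					/* latest visible version, if any was found */
}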
1031
1032 /*
1033  *      heap_insert             - insert tuple into a heap
1034  *
1035  * The new tuple is stamped with current transaction ID and the specified
1036  * command ID.
1037  */
1038 Oid
1039 heap_insert(Relation relation, HeapTuple tup, CommandId cid)
1040 {
1041         TransactionId xid = GetCurrentTransactionId();
1042         Buffer          buffer;
1043
1044         if (relation->rd_rel->relhasoids)
1045         {
1046 #ifdef NOT_USED
1047                 /* this is redundant with an Assert in HeapTupleSetOid */
1048                 Assert(tup->t_data->t_infomask & HEAP_HASOID);
1049 #endif
1050
1051                 /*
1052                  * If the object id of this tuple has already been assigned, trust
1053                  * the caller.  There are a couple of ways this can happen.  At
1054                  * initial db creation, the backend program sets oids for tuples.
1055                  * When we define an index, we set the oid.  Finally, in the
1056                  * future, we may allow users to set their own object ids in order
1057                  * to support a persistent object store (objects need to contain
1058                  * pointers to one another).
1059                  */
1060                 if (!OidIsValid(HeapTupleGetOid(tup)))
1061                         HeapTupleSetOid(tup, newoid());
1062                 else
1063                         CheckMaxObjectId(HeapTupleGetOid(tup));
1064         }
1065         else
1066         {
1067                 /* check there is no space for an OID */
1068                 Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
1069         }
1070
1071         tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
1072         tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
1073         HeapTupleHeaderSetXmin(tup->t_data, xid);
1074         HeapTupleHeaderSetCmin(tup->t_data, cid);
1075         HeapTupleHeaderSetXmax(tup->t_data, 0);         /* zero out Datum fields */
1076         HeapTupleHeaderSetCmax(tup->t_data, 0);         /* for cleanliness */
1077         tup->t_tableOid = relation->rd_id;
1078
1079         /*
1080          * If the new tuple is too big for storage or contains already toasted
1081          * out-of-line attributes from some other relation, invoke the
1082          * toaster.
1083          */
1084         if (HeapTupleHasExternal(tup) ||
1085                 (MAXALIGN(tup->t_len) > TOAST_TUPLE_THRESHOLD))
1086                 heap_tuple_toast_attrs(relation, tup, NULL);
1087
1088         /* Find buffer to insert this tuple into */
1089         buffer = RelationGetBufferForTuple(relation, tup->t_len, InvalidBuffer);
1090
1091         /* NO EREPORT(ERROR) from here till changes are logged */
1092         START_CRIT_SECTION();
1093
1094         RelationPutHeapTuple(relation, buffer, tup);
1095
1096         pgstat_count_heap_insert(&relation->pgstat_info);
1097
1098         /* XLOG stuff */
1099         if (!relation->rd_istemp)
1100         {
1101                 xl_heap_insert xlrec;
1102                 xl_heap_header xlhdr;
1103                 XLogRecPtr      recptr;
1104                 XLogRecData rdata[3];
1105                 Page            page = BufferGetPage(buffer);
1106                 uint8           info = XLOG_HEAP_INSERT;
1107
1108                 xlrec.target.node = relation->rd_node;
1109                 xlrec.target.tid = tup->t_self;
1110                 rdata[0].buffer = InvalidBuffer;
1111                 rdata[0].data = (char *) &xlrec;
1112                 rdata[0].len = SizeOfHeapInsert;
1113                 rdata[0].next = &(rdata[1]);
1114
1115                 xlhdr.t_natts = tup->t_data->t_natts;
1116                 xlhdr.t_infomask = tup->t_data->t_infomask;
1117                 xlhdr.t_hoff = tup->t_data->t_hoff;
1118
1119                 /*
1120                  * note we mark rdata[1] as belonging to buffer; if XLogInsert
1121                  * decides to write the whole page to the xlog, we don't need to
1122                  * store xl_heap_header in the xlog.
1123                  */
1124                 rdata[1].buffer = buffer;
1125                 rdata[1].data = (char *) &xlhdr;
1126                 rdata[1].len = SizeOfHeapHeader;
1127                 rdata[1].next = &(rdata[2]);
1128
1129                 rdata[2].buffer = buffer;
1130                 /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
1131                 rdata[2].data = (char *) tup->t_data + offsetof(HeapTupleHeaderData, t_bits);
1132                 rdata[2].len = tup->t_len - offsetof(HeapTupleHeaderData, t_bits);
1133                 rdata[2].next = NULL;
1134
1135                 /*
1136                  * If this is the first and only tuple on the page, we can reinit
1137                  * the page instead of restoring the whole thing.  Set flag, and
1138                  * hide buffer references from XLogInsert.
1139                  */
1140                 if (ItemPointerGetOffsetNumber(&(tup->t_self)) == FirstOffsetNumber &&
1141                         PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
1142                 {
1143                         info |= XLOG_HEAP_INIT_PAGE;
1144                         rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
1145                 }
1146
1147                 recptr = XLogInsert(RM_HEAP_ID, info, rdata);
1148
1149                 PageSetLSN(page, recptr);
1150                 PageSetTLI(page, ThisTimeLineID);
1151         }
1152         else
1153         {
1154                 /* No XLOG record, but still need to flag that XID exists on disk */
1155                 MyXactMadeTempRelUpdate = true;
1156         }
1157
1158         END_CRIT_SECTION();
1159
1160         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1161         WriteBuffer(buffer);
1162
1163         /*
1164          * If tuple is cachable, mark it for invalidation from the caches in
1165          * case we abort.  Note it is OK to do this after WriteBuffer releases
1166          * the buffer, because the "tup" data structure is all in local
1167          * memory, not in the shared buffer.
1168          */
1169         CacheInvalidateHeapTuple(relation, tup);
1170
1171         return HeapTupleGetOid(tup);
1172 }
1173
1174 /*
1175  *      simple_heap_insert - insert a tuple
1176  *
1177  * Currently, this routine differs from heap_insert only in supplying
1178  * a default command ID.  But it should be used rather than using
1179  * heap_insert directly in most places where we are modifying system catalogs.
1180  */
1181 Oid
1182 simple_heap_insert(Relation relation, HeapTuple tup)
1183 {
1184         return heap_insert(relation, tup, GetCurrentCommandId());
1185 }
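/*
 * Illustrative sketch (not part of this file): building and inserting one
 * tuple.  Assumes the heap_formtuple() constructor of this era (Datum values
 * plus a char 'n'/' ' null-flag array); the function name and the single-int4
 * table layout are hypothetical.  Catalog callers would also update indexes.
 */
static Oid
example_insert_int4(Relation rel, int32 value)
{
	Datum		values[1];
	char		nulls[1];
	HeapTuple	tup;
	Oid			oid;

	values[0] = Int32GetDatum(value);
	nulls[0] = ' ';				/* not null */

	tup = heap_formtuple(RelationGetDescr(rel), values, nulls);
	oid = simple_heap_insert(rel, tup);
	heap_freetuple(tup);
	return oid;
}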
1186
1187 /*
1188  *      heap_delete             - delete a tuple
1189  *
1190  * NB: do not call this directly unless you are prepared to deal with
1191  * concurrent-update conditions.  Use simple_heap_delete instead.
1192  *
1193  *      relation - table to be modified
1194  *      tid - TID of tuple to be deleted
1195  *      ctid - output parameter, used only for failure case (see below)
1196  *      cid - delete command ID to use in verifying tuple visibility
1197  *      crosscheck - if not InvalidSnapshot, also check tuple against this
1198  *      wait - true if should wait for any conflicting update to commit/abort
1199  *
1200  * Normal, successful return value is HeapTupleMayBeUpdated, which
1201  * actually means we did delete it.  Failure return codes are
1202  * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
1203  * (the last only possible if wait == false).  On a failure return,
1204  * *ctid is set to the ctid link of the target tuple (possibly a later
1205  * version of the row).
1206  */
1207 HTSU_Result
1208 heap_delete(Relation relation, ItemPointer tid,
1209                         ItemPointer ctid, CommandId cid,
1210                         Snapshot crosscheck, bool wait)
1211 {
1212         HTSU_Result     result;
1213         TransactionId xid = GetCurrentTransactionId();
1214         ItemId          lp;
1215         HeapTupleData tp;
1216         PageHeader      dp;
1217         Buffer          buffer;
1218         bool            have_tuple_lock = false;
1219
1220         Assert(ItemPointerIsValid(tid));
1221
1222         buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
1223         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
1224
1225         dp = (PageHeader) BufferGetPage(buffer);
1226         lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
1227         tp.t_datamcxt = NULL;
1228         tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
1229         tp.t_len = ItemIdGetLength(lp);
1230         tp.t_self = *tid;
1231         tp.t_tableOid = relation->rd_id;
1232
1233 l1:
1234         result = HeapTupleSatisfiesUpdate(tp.t_data, cid, buffer);
1235
1236         if (result == HeapTupleInvisible)
1237         {
1238                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1239                 ReleaseBuffer(buffer);
1240                 elog(ERROR, "attempted to delete invisible tuple");
1241         }
1242         else if (result == HeapTupleBeingUpdated && wait)
1243         {
1244                 TransactionId xwait;
1245                 uint16  infomask;
1246
1247                 /* must copy state data before unlocking buffer */
1248                 xwait = HeapTupleHeaderGetXmax(tp.t_data);
1249                 infomask = tp.t_data->t_infomask;
1250
1251                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1252
1253                 /*
1254                  * Acquire tuple lock to establish our priority for the tuple
1255                  * (see heap_lock_tuple).  LockTuple will release us when we are
1256                  * next-in-line for the tuple.
1257                  *
1258                  * If we are forced to "start over" below, we keep the tuple lock;
1259                  * this arranges that we stay at the head of the line while
1260                  * rechecking tuple state.
1261                  */
1262                 if (!have_tuple_lock)
1263                 {
1264                         LockTuple(relation, &(tp.t_self), ExclusiveLock);
1265                         have_tuple_lock = true;
1266                 }
1267
1268                 /*
1269                  * Sleep until the concurrent transaction ends.  Note that we don't
1270                  * care whether the locker holds an exclusive or a shared lock,
1271                  * because we need exclusive access either way.
1272                  */
1273
1274                 if (infomask & HEAP_XMAX_IS_MULTI)
1275                 {
1276                         /* wait for multixact */
1277                         MultiXactIdWait((MultiXactId) xwait);
1278                         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
1279
1280                         /*
1281                          * If xwait had just locked the tuple then some other xact could
1282                          * update this tuple before we get to this point.  Check for xmax
1283                          * change, and start over if so.
1284                          */
1285                         if (!(tp.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
1286                                 !TransactionIdEquals(HeapTupleHeaderGetXmax(tp.t_data),
1287                                                                          xwait))
1288                                 goto l1;
1289
1290                         /*
1291                          * You might think the multixact is necessarily done here, but
1292                          * not so: it could have surviving members, namely our own xact
1293                          * or other subxacts of this backend.  It is legal for us to
1294                          * delete the tuple in either case, however (the latter case is
1295                          * essentially a situation of upgrading our former shared lock
1296                          * to exclusive).  We don't bother changing the on-disk hint bits
1297                          * since we are about to overwrite the xmax altogether.
1298                          */
1299                 }
1300                 else
1301                 {
1302                         /* wait for regular transaction to end */
1303                         XactLockTableWait(xwait);
1304                         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
1305
1306                         /*
1307                          * xwait is done, but if xwait had just locked the tuple then some
1308                          * other xact could update this tuple before we get to this point.
1309                          * Check for xmax change, and start over if so.
1310                          */
1311                         if ((tp.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
1312                                 !TransactionIdEquals(HeapTupleHeaderGetXmax(tp.t_data),
1313                                                                          xwait))
1314                                 goto l1;
1315
1316                         /* Otherwise we can mark it committed or aborted */
1317                         if (!(tp.t_data->t_infomask & (HEAP_XMAX_COMMITTED |
1318                                                                                    HEAP_XMAX_INVALID)))
1319                         {
1320                                 if (TransactionIdDidCommit(xwait))
1321                                         tp.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
1322                                 else
1323                                         tp.t_data->t_infomask |= HEAP_XMAX_INVALID;
1324                                 SetBufferCommitInfoNeedsSave(buffer);
1325                         }
1326                 }
1327
1328                 /*
1329                  * We may overwrite if previous xmax aborted, or if it committed
1330                  * but only locked the tuple without updating it.
1331                  */
1332                 if (tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
1333                                                                          HEAP_IS_LOCKED))
1334                         result = HeapTupleMayBeUpdated;
1335                 else
1336                         result = HeapTupleUpdated;
1337         }
1338
1339         if (crosscheck != InvalidSnapshot && result == HeapTupleMayBeUpdated)
1340         {
1341                 /* Perform additional check for serializable RI updates */
1342                 if (!HeapTupleSatisfiesSnapshot(tp.t_data, crosscheck, buffer))
1343                         result = HeapTupleUpdated;
1344         }
1345
1346         if (result != HeapTupleMayBeUpdated)
1347         {
1348                 Assert(result == HeapTupleSelfUpdated ||
1349                            result == HeapTupleUpdated ||
1350                            result == HeapTupleBeingUpdated);
1351                 *ctid = tp.t_data->t_ctid;
1352                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1353                 ReleaseBuffer(buffer);
1354                 if (have_tuple_lock)
1355                         UnlockTuple(relation, &(tp.t_self), ExclusiveLock);
1356                 return result;
1357         }
1358
1359         START_CRIT_SECTION();
1360
1361         /* store transaction information of xact deleting the tuple */
1362         tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
1363                                                            HEAP_XMAX_INVALID |
1364                                                            HEAP_XMAX_IS_MULTI |
1365                                                            HEAP_IS_LOCKED |
1366                                                            HEAP_MOVED);
1367         HeapTupleHeaderSetXmax(tp.t_data, xid);
1368         HeapTupleHeaderSetCmax(tp.t_data, cid);
1369         /* Make sure there is no forward chain link in t_ctid */
1370         tp.t_data->t_ctid = tp.t_self;
1371
1372         /* XLOG stuff */
1373         if (!relation->rd_istemp)
1374         {
1375                 xl_heap_delete xlrec;
1376                 XLogRecPtr      recptr;
1377                 XLogRecData rdata[2];
1378
1379                 xlrec.target.node = relation->rd_node;
1380                 xlrec.target.tid = tp.t_self;
1381                 rdata[0].buffer = InvalidBuffer;
1382                 rdata[0].data = (char *) &xlrec;
1383                 rdata[0].len = SizeOfHeapDelete;
1384                 rdata[0].next = &(rdata[1]);
1385
1386                 rdata[1].buffer = buffer;
1387                 rdata[1].data = NULL;
1388                 rdata[1].len = 0;
1389                 rdata[1].next = NULL;
1390
1391                 recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata);
1392
1393                 PageSetLSN(dp, recptr);
1394                 PageSetTLI(dp, ThisTimeLineID);
1395         }
1396         else
1397         {
1398                 /* No XLOG record, but still need to flag that XID exists on disk */
1399                 MyXactMadeTempRelUpdate = true;
1400         }
1401
1402         END_CRIT_SECTION();
1403
1404         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1405
1406         /*
1407          * If the tuple has toasted out-of-line attributes, we need to delete
1408          * those items too.  We have to do this before WriteBuffer because we
1409          * need to look at the contents of the tuple, but it's OK to release
1410          * the context lock on the buffer first.
1411          */
1412         if (HeapTupleHasExternal(&tp))
1413                 heap_tuple_toast_attrs(relation, NULL, &tp);
1414
1415         pgstat_count_heap_delete(&relation->pgstat_info);
1416
1417         /*
1418          * Mark tuple for invalidation from system caches at next command
1419          * boundary. We have to do this before WriteBuffer because we need to
1420          * look at the contents of the tuple, so we need to hold our refcount
1421          * on the buffer.
1422          */
1423         CacheInvalidateHeapTuple(relation, &tp);
1424
1425         WriteBuffer(buffer);
1426
1427         /*
1428          * Release the lmgr tuple lock, if we had it.
1429          */
1430         if (have_tuple_lock)
1431                 UnlockTuple(relation, &(tp.t_self), ExclusiveLock);
1432
1433         return HeapTupleMayBeUpdated;
1434 }
1435
1436 /*
1437  *      simple_heap_delete - delete a tuple
1438  *
1439  * This routine may be used to delete a tuple when concurrent updates of
1440  * the target tuple are not expected (for example, because we have a lock
1441  * on the relation associated with the tuple).  Any failure is reported
1442  * via ereport().
1443  */
1444 void
1445 simple_heap_delete(Relation relation, ItemPointer tid)
1446 {
1447         ItemPointerData ctid;
1448         HTSU_Result             result;
1449
1450         result = heap_delete(relation, tid,
1451                                                  &ctid,
1452                                                  GetCurrentCommandId(), InvalidSnapshot,
1453                                                  true /* wait for commit */ );
1454         switch (result)
1455         {
1456                 case HeapTupleSelfUpdated:
1457                         /* Tuple was already updated in current command? */
1458                         elog(ERROR, "tuple already updated by self");
1459                         break;
1460
1461                 case HeapTupleMayBeUpdated:
1462                         /* done successfully */
1463                         break;
1464
1465                 case HeapTupleUpdated:
1466                         elog(ERROR, "tuple concurrently updated");
1467                         break;
1468
1469                 default:
1470                         elog(ERROR, "unrecognized heap_delete status: %u", result);
1471                         break;
1472         }
1473 }
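
/*
 * Illustrative sketch (not part of the original file): typical caller-side
 * use of simple_heap_delete.  As the comment above requires, the caller is
 * assumed to hold a lock on the relation that prevents concurrent updates
 * of the target row; "reloid" and "tid" are hypothetical inputs.
 */
#ifdef NOT_USED
static void
example_delete_by_tid(Oid reloid, ItemPointer tid)
{
	Relation	rel;

	rel = heap_open(reloid, RowExclusiveLock);
	simple_heap_delete(rel, tid);
	heap_close(rel, RowExclusiveLock);
}
#endif   /* NOT_USED */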
1474
1475 /*
1476  *      heap_update - replace a tuple
1477  *
1478  * NB: do not call this directly unless you are prepared to deal with
1479  * concurrent-update conditions.  Use simple_heap_update instead.
1480  *
1481  *      relation - table to be modified
1482  *      otid - TID of old tuple to be replaced
1483  *      newtup - newly constructed tuple data to store
1484  *      ctid - output parameter, used only for failure case (see below)
1485  *      cid - update command ID to use in verifying old tuple visibility
1486  *      crosscheck - if not InvalidSnapshot, also check old tuple against this
1487  *      wait - true if should wait for any conflicting update to commit/abort
1488  *
1489  * Normal, successful return value is HeapTupleMayBeUpdated, which
1490  * actually means we *did* update it.  Failure return codes are
1491  * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
1492  * (the last only possible if wait == false).  On a failure return,
1493  * *ctid is set to the ctid link of the old tuple (possibly a later
1494  * version of the row).
1495  * On success, newtup->t_self is set to the TID where the new tuple
1496  * was inserted.
1497  */
1498 HTSU_Result
1499 heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
1500                         ItemPointer ctid, CommandId cid,
1501                         Snapshot crosscheck, bool wait)
1502 {
1503         HTSU_Result     result;
1504         TransactionId xid = GetCurrentTransactionId();
1505         ItemId          lp;
1506         HeapTupleData oldtup;
1507         PageHeader      dp;
1508         Buffer          buffer,
1509                                 newbuf;
1510         bool            need_toast,
1511                                 already_marked;
1512         Size            newtupsize,
1513                                 pagefree;
1514         bool            have_tuple_lock = false;
1515
1516         Assert(ItemPointerIsValid(otid));
1517
1518         buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(otid));
1519         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
1520
1521         dp = (PageHeader) BufferGetPage(buffer);
1522         lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(otid));
1523
1524         oldtup.t_datamcxt = NULL;
1525         oldtup.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
1526         oldtup.t_len = ItemIdGetLength(lp);
1527         oldtup.t_self = *otid;
1528
1529         /*
1530          * Note: beyond this point, use oldtup not otid to refer to old tuple.
1531          * otid may very well point at newtup->t_self, which we will overwrite
1532          * with the new tuple's location, so there's great risk of confusion
1533          * if we continue to use otid.
1534          */
1535
1536 l2:
1537         result = HeapTupleSatisfiesUpdate(oldtup.t_data, cid, buffer);
1538
1539         if (result == HeapTupleInvisible)
1540         {
1541                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1542                 ReleaseBuffer(buffer);
1543                 elog(ERROR, "attempted to update invisible tuple");
1544         }
1545         else if (result == HeapTupleBeingUpdated && wait)
1546         {
1547                 TransactionId xwait;
1548                 uint16  infomask;
1549
1550                 /* must copy state data before unlocking buffer */
1551                 xwait = HeapTupleHeaderGetXmax(oldtup.t_data);
1552                 infomask = oldtup.t_data->t_infomask;
1553
1554                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1555
1556                 /*
1557                  * Acquire tuple lock to establish our priority for the tuple
1558                  * (see heap_lock_tuple).  LockTuple will release us when we are
1559                  * next-in-line for the tuple.
1560                  *
1561                  * If we are forced to "start over" below, we keep the tuple lock;
1562                  * this arranges that we stay at the head of the line while
1563                  * rechecking tuple state.
1564                  */
1565                 if (!have_tuple_lock)
1566                 {
1567                         LockTuple(relation, &(oldtup.t_self), ExclusiveLock);
1568                         have_tuple_lock = true;
1569                 }
1570
1571                 /*
1572                  * Sleep until concurrent transaction ends.  Note that we don't care
1573                  * if the locker has an exclusive or shared lock, because we need
1574                  * exclusive.
1575                  */
1576
1577                 if (infomask & HEAP_XMAX_IS_MULTI)
1578                 {
1579                         /* wait for multixact */
1580                         MultiXactIdWait((MultiXactId) xwait);
1581                         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
1582
1583                         /*
1584                          * If xwait had just locked the tuple then some other xact could
1585                          * update this tuple before we get to this point.  Check for xmax
1586                          * change, and start over if so.
1587                          */
1588                         if (!(oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
1589                                 !TransactionIdEquals(HeapTupleHeaderGetXmax(oldtup.t_data),
1590                                                                          xwait))
1591                                 goto l2;
1592
1593                         /*
1594                          * You might think the multixact is necessarily done here, but
1595                          * not so: it could have surviving members, namely our own xact
1596                          * or other subxacts of this backend.  It is legal for us to
1597                          * update the tuple in either case, however (the latter case is
1598                          * essentially a situation of upgrading our former shared lock
1599                          * to exclusive).  We don't bother changing the on-disk hint bits
1600                          * since we are about to overwrite the xmax altogether.
1601                          */
1602                 }
1603                 else
1604                 {
1605                         /* wait for regular transaction to end */
1606                         XactLockTableWait(xwait);
1607                         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
1608
1609                         /*
1610                          * xwait is done, but if xwait had just locked the tuple then some
1611                          * other xact could update this tuple before we get to this point.
1612                          * Check for xmax change, and start over if so.
1613                          */
1614                         if ((oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
1615                                 !TransactionIdEquals(HeapTupleHeaderGetXmax(oldtup.t_data),
1616                                                                          xwait))
1617                                 goto l2;
1618
1619                         /* Otherwise we can mark it committed or aborted */
1620                         if (!(oldtup.t_data->t_infomask & (HEAP_XMAX_COMMITTED |
1621                                                                                            HEAP_XMAX_INVALID)))
1622                         {
1623                                 if (TransactionIdDidCommit(xwait))
1624                                         oldtup.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
1625                                 else
1626                                         oldtup.t_data->t_infomask |= HEAP_XMAX_INVALID;
1627                                 SetBufferCommitInfoNeedsSave(buffer);
1628                         }
1629                 }
1630
1631                 /*
1632                  * We may overwrite if previous xmax aborted, or if it committed
1633                  * but only locked the tuple without updating it.
1634                  */
1635                 if (oldtup.t_data->t_infomask & (HEAP_XMAX_INVALID |
1636                                                                                  HEAP_IS_LOCKED))
1637                         result = HeapTupleMayBeUpdated;
1638                 else
1639                         result = HeapTupleUpdated;
1640         }
1641
1642         if (crosscheck != InvalidSnapshot && result == HeapTupleMayBeUpdated)
1643         {
1644                 /* Perform additional check for serializable RI updates */
1645                 if (!HeapTupleSatisfiesSnapshot(oldtup.t_data, crosscheck, buffer))
1646                         result = HeapTupleUpdated;
1647         }
1648
1649         if (result != HeapTupleMayBeUpdated)
1650         {
1651                 Assert(result == HeapTupleSelfUpdated ||
1652                            result == HeapTupleUpdated ||
1653                            result == HeapTupleBeingUpdated);
1654                 *ctid = oldtup.t_data->t_ctid;
1655                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1656                 ReleaseBuffer(buffer);
1657                 if (have_tuple_lock)
1658                         UnlockTuple(relation, &(oldtup.t_self), ExclusiveLock);
1659                 return result;
1660         }
1661
1662         /* Fill in OID and transaction status data for newtup */
1663         if (relation->rd_rel->relhasoids)
1664         {
1665 #ifdef NOT_USED
1666                 /* this is redundant with an Assert in HeapTupleSetOid */
1667                 Assert(newtup->t_data->t_infomask & HEAP_HASOID);
1668 #endif
1669                 HeapTupleSetOid(newtup, HeapTupleGetOid(&oldtup));
1670         }
1671         else
1672         {
1673                 /* check there is no space for an OID */
1674                 Assert(!(newtup->t_data->t_infomask & HEAP_HASOID));
1675         }
1676
1677         newtup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
1678         newtup->t_data->t_infomask |= (HEAP_XMAX_INVALID | HEAP_UPDATED);
1679         HeapTupleHeaderSetXmin(newtup->t_data, xid);
1680         HeapTupleHeaderSetCmin(newtup->t_data, cid);
1681         HeapTupleHeaderSetXmax(newtup->t_data, 0);      /* zero out Datum fields */
1682         HeapTupleHeaderSetCmax(newtup->t_data, 0);      /* for cleanliness */
1683
1684         /*
1685          * If the toaster needs to be activated, OR if the new tuple will not
1686          * fit on the same page as the old, then we need to release the
1687          * context lock (but not the pin!) on the old tuple's buffer while we
1688          * are off doing TOAST and/or table-file-extension work.  We must mark
1689          * the old tuple to show that it's already being updated, else other
1690          * processes may try to update it themselves.
1691          *
1692          * We need to invoke the toaster if there are already any out-of-line
1693          * toasted values present, or if the new tuple is over-threshold.
1694          */
1695         need_toast = (HeapTupleHasExternal(&oldtup) ||
1696                                   HeapTupleHasExternal(newtup) ||
1697                                   (MAXALIGN(newtup->t_len) > TOAST_TUPLE_THRESHOLD));
1698
1699         newtupsize = MAXALIGN(newtup->t_len);
1700         pagefree = PageGetFreeSpace((Page) dp);
1701
1702         if (need_toast || newtupsize > pagefree)
1703         {
1704                 oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
1705                                                                            HEAP_XMAX_INVALID |
1706                                                                            HEAP_XMAX_IS_MULTI |
1707                                                                            HEAP_IS_LOCKED |
1708                                                                            HEAP_MOVED);
1709                 HeapTupleHeaderSetXmax(oldtup.t_data, xid);
1710                 HeapTupleHeaderSetCmax(oldtup.t_data, cid);
1711                 already_marked = true;
1712                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1713
1714                 /* Let the toaster do its thing */
1715                 if (need_toast)
1716                 {
1717                         heap_tuple_toast_attrs(relation, newtup, &oldtup);
1718                         newtupsize = MAXALIGN(newtup->t_len);
1719                 }
1720
1721                 /*
1722                  * Now, do we need a new page for the tuple, or not?  This is a
1723                  * bit tricky since someone else could have added tuples to the
1724                  * page while we weren't looking.  We have to recheck the
1725                  * available space after reacquiring the buffer lock.  But don't
1726                  * bother to do that if the former amount of free space is still
1727                  * not enough; it's unlikely there's more free now than before.
1728                  *
1729                  * What's more, if we need to get a new page, we will need to acquire
1730                  * buffer locks on both old and new pages.      To avoid deadlock
1731                  * against some other backend trying to get the same two locks in
1732                  * the other order, we must be consistent about the order we get
1733                  * the locks in. We use the rule "lock the lower-numbered page of
1734                  * the relation first".  To implement this, we must do
1735                  * RelationGetBufferForTuple while not holding the lock on the old
1736                  * page, and we must rely on it to get the locks on both pages in
1737                  * the correct order.
1738                  */
1739                 if (newtupsize > pagefree)
1740                 {
1741                         /* Assume there's no chance to put newtup on same page. */
1742                         newbuf = RelationGetBufferForTuple(relation, newtup->t_len,
1743                                                                                            buffer);
1744                 }
1745                 else
1746                 {
1747                         /* Re-acquire the lock on the old tuple's page. */
1748                         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
1749                         /* Re-check using the up-to-date free space */
1750                         pagefree = PageGetFreeSpace((Page) dp);
1751                         if (newtupsize > pagefree)
1752                         {
1753                                 /*
1754                                  * Rats, it doesn't fit anymore.  We must now unlock and
1755                                  * relock to avoid deadlock.  Fortunately, this path
1756                                  * should seldom be taken.
1757                                  */
1758                                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1759                                 newbuf = RelationGetBufferForTuple(relation, newtup->t_len,
1760                                                                                                    buffer);
1761                         }
1762                         else
1763                         {
1764                                 /* OK, it fits here, so we're done. */
1765                                 newbuf = buffer;
1766                         }
1767                 }
1768         }
1769         else
1770         {
1771                 /* No TOAST work needed, and it'll fit on same page */
1772                 already_marked = false;
1773                 newbuf = buffer;
1774         }
1775
1776         pgstat_count_heap_update(&relation->pgstat_info);
1777
1778         /*
1779          * At this point newbuf and buffer are both pinned and locked, and
1780          * newbuf has enough space for the new tuple.  If they are the same
1781          * buffer, only one pin is held.
1782          */
1783
1784         /* NO EREPORT(ERROR) from here till changes are logged */
1785         START_CRIT_SECTION();
1786
1787         RelationPutHeapTuple(relation, newbuf, newtup);         /* insert new tuple */
1788
1789         if (!already_marked)
1790         {
1791                 oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
1792                                                                            HEAP_XMAX_INVALID |
1793                                                                            HEAP_XMAX_IS_MULTI |
1794                                                                            HEAP_IS_LOCKED |
1795                                                                            HEAP_MOVED);
1796                 HeapTupleHeaderSetXmax(oldtup.t_data, xid);
1797                 HeapTupleHeaderSetCmax(oldtup.t_data, cid);
1798         }
1799
1800         /* record address of new tuple in t_ctid of old one */
1801         oldtup.t_data->t_ctid = newtup->t_self;
1802
1803         /* XLOG stuff */
1804         if (!relation->rd_istemp)
1805         {
1806                 XLogRecPtr      recptr = log_heap_update(relation, buffer, oldtup.t_self,
1807                                                                                          newbuf, newtup, false);
1808
1809                 if (newbuf != buffer)
1810                 {
1811                         PageSetLSN(BufferGetPage(newbuf), recptr);
1812                         PageSetTLI(BufferGetPage(newbuf), ThisTimeLineID);
1813                 }
1814                 PageSetLSN(BufferGetPage(buffer), recptr);
1815                 PageSetTLI(BufferGetPage(buffer), ThisTimeLineID);
1816         }
1817         else
1818         {
1819                 /* No XLOG record, but still need to flag that XID exists on disk */
1820                 MyXactMadeTempRelUpdate = true;
1821         }
1822
1823         END_CRIT_SECTION();
1824
1825         if (newbuf != buffer)
1826                 LockBuffer(newbuf, BUFFER_LOCK_UNLOCK);
1827         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1828
1829         /*
1830          * Mark old tuple for invalidation from system caches at next command
1831          * boundary. We have to do this before WriteBuffer because we need to
1832          * look at the contents of the tuple, so we need to hold our refcount.
1833          */
1834         CacheInvalidateHeapTuple(relation, &oldtup);
1835
1836         if (newbuf != buffer)
1837                 WriteBuffer(newbuf);
1838         WriteBuffer(buffer);
1839
1840         /*
1841          * If new tuple is cachable, mark it for invalidation from the caches
1842          * in case we abort.  Note it is OK to do this after WriteBuffer
1843          * releases the buffer, because the "newtup" data structure is all in
1844          * local memory, not in the shared buffer.
1845          */
1846         CacheInvalidateHeapTuple(relation, newtup);
1847
1848         /*
1849          * Release the lmgr tuple lock, if we had it.
1850          */
1851         if (have_tuple_lock)
1852                 UnlockTuple(relation, &(oldtup.t_self), ExclusiveLock);
1853
1854         return HeapTupleMayBeUpdated;
1855 }
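
/*
 * Illustrative sketch (not part of the original file) of the page-lock
 * ordering rule discussed in heap_update above: when two pages of the same
 * relation must be content-locked, always lock the lower-numbered block
 * first, so that two backends needing the same pair of pages cannot
 * deadlock.  This is a simplified stand-in for what
 * RelationGetBufferForTuple actually does; the function name is
 * hypothetical and the two block numbers are assumed to be distinct.
 */
#ifdef NOT_USED
static void
example_lock_two_pages(Relation relation, BlockNumber blk1, BlockNumber blk2,
					   Buffer *buf1, Buffer *buf2)
{
	/* pins can be taken in any order; only content locks must be ordered */
	*buf1 = ReadBuffer(relation, blk1);
	*buf2 = ReadBuffer(relation, blk2);

	if (blk1 < blk2)
	{
		LockBuffer(*buf1, BUFFER_LOCK_EXCLUSIVE);
		LockBuffer(*buf2, BUFFER_LOCK_EXCLUSIVE);
	}
	else
	{
		LockBuffer(*buf2, BUFFER_LOCK_EXCLUSIVE);
		LockBuffer(*buf1, BUFFER_LOCK_EXCLUSIVE);
	}
}
#endif   /* NOT_USED */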
1856
1857 /*
1858  *      simple_heap_update - replace a tuple
1859  *
1860  * This routine may be used to update a tuple when concurrent updates of
1861  * the target tuple are not expected (for example, because we have a lock
1862  * on the relation associated with the tuple).  Any failure is reported
1863  * via ereport().
1864  */
1865 void
1866 simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
1867 {
1868         ItemPointerData ctid;
1869         HTSU_Result             result;
1870
1871         result = heap_update(relation, otid, tup,
1872                                                  &ctid,
1873                                                  GetCurrentCommandId(), InvalidSnapshot,
1874                                                  true /* wait for commit */ );
1875         switch (result)
1876         {
1877                 case HeapTupleSelfUpdated:
1878                         /* Tuple was already updated in current command? */
1879                         elog(ERROR, "tuple already updated by self");
1880                         break;
1881
1882                 case HeapTupleMayBeUpdated:
1883                         /* done successfully */
1884                         break;
1885
1886                 case HeapTupleUpdated:
1887                         elog(ERROR, "tuple concurrently updated");
1888                         break;
1889
1890                 default:
1891                         elog(ERROR, "unrecognized heap_update status: %u", result);
1892                         break;
1893         }
1894 }
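
/*
 * Illustrative sketch (not part of the original file): the usual pattern
 * for updating a row with simple_heap_update, as catalog code does it.
 * The old tuple is copied, the copy is modified, and the copy replaces the
 * old version identified by its t_self.  "oldtup" is assumed to have been
 * fetched by the caller (for instance from a syscache or a heap scan);
 * CatalogUpdateIndexes is declared in catalog/indexing.h.
 */
#ifdef NOT_USED
static void
example_update_tuple(Relation rel, HeapTuple oldtup)
{
	HeapTuple	newtup = heap_copytuple(oldtup);

	/* ... caller edits the fields of newtup here ... */

	simple_heap_update(rel, &oldtup->t_self, newtup);

	/* keep the relation's indexes up to date */
	CatalogUpdateIndexes(rel, newtup);

	heap_freetuple(newtup);
}
#endif   /* NOT_USED */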
1895
1896 /*
1897  *      heap_lock_tuple         - lock a tuple in shared or exclusive mode
1898  *
1899  * NOTES: because the shared-memory lock table is of finite size, but users
1900  * could reasonably want to lock large numbers of tuples, we do not rely on
1901  * the standard lock manager to store tuple-level locks over the long term.
1902  * Instead, a tuple is marked as locked by setting the current transaction's
1903  * XID as its XMAX, and setting additional infomask bits to distinguish this
1904  * usage from the more normal case of having deleted the tuple.  When
1905  * multiple transactions concurrently share-lock a tuple, the first locker's
1906  * XID is replaced in XMAX with a MultiXactId representing the set of
1907  * XIDs currently holding share-locks.
1908  *
1909  * When it is necessary to wait for a tuple-level lock to be released, the
1910  * basic delay is provided by XactLockTableWait or MultiXactIdWait on the
1911  * contents of the tuple's XMAX.  However, that mechanism will release all
1912  * waiters concurrently, so there would be a race condition as to which
1913  * waiter gets the tuple, potentially leading to indefinite starvation of
1914  * some waiters.  The possibility of share-locking makes the problem much
1915  * worse --- a steady stream of share-lockers can easily block an exclusive
1916  * locker forever.  To provide more reliable semantics about who gets a
1917  * tuple-level lock first, we use the standard lock manager.  The protocol
1918  * for waiting for a tuple-level lock is really
1919  *              LockTuple()
1920  *              XactLockTableWait()
1921  *              mark tuple as locked by me
1922  *              UnlockTuple()
1923  * When there are multiple waiters, arbitration of who is to get the lock next
1924  * is provided by LockTuple().  However, at most one tuple-level lock will
1925  * be held or awaited per backend at any time, so we don't risk overflow
1926  * of the lock table.  Note that incoming share-lockers are required to
1927  * do LockTuple as well, if there is any conflict, to ensure that they don't
1928  * starve out waiting exclusive-lockers.  However, if there is not any active
1929  * conflict for a tuple, we don't incur any extra overhead.
1930  */
1931 HTSU_Result
1932 heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer,
1933                                  CommandId cid, LockTupleMode mode)
1934 {
1935         HTSU_Result     result;
1936         ItemPointer tid = &(tuple->t_self);
1937         ItemId          lp;
1938         PageHeader      dp;
1939         TransactionId   xid;
1940         uint16          new_infomask;
1941         LOCKMODE        tuple_lock_type;
1942         bool            have_tuple_lock = false;
1943
1944         tuple_lock_type = (mode == LockTupleShared) ? ShareLock : ExclusiveLock;
1945
1946         *buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
1947         LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
1948
1949         dp = (PageHeader) BufferGetPage(*buffer);
1950         lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
1951         tuple->t_datamcxt = NULL;
1952         tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
1953         tuple->t_len = ItemIdGetLength(lp);
1954
1955 l3:
1956         result = HeapTupleSatisfiesUpdate(tuple->t_data, cid, *buffer);
1957
1958         if (result == HeapTupleInvisible)
1959         {
1960                 LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
1961                 ReleaseBuffer(*buffer);
1962                 elog(ERROR, "attempted to lock invisible tuple");
1963         }
1964         else if (result == HeapTupleBeingUpdated)
1965         {
1966                 TransactionId xwait;
1967                 uint16  infomask;
1968
1969                 /* must copy state data before unlocking buffer */
1970                 xwait = HeapTupleHeaderGetXmax(tuple->t_data);
1971                 infomask = tuple->t_data->t_infomask;
1972
1973                 LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
1974
1975                 /*
1976                  * Acquire tuple lock to establish our priority for the tuple.
1977                  * LockTuple will release us when we are next-in-line for the
1978                  * tuple.  We must do this even if we are share-locking.
1979                  *
1980                  * If we are forced to "start over" below, we keep the tuple lock;
1981                  * this arranges that we stay at the head of the line while
1982                  * rechecking tuple state.
1983                  */
1984                 if (!have_tuple_lock)
1985                 {
1986                         LockTuple(relation, tid, tuple_lock_type);
1987                         have_tuple_lock = true;
1988                 }
1989
1990                 if (mode == LockTupleShared && (infomask & HEAP_XMAX_SHARED_LOCK))
1991                 {
1992                         /*
1993                          * Acquiring sharelock when there's at least one sharelocker
1994                          * already.  We need not wait for him/them to complete.
1995                          */
1996                         LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
1997
1998                         /*
1999                          * Make sure it's still a shared lock, else start over.  (It's
2000                          * OK if the ownership of the shared lock has changed, though.)
2001                          */
2002                         if (!(tuple->t_data->t_infomask & HEAP_XMAX_SHARED_LOCK))
2003                                 goto l3;
2004                 }
2005                 else if (infomask & HEAP_XMAX_IS_MULTI)
2006                 {
2007                         /* wait for multixact to end */
2008                         MultiXactIdWait((MultiXactId) xwait);
2009                         LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
2010
2011                         /*
2012                          * If xwait had just locked the tuple then some other xact
2013                          * could update this tuple before we get to this point.
2014                          * Check for xmax change, and start over if so.
2015                          */
2016                         if (!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
2017                                 !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data),
2018                                                                          xwait))
2019                                 goto l3;
2020
2021                         /*
2022                          * You might think the multixact is necessarily done here, but
2023                          * not so: it could have surviving members, namely our own xact
2024                          * or other subxacts of this backend.  It is legal for us to
2025                          * lock the tuple in either case, however.  We don't bother
2026                          * changing the on-disk hint bits since we are about to
2027                          * overwrite the xmax altogether.
2028                          */
2029                 }
2030                 else
2031                 {
2032                         /* wait for regular transaction to end */
2033                         XactLockTableWait(xwait);
2034                         LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
2035
2036                         /*
2037                          * xwait is done, but if xwait had just locked the tuple then
2038                          * some other xact could update this tuple before we get to
2039                          * this point.  Check for xmax change, and start over if so.
2040                          */
2041                         if ((tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
2042                                 !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data),
2043                                                                          xwait))
2044                                 goto l3;
2045
2046                         /* Otherwise we can mark it committed or aborted */
2047                         if (!(tuple->t_data->t_infomask & (HEAP_XMAX_COMMITTED |
2048                                                                                            HEAP_XMAX_INVALID)))
2049                         {
2050                                 if (TransactionIdDidCommit(xwait))
2051                                         tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED;
2052                                 else
2053                                         tuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
2054                                 SetBufferCommitInfoNeedsSave(*buffer);
2055                         }
2056                 }
2057
2058                 /*
2059                  * We may lock if previous xmax aborted, or if it committed
2060                  * but only locked the tuple without updating it.  The case where
2061                  * we didn't wait because we are joining an existing shared lock
2062                  * is correctly handled, too.
2063                  */
2064                 if (tuple->t_data->t_infomask & (HEAP_XMAX_INVALID |
2065                                                                                  HEAP_IS_LOCKED))
2066                         result = HeapTupleMayBeUpdated;
2067                 else
2068                         result = HeapTupleUpdated;
2069         }
2070
2071         if (result != HeapTupleMayBeUpdated)
2072         {
2073                 ItemPointerData newctid = tuple->t_data->t_ctid;
2074
2075                 Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
2076                 LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
2077                 if (have_tuple_lock)
2078                         UnlockTuple(relation, tid, tuple_lock_type);
2079                 /* can't overwrite t_self (== *tid) until after above Unlock */
2080                 tuple->t_self = newctid;
2081                 return result;
2082         }
2083
2084         /*
2085          * Compute the new xmax and infomask to store into the tuple.  Note we
2086          * do not modify the tuple just yet, because that would leave it in the
2087          * wrong state if multixact.c elogs.
2088          */
2089         xid = GetCurrentTransactionId();
2090
2091         new_infomask = tuple->t_data->t_infomask;
2092
2093         new_infomask &= ~(HEAP_XMAX_COMMITTED |
2094                                           HEAP_XMAX_INVALID |
2095                                           HEAP_XMAX_IS_MULTI |
2096                                           HEAP_IS_LOCKED |
2097                                           HEAP_MOVED);
2098
2099         if (mode == LockTupleShared)
2100         {
2101                 TransactionId   xmax = HeapTupleHeaderGetXmax(tuple->t_data);
2102                 uint16          old_infomask = tuple->t_data->t_infomask;
2103
2104                 /*
2105                  * If this is the first acquisition of a shared lock in the current
2106                  * transaction, set my per-backend OldestMemberMXactId setting.
2107                  * We can be certain that the transaction will never become a
2108                  * member of any older MultiXactIds than that.  (We have to do this
2109                  * even if we end up just using our own TransactionId below, since
2110                  * some other backend could incorporate our XID into a MultiXact
2111                  * immediately afterwards.)
2112                  */
2113                 MultiXactIdSetOldestMember();
2114
2115                 new_infomask |= HEAP_XMAX_SHARED_LOCK;
2116
2117                 /*
2118                  * Check to see if we need a MultiXactId because there are multiple
2119                  * lockers.
2120                  *
2121                  * HeapTupleSatisfiesUpdate will have set the HEAP_XMAX_INVALID
2122                  * bit if the xmax was a MultiXactId but it was not running anymore.
2123                  * There is a race condition, which is that the MultiXactId may have
2124                  * finished since then, but that uncommon case is handled within
2125                  * MultiXactIdExpand.
2126                  *
2127                  * There is a similar race condition possible when the old xmax was
2128                  * a regular TransactionId.  We test TransactionIdIsInProgress again
2129                  * just to narrow the window, but it's still possible to end up
2130                  * creating an unnecessary MultiXactId.  Fortunately this is harmless.
2131                  */
2132                 if (!(old_infomask & (HEAP_XMAX_INVALID | HEAP_XMAX_COMMITTED)))
2133                 {
2134                         if (old_infomask & HEAP_XMAX_IS_MULTI)
2135                         {
2136                                 /*
2137                                  * If the XMAX is already a MultiXactId, then we need to
2138                                  * expand it to include our own TransactionId.
2139                                  */
2140                                 xid = MultiXactIdExpand(xmax, true, xid);
2141                                 new_infomask |= HEAP_XMAX_IS_MULTI;
2142                         }
2143                         else if (TransactionIdIsInProgress(xmax))
2144                         {
2145                                 if (TransactionIdEquals(xmax, xid))
2146                                 {
2147                                         /*
2148                                          * If the old locker is ourselves, we'll just mark the
2149                                          * tuple again with our own TransactionId.  However we
2150                                          * have to consider the possibility that we had
2151                                          * exclusive rather than shared lock before --- if so,
2152                                          * be careful to preserve the exclusivity of the lock.
2153                                          */
2154                                         if (!(old_infomask & HEAP_XMAX_SHARED_LOCK))
2155                                         {
2156                                                 new_infomask &= ~HEAP_XMAX_SHARED_LOCK;
2157                                                 new_infomask |= HEAP_XMAX_EXCL_LOCK;
2158                                                 mode = LockTupleExclusive;
2159                                         }
2160                                 }
2161                                 else
2162                                 {
2163                                         /*
2164                                          * If the Xmax is a valid TransactionId, then we need to
2165                                          * create a new MultiXactId that includes both the old
2166                                          * locker and our own TransactionId.
2167                                          */
2168                                         xid = MultiXactIdExpand(xmax, false, xid);
2169                                         new_infomask |= HEAP_XMAX_IS_MULTI;
2170                                 }
2171                         }
2172                         else
2173                         {
2174                                 /*
2175                                  * Can get here iff HeapTupleSatisfiesUpdate saw the old
2176                                  * xmax as running, but it finished before
2177                                  * TransactionIdIsInProgress() got to run.  Treat it like
2178                                  * there's no locker in the tuple.
2179                                  */
2180                         }
2181                 }
2182                 else
2183                 {
2184                         /*
2185                          * There was no previous locker, so just insert our own
2186                          * TransactionId.
2187                          */
2188                 }
2189         }
2190         else
2191         {
2192                 /* We want an exclusive lock on the tuple */
2193                 new_infomask |= HEAP_XMAX_EXCL_LOCK;
2194         }
2195
2196         START_CRIT_SECTION();
2197
2198         /*
2199          * Store transaction information of xact locking the tuple.
2200          *
2201          * Note: our CID is meaningless if storing a MultiXactId, but no harm
2202          * in storing it anyway.
2203          */
2204         tuple->t_data->t_infomask = new_infomask;
2205         HeapTupleHeaderSetXmax(tuple->t_data, xid);
2206         HeapTupleHeaderSetCmax(tuple->t_data, cid);
2207         /* Make sure there is no forward chain link in t_ctid */
2208         tuple->t_data->t_ctid = *tid;
2209
2210         /*
2211          * XLOG stuff.  You might think that we don't need an XLOG record because
2212          * there is no state change worth restoring after a crash.  You would be
2213          * wrong however: we have just written either a TransactionId or a
2214          * MultiXactId that may never have been seen on disk before, and we need
2215          * to make sure that there are XLOG entries covering those ID numbers.
2216          * Else the same IDs might be re-used after a crash, which would be
2217          * disastrous if this page made it to disk before the crash.  Essentially
2218          * we have to enforce the WAL log-before-data rule even in this case.
2219          */
2220         if (!relation->rd_istemp)
2221         {
2222                 xl_heap_lock xlrec;
2223                 XLogRecPtr      recptr;
2224                 XLogRecData rdata[2];
2225
2226                 xlrec.target.node = relation->rd_node;
2227                 xlrec.target.tid = tuple->t_self;
2228                 xlrec.shared_lock = (mode == LockTupleShared);
2229                 rdata[0].buffer = InvalidBuffer;
2230                 rdata[0].data = (char *) &xlrec;
2231                 rdata[0].len = SizeOfHeapLock;
2232                 rdata[0].next = &(rdata[1]);
2233
2234                 rdata[1].buffer = *buffer;
2235                 rdata[1].data = NULL;
2236                 rdata[1].len = 0;
2237                 rdata[1].next = NULL;
2238
2239                 recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK, rdata);
2240
2241                 PageSetLSN(dp, recptr);
2242                 PageSetTLI(dp, ThisTimeLineID);
2243         }
2244         else
2245         {
2246                 /* No XLOG record, but still need to flag that XID exists on disk */
2247                 MyXactMadeTempRelUpdate = true;
2248         }
2249
2250         END_CRIT_SECTION();
2251
2252         LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
2253
2254         WriteNoReleaseBuffer(*buffer);
2255
2256         /*
2257          * Now that we have successfully marked the tuple as locked, we can
2258          * release the lmgr tuple lock, if we had it.
2259          */
2260         if (have_tuple_lock)
2261                 UnlockTuple(relation, tid, tuple_lock_type);
2262
2263         return HeapTupleMayBeUpdated;
2264 }
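
/*
 * Illustrative sketch (not part of the original file): the calling
 * convention for heap_lock_tuple.  The caller fills in tuple->t_self with
 * the TID to lock; on return the tuple fields point into the page held in
 * *buffer, which the caller must release.  This mirrors what the executor
 * does for SELECT ... FOR UPDATE/SHARE; the wrapper name is hypothetical.
 */
#ifdef NOT_USED
static HTSU_Result
example_lock_row(Relation rel, ItemPointerData tid, bool exclusive)
{
	HeapTupleData tuple;
	Buffer		buffer;
	HTSU_Result result;

	tuple.t_self = tid;
	result = heap_lock_tuple(rel, &tuple, &buffer,
							 GetCurrentCommandId(),
							 exclusive ? LockTupleExclusive : LockTupleShared);
	ReleaseBuffer(buffer);

	/*
	 * HeapTupleMayBeUpdated means the lock was acquired.  On failure,
	 * tuple.t_self has been set to the TID of the newer version of the
	 * row, if any, which the caller may choose to follow.
	 */
	return result;
}
#endif   /* NOT_USED */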
2265
2266 /* ----------------
2267  *              heap_markpos    - mark scan position
2268  * ----------------
2269  */
2270 void
2271 heap_markpos(HeapScanDesc scan)
2272 {
2273         /* Note: no locking manipulations needed */
2274
2275         if (scan->rs_ctup.t_data != NULL)
2276                 scan->rs_mctid = scan->rs_ctup.t_self;
2277         else
2278                 ItemPointerSetInvalid(&scan->rs_mctid);
2279 }
2280
2281 /* ----------------
2282  *              heap_restrpos   - restore position to marked location
2283  * ----------------
2284  */
2285 void
2286 heap_restrpos(HeapScanDesc scan)
2287 {
2288         /* XXX no amrestrpos checking that ammarkpos called */
2289
2290         /* Note: no locking manipulations needed */
2291
2292         /*
2293          * unpin scan buffers
2294          */
2295         if (BufferIsValid(scan->rs_cbuf))
2296                 ReleaseBuffer(scan->rs_cbuf);
2297         scan->rs_cbuf = InvalidBuffer;
2298
2299         if (!ItemPointerIsValid(&scan->rs_mctid))
2300         {
2301                 scan->rs_ctup.t_datamcxt = NULL;
2302                 scan->rs_ctup.t_data = NULL;
2303         }
2304         else
2305         {
2306                 scan->rs_ctup.t_self = scan->rs_mctid;
2307                 scan->rs_ctup.t_datamcxt = NULL;
2308                 scan->rs_ctup.t_data = (HeapTupleHeader) 0x1;   /* for heapgettup */
2309                 heapgettup(scan->rs_rd,
2310                                    0,
2311                                    &(scan->rs_ctup),
2312                                    &(scan->rs_cbuf),
2313                                    scan->rs_snapshot,
2314                                    0,
2315                                    NULL,
2316                                    scan->rs_nblocks);
2317         }
2318 }
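
/*
 * Illustrative sketch (not part of the original file): marking and
 * restoring a position on a heap scan.  The relation "rel" is assumed to
 * be already opened and locked by the caller; the function name is
 * hypothetical.
 */
#ifdef NOT_USED
static void
example_mark_and_restore(Relation rel)
{
	HeapScanDesc scan;
	HeapTuple	tuple;

	scan = heap_beginscan(rel, SnapshotNow, 0, NULL);

	tuple = heap_getnext(scan, ForwardScanDirection);
	if (HeapTupleIsValid(tuple))
	{
		heap_markpos(scan);		/* remember this tuple's position */

		/* ... scan onward ... */
		(void) heap_getnext(scan, ForwardScanDirection);

		heap_restrpos(scan);	/* back to the marked tuple */
	}

	heap_endscan(scan);
}
#endif   /* NOT_USED */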
2319
2320 XLogRecPtr
2321 log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *unused, int uncnt)
2322 {
2323         xl_heap_clean xlrec;
2324         XLogRecPtr      recptr;
2325         XLogRecData rdata[2];
2326
2327         /* Caller should not call me on a temp relation */
2328         Assert(!reln->rd_istemp);
2329
2330         xlrec.node = reln->rd_node;
2331         xlrec.block = BufferGetBlockNumber(buffer);
2332
2333         rdata[0].buffer = InvalidBuffer;
2334         rdata[0].data = (char *) &xlrec;
2335         rdata[0].len = SizeOfHeapClean;
2336         rdata[0].next = &(rdata[1]);
2337
2338         /*
2339          * The unused-offsets array is not actually in the buffer, but pretend
2340          * that it is.  When XLogInsert stores the whole buffer, the offsets
2341          * array need not be stored too.
2342          */
2343         rdata[1].buffer = buffer;
2344         if (uncnt > 0)
2345         {
2346                 rdata[1].data = (char *) unused;
2347                 rdata[1].len = uncnt * sizeof(OffsetNumber);
2348         }
2349         else
2350         {
2351                 rdata[1].data = NULL;
2352                 rdata[1].len = 0;
2353         }
2354         rdata[1].next = NULL;
2355
2356         recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_CLEAN, rdata);
2357
2358         return (recptr);
2359 }
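
/*
 * Illustrative sketch (not part of the original file): how a caller such
 * as VACUUM is expected to use log_heap_clean after freeing line pointers
 * on a page.  The "unused" array and "uncnt" count are assumed to have
 * been filled in while holding an exclusive content lock on the buffer,
 * and the page changes themselves are made inside the same critical
 * section; the function name is hypothetical.
 */
#ifdef NOT_USED
static void
example_log_clean(Relation rel, Buffer buffer,
				  OffsetNumber *unused, int uncnt)
{
	Page		page = BufferGetPage(buffer);

	START_CRIT_SECTION();

	/* ... mark the unused line pointers and repair the page here ... */

	if (!rel->rd_istemp)
	{
		XLogRecPtr	recptr;

		recptr = log_heap_clean(rel, buffer, unused, uncnt);
		PageSetLSN(page, recptr);
		PageSetTLI(page, ThisTimeLineID);
	}

	END_CRIT_SECTION();
}
#endif   /* NOT_USED */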
2360
2361 static XLogRecPtr
2362 log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
2363                                 Buffer newbuf, HeapTuple newtup, bool move)
2364 {
2365         /*
2366          * Note: xlhdr is declared to have adequate size and correct alignment
2367          * for an xl_heap_header.  However the two tids, if present at all,
2368          * will be packed in with no wasted space after the xl_heap_header;
2369          * they aren't necessarily aligned as implied by this struct
2370          * declaration.
2371          */
2372         struct
2373         {
2374                 xl_heap_header hdr;
2375                 TransactionId tid1;
2376                 TransactionId tid2;
2377         }                       xlhdr;
2378         int                     hsize = SizeOfHeapHeader;
2379         xl_heap_update xlrec;
2380         XLogRecPtr      recptr;
2381         XLogRecData rdata[4];
2382         Page            page = BufferGetPage(newbuf);
2383         uint8           info = (move) ? XLOG_HEAP_MOVE : XLOG_HEAP_UPDATE;
2384
2385         /* Caller should not call me on a temp relation */
2386         Assert(!reln->rd_istemp);
2387
2388         xlrec.target.node = reln->rd_node;
2389         xlrec.target.tid = from;
2390         xlrec.newtid = newtup->t_self;
2391         rdata[0].buffer = InvalidBuffer;
2392         rdata[0].data = (char *) &xlrec;
2393         rdata[0].len = SizeOfHeapUpdate;
2394         rdata[0].next = &(rdata[1]);
2395
2396         rdata[1].buffer = oldbuf;
2397         rdata[1].data = NULL;
2398         rdata[1].len = 0;
2399         rdata[1].next = &(rdata[2]);
2400
2401         xlhdr.hdr.t_natts = newtup->t_data->t_natts;
2402         xlhdr.hdr.t_infomask = newtup->t_data->t_infomask;
2403         xlhdr.hdr.t_hoff = newtup->t_data->t_hoff;
2404         if (move)                                       /* remember xmax & xmin */
2405         {
2406                 TransactionId xid[2];   /* xmax, xmin */
2407
2408                 if (newtup->t_data->t_infomask & (HEAP_XMAX_INVALID | HEAP_IS_LOCKED))
2409                         xid[0] = InvalidTransactionId;
2410                 else
2411                         xid[0] = HeapTupleHeaderGetXmax(newtup->t_data);
2412                 xid[1] = HeapTupleHeaderGetXmin(newtup->t_data);
2413                 memcpy((char *) &xlhdr + hsize,
2414                            (char *) xid,
2415                            2 * sizeof(TransactionId));
2416                 hsize += 2 * sizeof(TransactionId);
2417         }
2418
2419         /*
2420          * As with insert records, we need not store the rdata[2] segment if
2421          * we decide to store the whole buffer instead.
2422          */
2423         rdata[2].buffer = newbuf;
2424         rdata[2].data = (char *) &xlhdr;
2425         rdata[2].len = hsize;
2426         rdata[2].next = &(rdata[3]);
2427
2428         rdata[3].buffer = newbuf;
2429         /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
2430         rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
2431         rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
2432         rdata[3].next = NULL;
2433
2434         /* If the new tuple is the first and only tuple on the page... */
2435         if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
2436                 PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
2437         {
2438                 info |= XLOG_HEAP_INIT_PAGE;
2439                 rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
2440         }
2441
2442         recptr = XLogInsert(RM_HEAP_ID, info, rdata);
2443
2444         return (recptr);
2445 }
2446
2447 XLogRecPtr
2448 log_heap_move(Relation reln, Buffer oldbuf, ItemPointerData from,
2449                           Buffer newbuf, HeapTuple newtup)
2450 {
2451         return (log_heap_update(reln, oldbuf, from, newbuf, newtup, true));
2452 }
2453
2454 static void
2455 heap_xlog_clean(bool redo, XLogRecPtr lsn, XLogRecord *record)
2456 {
2457         xl_heap_clean *xlrec = (xl_heap_clean *) XLogRecGetData(record);
2458         Relation        reln;
2459         Buffer          buffer;
2460         Page            page;
2461
2462         if (!redo || (record->xl_info & XLR_BKP_BLOCK_1))
2463                 return;
2464
2465         reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->node);
2466         if (!RelationIsValid(reln))
2467                 return;
2468
2469         buffer = XLogReadBuffer(false, reln, xlrec->block);
2470         if (!BufferIsValid(buffer))
2471                 elog(PANIC, "heap_clean_redo: no block");
2472
2473         page = (Page) BufferGetPage(buffer);
2474         if (PageIsNew((PageHeader) page))
2475                 elog(PANIC, "heap_clean_redo: uninitialized page");
2476
2477         if (XLByteLE(lsn, PageGetLSN(page)))
2478         {
2479                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2480                 ReleaseBuffer(buffer);
2481                 return;
2482         }
2483
2484         if (record->xl_len > SizeOfHeapClean)
2485         {
2486                 OffsetNumber *unused;
2487                 OffsetNumber *unend;
2488                 ItemId          lp;
2489
2490                 unused = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean);
2491                 unend = (OffsetNumber *) ((char *) xlrec + record->xl_len);
2492
2493                 while (unused < unend)
2494                 {
2495                         lp = PageGetItemId(page, *unused + 1);
2496                         lp->lp_flags &= ~LP_USED;
2497                         unused++;
2498                 }
2499         }
2500
2501         PageRepairFragmentation(page, NULL);
2502
2503         PageSetLSN(page, lsn);
2504         PageSetTLI(page, ThisTimeLineID);
2505         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2506         WriteBuffer(buffer);
2507 }
2508
2509 static void
2510 heap_xlog_newpage(bool redo, XLogRecPtr lsn, XLogRecord *record)
2511 {
2512         xl_heap_newpage *xlrec = (xl_heap_newpage *) XLogRecGetData(record);
2513         Relation        reln;
2514         Buffer          buffer;
2515         Page            page;
2516
2517         /*
2518          * Note: the NEWPAGE log record is used for both heaps and indexes, so
2519          * do not do anything that assumes we are touching a heap.
2520          */
2521
2522         if (!redo || (record->xl_info & XLR_BKP_BLOCK_1))
2523                 return;
2524
2525         reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->node);
2526         if (!RelationIsValid(reln))
2527                 return;
2528         buffer = XLogReadBuffer(true, reln, xlrec->blkno);
2529         if (!BufferIsValid(buffer))
2530                 elog(PANIC, "heap_newpage_redo: no block");
2531         page = (Page) BufferGetPage(buffer);
2532
2533         Assert(record->xl_len == SizeOfHeapNewpage + BLCKSZ);
2534         memcpy(page, (char *) xlrec + SizeOfHeapNewpage, BLCKSZ);
2535
2536         PageSetLSN(page, lsn);
2537         PageSetTLI(page, ThisTimeLineID);
2538         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2539         WriteBuffer(buffer);
2540 }
2541
2542 static void
2543 heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
2544 {
2545         xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record);
2546         Relation        reln;
2547         Buffer          buffer;
2548         Page            page;
2549         OffsetNumber offnum;
2550         ItemId          lp = NULL;
2551         HeapTupleHeader htup;
2552
2553         if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
2554                 return;
2555
2556         reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
2557
2558         if (!RelationIsValid(reln))
2559                 return;
2560
2561         buffer = XLogReadBuffer(false, reln,
2562                                                 ItemPointerGetBlockNumber(&(xlrec->target.tid)));
2563         if (!BufferIsValid(buffer))
2564                 elog(PANIC, "heap_delete_%sdo: no block", (redo) ? "re" : "un");
2565
2566         page = (Page) BufferGetPage(buffer);
2567         if (PageIsNew((PageHeader) page))
2568                 elog(PANIC, "heap_delete_%sdo: uninitialized page", (redo) ? "re" : "un");
2569
2570         if (redo)
2571         {
2572                 if (XLByteLE(lsn, PageGetLSN(page)))    /* changes are applied */
2573                 {
2574                         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2575                         ReleaseBuffer(buffer);
2576                         return;
2577                 }
2578         }
2579         else if (XLByteLT(PageGetLSN(page), lsn))       /* changes are not applied
2580                                                                                                  * ?! */
2581                 elog(PANIC, "heap_delete_undo: bad page LSN");
2582
2583         offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
2584         if (PageGetMaxOffsetNumber(page) >= offnum)
2585                 lp = PageGetItemId(page, offnum);
2586
2587         if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp))
2588                 elog(PANIC, "heap_delete_%sdo: invalid lp", (redo) ? "re" : "un");
2589
2590         htup = (HeapTupleHeader) PageGetItem(page, lp);
2591
2592         if (redo)
2593         {
2594                 htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
2595                                                           HEAP_XMAX_INVALID |
2596                                                           HEAP_XMAX_IS_MULTI |
2597                                                           HEAP_IS_LOCKED |
2598                                                           HEAP_MOVED);
2599                 HeapTupleHeaderSetXmax(htup, record->xl_xid);
2600                 HeapTupleHeaderSetCmax(htup, FirstCommandId);
2601                 /* Make sure there is no forward chain link in t_ctid */
2602                 htup->t_ctid = xlrec->target.tid;
2603                 PageSetLSN(page, lsn);
2604                 PageSetTLI(page, ThisTimeLineID);
2605                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2606                 WriteBuffer(buffer);
2607                 return;
2608         }
2609
2610         elog(PANIC, "heap_delete_undo: unimplemented");
2611 }
2612
2613 static void
2614 heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
2615 {
2616         xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record);
2617         Relation        reln;
2618         Buffer          buffer;
2619         Page            page;
2620         OffsetNumber offnum;
2621
2622         if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
2623                 return;
2624
2625         reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
2626
2627         if (!RelationIsValid(reln))
2628                 return;
2629
2630         buffer = XLogReadBuffer(redo, reln,
2631                                                 ItemPointerGetBlockNumber(&(xlrec->target.tid)));
2632         if (!BufferIsValid(buffer))
2633                 return;
2634
2635         page = (Page) BufferGetPage(buffer);
2636         if (PageIsNew((PageHeader) page) &&
2637                 (!redo || !(record->xl_info & XLOG_HEAP_INIT_PAGE)))
2638                 elog(PANIC, "heap_insert_%sdo: uninitialized page", (redo) ? "re" : "un");
2639
2640         if (redo)
2641         {
2642                 struct
2643                 {
2644                         HeapTupleHeaderData hdr;
2645                         char            data[MaxTupleSize];
2646                 }                       tbuf;
2647                 HeapTupleHeader htup;
2648                 xl_heap_header xlhdr;
2649                 uint32          newlen;
2650
2651                 if (record->xl_info & XLOG_HEAP_INIT_PAGE)
2652                         PageInit(page, BufferGetPageSize(buffer), 0);
2653
2654                 if (XLByteLE(lsn, PageGetLSN(page)))    /* changes are applied */
2655                 {
2656                         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2657                         ReleaseBuffer(buffer);
2658                         return;
2659                 }
2660
2661                 offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
2662                 if (PageGetMaxOffsetNumber(page) + 1 < offnum)
2663                         elog(PANIC, "heap_insert_redo: invalid max offset number");
2664
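                /*
                 * Reconstruct the tuple.  The record body is the xl_heap_insert
                 * struct, followed by an xl_heap_header, followed by the tuple
                 * contents from the null bitmap onward.
                 */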
2665                 newlen = record->xl_len - SizeOfHeapInsert - SizeOfHeapHeader;
2666                 Assert(newlen <= MaxTupleSize);
2667                 memcpy((char *) &xlhdr,
2668                            (char *) xlrec + SizeOfHeapInsert,
2669                            SizeOfHeapHeader);
2670                 htup = &tbuf.hdr;
2671                 MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
2672                 /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
2673                 memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
2674                            (char *) xlrec + SizeOfHeapInsert + SizeOfHeapHeader,
2675                            newlen);
2676                 newlen += offsetof(HeapTupleHeaderData, t_bits);
2677                 htup->t_natts = xlhdr.t_natts;
2678                 htup->t_infomask = xlhdr.t_infomask;
2679                 htup->t_hoff = xlhdr.t_hoff;
2680                 HeapTupleHeaderSetXmin(htup, record->xl_xid);
2681                 HeapTupleHeaderSetCmin(htup, FirstCommandId);
2682                 htup->t_ctid = xlrec->target.tid;
2683
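                /*
                 * Put the tuple back at the offset recorded in the WAL record;
                 * failure to add it here indicates a corrupted page.
                 */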
2684                 offnum = PageAddItem(page, (Item) htup, newlen, offnum,
2685                                                          LP_USED | OverwritePageMode);
2686                 if (offnum == InvalidOffsetNumber)
2687                         elog(PANIC, "heap_insert_redo: failed to add tuple");
2688                 PageSetLSN(page, lsn);
2689                 PageSetTLI(page, ThisTimeLineID);
2690                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2691                 WriteBuffer(buffer);
2692                 return;
2693         }
2694
2695         /* undo insert */
2696         if (XLByteLT(PageGetLSN(page), lsn))
2697                 /* during undo the change should already be on the page */
2698                 elog(PANIC, "heap_insert_undo: bad page LSN");
2699
2700         elog(PANIC, "heap_insert_undo: unimplemented");
2701 }
2702
2703 /*
2704  * Handles redo/undo of both UPDATE and MOVE (VACUUM FULL tuple relocation)
2705  */
2706 static void
2707 heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move)
2708 {
2709         xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
2710         Relation        reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
2711         Buffer          buffer;
2712         bool            samepage =
2713         (ItemPointerGetBlockNumber(&(xlrec->newtid)) ==
2714          ItemPointerGetBlockNumber(&(xlrec->target.tid)));
2715         Page            page;
2716         OffsetNumber offnum;
2717         ItemId          lp = NULL;
2718         HeapTupleHeader htup;
2719
2720         if (!RelationIsValid(reln))
2721                 return;
2722
2723         if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
2724                 goto newt;
2725
2726         /* Deal with old tuple version */
2727
2728         buffer = XLogReadBuffer(false, reln,
2729                                                 ItemPointerGetBlockNumber(&(xlrec->target.tid)));
2730         if (!BufferIsValid(buffer))
2731                 elog(PANIC, "heap_update_%sdo: no block", (redo) ? "re" : "un");
2732
2733         page = (Page) BufferGetPage(buffer);
2734         if (PageIsNew((PageHeader) page))
2735                 elog(PANIC, "heap_update_%sdo: uninitialized old page", (redo) ? "re" : "un");
2736
2737         if (redo)
2738         {
2739                 if (XLByteLE(lsn, PageGetLSN(page)))    /* changes are applied */
2740                 {
2741                         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2742                         ReleaseBuffer(buffer);
2743                         if (samepage)
2744                                 return;
2745                         goto newt;
2746                 }
2747         }
2748         else if (XLByteLT(PageGetLSN(page), lsn))
2749                 /* during undo the change should already be on the page */
2750                 elog(PANIC, "heap_update_undo: bad old tuple page LSN");
2751
2752         offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
2753         if (PageGetMaxOffsetNumber(page) >= offnum)
2754                 lp = PageGetItemId(page, offnum);
2755
2756         if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp))
2757                 elog(PANIC, "heap_update_%sdo: invalid lp", (redo) ? "re" : "un");
2758
2759         htup = (HeapTupleHeader) PageGetItem(page, lp);
2760
2761         if (redo)
2762         {
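                /*
                 * Stamp the old tuple version: a MOVE record marks it with
                 * HEAP_MOVED_OFF and the moving transaction's Xvac, while a plain
                 * UPDATE marks it with the updater's xmax, as heap_update did.
                 */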
2763                 if (move)
2764                 {
2765                         htup->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2766                                                                   HEAP_XMIN_INVALID |
2767                                                                   HEAP_MOVED_IN);
2768                         htup->t_infomask |= HEAP_MOVED_OFF;
2769                         HeapTupleHeaderSetXvac(htup, record->xl_xid);
2770                         /* Make sure there is no forward chain link in t_ctid */
2771                         htup->t_ctid = xlrec->target.tid;
2772                 }
2773                 else
2774                 {
2775                         htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
2776                                                                   HEAP_XMAX_INVALID |
2777                                                                   HEAP_XMAX_IS_MULTI |
2778                                                                   HEAP_IS_LOCKED |
2779                                                                   HEAP_MOVED);
2780                         HeapTupleHeaderSetXmax(htup, record->xl_xid);
2781                         HeapTupleHeaderSetCmax(htup, FirstCommandId);
2782                         /* Set forward chain link in t_ctid */
2783                         htup->t_ctid = xlrec->newtid;
2784                 }
2785                 if (samepage)
2786                         goto newsame;
2787                 PageSetLSN(page, lsn);
2788                 PageSetTLI(page, ThisTimeLineID);
2789                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2790                 WriteBuffer(buffer);
2791                 goto newt;
2792         }
2793
2794         elog(PANIC, "heap_update_undo: unimplemented");
2795
2796         /* Deal with new tuple */
2797
2798 newt:;
2799
2800         if (redo &&
2801                 ((record->xl_info & XLR_BKP_BLOCK_2) ||
2802                  ((record->xl_info & XLR_BKP_BLOCK_1) && samepage)))
2803                 return;
2804
2805         buffer = XLogReadBuffer(redo, reln,
2806                                                         ItemPointerGetBlockNumber(&(xlrec->newtid)));
2807         if (!BufferIsValid(buffer))
2808                 return;
2809
2810         page = (Page) BufferGetPage(buffer);
2811
2812 newsame:;
2813         if (PageIsNew((PageHeader) page) &&
2814                 (!redo || !(record->xl_info & XLOG_HEAP_INIT_PAGE)))
2815                 elog(PANIC, "heap_update_%sdo: uninitialized page", (redo) ? "re" : "un");
2816
2817         if (redo)
2818         {
2819                 struct
2820                 {
2821                         HeapTupleHeaderData hdr;
2822                         char            data[MaxTupleSize];
2823                 }                       tbuf;
2824                 xl_heap_header xlhdr;
2825                 int                     hsize;
2826                 uint32          newlen;
2827
2828                 if (record->xl_info & XLOG_HEAP_INIT_PAGE)
2829                         PageInit(page, BufferGetPageSize(buffer), 0);
2830
2831                 if (XLByteLE(lsn, PageGetLSN(page)))    /* changes are applied */
2832                 {
2833                         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2834                         ReleaseBuffer(buffer);
2835                         return;
2836                 }
2837
2838                 offnum = ItemPointerGetOffsetNumber(&(xlrec->newtid));
2839                 if (PageGetMaxOffsetNumber(page) + 1 < offnum)
2840                         elog(PANIC, "heap_update_redo: invalid max offset number");
2841
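                /*
                 * Reconstruct the new tuple version.  The record body is the
                 * xl_heap_update struct, an xl_heap_header, then (for MOVE only)
                 * the saved xmax/xmin pair, and finally the tuple contents.
                 */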
2842                 hsize = SizeOfHeapUpdate + SizeOfHeapHeader;
2843                 if (move)
2844                         hsize += (2 * sizeof(TransactionId));
2845
2846                 newlen = record->xl_len - hsize;
2847                 Assert(newlen <= MaxTupleSize);
2848                 memcpy((char *) &xlhdr,
2849                            (char *) xlrec + SizeOfHeapUpdate,
2850                            SizeOfHeapHeader);
2851                 htup = &tbuf.hdr;
2852                 MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
2853                 /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
2854                 memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
2855                            (char *) xlrec + hsize,
2856                            newlen);
2857                 newlen += offsetof(HeapTupleHeaderData, t_bits);
2858                 htup->t_natts = xlhdr.t_natts;
2859                 htup->t_infomask = xlhdr.t_infomask;
2860                 htup->t_hoff = xlhdr.t_hoff;
2861
2862                 if (move)
2863                 {
2864                         TransactionId xid[2];           /* xmax, xmin */
2865
2866                         memcpy((char *) xid,
2867                                    (char *) xlrec + SizeOfHeapUpdate + SizeOfHeapHeader,
2868                                    2 * sizeof(TransactionId));
2869                         HeapTupleHeaderSetXmin(htup, xid[1]);
2870                         HeapTupleHeaderSetXmax(htup, xid[0]);
2871                         HeapTupleHeaderSetXvac(htup, record->xl_xid);
2872                 }
2873                 else
2874                 {
2875                         HeapTupleHeaderSetXmin(htup, record->xl_xid);
2876                         HeapTupleHeaderSetCmin(htup, FirstCommandId);
2877                 }
2878                 /* Make sure there is no forward chain link in t_ctid */
2879                 htup->t_ctid = xlrec->newtid;
2880
2881                 offnum = PageAddItem(page, (Item) htup, newlen, offnum,
2882                                                          LP_USED | OverwritePageMode);
2883                 if (offnum == InvalidOffsetNumber)
2884                         elog(PANIC, "heap_update_redo: failed to add tuple");
2885                 PageSetLSN(page, lsn);
2886                 PageSetTLI(page, ThisTimeLineID);
2887                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2888                 WriteBuffer(buffer);
2889                 return;
2890         }
2891
2892         /* undo */
2893         if (XLByteLT(PageGetLSN(page), lsn))            /* change should already be applied */
2894                 elog(PANIC, "heap_update_undo: bad new tuple page LSN");
2895
2896         elog(PANIC, "heap_update_undo: unimplemented");
2897
2898 }
2899
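/*
 * Replay an XLOG_HEAP_LOCK record, written when a tuple is row-locked
 * (e.g. SELECT ... FOR UPDATE / FOR SHARE).
 */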
2900 static void
2901 heap_xlog_lock(bool redo, XLogRecPtr lsn, XLogRecord *record)
2902 {
2903         xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record);
2904         Relation        reln;
2905         Buffer          buffer;
2906         Page            page;
2907         OffsetNumber offnum;
2908         ItemId          lp = NULL;
2909         HeapTupleHeader htup;
2910
2911         if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
2912                 return;
2913
2914         reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
2915
2916         if (!RelationIsValid(reln))
2917                 return;
2918
2919         buffer = XLogReadBuffer(false, reln,
2920                                                 ItemPointerGetBlockNumber(&(xlrec->target.tid)));
2921         if (!BufferIsValid(buffer))
2922                 elog(PANIC, "heap_lock_%sdo: no block", (redo) ? "re" : "un");
2923
2924         page = (Page) BufferGetPage(buffer);
2925         if (PageIsNew((PageHeader) page))
2926                 elog(PANIC, "heap_lock_%sdo: uninitialized page", (redo) ? "re" : "un");
2927
2928         if (redo)
2929         {
2930                 if (XLByteLE(lsn, PageGetLSN(page)))    /* changes are applied */
2931                 {
2932                         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2933                         ReleaseBuffer(buffer);
2934                         return;
2935                 }
2936         }
2937         else if (XLByteLT(PageGetLSN(page), lsn))
2938                 /* during undo the change should already be on the page */
2939                 elog(PANIC, "heap_lock_undo: bad page LSN");
2940
2941         offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
2942         if (PageGetMaxOffsetNumber(page) >= offnum)
2943                 lp = PageGetItemId(page, offnum);
2944
2945         if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp))
2946                 elog(PANIC, "heap_lock_%sdo: invalid lp", (redo) ? "re" : "un");
2947
2948         htup = (HeapTupleHeader) PageGetItem(page, lp);
2949
2950         if (redo)
2951         {
2952                 /*
2953                  * Presently, we don't bother to restore the locked state, but
2954                  * just set the XMAX_INVALID bit.
2955                  */
2956                 htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
2957                                                           HEAP_XMAX_INVALID |
2958                                                           HEAP_XMAX_IS_MULTI |
2959                                                           HEAP_IS_LOCKED |
2960                                                           HEAP_MOVED);
2961                 htup->t_infomask |= HEAP_XMAX_INVALID;
2962                 HeapTupleHeaderSetXmax(htup, record->xl_xid);
2963                 HeapTupleHeaderSetCmax(htup, FirstCommandId);
2964                 /* Make sure there is no forward chain link in t_ctid */
2965                 htup->t_ctid = xlrec->target.tid;
2966                 PageSetLSN(page, lsn);
2967                 PageSetTLI(page, ThisTimeLineID);
2968                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2969                 WriteBuffer(buffer);
2970                 return;
2971         }
2972
2973         elog(PANIC, "heap_lock_undo: unimplemented");
2974 }
2975
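/*
 * Redo entry point for the heap resource manager: decode the opcode from
 * xl_info and dispatch to the matching heap_xlog_* routine.
 */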
2976 void
2977 heap_redo(XLogRecPtr lsn, XLogRecord *record)
2978 {
2979         uint8           info = record->xl_info & ~XLR_INFO_MASK;
2980
2981         info &= XLOG_HEAP_OPMASK;
2982         if (info == XLOG_HEAP_INSERT)
2983                 heap_xlog_insert(true, lsn, record);
2984         else if (info == XLOG_HEAP_DELETE)
2985                 heap_xlog_delete(true, lsn, record);
2986         else if (info == XLOG_HEAP_UPDATE)
2987                 heap_xlog_update(true, lsn, record, false);
2988         else if (info == XLOG_HEAP_MOVE)
2989                 heap_xlog_update(true, lsn, record, true);
2990         else if (info == XLOG_HEAP_CLEAN)
2991                 heap_xlog_clean(true, lsn, record);
2992         else if (info == XLOG_HEAP_NEWPAGE)
2993                 heap_xlog_newpage(true, lsn, record);
2994         else if (info == XLOG_HEAP_LOCK)
2995                 heap_xlog_lock(true, lsn, record);
2996         else
2997                 elog(PANIC, "heap_redo: unknown op code %u", info);
2998 }
2999
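/*
 * Undo entry point for the heap resource manager; mirrors heap_redo but
 * invokes each routine with redo = false.
 */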
3000 void
3001 heap_undo(XLogRecPtr lsn, XLogRecord *record)
3002 {
3003         uint8           info = record->xl_info & ~XLR_INFO_MASK;
3004
3005         info &= XLOG_HEAP_OPMASK;
3006         if (info == XLOG_HEAP_INSERT)
3007                 heap_xlog_insert(false, lsn, record);
3008         else if (info == XLOG_HEAP_DELETE)
3009                 heap_xlog_delete(false, lsn, record);
3010         else if (info == XLOG_HEAP_UPDATE)
3011                 heap_xlog_update(false, lsn, record, false);
3012         else if (info == XLOG_HEAP_MOVE)
3013                 heap_xlog_update(false, lsn, record, true);
3014         else if (info == XLOG_HEAP_CLEAN)
3015                 heap_xlog_clean(false, lsn, record);
3016         else if (info == XLOG_HEAP_NEWPAGE)
3017                 heap_xlog_newpage(false, lsn, record);
3018         else if (info == XLOG_HEAP_LOCK)
3019                 heap_xlog_lock(false, lsn, record);
3020         else
3021                 elog(PANIC, "heap_undo: unknown op code %u", info);
3022 }
3023
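/*
 * Append a textual description of a WAL target (relfilenode plus tuple id)
 * to buf; helper for heap_desc below.
 */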
3024 static void
3025 out_target(char *buf, xl_heaptid *target)
3026 {
3027         sprintf(buf + strlen(buf), "rel %u/%u/%u; tid %u/%u",
3028                  target->node.spcNode, target->node.dbNode, target->node.relNode,
3029                         ItemPointerGetBlockNumber(&(target->tid)),
3030                         ItemPointerGetOffsetNumber(&(target->tid)));
3031 }
3032
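/*
 * rm_desc callback for the heap resource manager: append a human-readable
 * description of the given WAL record to buf.
 */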
3033 void
3034 heap_desc(char *buf, uint8 xl_info, char *rec)
3035 {
3036         uint8           info = xl_info & ~XLR_INFO_MASK;
3037
3038         info &= XLOG_HEAP_OPMASK;
3039         if (info == XLOG_HEAP_INSERT)
3040         {
3041                 xl_heap_insert *xlrec = (xl_heap_insert *) rec;
3042
3043                 strcat(buf, "insert: ");
3044                 out_target(buf, &(xlrec->target));
3045         }
3046         else if (info == XLOG_HEAP_DELETE)
3047         {
3048                 xl_heap_delete *xlrec = (xl_heap_delete *) rec;
3049
3050                 strcat(buf, "delete: ");
3051                 out_target(buf, &(xlrec->target));
3052         }
3053         else if (info == XLOG_HEAP_UPDATE || info == XLOG_HEAP_MOVE)
3054         {
3055                 xl_heap_update *xlrec = (xl_heap_update *) rec;
3056
3057                 if (info == XLOG_HEAP_UPDATE)
3058                         strcat(buf, "update: ");
3059                 else
3060                         strcat(buf, "move: ");
3061                 out_target(buf, &(xlrec->target));
3062                 sprintf(buf + strlen(buf), "; new %u/%u",
3063                                 ItemPointerGetBlockNumber(&(xlrec->newtid)),
3064                                 ItemPointerGetOffsetNumber(&(xlrec->newtid)));
3065         }
3066         else if (info == XLOG_HEAP_CLEAN)
3067         {
3068                 xl_heap_clean *xlrec = (xl_heap_clean *) rec;
3069
3070                 sprintf(buf + strlen(buf), "clean: rel %u/%u/%u; blk %u",
3071                                 xlrec->node.spcNode, xlrec->node.dbNode,
3072                                 xlrec->node.relNode, xlrec->block);
3073         }
3074         else if (info == XLOG_HEAP_NEWPAGE)
3075         {
3076                 xl_heap_newpage *xlrec = (xl_heap_newpage *) rec;
3077
3078                 sprintf(buf + strlen(buf), "newpage: rel %u/%u/%u; blk %u",
3079                                 xlrec->node.spcNode, xlrec->node.dbNode,
3080                                 xlrec->node.relNode, xlrec->blkno);
3081         }
3082         else if (info == XLOG_HEAP_LOCK)
3083         {
3084                 xl_heap_lock *xlrec = (xl_heap_lock *) rec;
3085
3086                 if (xlrec->shared_lock)
3087                         strcat(buf, "shared_lock: ");
3088                 else
3089                         strcat(buf, "exclusive_lock: ");
3090                 out_target(buf, &(xlrec->target));
3091         }
3092         else
3093                 strcat(buf, "UNKNOWN");
3094 }