/*-------------------------------------------------------------------------
 *
 * heapam.c
 *    heap access method code
 *
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.188 2005/04/28 21:47:10 tgl Exp $
 *
 *
 * INTERFACE ROUTINES
 *      relation_open   - open any relation by relation OID
 *      relation_openrv - open any relation specified by a RangeVar
 *      relation_close  - close any relation
 *      heap_open       - open a heap relation by relation OID
 *      heap_openrv     - open a heap relation specified by a RangeVar
 *      heap_close      - (now just a macro for relation_close)
 *      heap_beginscan  - begin relation scan
 *      heap_rescan     - restart a relation scan
 *      heap_endscan    - end relation scan
 *      heap_getnext    - retrieve next tuple in scan
 *      heap_fetch      - retrieve tuple with tid
 *      heap_insert     - insert tuple into a relation
 *      heap_delete     - delete a tuple from a relation
 *      heap_update     - replace a tuple in a relation with another tuple
 *      heap_markpos    - mark scan position
 *      heap_restrpos   - restore position to marked location
 *
 * NOTES
 *    This file contains the heap_ routines which implement
 *    the POSTGRES heap access method used for all POSTGRES
 *    relations.
 *
 *-------------------------------------------------------------------------
 */
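
/*
 * Example: a typical sequential scan built from the interface routines
 * above (a minimal sketch; "MyTableOid" and the per-tuple processing are
 * hypothetical, and error handling is omitted):
 *
 *      Relation     rel = heap_open(MyTableOid, AccessShareLock);
 *      HeapScanDesc scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
 *      HeapTuple    tuple;
 *
 *      while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
 *      {
 *          ... process tuple; it is only valid until the next call ...
 *      }
 *      heap_endscan(scan);
 *      heap_close(rel, AccessShareLock);
 */
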
#include "postgres.h"

#include "access/heapam.h"
#include "access/hio.h"
#include "access/multixact.h"
#include "access/tuptoaster.h"
#include "access/valid.h"
#include "access/xlogutils.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "miscadmin.h"
#include "storage/sinval.h"
#include "utils/inval.h"
#include "utils/relcache.h"
#include "pgstat.h"


static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
           ItemPointerData from, Buffer newbuf, HeapTuple newtup, bool move);


/* ----------------------------------------------------------------
 *                       heap support routines
 * ----------------------------------------------------------------
 */

/* ----------------
 *      initscan - scan code common to heap_beginscan and heap_rescan
 * ----------------
 */
static void
initscan(HeapScanDesc scan, ScanKey key)
{
    /*
     * Determine the number of blocks we have to scan.
     *
     * It is sufficient to do this once at scan start, since any tuples added
     * while the scan is in progress will be invisible to my transaction
     * anyway...
     */
    scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);

    scan->rs_ctup.t_datamcxt = NULL;
    scan->rs_ctup.t_data = NULL;
    scan->rs_cbuf = InvalidBuffer;

    /* we don't have a marked position... */
    ItemPointerSetInvalid(&(scan->rs_mctid));

    /*
     * copy the scan key, if appropriate
     */
    if (key != NULL)
        memcpy(scan->rs_key, key, scan->rs_nkeys * sizeof(ScanKeyData));
}

/* ----------------
 *      heapgettup - fetch next heap tuple
 *
 *      routine used by heap_getnext() which does most of the
 *      real work in scanning tuples.
 *
 *      The passed-in *buffer must be either InvalidBuffer or the pinned
 *      current page of the scan.  If we have to move to another page,
 *      we will unpin this buffer (if valid).  On return, *buffer is either
 *      InvalidBuffer or the ID of a pinned buffer.
 * ----------------
 */
static void
heapgettup(Relation relation,
           int dir,
           HeapTuple tuple,
           Buffer *buffer,
           Snapshot snapshot,
           int nkeys,
           ScanKey key,
           BlockNumber pages)
{
    ItemId      lpp;
    Page        dp;
    BlockNumber page;
    int         lines;
    OffsetNumber lineoff;
    int         linesleft;
    ItemPointer tid;

    tid = (tuple->t_data == NULL) ? NULL : &(tuple->t_self);
    /*
     * debugging stuff
     *
     * check validity of arguments, here and for other functions too.
     * Note: no locking manipulations needed -- this is a local function.
     */
#ifdef  HEAPDEBUGALL
    if (ItemPointerIsValid(tid))
        elog(DEBUG2, "heapgettup(%s, tid=0x%x[%d,%d], dir=%d, ...)",
             RelationGetRelationName(relation), tid, tid->ip_blkid,
             tid->ip_posid, dir);
    else
        elog(DEBUG2, "heapgettup(%s, tid=0x%x, dir=%d, ...)",
             RelationGetRelationName(relation), tid, dir);

    elog(DEBUG2, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x)", buffer, nkeys, key);

    elog(DEBUG2, "heapgettup: relation(%c)=`%s', %p",
         relation->rd_rel->relkind, RelationGetRelationName(relation),
         snapshot);
#endif   /* HEAPDEBUGALL */

    if (!ItemPointerIsValid(tid))
    {
        Assert(!PointerIsValid(tid));
        tid = NULL;
    }

    tuple->t_tableOid = relation->rd_id;

    /*
     * return null immediately if relation is empty
     */
    if (pages == 0)
    {
        if (BufferIsValid(*buffer))
            ReleaseBuffer(*buffer);
        *buffer = InvalidBuffer;
        tuple->t_datamcxt = NULL;
        tuple->t_data = NULL;
        return;
    }

    /*
     * calculate next starting lineoff, given scan direction
     */
    if (dir == 0)
    {
        /*
         * ``no movement'' scan direction: refetch same tuple
         */
        if (tid == NULL)
        {
            if (BufferIsValid(*buffer))
                ReleaseBuffer(*buffer);
            *buffer = InvalidBuffer;
            tuple->t_datamcxt = NULL;
            tuple->t_data = NULL;
            return;
        }

        *buffer = ReleaseAndReadBuffer(*buffer,
                                       relation,
                                       ItemPointerGetBlockNumber(tid));

        LockBuffer(*buffer, BUFFER_LOCK_SHARE);

        dp = (Page) BufferGetPage(*buffer);
        lineoff = ItemPointerGetOffsetNumber(tid);
        lpp = PageGetItemId(dp, lineoff);

        tuple->t_datamcxt = NULL;
        tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
        tuple->t_len = ItemIdGetLength(lpp);
        LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);

        return;
    }
    else if (dir < 0)
    {
        /*
         * reverse scan direction
         */
        if (tid == NULL)
        {
            page = pages - 1;   /* final page */
        }
        else
        {
            page = ItemPointerGetBlockNumber(tid);      /* current page */
        }

        Assert(page < pages);

        *buffer = ReleaseAndReadBuffer(*buffer,
                                       relation,
                                       page);

        LockBuffer(*buffer, BUFFER_LOCK_SHARE);

        dp = (Page) BufferGetPage(*buffer);
        lines = PageGetMaxOffsetNumber(dp);
        if (tid == NULL)
        {
            lineoff = lines;    /* final offnum */
        }
        else
        {
            lineoff =           /* previous offnum */
                OffsetNumberPrev(ItemPointerGetOffsetNumber(tid));
        }
        /* page and lineoff now reference the physically previous tid */
    }
    else
    {
        /*
         * forward scan direction
         */
        if (tid == NULL)
        {
            page = 0;           /* first page */
            lineoff = FirstOffsetNumber;        /* first offnum */
        }
        else
        {
            page = ItemPointerGetBlockNumber(tid);      /* current page */
            lineoff =           /* next offnum */
                OffsetNumberNext(ItemPointerGetOffsetNumber(tid));
        }

        Assert(page < pages);

        *buffer = ReleaseAndReadBuffer(*buffer,
                                       relation,
                                       page);

        LockBuffer(*buffer, BUFFER_LOCK_SHARE);

        dp = (Page) BufferGetPage(*buffer);
        lines = PageGetMaxOffsetNumber(dp);
        /* page and lineoff now reference the physically next tid */
    }

    /* 'dir' is now non-zero */

    /*
     * calculate line pointer and number of remaining items to check on
     * this page.
     */
    lpp = PageGetItemId(dp, lineoff);
    if (dir < 0)
        linesleft = lineoff - 1;
    else
        linesleft = lines - lineoff;

    /*
     * advance the scan until we find a qualifying tuple or run out of
     * stuff to scan
     */
    for (;;)
    {
        while (linesleft >= 0)
        {
            if (ItemIdIsUsed(lpp))
            {
                bool        valid;

                tuple->t_datamcxt = NULL;
                tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
                tuple->t_len = ItemIdGetLength(lpp);
                ItemPointerSet(&(tuple->t_self), page, lineoff);

                /*
                 * if current tuple qualifies, return it.
                 */
                HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp,
                                   snapshot, nkeys, key, valid);
                if (valid)
                {
                    LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
                    return;
                }
            }

            /*
             * otherwise move to the next item on the page
             */
            --linesleft;
            if (dir < 0)
            {
                --lpp;          /* move back in this page's ItemId array */
                --lineoff;
            }
            else
            {
                ++lpp;          /* move forward in this page's ItemId array */
                ++lineoff;
            }
        }

        /*
         * if we get here, it means we've exhausted the items on this page
         * and it's time to move to the next.
         */
        LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);

        /*
         * return NULL if we've exhausted all the pages
         */
        if ((dir < 0) ? (page == 0) : (page + 1 >= pages))
        {
            if (BufferIsValid(*buffer))
                ReleaseBuffer(*buffer);
            *buffer = InvalidBuffer;
            tuple->t_datamcxt = NULL;
            tuple->t_data = NULL;
            return;
        }

        page = (dir < 0) ? (page - 1) : (page + 1);

        Assert(page < pages);

        *buffer = ReleaseAndReadBuffer(*buffer,
                                       relation,
                                       page);

        LockBuffer(*buffer, BUFFER_LOCK_SHARE);
        dp = (Page) BufferGetPage(*buffer);
        lines = PageGetMaxOffsetNumber((Page) dp);
        linesleft = lines - 1;
        if (dir < 0)
        {
            lineoff = lines;
            lpp = PageGetItemId(dp, lines);
        }
        else
        {
            lineoff = FirstOffsetNumber;
            lpp = PageGetItemId(dp, FirstOffsetNumber);
        }
    }
}


#if defined(DISABLE_COMPLEX_MACRO)
/*
 * This is formatted so oddly so that the correspondence to the macro
 * definition in access/heapam.h is maintained.
 */
Datum
fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc,
            bool *isnull)
{
    return (
            (attnum) > 0 ?
            (
             ((isnull) ? (*(isnull) = false) : (dummyret) NULL),
             HeapTupleNoNulls(tup) ?
             (
              (tupleDesc)->attrs[(attnum) - 1]->attcacheoff >= 0 ?
              (
               fetchatt((tupleDesc)->attrs[(attnum) - 1],
                        (char *) (tup)->t_data + (tup)->t_data->t_hoff +
                        (tupleDesc)->attrs[(attnum) - 1]->attcacheoff)
               )
              :
              nocachegetattr((tup), (attnum), (tupleDesc), (isnull))
              )
             :
             (
              att_isnull((attnum) - 1, (tup)->t_data->t_bits) ?
              (
               ((isnull) ? (*(isnull) = true) : (dummyret) NULL),
               (Datum) NULL
               )
              :
              (
               nocachegetattr((tup), (attnum), (tupleDesc), (isnull))
               )
              )
             )
            :
            (
             (Datum) NULL
             )
        );
}
#endif   /* defined(DISABLE_COMPLEX_MACRO) */


/* ----------------------------------------------------------------
 *                   heap access method interface
 * ----------------------------------------------------------------
 */

/* ----------------
 *      relation_open - open any relation by relation OID
 *
 *      If lockmode is not "NoLock", the specified kind of lock is
 *      obtained on the relation.  (Generally, NoLock should only be
 *      used if the caller knows it has some appropriate lock on the
 *      relation already.)
 *
 *      An error is raised if the relation does not exist.
 *
 *      NB: a "relation" is anything with a pg_class entry.  The caller is
 *      expected to check whether the relkind is something it can handle.
 * ----------------
 */
Relation
relation_open(Oid relationId, LOCKMODE lockmode)
{
    Relation    r;

    Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);

    /* The relcache does all the real work... */
    r = RelationIdGetRelation(relationId);

    if (!RelationIsValid(r))
        elog(ERROR, "could not open relation with OID %u", relationId);

    if (lockmode != NoLock)
        LockRelation(r, lockmode);

    return r;
}

/* ----------------
 *      conditional_relation_open - open with option not to wait
 *
 *      As above, but if nowait is true, then throw an error rather than
 *      waiting when the lock is not immediately obtainable.
 * ----------------
 */
Relation
conditional_relation_open(Oid relationId, LOCKMODE lockmode, bool nowait)
{
    Relation    r;

    Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);

    /* The relcache does all the real work... */
    r = RelationIdGetRelation(relationId);

    if (!RelationIsValid(r))
        elog(ERROR, "could not open relation with OID %u", relationId);

    if (lockmode != NoLock)
    {
        if (nowait)
        {
            if (!ConditionalLockRelation(r, lockmode))
                ereport(ERROR,
                        (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
                         errmsg("could not obtain lock on relation \"%s\"",
                                RelationGetRelationName(r))));
        }
        else
            LockRelation(r, lockmode);
    }

    return r;
}

/* ----------------
 *      relation_openrv - open any relation specified by a RangeVar
 *
 *      As above, but the relation is specified by a RangeVar.
 * ----------------
 */
Relation
relation_openrv(const RangeVar *relation, LOCKMODE lockmode)
{
    Oid         relOid;

    /*
     * Check for shared-cache-inval messages before trying to open the
     * relation.  This is needed to cover the case where the name
     * identifies a rel that has been dropped and recreated since the
     * start of our transaction: if we don't flush the old syscache entry
     * then we'll latch onto that entry and suffer an error when we do
     * LockRelation. Note that relation_open does not need to do this,
     * since a relation's OID never changes.
     *
     * We skip this if asked for NoLock, on the assumption that the caller
     * has already ensured some appropriate lock is held.
     */
    if (lockmode != NoLock)
        AcceptInvalidationMessages();

    /* Look up the appropriate relation using namespace search */
    relOid = RangeVarGetRelid(relation, false);

    /* Let relation_open do the rest */
    return relation_open(relOid, lockmode);
}

/* ----------------
 *      relation_close - close any relation
 *
 *      If lockmode is not "NoLock", we first release the specified lock.
 *
 *      Note that it is often sensible to hold a lock beyond relation_close;
 *      in that case, the lock is released automatically at xact end.
 * ----------------
 */
void
relation_close(Relation relation, LOCKMODE lockmode)
{
    Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);

    if (lockmode != NoLock)
        UnlockRelation(relation, lockmode);

    /* The relcache does the real work... */
    RelationClose(relation);
}
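
/*
 * Example: holding a lock past close (a minimal sketch; "rv" stands in for
 * a RangeVar built elsewhere).  The lock taken at open time is retained
 * until transaction end by closing with NoLock:
 *
 *      Relation    rel = relation_openrv(rv, AccessShareLock);
 *
 *      ... use rel ...
 *      relation_close(rel, NoLock);    (lock released at xact end)
 */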


/* ----------------
 *      heap_open - open a heap relation by relation OID
 *
 *      This is essentially relation_open plus check that the relation
 *      is not an index or special relation.  (The caller should also check
 *      that it's not a view before assuming it has storage.)
 * ----------------
 */
Relation
heap_open(Oid relationId, LOCKMODE lockmode)
{
    Relation    r;

    r = relation_open(relationId, lockmode);

    if (r->rd_rel->relkind == RELKIND_INDEX)
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("\"%s\" is an index",
                        RelationGetRelationName(r))));
    else if (r->rd_rel->relkind == RELKIND_SPECIAL)
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("\"%s\" is a special relation",
                        RelationGetRelationName(r))));
    else if (r->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("\"%s\" is a composite type",
                        RelationGetRelationName(r))));

    pgstat_initstats(&r->pgstat_info, r);

    return r;
}

/* ----------------
 *      heap_openrv - open a heap relation specified
 *      by a RangeVar node
 *
 *      As above, but relation is specified by a RangeVar.
 * ----------------
 */
Relation
heap_openrv(const RangeVar *relation, LOCKMODE lockmode)
{
    Relation    r;

    r = relation_openrv(relation, lockmode);

    if (r->rd_rel->relkind == RELKIND_INDEX)
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("\"%s\" is an index",
                        RelationGetRelationName(r))));
    else if (r->rd_rel->relkind == RELKIND_SPECIAL)
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("\"%s\" is a special relation",
                        RelationGetRelationName(r))));
    else if (r->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("\"%s\" is a composite type",
                        RelationGetRelationName(r))));

    pgstat_initstats(&r->pgstat_info, r);

    return r;
}


/* ----------------
 *      heap_beginscan  - begin relation scan
 * ----------------
 */
HeapScanDesc
heap_beginscan(Relation relation, Snapshot snapshot,
               int nkeys, ScanKey key)
{
    HeapScanDesc scan;

    /*
     * increment relation ref count while scanning relation
     *
     * This is just to make really sure the relcache entry won't go away
     * while the scan has a pointer to it.  Caller should be holding the
     * rel open anyway, so this is redundant in all normal scenarios...
     */
    RelationIncrementReferenceCount(relation);

    /*
     * allocate and initialize scan descriptor
     */
    scan = (HeapScanDesc) palloc(sizeof(HeapScanDescData));

    scan->rs_rd = relation;
    scan->rs_snapshot = snapshot;
    scan->rs_nkeys = nkeys;

    /*
     * we do this here instead of in initscan() because heap_rescan also
     * calls initscan() and we don't want to allocate memory again
     */
    if (nkeys > 0)
        scan->rs_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys);
    else
        scan->rs_key = NULL;

    pgstat_initstats(&scan->rs_pgstat_info, relation);

    initscan(scan, key);

    return scan;
}

/* ----------------
 *      heap_rescan     - restart a relation scan
 * ----------------
 */
void
heap_rescan(HeapScanDesc scan,
            ScanKey key)
{
    /*
     * unpin scan buffers
     */
    if (BufferIsValid(scan->rs_cbuf))
        ReleaseBuffer(scan->rs_cbuf);

    /*
     * reinitialize scan descriptor
     */
    initscan(scan, key);

    pgstat_reset_heap_scan(&scan->rs_pgstat_info);
}

/* ----------------
 *      heap_endscan    - end relation scan
 *
 *      See how to integrate with index scans.
 *      Check handling of reldesc caching.
 * ----------------
 */
void
heap_endscan(HeapScanDesc scan)
{
    /* Note: no locking manipulations needed */

    /*
     * unpin scan buffers
     */
    if (BufferIsValid(scan->rs_cbuf))
        ReleaseBuffer(scan->rs_cbuf);

    /*
     * decrement relation reference count and free scan descriptor storage
     */
    RelationDecrementReferenceCount(scan->rs_rd);

    if (scan->rs_key)
        pfree(scan->rs_key);

    pfree(scan);
}

/* ----------------
 *      heap_getnext    - retrieve next tuple in scan
 *
 *      Fix to work with index relations.
 *      We don't return the buffer anymore, but you can get it from the
 *      returned HeapTuple.
 * ----------------
 */

#ifdef HEAPDEBUGALL
#define HEAPDEBUG_1 \
    elog(DEBUG2, "heap_getnext([%s,nkeys=%d],dir=%d) called", \
         RelationGetRelationName(scan->rs_rd), scan->rs_nkeys, (int) direction)
#define HEAPDEBUG_2 \
    elog(DEBUG2, "heap_getnext returning EOS")
#define HEAPDEBUG_3 \
    elog(DEBUG2, "heap_getnext returning tuple")
#else
#define HEAPDEBUG_1
#define HEAPDEBUG_2
#define HEAPDEBUG_3
#endif   /* !defined(HEAPDEBUGALL) */


HeapTuple
heap_getnext(HeapScanDesc scan, ScanDirection direction)
{
    /* Note: no locking manipulations needed */

    HEAPDEBUG_1;                /* heap_getnext( info ) */

    /*
     * Note: we depend here on the -1/0/1 encoding of ScanDirection.
     */
    heapgettup(scan->rs_rd,
               (int) direction,
               &(scan->rs_ctup),
               &(scan->rs_cbuf),
               scan->rs_snapshot,
               scan->rs_nkeys,
               scan->rs_key,
               scan->rs_nblocks);

    if (scan->rs_ctup.t_data == NULL && !BufferIsValid(scan->rs_cbuf))
    {
        HEAPDEBUG_2;            /* heap_getnext returning EOS */
        return NULL;
    }

    pgstat_count_heap_scan(&scan->rs_pgstat_info);

    /*
     * if we get here it means we have a new current scan tuple, so point
     * to the proper return buffer and return the tuple.
     */

    HEAPDEBUG_3;                /* heap_getnext returning tuple */

    if (scan->rs_ctup.t_data != NULL)
        pgstat_count_heap_getnext(&scan->rs_pgstat_info);

    return ((scan->rs_ctup.t_data == NULL) ? NULL : &(scan->rs_ctup));
}

/*
 *  heap_fetch      - retrieve tuple with given tid
 *
 * On entry, tuple->t_self is the TID to fetch.  We pin the buffer holding
 * the tuple, fill in the remaining fields of *tuple, and check the tuple
 * against the specified snapshot.
 *
 * If successful (tuple found and passes snapshot time qual), then *userbuf
 * is set to the buffer holding the tuple and TRUE is returned.  The caller
 * must unpin the buffer when done with the tuple.
 *
 * If the tuple is not found (ie, item number references a deleted slot),
 * then tuple->t_data is set to NULL and FALSE is returned.
 *
 * If the tuple is found but fails the time qual check, then FALSE is returned
 * but tuple->t_data is left pointing to the tuple.
 *
 * keep_buf determines what is done with the buffer in the FALSE-result cases.
 * When the caller specifies keep_buf = true, we retain the pin on the buffer
 * and return it in *userbuf (so the caller must eventually unpin it); when
 * keep_buf = false, the pin is released and *userbuf is set to InvalidBuffer.
 *
 * It is somewhat inconsistent that we ereport() on invalid block number but
 * return false on invalid item number.  This is historical.  The only
 * justification I can see is that the caller can relatively easily check the
 * block number for validity, but cannot check the item number without reading
 * the page himself.
 */
bool
heap_fetch(Relation relation,
           Snapshot snapshot,
           HeapTuple tuple,
           Buffer *userbuf,
           bool keep_buf,
           PgStat_Info *pgstat_info)
{
    /* Assume *userbuf is undefined on entry */
    *userbuf = InvalidBuffer;
    return heap_release_fetch(relation, snapshot, tuple,
                              userbuf, keep_buf, pgstat_info);
}
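
/*
 * Example usage of heap_fetch (a minimal sketch; assumes "rel" is open and
 * "tid" holds the target TID; with keep_buf = false no pin survives a
 * failed fetch):
 *
 *      HeapTupleData tuple;
 *      Buffer        buf;
 *
 *      tuple.t_self = tid;
 *      if (heap_fetch(rel, SnapshotNow, &tuple, &buf, false, NULL))
 *      {
 *          ... examine tuple.t_data ...
 *          ReleaseBuffer(buf);    (on success the caller must drop the pin)
 *      }
 */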

/*
 *  heap_release_fetch      - retrieve tuple with given tid
 *
 * This has the same API as heap_fetch except that if *userbuf is not
 * InvalidBuffer on entry, that buffer will be released before reading
 * the new page.  This saves a separate ReleaseBuffer step and hence
 * one entry into the bufmgr when looping through multiple fetches.
 * Also, if *userbuf is the same buffer that holds the target tuple,
 * we avoid bufmgr manipulation altogether.
 */
bool
heap_release_fetch(Relation relation,
                   Snapshot snapshot,
                   HeapTuple tuple,
                   Buffer *userbuf,
                   bool keep_buf,
                   PgStat_Info *pgstat_info)
{
    ItemPointer tid = &(tuple->t_self);
    ItemId      lp;
    Buffer      buffer;
    PageHeader  dp;
    OffsetNumber offnum;
    bool        valid;

    /*
     * get the buffer from the relation descriptor. Note that this does a
     * buffer pin, and releases the old *userbuf if not InvalidBuffer.
     */
    buffer = ReleaseAndReadBuffer(*userbuf, relation,
                                  ItemPointerGetBlockNumber(tid));

    /*
     * Need share lock on buffer to examine tuple commit status.
     */
    LockBuffer(buffer, BUFFER_LOCK_SHARE);
    dp = (PageHeader) BufferGetPage(buffer);
    /*
     * We'd better check for an out-of-range offnum, in case the page has
     * been vacuumed since the TID was obtained.
     */
    offnum = ItemPointerGetOffsetNumber(tid);
    if (offnum < FirstOffsetNumber || offnum > PageGetMaxOffsetNumber(dp))
    {
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
        if (keep_buf)
            *userbuf = buffer;
        else
        {
            ReleaseBuffer(buffer);
            *userbuf = InvalidBuffer;
        }
        tuple->t_datamcxt = NULL;
        tuple->t_data = NULL;
        return false;
    }

    /*
     * get the item line pointer corresponding to the requested tid
     */
    lp = PageGetItemId(dp, offnum);

    /*
     * Must check for deleted tuple.
     */
    if (!ItemIdIsUsed(lp))
    {
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
        if (keep_buf)
            *userbuf = buffer;
        else
        {
            ReleaseBuffer(buffer);
            *userbuf = InvalidBuffer;
        }
        tuple->t_datamcxt = NULL;
        tuple->t_data = NULL;
        return false;
    }

    /*
     * fill in *tuple fields
     */
    tuple->t_datamcxt = NULL;
    tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
    tuple->t_len = ItemIdGetLength(lp);
    tuple->t_tableOid = relation->rd_id;

    /*
     * check time qualification of tuple, then release lock
     */
    HeapTupleSatisfies(tuple, relation, buffer, dp,
                       snapshot, 0, NULL, valid);

    LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

    if (valid)
    {
        /*
         * All checks passed, so return the tuple as valid. Caller is now
         * responsible for releasing the buffer.
         */
        *userbuf = buffer;

        /*
         * Count the successful fetch in *pgstat_info if given, otherwise
         * in the relation's default statistics area.
         */
        if (pgstat_info != NULL)
            pgstat_count_heap_fetch(pgstat_info);
        else
            pgstat_count_heap_fetch(&relation->pgstat_info);

        return true;
    }

    /* Tuple failed time qual, but maybe caller wants to see it anyway. */
    if (keep_buf)
        *userbuf = buffer;
    else
    {
        ReleaseBuffer(buffer);
        *userbuf = InvalidBuffer;
    }

    return false;
}

/*
 *  heap_get_latest_tid -  get the latest tid of a specified tuple
 */
ItemPointer
heap_get_latest_tid(Relation relation,
                    Snapshot snapshot,
                    ItemPointer tid)
{
    ItemId      lp = NULL;
    Buffer      buffer;
    PageHeader  dp;
    OffsetNumber offnum;
    HeapTupleData tp;
    HeapTupleHeader t_data;
    ItemPointerData ctid;
    bool        invalidBlock,
                linkend,
                valid;

    /*
     * get the buffer from the relation descriptor.  Note that this does a
     * buffer pin.
     */
    buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
    LockBuffer(buffer, BUFFER_LOCK_SHARE);

    /*
     * get the item line pointer corresponding to the requested tid
     */
    dp = (PageHeader) BufferGetPage(buffer);
    offnum = ItemPointerGetOffsetNumber(tid);
    invalidBlock = true;
    if (!PageIsNew(dp))
    {
        lp = PageGetItemId(dp, offnum);
        if (ItemIdIsUsed(lp))
            invalidBlock = false;
    }
    if (invalidBlock)
    {
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
        ReleaseBuffer(buffer);
        return NULL;
    }

    /*
     * more sanity checks
     */

    tp.t_datamcxt = NULL;
    t_data = tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
    tp.t_len = ItemIdGetLength(lp);
    tp.t_self = *tid;
    ctid = tp.t_data->t_ctid;

    /*
     * check time qualification of tid
     */

    HeapTupleSatisfies(&tp, relation, buffer, dp,
                       snapshot, 0, NULL, valid);

    linkend = true;
    if ((t_data->t_infomask & HEAP_XMIN_COMMITTED) != 0 &&
        !ItemPointerEquals(tid, &ctid))
        linkend = false;

    LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
    ReleaseBuffer(buffer);

    if (!valid)
    {
        if (linkend)
            return NULL;
        heap_get_latest_tid(relation, snapshot, &ctid);
        *tid = ctid;
    }

    return tid;
}
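
/*
 * Example: chasing an update chain to its newest visible version (a
 * minimal sketch; assumes "rel" is open and "tid" was obtained earlier).
 * Note that heap_get_latest_tid updates the ItemPointer in place:
 *
 *      ItemPointerData t = *tid;
 *
 *      if (heap_get_latest_tid(rel, snapshot, &t) != NULL)
 *          ... t now names the latest version of the row ...
 */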

/*
 *  heap_insert     - insert tuple into a heap
 *
 * The new tuple is stamped with current transaction ID and the specified
 * command ID.
 */
Oid
heap_insert(Relation relation, HeapTuple tup, CommandId cid)
{
    TransactionId xid = GetCurrentTransactionId();
    Buffer      buffer;

    if (relation->rd_rel->relhasoids)
    {
#ifdef NOT_USED
        /* this is redundant with an Assert in HeapTupleSetOid */
        Assert(tup->t_data->t_infomask & HEAP_HASOID);
#endif

        /*
         * If the object id of this tuple has already been assigned, trust
         * the caller.  There are a couple of ways this can happen.  At
         * initial db creation, the backend program sets oids for tuples.
         * When we define an index, we set the oid.  Finally, in the
         * future, we may allow users to set their own object ids in order
         * to support a persistent object store (objects need to contain
         * pointers to one another).
         */
        if (!OidIsValid(HeapTupleGetOid(tup)))
            HeapTupleSetOid(tup, newoid());
        else
            CheckMaxObjectId(HeapTupleGetOid(tup));
    }
    else
    {
        /* check that there is no space for an OID */
        Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
    }

    tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
    tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
    HeapTupleHeaderSetXmin(tup->t_data, xid);
    HeapTupleHeaderSetCmin(tup->t_data, cid);
    HeapTupleHeaderSetXmax(tup->t_data, 0);     /* zero out Datum fields */
    HeapTupleHeaderSetCmax(tup->t_data, 0);     /* for cleanliness */
    tup->t_tableOid = relation->rd_id;

    /*
     * If the new tuple is too big for storage or contains already toasted
     * out-of-line attributes from some other relation, invoke the
     * toaster.
     */
    if (HeapTupleHasExternal(tup) ||
        (MAXALIGN(tup->t_len) > TOAST_TUPLE_THRESHOLD))
        heap_tuple_toast_attrs(relation, tup, NULL);

    /* Find buffer to insert this tuple into */
    buffer = RelationGetBufferForTuple(relation, tup->t_len, InvalidBuffer);

    /* NO EREPORT(ERROR) from here till changes are logged */
    START_CRIT_SECTION();

    RelationPutHeapTuple(relation, buffer, tup);

    pgstat_count_heap_insert(&relation->pgstat_info);

    /* XLOG stuff */
    if (!relation->rd_istemp)
    {
        xl_heap_insert xlrec;
        xl_heap_header xlhdr;
        XLogRecPtr  recptr;
        XLogRecData rdata[3];
        Page        page = BufferGetPage(buffer);
        uint8       info = XLOG_HEAP_INSERT;

        xlrec.target.node = relation->rd_node;
        xlrec.target.tid = tup->t_self;
        rdata[0].buffer = InvalidBuffer;
        rdata[0].data = (char *) &xlrec;
        rdata[0].len = SizeOfHeapInsert;
        rdata[0].next = &(rdata[1]);

        xlhdr.t_natts = tup->t_data->t_natts;
        xlhdr.t_infomask = tup->t_data->t_infomask;
        xlhdr.t_hoff = tup->t_data->t_hoff;

        /*
         * note we mark rdata[1] as belonging to buffer; if XLogInsert
         * decides to write the whole page to the xlog, we don't need to
         * store xl_heap_header in the xlog.
         */
        rdata[1].buffer = buffer;
        rdata[1].data = (char *) &xlhdr;
        rdata[1].len = SizeOfHeapHeader;
        rdata[1].next = &(rdata[2]);

        rdata[2].buffer = buffer;
        /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
        rdata[2].data = (char *) tup->t_data + offsetof(HeapTupleHeaderData, t_bits);
        rdata[2].len = tup->t_len - offsetof(HeapTupleHeaderData, t_bits);
        rdata[2].next = NULL;
        /*
         * If this is the first and only tuple on the page, we can reinit
         * the page instead of restoring the whole thing.  Set flag, and
         * hide buffer references from XLogInsert.
         */
        if (ItemPointerGetOffsetNumber(&(tup->t_self)) == FirstOffsetNumber &&
            PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
        {
            info |= XLOG_HEAP_INIT_PAGE;
            rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
        }

        recptr = XLogInsert(RM_HEAP_ID, info, rdata);

        PageSetLSN(page, recptr);
        PageSetTLI(page, ThisTimeLineID);
    }
    else
    {
        /* No XLOG record, but still need to flag that XID exists on disk */
        MyXactMadeTempRelUpdate = true;
    }

    END_CRIT_SECTION();

    LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
    WriteBuffer(buffer);

    /*
     * If tuple is cachable, mark it for invalidation from the caches in
     * case we abort.  Note it is OK to do this after WriteBuffer releases
     * the buffer, because the "tup" data structure is all in local
     * memory, not in the shared buffer.
     */
    CacheInvalidateHeapTuple(relation, tup);

    return HeapTupleGetOid(tup);
}

/*
 *  simple_heap_insert - insert a tuple
 *
 * Currently, this routine differs from heap_insert only in supplying
 * a default command ID.  But it should be used rather than using
 * heap_insert directly in most places where we are modifying system catalogs.
 */
Oid
simple_heap_insert(Relation relation, HeapTuple tup)
{
    return heap_insert(relation, tup, GetCurrentCommandId());
}
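
/*
 * Example: inserting a catalog tuple via simple_heap_insert (a minimal
 * sketch; assumes "rel" is open with RowExclusiveLock and that values[]
 * and nulls[] were filled in by the caller, using the heap_formtuple
 * conventions of this era).  Catalog callers normally follow up with
 * CatalogUpdateIndexes to keep the indexes in step:
 *
 *      HeapTuple   tup = heap_formtuple(RelationGetDescr(rel), values, nulls);
 *
 *      simple_heap_insert(rel, tup);
 *      CatalogUpdateIndexes(rel, tup);
 *      heap_freetuple(tup);
 */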
1186
1187 /*
1188  *      heap_delete             - delete a tuple
1189  *
1190  * NB: do not call this directly unless you are prepared to deal with
1191  * concurrent-update conditions.  Use simple_heap_delete instead.
1192  *
1193  *      relation - table to be modified
1194  *      tid - TID of tuple to be deleted
1195  *      ctid - output parameter, used only for failure case (see below)
1196  *      cid - delete command ID to use in verifying tuple visibility
1197  *      crosscheck - if not InvalidSnapshot, also check tuple against this
1198  *      wait - true if should wait for any conflicting update to commit/abort
1199  *
1200  * Normal, successful return value is HeapTupleMayBeUpdated, which
1201  * actually means we did delete it.  Failure return codes are
1202  * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
1203  * (the last only possible if wait == false).  On a failure return,
1204  * *ctid is set to the ctid link of the target tuple (possibly a later
1205  * version of the row).
1206  */
1207 HTSU_Result
1208 heap_delete(Relation relation, ItemPointer tid,
1209                         ItemPointer ctid, CommandId cid,
1210                         Snapshot crosscheck, bool wait)
1211 {
1212         TransactionId xid = GetCurrentTransactionId();
1213         ItemId          lp;
1214         HeapTupleData tp;
1215         PageHeader      dp;
1216         Buffer          buffer;
1217         HTSU_Result     result;
1218
1219         Assert(ItemPointerIsValid(tid));
1220
1221         buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
1222         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
1223
1224         dp = (PageHeader) BufferGetPage(buffer);
1225         lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
1226         tp.t_datamcxt = NULL;
1227         tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
1228         tp.t_len = ItemIdGetLength(lp);
1229         tp.t_self = *tid;
1230         tp.t_tableOid = relation->rd_id;
1231
1232 l1:
1233         result = HeapTupleSatisfiesUpdate(tp.t_data, cid, buffer);
1234
1235         if (result == HeapTupleInvisible)
1236         {
1237                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1238                 ReleaseBuffer(buffer);
1239                 elog(ERROR, "attempted to delete invisible tuple");
1240         }
1241         else if (result == HeapTupleBeingUpdated && wait)
1242         {
1243                 TransactionId xwait;
1244                 uint16  infomask;
1245
1246                 /*
1247                  * Sleep until concurrent transaction ends.  Note that we don't care
1248                  * if the locker has an exclusive or shared lock, because we need
1249                  * exclusive.
1250                  */
1251
1252                 /* must copy state data before unlocking buffer */
1253                 xwait = HeapTupleHeaderGetXmax(tp.t_data);
1254                 infomask = tp.t_data->t_infomask;
1255
1256                 if (infomask & HEAP_XMAX_IS_MULTI)
1257                 {
1258                         /* wait for multixact */
1259                         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1260                         MultiXactIdWait((MultiXactId) xwait);
1261                         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
1262
1263                         /*
1264                          * If xwait had just locked the tuple then some other xact could
1265                          * update this tuple before we get to this point.  Check for xmax
1266                          * change, and start over if so.
1267                          */
1268                         if (!(tp.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
1269                                 !TransactionIdEquals(HeapTupleHeaderGetXmax(tp.t_data),
1270                                                                          xwait))
1271                                 goto l1;
1272
1273                         /*
1274                          * You might think the multixact is necessarily done here, but
1275                          * not so: it could have surviving members, namely our own xact
1276                          * or other subxacts of this backend.  It is legal for us to
1277                          * delete the tuple in either case, however (the latter case is
1278                          * essentially a situation of upgrading our former shared lock
1279                          * to exclusive).  We don't bother changing the on-disk hint bits
1280                          * since we are about to overwrite the xmax altogether.
1281                          */
1282                 }
1283                 else
1284                 {
1285                         /* wait for regular transaction to end */
1286                         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1287                         XactLockTableWait(xwait);
1288                         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
1289
1290                         /*
1291                          * xwait is done, but if xwait had just locked the tuple then some
1292                          * other xact could update this tuple before we get to this point.
1293                          * Check for xmax change, and start over if so.
1294                          */
1295                         if ((tp.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
1296                                 !TransactionIdEquals(HeapTupleHeaderGetXmax(tp.t_data),
1297                                                                          xwait))
1298                                 goto l1;
1299
1300                         /* Otherwise we can mark it committed or aborted */
1301                         if (!(tp.t_data->t_infomask & (HEAP_XMAX_COMMITTED |
1302                                                                                    HEAP_XMAX_INVALID)))
1303                         {
1304                                 if (TransactionIdDidCommit(xwait))
1305                                         tp.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
1306                                 else
1307                                         tp.t_data->t_infomask |= HEAP_XMAX_INVALID;
1308                                 SetBufferCommitInfoNeedsSave(buffer);
1309                         }
1310                 }
1311
1312                 /*
1313                  * We may overwrite if previous xmax aborted, or if it committed
1314                  * but only locked the tuple without updating it.
1315                  */
1316                 if (tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
1317                                                                          HEAP_IS_LOCKED))
1318                         result = HeapTupleMayBeUpdated;
1319                 else
1320                         result = HeapTupleUpdated;
1321         }
1322
1323         if (crosscheck != InvalidSnapshot && result == HeapTupleMayBeUpdated)
1324         {
1325                 /* Perform additional check for serializable RI updates */
1326                 if (!HeapTupleSatisfiesSnapshot(tp.t_data, crosscheck, buffer))
1327                         result = HeapTupleUpdated;
1328         }
1329
1330         if (result != HeapTupleMayBeUpdated)
1331         {
1332                 Assert(result == HeapTupleSelfUpdated ||
1333                            result == HeapTupleUpdated ||
1334                            result == HeapTupleBeingUpdated);
1335                 *ctid = tp.t_data->t_ctid;
1336                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1337                 ReleaseBuffer(buffer);
1338                 return result;
1339         }
1340
1341         START_CRIT_SECTION();
1342
1343         /* store transaction information of xact deleting the tuple */
1344         tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
1345                                                            HEAP_XMAX_INVALID |
1346                                                            HEAP_XMAX_IS_MULTI |
1347                                                            HEAP_IS_LOCKED |
1348                                                            HEAP_MOVED);
1349         HeapTupleHeaderSetXmax(tp.t_data, xid);
1350         HeapTupleHeaderSetCmax(tp.t_data, cid);
1351         /* Make sure there is no forward chain link in t_ctid */
1352         tp.t_data->t_ctid = tp.t_self;
1353
1354         /* XLOG stuff */
1355         if (!relation->rd_istemp)
1356         {
1357                 xl_heap_delete xlrec;
1358                 XLogRecPtr      recptr;
1359                 XLogRecData rdata[2];
1360
1361                 xlrec.target.node = relation->rd_node;
1362                 xlrec.target.tid = tp.t_self;
1363                 rdata[0].buffer = InvalidBuffer;
1364                 rdata[0].data = (char *) &xlrec;
1365                 rdata[0].len = SizeOfHeapDelete;
1366                 rdata[0].next = &(rdata[1]);
1367
1368                 rdata[1].buffer = buffer;
1369                 rdata[1].data = NULL;
1370                 rdata[1].len = 0;
1371                 rdata[1].next = NULL;
1372
1373                 recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata);
1374
1375                 PageSetLSN(dp, recptr);
1376                 PageSetTLI(dp, ThisTimeLineID);
1377         }
1378         else
1379         {
1380                 /* No XLOG record, but still need to flag that XID exists on disk */
1381                 MyXactMadeTempRelUpdate = true;
1382         }
1383
1384         END_CRIT_SECTION();
1385
1386         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1387
1388         /*
1389          * If the tuple has toasted out-of-line attributes, we need to delete
1390          * those items too.  We have to do this before WriteBuffer because we
1391          * need to look at the contents of the tuple, but it's OK to release
1392          * the context lock on the buffer first.
1393          */
1394         if (HeapTupleHasExternal(&tp))
1395                 heap_tuple_toast_attrs(relation, NULL, &tp);
1396
1397         pgstat_count_heap_delete(&relation->pgstat_info);
1398
1399         /*
1400          * Mark tuple for invalidation from system caches at next command
1401          * boundary. We have to do this before WriteBuffer because we need to
1402          * look at the contents of the tuple, so we need to hold our refcount
1403          * on the buffer.
1404          */
1405         CacheInvalidateHeapTuple(relation, &tp);
1406
1407         WriteBuffer(buffer);
1408
1409         return HeapTupleMayBeUpdated;
1410 }
1411
1412 /*
1413  *      simple_heap_delete - delete a tuple
1414  *
1415  * This routine may be used to delete a tuple when concurrent updates of
1416  * the target tuple are not expected (for example, because we have a lock
1417  * on the relation associated with the tuple).  Any failure is reported
1418  * via ereport().
1419  */
1420 void
1421 simple_heap_delete(Relation relation, ItemPointer tid)
1422 {
1423         ItemPointerData ctid;
1424         HTSU_Result             result;
1425
1426         result = heap_delete(relation, tid,
1427                                                  &ctid,
1428                                                  GetCurrentCommandId(), InvalidSnapshot,
1429                                                  true /* wait for commit */ );
1430         switch (result)
1431         {
1432                 case HeapTupleSelfUpdated:
1433                         /* Tuple was already updated in current command? */
1434                         elog(ERROR, "tuple already updated by self");
1435                         break;
1436
1437                 case HeapTupleMayBeUpdated:
1438                         /* done successfully */
1439                         break;
1440
1441                 case HeapTupleUpdated:
1442                         elog(ERROR, "tuple concurrently updated");
1443                         break;
1444
1445                 default:
1446                         elog(ERROR, "unrecognized heap_delete status: %u", result);
1447                         break;
1448         }
1449 }
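
/*
 * Usage sketch (hypothetical; the names "FooRelationId", "Anum_pg_foo_fooid",
 * and the scan key are made up for illustration): a typical caller of
 * simple_heap_delete already holds a suitable lock on the relation,
 * locates the victim tuple with a scan, and relies on the elog(ERROR)
 * calls above if a conflict occurs:
 *
 *		rel = heap_open(FooRelationId, RowExclusiveLock);
 *		ScanKeyInit(&key, Anum_pg_foo_fooid,
 *					BTEqualStrategyNumber, F_OIDEQ,
 *					ObjectIdGetDatum(fooid));
 *		scan = heap_beginscan(rel, SnapshotNow, 1, &key);
 *		while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
 *			simple_heap_delete(rel, &tup->t_self);
 *		heap_endscan(scan);
 *		heap_close(rel, RowExclusiveLock);
 */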
1450
1451 /*
1452  *      heap_update - replace a tuple
1453  *
1454  * NB: do not call this directly unless you are prepared to deal with
1455  * concurrent-update conditions.  Use simple_heap_update instead.
1456  *
1457  *      relation - table to be modified
1458  *      otid - TID of old tuple to be replaced
1459  *      newtup - newly constructed tuple data to store
1460  *      ctid - output parameter, used only for failure case (see below)
1461  *      cid - update command ID to use in verifying old tuple visibility
1462  *      crosscheck - if not InvalidSnapshot, also check old tuple against this
1463  *      wait - true if should wait for any conflicting update to commit/abort
1464  *
1465  * Normal, successful return value is HeapTupleMayBeUpdated, which
1466  * actually means we *did* update it.  Failure return codes are
1467  * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
1468  * (the last only possible if wait == false).  On a failure return,
1469  * *ctid is set to the ctid link of the old tuple (possibly a later
1470  * version of the row).
1471  * On success, newtup->t_self is set to the TID where the new tuple
1472  * was inserted.
1473  */
1474 HTSU_Result
1475 heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
1476                         ItemPointer ctid, CommandId cid,
1477                         Snapshot crosscheck, bool wait)
1478 {
1479         TransactionId xid = GetCurrentTransactionId();
1480         ItemId          lp;
1481         HeapTupleData oldtup;
1482         PageHeader      dp;
1483         Buffer          buffer,
1484                                 newbuf;
1485         bool            need_toast,
1486                                 already_marked;
1487         Size            newtupsize,
1488                                 pagefree;
1489         HTSU_Result     result;
1490
1491         Assert(ItemPointerIsValid(otid));
1492
1493         buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(otid));
1494         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
1495
1496         dp = (PageHeader) BufferGetPage(buffer);
1497         lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(otid));
1498
1499         oldtup.t_datamcxt = NULL;
1500         oldtup.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
1501         oldtup.t_len = ItemIdGetLength(lp);
1502         oldtup.t_self = *otid;
1503
1504         /*
1505          * Note: beyond this point, use oldtup not otid to refer to old tuple.
1506          * otid may very well point at newtup->t_self, which we will overwrite
1507          * with the new tuple's location, so there's great risk of confusion
1508          * if we keep using otid.
1509          */
1510
1511 l2:
1512         result = HeapTupleSatisfiesUpdate(oldtup.t_data, cid, buffer);
1513
1514         if (result == HeapTupleInvisible)
1515         {
1516                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1517                 ReleaseBuffer(buffer);
1518                 elog(ERROR, "attempted to update invisible tuple");
1519         }
1520         else if (result == HeapTupleBeingUpdated && wait)
1521         {
1522                 TransactionId xwait;
1523                 uint16  infomask;
1524
1525                 /*
1526                  * Sleep until concurrent transaction ends.  Note that we don't care
1527                  * if the locker has an exclusive or shared lock, because we need an
1528                  * exclusive lock in either case.
1529                  */
1530
1531                 /* must copy state data before unlocking buffer */
1532                 xwait = HeapTupleHeaderGetXmax(oldtup.t_data);
1533                 infomask = oldtup.t_data->t_infomask;
1534
1535                 if (infomask & HEAP_XMAX_IS_MULTI)
1536                 {
1537                         /* wait for multixact */
1538                         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1539                         MultiXactIdWait((MultiXactId) xwait);
1540                         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
1541
1542                         /*
1543                          * If xwait had just locked the tuple then some other xact could
1544                          * update this tuple before we get to this point.  Check for xmax
1545                          * change, and start over if so.
1546                          */
1547                         if (!(oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
1548                                 !TransactionIdEquals(HeapTupleHeaderGetXmax(oldtup.t_data),
1549                                                                          xwait))
1550                                 goto l2;
1551
1552                         /*
1553                          * You might think the multixact is necessarily done here, but
1554                          * not so: it could have surviving members, namely our own xact
1555                          * or other subxacts of this backend.  It is legal for us to
1556                          * update the tuple in either case, however (the latter case is
1557                          * essentially a situation of upgrading our former shared lock
1558                          * to exclusive).  We don't bother changing the on-disk hint bits
1559                          * since we are about to overwrite the xmax altogether.
1560                          */
1561                 }
1562                 else
1563                 {
1564                         /* wait for regular transaction to end */
1565                         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1566                         XactLockTableWait(xwait);
1567                         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
1568
1569                         /*
1570                          * xwait is done, but if xwait had just locked the tuple then some
1571                          * other xact could update this tuple before we get to this point.
1572                          * Check for xmax change, and start over if so.
1573                          */
1574                         if ((oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
1575                                 !TransactionIdEquals(HeapTupleHeaderGetXmax(oldtup.t_data),
1576                                                                          xwait))
1577                                 goto l2;
1578
1579                         /* Otherwise we can mark it committed or aborted */
1580                         if (!(oldtup.t_data->t_infomask & (HEAP_XMAX_COMMITTED |
1581                                                                                            HEAP_XMAX_INVALID)))
1582                         {
1583                                 if (TransactionIdDidCommit(xwait))
1584                                         oldtup.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
1585                                 else
1586                                         oldtup.t_data->t_infomask |= HEAP_XMAX_INVALID;
1587                                 SetBufferCommitInfoNeedsSave(buffer);
1588                         }
1589                 }
1590
1591                 /*
1592                  * We may overwrite if previous xmax aborted, or if it committed
1593                  * but only locked the tuple without updating it.
1594                  */
1595                 if (oldtup.t_data->t_infomask & (HEAP_XMAX_INVALID |
1596                                                                                  HEAP_IS_LOCKED))
1597                         result = HeapTupleMayBeUpdated;
1598                 else
1599                         result = HeapTupleUpdated;
1600         }
1601
1602         if (crosscheck != InvalidSnapshot && result == HeapTupleMayBeUpdated)
1603         {
1604                 /* Perform additional check for serializable RI updates */
1605                 if (!HeapTupleSatisfiesSnapshot(oldtup.t_data, crosscheck, buffer))
1606                         result = HeapTupleUpdated;
1607         }
1608
1609         if (result != HeapTupleMayBeUpdated)
1610         {
1611                 Assert(result == HeapTupleSelfUpdated ||
1612                            result == HeapTupleUpdated ||
1613                            result == HeapTupleBeingUpdated);
1614                 *ctid = oldtup.t_data->t_ctid;
1615                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1616                 ReleaseBuffer(buffer);
1617                 return result;
1618         }
1619
1620         /* Fill in OID and transaction status data for newtup */
1621         if (relation->rd_rel->relhasoids)
1622         {
1623 #ifdef NOT_USED
1624                 /* this is redundant with an Assert in HeapTupleSetOid */
1625                 Assert(newtup->t_data->t_infomask & HEAP_HASOID);
1626 #endif
1627                 HeapTupleSetOid(newtup, HeapTupleGetOid(&oldtup));
1628         }
1629         else
1630         {
1631                 /* check there is no space for an OID */
1632                 Assert(!(newtup->t_data->t_infomask & HEAP_HASOID));
1633         }
1634
1635         newtup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
1636         newtup->t_data->t_infomask |= (HEAP_XMAX_INVALID | HEAP_UPDATED);
1637         HeapTupleHeaderSetXmin(newtup->t_data, xid);
1638         HeapTupleHeaderSetCmin(newtup->t_data, cid);
1639         HeapTupleHeaderSetXmax(newtup->t_data, 0);      /* zero out Datum fields */
1640         HeapTupleHeaderSetCmax(newtup->t_data, 0);      /* for cleanliness */
1641
1642         /*
1643          * If the toaster needs to be activated, OR if the new tuple will not
1644          * fit on the same page as the old, then we need to release the
1645          * context lock (but not the pin!) on the old tuple's buffer while we
1646          * are off doing TOAST and/or table-file-extension work.  We must mark
1647          * the old tuple to show that it's already being updated, else other
1648          * processes may try to update it themselves.
1649          *
1650          * We need to invoke the toaster if there are already any out-of-line
1651          * toasted values present, or if the new tuple is over-threshold.
1652          */
1653         need_toast = (HeapTupleHasExternal(&oldtup) ||
1654                                   HeapTupleHasExternal(newtup) ||
1655                                   (MAXALIGN(newtup->t_len) > TOAST_TUPLE_THRESHOLD));
1656
1657         newtupsize = MAXALIGN(newtup->t_len);
1658         pagefree = PageGetFreeSpace((Page) dp);
1659
1660         if (need_toast || newtupsize > pagefree)
1661         {
1662                 oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
1663                                                                            HEAP_XMAX_INVALID |
1664                                                                            HEAP_XMAX_IS_MULTI |
1665                                                                            HEAP_IS_LOCKED |
1666                                                                            HEAP_MOVED);
1667                 HeapTupleHeaderSetXmax(oldtup.t_data, xid);
1668                 HeapTupleHeaderSetCmax(oldtup.t_data, cid);
1669                 already_marked = true;
1670                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1671
1672                 /* Let the toaster do its thing */
1673                 if (need_toast)
1674                 {
1675                         heap_tuple_toast_attrs(relation, newtup, &oldtup);
1676                         newtupsize = MAXALIGN(newtup->t_len);
1677                 }
1678
1679                 /*
1680                  * Now, do we need a new page for the tuple, or not?  This is a
1681                  * bit tricky since someone else could have added tuples to the
1682                  * page while we weren't looking.  We have to recheck the
1683                  * available space after reacquiring the buffer lock.  But don't
1684                  * bother to do that if the former amount of free space is still
1685                  * not enough; it's unlikely there's more free now than before.
1686                  *
1687                  * What's more, if we need to get a new page, we will need to acquire
1688                  * buffer locks on both old and new pages.  To avoid deadlock
1689                  * against some other backend trying to get the same two locks in
1690                  * the other order, we must be consistent about the order we get
1691                  * the locks in. We use the rule "lock the lower-numbered page of
1692                  * the relation first".  To implement this, we must do
1693                  * RelationGetBufferForTuple while not holding the lock on the old
1694                  * page, and we must rely on it to get the locks on both pages in
1695                  * the correct order.
1696                  */
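
                /*
                 * For example (page numbers hypothetical): if we must move a
                 * row from page 7 to page 3 while another backend is
                 * concurrently moving one from page 3 to page 7, both
                 * backends lock page 3 first, so one simply waits for the
                 * other instead of deadlocking.
                 */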
1697                 if (newtupsize > pagefree)
1698                 {
1699                         /* Assume there's no chance to put newtup on same page. */
1700                         newbuf = RelationGetBufferForTuple(relation, newtup->t_len,
1701                                                                                            buffer);
1702                 }
1703                 else
1704                 {
1705                         /* Re-acquire the lock on the old tuple's page. */
1706                         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
1707                         /* Re-check using the up-to-date free space */
1708                         pagefree = PageGetFreeSpace((Page) dp);
1709                         if (newtupsize > pagefree)
1710                         {
1711                                 /*
1712                                  * Rats, it doesn't fit anymore.  We must now unlock and
1713                                  * relock to avoid deadlock.  Fortunately, this path
1714                                  * should seldom be taken.
1715                                  */
1716                                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1717                                 newbuf = RelationGetBufferForTuple(relation, newtup->t_len,
1718                                                                                                    buffer);
1719                         }
1720                         else
1721                         {
1722                                 /* OK, it fits here, so we're done. */
1723                                 newbuf = buffer;
1724                         }
1725                 }
1726         }
1727         else
1728         {
1729                 /* No TOAST work needed, and it'll fit on same page */
1730                 already_marked = false;
1731                 newbuf = buffer;
1732         }
1733
1734         pgstat_count_heap_update(&relation->pgstat_info);
1735
1736         /*
1737          * At this point newbuf and buffer are both pinned and locked, and
1738          * newbuf has enough space for the new tuple.  If they are the same
1739          * buffer, only one pin is held.
1740          */
1741
1742         /* NO EREPORT(ERROR) from here till changes are logged */
1743         START_CRIT_SECTION();
1744
1745         RelationPutHeapTuple(relation, newbuf, newtup);         /* insert new tuple */
1746
1747         if (!already_marked)
1748         {
1749                 oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
1750                                                                            HEAP_XMAX_INVALID |
1751                                                                            HEAP_XMAX_IS_MULTI |
1752                                                                            HEAP_IS_LOCKED |
1753                                                                            HEAP_MOVED);
1754                 HeapTupleHeaderSetXmax(oldtup.t_data, xid);
1755                 HeapTupleHeaderSetCmax(oldtup.t_data, cid);
1756         }
1757
1758         /* record address of new tuple in t_ctid of old one */
1759         oldtup.t_data->t_ctid = newtup->t_self;
1760
1761         /* XLOG stuff */
1762         if (!relation->rd_istemp)
1763         {
1764                 XLogRecPtr      recptr = log_heap_update(relation, buffer, oldtup.t_self,
1765                                                                                          newbuf, newtup, false);
1766
1767                 if (newbuf != buffer)
1768                 {
1769                         PageSetLSN(BufferGetPage(newbuf), recptr);
1770                         PageSetTLI(BufferGetPage(newbuf), ThisTimeLineID);
1771                 }
1772                 PageSetLSN(BufferGetPage(buffer), recptr);
1773                 PageSetTLI(BufferGetPage(buffer), ThisTimeLineID);
1774         }
1775         else
1776         {
1777                 /* No XLOG record, but still need to flag that XID exists on disk */
1778                 MyXactMadeTempRelUpdate = true;
1779         }
1780
1781         END_CRIT_SECTION();
1782
1783         if (newbuf != buffer)
1784                 LockBuffer(newbuf, BUFFER_LOCK_UNLOCK);
1785         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1786
1787         /*
1788          * Mark old tuple for invalidation from system caches at next command
1789          * boundary. We have to do this before WriteBuffer because we need to
1790          * look at the contents of the tuple, so we need to hold our refcount on the buffer.
1791          */
1792         CacheInvalidateHeapTuple(relation, &oldtup);
1793
1794         if (newbuf != buffer)
1795                 WriteBuffer(newbuf);
1796         WriteBuffer(buffer);
1797
1798         /*
1799          * If the new tuple is cacheable, mark it for invalidation from the caches
1800          * in case we abort.  Note it is OK to do this after WriteBuffer
1801          * releases the buffer, because the "newtup" data structure is all in
1802          * local memory, not in the shared buffer.
1803          */
1804         CacheInvalidateHeapTuple(relation, newtup);
1805
1806         return HeapTupleMayBeUpdated;
1807 }
1808
1809 /*
1810  *      simple_heap_update - replace a tuple
1811  *
1812  * This routine may be used to update a tuple when concurrent updates of
1813  * the target tuple are not expected (for example, because we have a lock
1814  * on the relation associated with the tuple).  Any failure is reported
1815  * via ereport().
1816  */
1817 void
1818 simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
1819 {
1820         ItemPointerData ctid;
1821         HTSU_Result             result;
1822
1823         result = heap_update(relation, otid, tup,
1824                                                  &ctid,
1825                                                  GetCurrentCommandId(), InvalidSnapshot,
1826                                                  true /* wait for commit */ );
1827         switch (result)
1828         {
1829                 case HeapTupleSelfUpdated:
1830                         /* Tuple was already updated in current command? */
1831                         elog(ERROR, "tuple already updated by self");
1832                         break;
1833
1834                 case HeapTupleMayBeUpdated:
1835                         /* done successfully */
1836                         break;
1837
1838                 case HeapTupleUpdated:
1839                         elog(ERROR, "tuple concurrently updated");
1840                         break;
1841
1842                 default:
1843                         elog(ERROR, "unrecognized heap_update status: %u", result);
1844                         break;
1845         }
1846 }
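
/*
 * A caller that is prepared for concurrent updates instead calls
 * heap_update directly and dispatches on the result; a minimal sketch
 * (variable names hypothetical):
 *
 *		result = heap_update(rel, &oldtid, newtup, &update_ctid,
 *							 GetCurrentCommandId(), InvalidSnapshot,
 *							 true);
 *
 * On HeapTupleMayBeUpdated the update succeeded and newtup->t_self is
 * the new TID.  On HeapTupleUpdated someone else got there first:
 * update_ctid points at the newer row version, which the caller can
 * re-fetch and retry against, or give up (as simple_heap_update above
 * does by raising an error).
 */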
1847
1848 /*
1849  *      heap_lock_tuple         - lock a tuple in shared or exclusive mode
1850  */
1851 HTSU_Result
1852 heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer,
1853                                  CommandId cid, LockTupleMode mode)
1854 {
1855         TransactionId   xid;
1856         ItemPointer tid = &(tuple->t_self);
1857         ItemId          lp;
1858         PageHeader      dp;
1859         HTSU_Result     result;
1860         uint16          new_infomask;
1861
1862         *buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
1863         LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
1864
1865         dp = (PageHeader) BufferGetPage(*buffer);
1866         lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
1867         tuple->t_datamcxt = NULL;
1868         tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
1869         tuple->t_len = ItemIdGetLength(lp);
1870
1871 l3:
1872         result = HeapTupleSatisfiesUpdate(tuple->t_data, cid, *buffer);
1873
1874         if (result == HeapTupleInvisible)
1875         {
1876                 LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
1877                 ReleaseBuffer(*buffer);
1878                 elog(ERROR, "attempted to lock invisible tuple");
1879         }
1880         else if (result == HeapTupleBeingUpdated)
1881         {
1882                 if (mode == LockTupleShared &&
1883                         (tuple->t_data->t_infomask & HEAP_XMAX_SHARED_LOCK))
1884                         result = HeapTupleMayBeUpdated;
1885                 else
1886                 {
1887                         TransactionId xwait;
1888                         uint16  infomask;
1889
1890                         /*
1891                          * Sleep until concurrent transaction ends.
1892                          */
1893
1894                         /* must copy state data before unlocking buffer */
1895                         xwait = HeapTupleHeaderGetXmax(tuple->t_data);
1896                         infomask = tuple->t_data->t_infomask;
1897
1898                         if (infomask & HEAP_XMAX_IS_MULTI)
1899                         {
1900                                 /* wait for multixact */
1901                                 LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
1902                                 MultiXactIdWait((MultiXactId) xwait);
1903                                 LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
1904
1905                                 /*
1906                                  * If xwait had just locked the tuple then some other xact
1907                                  * could update this tuple before we get to this point.
1908                                  * Check for xmax change, and start over if so.
1909                                  */
1910                                 if (!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
1911                                         !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data),
1912                                                                                  xwait))
1913                                         goto l3;
1914
1915                                 /*
1916                                  * You might think the multixact is necessarily done here, but
1917                                  * not so: it could have surviving members, namely our own xact
1918                                  * or other subxacts of this backend.  It is legal for us to
1919                                  * lock the tuple in either case, however.  We don't bother
1920                                  * changing the on-disk hint bits since we are about to
1921                                  * overwrite the xmax altogether.
1922                                  */
1923                         }
1924                         else
1925                         {
1926                                 /* wait for regular transaction to end */
1927                                 LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
1928                                 XactLockTableWait(xwait);
1929                                 LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
1930
1931                                 /*
1932                                  * xwait is done, but if xwait had just locked the tuple then
1933                                  * some other xact could update this tuple before we get to
1934                                  * this point.  Check for xmax change, and start over if so.
1935                                  */
1936                                 if ((tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
1937                                         !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data),
1938                                                                                  xwait))
1939                                         goto l3;
1940
1941                                 /* Otherwise we can mark it committed or aborted */
1942                                 if (!(tuple->t_data->t_infomask & (HEAP_XMAX_COMMITTED |
1943                                                                                                    HEAP_XMAX_INVALID)))
1944                                 {
1945                                         if (TransactionIdDidCommit(xwait))
1946                                                 tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED;
1947                                         else
1948                                                 tuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
1949                                         SetBufferCommitInfoNeedsSave(*buffer);
1950                                 }
1951                         }
1952
1953                         /*
1954                          * We may lock if previous xmax aborted, or if it committed
1955                          * but only locked the tuple without updating it.
1956                          */
1957                         if (tuple->t_data->t_infomask & (HEAP_XMAX_INVALID |
1958                                                                                          HEAP_IS_LOCKED))
1959                                 result = HeapTupleMayBeUpdated;
1960                         else
1961                                 result = HeapTupleUpdated;
1962                 }
1963         }
1964
1965         if (result != HeapTupleMayBeUpdated)
1966         {
1967                 Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
1968                 tuple->t_self = tuple->t_data->t_ctid;
1969                 LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
1970                 return result;
1971         }
1972
1973         /*
1974          * Compute the new xmax and infomask to store into the tuple.  Note we
1975          * do not modify the tuple just yet, because that would leave it in the
1976          * wrong state if multixact.c elogs.
1977          */
1978         xid = GetCurrentTransactionId();
1979
1980         new_infomask = tuple->t_data->t_infomask;
1981
1982         new_infomask &= ~(HEAP_XMAX_COMMITTED |
1983                                           HEAP_XMAX_INVALID |
1984                                           HEAP_XMAX_IS_MULTI |
1985                                           HEAP_IS_LOCKED |
1986                                           HEAP_MOVED);
1987
1988         if (mode == LockTupleShared)
1989         {
1990                 TransactionId   xmax = HeapTupleHeaderGetXmax(tuple->t_data);
1991                 uint16          old_infomask = tuple->t_data->t_infomask;
1992
1993                 /*
1994                  * If this is the first acquisition of a shared lock in the current
1995                  * transaction, set my per-backend OldestMemberMXactId setting.
1996                  * We can be certain that the transaction will never become a
1997                  * member of any older MultiXactIds than that.  (We have to do this
1998                  * even if we end up just using our own TransactionId below, since
1999                  * some other backend could incorporate our XID into a MultiXact
2000                  * immediately afterwards.)
2001                  */
2002                 MultiXactIdSetOldestMember();
2003
2004                 new_infomask |= HEAP_XMAX_SHARED_LOCK;
2005
2006                 /*
2007                  * Check to see if we need a MultiXactId because there are multiple
2008                  * lockers.
2009                  *
2010                  * HeapTupleSatisfiesUpdate will have set the HEAP_XMAX_INVALID
2011                  * bit if the xmax was a MultiXactId that is no longer running.
2012                  * There is a race condition, which is that the MultiXactId may have
2013                  * finished since then, but that uncommon case is handled within
2014                  * MultiXactIdExpand.
2015                  *
2016                  * There is a similar race condition possible when the old xmax was
2017                  * a regular TransactionId.  We test TransactionIdIsInProgress again
2018                  * just to narrow the window, but it's still possible to end up
2019                  * creating an unnecessary MultiXactId.  Fortunately this is harmless.
2020                  */
2021                 if (!(old_infomask & (HEAP_XMAX_INVALID | HEAP_XMAX_COMMITTED)))
2022                 {
2023                         if (old_infomask & HEAP_XMAX_IS_MULTI)
2024                         {
2025                                 /*
2026                                  * If the XMAX is already a MultiXactId, then we need to
2027                                  * expand it to include our own TransactionId.
2028                                  */
2029                                 xid = MultiXactIdExpand(xmax, true, xid);
2030                                 new_infomask |= HEAP_XMAX_IS_MULTI;
2031                         }
2032                         else if (TransactionIdIsInProgress(xmax))
2033                         {
2034                                 if (TransactionIdEquals(xmax, xid))
2035                                 {
2036                                         /*
2037                                          * If the old locker is ourselves, we'll just mark the
2038                                          * tuple again with our own TransactionId.  However we
2039                                          * have to consider the possibility that we had
2040                                          * exclusive rather than shared lock before --- if so,
2041                                          * be careful to preserve the exclusivity of the lock.
2042                                          */
2043                                         if (!(old_infomask & HEAP_XMAX_SHARED_LOCK))
2044                                         {
2045                                                 new_infomask &= ~HEAP_XMAX_SHARED_LOCK;
2046                                                 new_infomask |= HEAP_XMAX_EXCL_LOCK;
2047                                                 mode = LockTupleExclusive;
2048                                         }
2049                                 }
2050                                 else
2051                                 {
2052                                         /*
2053                                          * If the Xmax is a valid TransactionId, then we need to
2054                                          * create a new MultiXactId that includes both the old
2055                                          * locker and our own TransactionId.
2056                                          */
2057                                         xid = MultiXactIdExpand(xmax, false, xid);
2058                                         new_infomask |= HEAP_XMAX_IS_MULTI;
2059                                 }
2060                         }
2061                         else
2062                         {
2063                                 /*
2064                                  * Can get here iff HeapTupleSatisfiesUpdate saw the old
2065                                  * xmax as running, but it finished before
2066                                  * TransactionIdIsInProgress() got to run.  Treat it like
2067                                  * there's no locker in the tuple.
2068                                  */
2069                         }
2070                 }
2071                 else
2072                 {
2073                         /*
2074                          * There was no previous locker, so just insert our own
2075                          * TransactionId.
2076                          */
2077                 }
2078         }
2079         else
2080         {
2081                 /* We want an exclusive lock on the tuple */
2082                 new_infomask |= HEAP_XMAX_EXCL_LOCK;
2083         }
2084
2085         START_CRIT_SECTION();
2086
2087         /*
2088          * Store transaction information of xact locking the tuple.
2089          *
2090          * Note: our CID is meaningless if storing a MultiXactId, but no harm
2091          * in storing it anyway.
2092          */
2093         tuple->t_data->t_infomask = new_infomask;
2094         HeapTupleHeaderSetXmax(tuple->t_data, xid);
2095         HeapTupleHeaderSetCmax(tuple->t_data, cid);
2096         /* Make sure there is no forward chain link in t_ctid */
2097         tuple->t_data->t_ctid = *tid;
2098
2099         /*
2100          * XLOG stuff.  You might think that we don't need an XLOG record because
2101          * there is no state change worth restoring after a crash.  You would be
2102          * wrong however: we have just written either a TransactionId or a
2103          * MultiXactId that may never have been seen on disk before, and we need
2104          * to make sure that there are XLOG entries covering those ID numbers.
2105          * Else the same IDs might be re-used after a crash, which would be
2106          * disastrous if this page made it to disk before the crash.  Essentially
2107          * we have to enforce the WAL log-before-data rule even in this case.
2108          */
2109         if (!relation->rd_istemp)
2110         {
2111                 xl_heap_lock xlrec;
2112                 XLogRecPtr      recptr;
2113                 XLogRecData rdata[2];
2114
2115                 xlrec.target.node = relation->rd_node;
2116                 xlrec.target.tid = tuple->t_self;
2117                 xlrec.shared_lock = (mode == LockTupleShared);
2118                 rdata[0].buffer = InvalidBuffer;
2119                 rdata[0].data = (char *) &xlrec;
2120                 rdata[0].len = SizeOfHeapLock;
2121                 rdata[0].next = &(rdata[1]);
2122
2123                 rdata[1].buffer = *buffer;
2124                 rdata[1].data = NULL;
2125                 rdata[1].len = 0;
2126                 rdata[1].next = NULL;
2127
2128                 recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK, rdata);
2129
2130                 PageSetLSN(dp, recptr);
2131                 PageSetTLI(dp, ThisTimeLineID);
2132         }
2133         else
2134         {
2135                 /* No XLOG record, but still need to flag that XID exists on disk */
2136                 MyXactMadeTempRelUpdate = true;
2137         }
2138
2139         END_CRIT_SECTION();
2140
2141         LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
2142
2143         WriteNoReleaseBuffer(*buffer);
2144
2145         return HeapTupleMayBeUpdated;
2146 }
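
/*
 * Usage sketch (hypothetical caller, e.g. executor code implementing
 * SELECT ... FOR UPDATE/SHARE).  heap_lock_tuple returns with *buffer
 * pinned but unlocked, so the caller must release the pin itself:
 *
 *		tuple.t_self = *tid;
 *		result = heap_lock_tuple(relation, &tuple, &buffer,
 *								 GetCurrentCommandId(),
 *								 for_update ? LockTupleExclusive
 *											: LockTupleShared);
 *		ReleaseBuffer(buffer);
 *
 * On HeapTupleUpdated, tuple.t_self has been set to the t_ctid link, so
 * the caller can chase it to find the newer version of the row.
 */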
2147
2148 /* ----------------
2149  *              heap_markpos    - mark scan position
2150  * ----------------
2151  */
2152 void
2153 heap_markpos(HeapScanDesc scan)
2154 {
2155         /* Note: no locking manipulations needed */
2156
2157         if (scan->rs_ctup.t_data != NULL)
2158                 scan->rs_mctid = scan->rs_ctup.t_self;
2159         else
2160                 ItemPointerSetInvalid(&scan->rs_mctid);
2161 }
2162
2163 /* ----------------
2164  *              heap_restrpos   - restore position to marked location
2165  * ----------------
2166  */
2167 void
2168 heap_restrpos(HeapScanDesc scan)
2169 {
2170         /* XXX no amrestrpos checking that ammarkpos called */
2171
2172         /* Note: no locking manipulations needed */
2173
2174         /*
2175          * unpin scan buffers
2176          */
2177         if (BufferIsValid(scan->rs_cbuf))
2178                 ReleaseBuffer(scan->rs_cbuf);
2179         scan->rs_cbuf = InvalidBuffer;
2180
2181         if (!ItemPointerIsValid(&scan->rs_mctid))
2182         {
2183                 scan->rs_ctup.t_datamcxt = NULL;
2184                 scan->rs_ctup.t_data = NULL;
2185         }
2186         else
2187         {
2188                 scan->rs_ctup.t_self = scan->rs_mctid;
2189                 scan->rs_ctup.t_datamcxt = NULL;
2190                 scan->rs_ctup.t_data = (HeapTupleHeader) 0x1;   /* for heapgettup */
2191                 heapgettup(scan->rs_rd,
2192                                    0,
2193                                    &(scan->rs_ctup),
2194                                    &(scan->rs_cbuf),
2195                                    scan->rs_snapshot,
2196                                    0,
2197                                    NULL,
2198                                    scan->rs_nblocks);
2199         }
2200 }
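
/*
 * Mark/restore usage sketch (hypothetical driver, in the style of a
 * merge join that may have to rewind its inner scan):
 *
 *		tup = heap_getnext(scan, ForwardScanDirection);
 *		heap_markpos(scan);			(remember the current tuple)
 *		... consume more tuples with heap_getnext ...
 *		heap_restrpos(scan);		(marked tuple is current again)
 *
 * After heap_restrpos, the marked tuple has been re-fetched as the
 * scan's current tuple, so the next heap_getnext call returns the
 * tuple following it, just as it did after the original heap_markpos.
 */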
2201
2202 XLogRecPtr
2203 log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *unused, int uncnt)
2204 {
2205         xl_heap_clean xlrec;
2206         XLogRecPtr      recptr;
2207         XLogRecData rdata[2];
2208
2209         /* Caller should not call me on a temp relation */
2210         Assert(!reln->rd_istemp);
2211
2212         xlrec.node = reln->rd_node;
2213         xlrec.block = BufferGetBlockNumber(buffer);
2214
2215         rdata[0].buffer = InvalidBuffer;
2216         rdata[0].data = (char *) &xlrec;
2217         rdata[0].len = SizeOfHeapClean;
2218         rdata[0].next = &(rdata[1]);
2219
2220         /*
2221          * The unused-offsets array is not actually in the buffer, but pretend
2222          * that it is.  When XLogInsert stores the whole buffer, the offsets
2223          * array need not be stored too.
2224          */
2225         rdata[1].buffer = buffer;
2226         if (uncnt > 0)
2227         {
2228                 rdata[1].data = (char *) unused;
2229                 rdata[1].len = uncnt * sizeof(OffsetNumber);
2230         }
2231         else
2232         {
2233                 rdata[1].data = NULL;
2234                 rdata[1].len = 0;
2235         }
2236         rdata[1].next = NULL;
2237
2238         recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_CLEAN, rdata);
2239
2240         return (recptr);
2241 }
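
/*
 * Caller pattern sketch (hypothetical, after the fashion of VACUUM):
 * the reaped line-pointer offsets are collected while compacting the
 * page, then WAL-logged, all inside a single critical section:
 *
 *		START_CRIT_SECTION();
 *		... mark each reaped line pointer as not LP_USED ...
 *		uncnt = PageRepairFragmentation(page, unused);
 *		if (!onerel->rd_istemp)
 *		{
 *			recptr = log_heap_clean(onerel, buf, unused, uncnt);
 *			PageSetLSN(page, recptr);
 *			PageSetTLI(page, ThisTimeLineID);
 *		}
 *		END_CRIT_SECTION();
 *		WriteBuffer(buf);
 */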
2242
2243 static XLogRecPtr
2244 log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
2245                                 Buffer newbuf, HeapTuple newtup, bool move)
2246 {
2247         /*
2248          * Note: xlhdr is declared to have adequate size and correct alignment
2249          * for an xl_heap_header.  However the two tids, if present at all,
2250          * will be packed in with no wasted space after the xl_heap_header;
2251          * they aren't necessarily aligned as implied by this struct
2252          * declaration.
2253          */
2254         struct
2255         {
2256                 xl_heap_header hdr;
2257                 TransactionId tid1;
2258                 TransactionId tid2;
2259         }                       xlhdr;
2260         int                     hsize = SizeOfHeapHeader;
2261         xl_heap_update xlrec;
2262         XLogRecPtr      recptr;
2263         XLogRecData rdata[4];
2264         Page            page = BufferGetPage(newbuf);
2265         uint8           info = (move) ? XLOG_HEAP_MOVE : XLOG_HEAP_UPDATE;
2266
2267         /* Caller should not call me on a temp relation */
2268         Assert(!reln->rd_istemp);
2269
2270         xlrec.target.node = reln->rd_node;
2271         xlrec.target.tid = from;
2272         xlrec.newtid = newtup->t_self;
2273         rdata[0].buffer = InvalidBuffer;
2274         rdata[0].data = (char *) &xlrec;
2275         rdata[0].len = SizeOfHeapUpdate;
2276         rdata[0].next = &(rdata[1]);
2277
2278         rdata[1].buffer = oldbuf;
2279         rdata[1].data = NULL;
2280         rdata[1].len = 0;
2281         rdata[1].next = &(rdata[2]);
2282
2283         xlhdr.hdr.t_natts = newtup->t_data->t_natts;
2284         xlhdr.hdr.t_infomask = newtup->t_data->t_infomask;
2285         xlhdr.hdr.t_hoff = newtup->t_data->t_hoff;
2286         if (move)                                       /* remember xmax & xmin */
2287         {
2288                 TransactionId xid[2];   /* xmax, xmin */
2289
2290                 if (newtup->t_data->t_infomask & (HEAP_XMAX_INVALID | HEAP_IS_LOCKED))
2291                         xid[0] = InvalidTransactionId;
2292                 else
2293                         xid[0] = HeapTupleHeaderGetXmax(newtup->t_data);
2294                 xid[1] = HeapTupleHeaderGetXmin(newtup->t_data);
2295                 memcpy((char *) &xlhdr + hsize,
2296                            (char *) xid,
2297                            2 * sizeof(TransactionId));
2298                 hsize += 2 * sizeof(TransactionId);
2299         }
2300
2301         /*
2302          * As with insert records, we need not store the rdata[2] segment if
2303          * we decide to store the whole buffer instead.
2304          */
2305         rdata[2].buffer = newbuf;
2306         rdata[2].data = (char *) &xlhdr;
2307         rdata[2].len = hsize;
2308         rdata[2].next = &(rdata[3]);
2309
2310         rdata[3].buffer = newbuf;
2311         /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
2312         rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
2313         rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
2314         rdata[3].next = NULL;
2315
2316         /* If the new tuple is the first and only tuple on the page... */
2317         if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
2318                 PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
2319         {
2320                 info |= XLOG_HEAP_INIT_PAGE;
2321                 rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
2322         }
2323
2324         recptr = XLogInsert(RM_HEAP_ID, info, rdata);
2325
2326         return (recptr);
2327 }
2328
2329 XLogRecPtr
2330 log_heap_move(Relation reln, Buffer oldbuf, ItemPointerData from,
2331                           Buffer newbuf, HeapTuple newtup)
2332 {
2333         return (log_heap_update(reln, oldbuf, from, newbuf, newtup, true));
2334 }
2335
2336 static void
2337 heap_xlog_clean(bool redo, XLogRecPtr lsn, XLogRecord *record)
2338 {
2339         xl_heap_clean *xlrec = (xl_heap_clean *) XLogRecGetData(record);
2340         Relation        reln;
2341         Buffer          buffer;
2342         Page            page;
2343
2344         if (!redo || (record->xl_info & XLR_BKP_BLOCK_1))
2345                 return;
2346
2347         reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->node);
2348         if (!RelationIsValid(reln))
2349                 return;
2350
2351         buffer = XLogReadBuffer(false, reln, xlrec->block);
2352         if (!BufferIsValid(buffer))
2353                 elog(PANIC, "heap_clean_redo: no block");
2354
2355         page = (Page) BufferGetPage(buffer);
2356         if (PageIsNew((PageHeader) page))
2357                 elog(PANIC, "heap_clean_redo: uninitialized page");
2358
2359         if (XLByteLE(lsn, PageGetLSN(page)))
2360         {
2361                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2362                 ReleaseBuffer(buffer);
2363                 return;
2364         }
2365
2366         if (record->xl_len > SizeOfHeapClean)
2367         {
2368                 OffsetNumber *unused;
2369                 OffsetNumber *unend;
2370                 ItemId          lp;
2371
2372                 unused = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean);
2373                 unend = (OffsetNumber *) ((char *) xlrec + record->xl_len);
2374
2375                 while (unused < unend)
2376                 {
2377                         lp = PageGetItemId(page, *unused + 1);  /* offsets are stored 0-based */
2378                         lp->lp_flags &= ~LP_USED;
2379                         unused++;
2380                 }
2381         }
2382
2383         PageRepairFragmentation(page, NULL);
2384
2385         PageSetLSN(page, lsn);
2386         PageSetTLI(page, ThisTimeLineID);
2387         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2388         WriteBuffer(buffer);
2389 }
2390
2391 static void
2392 heap_xlog_newpage(bool redo, XLogRecPtr lsn, XLogRecord *record)
2393 {
2394         xl_heap_newpage *xlrec = (xl_heap_newpage *) XLogRecGetData(record);
2395         Relation        reln;
2396         Buffer          buffer;
2397         Page            page;
2398
2399         /*
2400          * Note: the NEWPAGE log record is used for both heaps and indexes, so
2401          * do not do anything that assumes we are touching a heap.
2402          */
2403
2404         if (!redo || (record->xl_info & XLR_BKP_BLOCK_1))
2405                 return;
2406
2407         reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->node);
2408         if (!RelationIsValid(reln))
2409                 return;
2410         buffer = XLogReadBuffer(true, reln, xlrec->blkno);
2411         if (!BufferIsValid(buffer))
2412                 elog(PANIC, "heap_newpage_redo: no block");
2413         page = (Page) BufferGetPage(buffer);
2414
2415         Assert(record->xl_len == SizeOfHeapNewpage + BLCKSZ);
2416         memcpy(page, (char *) xlrec + SizeOfHeapNewpage, BLCKSZ);
2417
2418         PageSetLSN(page, lsn);
2419         PageSetTLI(page, ThisTimeLineID);
2420         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2421         WriteBuffer(buffer);
2422 }
2423
2424 static void
2425 heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
2426 {
2427         xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record);
2428         Relation        reln;
2429         Buffer          buffer;
2430         Page            page;
2431         OffsetNumber offnum;
2432         ItemId          lp = NULL;
2433         HeapTupleHeader htup;
2434
2435         if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
2436                 return;
2437
2438         reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
2439
2440         if (!RelationIsValid(reln))
2441                 return;
2442
2443         buffer = XLogReadBuffer(false, reln,
2444                                                 ItemPointerGetBlockNumber(&(xlrec->target.tid)));
2445         if (!BufferIsValid(buffer))
2446                 elog(PANIC, "heap_delete_%sdo: no block", (redo) ? "re" : "un");
2447
2448         page = (Page) BufferGetPage(buffer);
2449         if (PageIsNew((PageHeader) page))
2450                 elog(PANIC, "heap_delete_%sdo: uninitialized page", (redo) ? "re" : "un");
2451
2452         if (redo)
2453         {
2454                 if (XLByteLE(lsn, PageGetLSN(page)))    /* changes are applied */
2455                 {
2456                         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2457                         ReleaseBuffer(buffer);
2458                         return;
2459                 }
2460         }
2461         else if (XLByteLT(PageGetLSN(page), lsn))       /* changes are not applied
2462                                                                                                  * ?! */
2463                 elog(PANIC, "heap_delete_undo: bad page LSN");
2464
2465         offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
2466         if (PageGetMaxOffsetNumber(page) >= offnum)
2467                 lp = PageGetItemId(page, offnum);
2468
2469         if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp))
2470                 elog(PANIC, "heap_delete_%sdo: invalid lp", (redo) ? "re" : "un");
2471
2472         htup = (HeapTupleHeader) PageGetItem(page, lp);
2473
2474         if (redo)
2475         {
2476                 htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
2477                                                           HEAP_XMAX_INVALID |
2478                                                           HEAP_XMAX_IS_MULTI |
2479                                                           HEAP_IS_LOCKED |
2480                                                           HEAP_MOVED);
2481                 HeapTupleHeaderSetXmax(htup, record->xl_xid);
2482                 HeapTupleHeaderSetCmax(htup, FirstCommandId);
2483                 /* Make sure there is no forward chain link in t_ctid */
2484                 htup->t_ctid = xlrec->target.tid;
2485                 PageSetLSN(page, lsn);
2486                 PageSetTLI(page, ThisTimeLineID);
2487                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2488                 WriteBuffer(buffer);
2489                 return;
2490         }
2491
2492         elog(PANIC, "heap_delete_undo: unimplemented");
2493 }
2494
2495 static void
2496 heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
2497 {
2498         xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record);
2499         Relation        reln;
2500         Buffer          buffer;
2501         Page            page;
2502         OffsetNumber offnum;
2503
2504         if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
2505                 return;
2506
2507         reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
2508
2509         if (!RelationIsValid(reln))
2510                 return;
2511
2512         buffer = XLogReadBuffer(redo, reln,
2513                                                 ItemPointerGetBlockNumber(&(xlrec->target.tid)));
2514         if (!BufferIsValid(buffer))
2515                 return;
2516
2517         page = (Page) BufferGetPage(buffer);
2518         if (PageIsNew((PageHeader) page) &&
2519                 (!redo || !(record->xl_info & XLOG_HEAP_INIT_PAGE)))
2520                 elog(PANIC, "heap_insert_%sdo: uninitialized page", (redo) ? "re" : "un");
2521
2522         if (redo)
2523         {
2524                 struct
2525                 {
2526                         HeapTupleHeaderData hdr;
2527                         char            data[MaxTupleSize];
2528                 }                       tbuf;
2529                 HeapTupleHeader htup;
2530                 xl_heap_header xlhdr;
2531                 uint32          newlen;
2532
2533                 if (record->xl_info & XLOG_HEAP_INIT_PAGE)
2534                         PageInit(page, BufferGetPageSize(buffer), 0);
2535
2536                 if (XLByteLE(lsn, PageGetLSN(page)))    /* changes are applied */
2537                 {
2538                         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2539                         ReleaseBuffer(buffer);
2540                         return;
2541                 }
2542
2543                 offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
2544                 if (PageGetMaxOffsetNumber(page) + 1 < offnum)
2545                         elog(PANIC, "heap_insert_redo: invalid max offset number");
2546
2547                 newlen = record->xl_len - SizeOfHeapInsert - SizeOfHeapHeader;
2548                 Assert(newlen <= MaxTupleSize);
2549                 memcpy((char *) &xlhdr,
2550                            (char *) xlrec + SizeOfHeapInsert,
2551                            SizeOfHeapHeader);
2552                 htup = &tbuf.hdr;
2553                 MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
2554                 /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
2555                 memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
2556                            (char *) xlrec + SizeOfHeapInsert + SizeOfHeapHeader,
2557                            newlen);
2558                 newlen += offsetof(HeapTupleHeaderData, t_bits);
2559                 htup->t_natts = xlhdr.t_natts;
2560                 htup->t_infomask = xlhdr.t_infomask;
2561                 htup->t_hoff = xlhdr.t_hoff;
2562                 HeapTupleHeaderSetXmin(htup, record->xl_xid);
2563                 HeapTupleHeaderSetCmin(htup, FirstCommandId);
2564                 htup->t_ctid = xlrec->target.tid;
2565
2566                 offnum = PageAddItem(page, (Item) htup, newlen, offnum,
2567                                                          LP_USED | OverwritePageMode);
2568                 if (offnum == InvalidOffsetNumber)
2569                         elog(PANIC, "heap_insert_redo: failed to add tuple");
2570                 PageSetLSN(page, lsn);
2571                 PageSetTLI(page, ThisTimeLineID);
2572                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2573                 WriteBuffer(buffer);
2574                 return;
2575         }
2576
2577         /* undo insert */
2578         if (XLByteLT(PageGetLSN(page), lsn))            /* changes are not applied
2579                                                                                                  * ?! */
2580                 elog(PANIC, "heap_insert_undo: bad page LSN");
2581
2582         elog(PANIC, "heap_insert_undo: unimplemented");
2583 }
2584
2585 /*
2586  * Handles UPDATE & MOVE
2587  */
2588 static void
2589 heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move)
2590 {
2591         xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
2592         Relation        reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
2593         Buffer          buffer;
2594         bool            samepage =
2595         (ItemPointerGetBlockNumber(&(xlrec->newtid)) ==
2596          ItemPointerGetBlockNumber(&(xlrec->target.tid)));
2597         Page            page;
2598         OffsetNumber offnum;
2599         ItemId          lp = NULL;
2600         HeapTupleHeader htup;
2601
2602         if (!RelationIsValid(reln))
2603                 return;
2604
2605         if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
2606                 goto newt;
2607
2608         /* Deal with old tuple version */
2609
2610         buffer = XLogReadBuffer(false, reln,
2611                                                 ItemPointerGetBlockNumber(&(xlrec->target.tid)));
2612         if (!BufferIsValid(buffer))
2613                 elog(PANIC, "heap_update_%sdo: no block", (redo) ? "re" : "un");
2614
2615         page = (Page) BufferGetPage(buffer);
2616         if (PageIsNew((PageHeader) page))
2617                 elog(PANIC, "heap_update_%sdo: uninitialized old page", (redo) ? "re" : "un");
2618
2619         if (redo)
2620         {
2621                 if (XLByteLE(lsn, PageGetLSN(page)))    /* changes are applied */
2622                 {
2623                         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2624                         ReleaseBuffer(buffer);
2625                         if (samepage)
2626                                 return;
2627                         goto newt;
2628                 }
2629         }
2630         else if (XLByteLT(PageGetLSN(page), lsn))       /* changes are not applied
2631                                                                                                  * ?! */
2632                 elog(PANIC, "heap_update_undo: bad old tuple page LSN");
2633
2634         offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
2635         if (PageGetMaxOffsetNumber(page) >= offnum)
2636                 lp = PageGetItemId(page, offnum);
2637
2638         if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp))
2639                 elog(PANIC, "heap_update_%sdo: invalid lp", (redo) ? "re" : "un");
2640
2641         htup = (HeapTupleHeader) PageGetItem(page, lp);
2642
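        /*
         * Mark the old tuple version: a MOVE flags it HEAP_MOVED_OFF with this
         * transaction as xvac, while an ordinary UPDATE stamps xmax/cmax and
         * links t_ctid forward to the new version.
         */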
2643         if (redo)
2644         {
2645                 if (move)
2646                 {
2647                         htup->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2648                                                                   HEAP_XMIN_INVALID |
2649                                                                   HEAP_MOVED_IN);
2650                         htup->t_infomask |= HEAP_MOVED_OFF;
2651                         HeapTupleHeaderSetXvac(htup, record->xl_xid);
2652                         /* Make sure there is no forward chain link in t_ctid */
2653                         htup->t_ctid = xlrec->target.tid;
2654                 }
2655                 else
2656                 {
2657                         htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
2658                                                                   HEAP_XMAX_INVALID |
2659                                                                   HEAP_XMAX_IS_MULTI |
2660                                                                   HEAP_IS_LOCKED |
2661                                                                   HEAP_MOVED);
2662                         HeapTupleHeaderSetXmax(htup, record->xl_xid);
2663                         HeapTupleHeaderSetCmax(htup, FirstCommandId);
2664                         /* Set forward chain link in t_ctid */
2665                         htup->t_ctid = xlrec->newtid;
2666                 }
2667                 if (samepage)
2668                         goto newsame;
2669                 PageSetLSN(page, lsn);
2670                 PageSetTLI(page, ThisTimeLineID);
2671                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2672                 WriteBuffer(buffer);
2673                 goto newt;
2674         }
2675
2676         elog(PANIC, "heap_update_undo: unimplemented");
2677
2678         /* Deal with new tuple */
2679
2680 newt:;
2681
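        /*
         * Nothing to redo for the new tuple if its page came from a backup
         * block image (block 2, or block 1 when both versions share a page).
         */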
2682         if (redo &&
2683                 ((record->xl_info & XLR_BKP_BLOCK_2) ||
2684                  ((record->xl_info & XLR_BKP_BLOCK_1) && samepage)))
2685                 return;
2686
2687         buffer = XLogReadBuffer((redo) ? true : false, reln,
2688                                                         ItemPointerGetBlockNumber(&(xlrec->newtid)));
2689         if (!BufferIsValid(buffer))
2690                 return;
2691
2692         page = (Page) BufferGetPage(buffer);
2693
2694 newsame:;
2695         if (PageIsNew((PageHeader) page) &&
2696                 (!redo || !(record->xl_info & XLOG_HEAP_INIT_PAGE)))
2697                 elog(PANIC, "heap_update_%sdo: uninitialized page", (redo) ? "re" : "un");
2698
2699         if (redo)
2700         {
2701                 struct
2702                 {
2703                         HeapTupleHeaderData hdr;
2704                         char            data[MaxTupleSize];
2705                 }                       tbuf;
2706                 xl_heap_header xlhdr;
2707                 int                     hsize;
2708                 uint32          newlen;
2709
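                /*
                 * XLOG_HEAP_INIT_PAGE means the new version went onto a virgin
                 * page, so recovery must initialize the page before use.
                 */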
2710                 if (record->xl_info & XLOG_HEAP_INIT_PAGE)
2711                         PageInit(page, BufferGetPageSize(buffer), 0);
2712
2713                 if (XLByteLE(lsn, PageGetLSN(page)))    /* changes are applied */
2714                 {
2715                         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2716                         ReleaseBuffer(buffer);
2717                         return;
2718                 }
2719
2720                 offnum = ItemPointerGetOffsetNumber(&(xlrec->newtid));
2721                 if (PageGetMaxOffsetNumber(page) + 1 < offnum)
2722                         elog(PANIC, "heap_update_redo: invalid max offset number");
2723
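                /*
                 * Everything in the record beyond the fixed-size headers (plus,
                 * for MOVE, two saved TransactionIds) is the new tuple body.
                 */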
2724                 hsize = SizeOfHeapUpdate + SizeOfHeapHeader;
2725                 if (move)
2726                         hsize += (2 * sizeof(TransactionId));
2727
2728                 newlen = record->xl_len - hsize;
2729                 Assert(newlen <= MaxTupleSize);
2730                 memcpy((char *) &xlhdr,
2731                            (char *) xlrec + SizeOfHeapUpdate,
2732                            SizeOfHeapHeader);
2733                 htup = &tbuf.hdr;
2734                 MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
2735                 /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
2736                 memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
2737                            (char *) xlrec + hsize,
2738                            newlen);
2739                 newlen += offsetof(HeapTupleHeaderData, t_bits);
2740                 htup->t_natts = xlhdr.t_natts;
2741                 htup->t_infomask = xlhdr.t_infomask;
2742                 htup->t_hoff = xlhdr.t_hoff;
2743
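                /*
                 * A MOVE preserves the tuple's original xmax/xmin (saved in the
                 * record) and stamps this transaction as xvac; a plain UPDATE's
                 * new version just gets the updating transaction as xmin.
                 */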
2744                 if (move)
2745                 {
2746                         TransactionId xid[2];           /* xmax, xmin */
2747
2748                         memcpy((char *) xid,
2749                                    (char *) xlrec + SizeOfHeapUpdate + SizeOfHeapHeader,
2750                                    2 * sizeof(TransactionId));
2751                         HeapTupleHeaderSetXmin(htup, xid[1]);
2752                         HeapTupleHeaderSetXmax(htup, xid[0]);
2753                         HeapTupleHeaderSetXvac(htup, record->xl_xid);
2754                 }
2755                 else
2756                 {
2757                         HeapTupleHeaderSetXmin(htup, record->xl_xid);
2758                         HeapTupleHeaderSetCmin(htup, FirstCommandId);
2759                 }
2760                 /* Make sure there is no forward chain link in t_ctid */
2761                 htup->t_ctid = xlrec->newtid;
2762
2763                 offnum = PageAddItem(page, (Item) htup, newlen, offnum,
2764                                                          LP_USED | OverwritePageMode);
2765                 if (offnum == InvalidOffsetNumber)
2766                         elog(PANIC, "heap_update_redo: failed to add tuple");
2767                 PageSetLSN(page, lsn);
2768                 PageSetTLI(page, ThisTimeLineID);
2769                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2770                 WriteBuffer(buffer);
2771                 return;
2772         }
2773
2774         /* undo */
2775         if (XLByteLT(PageGetLSN(page), lsn))            /* changes not applied?! */
2776                 elog(PANIC, "heap_update_undo: bad new tuple page LSN");
2777
2778         elog(PANIC, "heap_update_undo: unimplemented");
2779
2780 }
2781
2782 static void
2783 heap_xlog_lock(bool redo, XLogRecPtr lsn, XLogRecord *record)
2784 {
2785         xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record);
2786         Relation        reln;
2787         Buffer          buffer;
2788         Page            page;
2789         OffsetNumber offnum;
2790         ItemId          lp = NULL;
2791         HeapTupleHeader htup;
2792
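        /* Nothing to do if a backup block image already restored the page. */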
2793         if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
2794                 return;
2795
2796         reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
2797
2798         if (!RelationIsValid(reln))
2799                 return;
2800
2801         buffer = XLogReadBuffer(false, reln,
2802                                                 ItemPointerGetBlockNumber(&(xlrec->target.tid)));
2803         if (!BufferIsValid(buffer))
2804                 elog(PANIC, "heap_lock_%sdo: no block", (redo) ? "re" : "un");
2805
2806         page = (Page) BufferGetPage(buffer);
2807         if (PageIsNew((PageHeader) page))
2808                 elog(PANIC, "heap_lock_%sdo: uninitialized page", (redo) ? "re" : "un");
2809
2810         if (redo)
2811         {
2812                 if (XLByteLE(lsn, PageGetLSN(page)))    /* changes are applied */
2813                 {
2814                         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2815                         ReleaseBuffer(buffer);
2816                         return;
2817                 }
2818         }
2819         else if (XLByteLT(PageGetLSN(page), lsn))       /* changes not applied?! */
2821                 elog(PANIC, "heap_lock_undo: bad page LSN");
2822
2823         offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
2824         if (PageGetMaxOffsetNumber(page) >= offnum)
2825                 lp = PageGetItemId(page, offnum);
2826
2827         if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp))
2828                 elog(PANIC, "heap_lock_%sdo: invalid lp", (redo) ? "re" : "un");
2829
2830         htup = (HeapTupleHeader) PageGetItem(page, lp);
2831
2832         if (redo)
2833         {
2834                 /*
2835                  * Presently, we don't bother to restore the locked state, but
2836                  * just set the XMAX_INVALID bit.
2837                  */
2838                 htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
2839                                                           HEAP_XMAX_INVALID |
2840                                                           HEAP_XMAX_IS_MULTI |
2841                                                           HEAP_IS_LOCKED |
2842                                                           HEAP_MOVED);
2843                 htup->t_infomask |= HEAP_XMAX_INVALID;
2844                 HeapTupleHeaderSetXmax(htup, record->xl_xid);
2845                 HeapTupleHeaderSetCmax(htup, FirstCommandId);
2846                 /* Make sure there is no forward chain link in t_ctid */
2847                 htup->t_ctid = xlrec->target.tid;
2848                 PageSetLSN(page, lsn);
2849                 PageSetTLI(page, ThisTimeLineID);
2850                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2851                 WriteBuffer(buffer);
2852                 return;
2853         }
2854
2855         elog(PANIC, "heap_lock_undo: unimplemented");
2856 }
2857
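/*
 * Redo/undo entry points: decode the heap opcode from xl_info and dispatch
 * to the matching routine above.
 */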
2858 void
2859 heap_redo(XLogRecPtr lsn, XLogRecord *record)
2860 {
2861         uint8           info = record->xl_info & ~XLR_INFO_MASK;
2862
2863         info &= XLOG_HEAP_OPMASK;
2864         if (info == XLOG_HEAP_INSERT)
2865                 heap_xlog_insert(true, lsn, record);
2866         else if (info == XLOG_HEAP_DELETE)
2867                 heap_xlog_delete(true, lsn, record);
2868         else if (info == XLOG_HEAP_UPDATE)
2869                 heap_xlog_update(true, lsn, record, false);
2870         else if (info == XLOG_HEAP_MOVE)
2871                 heap_xlog_update(true, lsn, record, true);
2872         else if (info == XLOG_HEAP_CLEAN)
2873                 heap_xlog_clean(true, lsn, record);
2874         else if (info == XLOG_HEAP_NEWPAGE)
2875                 heap_xlog_newpage(true, lsn, record);
2876         else if (info == XLOG_HEAP_LOCK)
2877                 heap_xlog_lock(true, lsn, record);
2878         else
2879                 elog(PANIC, "heap_redo: unknown op code %u", info);
2880 }
2881
2882 void
2883 heap_undo(XLogRecPtr lsn, XLogRecord *record)
2884 {
2885         uint8           info = record->xl_info & ~XLR_INFO_MASK;
2886
2887         info &= XLOG_HEAP_OPMASK;
2888         if (info == XLOG_HEAP_INSERT)
2889                 heap_xlog_insert(false, lsn, record);
2890         else if (info == XLOG_HEAP_DELETE)
2891                 heap_xlog_delete(false, lsn, record);
2892         else if (info == XLOG_HEAP_UPDATE)
2893                 heap_xlog_update(false, lsn, record, false);
2894         else if (info == XLOG_HEAP_MOVE)
2895                 heap_xlog_update(false, lsn, record, true);
2896         else if (info == XLOG_HEAP_CLEAN)
2897                 heap_xlog_clean(false, lsn, record);
2898         else if (info == XLOG_HEAP_NEWPAGE)
2899                 heap_xlog_newpage(false, lsn, record);
2900         else if (info == XLOG_HEAP_LOCK)
2901                 heap_xlog_lock(false, lsn, record);
2902         else
2903                 elog(PANIC, "heap_undo: unknown op code %u", info);
2904 }
2905
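/*
 * Append "rel tablespace/database/relation; tid block/offset" for the
 * target tuple to buf; helper for heap_desc below.
 */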
2906 static void
2907 out_target(char *buf, xl_heaptid *target)
2908 {
2909         sprintf(buf + strlen(buf), "rel %u/%u/%u; tid %u/%u",
2910                  target->node.spcNode, target->node.dbNode, target->node.relNode,
2911                         ItemPointerGetBlockNumber(&(target->tid)),
2912                         ItemPointerGetOffsetNumber(&(target->tid)));
2913 }
2914
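/*
 * Produce a human-readable, one-line description of a heap WAL record,
 * for WAL debugging output.
 */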
2915 void
2916 heap_desc(char *buf, uint8 xl_info, char *rec)
2917 {
2918         uint8           info = xl_info & ~XLR_INFO_MASK;
2919
2920         info &= XLOG_HEAP_OPMASK;
2921         if (info == XLOG_HEAP_INSERT)
2922         {
2923                 xl_heap_insert *xlrec = (xl_heap_insert *) rec;
2924
2925                 strcat(buf, "insert: ");
2926                 out_target(buf, &(xlrec->target));
2927         }
2928         else if (info == XLOG_HEAP_DELETE)
2929         {
2930                 xl_heap_delete *xlrec = (xl_heap_delete *) rec;
2931
2932                 strcat(buf, "delete: ");
2933                 out_target(buf, &(xlrec->target));
2934         }
2935         else if (info == XLOG_HEAP_UPDATE || info == XLOG_HEAP_MOVE)
2936         {
2937                 xl_heap_update *xlrec = (xl_heap_update *) rec;
2938
2939                 if (info == XLOG_HEAP_UPDATE)
2940                         strcat(buf, "update: ");
2941                 else
2942                         strcat(buf, "move: ");
2943                 out_target(buf, &(xlrec->target));
2944                 sprintf(buf + strlen(buf), "; new %u/%u",
2945                                 ItemPointerGetBlockNumber(&(xlrec->newtid)),
2946                                 ItemPointerGetOffsetNumber(&(xlrec->newtid)));
2947         }
2948         else if (info == XLOG_HEAP_CLEAN)
2949         {
2950                 xl_heap_clean *xlrec = (xl_heap_clean *) rec;
2951
2952                 sprintf(buf + strlen(buf), "clean: rel %u/%u/%u; blk %u",
2953                                 xlrec->node.spcNode, xlrec->node.dbNode,
2954                                 xlrec->node.relNode, xlrec->block);
2955         }
2956         else if (info == XLOG_HEAP_NEWPAGE)
2957         {
2958                 xl_heap_newpage *xlrec = (xl_heap_newpage *) rec;
2959
2960                 sprintf(buf + strlen(buf), "newpage: rel %u/%u/%u; blk %u",
2961                                 xlrec->node.spcNode, xlrec->node.dbNode,
2962                                 xlrec->node.relNode, xlrec->blkno);
2963         }
2964         else if (info == XLOG_HEAP_LOCK)
2965         {
2966                 xl_heap_lock *xlrec = (xl_heap_lock *) rec;
2967
2968                 if (xlrec->shared_lock)
2969                         strcat(buf, "shared_lock: ");
2970                 else
2971                         strcat(buf, "exclusive_lock: ");
2972                 out_target(buf, &(xlrec->target));
2973         }
2974         else
2975                 strcat(buf, "UNKNOWN");
2976 }