1 /*-------------------------------------------------------------------------
2  *
3  * heapam.c
4  *        heap access method code
5  *
6  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.137 2002/05/24 19:52:43 tgl Exp $
12  *
13  *
14  * INTERFACE ROUTINES
15  *              relation_open   - open any relation by relation OID
16  *              relation_openrv - open any relation specified by a RangeVar
17  *              relation_openr  - open a system relation by name
18  *              relation_close  - close any relation
19  *              heap_open               - open a heap relation by relation OID
20  *              heap_openrv             - open a heap relation specified by a RangeVar
21  *              heap_openr              - open a system heap relation by name
22  *              heap_close              - (now just a macro for relation_close)
23  *              heap_beginscan  - begin relation scan
24  *              heap_rescan             - restart a relation scan
25  *              heap_endscan    - end relation scan
26  *              heap_getnext    - retrieve next tuple in scan
27  *              heap_fetch              - retrieve tuple with tid
28  *              heap_insert             - insert tuple into a relation
29  *              heap_delete             - delete a tuple from a relation
30  *              heap_update             - replace a tuple in a relation with another tuple
31  *              heap_markpos    - mark scan position
32  *              heap_restrpos   - restore position to marked location
33  *
34  * NOTES
35  *        This file contains the heap_ routines which implement
36  *        the POSTGRES heap access method used for all POSTGRES
37  *        relations.
38  *
39  *-------------------------------------------------------------------------
40  */
41 #include "postgres.h"
42
43 #include "access/heapam.h"
44 #include "access/hio.h"
45 #include "access/tuptoaster.h"
46 #include "access/valid.h"
47 #include "access/xlogutils.h"
48 #include "catalog/catalog.h"
49 #include "catalog/namespace.h"
50 #include "miscadmin.h"
51 #include "utils/inval.h"
52 #include "utils/relcache.h"
53 #include "pgstat.h"
54
55
56 /* comments are in heap_update */
57 static xl_heaptid _locked_tuple_;
58 static void _heap_unlock_tuple(void *data);
59 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
60            ItemPointerData from, Buffer newbuf, HeapTuple newtup, bool move);
61
62
63 /* ----------------------------------------------------------------
64  *                                               heap support routines
65  * ----------------------------------------------------------------
66  */
67
68 /* ----------------
69  *              initscan - scan code common to heap_beginscan and heap_rescan
70  * ----------------
71  */
72 static void
73 initscan(HeapScanDesc scan, ScanKey key)
74 {
75         /*
76          * Make sure we have an up-to-date idea of the relation's block count.
77          * It is sufficient to do this once at scan start, since any tuples
78          * added while the scan is in progress will be invisible to my
79          * transaction anyway...
80          */
81         scan->rs_rd->rd_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
82
83         scan->rs_ctup.t_datamcxt = NULL;
84         scan->rs_ctup.t_data = NULL;
85         scan->rs_cbuf = InvalidBuffer;
86
87         /* we don't have a marked position... */
88         ItemPointerSetInvalid(&(scan->rs_mctid));
89
90         /*
91          * copy the scan key, if appropriate
92          */
93         if (key != NULL)
94                 memcpy(scan->rs_key, key, scan->rs_nkeys * sizeof(ScanKeyData));
95 }
96
97 /* ----------------
98  *              heapgettup - fetch next heap tuple
99  *
100  *              routine used by heap_getnext() which does most of the
101  *              real work in scanning tuples.
102  *
103  *              The passed-in *buffer must be either InvalidBuffer or the pinned
104  *              current page of the scan.  If we have to move to another page,
105  *              we will unpin this buffer (if valid).  On return, *buffer is either
106  *              InvalidBuffer or the ID of a pinned buffer.
107  * ----------------
108  */
109 static void
110 heapgettup(Relation relation,
111                    int dir,
112                    HeapTuple tuple,
113                    Buffer *buffer,
114                    Snapshot snapshot,
115                    int nkeys,
116                    ScanKey key)
117 {
118         ItemId          lpp;
119         Page            dp;
120         BlockNumber page;
121         BlockNumber pages;
122         int                     lines;
123         OffsetNumber lineoff;
124         int                     linesleft;
125         ItemPointer tid;
126
127         /*
128          * increment access statistics
129          */
130         IncrHeapAccessStat(local_heapgettup);
131         IncrHeapAccessStat(global_heapgettup);
132
133         tid = (tuple->t_data == NULL) ? (ItemPointer) NULL : &(tuple->t_self);
134
135         /*
136          * debugging stuff
137          *
138          * check validity of arguments, here and for other functions too.
139          * Note: no locking manipulations needed--this is a local function.
140          */
141 #ifdef  HEAPDEBUGALL
142         if (ItemPointerIsValid(tid))
143         {
144                 elog(LOG, "heapgettup(%s, tid=%p[%u,%u], dir=%d, ...)",
145                          RelationGetRelationName(relation), (void *) tid,
146                          ItemPointerGetBlockNumber(tid), ItemPointerGetOffsetNumber(tid), dir);
147         }
148         else
149         {
150                 elog(LOG, "heapgettup(%s, tid=%p, dir=%d, ...)",
151                          RelationGetRelationName(relation), (void *) tid, dir);
152         }
153         elog(LOG, "heapgettup(..., b=%d, nkeys=%d, key=%p)", *buffer, nkeys, (void *) key);
154
155         elog(LOG, "heapgettup: relation(%c)=`%s', %p",
156                  relation->rd_rel->relkind, RelationGetRelationName(relation),
157                  snapshot);
158 #endif   /* !defined(HEAPDEBUGALL) */
159
160         if (!ItemPointerIsValid(tid))
161         {
162                 Assert(!PointerIsValid(tid));
163                 tid = NULL;
164         }
165
166         tuple->t_tableOid = relation->rd_id;
167
168         /*
169          * return null immediately if relation is empty
170          */
171         if ((pages = relation->rd_nblocks) == 0)
172         {
173                 if (BufferIsValid(*buffer))
174                         ReleaseBuffer(*buffer);
175                 *buffer = InvalidBuffer;
176                 tuple->t_datamcxt = NULL;
177                 tuple->t_data = NULL;
178                 return;
179         }
180
181         /*
182          * calculate next starting lineoff, given scan direction
183          */
184         if (dir == 0)
185         {
186                 /*
187                  * ``no movement'' scan direction: refetch same tuple
188                  */
189                 if (tid == NULL)
190                 {
191                         if (BufferIsValid(*buffer))
192                                 ReleaseBuffer(*buffer);
193                         *buffer = InvalidBuffer;
194                         tuple->t_datamcxt = NULL;
195                         tuple->t_data = NULL;
196                         return;
197                 }
198
199                 *buffer = ReleaseAndReadBuffer(*buffer,
200                                                                            relation,
201                                                                            ItemPointerGetBlockNumber(tid));
202                 if (!BufferIsValid(*buffer))
203                         elog(ERROR, "heapgettup: failed ReadBuffer");
204
205                 LockBuffer(*buffer, BUFFER_LOCK_SHARE);
206
207                 dp = (Page) BufferGetPage(*buffer);
208                 lineoff = ItemPointerGetOffsetNumber(tid);
209                 lpp = PageGetItemId(dp, lineoff);
210
211                 tuple->t_datamcxt = NULL;
212                 tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
213                 tuple->t_len = ItemIdGetLength(lpp);
214                 LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
215
216                 return;
217         }
218         else if (dir < 0)
219         {
220                 /*
221                  * reverse scan direction
222                  */
223                 if (tid == NULL)
224                 {
225                         page = pages - 1;       /* final page */
226                 }
227                 else
228                 {
229                         page = ItemPointerGetBlockNumber(tid);          /* current page */
230                 }
231
232                 Assert(page < pages);
233
234                 *buffer = ReleaseAndReadBuffer(*buffer,
235                                                                            relation,
236                                                                            page);
237                 if (!BufferIsValid(*buffer))
238                         elog(ERROR, "heapgettup: failed ReadBuffer");
239
240                 LockBuffer(*buffer, BUFFER_LOCK_SHARE);
241
242                 dp = (Page) BufferGetPage(*buffer);
243                 lines = PageGetMaxOffsetNumber(dp);
244                 if (tid == NULL)
245                 {
246                         lineoff = lines;        /* final offnum */
247                 }
248                 else
249                 {
250                         lineoff =                       /* previous offnum */
251                                 OffsetNumberPrev(ItemPointerGetOffsetNumber(tid));
252                 }
253                 /* page and lineoff now reference the physically previous tid */
254         }
255         else
256         {
257                 /*
258                  * forward scan direction
259                  */
260                 if (tid == NULL)
261                 {
262                         page = 0;                       /* first page */
263                         lineoff = FirstOffsetNumber;            /* first offnum */
264                 }
265                 else
266                 {
267                         page = ItemPointerGetBlockNumber(tid);          /* current page */
268                         lineoff =                       /* next offnum */
269                                 OffsetNumberNext(ItemPointerGetOffsetNumber(tid));
270                 }
271
272                 Assert(page < pages);
273
274                 *buffer = ReleaseAndReadBuffer(*buffer,
275                                                                            relation,
276                                                                            page);
277                 if (!BufferIsValid(*buffer))
278                         elog(ERROR, "heapgettup: failed ReadBuffer");
279
280                 LockBuffer(*buffer, BUFFER_LOCK_SHARE);
281
282                 dp = (Page) BufferGetPage(*buffer);
283                 lines = PageGetMaxOffsetNumber(dp);
284                 /* page and lineoff now reference the physically next tid */
285         }
286
287         /* 'dir' is now non-zero */
288
289         /*
290          * calculate line pointer and number of remaining items to check on
291          * this page.
292          */
293         lpp = PageGetItemId(dp, lineoff);
294         if (dir < 0)
295                 linesleft = lineoff - 1;
296         else
297                 linesleft = lines - lineoff;
298
299         /*
300          * advance the scan until we find a qualifying tuple or run out of
301          * stuff to scan
302          */
303         for (;;)
304         {
305                 while (linesleft >= 0)
306                 {
307                         if (ItemIdIsUsed(lpp))
308                         {
309                                 bool    valid;
310
311                                 tuple->t_datamcxt = NULL;
312                                 tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
313                                 tuple->t_len = ItemIdGetLength(lpp);
314                                 ItemPointerSet(&(tuple->t_self), page, lineoff);
315
316                                 /*
317                                  * if current tuple qualifies, return it.
318                                  */
319                                 HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp,
320                                                                    snapshot, nkeys, key, valid);
321                                 if (valid)
322                                 {
323                                         LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
324                                         return;
325                                 }
326                         }
327
328                         /*
329                          * otherwise move to the next item on the page
330                          */
331                         --linesleft;
332                         if (dir < 0)
333                         {
334                                 --lpp;                  /* move back in this page's ItemId array */
335                                 --lineoff;
336                         }
337                         else
338                         {
339                                 ++lpp;                  /* move forward in this page's ItemId
340                                                                  * array */
341                                 ++lineoff;
342                         }
343                 }
344
345                 /*
346                  * if we get here, it means we've exhausted the items on this page
347                  * and it's time to move to the next.
348                  */
349                 LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
350
351                 /*
352                  * return NULL if we've exhausted all the pages
353                  */
354                 if ((dir < 0) ? (page == 0) : (page + 1 >= pages))
355                 {
356                         if (BufferIsValid(*buffer))
357                                 ReleaseBuffer(*buffer);
358                         *buffer = InvalidBuffer;
359                         tuple->t_datamcxt = NULL;
360                         tuple->t_data = NULL;
361                         return;
362                 }
363
364                 page = (dir < 0) ? (page - 1) : (page + 1);
365
366                 Assert(page < pages);
367
368                 *buffer = ReleaseAndReadBuffer(*buffer,
369                                                                            relation,
370                                                                            page);
371                 if (!BufferIsValid(*buffer))
372                         elog(ERROR, "heapgettup: failed ReadBuffer");
373
374                 LockBuffer(*buffer, BUFFER_LOCK_SHARE);
375                 dp = (Page) BufferGetPage(*buffer);
376                 lines = PageGetMaxOffsetNumber((Page) dp);
377                 linesleft = lines - 1;
378                 if (dir < 0)
379                 {
380                         lineoff = lines;
381                         lpp = PageGetItemId(dp, lines);
382                 }
383                 else
384                 {
385                         lineoff = FirstOffsetNumber;
386                         lpp = PageGetItemId(dp, FirstOffsetNumber);
387                 }
388         }
389 }
390
391
392 #if defined(DISABLE_COMPLEX_MACRO)
393 /*
394  * This is formatted so oddly so that the correspondence to the macro
395  * definition in access/heapam.h is maintained.
396  */
397 Datum
398 fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc,
399                         bool *isnull)
400 {
401         return (
402                         (attnum) > 0 ?
403                         (
404                          ((isnull) ? (*(isnull) = false) : (dummyret) NULL),
405                          HeapTupleNoNulls(tup) ?
406                          (
407                           (tupleDesc)->attrs[(attnum) - 1]->attcacheoff >= 0 ?
408                           (
409                            fetchatt((tupleDesc)->attrs[(attnum) - 1],
410                                                 (char *) (tup)->t_data + (tup)->t_data->t_hoff +
411                                                 (tupleDesc)->attrs[(attnum) - 1]->attcacheoff)
412                            )
413                           :
414                           nocachegetattr((tup), (attnum), (tupleDesc), (isnull))
415                           )
416                          :
417                          (
418                           att_isnull((attnum) - 1, (tup)->t_data->t_bits) ?
419                           (
420                            ((isnull) ? (*(isnull) = true) : (dummyret) NULL),
421                            (Datum) NULL
422                            )
423                           :
424                           (
425                            nocachegetattr((tup), (attnum), (tupleDesc), (isnull))
426                            )
427                           )
428                          )
429                         :
430                         (
431                          (Datum) NULL
432                          )
433                 );
434 }
435 #endif   /* defined(DISABLE_COMPLEX_MACRO) */
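
/*
 * Usage sketch (editorial example, not part of the original file): fetch
 * attribute number 1 from a tuple.  "tuple" and "tupdesc" are assumed to
 * come from a prior fetch against an open relation.
 */
#ifdef NOT_USED
static Datum
example_fastgetattr(HeapTuple tuple, TupleDesc tupdesc)
{
	bool		isnull;
	Datum		value;

	/* attribute numbers are 1-based; isnull reports a SQL NULL */
	value = fastgetattr(tuple, 1, tupdesc, &isnull);

	return isnull ? (Datum) 0 : value;
}
#endif   /* NOT_USED */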
436
437
438 /* ----------------------------------------------------------------
439  *                                       heap access method interface
440  * ----------------------------------------------------------------
441  */
442
443 /* ----------------
444  *              relation_open - open any relation by relation OID
445  *
446  *              If lockmode is not "NoLock", the specified kind of lock is
447  *              obtained on the relation.  (Generally, NoLock should only be
448  *              used if the caller knows it has some appropriate lock on the
449  *              relation already.)
450  *
451  *              An error is raised if the relation does not exist.
452  *
453  *              NB: a "relation" is anything with a pg_class entry.  The caller is
454  *              expected to check whether the relkind is something it can handle.
455  * ----------------
456  */
457 Relation
458 relation_open(Oid relationId, LOCKMODE lockmode)
459 {
460         Relation        r;
461
462         Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
463
464         /*
465          * increment access statistics
466          */
467         IncrHeapAccessStat(local_open);
468         IncrHeapAccessStat(global_open);
469
470         /* The relcache does all the real work... */
471         r = RelationIdGetRelation(relationId);
472
473         if (!RelationIsValid(r))
474                 elog(ERROR, "Relation %u does not exist", relationId);
475
476         if (lockmode != NoLock)
477                 LockRelation(r, lockmode);
478
479         return r;
480 }
481
482 /* ----------------
483  *              relation_openrv - open any relation specified by a RangeVar
484  *
485  *              As above, but the relation is specified by a RangeVar.
486  * ----------------
487  */
488 Relation
489 relation_openrv(const RangeVar *relation, LOCKMODE lockmode)
490 {
491         Oid                     relOid;
492
493         /*
494          * In bootstrap mode, don't do any namespace processing.
495          */
496         if (IsBootstrapProcessingMode())
497         {
498                 Assert(relation->schemaname == NULL);
499                 return relation_openr(relation->relname, lockmode);
500         }
501
502         /*
503          * Check for shared-cache-inval messages before trying to open the
504          * relation.  This is needed to cover the case where the name
505          * identifies a rel that has been dropped and recreated since the
506          * start of our transaction: if we don't flush the old syscache entry
507          * then we'll latch onto that entry and suffer an error when we do
508          * LockRelation. Note that relation_open does not need to do this,
509          * since a relation's OID never changes.
510          *
511          * We skip this if asked for NoLock, on the assumption that the caller
512          * has already ensured some appropriate lock is held.
513          */
514         if (lockmode != NoLock)
515                 AcceptInvalidationMessages();
516
517         /* Look up the appropriate relation using namespace search */
518         relOid = RangeVarGetRelid(relation, false);
519
520         /* Let relation_open do the rest */
521         return relation_open(relOid, lockmode);
522 }
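
/*
 * Usage sketch (editorial example): opening a relation by (possibly
 * schema-qualified) name.  makeRangeVar, from nodes/makefuncs.h, is
 * assumed available to the caller; the names here are hypothetical.
 */
#ifdef NOT_USED
static Relation
example_relation_openrv(void)
{
	RangeVar   *rv = makeRangeVar("public", "my_table");

	/* acquires AccessShareLock and returns an open relcache entry */
	return relation_openrv(rv, AccessShareLock);
}
#endif   /* NOT_USED */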
523
524 /* ----------------
525  *              relation_openr - open a system relation specified by name.
526  *
527  *              As above, but the relation is specified by an unqualified name;
528  *              it is assumed to live in the system catalog namespace.
529  * ----------------
530  */
531 Relation
532 relation_openr(const char *sysRelationName, LOCKMODE lockmode)
533 {
534         Relation        r;
535
536         Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
537
538         /*
539          * increment access statistics
540          */
541         IncrHeapAccessStat(local_openr);
542         IncrHeapAccessStat(global_openr);
543
544         /*
545          * We assume we should not need to worry about the rel's OID changing,
546          * hence no need for AcceptInvalidationMessages here.
547          */
548
549         /* The relcache does all the real work... */
550         r = RelationSysNameGetRelation(sysRelationName);
551
552         if (!RelationIsValid(r))
553                 elog(ERROR, "Relation \"%s\" does not exist", sysRelationName);
554
555         if (lockmode != NoLock)
556                 LockRelation(r, lockmode);
557
558         return r;
559 }
560
561 /* ----------------
562  *              relation_close - close any relation
563  *
564  *              If lockmode is not "NoLock", we first release the specified lock.
565  *
566  *              Note that it is often sensible to hold a lock beyond relation_close;
567  *              in that case, the lock is released automatically at xact end.
568  * ----------------
569  */
570 void
571 relation_close(Relation relation, LOCKMODE lockmode)
572 {
573         Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
574
575         /*
576          * increment access statistics
577          */
578         IncrHeapAccessStat(local_close);
579         IncrHeapAccessStat(global_close);
580
581         if (lockmode != NoLock)
582                 UnlockRelation(relation, lockmode);
583
584         /* The relcache does the real work... */
585         RelationClose(relation);
586 }
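
/*
 * Usage sketch (editorial example): the typical open/close pairing.  The
 * lock is normally held until transaction end, so the close can pass
 * NoLock; "relid" is assumed to be a valid pg_class OID.
 */
#ifdef NOT_USED
static void
example_relation_open(Oid relid)
{
	Relation	rel = relation_open(relid, AccessShareLock);

	/* ... work with rel ... */

	/* drop the relcache reference but keep the lock until commit */
	relation_close(rel, NoLock);
}
#endif   /* NOT_USED */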
587
588
589 /* ----------------
590  *              heap_open - open a heap relation by relation OID
591  *
592  *              This is essentially relation_open plus check that the relation
593  *              is not an index or special relation.  (The caller should also check
594  *              that it's not a view before assuming it has storage.)
595  * ----------------
596  */
597 Relation
598 heap_open(Oid relationId, LOCKMODE lockmode)
599 {
600         Relation        r;
601
602         r = relation_open(relationId, lockmode);
603
604         if (r->rd_rel->relkind == RELKIND_INDEX)
605                 elog(ERROR, "%s is an index relation",
606                          RelationGetRelationName(r));
607         else if (r->rd_rel->relkind == RELKIND_SPECIAL)
608                 elog(ERROR, "%s is a special relation",
609                          RelationGetRelationName(r));
610
611         pgstat_initstats(&r->pgstat_info, r);
612
613         return r;
614 }
615
616 /* ----------------
617  *              heap_openrv - open a heap relation specified
618  *              by a RangeVar node
619  *
620  *              As above, but relation is specified by a RangeVar.
621  * ----------------
622  */
623 Relation
624 heap_openrv(const RangeVar *relation, LOCKMODE lockmode)
625 {
626         Relation        r;
627
628         r = relation_openrv(relation, lockmode);
629
630         if (r->rd_rel->relkind == RELKIND_INDEX)
631                 elog(ERROR, "%s is an index relation",
632                          RelationGetRelationName(r));
633         else if (r->rd_rel->relkind == RELKIND_SPECIAL)
634                 elog(ERROR, "%s is a special relation",
635                          RelationGetRelationName(r));
636
637         pgstat_initstats(&r->pgstat_info, r);
638
639         return r;
640 }
641
642 /* ----------------
643  *              heap_openr - open a system heap relation specified by name.
644  *
645  *              As above, but the relation is specified by an unqualified name;
646  *              it is assumed to live in the system catalog namespace.
647  * ----------------
648  */
649 Relation
650 heap_openr(const char *sysRelationName, LOCKMODE lockmode)
651 {
652         Relation        r;
653
654         r = relation_openr(sysRelationName, lockmode);
655
656         if (r->rd_rel->relkind == RELKIND_INDEX)
657                 elog(ERROR, "%s is an index relation",
658                          RelationGetRelationName(r));
659         else if (r->rd_rel->relkind == RELKIND_SPECIAL)
660                 elog(ERROR, "%s is a special relation",
661                          RelationGetRelationName(r));
662
663         pgstat_initstats(&r->pgstat_info, r);
664
665         return r;
666 }
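
/*
 * Usage sketch (editorial example): opening a system catalog by its
 * unqualified name and releasing the lock again at close.
 */
#ifdef NOT_USED
static void
example_heap_openr(void)
{
	Relation	catrel = heap_openr("pg_class", RowExclusiveLock);

	/* ... read or modify catalog rows ... */

	heap_close(catrel, RowExclusiveLock);
}
#endif   /* NOT_USED */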
667
668
669 /* ----------------
670  *              heap_beginscan  - begin relation scan
671  * ----------------
672  */
673 HeapScanDesc
674 heap_beginscan(Relation relation, Snapshot snapshot,
675                            int nkeys, ScanKey key)
676 {
677         HeapScanDesc scan;
678
679         /*
680          * increment access statistics
681          */
682         IncrHeapAccessStat(local_beginscan);
683         IncrHeapAccessStat(global_beginscan);
684
685         /*
686          * sanity checks
687          */
688         if (!RelationIsValid(relation))
689                 elog(ERROR, "heap_beginscan: !RelationIsValid(relation)");
690
691         /*
692          * increment relation ref count while scanning relation
693          *
694          * This is just to make really sure the relcache entry won't go away
695          * while the scan has a pointer to it.  Caller should be holding the
696          * rel open anyway, so this is redundant in all normal scenarios...
697          */
698         RelationIncrementReferenceCount(relation);
699
700         /* XXX someday assert SelfTimeQual if relkind == RELKIND_UNCATALOGED */
701         if (relation->rd_rel->relkind == RELKIND_UNCATALOGED)
702                 snapshot = SnapshotSelf;
703
704         /*
705          * allocate and initialize scan descriptor
706          */
707         scan = (HeapScanDesc) palloc(sizeof(HeapScanDescData));
708
709         scan->rs_rd = relation;
710         scan->rs_snapshot = snapshot;
711         scan->rs_nkeys = nkeys;
712
713         /*
714          * we do this here instead of in initscan() because heap_rescan also
715          * calls initscan() and we don't want to allocate memory again
716          */
717         if (nkeys > 0)
718                 scan->rs_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys);
719         else
720                 scan->rs_key = NULL;
721
722         pgstat_initstats(&scan->rs_pgstat_info, relation);
723
724         initscan(scan, key);
725
726         return scan;
727 }
728
729 /* ----------------
730  *              heap_rescan             - restart a relation scan
731  * ----------------
732  */
733 void
734 heap_rescan(HeapScanDesc scan,
735                         ScanKey key)
736 {
737         /*
738          * increment access statistics
739          */
740         IncrHeapAccessStat(local_rescan);
741         IncrHeapAccessStat(global_rescan);
742
743         /*
744          * unpin scan buffers
745          */
746         if (BufferIsValid(scan->rs_cbuf))
747                 ReleaseBuffer(scan->rs_cbuf);
748
749         /*
750          * reinitialize scan descriptor
751          */
752         initscan(scan, key);
753
754         pgstat_reset_heap_scan(&scan->rs_pgstat_info);
755 }
756
757 /* ----------------
758  *              heap_endscan    - end relation scan
759  *
760  *              See how to integrate with index scans.
761  *              Check handling of reldesc caching.
762  * ----------------
763  */
764 void
765 heap_endscan(HeapScanDesc scan)
766 {
767         /*
768          * increment access statistics
769          */
770         IncrHeapAccessStat(local_endscan);
771         IncrHeapAccessStat(global_endscan);
772
773         /* Note: no locking manipulations needed */
774
775         /*
776          * unpin scan buffers
777          */
778         if (BufferIsValid(scan->rs_cbuf))
779                 ReleaseBuffer(scan->rs_cbuf);
780
781         /*
782          * decrement relation reference count and free scan descriptor storage
783          */
784         RelationDecrementReferenceCount(scan->rs_rd);
785
786         if (scan->rs_key)
787                 pfree(scan->rs_key);
788
789         pfree(scan);
790 }
791
792 /* ----------------
793  *              heap_getnext    - retrieve next tuple in scan
794  *
795  *              Fix to work with index relations.
796  *              We don't return the buffer anymore, but you can get it from the
797  *              returned HeapTuple.
798  * ----------------
799  */
800
801 #ifdef HEAPDEBUGALL
802 #define HEAPDEBUG_1 \
803         elog(LOG, "heap_getnext([%s,nkeys=%d],dir=%d) called", \
804                  RelationGetRelationName(scan->rs_rd), scan->rs_nkeys, (int) direction)
805
806 #define HEAPDEBUG_2 \
807          elog(LOG, "heap_getnext returning EOS")
808
809 #define HEAPDEBUG_3 \
810          elog(LOG, "heap_getnext returning tuple")
811 #else
812 #define HEAPDEBUG_1
813 #define HEAPDEBUG_2
814 #define HEAPDEBUG_3
815 #endif   /* !defined(HEAPDEBUGALL) */
816
817
818 HeapTuple
819 heap_getnext(HeapScanDesc scan, ScanDirection direction)
820 {
821         /*
822          * increment access statistics
823          */
824         IncrHeapAccessStat(local_getnext);
825         IncrHeapAccessStat(global_getnext);
826
827         /* Note: no locking manipulations needed */
828
829         /*
830          * argument checks
831          */
832         if (scan == NULL)
833                 elog(ERROR, "heap_getnext: NULL relscan");
834
835         HEAPDEBUG_1;                            /* heap_getnext( info ) */
836
837         /*
838          * Note: we depend here on the -1/0/1 encoding of ScanDirection.
839          */
840         heapgettup(scan->rs_rd,
841                            (int) direction,
842                            &(scan->rs_ctup),
843                            &(scan->rs_cbuf),
844                            scan->rs_snapshot,
845                            scan->rs_nkeys,
846                            scan->rs_key);
847
848         if (scan->rs_ctup.t_data == NULL && !BufferIsValid(scan->rs_cbuf))
849         {
850                 HEAPDEBUG_2;                    /* heap_getnext returning EOS */
851                 return NULL;
852         }
853
854         pgstat_count_heap_scan(&scan->rs_pgstat_info);
855
856         /*
857          * if we get here it means we have a new current scan tuple, so point
858          * to the proper return buffer and return the tuple.
859          */
860
861         HEAPDEBUG_3;                            /* heap_getnext returning tuple */
862
863         if (scan->rs_ctup.t_data != NULL)
864                 pgstat_count_heap_getnext(&scan->rs_pgstat_info);
865
866         return ((scan->rs_ctup.t_data == NULL) ? NULL : &(scan->rs_ctup));
867 }
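
/*
 * Usage sketch (editorial example): the standard scan loop over a heap
 * relation.  "rel" is assumed to be already opened and suitably locked.
 */
#ifdef NOT_USED
static void
example_scan(Relation rel)
{
	HeapScanDesc scan;
	HeapTuple	tup;

	/* no scan keys; visibility is per SnapshotNow */
	scan = heap_beginscan(rel, SnapshotNow, 0, (ScanKey) NULL);

	while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		/* tup is only valid until the next heap_getnext call */
	}

	heap_endscan(scan);
}
#endif   /* NOT_USED */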
868
869 /*
870  *      heap_fetch              - retrieve tuple with given tid
871  *
872  * On entry, tuple->t_self is the TID to fetch.  We pin the buffer holding
873  * the tuple, fill in the remaining fields of *tuple, and check the tuple
874  * against the specified snapshot.
875  *
876  * If successful (tuple found and passes snapshot time qual), then *userbuf
877  * is set to the buffer holding the tuple and TRUE is returned.  The caller
878  * must unpin the buffer when done with the tuple.
879  *
880  * If the tuple is not found, then tuple->t_data is set to NULL, *userbuf
881  * is set to InvalidBuffer, and FALSE is returned.
882  *
883  * If the tuple is found but fails the time qual check, then FALSE will be
884  * returned. When the caller specifies keep_buf = true, we retain the pin
885  * on the buffer and return it in *userbuf (so the caller can still access
886  * the tuple); when keep_buf = false, the pin is released and *userbuf is set
887  * to InvalidBuffer.
888  *
889  * It is somewhat inconsistent that we elog() on invalid block number but
890  * return false on invalid item number.  This is historical.  The only
891  * justification I can see is that the caller can relatively easily check the
892  * block number for validity, but cannot check the item number without reading
893  * the page himself.
894  */
895 bool
896 heap_fetch(Relation relation,
897                    Snapshot snapshot,
898                    HeapTuple tuple,
899                    Buffer *userbuf,
900                    bool keep_buf,
901                    PgStat_Info *pgstat_info)
902 {
903         ItemPointer tid = &(tuple->t_self);
904         ItemId          lp;
905         Buffer          buffer;
906         PageHeader      dp;
907         OffsetNumber offnum;
908         bool            valid;
909
910         /*
911          * increment access statistics
912          */
913         IncrHeapAccessStat(local_fetch);
914         IncrHeapAccessStat(global_fetch);
915
916         /*
917          * get the buffer from the relation descriptor. Note that this does a
918          * buffer pin.
919          */
920         buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
921
922         if (!BufferIsValid(buffer))
923                 elog(ERROR, "heap_fetch: ReadBuffer(%s, %lu) failed",
924                          RelationGetRelationName(relation),
925                          (unsigned long) ItemPointerGetBlockNumber(tid));
926
927         /*
928          * Need share lock on buffer to examine tuple commit status.
929          */
930         LockBuffer(buffer, BUFFER_LOCK_SHARE);
931
932         /*
933          * get the item line pointer corresponding to the requested tid
934          */
935         dp = (PageHeader) BufferGetPage(buffer);
936         offnum = ItemPointerGetOffsetNumber(tid);
937         lp = PageGetItemId(dp, offnum);
938
939         /*
940          * must check for deleted tuple (see for example analyze.c, which is
941          * careful to pass an offnum in range, but doesn't know if the offnum
942          * actually corresponds to an undeleted tuple).
943          */
944         if (!ItemIdIsUsed(lp))
945         {
946                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
947                 ReleaseBuffer(buffer);
948                 *userbuf = InvalidBuffer;
949                 tuple->t_datamcxt = NULL;
950                 tuple->t_data = NULL;
951                 return false;
952         }
953
954         /*
955          * fill in *tuple fields
956          */
957         tuple->t_datamcxt = NULL;
958         tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
959         tuple->t_len = ItemIdGetLength(lp);
960         tuple->t_tableOid = relation->rd_id;
961
962         /*
963          * check time qualification of tuple, then release lock
964          */
965         HeapTupleSatisfies(tuple, relation, buffer, dp,
966                                            snapshot, 0, (ScanKey) NULL, valid);
967
968         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
969
970         if (valid)
971         {
972                 /*
973                  * All checks passed, so return the tuple as valid. Caller is now
974                  * responsible for releasing the buffer.
975                  */
976                 *userbuf = buffer;
977
978                 /*
979                  * Count the successful fetch in *pgstat_info if given,
980                  * otherwise in the relation's default statistics area.
981                  */
982                 if (pgstat_info != NULL)
983                         pgstat_count_heap_fetch(pgstat_info);
984                 else
985                         pgstat_count_heap_fetch(&relation->pgstat_info);
986
987                 return true;
988         }
989
990         /* Tuple failed time qual, but maybe caller wants to see it anyway. */
991         if (keep_buf)
992         {
993                 *userbuf = buffer;
994
995                 return false;
996         }
997
998         /* Okay to release pin on buffer. */
999         ReleaseBuffer(buffer);
1000
1001         *userbuf = InvalidBuffer;
1002
1003         return false;
1004 }
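
/*
 * Usage sketch (editorial example): fetch a tuple by TID.  With
 * keep_buf = false, no buffer pin is retained on failure; on success the
 * caller must release the pin itself.
 */
#ifdef NOT_USED
static bool
example_heap_fetch(Relation rel, ItemPointer tid)
{
	HeapTupleData tuple;
	Buffer		buffer;

	tuple.t_self = *tid;
	if (heap_fetch(rel, SnapshotNow, &tuple, &buffer, false, NULL))
	{
		/* tuple.t_data stays valid only while we hold the pin */
		ReleaseBuffer(buffer);
		return true;
	}

	return false;				/* no such item, or failed the time qual */
}
#endif   /* NOT_USED */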
1005
1006 /*
1007  *      heap_get_latest_tid -  get the latest tid of a specified tuple
1008  */
1009 ItemPointer
1010 heap_get_latest_tid(Relation relation,
1011                                         Snapshot snapshot,
1012                                         ItemPointer tid)
1013 {
1014         ItemId          lp = NULL;
1015         Buffer          buffer;
1016         PageHeader      dp;
1017         OffsetNumber offnum;
1018         HeapTupleData tp;
1019         HeapTupleHeader t_data;
1020         ItemPointerData ctid;
1021         bool            invalidBlock,
1022                                 linkend,
1023                                 valid;
1024
1025         /*
1026          * get the buffer from the relation descriptor.  Note that this does a
1027          * buffer pin.
1028          */
1029
1030         buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
1031
1032         if (!BufferIsValid(buffer))
1033                 elog(ERROR, "heap_get_latest_tid: ReadBuffer(%s, %lu) failed",
1034                          RelationGetRelationName(relation), (unsigned long) ItemPointerGetBlockNumber(tid));
1035
1036         LockBuffer(buffer, BUFFER_LOCK_SHARE);
1037
1038         /*
1039          * get the item line pointer corresponding to the requested tid
1040          */
1041         dp = (PageHeader) BufferGetPage(buffer);
1042         offnum = ItemPointerGetOffsetNumber(tid);
1043         invalidBlock = true;
1044         if (!PageIsNew(dp))
1045         {
1046                 lp = PageGetItemId(dp, offnum);
1047                 if (ItemIdIsUsed(lp))
1048                         invalidBlock = false;
1049         }
1050         if (invalidBlock)
1051         {
1052                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1053                 ReleaseBuffer(buffer);
1054                 return NULL;
1055         }
1056
1057         /*
1058          * more sanity checks
1059          */
1060
1061         tp.t_datamcxt = NULL;
1062         t_data = tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
1063         tp.t_len = ItemIdGetLength(lp);
1064         tp.t_self = *tid;
1065         ctid = tp.t_data->t_ctid;
1066
1067         /*
1068          * check time qualification of tid
1069          */
1070
1071         HeapTupleSatisfies(&tp, relation, buffer, dp,
1072                                            snapshot, 0, (ScanKey) NULL, valid);
1073
1074         linkend = true;
1075         if ((t_data->t_infomask & HEAP_XMIN_COMMITTED) != 0 &&
1076                 !ItemPointerEquals(tid, &ctid))
1077                 linkend = false;
1078
1079         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1080         ReleaseBuffer(buffer);
1081
1082         if (!valid)
1083         {
1084                 if (linkend)
1085                         return NULL;
1086                 heap_get_latest_tid(relation, snapshot, &ctid);
1087                 *tid = ctid;
1088         }
1089
1090         return tid;
1091 }
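
/*
 * Usage sketch (editorial example): chase a possibly-stale TID to the
 * newest version of the row visible under the given snapshot.
 */
#ifdef NOT_USED
static void
example_latest_tid(Relation rel, Snapshot snapshot, ItemPointerData tid)
{
	if (heap_get_latest_tid(rel, snapshot, &tid) != NULL)
	{
		/* tid has been updated in place to the latest version */
	}
}
#endif   /* NOT_USED */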
1092
1093 /*
1094  *      heap_insert             - insert tuple into a heap
1095  *
1096  * The new tuple is stamped with current transaction ID and the specified
1097  * command ID.
1098  */
1099 Oid
1100 heap_insert(Relation relation, HeapTuple tup, CommandId cid)
1101 {
1102         Buffer          buffer;
1103
1104         /* increment access statistics */
1105         IncrHeapAccessStat(local_insert);
1106         IncrHeapAccessStat(global_insert);
1107
1108         if (relation->rd_rel->relhasoids)
1109         {
1110                 /*
1111                  * If the object id of this tuple has already been assigned, trust
1112                  * the caller.  There are a couple of ways this can happen.  At
1113                  * initial db creation, the backend program sets oids for tuples.
1114                  * When we define an index, we set the oid.  Finally, in the
1115                  * future, we may allow users to set their own object ids in order
1116                  * to support a persistent object store (objects need to contain
1117                  * pointers to one another).
1118                  */
1119                 if (!OidIsValid(tup->t_data->t_oid))
1120                         tup->t_data->t_oid = newoid();
1121                 else
1122                         CheckMaxObjectId(tup->t_data->t_oid);
1123         }
1124
1125         TransactionIdStore(GetCurrentTransactionId(), &(tup->t_data->t_xmin));
1126         tup->t_data->t_cmin = cid;
1127         StoreInvalidTransactionId(&(tup->t_data->t_xmax));
1128         tup->t_data->t_cmax = FirstCommandId;
1129         tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
1130         tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
1131         tup->t_tableOid = relation->rd_id;
1132
1133 #ifdef TUPLE_TOASTER_ACTIVE
1134
1135         /*
1136          * If the new tuple is too big for storage or contains already toasted
1137          * attributes from some other relation, invoke the toaster.
1138          */
1139         if (HeapTupleHasExtended(tup) ||
1140                 (MAXALIGN(tup->t_len) > TOAST_TUPLE_THRESHOLD))
1141                 heap_tuple_toast_attrs(relation, tup, NULL);
1142 #endif
1143
1144         /* Find buffer to insert this tuple into */
1145         buffer = RelationGetBufferForTuple(relation, tup->t_len, InvalidBuffer);
1146
1147         /* NO ELOG(ERROR) from here till changes are logged */
1148         START_CRIT_SECTION();
1149         RelationPutHeapTuple(relation, buffer, tup);
1150
1151         pgstat_count_heap_insert(&relation->pgstat_info);
1152
1153         /* XLOG stuff */
1154         {
1155                 xl_heap_insert xlrec;
1156                 xl_heap_header xlhdr;
1157                 XLogRecPtr      recptr;
1158                 XLogRecData rdata[3];
1159                 Page            page = BufferGetPage(buffer);
1160                 uint8           info = XLOG_HEAP_INSERT;
1161
1162                 xlrec.target.node = relation->rd_node;
1163                 xlrec.target.tid = tup->t_self;
1164                 rdata[0].buffer = InvalidBuffer;
1165                 rdata[0].data = (char *) &xlrec;
1166                 rdata[0].len = SizeOfHeapInsert;
1167                 rdata[0].next = &(rdata[1]);
1168
1169                 xlhdr.t_oid = tup->t_data->t_oid;
1170                 xlhdr.t_natts = tup->t_data->t_natts;
1171                 xlhdr.t_hoff = tup->t_data->t_hoff;
1172                 xlhdr.mask = tup->t_data->t_infomask;
1173                 rdata[1].buffer = buffer;
1174                 rdata[1].data = (char *) &xlhdr;
1175                 rdata[1].len = SizeOfHeapHeader;
1176                 rdata[1].next = &(rdata[2]);
1177
1178                 rdata[2].buffer = buffer;
1179                 rdata[2].data = (char *) tup->t_data + offsetof(HeapTupleHeaderData, t_bits);
1180                 rdata[2].len = tup->t_len - offsetof(HeapTupleHeaderData, t_bits);
1181                 rdata[2].next = NULL;
1182
1183                 /* If this is the first and only tuple on the page... */
1184                 if (ItemPointerGetOffsetNumber(&(tup->t_self)) == FirstOffsetNumber &&
1185                         PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
1186                 {
1187                         info |= XLOG_HEAP_INIT_PAGE;
1188                         rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
1189                 }
1190
1191                 recptr = XLogInsert(RM_HEAP_ID, info, rdata);
1192
1193                 PageSetLSN(page, recptr);
1194                 PageSetSUI(page, ThisStartUpID);
1195         }
1196         END_CRIT_SECTION();
1197
1198         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1199         WriteBuffer(buffer);
1200
1201         /*
1202          * If tuple is cachable, mark it for invalidation from the caches in case
1203          * we abort.  Note it is OK to do this after WriteBuffer releases the
1204          * buffer, because the "tup" data structure is all in local memory,
1205          * not in the shared buffer.
1206          */
1207         CacheInvalidateHeapTuple(relation, tup);
1208
1209         return tup->t_data->t_oid;
1210 }
1211
1212 /*
1213  *      simple_heap_insert - insert a tuple
1214  *
1215  * Currently, this routine differs from heap_insert only in supplying
1216  * a default command ID.  But it should be used rather than using
1217  * heap_insert directly in most places where we are modifying system catalogs.
1218  */
1219 Oid
1220 simple_heap_insert(Relation relation, HeapTuple tup)
1221 {
1222         return heap_insert(relation, tup, GetCurrentCommandId());
1223 }
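
/*
 * Usage sketch (editorial example): form and insert a one-column tuple.
 * heap_formtuple/heap_freetuple come from elsewhere in the backend; "rel"
 * is assumed open with RowExclusiveLock and to have a single int4 column.
 */
#ifdef NOT_USED
static Oid
example_simple_insert(Relation rel)
{
	Datum		values[1];
	char		nulls[1] = {' '};	/* ' ' = not null, 'n' = null */
	HeapTuple	tup;
	Oid			oid;

	values[0] = Int32GetDatum(42);
	tup = heap_formtuple(RelationGetDescr(rel), values, nulls);
	oid = simple_heap_insert(rel, tup);
	heap_freetuple(tup);

	return oid;
}
#endif   /* NOT_USED */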
1224
1225 /*
1226  *      heap_delete             - delete a tuple
1227  *
1228  * NB: do not call this directly unless you are prepared to deal with
1229  * concurrent-update conditions.  Use simple_heap_delete instead.
1230  */
1231 int
1232 heap_delete(Relation relation, ItemPointer tid,
1233                         ItemPointer ctid, CommandId cid)
1234 {
1235         ItemId          lp;
1236         HeapTupleData tp;
1237         PageHeader      dp;
1238         Buffer          buffer;
1239         int                     result;
1240
1241         /* increment access statistics */
1242         IncrHeapAccessStat(local_delete);
1243         IncrHeapAccessStat(global_delete);
1244
1245         Assert(ItemPointerIsValid(tid));
1246
1247         buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
1248
1249         if (!BufferIsValid(buffer))
1250                 elog(ERROR, "heap_delete: failed ReadBuffer");
1251
1252         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
1253
1254         dp = (PageHeader) BufferGetPage(buffer);
1255         lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
1256         tp.t_datamcxt = NULL;
1257         tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
1258         tp.t_len = ItemIdGetLength(lp);
1259         tp.t_self = *tid;
1260         tp.t_tableOid = relation->rd_id;
1261
1262 l1:
1263         result = HeapTupleSatisfiesUpdate(&tp, cid);
1264
1265         if (result == HeapTupleInvisible)
1266         {
1267                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1268                 ReleaseBuffer(buffer);
1269                 elog(ERROR, "heap_delete: (am)invalid tid");
1270         }
1271         else if (result == HeapTupleBeingUpdated)
1272         {
1273                 TransactionId xwait = tp.t_data->t_xmax;
1274
1275                 /* sleep until concurrent transaction ends */
1276                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1277                 XactLockTableWait(xwait);
1278
1279                 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
1280                 if (TransactionIdDidAbort(xwait))
1281                         goto l1;
1282
1283                 /*
1284                  * xwait is committed but if xwait had just marked the tuple for
1285                  * update then some other xaction could update this tuple before
1286                  * we got to this point.
1287                  */
1288                 if (!TransactionIdEquals(tp.t_data->t_xmax, xwait))
1289                         goto l1;
1290                 if (!(tp.t_data->t_infomask & HEAP_XMAX_COMMITTED))
1291                 {
1292                         tp.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
1293                         SetBufferCommitInfoNeedsSave(buffer);
1294                 }
1295                 /* if tuple was marked for update but not updated... */
1296                 if (tp.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
1297                         result = HeapTupleMayBeUpdated;
1298                 else
1299                         result = HeapTupleUpdated;
1300         }
1301         if (result != HeapTupleMayBeUpdated)
1302         {
1303                 Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
1304                 *ctid = tp.t_data->t_ctid;
1305                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1306                 ReleaseBuffer(buffer);
1307                 return result;
1308         }
1309
1310         START_CRIT_SECTION();
1311         /* store transaction information of xact deleting the tuple */
1312         TransactionIdStore(GetCurrentTransactionId(), &(tp.t_data->t_xmax));
1313         tp.t_data->t_cmax = cid;
1314         tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
1315                                                          HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);
1316         /* XLOG stuff */
1317         {
1318                 xl_heap_delete xlrec;
1319                 XLogRecPtr      recptr;
1320                 XLogRecData rdata[2];
1321
1322                 xlrec.target.node = relation->rd_node;
1323                 xlrec.target.tid = tp.t_self;
1324                 rdata[0].buffer = InvalidBuffer;
1325                 rdata[0].data = (char *) &xlrec;
1326                 rdata[0].len = SizeOfHeapDelete;
1327                 rdata[0].next = &(rdata[1]);
1328
1329                 rdata[1].buffer = buffer;
1330                 rdata[1].data = NULL;
1331                 rdata[1].len = 0;
1332                 rdata[1].next = NULL;
1333
1334                 recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata);
1335
1336                 PageSetLSN(dp, recptr);
1337                 PageSetSUI(dp, ThisStartUpID);
1338         }
1339         END_CRIT_SECTION();
1340
1341         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1342
1343 #ifdef TUPLE_TOASTER_ACTIVE
1344
1345         /*
1346          * If the relation has toastable attributes, we need to delete no
1347          * longer needed items there too.  We have to do this before
1348          * WriteBuffer because we need to look at the contents of the tuple,
1349          * but it's OK to release the context lock on the buffer first.
1350          */
1351         if (HeapTupleHasExtended(&tp))
1352                 heap_tuple_toast_attrs(relation, NULL, &(tp));
1353 #endif
1354
1355         pgstat_count_heap_delete(&relation->pgstat_info);
1356
1357         /*
1358          * Mark tuple for invalidation from system caches at next command
1359          * boundary. We have to do this before WriteBuffer because we need to
1360          * look at the contents of the tuple, so we need to hold our refcount
1361          * on the buffer.
1362          */
1363         CacheInvalidateHeapTuple(relation, &tp);
1364
1365         WriteBuffer(buffer);
1366
1367         return HeapTupleMayBeUpdated;
1368 }
1369
1370 /*
1371  *      simple_heap_delete - delete a tuple
1372  *
1373  * This routine may be used to delete a tuple when concurrent updates of
1374  * the target tuple are not expected (for example, because we have a lock
1375  * on the relation associated with the tuple).  Any failure is reported
1376  * via elog().
1377  */
1378 void
1379 simple_heap_delete(Relation relation, ItemPointer tid)
1380 {
1381         ItemPointerData ctid;
1382         int                     result;
1383
1384         result = heap_delete(relation, tid, &ctid, GetCurrentCommandId());
1385         switch (result)
1386         {
1387                 case HeapTupleSelfUpdated:
1388                         /* Tuple was already updated in current command? */
1389                         elog(ERROR, "simple_heap_delete: tuple already updated by self");
1390                         break;
1391
1392                 case HeapTupleMayBeUpdated:
1393                         /* done successfully */
1394                         break;
1395
1396                 case HeapTupleUpdated:
1397                         elog(ERROR, "simple_heap_delete: tuple concurrently updated");
1398                         break;
1399
1400                 default:
1401                         elog(ERROR, "Unknown status %d from heap_delete", result);
1402                         break;
1403         }
1404 }
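
/*
 * Usage sketch (editorial example): delete a row by TID when no concurrent
 * update is expected; simple_heap_delete elogs on any failure, so there is
 * no status code to check.
 */
#ifdef NOT_USED
static void
example_simple_delete(Relation rel, ItemPointer tid)
{
	simple_heap_delete(rel, tid);
}
#endif   /* NOT_USED */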
1405
1406 /*
1407  *      heap_update - replace a tuple
1408  *
1409  * NB: do not call this directly unless you are prepared to deal with
1410  * concurrent-update conditions.  Use simple_heap_update instead.
1411  */
1412 int
1413 heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
1414                         ItemPointer ctid, CommandId cid)
1415 {
1416         ItemId          lp;
1417         HeapTupleData oldtup;
1418         PageHeader      dp;
1419         Buffer          buffer,
1420                                 newbuf;
1421         bool            need_toast,
1422                                 already_marked;
1423         Size            newtupsize,
1424                                 pagefree;
1425         int                     result;
1426
1427         /* increment access statistics */
1428         IncrHeapAccessStat(local_replace);
1429         IncrHeapAccessStat(global_replace);
1430
1431         Assert(ItemPointerIsValid(otid));
1432
1433         buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(otid));
1434         if (!BufferIsValid(buffer))
1435                 elog(ERROR, "heap_update: failed ReadBuffer");
1436         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
1437
1438         dp = (PageHeader) BufferGetPage(buffer);
1439         lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(otid));
1440
1441         oldtup.t_datamcxt = NULL;
1442         oldtup.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
1443         oldtup.t_len = ItemIdGetLength(lp);
1444         oldtup.t_self = *otid;
1445
1446         /*
1447          * Note: beyond this point, use oldtup not otid to refer to old tuple.
1448          * otid may very well point at newtup->t_self, which we will overwrite
1449          * with the new tuple's location, so there's great risk of confusion
1450          * if we use otid anymore.
1451          */
1452
1453 l2:
1454         result = HeapTupleSatisfiesUpdate(&oldtup, cid);
1455
1456         if (result == HeapTupleInvisible)
1457         {
1458                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1459                 ReleaseBuffer(buffer);
1460                 elog(ERROR, "heap_update: (am)invalid tid");
1461         }
1462         else if (result == HeapTupleBeingUpdated)
1463         {
1464                 TransactionId xwait = oldtup.t_data->t_xmax;
1465
1466                 /* sleep until concurrent transaction ends */
1467                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1468                 XactLockTableWait(xwait);
1469
1470                 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
1471                 if (TransactionIdDidAbort(xwait))
1472                         goto l2;
1473
1474                 /*
1475                  * xwait is committed but if xwait had just marked the tuple for
1476                  * update then some other xaction could update this tuple before
1477                  * we got to this point.
1478                  */
1479                 if (!TransactionIdEquals(oldtup.t_data->t_xmax, xwait))
1480                         goto l2;
1481                 if (!(oldtup.t_data->t_infomask & HEAP_XMAX_COMMITTED))
1482                 {
1483                         oldtup.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
1484                         SetBufferCommitInfoNeedsSave(buffer);
1485                 }
1486                 /* if tuple was marked for update but not updated... */
1487                 if (oldtup.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
1488                         result = HeapTupleMayBeUpdated;
1489                 else
1490                         result = HeapTupleUpdated;
1491         }
1492         if (result != HeapTupleMayBeUpdated)
1493         {
1494                 Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
1495                 *ctid = oldtup.t_data->t_ctid;
1496                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1497                 ReleaseBuffer(buffer);
1498                 return result;
1499         }
1500
1501         /* Fill in OID and transaction status data for newtup */
1502         newtup->t_data->t_oid = oldtup.t_data->t_oid;
1503         TransactionIdStore(GetCurrentTransactionId(), &(newtup->t_data->t_xmin));
1504         newtup->t_data->t_cmin = cid;
1505         StoreInvalidTransactionId(&(newtup->t_data->t_xmax));
1506         newtup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
1507         newtup->t_data->t_infomask |= (HEAP_XMAX_INVALID | HEAP_UPDATED);
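        /*
         * At this point newtup is stamped as created by our transaction
         * (xmin = our XID, cmin = cid) with no deleter (xmax invalid), and
         * HEAP_UPDATED marks it as the newer version of an updated row
         * rather than a fresh insert.
         */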
1508
1509         /*
1510          * If the toaster needs to be activated, OR if the new tuple will not
1511          * fit on the same page as the old, then we need to release the
1512          * context lock (but not the pin!) on the old tuple's buffer while we
1513          * are off doing TOAST and/or table-file-extension work.  We must mark
1514          * the old tuple to show that it's already being updated, else other
1515          * processes may try to update it themselves.  To avoid writing a
1516          * second XLOG record, we use a transaction-manager rollback hook to
1517          * unlock the old tuple, without consulting the log, if the xact
1518          * aborts before the update is logged.  In the event of a crash prior
1519          * to logging, TQUAL routines will see the HEAP_XMAX_UNLOGGED flag...
1520          *
1521          * NOTE: this trick is useless at present, but is kept for the future,
1522          * when we implement UNDO and will re-use transaction IDs after
1523          * postmaster startup.
1524          *
1525          * We need to invoke the toaster if there are already any toasted values
1526          * present, or if the new tuple is over-threshold.
1527          */
1528         need_toast = (HeapTupleHasExtended(&oldtup) ||
1529                                   HeapTupleHasExtended(newtup) ||
1530                                   (MAXALIGN(newtup->t_len) > TOAST_TUPLE_THRESHOLD));
1531
1532         newtupsize = MAXALIGN(newtup->t_len);
1533         pagefree = PageGetFreeSpace((Page) dp);
1534
1535         if (need_toast || newtupsize > pagefree)
1536         {
1537                 _locked_tuple_.node = relation->rd_node;
1538                 _locked_tuple_.tid = oldtup.t_self;
1539                 XactPushRollback(_heap_unlock_tuple, (void *) &_locked_tuple_);
1540
1541                 TransactionIdStore(GetCurrentTransactionId(),
1542                                                    &(oldtup.t_data->t_xmax));
1543                 oldtup.t_data->t_cmax = cid;
1544                 oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
1545                                                                            HEAP_XMAX_INVALID |
1546                                                                            HEAP_MARKED_FOR_UPDATE);
1547                 oldtup.t_data->t_infomask |= HEAP_XMAX_UNLOGGED;
1548                 already_marked = true;
1549                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1550
1551                 /* Let the toaster do its thing */
1552                 if (need_toast)
1553                 {
1554                         heap_tuple_toast_attrs(relation, newtup, &oldtup);
1555                         newtupsize = MAXALIGN(newtup->t_len);
1556                 }
1557
1558                 /*
1559                  * Now, do we need a new page for the tuple, or not?  This is a
1560                  * bit tricky since someone else could have added tuples to the
1561                  * page while we weren't looking.  We have to recheck the
1562                  * available space after reacquiring the buffer lock.  But don't
1563                  * bother to do that if the former amount of free space is still
1564                  * not enough; it's unlikely there's more free now than before.
1565                  *
1566                  * What's more, if we need to get a new page, we will need to acquire
1567                  * buffer locks on both old and new pages.      To avoid deadlock
1568                  * against some other backend trying to get the same two locks in
1569                  * the other order, we must be consistent about the order we get
1570                  * the locks in. We use the rule "lock the lower-numbered page of
1571                  * the relation first".  To implement this, we must do
1572                  * RelationGetBufferForTuple while not holding the lock on the old
1573                  * page, and we must rely on it to get the locks on both pages in
1574                  * the correct order.
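         *
         * For example, suppose one backend's update must move a tuple from
         * page 4 to page 7 while another's moves a tuple from page 7 to page
         * 4.  "Old page first" ordering would let each acquire its old page
         * and then deadlock waiting for the other's; "lower-numbered page
         * first" makes both take page 4 first, so one simply waits.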
1575                  */
1576                 if (newtupsize > pagefree)
1577                 {
1578                         /* Assume there's no chance to put newtup on same page. */
1579                         newbuf = RelationGetBufferForTuple(relation, newtup->t_len,
1580                                                                                            buffer);
1581                 }
1582                 else
1583                 {
1584                         /* Re-acquire the lock on the old tuple's page. */
1585                         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
1586                         /* Re-check using the up-to-date free space */
1587                         pagefree = PageGetFreeSpace((Page) dp);
1588                         if (newtupsize > pagefree)
1589                         {
1590                                 /*
1591                                  * Rats, it doesn't fit anymore.  We must now unlock and
1592                                  * relock to avoid deadlock.  Fortunately, this path
1593                                  * should seldom be taken.
1594                                  */
1595                                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1596                                 newbuf = RelationGetBufferForTuple(relation, newtup->t_len,
1597                                                                                                    buffer);
1598                         }
1599                         else
1600                         {
1601                                 /* OK, it fits here, so we're done. */
1602                                 newbuf = buffer;
1603                         }
1604                 }
1605         }
1606         else
1607         {
1608                 /* No TOAST work needed, and it'll fit on same page */
1609                 already_marked = false;
1610                 newbuf = buffer;
1611         }
1612
1613         pgstat_count_heap_update(&relation->pgstat_info);
1614
1615         /*
1616          * At this point newbuf and buffer are both pinned and locked, and
1617          * newbuf has enough space for the new tuple.  If they are the same
1618          * buffer, only one pin is held.
1619          */
1620
1621         /* NO ELOG(ERROR) from here till changes are logged */
1622         START_CRIT_SECTION();
1623
1624         RelationPutHeapTuple(relation, newbuf, newtup);         /* insert new tuple */
1625
1626         if (already_marked)
1627         {
1628                 oldtup.t_data->t_infomask &= ~HEAP_XMAX_UNLOGGED;
1629                 XactPopRollback();
1630         }
1631         else
1632         {
1633                 TransactionIdStore(GetCurrentTransactionId(),
1634                                                    &(oldtup.t_data->t_xmax));
1635                 oldtup.t_data->t_cmax = cid;
1636                 oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
1637                                                                            HEAP_XMAX_INVALID |
1638                                                                            HEAP_MARKED_FOR_UPDATE);
1639         }
1640
1641         /* record address of new tuple in t_ctid of old one */
1642         oldtup.t_data->t_ctid = newtup->t_self;
1643
1644         /* XLOG stuff */
1645         {
1646                 XLogRecPtr      recptr = log_heap_update(relation, buffer, oldtup.t_self,
1647                                                                                          newbuf, newtup, false);
1648
1649                 if (newbuf != buffer)
1650                 {
1651                         PageSetLSN(BufferGetPage(newbuf), recptr);
1652                         PageSetSUI(BufferGetPage(newbuf), ThisStartUpID);
1653                 }
1654                 PageSetLSN(BufferGetPage(buffer), recptr);
1655                 PageSetSUI(BufferGetPage(buffer), ThisStartUpID);
1656         }
1657
1658         END_CRIT_SECTION();
1659
1660         if (newbuf != buffer)
1661                 LockBuffer(newbuf, BUFFER_LOCK_UNLOCK);
1662         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
1663
1664         /*
1665          * Mark old tuple for invalidation from system caches at next command
1666          * boundary. We have to do this before WriteBuffer because we need to
1667          * look at the contents of the tuple, so we need to hold our refcount.
1668          */
1669         CacheInvalidateHeapTuple(relation, &oldtup);
1670
1671         if (newbuf != buffer)
1672                 WriteBuffer(newbuf);
1673         WriteBuffer(buffer);
1674
1675         /*
1676          * If new tuple is cachable, mark it for invalidation from the caches in
1677          * case we abort.  Note it is OK to do this after WriteBuffer releases
1678          * the buffer, because the "newtup" data structure is all in local
1679          * memory, not in the shared buffer.
1680          */
1681         CacheInvalidateHeapTuple(relation, newtup);
1682
1683         return HeapTupleMayBeUpdated;
1684 }
1685
1686 /*
1687  *      simple_heap_update - replace a tuple
1688  *
1689  * This routine may be used to update a tuple when concurrent updates of
1690  * the target tuple are not expected (for example, because we have a lock
1691  * on the relation associated with the tuple).  Any failure is reported
1692  * via elog().
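 *
 * A typical catalog-update caller looks roughly like this (a sketch only;
 * "SomeCatalogName" is a stand-in, and the syscache lookup and field
 * munging vary by call site):
 *
 *              Relation        rel = heap_openr(SomeCatalogName, RowExclusiveLock);
 *              HeapTuple       tup = SearchSysCacheCopy(...);
 *
 *              ... modify the fields in tup's data ...
 *
 *              simple_heap_update(rel, &tup->t_self, tup);
 *              ... update the catalog's indexes as needed ...
 *              heap_close(rel, RowExclusiveLock);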
1693  */
1694 void
1695 simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
1696 {
1697         ItemPointerData ctid;
1698         int                     result;
1699
1700         result = heap_update(relation, otid, tup, &ctid, GetCurrentCommandId());
1701         switch (result)
1702         {
1703                 case HeapTupleSelfUpdated:
1704                         /* Tuple was already updated in current command? */
1705                         elog(ERROR, "simple_heap_update: tuple already updated by self");
1706                         break;
1707
1708                 case HeapTupleMayBeUpdated:
1709                         /* done successfully */
1710                         break;
1711
1712                 case HeapTupleUpdated:
1713                         elog(ERROR, "simple_heap_update: tuple concurrently updated");
1714                         break;
1715
1716                 default:
1717                         elog(ERROR, "Unknown status %u from heap_update", result);
1718                         break;
1719         }
1720 }
1721
1722 /*
1723  *      heap_mark4update                - mark a tuple for update
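 *
 * This implements the row marking done for SELECT ... FOR UPDATE: we
 * stamp the tuple's xmax with our XID and set HEAP_MARKED_FOR_UPDATE,
 * which blocks concurrent updaters without actually deleting anything.
 * On success the tuple's buffer is returned pinned (but not locked) in
 * *buffer; the caller must release it.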
1724  */
1725 int
1726 heap_mark4update(Relation relation, HeapTuple tuple, Buffer *buffer,
1727                                  CommandId cid)
1728 {
1729         ItemPointer tid = &(tuple->t_self);
1730         ItemId          lp;
1731         PageHeader      dp;
1732         int                     result;
1733
1734         /* increment access statistics */
1735         IncrHeapAccessStat(local_mark4update);
1736         IncrHeapAccessStat(global_mark4update);
1737
1738         *buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
1739
1740         if (!BufferIsValid(*buffer))
1741                 elog(ERROR, "heap_mark4update: failed ReadBuffer");
1742
1743         LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
1744
1745         dp = (PageHeader) BufferGetPage(*buffer);
1746         lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
1747         tuple->t_datamcxt = NULL;
1748         tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
1749         tuple->t_len = ItemIdGetLength(lp);
1750
1751 l3:
1752         result = HeapTupleSatisfiesUpdate(tuple, cid);
1753
1754         if (result == HeapTupleInvisible)
1755         {
1756                 LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
1757                 ReleaseBuffer(*buffer);
1758                 elog(ERROR, "heap_mark4update: (am)invalid tid");
1759         }
1760         else if (result == HeapTupleBeingUpdated)
1761         {
1762                 TransactionId xwait = tuple->t_data->t_xmax;
1763
1764                 /* sleep until the concurrent transaction ends */
1765                 LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
1766                 XactLockTableWait(xwait);
1767
1768                 LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
1769                 if (TransactionIdDidAbort(xwait))
1770                         goto l3;
1771
1772                 /*
1773                  * xwait is committed but if xwait had just marked the tuple for
1774                  * update then some other xaction could update this tuple before
1775                  * we got to this point.
1776                  */
1777                 if (!TransactionIdEquals(tuple->t_data->t_xmax, xwait))
1778                         goto l3;
1779                 if (!(tuple->t_data->t_infomask & HEAP_XMAX_COMMITTED))
1780                 {
1781                         tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED;
1782                         SetBufferCommitInfoNeedsSave(*buffer);
1783                 }
1784                 /* if tuple was marked for update but not updated... */
1785                 if (tuple->t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
1786                         result = HeapTupleMayBeUpdated;
1787                 else
1788                         result = HeapTupleUpdated;
1789         }
1790         if (result != HeapTupleMayBeUpdated)
1791         {
1792                 Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
1793                 tuple->t_self = tuple->t_data->t_ctid;
1794                 LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
1795                 return result;
1796         }
1797
1798         /*
1799          * XLOG stuff: no logging is required as long as we have no
1800          * savepoints.  For savepoints, a private log could be used...
1801          */
1802         ((PageHeader) BufferGetPage(*buffer))->pd_sui = ThisStartUpID;
1803
1804         /* store transaction information of xact marking the tuple */
1805         TransactionIdStore(GetCurrentTransactionId(), &(tuple->t_data->t_xmax));
1806         tuple->t_data->t_cmax = cid;
1807         tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID);
1808         tuple->t_data->t_infomask |= HEAP_MARKED_FOR_UPDATE;
1809
1810         LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
1811
1812         WriteNoReleaseBuffer(*buffer);
1813
1814         return HeapTupleMayBeUpdated;
1815 }
1816
1817 /* ----------------
1818  *              heap_markpos    - mark scan position
1819  *
1820  *              Note:
1821  *                              Only one mark may be maintained per scan at a time.
1822  *              Check whether this can be done generally--say, calls to get the
1823  *              next/previous tuple that NEVER pass the scan descriptor to the
1824  *              user AMs.  Currently, the mark is sent to the executor for
1825  *              safekeeping; this info could probably be stored in a GENERAL
1826  *              scan structure.
1827  *
1828  *              It may be best to change this call to store the marked position
1829  *              (up to 2?) in the scan structure itself, using the proper caching structure.
1830  * ----------------
1831  */
1832 void
1833 heap_markpos(HeapScanDesc scan)
1834 {
1835         /*
1836          * increment access statistics
1837          */
1838         IncrHeapAccessStat(local_markpos);
1839         IncrHeapAccessStat(global_markpos);
1840
1841         /* Note: no locking manipulations needed */
1842
1843         if (scan->rs_ctup.t_data != NULL)
1844                 scan->rs_mctid = scan->rs_ctup.t_self;
1845         else
1846                 ItemPointerSetInvalid(&scan->rs_mctid);
1847 }
1848
1849 /* ----------------
1850  *              heap_restrpos   - restore position to marked location
1851  *
1852  *              Note:  there are bad side effects here.  If we were past the end
1853  *              of a relation when heap_markpos was called, and the relation is
1854  *              then extended via insert, the next call to heap_restrpos will
1855  *              cause the added tuples to be visible when the scan continues.
1856  *              Problems also arise if the TIDs are rearranged!!!
1857  *
1858  * XXX  might be better to do direct access instead of
1859  *              using the generality of heapgettup().
1860  *
1861  * XXX It is very possible that when a scan is restored, a tuple
1862  * XXX that previously qualified may fail for time-range purposes, unless
1863  * XXX some form of locking exists (i.e., portals currently can act funny).
1864  * ----------------
1865  */
1866 void
1867 heap_restrpos(HeapScanDesc scan)
1868 {
1869         /*
1870          * increment access statistics
1871          */
1872         IncrHeapAccessStat(local_restrpos);
1873         IncrHeapAccessStat(global_restrpos);
1874
1875         /* XXX no amrestrpos checking that ammarkpos called */
1876
1877         /* Note: no locking manipulations needed */
1878
1879         /*
1880          * unpin scan buffers
1881          */
1882         if (BufferIsValid(scan->rs_cbuf))
1883                 ReleaseBuffer(scan->rs_cbuf);
1884         scan->rs_cbuf = InvalidBuffer;
1885
1886         if (!ItemPointerIsValid(&scan->rs_mctid))
1887         {
1888                 scan->rs_ctup.t_datamcxt = NULL;
1889                 scan->rs_ctup.t_data = NULL;
1890         }
1891         else
1892         {
1893                 scan->rs_ctup.t_self = scan->rs_mctid;
1894                 scan->rs_ctup.t_datamcxt = NULL;
1895                 scan->rs_ctup.t_data = (HeapTupleHeader) 0x1;   /* for heapgettup */
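                /*
                 * Setting t_data to a bogus nonzero value tells heapgettup
                 * that the scan is positioned, so that with dir = 0 ("no
                 * movement") it re-fetches the tuple at t_self instead of
                 * starting the scan over.
                 */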
1896                 heapgettup(scan->rs_rd,
1897                                    0,
1898                                    &(scan->rs_ctup),
1899                                    &(scan->rs_cbuf),
1900                                    scan->rs_snapshot,
1901                                    0,
1902                                    (ScanKey) NULL);
1903         }
1904 }
1905
1906 XLogRecPtr
1907 log_heap_clean(Relation reln, Buffer buffer, char *unused, int unlen)
1908 {
1909         xl_heap_clean xlrec;
1910         XLogRecPtr      recptr;
1911         XLogRecData rdata[3];
1912
1913         xlrec.node = reln->rd_node;
1914         xlrec.block = BufferGetBlockNumber(buffer);
1915         rdata[0].buffer = InvalidBuffer;
1916         rdata[0].data = (char *) &xlrec;
1917         rdata[0].len = SizeOfHeapClean;
1918         rdata[0].next = &(rdata[1]);
1919
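        /*
         * rdata entries attached to "buffer" may be omitted by XLogInsert if
         * it chooses to log a full backup copy of the page instead; the
         * entry with buffer = InvalidBuffer is always written.
         */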
1920         if (unlen > 0)
1921         {
1922                 rdata[1].buffer = buffer;
1923                 rdata[1].data = unused;
1924                 rdata[1].len = unlen;
1925                 rdata[1].next = &(rdata[2]);
1926         }
1927         else
1928                 rdata[0].next = &(rdata[2]);
1929
1930         rdata[2].buffer = buffer;
1931         rdata[2].data = NULL;
1932         rdata[2].len = 0;
1933         rdata[2].next = NULL;
1934
1935         recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_CLEAN, rdata);
1936
1937         return (recptr);
1938 }
1939
1940 static XLogRecPtr
1941 log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
1942                                 Buffer newbuf, HeapTuple newtup, bool move)
1943 {
1944         /*
1945          * Note: xlhdr is declared to have adequate size and correct alignment
1946          * for an xl_heap_header.  However the two tids, if present at all,
1947          * will be packed in with no wasted space after the xl_heap_header;
1948          * they aren't necessarily aligned as implied by this struct
1949          * declaration.
1950          */
1951         struct
1952         {
1953                 xl_heap_header hdr;
1954                 TransactionId tid1;
1955                 TransactionId tid2;
1956         }                       xlhdr;
1957         int                     hsize = SizeOfHeapHeader;
1958         xl_heap_update xlrec;
1959         XLogRecPtr      recptr;
1960         XLogRecData rdata[4];
1961         Page            page = BufferGetPage(newbuf);
1962         uint8           info = (move) ? XLOG_HEAP_MOVE : XLOG_HEAP_UPDATE;
1963
1964         xlrec.target.node = reln->rd_node;
1965         xlrec.target.tid = from;
1966         xlrec.newtid = newtup->t_self;
1967         rdata[0].buffer = InvalidBuffer;
1968         rdata[0].data = (char *) &xlrec;
1969         rdata[0].len = SizeOfHeapUpdate;
1970         rdata[0].next = &(rdata[1]);
1971
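        /*
         * rdata[1] carries no data of its own; it only associates the old
         * tuple's buffer with the record, so that XLogInsert's backup-block
         * logic covers that page as well.
         */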
1972         rdata[1].buffer = oldbuf;
1973         rdata[1].data = NULL;
1974         rdata[1].len = 0;
1975         rdata[1].next = &(rdata[2]);
1976
1977         xlhdr.hdr.t_oid = newtup->t_data->t_oid;
1978         xlhdr.hdr.t_natts = newtup->t_data->t_natts;
1979         xlhdr.hdr.t_hoff = newtup->t_data->t_hoff;
1980         xlhdr.hdr.mask = newtup->t_data->t_infomask;
1981         if (move)                                       /* remember xmin & xmax */
1982         {
1983                 TransactionId xmax;
1984
1985                 if (newtup->t_data->t_infomask & HEAP_XMAX_INVALID ||
1986                         newtup->t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
1987                         xmax = InvalidTransactionId;
1988                 else
1989                         xmax = newtup->t_data->t_xmax;
1990                 memcpy((char *) &xlhdr + hsize, &xmax, sizeof(TransactionId));
1991                 memcpy((char *) &xlhdr + hsize + sizeof(TransactionId),
1992                            &(newtup->t_data->t_xmin), sizeof(TransactionId));
1993                 hsize += 2 * sizeof(TransactionId);
1994         }
1995         rdata[2].buffer = newbuf;
1996         rdata[2].data = (char *) &xlhdr;
1997         rdata[2].len = hsize;
1998         rdata[2].next = &(rdata[3]);
1999
2000         rdata[3].buffer = newbuf;
2001         rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
2002         rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
2003         rdata[3].next = NULL;
2004
2005         /* If the new tuple is the first and only tuple on the page... */
2006         if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
2007                 PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
2008         {
2009                 info |= XLOG_HEAP_INIT_PAGE;
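                /*
                 * Detaching these entries from the buffer forces the tuple data
                 * to be stored in the record itself rather than replaced by a
                 * backup block; redo will re-init the page and needs the data.
                 */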
2010                 rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
2011         }
2012
2013         recptr = XLogInsert(RM_HEAP_ID, info, rdata);
2014
2015         return (recptr);
2016 }
2017
2018 XLogRecPtr
2019 log_heap_move(Relation reln, Buffer oldbuf, ItemPointerData from,
2020                           Buffer newbuf, HeapTuple newtup)
2021 {
2022         return (log_heap_update(reln, oldbuf, from, newbuf, newtup, true));
2023 }
2024
2025 static void
2026 heap_xlog_clean(bool redo, XLogRecPtr lsn, XLogRecord *record)
2027 {
2028         xl_heap_clean *xlrec = (xl_heap_clean *) XLogRecGetData(record);
2029         Relation        reln;
2030         Buffer          buffer;
2031         Page            page;
2032
2033         if (!redo || (record->xl_info & XLR_BKP_BLOCK_1))
2034                 return;
2035
2036         reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->node);
2037
2038         if (!RelationIsValid(reln))
2039                 return;
2040
2041         buffer = XLogReadBuffer(false, reln, xlrec->block);
2042         if (!BufferIsValid(buffer))
2043                 elog(PANIC, "heap_clean_redo: no block");
2044
2045         page = (Page) BufferGetPage(buffer);
2046         if (PageIsNew((PageHeader) page))
2047                 elog(PANIC, "heap_clean_redo: uninitialized page");
2048
2049         if (XLByteLE(lsn, PageGetLSN(page)))
2050         {
2051                 UnlockAndReleaseBuffer(buffer);
2052                 return;
2053         }
2054
2055         if (record->xl_len > SizeOfHeapClean)
2056         {
2057                 OffsetNumber unbuf[BLCKSZ / sizeof(OffsetNumber)];
2058                 OffsetNumber *unused = unbuf;
2059                 char       *unend;
2060                 ItemId          lp;
2061
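                /*
                 * The record tail holds the array of line-pointer indexes that
                 * the original cleanup marked unused; clear LP_USED on each,
                 * then let PageRepairFragmentation compact the page.
                 */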
2062                 Assert((record->xl_len - SizeOfHeapClean) <= BLCKSZ);
2063                 memcpy((char *) unbuf,
2064                            (char *) xlrec + SizeOfHeapClean,
2065                            record->xl_len - SizeOfHeapClean);
2066                 unend = (char *) unbuf + (record->xl_len - SizeOfHeapClean);
2067
2068                 while ((char *) unused < unend)
2069                 {
2070                         lp = ((PageHeader) page)->pd_linp + *unused;
2071                         lp->lp_flags &= ~LP_USED;
2072                         unused++;
2073                 }
2074         }
2075
2076         PageRepairFragmentation(page, NULL);
2077         UnlockAndWriteBuffer(buffer);
2078 }
2079
2080 static void
2081 heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
2082 {
2083         xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record);
2084         Relation        reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
2085         Buffer          buffer;
2086         Page            page;
2087         OffsetNumber offnum;
2088         ItemId          lp = NULL;
2089         HeapTupleHeader htup;
2090
2091         if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
2092                 return;
2093
2094         if (!RelationIsValid(reln))
2095                 return;
2096
2097         buffer = XLogReadBuffer(false, reln,
2098                                                 ItemPointerGetBlockNumber(&(xlrec->target.tid)));
2099         if (!BufferIsValid(buffer))
2100                 elog(PANIC, "heap_delete_%sdo: no block", (redo) ? "re" : "un");
2101
2102         page = (Page) BufferGetPage(buffer);
2103         if (PageIsNew((PageHeader) page))
2104                 elog(PANIC, "heap_delete_%sdo: uninitialized page", (redo) ? "re" : "un");
2105
2106         if (redo)
2107         {
2108                 if (XLByteLE(lsn, PageGetLSN(page)))    /* changes are applied */
2109                 {
2110                         UnlockAndReleaseBuffer(buffer);
2111                         return;
2112                 }
2113         }
2114         else if (XLByteLT(PageGetLSN(page), lsn))       /* changes are not applied
2115                                                                                                  * ?! */
2116                 elog(PANIC, "heap_delete_undo: bad page LSN");
2117
2118         offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
2119         if (PageGetMaxOffsetNumber(page) >= offnum)
2120                 lp = PageGetItemId(page, offnum);
2121
2122         if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp))
2123                 elog(PANIC, "heap_delete_%sdo: invalid lp", (redo) ? "re" : "un");
2124
2125         htup = (HeapTupleHeader) PageGetItem(page, lp);
2126
2127         if (redo)
2128         {
2129                 htup->t_xmax = record->xl_xid;
2130                 htup->t_cmax = FirstCommandId;
2131                 htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
2132                                                           HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);
2133                 PageSetLSN(page, lsn);
2134                 PageSetSUI(page, ThisStartUpID);
2135                 UnlockAndWriteBuffer(buffer);
2136                 return;
2137         }
2138
2139         elog(PANIC, "heap_delete_undo: unimplemented");
2140 }
2141
2142 static void
2143 heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
2144 {
2145         xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record);
2146         Relation        reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
2147         Buffer          buffer;
2148         Page            page;
2149         OffsetNumber offnum;
2150
2151         if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
2152                 return;
2153
2154         if (!RelationIsValid(reln))
2155                 return;
2156
2157         buffer = XLogReadBuffer((redo) ? true : false, reln,
2158                                                 ItemPointerGetBlockNumber(&(xlrec->target.tid)));
2159         if (!BufferIsValid(buffer))
2160                 return;
2161
2162         page = (Page) BufferGetPage(buffer);
2163         if (PageIsNew((PageHeader) page) &&
2164                 (!redo || !(record->xl_info & XLOG_HEAP_INIT_PAGE)))
2165                 elog(PANIC, "heap_insert_%sdo: uninitialized page", (redo) ? "re" : "un");
2166
2167         if (redo)
2168         {
2169                 struct
2170                 {
2171                         HeapTupleHeaderData hdr;
2172                         char            data[MaxTupleSize];
2173                 }                       tbuf;
2174                 HeapTupleHeader htup;
2175                 xl_heap_header xlhdr;
2176                 uint32          newlen;
2177
2178                 if (record->xl_info & XLOG_HEAP_INIT_PAGE)
2179                         PageInit(page, BufferGetPageSize(buffer), 0);
2180
2181                 if (XLByteLE(lsn, PageGetLSN(page)))    /* changes are applied */
2182                 {
2183                         UnlockAndReleaseBuffer(buffer);
2184                         return;
2185                 }
2186
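                /*
                 * The record holds only a stripped-down xl_heap_header plus the
                 * tuple body (everything from t_bits on); reassemble a complete
                 * tuple in tbuf, whose declaration guarantees header alignment.
                 */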
2187                 offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
2188                 if (PageGetMaxOffsetNumber(page) + 1 < offnum)
2189                         elog(PANIC, "heap_insert_redo: invalid max offset number");
2190
2191                 newlen = record->xl_len - SizeOfHeapInsert - SizeOfHeapHeader;
2192                 Assert(newlen <= MaxTupleSize);
2193                 memcpy((char *) &xlhdr,
2194                            (char *) xlrec + SizeOfHeapInsert,
2195                            SizeOfHeapHeader);
2196                 memcpy((char *) &tbuf + offsetof(HeapTupleHeaderData, t_bits),
2197                            (char *) xlrec + SizeOfHeapInsert + SizeOfHeapHeader,
2198                            newlen);
2199                 newlen += offsetof(HeapTupleHeaderData, t_bits);
2200                 htup = &tbuf.hdr;
2201                 htup->t_oid = xlhdr.t_oid;
2202                 htup->t_natts = xlhdr.t_natts;
2203                 htup->t_hoff = xlhdr.t_hoff;
2204                 htup->t_xmin = record->xl_xid;
2205                 htup->t_cmin = FirstCommandId;
2206                 htup->t_xmax = InvalidTransactionId;
2207                 htup->t_cmax = FirstCommandId;
2208                 htup->t_infomask = HEAP_XMAX_INVALID | xlhdr.mask;
2209
2210                 offnum = PageAddItem(page, (Item) htup, newlen, offnum,
2211                                                          LP_USED | OverwritePageMode);
2212                 if (offnum == InvalidOffsetNumber)
2213                         elog(PANIC, "heap_insert_redo: failed to add tuple");
2214                 PageSetLSN(page, lsn);
2215                 PageSetSUI(page, ThisStartUpID);                /* prev sui */
2216                 UnlockAndWriteBuffer(buffer);
2217                 return;
2218         }
2219
2220         /* undo insert */
2221         if (XLByteLT(PageGetLSN(page), lsn))            /* changes are not applied
2222                                                                                                  * ?! */
2223                 elog(PANIC, "heap_insert_undo: bad page LSN");
2224
2225         elog(PANIC, "heap_insert_undo: unimplemented");
2226 }
2227
2228 /*
2229  * Handles UPDATE & MOVE
2230  */
2231 static void
2232 heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move)
2233 {
2234         xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
2235         Relation        reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
2236         Buffer          buffer;
2237         bool            samepage =
2238         (ItemPointerGetBlockNumber(&(xlrec->newtid)) ==
2239          ItemPointerGetBlockNumber(&(xlrec->target.tid)));
2240         Page            page;
2241         OffsetNumber offnum;
2242         ItemId          lp = NULL;
2243         HeapTupleHeader htup;
2244
2245         if (!RelationIsValid(reln))
2246                 return;
2247
2248         if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
2249                 goto newt;
2250
2251         /* Deal with old tuple version */
2252
2253         buffer = XLogReadBuffer(false, reln,
2254                                                 ItemPointerGetBlockNumber(&(xlrec->target.tid)));
2255         if (!BufferIsValid(buffer))
2256                 elog(PANIC, "heap_update_%sdo: no block", (redo) ? "re" : "un");
2257
2258         page = (Page) BufferGetPage(buffer);
2259         if (PageIsNew((PageHeader) page))
2260                 elog(PANIC, "heap_update_%sdo: uninitialized old page", (redo) ? "re" : "un");
2261
2262         if (redo)
2263         {
2264                 if (XLByteLE(lsn, PageGetLSN(page)))    /* changes are applied */
2265                 {
2266                         UnlockAndReleaseBuffer(buffer);
2267                         if (samepage)
2268                                 return;
2269                         goto newt;
2270                 }
2271         }
2272         else if (XLByteLT(PageGetLSN(page), lsn))       /* changes are not applied
2273                                                                                                  * ?! */
2274                 elog(PANIC, "heap_update_undo: bad old tuple page LSN");
2275
2276         offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
2277         if (PageGetMaxOffsetNumber(page) >= offnum)
2278                 lp = PageGetItemId(page, offnum);
2279
2280         if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp))
2281                 elog(PANIC, "heap_update_%sdo: invalid lp", (redo) ? "re" : "un");
2282
2283         htup = (HeapTupleHeader) PageGetItem(page, lp);
2284
2285         if (redo)
2286         {
2287                 if (move)
2288                 {
2289                         TransactionIdStore(record->xl_xid, (TransactionId *) &(htup->t_cmin));
2290                         htup->t_infomask &=
2291                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
2292                         htup->t_infomask |= HEAP_MOVED_OFF;
2293                 }
2294                 else
2295                 {
2296                         htup->t_xmax = record->xl_xid;
2297                         htup->t_cmax = FirstCommandId;
2298                         htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
2299                                                          HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);
2300                 }
2301                 if (samepage)
2302                         goto newsame;
2303                 PageSetLSN(page, lsn);
2304                 PageSetSUI(page, ThisStartUpID);
2305                 UnlockAndWriteBuffer(buffer);
2306                 goto newt;
2307         }
2308
2309         elog(PANIC, "heap_update_undo: unimplemented");
2310
2311         /* Deal with new tuple */
2312
2313 newt:;
2314
2315         if (redo &&
2316                 ((record->xl_info & XLR_BKP_BLOCK_2) ||
2317                  ((record->xl_info & XLR_BKP_BLOCK_1) && samepage)))
2318                 return;
2319
2320         buffer = XLogReadBuffer((redo) ? true : false, reln,
2321                                                         ItemPointerGetBlockNumber(&(xlrec->newtid)));
2322         if (!BufferIsValid(buffer))
2323                 return;
2324
2325         page = (Page) BufferGetPage(buffer);
2326
2327 newsame:;
2328         if (PageIsNew((PageHeader) page) &&
2329                 (!redo || !(record->xl_info & XLOG_HEAP_INIT_PAGE)))
2330                 elog(PANIC, "heap_update_%sdo: uninitialized page", (redo) ? "re" : "un");
2331
2332         if (redo)
2333         {
2334                 struct
2335                 {
2336                         HeapTupleHeaderData hdr;
2337                         char            data[MaxTupleSize];
2338                 }                       tbuf;
2339                 xl_heap_header xlhdr;
2340                 int                     hsize;
2341                 uint32          newlen;
2342
2343                 if (record->xl_info & XLOG_HEAP_INIT_PAGE)
2344                         PageInit(page, BufferGetPageSize(buffer), 0);
2345
2346                 if (XLByteLE(lsn, PageGetLSN(page)))    /* changes are applied */
2347                 {
2348                         UnlockAndReleaseBuffer(buffer);
2349                         return;
2350                 }
2351
2352                 offnum = ItemPointerGetOffsetNumber(&(xlrec->newtid));
2353                 if (PageGetMaxOffsetNumber(page) + 1 < offnum)
2354                         elog(PANIC, "heap_update_redo: invalid max offset number");
2355
2356                 hsize = SizeOfHeapUpdate + SizeOfHeapHeader;
2357                 if (move)
2358                         hsize += (2 * sizeof(TransactionId));
2359
2360                 newlen = record->xl_len - hsize;
2361                 Assert(newlen <= MaxTupleSize);
2362                 memcpy((char *) &xlhdr,
2363                            (char *) xlrec + SizeOfHeapUpdate,
2364                            SizeOfHeapHeader);
2365                 memcpy((char *) &tbuf + offsetof(HeapTupleHeaderData, t_bits),
2366                            (char *) xlrec + hsize,
2367                            newlen);
2368                 newlen += offsetof(HeapTupleHeaderData, t_bits);
2369                 htup = &tbuf.hdr;
2370                 htup->t_oid = xlhdr.t_oid;
2371                 htup->t_natts = xlhdr.t_natts;
2372                 htup->t_hoff = xlhdr.t_hoff;
2373                 if (move)
2374                 {
2375                         hsize = SizeOfHeapUpdate + SizeOfHeapHeader;
2376                         memcpy(&(htup->t_xmax),
2377                                    (char *) xlrec + hsize,
2378                                    sizeof(TransactionId));
2379                         memcpy(&(htup->t_xmin),
2380                                    (char *) xlrec + hsize + sizeof(TransactionId),
2381                                    sizeof(TransactionId));
2382                         TransactionIdStore(record->xl_xid, (TransactionId *) &(htup->t_cmin));
2383                         htup->t_infomask = xlhdr.mask;
2384                         htup->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2385                                                                   HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
2386                         htup->t_infomask |= HEAP_MOVED_IN;
2387                 }
2388                 else
2389                 {
2390                         htup->t_xmin = record->xl_xid;
2391                         htup->t_cmin = FirstCommandId;
2392                         htup->t_xmax = InvalidTransactionId;
2393                         htup->t_cmax = FirstCommandId;
2394                         htup->t_infomask = HEAP_XMAX_INVALID | xlhdr.mask;
2395                 }
2396
2397                 offnum = PageAddItem(page, (Item) htup, newlen, offnum,
2398                                                          LP_USED | OverwritePageMode);
2399                 if (offnum == InvalidOffsetNumber)
2400                         elog(PANIC, "heap_update_redo: failed to add tuple");
2401                 PageSetLSN(page, lsn);
2402                 PageSetSUI(page, ThisStartUpID);                /* prev sui */
2403                 UnlockAndWriteBuffer(buffer);
2404                 return;
2405         }
2406
2407         /* undo */
2408         if (XLByteLT(PageGetLSN(page), lsn))            /* changes not applied?! */
2409                 elog(PANIC, "heap_update_undo: bad new tuple page LSN");
2410
2411         elog(PANIC, "heap_update_undo: unimplemented");
2412
2413 }
2414
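/*
 * _heap_unlock_tuple - rollback hook to release an unlogged tuple lock
 *
 * heap_update registers this via XactPushRollback before it releases the
 * lock on the old tuple's page.  If the transaction aborts before the
 * update has been XLOG-logged, this resets the old tuple to "not deleted"
 * without writing any log record.  See the notes in heap_update.
 */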
2415 static void
2416 _heap_unlock_tuple(void *data)
2417 {
2418         xl_heaptid *xltid = (xl_heaptid *) data;
2419         Relation        reln = XLogOpenRelation(false, RM_HEAP_ID, xltid->node);
2420         Buffer          buffer;
2421         Page            page;
2422         OffsetNumber offnum;
2423         ItemId          lp;
2424         HeapTupleHeader htup;
2425
2426         if (!RelationIsValid(reln))
2427                 elog(PANIC, "_heap_unlock_tuple: can't open relation");
2428
2429         buffer = XLogReadBuffer(false, reln,
2430                                                         ItemPointerGetBlockNumber(&(xltid->tid)));
2431         if (!BufferIsValid(buffer))
2432                 elog(PANIC, "_heap_unlock_tuple: can't read buffer");
2433
2434         page = (Page) BufferGetPage(buffer);
2435         if (PageIsNew((PageHeader) page))
2436                 elog(PANIC, "_heap_unlock_tuple: uninitialized page");
2437
2438         offnum = ItemPointerGetOffsetNumber(&(xltid->tid));
2439         if (offnum > PageGetMaxOffsetNumber(page))
2440                 elog(PANIC, "_heap_unlock_tuple: invalid itemid");
2441         lp = PageGetItemId(page, offnum);
2442
2443         if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp))
2444                 elog(PANIC, "_heap_unlock_tuple: unused/deleted tuple in rollback");
2445
2446         htup = (HeapTupleHeader) PageGetItem(page, lp);
2447
2448         if (!TransactionIdEquals(htup->t_xmax, GetCurrentTransactionId()))
2449                 elog(PANIC, "_heap_unlock_tuple: invalid xmax in rollback");
2450         htup->t_infomask &= ~HEAP_XMAX_UNLOGGED;
2451         htup->t_infomask |= HEAP_XMAX_INVALID;
2452         UnlockAndWriteBuffer(buffer);
2453         return;
2454 }
2455
2456 void
2457 heap_redo(XLogRecPtr lsn, XLogRecord *record)
2458 {
2459         uint8           info = record->xl_info & ~XLR_INFO_MASK;
2460
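        /* mask out the XLOG_HEAP_INIT_PAGE flag, leaving the base op code */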
2461         info &= XLOG_HEAP_OPMASK;
2462         if (info == XLOG_HEAP_INSERT)
2463                 heap_xlog_insert(true, lsn, record);
2464         else if (info == XLOG_HEAP_DELETE)
2465                 heap_xlog_delete(true, lsn, record);
2466         else if (info == XLOG_HEAP_UPDATE)
2467                 heap_xlog_update(true, lsn, record, false);
2468         else if (info == XLOG_HEAP_MOVE)
2469                 heap_xlog_update(true, lsn, record, true);
2470         else if (info == XLOG_HEAP_CLEAN)
2471                 heap_xlog_clean(true, lsn, record);
2472         else
2473                 elog(PANIC, "heap_redo: unknown op code %u", info);
2474 }
2475
2476 void
2477 heap_undo(XLogRecPtr lsn, XLogRecord *record)
2478 {
2479         uint8           info = record->xl_info & ~XLR_INFO_MASK;
2480
2481         info &= XLOG_HEAP_OPMASK;
2482         if (info == XLOG_HEAP_INSERT)
2483                 heap_xlog_insert(false, lsn, record);
2484         else if (info == XLOG_HEAP_DELETE)
2485                 heap_xlog_delete(false, lsn, record);
2486         else if (info == XLOG_HEAP_UPDATE)
2487                 heap_xlog_update(false, lsn, record, false);
2488         else if (info == XLOG_HEAP_MOVE)
2489                 heap_xlog_update(false, lsn, record, true);
2490         else if (info == XLOG_HEAP_CLEAN)
2491                 heap_xlog_clean(false, lsn, record);
2492         else
2493                 elog(PANIC, "heap_undo: unknown op code %u", info);
2494 }
2495
2496 static void
2497 out_target(char *buf, xl_heaptid *target)
2498 {
2499         sprintf(buf + strlen(buf), "node %u/%u; tid %u/%u",
2500                         target->node.tblNode, target->node.relNode,
2501                         ItemPointerGetBlockNumber(&(target->tid)),
2502                         ItemPointerGetOffsetNumber(&(target->tid)));
2503 }
2504
2505 void
2506 heap_desc(char *buf, uint8 xl_info, char *rec)
2507 {
2508         uint8           info = xl_info & ~XLR_INFO_MASK;
2509
2510         info &= XLOG_HEAP_OPMASK;
2511         if (info == XLOG_HEAP_INSERT)
2512         {
2513                 xl_heap_insert *xlrec = (xl_heap_insert *) rec;
2514
2515                 strcat(buf, "insert: ");
2516                 out_target(buf, &(xlrec->target));
2517         }
2518         else if (info == XLOG_HEAP_DELETE)
2519         {
2520                 xl_heap_delete *xlrec = (xl_heap_delete *) rec;
2521
2522                 strcat(buf, "delete: ");
2523                 out_target(buf, &(xlrec->target));
2524         }
2525         else if (info == XLOG_HEAP_UPDATE || info == XLOG_HEAP_MOVE)
2526         {
2527                 xl_heap_update *xlrec = (xl_heap_update *) rec;
2528
2529                 if (info == XLOG_HEAP_UPDATE)
2530                         strcat(buf, "update: ");
2531                 else
2532                         strcat(buf, "move: ");
2533                 out_target(buf, &(xlrec->target));
2534                 sprintf(buf + strlen(buf), "; new %u/%u",
2535                                 ItemPointerGetBlockNumber(&(xlrec->newtid)),
2536                                 ItemPointerGetOffsetNumber(&(xlrec->newtid)));
2537         }
2538         else if (info == XLOG_HEAP_CLEAN)
2539         {
2540                 xl_heap_clean *xlrec = (xl_heap_clean *) rec;
2541
2542                 sprintf(buf + strlen(buf), "clean: node %u/%u; blk %u",
2543                                 xlrec->node.tblNode, xlrec->node.relNode, xlrec->block);
2544         }
2545         else
2546                 strcat(buf, "UNKNOWN");
2547 }