1 /*-------------------------------------------------------------------------
4 * routines for manipulating inversion fs large objects. This file
5 * contains the user-level large object application interface routines.
7 * Copyright (c) 1994, Regents of the University of California
11 * $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.22 1997/11/21 19:02:37 momjian Exp $
13 *-------------------------------------------------------------------------
15 #include <sys/types.h>
16 #include <stdio.h> /* for sprintf() */
22 #include "miscadmin.h"
23 #include "libpq/libpq-fs.h"
24 #include "access/genam.h"
25 #include "access/heapam.h"
26 #include "access/relscan.h"
27 #include "access/tupdesc.h"
28 #include "access/transam.h"
29 #include "access/xact.h"
30 #include "access/nbtree.h"
31 #include "access/tupdesc.h"
32 #include "catalog/index.h" /* for index_create() */
33 #include "catalog/catalog.h" /* for newoid() */
34 #include "catalog/pg_am.h" /* for BTREE_AM_OID */
35 #include "catalog/pg_opclass.h" /* for INT4_OPS_OID */
36 #include "catalog/pg_proc.h" /* for INT4GE_PROC_OID */
37 #include "storage/itemptr.h"
38 #include "storage/bufpage.h"
39 #include "storage/bufmgr.h"
40 #include "storage/smgr.h"
41 #include "utils/rel.h"
42 #include "utils/relcache.h"
43 #include "utils/palloc.h"
44 #include "storage/large_object.h"
45 #include "storage/lmgr.h"
46 #include "utils/syscache.h"
47 #include "utils/builtins.h" /* for namestrcpy() */
48 #include "catalog/heap.h"
49 #include "nodes/pg_list.h"
52 * Warning, Will Robinson... In order to pack data into an inversion
53 * file as densely as possible, we violate the class abstraction here.
54 * When we're appending a new tuple to the end of the table, we check
55 * the last page to see how much data we can put on it. If it's more
56 * than IMINBLK, we write enough to fill the page. This limits external
57 * fragmentation. In no case can we write more than IMAXBLK, since
58 * the 8K postgres page size less overhead leaves only this much space
/* Usable free space on page p for one inversion-file tuple: page free space
 * minus a heap tuple header, a varlena header, and the int32 olastbyte
 * attribute.  NOTE(review): this listing embeds original line numbers and is
 * elided; surrounding context (IMINBLK/IMAXBLK definitions) is not visible. */
62 #define IFREESPC(p) (PageGetFreeSpace(p) - sizeof(HeapTupleData) - sizeof(struct varlena) - sizeof(int32))
66 /* non-export function prototypes */
68 inv_newtuple(LargeObjectDesc *obj_desc, Buffer buffer,
69 Page page, char *dbuf, int nwrite);
70 static HeapTuple inv_fetchtup(LargeObjectDesc *obj_desc, Buffer *bufP);
71 static int inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes);
73 inv_wrold(LargeObjectDesc *obj_desc, char *dbuf, int nbytes,
74 HeapTuple htup, Buffer buffer);
75 static void inv_indextup(LargeObjectDesc *obj_desc, HeapTuple htup);
76 static int _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln);
/* inv_create -- create a new inversion large object.
 * Creates a heap relation "xinv<oid>" (olastbyte int4, odata varlena) and a
 * btree index "xinx<oid>" on olastbyte, then builds and returns a
 * LargeObjectDesc positioned at offset 0.  Takes the relation lock implied
 * by flags (INV_WRITE / INV_READ).
 * NOTE(review): this is an elided listing -- the function signature,
 * several declarations, braces and the final return are missing between
 * the numbered fragments below. */
79 * inv_create -- create a new large object.
82 * flags -- was archive, smgr
85 * large object descriptor, appropriately filled in.
91 LargeObjectDesc *retval;
95 AttrNumber attNums[1];
97 char objname[NAMEDATALEN];
98 char indname[NAMEDATALEN];
/* Reserve the next OID for the relation so the xinv/xinx names match the
 * pg_class tuple that heap_create will make (see comment fragments). */
101 * add one here since the pg_class tuple created will have the next
102 * oid and we want to have the relation name to correspond to the
105 file_oid = newoid() + 1;
107 /* come up with some table names */
108 sprintf(objname, "xinv%d", file_oid);
109 sprintf(indname, "xinx%d", file_oid);
/* Guard against name collisions in pg_class before creating anything. */
111 if (SearchSysCacheTuple(RELNAME, PointerGetDatum(objname),
115 "internal error: %s already exists -- cannot create large obj",
118 if (SearchSysCacheTuple(RELNAME, PointerGetDatum(indname),
122 "internal error: %s already exists -- cannot create large obj",
126 /* this is pretty painful... want a tuple descriptor */
127 tupdesc = CreateTemplateTupleDesc(2);
128 TupleDescInitEntry(tupdesc, (AttrNumber) 1,
132 TupleDescInitEntry(tupdesc, (AttrNumber) 2,
138 * First create the table to hold the inversion large object. It will
139 * be located on whatever storage manager the user requested.
142 heap_create(objname, tupdesc);
144 /* make the relation visible in this transaction */
145 CommandCounterIncrement();
146 r = heap_openr(objname);
148 if (!RelationIsValid(r))
150 elog(WARN, "cannot create large object on %s under inversion",
151 smgrout(DEFAULT_SMGR));
155 * Now create a btree index on the relation's olastbyte attribute to
156 * make seeks go faster. The hardwired constants are embarassing to
157 * me, and are symptomatic of the pressure under which this code was
160 * ok, mao, let's put in some symbolic constants - jolly
164 classObjectId[0] = INT4_OPS_OID;
165 index_create(objname, indname, NULL, NULL, BTREE_AM_OID,
166 1, &attNums[0], &classObjectId[0],
167 0, (Datum) NULL, NULL, FALSE, FALSE);
169 /* make the index visible in this transaction */
170 CommandCounterIncrement();
171 indr = index_openr(indname);
173 if (!RelationIsValid(indr))
175 elog(WARN, "cannot create index for large obj on %s under inversion",
176 smgrout(DEFAULT_SMGR));
/* Fill in the descriptor: relation/index handles, cached tuple descriptors,
 * and zeroed seek state (offset/lowbyte/highbyte, invalid htid). */
179 retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
182 retval->index_r = indr;
183 retval->iscan = (IndexScanDesc) NULL;
184 retval->hdesc = RelationGetTupleDescriptor(r);
185 retval->idesc = RelationGetTupleDescriptor(indr);
186 retval->offset = retval->lowbyte =
187 retval->highbyte = 0;
188 ItemPointerSetInvalid(&(retval->htid));
/* Two-phase locking: take the strongest lock the open mode needs now. */
190 if (flags & INV_WRITE)
192 RelationSetLockForWrite(r);
193 retval->flags = IFS_WRLOCK | IFS_RDLOCK;
195 else if (flags & INV_READ)
197 RelationSetLockForRead(r);
198 retval->flags = IFS_RDLOCK;
/* A freshly created object is empty, hence at EOF. */
200 retval->flags |= IFS_ATEOF;
/* inv_open -- open an existing large object by the OID of its heap relation.
 * Derives the index name from the heap relation name (xinv... -> xinx...),
 * opens both, and returns a descriptor positioned at offset 0, or NULL if
 * either relation cannot be opened.  NOTE(review): elided listing -- the
 * return-type line, some declarations, and the name-patching statement that
 * the "hack hack hack" comment refers to are missing from view. */
206 inv_open(Oid lobjId, int flags)
208 LargeObjectDesc *retval;
213 r = heap_open(lobjId);
215 if (!RelationIsValid(r))
216 return ((LargeObjectDesc *) NULL);
218 indname = pstrdup((r->rd_rel->relname).data);
221 * hack hack hack... we know that the fourth character of the
222 * relation name is a 'v', and that the fourth character of the index
223 * name is an 'x', and that they're otherwise identical.
226 indrel = index_openr(indname);
228 if (!RelationIsValid(indrel))
229 return ((LargeObjectDesc *) NULL);
231 retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
234 retval->index_r = indrel;
235 retval->iscan = (IndexScanDesc) NULL;
236 retval->hdesc = RelationGetTupleDescriptor(r);
237 retval->idesc = RelationGetTupleDescriptor(indrel);
238 retval->offset = retval->lowbyte = retval->highbyte = 0;
239 ItemPointerSetInvalid(&(retval->htid));
/* Same two-phase-locking setup as inv_create. */
241 if (flags & INV_WRITE)
243 RelationSetLockForWrite(r);
244 retval->flags = IFS_WRLOCK | IFS_RDLOCK;
246 else if (flags & INV_READ)
248 RelationSetLockForRead(r);
249 retval->flags = IFS_RDLOCK;
/* inv_close -- close a large object descriptor: end any active index scan,
 * then close the heap relation and its index.  NOTE(review): the pfree of
 * obj_desc itself, if any, is not visible in this elided listing. */
256 * Closes an existing large object descriptor.
259 inv_close(LargeObjectDesc *obj_desc)
261 Assert(PointerIsValid(obj_desc));
263 if (obj_desc->iscan != (IndexScanDesc) NULL)
264 index_endscan(obj_desc->iscan);
266 heap_close(obj_desc->heap_r);
267 index_close(obj_desc->index_r);
/* inv_destroy -- drop the heap relation backing a large object.
 * Refuses to act on an index relation; returns -1 on failure per the
 * comment fragment below (success-path return is elided from view). */
273 * Destroys an existing large object, and frees its associated pointers.
275 * returns -1 if failed
278 inv_destroy(Oid lobjId)
282 r = (Relation) RelationIdGetRelation(lobjId);
283 if (!RelationIsValid(r) || r->rd_rel->relkind == RELKIND_INDEX)
286 heap_destroy(r->rd_rel->relname.data);
/* inv_stat -- fill a struct pgstat for an inversion file.
 * Size is computed expensively via _inv_getsize (seek to last block);
 * owner comes from pg_class; access/mod/change times are not maintained
 * and are reported as 0. */
291 * inv_stat() -- do a stat on an inversion file.
293 * For the time being, this is an insanely expensive operation. In
294 * order to find the size of the file, we seek to the last block in
295 * it and compute the size from that. We scan pg_class to determine
296 * the file's owner and create time. We don't maintain mod time or
299 * These fields aren't stored in a table anywhere because they're
300 * updated so frequently, and postgres only appends tuples at the
301 * end of relations. Once clustering works, we should fix this.
305 inv_stat(LargeObjectDesc *obj_desc, struct pgstat * stbuf)
307 Assert(PointerIsValid(obj_desc));
308 Assert(stbuf != NULL);
310 /* need read lock for stat */
311 if (!(obj_desc->flags & IFS_RDLOCK))
313 RelationSetLockForRead(obj_desc->heap_r);
314 obj_desc->flags |= IFS_RDLOCK;
317 stbuf->st_ino = obj_desc->heap_r->rd_id;
/* NOTE(review): two st_mode assignments appear (319 and 321) -- presumably
 * alternatives under an elided #if; the literal 100666 looks like it was
 * meant as octal 0100666 (S_IFREG|0666) -- verify in the full source. */
319 stbuf->st_mode = (S_IFREG | 0666); /* IFREG|rw-rw-rw- */
321 stbuf->st_mode = 100666; /* IFREG|rw-rw-rw- */
323 stbuf->st_size = _inv_getsize(obj_desc->heap_r,
327 stbuf->st_uid = obj_desc->heap_r->rd_rel->relowner;
329 /* we have no good way of computing access times right now */
330 stbuf->st_atime_s = stbuf->st_mtime_s = stbuf->st_ctime_s = 0;
/* inv_seek -- reposition the file offset (SEEK_SET / SEEK_CUR / SEEK_END).
 * SEEK_CUR and SEEK_END are reduced to SEEK_SET by recursion.  A seek clears
 * IFS_ATEOF; if the target lies inside the currently cached block and a scan
 * exists, no index work is done, otherwise the btree scan on olastbyte is
 * moved (btmovescan) or begun (index_beginscan) with key olastbyte >= offset. */
338 inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
344 Assert(PointerIsValid(obj_desc));
346 if (whence == SEEK_CUR)
348 offset += obj_desc->offset; /* calculate absolute position */
349 return (inv_seek(obj_desc, offset, SEEK_SET));
/* NOTE(review): behavior of seeking past EOF is explicitly undefined here. */
353 * if you seek past the end (offset > 0) I have no clue what happens
356 if (whence == SEEK_END)
358 /* need read lock for getsize */
359 if (!(obj_desc->flags & IFS_RDLOCK))
361 RelationSetLockForRead(obj_desc->heap_r);
362 obj_desc->flags |= IFS_RDLOCK;
364 offset += _inv_getsize(obj_desc->heap_r,
367 return (inv_seek(obj_desc, offset, SEEK_SET));
371 * Whenever we do a seek, we turn off the EOF flag bit to force
372 * ourselves to check for real on the next read.
375 obj_desc->flags &= ~IFS_ATEOF;
376 oldOffset = obj_desc->offset;
377 obj_desc->offset = offset;
379 /* try to avoid doing any work, if we can manage it */
380 if (offset >= obj_desc->lowbyte
381 && offset <= obj_desc->highbyte
382 && oldOffset <= obj_desc->highbyte
383 && obj_desc->iscan != (IndexScanDesc) NULL)
387 * To do a seek on an inversion file, we start an index scan that will
388 * bring us to the right place. Each tuple in an inversion file
389 * stores the offset of the last byte that appears on it, and we have
394 /* right now, just assume that the operation is SEEK_SET */
395 if (obj_desc->iscan != (IndexScanDesc) NULL)
397 d = Int32GetDatum(offset);
398 btmovescan(obj_desc->iscan, d);
/* 0x0 flags, attribute 1 (olastbyte), int4ge comparison against offset. */
403 ScanKeyEntryInitialize(&skey, 0x0, 1, INT4GE_PROC_OID,
404 Int32GetDatum(offset));
406 obj_desc->iscan = index_beginscan(obj_desc->index_r,
407 (bool) 0, (uint16) 1,
/* inv_tell -- return the current byte offset of the descriptor. */
415 inv_tell(LargeObjectDesc *obj_desc)
417 Assert(PointerIsValid(obj_desc));
419 return (obj_desc->offset);
/* inv_read -- read up to nbytes from the large object into buf, one
 * filesystem-block tuple at a time via inv_fetchtup.  Sets IFS_ATEOF and
 * stops when no further tuple exists.  Returns the number of bytes read
 * (return statements are elided from this listing). */
423 inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
431 struct varlena *fsblock;
434 Assert(PointerIsValid(obj_desc));
437 /* if we're already at EOF, we don't need to do any work here */
438 if (obj_desc->flags & IFS_ATEOF)
441 /* make sure we obey two-phase locking */
442 if (!(obj_desc->flags & IFS_RDLOCK))
444 RelationSetLockForRead(obj_desc->heap_r);
445 obj_desc->flags |= IFS_RDLOCK;
450 /* fetch a block at a time */
451 while (nread < nbytes)
454 /* fetch an inversion file system block */
455 htup = inv_fetchtup(obj_desc, &b);
457 if (!HeapTupleIsValid(htup))
459 obj_desc->flags |= IFS_ATEOF;
463 /* copy the data from this block into the buffer */
464 d = heap_getattr(htup, b, 2, obj_desc->hdesc, &isNull);
465 fsblock = (struct varlena *) DatumGetPointer(d);
/* off: position within this block; ncopy: bytes available from offset to
 * the block's last byte, clamped to what the caller still wants. */
467 off = obj_desc->offset - obj_desc->lowbyte;
468 ncopy = obj_desc->highbyte - obj_desc->offset + 1;
469 if (ncopy > (nbytes - nread))
470 ncopy = (nbytes - nread);
471 memmove(buf, &(fsblock->vl_dat[off]), ncopy);
473 /* be a good citizen */
476 /* move pointers past the amount we just read */
479 obj_desc->offset += ncopy;
/* inv_write -- write nbytes from buf into the large object, one block per
 * iteration.  Appends a new tuple (inv_wrnew) when at/past EOF or when the
 * relation is empty; otherwise overwrites via inv_wrold.  Returns bytes
 * written (elided from this listing). */
487 inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes)
494 Assert(PointerIsValid(obj_desc));
498 * Make sure we obey two-phase locking. A write lock entitles you to
499 * read the relation, as well.
/* NOTE(review): only RelationSetLockForRead is visible here despite setting
 * IFS_WRLOCK -- a write-lock call may be elided; verify in the full source. */
502 if (!(obj_desc->flags & IFS_WRLOCK))
504 RelationSetLockForRead(obj_desc->heap_r);
505 obj_desc->flags |= (IFS_WRLOCK | IFS_RDLOCK);
510 /* write a block at a time */
511 while (nwritten < nbytes)
515 * Fetch the current inversion file system block. If the class
516 * storing the inversion file is empty, we don't want to do an
517 * index lookup, since index lookups choke on empty files (should
521 if ((obj_desc->flags & IFS_ATEOF)
522 || obj_desc->heap_r->rd_nblocks == 0)
523 htup = (HeapTuple) NULL;
525 htup = inv_fetchtup(obj_desc, &b);
527 /* either append or replace a block, as required */
528 if (!HeapTupleIsValid(htup))
530 tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
534 if (obj_desc->offset > obj_desc->highbyte)
535 tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
537 tuplen = inv_wrold(obj_desc, buf, nbytes - nwritten, htup, b);
540 /* move pointers past the amount we just wrote */
543 obj_desc->offset += tuplen;
/* inv_fetchtup -- return the heap tuple whose block contains the current
 * offset, pinning its buffer in *bufP.  If the cached block no longer covers
 * the offset (or htid is invalid), advances the btree scan (olastbyte >=
 * offset) and fetches until a visible tuple is found; otherwise re-fetches
 * the remembered htid.  Caches lowbyte/highbyte for the block and returns
 * NULL at EOF. */
551 * inv_fetchtup -- Fetch an inversion file system block.
553 * This routine finds the file system block containing the offset
554 * recorded in the obj_desc structure. Later, we need to think about
555 * the effects of non-functional updates (can you rewrite the same
556 * block twice in a single transaction?), but for now, we won't bother.
559 * obj_desc -- the object descriptor.
560 * bufP -- pointer to a buffer in the buffer cache; caller
564 * A heap tuple containing the desired block, or NULL if no
568 inv_fetchtup(LargeObjectDesc *obj_desc, Buffer *bufP)
571 RetrieveIndexResult res;
575 struct varlena *fsblock;
579 * If we've exhausted the current block, we need to get the next one.
580 * When we support time travel and non-functional updates, we will
581 * need to loop over the blocks, rather than just have an 'if', in
582 * order to find the one we're really interested in.
585 if (obj_desc->offset > obj_desc->highbyte
586 || obj_desc->offset < obj_desc->lowbyte
587 || !ItemPointerIsValid(&(obj_desc->htid)))
590 /* initialize scan key if not done */
591 if (obj_desc->iscan == (IndexScanDesc) NULL)
595 ScanKeyEntryInitialize(&skey, 0x0, 1, INT4GE_PROC_OID,
598 index_beginscan(obj_desc->index_r,
599 (bool) 0, (uint16) 1,
/* Loop until heap_fetch yields a visible tuple or the index is exhausted. */
605 res = index_getnext(obj_desc->iscan, ForwardScanDirection);
607 if (res == (RetrieveIndexResult) NULL)
609 ItemPointerSetInvalid(&(obj_desc->htid));
610 return ((HeapTuple) NULL);
614 * For time travel, we need to use the actual time qual here,
615 * rather that NowTimeQual. We currently have no way to pass
619 htup = heap_fetch(obj_desc->heap_r, false,
620 &(res->heap_iptr), bufP);
622 } while (htup == (HeapTuple) NULL);
624 /* remember this tid -- we may need it for later reads/writes */
625 ItemPointerCopy(&(res->heap_iptr), &(obj_desc->htid));
/* Fast path: offset still inside the cached block -- re-fetch by saved tid. */
630 htup = heap_fetch(obj_desc->heap_r, false,
631 &(obj_desc->htid), bufP);
635 * By here, we have the heap tuple we're interested in. We cache the
636 * upper and lower bounds for this block in the object descriptor and
640 d = heap_getattr(htup, *bufP, 1, obj_desc->hdesc, &isNull);
641 lastbyte = (int32) DatumGetInt32(d);
642 d = heap_getattr(htup, *bufP, 2, obj_desc->hdesc, &isNull);
643 fsblock = (struct varlena *) DatumGetPointer(d);
646 * order of + and - is important -- these are unsigned quantites near
/* firstbyte = lastbyte - data length; vl_len includes its own header,
 * hence the +sizeof(vl_len) adjustment before subtracting. */
649 firstbyte = (lastbyte + 1 + sizeof(fsblock->vl_len)) - fsblock->vl_len;
651 obj_desc->lowbyte = firstbyte;
652 obj_desc->highbyte = lastbyte;
/* inv_wrnew -- append one new filesystem-block tuple holding up to nbytes
 * from buf.  Packs into the relation's last page if it has at least IMINBLK
 * free, otherwise allocates a fresh page; caller loops on short writes.
 * Returns the number of bytes actually written (return elided from view). */
659 * inv_wrnew() -- append a new filesystem block tuple to the inversion
662 * In response to an inv_write, we append one or more file system
663 * blocks to the class containing the large object. We violate the
664 * class abstraction here in order to pack things as densely as we
665 * are able. We examine the last page in the relation, and write
666 * just enough to fill it, assuming that it has above a certain
667 * threshold of space available. If the space available is less than
668 * the threshold, we allocate a new page by writing a big tuple.
670 * By the time we get here, we know all the parameters passed in
671 * are valid, and that we hold the appropriate lock on the heap
675 * obj_desc: large object descriptor for which to append block.
676 * buf: buffer containing data to write.
677 * nbytes: amount to write
680 * number of bytes actually written to the new tuple.
683 inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes)
692 hr = obj_desc->heap_r;
695 * Get the last block in the relation. If there's no data in the
696 * relation at all, then we just get a new block. Otherwise, we check
697 * the last block to see whether it has room to accept some or all of
698 * the data that the user wants to write. If it doesn't, then we
699 * allocate a new block.
702 nblocks = RelationGetNumberOfBlocks(hr);
705 buffer = ReadBuffer(hr, nblocks - 1);
707 buffer = ReadBuffer(hr, P_NEW);
709 page = BufferGetPage(buffer);
712 * If the last page is too small to hold all the data, and it's too
713 * small to hold IMINBLK, then we allocate a new page. If it will
714 * hold at least IMINBLK, but less than all the data requested, then
715 * we write IMINBLK here. The caller is responsible for noticing that
716 * less than the requested number of bytes were written, and calling
717 * this routine again.
720 nwritten = IFREESPC(page);
721 if (nwritten < nbytes)
723 if (nwritten < IMINBLK)
725 ReleaseBuffer(buffer);
726 buffer = ReadBuffer(hr, P_NEW);
727 page = BufferGetPage(buffer);
728 PageInit(page, BufferGetPageSize(buffer), 0);
/* Cap a single tuple at IMAXBLK (clamp statement elided from listing). */
729 if (nbytes > IMAXBLK)
741 * Insert a new file system block tuple, index it, and write it out.
744 ntup = inv_newtuple(obj_desc, buffer, page, buf, nwritten);
745 inv_indextup(obj_desc, ntup);
747 /* new tuple is inserted */
/* inv_wrold -- overwrite (part of) an existing block.  With a no-overwrite
 * storage manager this is: invalidate the old tuple (set t_xmax/t_cmax),
 * then append a replacement tuple carrying old-prefix + new data +
 * old-suffix; a whole-block overwrite degenerates to inv_wrnew.  Returns
 * bytes written (elided).  NOTE(review): heavily elided listing -- braces,
 * some memmove arguments and write-out calls are missing between fragments. */
754 inv_wrold(LargeObjectDesc *obj_desc,
767 struct varlena *fsblock;
775 * Since we're using a no-overwrite storage manager, the way we
776 * overwrite blocks is to mark the old block invalid and append a new
777 * block. First mark the old block invalid. This violates the tuple
781 TransactionIdStore(GetCurrentTransactionId(), &(htup->t_xmax));
782 htup->t_cmax = GetCurrentCommandId();
783 htup->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID);
786 * If we're overwriting the entire block, we're lucky. All we need to
787 * do is to insert a new block.
790 if (obj_desc->offset == obj_desc->lowbyte
791 && obj_desc->lowbyte + nbytes >= obj_desc->highbyte
794 return (inv_wrnew(obj_desc, dbuf, nbytes));
798 * By here, we need to overwrite part of the data in the current
799 * tuple. In order to reduce the degree to which we fragment blocks,
800 * we guarantee that no block will be broken up due to an overwrite.
801 * This means that we need to allocate a tuple on a new page, if
802 * there's not room for the replacement on this one.
806 page = BufferGetPage(buffer);
807 newpage = BufferGetPage(newbuf);
808 hr = obj_desc->heap_r;
809 freespc = IFREESPC(page);
810 d = heap_getattr(htup, buffer, 2, obj_desc->hdesc, &isNull);
811 fsblock = (struct varlena *) DatumGetPointer(d);
/* tupbytes = payload length of the old block (vl_len minus its header). */
812 tupbytes = fsblock->vl_len - sizeof(fsblock->vl_len);
814 if (freespc < tupbytes)
818 * First see if there's enough space on the last page of the table
822 nblocks = RelationGetNumberOfBlocks(hr);
825 newbuf = ReadBuffer(hr, nblocks - 1);
827 newbuf = ReadBuffer(hr, P_NEW);
829 newpage = BufferGetPage(newbuf);
830 freespc = IFREESPC(newpage);
833 * If there's no room on the last page, allocate a new last page
834 * for the table, and put it there.
837 if (freespc < tupbytes)
839 ReleaseBuffer(newbuf);
840 newbuf = ReadBuffer(hr, P_NEW);
841 newpage = BufferGetPage(newbuf);
842 PageInit(newpage, BufferGetPageSize(newbuf), 0);
/* In-place-style path: patch the user's data into the old block image,
 * clamped to the block boundary, then re-append it as one tuple. */
847 if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
848 nwritten = obj_desc->highbyte - obj_desc->offset + 1;
849 memmove(VARDATA(fsblock) + (obj_desc->offset - obj_desc->lowbyte),
853 * we are rewriting the entire old block, therefore we reset offset to
854 * the lowbyte of the original block before jumping into
857 keep_offset = obj_desc->offset;
858 obj_desc->offset = obj_desc->lowbyte;
859 ntup = inv_newtuple(obj_desc, newbuf, newpage, VARDATA(fsblock),
861 /* after we are done, we restore to the true offset */
862 obj_desc->offset = keep_offset;
865 * By here, we have a page (newpage) that's guaranteed to have enough
866 * space on it to put the new tuple. Call inv_newtuple to do the
867 * work. Passing NULL as a buffer to inv_newtuple() keeps it from
868 * copying any data into the new tuple. When it returns, the tuple is
869 * ready to receive data from the old tuple and the user's data
873 ntup = inv_newtuple(obj_desc, newbuf, newpage, (char *) NULL, tupbytes);
/* dptr points at the varlena payload inside the freshly placed tuple. */
874 dptr = ((char *) ntup) + ntup->t_hoff - sizeof(ntup->t_bits) + sizeof(int4)
875 + sizeof(fsblock->vl_len);
877 if (obj_desc->offset > obj_desc->lowbyte) {
879 &(fsblock->vl_dat[0]),
880 obj_desc->offset - obj_desc->lowbyte);
881 dptr += obj_desc->offset - obj_desc->lowbyte;
886 if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
887 nwritten = obj_desc->highbyte - obj_desc->offset + 1;
889 memmove(dptr, dbuf, nwritten);
/* Copy the old block's unmodified tail after the newly written range. */
892 if (obj_desc->offset + nwritten < obj_desc->highbyte + 1) {
895 loc = (obj_desc->highbyte - obj_desc->offset)
897 sz = obj_desc->highbyte - (obj_desc->lowbyte + loc);
/* NOTE(review): original author flagged this arithmetic as suspect. */
899 what's going on here?? - jolly
902 sz = (obj_desc->highbyte + 1) - (obj_desc->offset + nwritten);
903 memmove(&(fsblock->vl_dat[0]), dptr, sz);
908 /* index the new tuple */
909 inv_indextup(obj_desc, ntup);
912 * move the scandesc forward so we don't reread the newly inserted
913 * tuple on the next index scan
916 index_getnext(obj_desc->iscan, ForwardScanDirection);
919 * Okay, by here, a tuple for the new block is correctly placed,
920 * indexed, and filled. Write the changed pages out.
924 if (newbuf != buffer)
/* inv_newtuple -- hand-build a heap tuple of nwrite data bytes directly on
 * `page` (deliberately violating the page and tuple abstractions), fill its
 * header, set olastbyte and the varlena length, optionally copy dbuf into
 * the payload, update obj_desc's lowbyte/highbyte, and return the tuple.
 * dbuf == NULL leaves the payload uninitialized for the caller to fill
 * (used by inv_wrold). */
932 inv_newtuple(LargeObjectDesc *obj_desc,
949 /* compute tuple size -- no nulls */
950 hoff = sizeof(HeapTupleData) - sizeof(ntup->t_bits);
952 /* add in olastbyte, varlena.vl_len, varlena.vl_dat */
953 tupsize = hoff + (2 * sizeof(int32)) + nwrite;
954 tupsize = LONGALIGN(tupsize);
957 * Allocate the tuple on the page, violating the page abstraction.
958 * This code was swiped from PageAddItem().
961 ph = (PageHeader) page;
962 limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));
964 /* look for "recyclable" (unused & deallocated) ItemId */
965 for (off = FirstOffsetNumber; off < limit; off = OffsetNumberNext(off))
967 itemId = &ph->pd_linp[off - 1];
968 if ((((*itemId).lp_flags & LP_USED) == 0) &&
969 ((*itemId).lp_len == 0))
/* Adjust pd_lower only when a new line pointer is actually consumed. */
974 lower = (Offset) (((char *) (&ph->pd_linp[off])) - ((char *) page));
975 else if (off == limit)
976 lower = ph->pd_lower + sizeof(ItemIdData);
978 lower = ph->pd_lower;
980 upper = ph->pd_upper - tupsize;
982 itemId = &ph->pd_linp[off - 1];
983 (*itemId).lp_off = upper;
984 (*itemId).lp_len = tupsize;
985 (*itemId).lp_flags = LP_USED;
986 ph->pd_lower = lower;
987 ph->pd_upper = upper;
989 ntup = (HeapTuple) ((char *) page + upper);
992 * Tuple is now allocated on the page. Next, fill in the tuple
993 * header. This block of code violates the tuple abstraction.
996 ntup->t_len = tupsize;
997 ItemPointerSet(&(ntup->t_ctid), BufferGetBlockNumber(buffer), off);
998 LastOidProcessed = ntup->t_oid = newoid();
999 TransactionIdStore(GetCurrentTransactionId(), &(ntup->t_xmin));
1000 ntup->t_cmin = GetCurrentCommandId();
1001 StoreInvalidTransactionId(&(ntup->t_xmax));
1003 ntup->t_infomask = HEAP_XMAX_INVALID;
1005 ntup->t_hoff = hoff;
1007 /* if a NULL is passed in, avoid the calculations below */
1012 * Finally, copy the user's data buffer into the tuple. This violates
1013 * the tuple and class abstractions.
/* Attribute 1: olastbyte = last byte offset this block will cover. */
1016 attptr = ((char *) ntup) + hoff;
1017 *((int32 *) attptr) = obj_desc->offset + nwrite - 1;
1018 attptr += sizeof(int32);
1021 * * mer fixed disk layout of varlenas to get rid of the need for
1024 * ((int32 *) attptr) = nwrite + sizeof(int32); * attptr +=
/* Attribute 2: varlena header -- vl_len includes its own int32. */
1028 *((int32 *) attptr) = nwrite + sizeof(int32);
1029 attptr += sizeof(int32);
1032 * If a data buffer was passed in, then copy the data from the buffer
1033 * to the tuple. Some callers (eg, inv_wrold()) may not pass in a
1034 * buffer, since they have to copy part of the old tuple data and part
1035 * of the user's new data into the new tuple.
1038 if (dbuf != (char *) NULL)
1039 memmove(attptr, dbuf, nwrite);
1041 /* keep track of boundary of current tuple */
1042 obj_desc->lowbyte = obj_desc->offset;
1043 obj_desc->highbyte = obj_desc->offset + nwrite - 1;
1045 /* new tuple is filled -- return it */
/* inv_indextup -- insert an index entry (key = block's highbyte) pointing
 * at the given heap tuple's ctid into the object's btree index. */
1050 inv_indextup(LargeObjectDesc *obj_desc, HeapTuple htup)
1052 InsertIndexResult res;
1057 v[0] = Int32GetDatum(obj_desc->highbyte);
1058 res = index_insert(obj_desc->index_r, &v[0], &n[0],
1059 &(htup->t_ctid), obj_desc->heap_r);
/* DumpPage -- debugging aid: print a page's header fields, special space
 * bytes, and every line pointer; for LP_USED entries also print the tuple
 * header (ctid, oid, natts, hoff, cmin/xmin, cmax/xmax).  Presumably only
 * compiled for debug builds -- the guard is not visible in this listing. */
1067 DumpPage(Page page, int blkno)
1071 int flags, i, nline;
1072 ItemPointerData pointerData;
1074 printf("\t[subblock=%d]:lower=%d:upper=%d:special=%d\n", 0,
1075 ((PageHeader)page)->pd_lower, ((PageHeader)page)->pd_upper,
1076 ((PageHeader)page)->pd_special);
1078 printf("\t:MaxOffsetNumber=%d\n",
1079 (int16) PageGetMaxOffsetNumber(page));
1081 nline = (int16) PageGetMaxOffsetNumber(page);
1087 i = PageGetSpecialSize(page);
1088 cp = PageGetSpecialPointer(page);
1090 printf("\t:SpecialData=");
1093 printf(" 0x%02x", *cp);
/* One line per item pointer; heap tuple details follow for used slots. */
1099 for (i = 0; i < nline; i++) {
1100 lp = ((PageHeader)page)->pd_linp + i;
1101 flags = (*lp).lp_flags;
1102 ItemPointerSet(&pointerData, blkno, 1 + i);
1103 printf("%s:off=%d:flags=0x%x:len=%d",
1104 ItemPointerFormExternal(&pointerData), (*lp).lp_off,
1105 flags, (*lp).lp_len);
1107 if (flags & LP_USED) {
1108 HeapTupleData htdata;
/* Copy the header out of the page to avoid alignment issues. */
1112 memmove((char *) &htdata,
1113 (char *) &((char *)page)[(*lp).lp_off],
1118 printf("\n\t:ctid=%s:oid=%d",
1119 ItemPointerFormExternal(&tup->t_ctid),
1121 printf(":natts=%d:thoff=%d:",
1125 printf("\n\t:cmin=%u:",
1128 printf("xmin=%u:", tup->t_xmin);
1130 printf("\n\t:cmax=%u:",
1133 printf("xmax=%u:\n", tup->t_xmax);
/* ItemPointerFormExternal -- format an ItemPointer as "<block,offset>"
 * (or "<-,-,->" when invalid) into a static buffer.  NOTE(review): returns
 * a pointer to static storage -- not reentrant; each call overwrites the
 * previous result (visible in the multiple-call printf usage in DumpPage). */
1141 ItemPointerFormExternal(ItemPointer pointer)
1143 static char itemPointerString[32];
1145 if (!ItemPointerIsValid(pointer)) {
1146 memmove(itemPointerString, "<-,-,->", sizeof "<-,-,->");
1148 sprintf(itemPointerString, "<%u,%u>",
1149 ItemPointerGetBlockNumber(pointer),
1150 ItemPointerGetOffsetNumber(pointer));
1153 return (itemPointerString);
1158 _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln)
1160 IndexScanDesc iscan;
1161 RetrieveIndexResult res;
1168 /* scan backwards from end */
1169 iscan = index_beginscan(ireln, (bool) 1, 0, (ScanKey) NULL);
1171 buf = InvalidBuffer;
1175 res = index_getnext(iscan, BackwardScanDirection);
1178 * If there are no more index tuples, then the relation is empty,
1179 * so the file's size is zero.
1182 if (res == (RetrieveIndexResult) NULL)
1184 index_endscan(iscan);
1189 * For time travel, we need to use the actual time qual here,
1190 * rather that NowTimeQual. We currently have no way to pass a
1194 if (buf != InvalidBuffer)
1197 htup = heap_fetch(hreln, false, &(res->heap_iptr), &buf);
1199 } while (!HeapTupleIsValid(htup));
1201 /* don't need the index scan anymore */
1202 index_endscan(iscan);
1204 /* get olastbyte attribute */
1205 d = heap_getattr(htup, buf, 1, hdesc, &isNull);
1206 size = DatumGetInt32(d) + 1;
1208 /* wei hates it if you forget to do this */