]> granicus.if.org Git - postgresql/blob - src/backend/storage/large_object/inv_api.c
More archive cleanup.
[postgresql] / src / backend / storage / large_object / inv_api.c
1 /*-------------------------------------------------------------------------
2  *
3  * inv_api.c--
4  *        routines for manipulating inversion fs large objects. This file
5  *        contains the user-level large object application interface routines.
6  *
7  * Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.22 1997/11/21 19:02:37 momjian Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include <sys/types.h>
16 #include <stdio.h>                              /* for sprintf() */
17 #include <string.h>
18 #include <sys/file.h>
19 #include <sys/stat.h>
20
21 #include "postgres.h"
22 #include "miscadmin.h"
23 #include "libpq/libpq-fs.h"
24 #include "access/genam.h"
25 #include "access/heapam.h"
26 #include "access/relscan.h"
27 #include "access/tupdesc.h"
28 #include "access/transam.h"
29 #include "access/xact.h"
30 #include "access/nbtree.h"
31 #include "access/tupdesc.h"
32 #include "catalog/index.h"              /* for index_create() */
33 #include "catalog/catalog.h"    /* for newoid() */
34 #include "catalog/pg_am.h"              /* for BTREE_AM_OID */
35 #include "catalog/pg_opclass.h" /* for INT4_OPS_OID */
36 #include "catalog/pg_proc.h"    /* for INT4GE_PROC_OID */
37 #include "storage/itemptr.h"
38 #include "storage/bufpage.h"
39 #include "storage/bufmgr.h"
40 #include "storage/smgr.h"
41 #include "utils/rel.h"
42 #include "utils/relcache.h"
43 #include "utils/palloc.h"
44 #include "storage/large_object.h"
45 #include "storage/lmgr.h"
46 #include "utils/syscache.h"
47 #include "utils/builtins.h"             /* for namestrcpy() */
48 #include "catalog/heap.h"
49 #include "nodes/pg_list.h"
50
/*
 *	Block-size tuning for inversion large objects.
 *
 *	To pack data into an inversion file as densely as possible, the append
 *	code in this file violates the class abstraction: it inspects the last
 *	page of the heap relation directly and decides how much data to place
 *	there (see inv_wrnew()).  This limits external fragmentation.
 *
 *	IFREESPC(p) -- what PageGetFreeSpace() reports for page p, less the
 *				   per-tuple overhead subtracted below (a HeapTupleData,
 *				   a varlena header and one int32).
 *	IMAXBLK		-- the most data ever written into one tuple; per the
 *				   original author, the 8K postgres page size less overhead
 *				   leaves only this much space for data.
 *	IMINBLK		-- if the last page has less usable space than this, a new
 *				   page is started rather than filling the remainder.
 */

#define IFREESPC(p) \
	(PageGetFreeSpace(p) - (sizeof(HeapTupleData) + sizeof(struct varlena) + sizeof(int32)))
#define IMAXBLK			8092
#define IMINBLK			512
65
66 /* non-export function prototypes */
67 static HeapTuple
68 inv_newtuple(LargeObjectDesc *obj_desc, Buffer buffer,
69                          Page page, char *dbuf, int nwrite);
70 static HeapTuple inv_fetchtup(LargeObjectDesc *obj_desc, Buffer *bufP);
71 static int      inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes);
72 static int
73 inv_wrold(LargeObjectDesc *obj_desc, char *dbuf, int nbytes,
74                   HeapTuple htup, Buffer buffer);
75 static void inv_indextup(LargeObjectDesc *obj_desc, HeapTuple htup);
76 static int      _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln);
77
78 /*
79  *      inv_create -- create a new large object.
80  *
81  *              Arguments:
82  *                flags -- was archive, smgr
83  *
84  *              Returns:
85  *                large object descriptor, appropriately filled in.
86  */
87 LargeObjectDesc *
88 inv_create(int flags)
89 {
90         int                     file_oid;
91         LargeObjectDesc *retval;
92         Relation        r;
93         Relation        indr;
94         TupleDesc       tupdesc;
95         AttrNumber      attNums[1];
96         Oid                     classObjectId[1];
97         char            objname[NAMEDATALEN];
98         char            indname[NAMEDATALEN];
99
100         /*
101          * add one here since the pg_class tuple created will have the next
102          * oid and we want to have the relation name to correspond to the
103          * tuple OID
104          */
105         file_oid = newoid() + 1;
106
107         /* come up with some table names */
108         sprintf(objname, "xinv%d", file_oid);
109         sprintf(indname, "xinx%d", file_oid);
110
111         if (SearchSysCacheTuple(RELNAME, PointerGetDatum(objname),
112                                                         0, 0, 0) != NULL)
113         {
114                 elog(WARN,
115                   "internal error: %s already exists -- cannot create large obj",
116                          objname);
117         }
118         if (SearchSysCacheTuple(RELNAME, PointerGetDatum(indname),
119                                                         0, 0, 0) != NULL)
120         {
121                 elog(WARN,
122                   "internal error: %s already exists -- cannot create large obj",
123                          indname);
124         }
125
126         /* this is pretty painful...  want a tuple descriptor */
127         tupdesc = CreateTemplateTupleDesc(2);
128         TupleDescInitEntry(tupdesc, (AttrNumber) 1,
129                                            "olastbye",
130                                            "int4",
131                                            0, false);
132         TupleDescInitEntry(tupdesc, (AttrNumber) 2,
133                                            "odata",
134                                            "bytea",
135                                            0, false);
136
137         /*
138          * First create the table to hold the inversion large object.  It will
139          * be located on whatever storage manager the user requested.
140          */
141
142         heap_create(objname, tupdesc);
143
144         /* make the relation visible in this transaction */
145         CommandCounterIncrement();
146         r = heap_openr(objname);
147
148         if (!RelationIsValid(r))
149         {
150                 elog(WARN, "cannot create large object on %s under inversion",
151                          smgrout(DEFAULT_SMGR));
152         }
153
154         /*
155          * Now create a btree index on the relation's olastbyte attribute to
156          * make seeks go faster.  The hardwired constants are embarassing to
157          * me, and are symptomatic of the pressure under which this code was
158          * written.
159          *
160          * ok, mao, let's put in some symbolic constants - jolly
161          */
162
163         attNums[0] = 1;
164         classObjectId[0] = INT4_OPS_OID;
165         index_create(objname, indname, NULL, NULL, BTREE_AM_OID,
166                                  1, &attNums[0], &classObjectId[0],
167                                  0, (Datum) NULL, NULL, FALSE, FALSE);
168
169         /* make the index visible in this transaction */
170         CommandCounterIncrement();
171         indr = index_openr(indname);
172
173         if (!RelationIsValid(indr))
174         {
175                 elog(WARN, "cannot create index for large obj on %s under inversion",
176                          smgrout(DEFAULT_SMGR));
177         }
178
179         retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
180
181         retval->heap_r = r;
182         retval->index_r = indr;
183         retval->iscan = (IndexScanDesc) NULL;
184         retval->hdesc = RelationGetTupleDescriptor(r);
185         retval->idesc = RelationGetTupleDescriptor(indr);
186         retval->offset = retval->lowbyte =
187                 retval->highbyte = 0;
188         ItemPointerSetInvalid(&(retval->htid));
189
190         if (flags & INV_WRITE)
191         {
192                 RelationSetLockForWrite(r);
193                 retval->flags = IFS_WRLOCK | IFS_RDLOCK;
194         }
195         else if (flags & INV_READ)
196         {
197                 RelationSetLockForRead(r);
198                 retval->flags = IFS_RDLOCK;
199         }
200         retval->flags |= IFS_ATEOF;
201
202         return (retval);
203 }
204
205 LargeObjectDesc *
206 inv_open(Oid lobjId, int flags)
207 {
208         LargeObjectDesc *retval;
209         Relation        r;
210         char       *indname;
211         Relation        indrel;
212
213         r = heap_open(lobjId);
214
215         if (!RelationIsValid(r))
216                 return ((LargeObjectDesc *) NULL);
217
218         indname = pstrdup((r->rd_rel->relname).data);
219
220         /*
221          * hack hack hack...  we know that the fourth character of the
222          * relation name is a 'v', and that the fourth character of the index
223          * name is an 'x', and that they're otherwise identical.
224          */
225         indname[3] = 'x';
226         indrel = index_openr(indname);
227
228         if (!RelationIsValid(indrel))
229                 return ((LargeObjectDesc *) NULL);
230
231         retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
232
233         retval->heap_r = r;
234         retval->index_r = indrel;
235         retval->iscan = (IndexScanDesc) NULL;
236         retval->hdesc = RelationGetTupleDescriptor(r);
237         retval->idesc = RelationGetTupleDescriptor(indrel);
238         retval->offset = retval->lowbyte = retval->highbyte = 0;
239         ItemPointerSetInvalid(&(retval->htid));
240
241         if (flags & INV_WRITE)
242         {
243                 RelationSetLockForWrite(r);
244                 retval->flags = IFS_WRLOCK | IFS_RDLOCK;
245         }
246         else if (flags & INV_READ)
247         {
248                 RelationSetLockForRead(r);
249                 retval->flags = IFS_RDLOCK;
250         }
251
252         return (retval);
253 }
254
255 /*
256  * Closes an existing large object descriptor.
257  */
258 void
259 inv_close(LargeObjectDesc *obj_desc)
260 {
261         Assert(PointerIsValid(obj_desc));
262
263         if (obj_desc->iscan != (IndexScanDesc) NULL)
264                 index_endscan(obj_desc->iscan);
265
266         heap_close(obj_desc->heap_r);
267         index_close(obj_desc->index_r);
268
269         pfree(obj_desc);
270 }
271
272 /*
273  * Destroys an existing large object, and frees its associated pointers.
274  *
275  * returns -1 if failed
276  */
277 int
278 inv_destroy(Oid lobjId)
279 {
280         Relation        r;
281
282         r = (Relation) RelationIdGetRelation(lobjId);
283         if (!RelationIsValid(r) || r->rd_rel->relkind == RELKIND_INDEX)
284                 return -1;
285
286         heap_destroy(r->rd_rel->relname.data);
287         return 1;
288 }
289
/*
 *	inv_stat() -- do a stat on an inversion file.
 *
 *		Compiled only when NOT_USED is defined.
 *
 *		Fills in *stbuf from the descriptor: the inode number is the heap
 *		relation's id, the mode is always "regular file, rw-rw-rw-", the
 *		owner comes from the relation's catalog entry, and the size is
 *		whatever _inv_getsize() computes.  Per the original author that size
 *		computation is an insanely expensive operation for the time being.
 *		Access, modification and change times are not maintained and are
 *		all reported as 0.
 *
 *		A read lock is taken on the heap relation first if the descriptor
 *		does not already hold one.
 *
 *		Always returns 0.
 */
#ifdef NOT_USED
int
inv_stat(LargeObjectDesc *obj_desc, struct pgstat * stbuf)
{
	Relation	heap;

	Assert(PointerIsValid(obj_desc));
	Assert(stbuf != NULL);

	heap = obj_desc->heap_r;

	/* need read lock for stat */
	if ((obj_desc->flags & IFS_RDLOCK) == 0)
	{
		RelationSetLockForRead(heap);
		obj_desc->flags |= IFS_RDLOCK;
	}

	stbuf->st_ino = heap->rd_id;
	stbuf->st_mode = (S_IFREG | 0666);	/* IFREG|rw-rw-rw- */
	stbuf->st_size = _inv_getsize(heap, obj_desc->hdesc, obj_desc->index_r);
	stbuf->st_uid = heap->rd_rel->relowner;

	/* we have no good way of computing access times right now */
	stbuf->st_atime_s = stbuf->st_mtime_s = stbuf->st_ctime_s = 0;

	return (0);
}

#endif
336
337 int
338 inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
339 {
340         int                     oldOffset;
341         Datum           d;
342         ScanKeyData skey;
343
344         Assert(PointerIsValid(obj_desc));
345
346         if (whence == SEEK_CUR)
347         {
348                 offset += obj_desc->offset;             /* calculate absolute position */
349                 return (inv_seek(obj_desc, offset, SEEK_SET));
350         }
351
352         /*
353          * if you seek past the end (offset > 0) I have no clue what happens
354          * :-(                            B.L.   9/1/93
355          */
356         if (whence == SEEK_END)
357         {
358                 /* need read lock for getsize */
359                 if (!(obj_desc->flags & IFS_RDLOCK))
360                 {
361                         RelationSetLockForRead(obj_desc->heap_r);
362                         obj_desc->flags |= IFS_RDLOCK;
363                 }
364                 offset += _inv_getsize(obj_desc->heap_r,
365                                                            obj_desc->hdesc,
366                                                            obj_desc->index_r);
367                 return (inv_seek(obj_desc, offset, SEEK_SET));
368         }
369
370         /*
371          * Whenever we do a seek, we turn off the EOF flag bit to force
372          * ourselves to check for real on the next read.
373          */
374
375         obj_desc->flags &= ~IFS_ATEOF;
376         oldOffset = obj_desc->offset;
377         obj_desc->offset = offset;
378
379         /* try to avoid doing any work, if we can manage it */
380         if (offset >= obj_desc->lowbyte
381                 && offset <= obj_desc->highbyte
382                 && oldOffset <= obj_desc->highbyte
383                 && obj_desc->iscan != (IndexScanDesc) NULL)
384                 return (offset);
385
386         /*
387          * To do a seek on an inversion file, we start an index scan that will
388          * bring us to the right place.  Each tuple in an inversion file
389          * stores the offset of the last byte that appears on it, and we have
390          * an index on this.
391          */
392
393
394         /* right now, just assume that the operation is SEEK_SET */
395         if (obj_desc->iscan != (IndexScanDesc) NULL)
396         {
397                 d = Int32GetDatum(offset);
398                 btmovescan(obj_desc->iscan, d);
399         }
400         else
401         {
402
403                 ScanKeyEntryInitialize(&skey, 0x0, 1, INT4GE_PROC_OID,
404                                                            Int32GetDatum(offset));
405
406                 obj_desc->iscan = index_beginscan(obj_desc->index_r,
407                                                                                   (bool) 0, (uint16) 1,
408                                                                                   &skey);
409         }
410
411         return (offset);
412 }
413
414 int
415 inv_tell(LargeObjectDesc *obj_desc)
416 {
417         Assert(PointerIsValid(obj_desc));
418
419         return (obj_desc->offset);
420 }
421
422 int
423 inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
424 {
425         HeapTuple       htup;
426         Buffer          b;
427         int                     nread;
428         int                     off;
429         int                     ncopy;
430         Datum           d;
431         struct varlena *fsblock;
432         bool            isNull;
433
434         Assert(PointerIsValid(obj_desc));
435         Assert(buf != NULL);
436
437         /* if we're already at EOF, we don't need to do any work here */
438         if (obj_desc->flags & IFS_ATEOF)
439                 return (0);
440
441         /* make sure we obey two-phase locking */
442         if (!(obj_desc->flags & IFS_RDLOCK))
443         {
444                 RelationSetLockForRead(obj_desc->heap_r);
445                 obj_desc->flags |= IFS_RDLOCK;
446         }
447
448         nread = 0;
449
450         /* fetch a block at a time */
451         while (nread < nbytes)
452         {
453
454                 /* fetch an inversion file system block */
455                 htup = inv_fetchtup(obj_desc, &b);
456
457                 if (!HeapTupleIsValid(htup))
458                 {
459                         obj_desc->flags |= IFS_ATEOF;
460                         break;
461                 }
462
463                 /* copy the data from this block into the buffer */
464                 d = heap_getattr(htup, b, 2, obj_desc->hdesc, &isNull);
465                 fsblock = (struct varlena *) DatumGetPointer(d);
466
467                 off = obj_desc->offset - obj_desc->lowbyte;
468                 ncopy = obj_desc->highbyte - obj_desc->offset + 1;
469                 if (ncopy > (nbytes - nread))
470                         ncopy = (nbytes - nread);
471                 memmove(buf, &(fsblock->vl_dat[off]), ncopy);
472
473                 /* be a good citizen */
474                 ReleaseBuffer(b);
475
476                 /* move pointers past the amount we just read */
477                 buf += ncopy;
478                 nread += ncopy;
479                 obj_desc->offset += ncopy;
480         }
481
482         /* that's it */
483         return (nread);
484 }
485
486 int
487 inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes)
488 {
489         HeapTuple       htup;
490         Buffer          b;
491         int                     nwritten;
492         int                     tuplen;
493
494         Assert(PointerIsValid(obj_desc));
495         Assert(buf != NULL);
496
497         /*
498          * Make sure we obey two-phase locking.  A write lock entitles you to
499          * read the relation, as well.
500          */
501
502         if (!(obj_desc->flags & IFS_WRLOCK))
503         {
504                 RelationSetLockForRead(obj_desc->heap_r);
505                 obj_desc->flags |= (IFS_WRLOCK | IFS_RDLOCK);
506         }
507
508         nwritten = 0;
509
510         /* write a block at a time */
511         while (nwritten < nbytes)
512         {
513
514                 /*
515                  * Fetch the current inversion file system block.  If the class
516                  * storing the inversion file is empty, we don't want to do an
517                  * index lookup, since index lookups choke on empty files (should
518                  * be fixed someday).
519                  */
520
521                 if ((obj_desc->flags & IFS_ATEOF)
522                         || obj_desc->heap_r->rd_nblocks == 0)
523                         htup = (HeapTuple) NULL;
524                 else
525                         htup = inv_fetchtup(obj_desc, &b);
526
527                 /* either append or replace a block, as required */
528                 if (!HeapTupleIsValid(htup))
529                 {
530                         tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
531                 }
532                 else
533                 {
534                         if (obj_desc->offset > obj_desc->highbyte)
535                                 tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
536                         else
537                                 tuplen = inv_wrold(obj_desc, buf, nbytes - nwritten, htup, b);
538                 }
539
540                 /* move pointers past the amount we just wrote */
541                 buf += tuplen;
542                 nwritten += tuplen;
543                 obj_desc->offset += tuplen;
544         }
545
546         /* that's it */
547         return (nwritten);
548 }
549
550 /*
551  *      inv_fetchtup -- Fetch an inversion file system block.
552  *
553  *              This routine finds the file system block containing the offset
554  *              recorded in the obj_desc structure.  Later, we need to think about
555  *              the effects of non-functional updates (can you rewrite the same
556  *              block twice in a single transaction?), but for now, we won't bother.
557  *
558  *              Parameters:
559  *                              obj_desc -- the object descriptor.
560  *                              bufP -- pointer to a buffer in the buffer cache; caller
561  *                                              must free this.
562  *
563  *              Returns:
564  *                              A heap tuple containing the desired block, or NULL if no
565  *                              such tuple exists.
566  */
567 static HeapTuple
568 inv_fetchtup(LargeObjectDesc *obj_desc, Buffer *bufP)
569 {
570         HeapTuple       htup;
571         RetrieveIndexResult res;
572         Datum           d;
573         int                     firstbyte,
574                                 lastbyte;
575         struct varlena *fsblock;
576         bool            isNull;
577
578         /*
579          * If we've exhausted the current block, we need to get the next one.
580          * When we support time travel and non-functional updates, we will
581          * need to loop over the blocks, rather than just have an 'if', in
582          * order to find the one we're really interested in.
583          */
584
585         if (obj_desc->offset > obj_desc->highbyte
586                 || obj_desc->offset < obj_desc->lowbyte
587                 || !ItemPointerIsValid(&(obj_desc->htid)))
588         {
589
590                 /* initialize scan key if not done */
591                 if (obj_desc->iscan == (IndexScanDesc) NULL)
592                 {
593                         ScanKeyData skey;
594
595                         ScanKeyEntryInitialize(&skey, 0x0, 1, INT4GE_PROC_OID,
596                                                                    Int32GetDatum(0));
597                         obj_desc->iscan =
598                                 index_beginscan(obj_desc->index_r,
599                                                                 (bool) 0, (uint16) 1,
600                                                                 &skey);
601                 }
602
603                 do
604                 {
605                         res = index_getnext(obj_desc->iscan, ForwardScanDirection);
606
607                         if (res == (RetrieveIndexResult) NULL)
608                         {
609                                 ItemPointerSetInvalid(&(obj_desc->htid));
610                                 return ((HeapTuple) NULL);
611                         }
612
613                         /*
614                          * For time travel, we need to use the actual time qual here,
615                          * rather that NowTimeQual.  We currently have no way to pass
616                          * a time qual in.
617                          */
618
619                         htup = heap_fetch(obj_desc->heap_r, false,
620                                                           &(res->heap_iptr), bufP);
621
622                 } while (htup == (HeapTuple) NULL);
623
624                 /* remember this tid -- we may need it for later reads/writes */
625                 ItemPointerCopy(&(res->heap_iptr), &(obj_desc->htid));
626
627         }
628         else
629         {
630                 htup = heap_fetch(obj_desc->heap_r, false,
631                                                   &(obj_desc->htid), bufP);
632         }
633
634         /*
635          * By here, we have the heap tuple we're interested in.  We cache the
636          * upper and lower bounds for this block in the object descriptor and
637          * return the tuple.
638          */
639
640         d = heap_getattr(htup, *bufP, 1, obj_desc->hdesc, &isNull);
641         lastbyte = (int32) DatumGetInt32(d);
642         d = heap_getattr(htup, *bufP, 2, obj_desc->hdesc, &isNull);
643         fsblock = (struct varlena *) DatumGetPointer(d);
644
645         /*
646          * order of + and - is important -- these are unsigned quantites near
647          * 0
648          */
649         firstbyte = (lastbyte + 1 + sizeof(fsblock->vl_len)) - fsblock->vl_len;
650
651         obj_desc->lowbyte = firstbyte;
652         obj_desc->highbyte = lastbyte;
653
654         /* done */
655         return (htup);
656 }
657
658 /*
659  *      inv_wrnew() -- append a new filesystem block tuple to the inversion
660  *                                      file.
661  *
662  *              In response to an inv_write, we append one or more file system
663  *              blocks to the class containing the large object.  We violate the
664  *              class abstraction here in order to pack things as densely as we
665  *              are able.  We examine the last page in the relation, and write
666  *              just enough to fill it, assuming that it has above a certain
667  *              threshold of space available.  If the space available is less than
668  *              the threshold, we allocate a new page by writing a big tuple.
669  *
670  *              By the time we get here, we know all the parameters passed in
671  *              are valid, and that we hold the appropriate lock on the heap
672  *              relation.
673  *
674  *              Parameters:
675  *                              obj_desc: large object descriptor for which to append block.
676  *                              buf: buffer containing data to write.
677  *                              nbytes: amount to write
678  *
679  *              Returns:
680  *                              number of bytes actually written to the new tuple.
681  */
682 static int
683 inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes)
684 {
685         Relation        hr;
686         HeapTuple       ntup;
687         Buffer          buffer;
688         Page            page;
689         int                     nblocks;
690         int                     nwritten;
691
692         hr = obj_desc->heap_r;
693
694         /*
695          * Get the last block in the relation.  If there's no data in the
696          * relation at all, then we just get a new block.  Otherwise, we check
697          * the last block to see whether it has room to accept some or all of
698          * the data that the user wants to write.  If it doesn't, then we
699          * allocate a new block.
700          */
701
702         nblocks = RelationGetNumberOfBlocks(hr);
703
704         if (nblocks > 0)
705                 buffer = ReadBuffer(hr, nblocks - 1);
706         else
707                 buffer = ReadBuffer(hr, P_NEW);
708
709         page = BufferGetPage(buffer);
710
711         /*
712          * If the last page is too small to hold all the data, and it's too
713          * small to hold IMINBLK, then we allocate a new page.  If it will
714          * hold at least IMINBLK, but less than all the data requested, then
715          * we write IMINBLK here.  The caller is responsible for noticing that
716          * less than the requested number of bytes were written, and calling
717          * this routine again.
718          */
719
720         nwritten = IFREESPC(page);
721         if (nwritten < nbytes)
722         {
723                 if (nwritten < IMINBLK)
724                 {
725                         ReleaseBuffer(buffer);
726                         buffer = ReadBuffer(hr, P_NEW);
727                         page = BufferGetPage(buffer);
728                         PageInit(page, BufferGetPageSize(buffer), 0);
729                         if (nbytes > IMAXBLK)
730                                 nwritten = IMAXBLK;
731                         else
732                                 nwritten = nbytes;
733                 }
734         }
735         else
736         {
737                 nwritten = nbytes;
738         }
739
740         /*
741          * Insert a new file system block tuple, index it, and write it out.
742          */
743
744         ntup = inv_newtuple(obj_desc, buffer, page, buf, nwritten);
745         inv_indextup(obj_desc, ntup);
746
747         /* new tuple is inserted */
748         WriteBuffer(buffer);
749
750         return (nwritten);
751 }
752
753 static int
754 inv_wrold(LargeObjectDesc *obj_desc,
755                   char *dbuf,
756                   int nbytes,
757                   HeapTuple htup,
758                   Buffer buffer)
759 {
760         Relation        hr;
761         HeapTuple       ntup;
762         Buffer          newbuf;
763         Page            page;
764         Page            newpage;
765         int                     tupbytes;
766         Datum           d;
767         struct varlena *fsblock;
768         int                     nwritten,
769                                 nblocks,
770                                 freespc;
771         bool            isNull;
772         int                     keep_offset;
773
774         /*
775          * Since we're using a no-overwrite storage manager, the way we
776          * overwrite blocks is to mark the old block invalid and append a new
777          * block.  First mark the old block invalid.  This violates the tuple
778          * abstraction.
779          */
780
781         TransactionIdStore(GetCurrentTransactionId(), &(htup->t_xmax));
782         htup->t_cmax = GetCurrentCommandId();
783         htup->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID);
784
785         /*
786          * If we're overwriting the entire block, we're lucky.  All we need to
787          * do is to insert a new block.
788          */
789
790         if (obj_desc->offset == obj_desc->lowbyte
791                 && obj_desc->lowbyte + nbytes >= obj_desc->highbyte)
792         {
793                 WriteBuffer(buffer);
794                 return (inv_wrnew(obj_desc, dbuf, nbytes));
795         }
796
797         /*
798          * By here, we need to overwrite part of the data in the current
799          * tuple.  In order to reduce the degree to which we fragment blocks,
800          * we guarantee that no block will be broken up due to an overwrite.
801          * This means that we need to allocate a tuple on a new page, if
802          * there's not room for the replacement on this one.
803          */
804
805         newbuf = buffer;
806         page = BufferGetPage(buffer);
807         newpage = BufferGetPage(newbuf);
808         hr = obj_desc->heap_r;
809         freespc = IFREESPC(page);
810         d = heap_getattr(htup, buffer, 2, obj_desc->hdesc, &isNull);
811         fsblock = (struct varlena *) DatumGetPointer(d);
812         tupbytes = fsblock->vl_len - sizeof(fsblock->vl_len);
813
814         if (freespc < tupbytes)
815         {
816
817                 /*
818                  * First see if there's enough space on the last page of the table
819                  * to put this tuple.
820                  */
821
822                 nblocks = RelationGetNumberOfBlocks(hr);
823
824                 if (nblocks > 0)
825                         newbuf = ReadBuffer(hr, nblocks - 1);
826                 else
827                         newbuf = ReadBuffer(hr, P_NEW);
828
829                 newpage = BufferGetPage(newbuf);
830                 freespc = IFREESPC(newpage);
831
832                 /*
833                  * If there's no room on the last page, allocate a new last page
834                  * for the table, and put it there.
835                  */
836
837                 if (freespc < tupbytes)
838                 {
839                         ReleaseBuffer(newbuf);
840                         newbuf = ReadBuffer(hr, P_NEW);
841                         newpage = BufferGetPage(newbuf);
842                         PageInit(newpage, BufferGetPageSize(newbuf), 0);
843                 }
844         }
845
846         nwritten = nbytes;
847         if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
848                 nwritten = obj_desc->highbyte - obj_desc->offset + 1;
849         memmove(VARDATA(fsblock) + (obj_desc->offset - obj_desc->lowbyte),
850                         dbuf, nwritten);
851
852         /*
853          * we are rewriting the entire old block, therefore we reset offset to
854          * the lowbyte of the original block before jumping into
855          * inv_newtuple()
856          */
857         keep_offset = obj_desc->offset;
858         obj_desc->offset = obj_desc->lowbyte;
859         ntup = inv_newtuple(obj_desc, newbuf, newpage, VARDATA(fsblock),
860                                                 tupbytes);
861         /* after we are done, we restore to the true offset */
862         obj_desc->offset = keep_offset;
863
864         /*
865          * By here, we have a page (newpage) that's guaranteed to have enough
866          * space on it to put the new tuple.  Call inv_newtuple to do the
867          * work.  Passing NULL as a buffer to inv_newtuple() keeps it from
868          * copying any data into the new tuple.  When it returns, the tuple is
869          * ready to receive data from the old tuple and the user's data
870          * buffer.
871          */
872 /*
873         ntup = inv_newtuple(obj_desc, newbuf, newpage, (char *) NULL, tupbytes);
874         dptr = ((char *) ntup) + ntup->t_hoff - sizeof(ntup->t_bits) + sizeof(int4)
875                                 + sizeof(fsblock->vl_len);
876
877         if (obj_desc->offset > obj_desc->lowbyte) {
878                 memmove(dptr,
879                                 &(fsblock->vl_dat[0]),
880                                 obj_desc->offset - obj_desc->lowbyte);
881                 dptr += obj_desc->offset - obj_desc->lowbyte;
882         }
883
884
885         nwritten = nbytes;
886         if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
887                 nwritten = obj_desc->highbyte - obj_desc->offset + 1;
888
889         memmove(dptr, dbuf, nwritten);
890         dptr += nwritten;
891
892         if (obj_desc->offset + nwritten < obj_desc->highbyte + 1) {
893 */
894 /*
895                 loc = (obj_desc->highbyte - obj_desc->offset)
896                                 + nwritten;
897                 sz = obj_desc->highbyte - (obj_desc->lowbyte + loc);
898
899                 what's going on here?? - jolly
900 */
901 /*
902                 sz = (obj_desc->highbyte + 1) - (obj_desc->offset + nwritten);
903                 memmove(&(fsblock->vl_dat[0]), dptr, sz);
904         }
905 */
906
907
908         /* index the new tuple */
909         inv_indextup(obj_desc, ntup);
910
911         /*
912          * move the scandesc forward so we don't reread the newly inserted
913          * tuple on the next index scan
914          */
915         if (obj_desc->iscan)
916                 index_getnext(obj_desc->iscan, ForwardScanDirection);
917
918         /*
919          * Okay, by here, a tuple for the new block is correctly placed,
920          * indexed, and filled.  Write the changed pages out.
921          */
922
923         WriteBuffer(buffer);
924         if (newbuf != buffer)
925                 WriteBuffer(newbuf);
926
927         /* done */
928         return (nwritten);
929 }
930
931 static HeapTuple
932 inv_newtuple(LargeObjectDesc *obj_desc,
933                          Buffer buffer,
934                          Page page,
935                          char *dbuf,
936                          int nwrite)
937 {
938         HeapTuple       ntup;
939         PageHeader      ph;
940         int                     tupsize;
941         int                     hoff;
942         Offset          lower;
943         Offset          upper;
944         ItemId          itemId;
945         OffsetNumber off;
946         OffsetNumber limit;
947         char       *attptr;
948
949         /* compute tuple size -- no nulls */
950         hoff = sizeof(HeapTupleData) - sizeof(ntup->t_bits);
951
952         /* add in olastbyte, varlena.vl_len, varlena.vl_dat */
953         tupsize = hoff + (2 * sizeof(int32)) + nwrite;
954         tupsize = LONGALIGN(tupsize);
955
956         /*
957          * Allocate the tuple on the page, violating the page abstraction.
958          * This code was swiped from PageAddItem().
959          */
960
961         ph = (PageHeader) page;
962         limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));
963
964         /* look for "recyclable" (unused & deallocated) ItemId */
965         for (off = FirstOffsetNumber; off < limit; off = OffsetNumberNext(off))
966         {
967                 itemId = &ph->pd_linp[off - 1];
968                 if ((((*itemId).lp_flags & LP_USED) == 0) &&
969                         ((*itemId).lp_len == 0))
970                         break;
971         }
972
973         if (off > limit)
974                 lower = (Offset) (((char *) (&ph->pd_linp[off])) - ((char *) page));
975         else if (off == limit)
976                 lower = ph->pd_lower + sizeof(ItemIdData);
977         else
978                 lower = ph->pd_lower;
979
980         upper = ph->pd_upper - tupsize;
981
982         itemId = &ph->pd_linp[off - 1];
983         (*itemId).lp_off = upper;
984         (*itemId).lp_len = tupsize;
985         (*itemId).lp_flags = LP_USED;
986         ph->pd_lower = lower;
987         ph->pd_upper = upper;
988
989         ntup = (HeapTuple) ((char *) page + upper);
990
991         /*
992          * Tuple is now allocated on the page.  Next, fill in the tuple
993          * header.      This block of code violates the tuple abstraction.
994          */
995
996         ntup->t_len = tupsize;
997         ItemPointerSet(&(ntup->t_ctid), BufferGetBlockNumber(buffer), off);
998         LastOidProcessed = ntup->t_oid = newoid();
999         TransactionIdStore(GetCurrentTransactionId(), &(ntup->t_xmin));
1000         ntup->t_cmin = GetCurrentCommandId();
1001         StoreInvalidTransactionId(&(ntup->t_xmax));
1002         ntup->t_cmax = 0;
1003         ntup->t_infomask = HEAP_XMAX_INVALID;
1004         ntup->t_natts = 2;
1005         ntup->t_hoff = hoff;
1006
1007         /* if a NULL is passed in, avoid the calculations below */
1008         if (dbuf == NULL)
1009                 return ntup;
1010
1011         /*
1012          * Finally, copy the user's data buffer into the tuple.  This violates
1013          * the tuple and class abstractions.
1014          */
1015
1016         attptr = ((char *) ntup) + hoff;
1017         *((int32 *) attptr) = obj_desc->offset + nwrite - 1;
1018         attptr += sizeof(int32);
1019
1020         /*
1021          * *  mer fixed disk layout of varlenas to get rid of the need for
1022          * this. *
1023          *
1024          * ((int32 *) attptr) = nwrite + sizeof(int32); *  attptr +=
1025          * sizeof(int32);
1026          */
1027
1028         *((int32 *) attptr) = nwrite + sizeof(int32);
1029         attptr += sizeof(int32);
1030
1031         /*
1032          * If a data buffer was passed in, then copy the data from the buffer
1033          * to the tuple.  Some callers (eg, inv_wrold()) may not pass in a
1034          * buffer, since they have to copy part of the old tuple data and part
1035          * of the user's new data into the new tuple.
1036          */
1037
1038         if (dbuf != (char *) NULL)
1039                 memmove(attptr, dbuf, nwrite);
1040
1041         /* keep track of boundary of current tuple */
1042         obj_desc->lowbyte = obj_desc->offset;
1043         obj_desc->highbyte = obj_desc->offset + nwrite - 1;
1044
1045         /* new tuple is filled -- return it */
1046         return (ntup);
1047 }
1048
1049 static void
1050 inv_indextup(LargeObjectDesc *obj_desc, HeapTuple htup)
1051 {
1052         InsertIndexResult res;
1053         Datum           v[1];
1054         char            n[1];
1055
1056         n[0] = ' ';
1057         v[0] = Int32GetDatum(obj_desc->highbyte);
1058         res = index_insert(obj_desc->index_r, &v[0], &n[0],
1059                                            &(htup->t_ctid), obj_desc->heap_r);
1060
1061         if (res)
1062                 pfree(res);
1063 }
1064
1065 /*
1066 static void
1067 DumpPage(Page page, int blkno)
1068 {
1069                 ItemId                  lp;
1070                 HeapTuple               tup;
1071                 int                             flags, i, nline;
1072                 ItemPointerData pointerData;
1073
1074                 printf("\t[subblock=%d]:lower=%d:upper=%d:special=%d\n", 0,
1075                                 ((PageHeader)page)->pd_lower, ((PageHeader)page)->pd_upper,
1076                                 ((PageHeader)page)->pd_special);
1077
1078                 printf("\t:MaxOffsetNumber=%d\n",
1079                            (int16) PageGetMaxOffsetNumber(page));
1080
1081                 nline = (int16) PageGetMaxOffsetNumber(page);
1082
1083 {
1084                 int             i;
1085                 char    *cp;
1086
1087                 i = PageGetSpecialSize(page);
1088                 cp = PageGetSpecialPointer(page);
1089
1090                 printf("\t:SpecialData=");
1091
1092                 while (i > 0) {
1093                                 printf(" 0x%02x", *cp);
1094                                 cp += 1;
1095                                 i -= 1;
1096                 }
1097                 printf("\n");
1098 }
1099                 for (i = 0; i < nline; i++) {
1100                                 lp = ((PageHeader)page)->pd_linp + i;
1101                                 flags = (*lp).lp_flags;
1102                                 ItemPointerSet(&pointerData, blkno, 1 + i);
1103                                 printf("%s:off=%d:flags=0x%x:len=%d",
1104                                                 ItemPointerFormExternal(&pointerData), (*lp).lp_off,
1105                                                 flags, (*lp).lp_len);
1106
1107                                 if (flags & LP_USED) {
1108                                                 HeapTupleData   htdata;
1109
1110                                                 printf(":USED");
1111
1112                                                 memmove((char *) &htdata,
1113                                                                 (char *) &((char *)page)[(*lp).lp_off],
1114                                                                 sizeof(htdata));
1115
1116                                                 tup = &htdata;
1117
1118                                                 printf("\n\t:ctid=%s:oid=%d",
1119                                                                 ItemPointerFormExternal(&tup->t_ctid),
1120                                                                 tup->t_oid);
1121                                                 printf(":natts=%d:thoff=%d:",
1122                                                                 tup->t_natts,
1123                                                                 tup->t_hoff);
1124
1125                                                 printf("\n\t:cmin=%u:",
1126                                                                 tup->t_cmin);
1127
1128                                                 printf("xmin=%u:", tup->t_xmin);
1129
1130                                                 printf("\n\t:cmax=%u:",
1131                                                                 tup->t_cmax);
1132
1133                                                 printf("xmax=%u:\n", tup->t_xmax);
1134
1135                                 } else
1136                                                 putchar('\n');
1137                 }
1138 }
1139
1140 static char*
1141 ItemPointerFormExternal(ItemPointer pointer)
1142 {
1143                 static char             itemPointerString[32];
1144
1145                 if (!ItemPointerIsValid(pointer)) {
1146                         memmove(itemPointerString, "<-,-,->", sizeof "<-,-,->");
1147                 } else {
1148                         sprintf(itemPointerString, "<%u,%u>",
1149                                         ItemPointerGetBlockNumber(pointer),
1150                                         ItemPointerGetOffsetNumber(pointer));
1151                 }
1152
1153                 return (itemPointerString);
1154 }
1155 */
1156
1157 static int
1158 _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln)
1159 {
1160         IndexScanDesc iscan;
1161         RetrieveIndexResult res;
1162         Buffer          buf;
1163         HeapTuple       htup;
1164         Datum           d;
1165         long            size;
1166         bool            isNull;
1167
1168         /* scan backwards from end */
1169         iscan = index_beginscan(ireln, (bool) 1, 0, (ScanKey) NULL);
1170
1171         buf = InvalidBuffer;
1172
1173         do
1174         {
1175                 res = index_getnext(iscan, BackwardScanDirection);
1176
1177                 /*
1178                  * If there are no more index tuples, then the relation is empty,
1179                  * so the file's size is zero.
1180                  */
1181
1182                 if (res == (RetrieveIndexResult) NULL)
1183                 {
1184                         index_endscan(iscan);
1185                         return (0);
1186                 }
1187
1188                 /*
1189                  * For time travel, we need to use the actual time qual here,
1190                  * rather that NowTimeQual.  We currently have no way to pass a
1191                  * time qual in.
1192                  */
1193
1194                 if (buf != InvalidBuffer)
1195                         ReleaseBuffer(buf);
1196
1197                 htup = heap_fetch(hreln, false, &(res->heap_iptr), &buf);
1198
1199         } while (!HeapTupleIsValid(htup));
1200
1201         /* don't need the index scan anymore */
1202         index_endscan(iscan);
1203
1204         /* get olastbyte attribute */
1205         d = heap_getattr(htup, buf, 1, hdesc, &isNull);
1206         size = DatumGetInt32(d) + 1;
1207
1208         /* wei hates it if you forget to do this */
1209         ReleaseBuffer(buf);
1210
1211         return (size);
1212 }