]> granicus.if.org Git - postgresql/blob - src/backend/storage/large_object/inv_api.c
pgindent run for 8.3.
[postgresql] / src / backend / storage / large_object / inv_api.c
1 /*-------------------------------------------------------------------------
2  *
3  * inv_api.c
4  *        routines for manipulating inversion fs large objects. This file
5  *        contains the user-level large object application interface routines.
6  *
7  *
8  * Note: we access pg_largeobject.data using its C struct declaration.
9  * This is safe because it immediately follows pageno which is an int4 field,
10  * and therefore the data field will always be 4-byte aligned, even if it
11  * is in the short 1-byte-header format.  We have to detoast it since it's
12  * quite likely to be in compressed or short format.  We also need to check
13  * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
14  *
15  * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
16  * does most of the backend code.  We expect that CurrentMemoryContext will
17  * be a short-lived context.  Data that must persist across function calls
18  * is kept either in CacheMemoryContext (the Relation structs) or in the
19  * memory context given to inv_open (for LargeObjectDesc structs).
20  *
21  *
22  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
23  * Portions Copyright (c) 1994, Regents of the University of California
24  *
25  *
26  * IDENTIFICATION
27  *        $PostgreSQL: pgsql/src/backend/storage/large_object/inv_api.c,v 1.126 2007/11/15 21:14:38 momjian Exp $
28  *
29  *-------------------------------------------------------------------------
30  */
31 #include "postgres.h"
32
33 #include "access/genam.h"
34 #include "access/heapam.h"
35 #include "access/tuptoaster.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "catalog/indexing.h"
39 #include "catalog/pg_largeobject.h"
40 #include "commands/comment.h"
41 #include "libpq/libpq-fs.h"
42 #include "storage/large_object.h"
43 #include "utils/fmgroids.h"
44 #include "utils/resowner.h"
45
46
/*
 * All accesses to pg_largeobject and its index make use of a single Relation
 * reference, so that we only need to open pg_largeobject once per transaction.
 * To avoid problems when the first such reference occurs inside a
 * subtransaction, we execute a slightly klugy maneuver to assign ownership of
 * the Relation reference to TopTransactionResourceOwner.
 *
 * Both pointers are NULL while the relations are not open; they are set by
 * open_lo_relation() and reset to NULL by close_lo_relation().
 */
static Relation lo_heap_r = NULL;
static Relation lo_index_r = NULL;
56
57
58 /*
59  * Open pg_largeobject and its index, if not already done in current xact
60  */
61 static void
62 open_lo_relation(void)
63 {
64         ResourceOwner currentOwner;
65
66         if (lo_heap_r && lo_index_r)
67                 return;                                 /* already open in current xact */
68
69         /* Arrange for the top xact to own these relation references */
70         currentOwner = CurrentResourceOwner;
71         PG_TRY();
72         {
73                 CurrentResourceOwner = TopTransactionResourceOwner;
74
75                 /* Use RowExclusiveLock since we might either read or write */
76                 if (lo_heap_r == NULL)
77                         lo_heap_r = heap_open(LargeObjectRelationId, RowExclusiveLock);
78                 if (lo_index_r == NULL)
79                         lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
80         }
81         PG_CATCH();
82         {
83                 /* Ensure CurrentResourceOwner is restored on error */
84                 CurrentResourceOwner = currentOwner;
85                 PG_RE_THROW();
86         }
87         PG_END_TRY();
88         CurrentResourceOwner = currentOwner;
89 }
90
91 /*
92  * Clean up at main transaction end
93  */
94 void
95 close_lo_relation(bool isCommit)
96 {
97         if (lo_heap_r || lo_index_r)
98         {
99                 /*
100                  * Only bother to close if committing; else abort cleanup will handle
101                  * it
102                  */
103                 if (isCommit)
104                 {
105                         ResourceOwner currentOwner;
106
107                         currentOwner = CurrentResourceOwner;
108                         PG_TRY();
109                         {
110                                 CurrentResourceOwner = TopTransactionResourceOwner;
111
112                                 if (lo_index_r)
113                                         index_close(lo_index_r, NoLock);
114                                 if (lo_heap_r)
115                                         heap_close(lo_heap_r, NoLock);
116                         }
117                         PG_CATCH();
118                         {
119                                 /* Ensure CurrentResourceOwner is restored on error */
120                                 CurrentResourceOwner = currentOwner;
121                                 PG_RE_THROW();
122                         }
123                         PG_END_TRY();
124                         CurrentResourceOwner = currentOwner;
125                 }
126                 lo_heap_r = NULL;
127                 lo_index_r = NULL;
128         }
129 }
130
131
132 /*
133  * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
134  * read with can be specified.
135  */
136 static bool
137 myLargeObjectExists(Oid loid, Snapshot snapshot)
138 {
139         bool            retval = false;
140         Relation        pg_largeobject;
141         ScanKeyData skey[1];
142         SysScanDesc sd;
143
144         /*
145          * See if we can find any tuples belonging to the specified LO
146          */
147         ScanKeyInit(&skey[0],
148                                 Anum_pg_largeobject_loid,
149                                 BTEqualStrategyNumber, F_OIDEQ,
150                                 ObjectIdGetDatum(loid));
151
152         pg_largeobject = heap_open(LargeObjectRelationId, AccessShareLock);
153
154         sd = systable_beginscan(pg_largeobject, LargeObjectLOidPNIndexId, true,
155                                                         snapshot, 1, skey);
156
157         if (systable_getnext(sd) != NULL)
158                 retval = true;
159
160         systable_endscan(sd);
161
162         heap_close(pg_largeobject, AccessShareLock);
163
164         return retval;
165 }
166
167
168 static int32
169 getbytealen(bytea *data)
170 {
171         Assert(!VARATT_IS_EXTENDED(data));
172         if (VARSIZE(data) < VARHDRSZ)
173                 elog(ERROR, "invalid VARSIZE(data)");
174         return (VARSIZE(data) - VARHDRSZ);
175 }
176
177
178 /*
179  *      inv_create -- create a new large object
180  *
181  *      Arguments:
182  *        lobjId - OID to use for new large object, or InvalidOid to pick one
183  *
184  *      Returns:
185  *        OID of new object
186  *
187  * If lobjId is not InvalidOid, then an error occurs if the OID is already
188  * in use.
189  */
190 Oid
191 inv_create(Oid lobjId)
192 {
193         /*
194          * Allocate an OID to be the LO's identifier, unless we were told what to
195          * use.  We can use the index on pg_largeobject for checking OID
196          * uniqueness, even though it has additional columns besides OID.
197          */
198         if (!OidIsValid(lobjId))
199         {
200                 open_lo_relation();
201
202                 lobjId = GetNewOidWithIndex(lo_heap_r, lo_index_r);
203         }
204
205         /*
206          * Create the LO by writing an empty first page for it in pg_largeobject
207          * (will fail if duplicate)
208          */
209         LargeObjectCreate(lobjId);
210
211         /*
212          * Advance command counter to make new tuple visible to later operations.
213          */
214         CommandCounterIncrement();
215
216         return lobjId;
217 }
218
219 /*
220  *      inv_open -- access an existing large object.
221  *
222  *              Returns:
223  *                Large object descriptor, appropriately filled in.  The descriptor
224  *                and subsidiary data are allocated in the specified memory context,
225  *                which must be suitably long-lived for the caller's purposes.
226  */
227 LargeObjectDesc *
228 inv_open(Oid lobjId, int flags, MemoryContext mcxt)
229 {
230         LargeObjectDesc *retval;
231
232         retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
233                                                                                                         sizeof(LargeObjectDesc));
234
235         retval->id = lobjId;
236         retval->subid = GetCurrentSubTransactionId();
237         retval->offset = 0;
238
239         if (flags & INV_WRITE)
240         {
241                 retval->snapshot = SnapshotNow;
242                 retval->flags = IFS_WRLOCK | IFS_RDLOCK;
243         }
244         else if (flags & INV_READ)
245         {
246                 /* be sure to copy snap into mcxt */
247                 MemoryContext oldContext = MemoryContextSwitchTo(mcxt);
248
249                 retval->snapshot = CopySnapshot(ActiveSnapshot);
250                 retval->flags = IFS_RDLOCK;
251                 MemoryContextSwitchTo(oldContext);
252         }
253         else
254                 elog(ERROR, "invalid flags: %d", flags);
255
256         /* Can't use LargeObjectExists here because it always uses SnapshotNow */
257         if (!myLargeObjectExists(lobjId, retval->snapshot))
258                 ereport(ERROR,
259                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
260                                  errmsg("large object %u does not exist", lobjId)));
261
262         return retval;
263 }
264
265 /*
266  * Closes a large object descriptor previously made by inv_open(), and
267  * releases the long-term memory used by it.
268  */
269 void
270 inv_close(LargeObjectDesc *obj_desc)
271 {
272         Assert(PointerIsValid(obj_desc));
273         if (obj_desc->snapshot != SnapshotNow)
274                 FreeSnapshot(obj_desc->snapshot);
275         pfree(obj_desc);
276 }
277
278 /*
279  * Destroys an existing large object (not to be confused with a descriptor!)
280  *
281  * returns -1 if failed
282  */
283 int
284 inv_drop(Oid lobjId)
285 {
286         LargeObjectDrop(lobjId);
287
288         /* Delete any comments on the large object */
289         DeleteComments(lobjId, LargeObjectRelationId, 0);
290
291         /*
292          * Advance command counter so that tuple removal will be seen by later
293          * large-object operations in this transaction.
294          */
295         CommandCounterIncrement();
296
297         return 1;
298 }
299
300 /*
301  * Determine size of a large object
302  *
303  * NOTE: LOs can contain gaps, just like Unix files.  We actually return
304  * the offset of the last byte + 1.
305  */
306 static uint32
307 inv_getsize(LargeObjectDesc *obj_desc)
308 {
309         bool            found = false;
310         uint32          lastbyte = 0;
311         ScanKeyData skey[1];
312         IndexScanDesc sd;
313         HeapTuple       tuple;
314
315         Assert(PointerIsValid(obj_desc));
316
317         open_lo_relation();
318
319         ScanKeyInit(&skey[0],
320                                 Anum_pg_largeobject_loid,
321                                 BTEqualStrategyNumber, F_OIDEQ,
322                                 ObjectIdGetDatum(obj_desc->id));
323
324         sd = index_beginscan(lo_heap_r, lo_index_r,
325                                                  obj_desc->snapshot, 1, skey);
326
327         /*
328          * Because the pg_largeobject index is on both loid and pageno, but we
329          * constrain only loid, a backwards scan should visit all pages of the
330          * large object in reverse pageno order.  So, it's sufficient to examine
331          * the first valid tuple (== last valid page).
332          */
333         while ((tuple = index_getnext(sd, BackwardScanDirection)) != NULL)
334         {
335                 Form_pg_largeobject data;
336                 bytea      *datafield;
337                 bool            pfreeit;
338
339                 found = true;
340                 if (HeapTupleHasNulls(tuple))   /* paranoia */
341                         elog(ERROR, "null field found in pg_largeobject");
342                 data = (Form_pg_largeobject) GETSTRUCT(tuple);
343                 datafield = &(data->data);              /* see note at top of file */
344                 pfreeit = false;
345                 if (VARATT_IS_EXTENDED(datafield))
346                 {
347                         datafield = (bytea *)
348                                 heap_tuple_untoast_attr((struct varlena *) datafield);
349                         pfreeit = true;
350                 }
351                 lastbyte = data->pageno * LOBLKSIZE + getbytealen(datafield);
352                 if (pfreeit)
353                         pfree(datafield);
354                 break;
355         }
356
357         index_endscan(sd);
358
359         if (!found)
360                 ereport(ERROR,
361                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
362                                  errmsg("large object %u does not exist", obj_desc->id)));
363         return lastbyte;
364 }
365
366 int
367 inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
368 {
369         Assert(PointerIsValid(obj_desc));
370
371         switch (whence)
372         {
373                 case SEEK_SET:
374                         if (offset < 0)
375                                 elog(ERROR, "invalid seek offset: %d", offset);
376                         obj_desc->offset = offset;
377                         break;
378                 case SEEK_CUR:
379                         if (offset < 0 && obj_desc->offset < ((uint32) (-offset)))
380                                 elog(ERROR, "invalid seek offset: %d", offset);
381                         obj_desc->offset += offset;
382                         break;
383                 case SEEK_END:
384                         {
385                                 uint32          size = inv_getsize(obj_desc);
386
387                                 if (offset < 0 && size < ((uint32) (-offset)))
388                                         elog(ERROR, "invalid seek offset: %d", offset);
389                                 obj_desc->offset = size + offset;
390                         }
391                         break;
392                 default:
393                         elog(ERROR, "invalid whence: %d", whence);
394         }
395         return obj_desc->offset;
396 }
397
398 int
399 inv_tell(LargeObjectDesc *obj_desc)
400 {
401         Assert(PointerIsValid(obj_desc));
402
403         return obj_desc->offset;
404 }
405
406 int
407 inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
408 {
409         int                     nread = 0;
410         int                     n;
411         int                     off;
412         int                     len;
413         int32           pageno = (int32) (obj_desc->offset / LOBLKSIZE);
414         uint32          pageoff;
415         ScanKeyData skey[2];
416         IndexScanDesc sd;
417         HeapTuple       tuple;
418
419         Assert(PointerIsValid(obj_desc));
420         Assert(buf != NULL);
421
422         if (nbytes <= 0)
423                 return 0;
424
425         open_lo_relation();
426
427         ScanKeyInit(&skey[0],
428                                 Anum_pg_largeobject_loid,
429                                 BTEqualStrategyNumber, F_OIDEQ,
430                                 ObjectIdGetDatum(obj_desc->id));
431
432         ScanKeyInit(&skey[1],
433                                 Anum_pg_largeobject_pageno,
434                                 BTGreaterEqualStrategyNumber, F_INT4GE,
435                                 Int32GetDatum(pageno));
436
437         sd = index_beginscan(lo_heap_r, lo_index_r,
438                                                  obj_desc->snapshot, 2, skey);
439
440         while ((tuple = index_getnext(sd, ForwardScanDirection)) != NULL)
441         {
442                 Form_pg_largeobject data;
443                 bytea      *datafield;
444                 bool            pfreeit;
445
446                 if (HeapTupleHasNulls(tuple))   /* paranoia */
447                         elog(ERROR, "null field found in pg_largeobject");
448                 data = (Form_pg_largeobject) GETSTRUCT(tuple);
449
450                 /*
451                  * We assume the indexscan will deliver pages in order.  However,
452                  * there may be missing pages if the LO contains unwritten "holes". We
453                  * want missing sections to read out as zeroes.
454                  */
455                 pageoff = ((uint32) data->pageno) * LOBLKSIZE;
456                 if (pageoff > obj_desc->offset)
457                 {
458                         n = pageoff - obj_desc->offset;
459                         n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
460                         MemSet(buf + nread, 0, n);
461                         nread += n;
462                         obj_desc->offset += n;
463                 }
464
465                 if (nread < nbytes)
466                 {
467                         Assert(obj_desc->offset >= pageoff);
468                         off = (int) (obj_desc->offset - pageoff);
469                         Assert(off >= 0 && off < LOBLKSIZE);
470
471                         datafield = &(data->data);      /* see note at top of file */
472                         pfreeit = false;
473                         if (VARATT_IS_EXTENDED(datafield))
474                         {
475                                 datafield = (bytea *)
476                                         heap_tuple_untoast_attr((struct varlena *) datafield);
477                                 pfreeit = true;
478                         }
479                         len = getbytealen(datafield);
480                         if (len > off)
481                         {
482                                 n = len - off;
483                                 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
484                                 memcpy(buf + nread, VARDATA(datafield) + off, n);
485                                 nread += n;
486                                 obj_desc->offset += n;
487                         }
488                         if (pfreeit)
489                                 pfree(datafield);
490                 }
491
492                 if (nread >= nbytes)
493                         break;
494         }
495
496         index_endscan(sd);
497
498         return nread;
499 }
500
501 int
502 inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
503 {
504         int                     nwritten = 0;
505         int                     n;
506         int                     off;
507         int                     len;
508         int32           pageno = (int32) (obj_desc->offset / LOBLKSIZE);
509         ScanKeyData skey[2];
510         IndexScanDesc sd;
511         HeapTuple       oldtuple;
512         Form_pg_largeobject olddata;
513         bool            neednextpage;
514         bytea      *datafield;
515         bool            pfreeit;
516         struct
517         {
518                 bytea           hdr;
519                 char            data[LOBLKSIZE];
520         }                       workbuf;
521         char       *workb = VARDATA(&workbuf.hdr);
522         HeapTuple       newtup;
523         Datum           values[Natts_pg_largeobject];
524         char            nulls[Natts_pg_largeobject];
525         char            replace[Natts_pg_largeobject];
526         CatalogIndexState indstate;
527
528         Assert(PointerIsValid(obj_desc));
529         Assert(buf != NULL);
530
531         /* enforce writability because snapshot is probably wrong otherwise */
532         if ((obj_desc->flags & IFS_WRLOCK) == 0)
533                 ereport(ERROR,
534                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
535                                  errmsg("large object %u was not opened for writing",
536                                                 obj_desc->id)));
537
538         if (nbytes <= 0)
539                 return 0;
540
541         open_lo_relation();
542
543         indstate = CatalogOpenIndexes(lo_heap_r);
544
545         ScanKeyInit(&skey[0],
546                                 Anum_pg_largeobject_loid,
547                                 BTEqualStrategyNumber, F_OIDEQ,
548                                 ObjectIdGetDatum(obj_desc->id));
549
550         ScanKeyInit(&skey[1],
551                                 Anum_pg_largeobject_pageno,
552                                 BTGreaterEqualStrategyNumber, F_INT4GE,
553                                 Int32GetDatum(pageno));
554
555         sd = index_beginscan(lo_heap_r, lo_index_r,
556                                                  obj_desc->snapshot, 2, skey);
557
558         oldtuple = NULL;
559         olddata = NULL;
560         neednextpage = true;
561
562         while (nwritten < nbytes)
563         {
564                 /*
565                  * If possible, get next pre-existing page of the LO.  We assume the
566                  * indexscan will deliver these in order --- but there may be holes.
567                  */
568                 if (neednextpage)
569                 {
570                         if ((oldtuple = index_getnext(sd, ForwardScanDirection)) != NULL)
571                         {
572                                 if (HeapTupleHasNulls(oldtuple))                /* paranoia */
573                                         elog(ERROR, "null field found in pg_largeobject");
574                                 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
575                                 Assert(olddata->pageno >= pageno);
576                         }
577                         neednextpage = false;
578                 }
579
580                 /*
581                  * If we have a pre-existing page, see if it is the page we want to
582                  * write, or a later one.
583                  */
584                 if (olddata != NULL && olddata->pageno == pageno)
585                 {
586                         /*
587                          * Update an existing page with fresh data.
588                          *
589                          * First, load old data into workbuf
590                          */
591                         datafield = &(olddata->data);           /* see note at top of file */
592                         pfreeit = false;
593                         if (VARATT_IS_EXTENDED(datafield))
594                         {
595                                 datafield = (bytea *)
596                                         heap_tuple_untoast_attr((struct varlena *) datafield);
597                                 pfreeit = true;
598                         }
599                         len = getbytealen(datafield);
600                         Assert(len <= LOBLKSIZE);
601                         memcpy(workb, VARDATA(datafield), len);
602                         if (pfreeit)
603                                 pfree(datafield);
604
605                         /*
606                          * Fill any hole
607                          */
608                         off = (int) (obj_desc->offset % LOBLKSIZE);
609                         if (off > len)
610                                 MemSet(workb + len, 0, off - len);
611
612                         /*
613                          * Insert appropriate portion of new data
614                          */
615                         n = LOBLKSIZE - off;
616                         n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
617                         memcpy(workb + off, buf + nwritten, n);
618                         nwritten += n;
619                         obj_desc->offset += n;
620                         off += n;
621                         /* compute valid length of new page */
622                         len = (len >= off) ? len : off;
623                         SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
624
625                         /*
626                          * Form and insert updated tuple
627                          */
628                         memset(values, 0, sizeof(values));
629                         memset(nulls, ' ', sizeof(nulls));
630                         memset(replace, ' ', sizeof(replace));
631                         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
632                         replace[Anum_pg_largeobject_data - 1] = 'r';
633                         newtup = heap_modifytuple(oldtuple, RelationGetDescr(lo_heap_r),
634                                                                           values, nulls, replace);
635                         simple_heap_update(lo_heap_r, &newtup->t_self, newtup);
636                         CatalogIndexInsert(indstate, newtup);
637                         heap_freetuple(newtup);
638
639                         /*
640                          * We're done with this old page.
641                          */
642                         oldtuple = NULL;
643                         olddata = NULL;
644                         neednextpage = true;
645                 }
646                 else
647                 {
648                         /*
649                          * Write a brand new page.
650                          *
651                          * First, fill any hole
652                          */
653                         off = (int) (obj_desc->offset % LOBLKSIZE);
654                         if (off > 0)
655                                 MemSet(workb, 0, off);
656
657                         /*
658                          * Insert appropriate portion of new data
659                          */
660                         n = LOBLKSIZE - off;
661                         n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
662                         memcpy(workb + off, buf + nwritten, n);
663                         nwritten += n;
664                         obj_desc->offset += n;
665                         /* compute valid length of new page */
666                         len = off + n;
667                         SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
668
669                         /*
670                          * Form and insert updated tuple
671                          */
672                         memset(values, 0, sizeof(values));
673                         memset(nulls, ' ', sizeof(nulls));
674                         values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
675                         values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
676                         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
677                         newtup = heap_formtuple(lo_heap_r->rd_att, values, nulls);
678                         simple_heap_insert(lo_heap_r, newtup);
679                         CatalogIndexInsert(indstate, newtup);
680                         heap_freetuple(newtup);
681                 }
682                 pageno++;
683         }
684
685         index_endscan(sd);
686
687         CatalogCloseIndexes(indstate);
688
689         /*
690          * Advance command counter so that my tuple updates will be seen by later
691          * large-object operations in this transaction.
692          */
693         CommandCounterIncrement();
694
695         return nwritten;
696 }
697
698 void
699 inv_truncate(LargeObjectDesc *obj_desc, int len)
700 {
701         int32           pageno = (int32) (len / LOBLKSIZE);
702         int                     off;
703         ScanKeyData skey[2];
704         IndexScanDesc sd;
705         HeapTuple       oldtuple;
706         Form_pg_largeobject olddata;
707         struct
708         {
709                 bytea           hdr;
710                 char            data[LOBLKSIZE];
711         }                       workbuf;
712         char       *workb = VARDATA(&workbuf.hdr);
713         HeapTuple       newtup;
714         Datum           values[Natts_pg_largeobject];
715         char            nulls[Natts_pg_largeobject];
716         char            replace[Natts_pg_largeobject];
717         CatalogIndexState indstate;
718
719         Assert(PointerIsValid(obj_desc));
720
721         /* enforce writability because snapshot is probably wrong otherwise */
722         if ((obj_desc->flags & IFS_WRLOCK) == 0)
723                 ereport(ERROR,
724                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
725                                  errmsg("large object %u was not opened for writing",
726                                                 obj_desc->id)));
727
728         open_lo_relation();
729
730         indstate = CatalogOpenIndexes(lo_heap_r);
731
732         ScanKeyInit(&skey[0],
733                                 Anum_pg_largeobject_loid,
734                                 BTEqualStrategyNumber, F_OIDEQ,
735                                 ObjectIdGetDatum(obj_desc->id));
736
737         ScanKeyInit(&skey[1],
738                                 Anum_pg_largeobject_pageno,
739                                 BTGreaterEqualStrategyNumber, F_INT4GE,
740                                 Int32GetDatum(pageno));
741
742         sd = index_beginscan(lo_heap_r, lo_index_r,
743                                                  obj_desc->snapshot, 2, skey);
744
745         /*
746          * If possible, get the page the truncation point is in. The truncation
747          * point may be beyond the end of the LO or in a hole.
748          */
749         olddata = NULL;
750         if ((oldtuple = index_getnext(sd, ForwardScanDirection)) != NULL)
751         {
752                 if (HeapTupleHasNulls(oldtuple))                /* paranoia */
753                         elog(ERROR, "null field found in pg_largeobject");
754                 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
755                 Assert(olddata->pageno >= pageno);
756         }
757
758         /*
759          * If we found the page of the truncation point we need to truncate the
760          * data in it.  Otherwise if we're in a hole, we need to create a page to
761          * mark the end of data.
762          */
763         if (olddata != NULL && olddata->pageno == pageno)
764         {
765                 /* First, load old data into workbuf */
766                 bytea      *datafield = &(olddata->data);               /* see note at top of
767                                                                                                                  * file */
768                 bool            pfreeit = false;
769                 int                     pagelen;
770
771                 if (VARATT_IS_EXTENDED(datafield))
772                 {
773                         datafield = (bytea *)
774                                 heap_tuple_untoast_attr((struct varlena *) datafield);
775                         pfreeit = true;
776                 }
777                 pagelen = getbytealen(datafield);
778                 Assert(pagelen <= LOBLKSIZE);
779                 memcpy(workb, VARDATA(datafield), pagelen);
780                 if (pfreeit)
781                         pfree(datafield);
782
783                 /*
784                  * Fill any hole
785                  */
786                 off = len % LOBLKSIZE;
787                 if (off > pagelen)
788                         MemSet(workb + pagelen, 0, off - pagelen);
789
790                 /* compute length of new page */
791                 SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
792
793                 /*
794                  * Form and insert updated tuple
795                  */
796                 memset(values, 0, sizeof(values));
797                 memset(nulls, ' ', sizeof(nulls));
798                 memset(replace, ' ', sizeof(replace));
799                 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
800                 replace[Anum_pg_largeobject_data - 1] = 'r';
801                 newtup = heap_modifytuple(oldtuple, RelationGetDescr(lo_heap_r),
802                                                                   values, nulls, replace);
803                 simple_heap_update(lo_heap_r, &newtup->t_self, newtup);
804                 CatalogIndexInsert(indstate, newtup);
805                 heap_freetuple(newtup);
806         }
807         else
808         {
809                 /*
810                  * If the first page we found was after the truncation point, we're in
811                  * a hole that we'll fill, but we need to delete the later page.
812                  */
813                 if (olddata != NULL && olddata->pageno > pageno)
814                         simple_heap_delete(lo_heap_r, &oldtuple->t_self);
815
816                 /*
817                  * Write a brand new page.
818                  *
819                  * Fill the hole up to the truncation point
820                  */
821                 off = len % LOBLKSIZE;
822                 if (off > 0)
823                         MemSet(workb, 0, off);
824
825                 /* compute length of new page */
826                 SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
827
828                 /*
829                  * Form and insert new tuple
830                  */
831                 memset(values, 0, sizeof(values));
832                 memset(nulls, ' ', sizeof(nulls));
833                 values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
834                 values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
835                 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
836                 newtup = heap_formtuple(lo_heap_r->rd_att, values, nulls);
837                 simple_heap_insert(lo_heap_r, newtup);
838                 CatalogIndexInsert(indstate, newtup);
839                 heap_freetuple(newtup);
840         }
841
842         /*
843          * Delete any pages after the truncation point
844          */
845         while ((oldtuple = index_getnext(sd, ForwardScanDirection)) != NULL)
846         {
847                 simple_heap_delete(lo_heap_r, &oldtuple->t_self);
848         }
849
850         index_endscan(sd);
851
852         CatalogCloseIndexes(indstate);
853
854         /*
855          * Advance command counter so that tuple updates will be seen by later
856          * large-object operations in this transaction.
857          */
858         CommandCounterIncrement();
859 }