1 /*-------------------------------------------------------------------------
4 * routines for manipulating inversion fs large objects. This file
5 * contains the user-level large object application interface routines.
8 * Note: we access pg_largeobject.data using its C struct declaration.
9 * This is safe because it immediately follows pageno which is an int4 field,
10 * and therefore the data field will always be 4-byte aligned, even if it
11 * is in the short 1-byte-header format. We have to detoast it since it's
12 * quite likely to be in compressed or short format. We also need to check
13 * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
15 * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
16 * does most of the backend code. We expect that CurrentMemoryContext will
17 * be a short-lived context. Data that must persist across function calls
18 * is kept either in CacheMemoryContext (the Relation structs) or in the
19 * memory context given to inv_open (for LargeObjectDesc structs).
22 * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
23 * Portions Copyright (c) 1994, Regents of the University of California
27 * $PostgreSQL: pgsql/src/backend/storage/large_object/inv_api.c,v 1.126 2007/11/15 21:14:38 momjian Exp $
29 *-------------------------------------------------------------------------
33 #include "access/genam.h"
34 #include "access/heapam.h"
35 #include "access/tuptoaster.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "catalog/indexing.h"
39 #include "catalog/pg_largeobject.h"
40 #include "commands/comment.h"
41 #include "libpq/libpq-fs.h"
42 #include "storage/large_object.h"
43 #include "utils/fmgroids.h"
44 #include "utils/resowner.h"
48 * All accesses to pg_largeobject and its index make use of a single Relation
49 * reference, so that we only need to open pg_largeobject once per transaction.
50 * To avoid problems when the first such reference occurs inside a
51 * subtransaction, we execute a slightly klugy maneuver to assign ownership of
52 * the Relation reference to TopTransactionResourceOwner.
/* Cached Relation for pg_largeobject; NULL when not yet opened in this xact */
54 static Relation lo_heap_r = NULL;
/* Cached Relation for pg_largeobject's (loid, pageno) index; NULL when closed */
55 static Relation lo_index_r = NULL;
59 * Open pg_largeobject and its index, if not already done in current xact
62 open_lo_relation(void)
64 	ResourceOwner currentOwner;
/* Fast path: both cached references already valid for this transaction */
66 	if (lo_heap_r && lo_index_r)
67 		return;					/* already open in current xact */
69 	/* Arrange for the top xact to own these relation references */
70 	currentOwner = CurrentResourceOwner;
/*
 * NOTE(review): the PG_TRY()/PG_CATCH() scaffolding lines appear to be
 * missing from this excerpt (original line numbers jump here); the two
 * restores of CurrentResourceOwner below look like the error path and the
 * normal path of that construct -- confirm against upstream inv_api.c.
 */
73 	CurrentResourceOwner = TopTransactionResourceOwner;
75 	/* Use RowExclusiveLock since we might either read or write */
76 	if (lo_heap_r == NULL)
77 	lo_heap_r = heap_open(LargeObjectRelationId, RowExclusiveLock);
78 	if (lo_index_r == NULL)
79 	lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
83 	/* Ensure CurrentResourceOwner is restored on error */
84 	CurrentResourceOwner = currentOwner;
/* Normal path: also restore the caller's resource owner */
88 	CurrentResourceOwner = currentOwner;
92 * Clean up at main transaction end
95 close_lo_relation(bool isCommit)
97 	if (lo_heap_r || lo_index_r)
100 * Only bother to close if committing; else abort cleanup will handle
105 ResourceOwner currentOwner;
/* The references are owned by the top transaction's resource owner */
107 currentOwner = CurrentResourceOwner;
110 CurrentResourceOwner = TopTransactionResourceOwner;
/*
 * NoLock: the RowExclusiveLock taken at open time is held until
 * transaction end; we only drop the relcache references here.
 * NOTE(review): the lines resetting lo_heap_r/lo_index_r to NULL, and
 * the PG_TRY()/PG_CATCH() wrapper, are not visible in this excerpt --
 * confirm against upstream before assuming they are absent.
 */
113 index_close(lo_index_r, NoLock);
115 heap_close(lo_heap_r, NoLock);
119 /* Ensure CurrentResourceOwner is restored on error */
120 CurrentResourceOwner = currentOwner;
/* Normal path: restore the caller's resource owner */
124 CurrentResourceOwner = currentOwner;
133 * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
134 * read with can be specified.
137 myLargeObjectExists(Oid loid, Snapshot snapshot)
140 Relation pg_largeobject;
145 * See if we can find any tuples belonging to the specified LO
/* Key on loid only; any page of the LO proves existence */
147 ScanKeyInit(&skey[0],
148 Anum_pg_largeobject_loid,
149 BTEqualStrategyNumber, F_OIDEQ,
150 ObjectIdGetDatum(loid));
152 pg_largeobject = heap_open(LargeObjectRelationId, AccessShareLock);
/* systable scan over the loid/pageno index, under the caller's snapshot */
154 sd = systable_beginscan(pg_largeobject, LargeObjectLOidPNIndexId, true,
/* One visible tuple is enough -- the LO exists under this snapshot */
157 if (systable_getnext(sd) != NULL)
160 systable_endscan(sd);
162 heap_close(pg_largeobject, AccessShareLock);
/*
 * Return the payload length in bytes of an already-detoasted bytea.
 * Caller must have untoasted the value first (see the Assert below).
 */
169 getbytealen(bytea *data)
171 	Assert(!VARATT_IS_EXTENDED(data));
/* paranoia: a varlena shorter than its own header is corrupt */
172 	if (VARSIZE(data) < VARHDRSZ)
173 	elog(ERROR, "invalid VARSIZE(data)");
174 	return (VARSIZE(data) - VARHDRSZ);
179 * inv_create -- create a new large object
182 * lobjId - OID to use for new large object, or InvalidOid to pick one
187 * If lobjId is not InvalidOid, then an error occurs if the OID is already
191 inv_create(Oid lobjId)
194 * Allocate an OID to be the LO's identifier, unless we were told what to
195 * use. We can use the index on pg_largeobject for checking OID
196 * uniqueness, even though it has additional columns besides OID.
198 if (!OidIsValid(lobjId))
/*
 * NOTE(review): the open_lo_relation() call that must precede use of
 * lo_heap_r/lo_index_r is on a line not visible in this excerpt --
 * confirm against upstream.
 */
202 lobjId = GetNewOidWithIndex(lo_heap_r, lo_index_r);
206 * Create the LO by writing an empty first page for it in pg_largeobject
207 * (will fail if duplicate)
209 LargeObjectCreate(lobjId);
212 * Advance command counter to make new tuple visible to later operations.
214 CommandCounterIncrement();
220 * inv_open -- access an existing large object.
223 * Large object descriptor, appropriately filled in. The descriptor
224 * and subsidiary data are allocated in the specified memory context,
225 * which must be suitably long-lived for the caller's purposes.
228 inv_open(Oid lobjId, int flags, MemoryContext mcxt)
230 LargeObjectDesc *retval;
/* Descriptor lives in the caller-supplied context, not CurrentMemoryContext */
232 retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
233 sizeof(LargeObjectDesc));
/* Remember which subtransaction opened the descriptor */
236 retval->subid = GetCurrentSubTransactionId();
239 if (flags & INV_WRITE)
/* Writers read with SnapshotNow so they see their own prior writes */
241 retval->snapshot = SnapshotNow;
242 retval->flags = IFS_WRLOCK | IFS_RDLOCK;
244 else if (flags & INV_READ)
246 /* be sure to copy snap into mcxt */
247 MemoryContext oldContext = MemoryContextSwitchTo(mcxt);
/* Readers get a stable copy of the active snapshot, owned by mcxt */
249 retval->snapshot = CopySnapshot(ActiveSnapshot);
250 retval->flags = IFS_RDLOCK;
251 MemoryContextSwitchTo(oldContext);
/* Neither INV_WRITE nor INV_READ was given */
254 elog(ERROR, "invalid flags: %d", flags);
256 /* Can't use LargeObjectExists here because it always uses SnapshotNow */
257 if (!myLargeObjectExists(lobjId, retval->snapshot))
259 (errcode(ERRCODE_UNDEFINED_OBJECT),
260 errmsg("large object %u does not exist", lobjId)));
266 * Closes a large object descriptor previously made by inv_open(), and
267 * releases the long-term memory used by it.
270 inv_close(LargeObjectDesc *obj_desc)
272 Assert(PointerIsValid(obj_desc));
/*
 * Only snapshots copied by inv_open (the INV_READ path) are freed;
 * SnapshotNow is a shared static and must not be freed.
 * NOTE(review): the pfree(obj_desc) that releases the descriptor itself
 * is presumably on a line not visible in this excerpt -- confirm.
 */
273 if (obj_desc->snapshot != SnapshotNow)
274 FreeSnapshot(obj_desc->snapshot);
279 * Destroys an existing large object (not to be confused with a descriptor!)
281 * returns -1 if failed
/*
 * NOTE(review): the function signature line (inv_drop(Oid lobjId), per the
 * comment above and the uses of lobjId below) is missing from this excerpt.
 */
286 LargeObjectDrop(lobjId);
288 /* Delete any comments on the large object */
289 DeleteComments(lobjId, LargeObjectRelationId, 0);
292 * Advance command counter so that tuple removal will be seen by later
293 * large-object operations in this transaction.
295 CommandCounterIncrement();
301 * Determine size of a large object
303 * NOTE: LOs can contain gaps, just like Unix files. We actually return
304 * the offset of the last byte + 1.
307 inv_getsize(LargeObjectDesc *obj_desc)
315 Assert(PointerIsValid(obj_desc));
/* Key on loid only; combined with a backward scan this finds the last page */
319 ScanKeyInit(&skey[0],
320 Anum_pg_largeobject_loid,
321 BTEqualStrategyNumber, F_OIDEQ,
322 ObjectIdGetDatum(obj_desc->id));
/* Scan under the descriptor's snapshot so size matches what reads would see */
324 sd = index_beginscan(lo_heap_r, lo_index_r,
325 obj_desc->snapshot, 1, skey);
328 * Because the pg_largeobject index is on both loid and pageno, but we
329 * constrain only loid, a backwards scan should visit all pages of the
330 * large object in reverse pageno order. So, it's sufficient to examine
331 * the first valid tuple (== last valid page).
333 while ((tuple = index_getnext(sd, BackwardScanDirection)) != NULL)
335 Form_pg_largeobject data;
340 if (HeapTupleHasNulls(tuple)) /* paranoia */
341 elog(ERROR, "null field found in pg_largeobject");
342 data = (Form_pg_largeobject) GETSTRUCT(tuple);
343 datafield = &(data->data); /* see note at top of file */
/* Detoast if compressed or short-header; we need the true byte length */
345 if (VARATT_IS_EXTENDED(datafield))
347 datafield = (bytea *)
348 heap_tuple_untoast_attr((struct varlena *) datafield);
/* Size = start offset of last page + number of bytes stored on it */
351 lastbyte = data->pageno * LOBLKSIZE + getbytealen(datafield);
/* No visible pages at all: the LO does not exist under this snapshot */
361 (errcode(ERRCODE_UNDEFINED_OBJECT),
362 errmsg("large object %u does not exist", obj_desc->id)));
/*
 * Set the read/write position of the descriptor; returns the new offset.
 * NOTE(review): the switch statement and its SEEK_SET/SEEK_CUR/SEEK_END
 * case labels are missing from this excerpt; the three offset computations
 * below correspond to those three whence modes -- confirm against upstream.
 */
367 inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
369 Assert(PointerIsValid(obj_desc));
/* SEEK_SET-style: absolute offset; negative is invalid */
375 elog(ERROR, "invalid seek offset: %d", offset);
376 obj_desc->offset = offset;
/* SEEK_CUR-style: relative to current position; reject moves before 0 */
379 if (offset < 0 && obj_desc->offset < ((uint32) (-offset)))
380 elog(ERROR, "invalid seek offset: %d", offset);
381 obj_desc->offset += offset;
/* SEEK_END-style: relative to current LO size; reject moves before 0 */
385 uint32 size = inv_getsize(obj_desc);
387 if (offset < 0 && size < ((uint32) (-offset)))
388 elog(ERROR, "invalid seek offset: %d", offset);
389 obj_desc->offset = size + offset;
393 elog(ERROR, "invalid whence: %d", whence);
395 return obj_desc->offset;
/* Report the current read/write position of the descriptor. */
399 inv_tell(LargeObjectDesc *obj_desc)
401 Assert(PointerIsValid(obj_desc));
403 return obj_desc->offset;
/*
 * Read up to nbytes from the current offset into buf, advancing the offset.
 * Holes (missing pages) read back as zeroes. Returns bytes read (per the
 * nread accumulator; the return statement itself is outside this excerpt).
 */
407 inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
/* First page that could contain data at the current offset */
413 int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
419 Assert(PointerIsValid(obj_desc));
/* Scan keys: this LO's pages, starting at the page holding the offset */
427 ScanKeyInit(&skey[0],
428 Anum_pg_largeobject_loid,
429 BTEqualStrategyNumber, F_OIDEQ,
430 ObjectIdGetDatum(obj_desc->id));
432 ScanKeyInit(&skey[1],
433 Anum_pg_largeobject_pageno,
434 BTGreaterEqualStrategyNumber, F_INT4GE,
435 Int32GetDatum(pageno));
437 sd = index_beginscan(lo_heap_r, lo_index_r,
438 obj_desc->snapshot, 2, skey);
440 while ((tuple = index_getnext(sd, ForwardScanDirection)) != NULL)
442 Form_pg_largeobject data;
446 if (HeapTupleHasNulls(tuple)) /* paranoia */
447 elog(ERROR, "null field found in pg_largeobject");
448 data = (Form_pg_largeobject) GETSTRUCT(tuple);
451 * We assume the indexscan will deliver pages in order. However,
452 * there may be missing pages if the LO contains unwritten "holes". We
453 * want missing sections to read out as zeroes.
455 pageoff = ((uint32) data->pageno) * LOBLKSIZE;
456 if (pageoff > obj_desc->offset)
/* In a hole before this page: emit zeroes up to the page start */
458 n = pageoff - obj_desc->offset;
459 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
460 MemSet(buf + nread, 0, n);
462 obj_desc->offset += n;
/* Now positioned within this page; off is the intra-page start */
467 Assert(obj_desc->offset >= pageoff);
468 off = (int) (obj_desc->offset - pageoff);
469 Assert(off >= 0 && off < LOBLKSIZE);
471 datafield = &(data->data); /* see note at top of file */
/* Detoast if compressed or short-header before touching the bytes */
473 if (VARATT_IS_EXTENDED(datafield))
475 datafield = (bytea *)
476 heap_tuple_untoast_attr((struct varlena *) datafield);
479 len = getbytealen(datafield);
/* Copy the smaller of what this page holds and what the caller still wants */
483 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
484 memcpy(buf + nread, VARDATA(datafield) + off, n);
486 obj_desc->offset += n;
/*
 * Write nbytes from buf at the current offset, advancing the offset.
 * Updates existing pages in place (via heap_modifytuple/simple_heap_update)
 * and inserts brand-new pages for holes or extensions. Returns bytes
 * written (per the nwritten accumulator; the return statement itself is
 * outside this excerpt).
 */
502 inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
/* First page touched by this write */
508 int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
512 Form_pg_largeobject olddata;
/* workbuf: a page-sized bytea built in local memory, workb = its payload */
519 char data[LOBLKSIZE];
521 char *workb = VARDATA(&workbuf.hdr);
523 Datum values[Natts_pg_largeobject];
524 char nulls[Natts_pg_largeobject];
525 char replace[Natts_pg_largeobject];
526 CatalogIndexState indstate;
528 Assert(PointerIsValid(obj_desc));
531 /* enforce writability because snapshot is probably wrong otherwise */
532 if ((obj_desc->flags & IFS_WRLOCK) == 0)
534 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
535 errmsg("large object %u was not opened for writing",
/* Open the catalog indexes once; reused for every page we insert/update */
543 indstate = CatalogOpenIndexes(lo_heap_r);
/* Scan keys: this LO's pages, starting at the first page we will write */
545 ScanKeyInit(&skey[0],
546 Anum_pg_largeobject_loid,
547 BTEqualStrategyNumber, F_OIDEQ,
548 ObjectIdGetDatum(obj_desc->id));
550 ScanKeyInit(&skey[1],
551 Anum_pg_largeobject_pageno,
552 BTGreaterEqualStrategyNumber, F_INT4GE,
553 Int32GetDatum(pageno));
555 sd = index_beginscan(lo_heap_r, lo_index_r,
556 obj_desc->snapshot, 2, skey);
562 while (nwritten < nbytes)
565 * If possible, get next pre-existing page of the LO. We assume the
566 * indexscan will deliver these in order --- but there may be holes.
570 if ((oldtuple = index_getnext(sd, ForwardScanDirection)) != NULL)
572 if (HeapTupleHasNulls(oldtuple)) /* paranoia */
573 elog(ERROR, "null field found in pg_largeobject");
574 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
575 Assert(olddata->pageno >= pageno);
577 neednextpage = false;
581 * If we have a pre-existing page, see if it is the page we want to
582 * write, or a later one.
584 if (olddata != NULL && olddata->pageno == pageno)
587 * Update an existing page with fresh data.
589 * First, load old data into workbuf
591 datafield = &(olddata->data); /* see note at top of file */
593 if (VARATT_IS_EXTENDED(datafield))
595 datafield = (bytea *)
596 heap_tuple_untoast_attr((struct varlena *) datafield);
599 len = getbytealen(datafield);
600 Assert(len <= LOBLKSIZE);
601 memcpy(workb, VARDATA(datafield), len);
/* Zero-fill any gap between the old page length and the write position */
608 off = (int) (obj_desc->offset % LOBLKSIZE);
610 MemSet(workb + len, 0, off - len);
613 * Insert appropriate portion of new data
/* n is limited to both the page boundary and the bytes still pending */
616 n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
617 memcpy(workb + off, buf + nwritten, n);
619 obj_desc->offset += n;
621 /* compute valid length of new page */
622 len = (len >= off) ? len : off;
623 SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
626 * Form and insert updated tuple
628 memset(values, 0, sizeof(values));
629 memset(nulls, ' ', sizeof(nulls));
630 memset(replace, ' ', sizeof(replace));
/* Replace only the data column; loid and pageno stay unchanged */
631 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
632 replace[Anum_pg_largeobject_data - 1] = 'r';
633 newtup = heap_modifytuple(oldtuple, RelationGetDescr(lo_heap_r),
634 values, nulls, replace);
635 simple_heap_update(lo_heap_r, &newtup->t_self, newtup);
636 CatalogIndexInsert(indstate, newtup);
637 heap_freetuple(newtup);
640 * We're done with this old page.
649 * Write a brand new page.
651 * First, fill any hole
/* Zero-fill from the page start up to the intra-page write position */
653 off = (int) (obj_desc->offset % LOBLKSIZE);
655 MemSet(workb, 0, off);
658 * Insert appropriate portion of new data
661 n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
662 memcpy(workb + off, buf + nwritten, n);
664 obj_desc->offset += n;
665 /* compute valid length of new page */
667 SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
670 * Form and insert updated tuple
/* All three columns are supplied since this is a fresh insert */
672 memset(values, 0, sizeof(values));
673 memset(nulls, ' ', sizeof(nulls));
674 values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
675 values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
676 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
677 newtup = heap_formtuple(lo_heap_r->rd_att, values, nulls);
678 simple_heap_insert(lo_heap_r, newtup);
679 CatalogIndexInsert(indstate, newtup);
680 heap_freetuple(newtup);
687 CatalogCloseIndexes(indstate);
690 * Advance command counter so that my tuple updates will be seen by later
691 * large-object operations in this transaction.
693 CommandCounterIncrement();
/*
 * Truncate (or extend-with-hole) the large object to exactly len bytes.
 * The page containing the truncation point is rewritten (or created if the
 * point lies in a hole / past the end), and all later pages are deleted.
 */
699 inv_truncate(LargeObjectDesc *obj_desc, int len)
/* Page containing the truncation point */
701 int32 pageno = (int32) (len / LOBLKSIZE);
706 Form_pg_largeobject olddata;
/* workbuf: a page-sized bytea built in local memory, workb = its payload */
710 char data[LOBLKSIZE];
712 char *workb = VARDATA(&workbuf.hdr);
714 Datum values[Natts_pg_largeobject];
715 char nulls[Natts_pg_largeobject];
716 char replace[Natts_pg_largeobject];
717 CatalogIndexState indstate;
719 Assert(PointerIsValid(obj_desc));
721 /* enforce writability because snapshot is probably wrong otherwise */
722 if ((obj_desc->flags & IFS_WRLOCK) == 0)
724 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
725 errmsg("large object %u was not opened for writing",
730 indstate = CatalogOpenIndexes(lo_heap_r);
/* Scan keys: this LO's pages, from the truncation page onward */
732 ScanKeyInit(&skey[0],
733 Anum_pg_largeobject_loid,
734 BTEqualStrategyNumber, F_OIDEQ,
735 ObjectIdGetDatum(obj_desc->id));
737 ScanKeyInit(&skey[1],
738 Anum_pg_largeobject_pageno,
739 BTGreaterEqualStrategyNumber, F_INT4GE,
740 Int32GetDatum(pageno));
742 sd = index_beginscan(lo_heap_r, lo_index_r,
743 obj_desc->snapshot, 2, skey);
746 * If possible, get the page the truncation point is in. The truncation
747 * point may be beyond the end of the LO or in a hole.
750 if ((oldtuple = index_getnext(sd, ForwardScanDirection)) != NULL)
752 if (HeapTupleHasNulls(oldtuple)) /* paranoia */
753 elog(ERROR, "null field found in pg_largeobject");
754 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
755 Assert(olddata->pageno >= pageno);
759 * If we found the page of the truncation point we need to truncate the
760 * data in it. Otherwise if we're in a hole, we need to create a page to
761 * mark the end of data.
763 if (olddata != NULL && olddata->pageno == pageno)
765 /* First, load old data into workbuf */
766 bytea *datafield = &(olddata->data); /* see note at top of
768 bool pfreeit = false;
771 if (VARATT_IS_EXTENDED(datafield))
773 datafield = (bytea *)
774 heap_tuple_untoast_attr((struct varlena *) datafield);
777 pagelen = getbytealen(datafield);
778 Assert(pagelen <= LOBLKSIZE);
779 memcpy(workb, VARDATA(datafield), pagelen);
/* If truncating to a point past the page's old end, zero-fill the gap */
786 off = len % LOBLKSIZE;
788 MemSet(workb + pagelen, 0, off - pagelen);
790 /* compute length of new page */
791 SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
794 * Form and insert updated tuple
796 memset(values, 0, sizeof(values));
797 memset(nulls, ' ', sizeof(nulls));
798 memset(replace, ' ', sizeof(replace));
/* Replace only the data column; loid and pageno stay unchanged */
799 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
800 replace[Anum_pg_largeobject_data - 1] = 'r';
801 newtup = heap_modifytuple(oldtuple, RelationGetDescr(lo_heap_r),
802 values, nulls, replace);
803 simple_heap_update(lo_heap_r, &newtup->t_self, newtup);
804 CatalogIndexInsert(indstate, newtup);
805 heap_freetuple(newtup);
810 * If the first page we found was after the truncation point, we're in
811 * a hole that we'll fill, but we need to delete the later page.
813 if (olddata != NULL && olddata->pageno > pageno)
814 simple_heap_delete(lo_heap_r, &oldtuple->t_self);
817 * Write a brand new page.
819 * Fill the hole up to the truncation point
821 off = len % LOBLKSIZE;
823 MemSet(workb, 0, off);
825 /* compute length of new page */
826 SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
829 * Form and insert new tuple
831 memset(values, 0, sizeof(values));
832 memset(nulls, ' ', sizeof(nulls));
833 values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
834 values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
835 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
836 newtup = heap_formtuple(lo_heap_r->rd_att, values, nulls);
837 simple_heap_insert(lo_heap_r, newtup);
838 CatalogIndexInsert(indstate, newtup);
839 heap_freetuple(newtup);
843 * Delete any pages after the truncation point
845 while ((oldtuple = index_getnext(sd, ForwardScanDirection)) != NULL)
847 simple_heap_delete(lo_heap_r, &oldtuple->t_self);
852 CatalogCloseIndexes(indstate);
855 * Advance command counter so that tuple updates will be seen by later
856 * large-object operations in this transaction.
858 CommandCounterIncrement();