]> granicus.if.org Git - postgresql/blob - src/backend/libpq/be-fsstubs.c
a750f921fa157a0ec41733c28da5f97ba51fcf34
[postgresql] / src / backend / libpq / be-fsstubs.c
1 /*-------------------------------------------------------------------------
2  *
3  * be-fsstubs.c
4  *        Builtin functions for open/close/read/write operations on large objects
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/libpq/be-fsstubs.c
12  *
13  * NOTES
14  *        This should be moved to a more appropriate place.  It is here
15  *        for lack of a better place.
16  *
17  *        These functions store LargeObjectDesc structs in a private MemoryContext,
18  *        which means that large object descriptors hang around until we destroy
19  *        the context at transaction end.  It'd be possible to prolong the lifetime
20  *        of the context so that LO FDs are good across transactions (for example,
21  *        we could release the context only if we see that no FDs remain open).
22  *        But we'd need additional state in order to do the right thing at the
23  *        end of an aborted transaction.  FDs opened during an aborted xact would
24  *        still need to be closed, since they might not be pointing at valid
25  *        relations at all.  Locking semantics are also an interesting problem
26  *        if LOs stay open across transactions.  For now, we'll stick with the
27  *        existing documented semantics of LO FDs: they're only good within a
28  *        transaction.
29  *
30  *        As of PostgreSQL 8.0, much of the angst expressed above is no longer
31  *        relevant, and in fact it'd be pretty easy to allow LO FDs to stay
32  *        open across transactions.  (Snapshot relevancy would still be an issue.)
33  *        However backwards compatibility suggests that we should stick to the
34  *        status quo.
35  *
36  *-------------------------------------------------------------------------
37  */
38
#include "postgres.h"

#include <errno.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>

#include "libpq/be-fsstubs.h"
#include "libpq/libpq-fs.h"
#include "miscadmin.h"
#include "storage/fd.h"
#include "storage/large_object.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
53
/* define this to enable debug logging */
/* #define FSDB 1 */
/* chunk size for lo_import/lo_export transfers */
#define BUFSIZE			8192

/*
 * LO "FD"s are indexes into the cookies array.
 *
 * A non-null entry is a pointer to a LargeObjectDesc allocated in the
 * LO private memory context "fscxt".  The cookies array itself is also
 * dynamically allocated in that context.  Its current allocated size is
 * cookies_size entries, of which any unused entries will be NULL.
 */
static LargeObjectDesc **cookies = NULL;
static int	cookies_size = 0;

/*
 * Private memory context for large-object state.  It stays NULL until the
 * first LO operation of a transaction calls CreateFSContext(); the
 * end-of-transaction hooks in this file treat NULL as "no LO operations in
 * this xact" and return immediately.
 */
static MemoryContext fscxt = NULL;

/*
 * Create fscxt, as a child of TopMemoryContext, if it does not exist yet.
 * Does nothing when the context is already there, so it is safe to invoke
 * at the top of every LO entry point.
 */
#define CreateFSContext() \
	do { \
		if (fscxt == NULL) \
			fscxt = AllocSetContextCreate(TopMemoryContext, \
										  "Filesystem", \
										  ALLOCSET_DEFAULT_SIZES); \
	} while (0)


/* forward declarations of the file-private helpers defined further down */
static int	newLOfd(LargeObjectDesc *lobjCookie);
static void deleteLOfd(int fd);
static Oid	lo_import_internal(text *filename, Oid lobjOid);
84
85
86 /*****************************************************************************
87  *      File Interfaces for Large Objects
88  *****************************************************************************/
89
90 Datum
91 be_lo_open(PG_FUNCTION_ARGS)
92 {
93         Oid                     lobjId = PG_GETARG_OID(0);
94         int32           mode = PG_GETARG_INT32(1);
95         LargeObjectDesc *lobjDesc;
96         int                     fd;
97
98 #if FSDB
99         elog(DEBUG4, "lo_open(%u,%d)", lobjId, mode);
100 #endif
101
102         CreateFSContext();
103
104         lobjDesc = inv_open(lobjId, mode, fscxt);
105
106         fd = newLOfd(lobjDesc);
107
108         PG_RETURN_INT32(fd);
109 }
110
111 Datum
112 be_lo_close(PG_FUNCTION_ARGS)
113 {
114         int32           fd = PG_GETARG_INT32(0);
115
116         if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
117                 ereport(ERROR,
118                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
119                                  errmsg("invalid large-object descriptor: %d", fd)));
120
121 #if FSDB
122         elog(DEBUG4, "lo_close(%d)", fd);
123 #endif
124
125         inv_close(cookies[fd]);
126
127         deleteLOfd(fd);
128
129         PG_RETURN_INT32(0);
130 }
131
132
133 /*****************************************************************************
134  *      Bare Read/Write operations --- these are not fmgr-callable!
135  *
136  *      We assume the large object supports byte oriented reads and seeks so
137  *      that our work is easier.
138  *
139  *****************************************************************************/
140
141 int
142 lo_read(int fd, char *buf, int len)
143 {
144         int                     status;
145         LargeObjectDesc *lobj;
146
147         if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
148                 ereport(ERROR,
149                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
150                                  errmsg("invalid large-object descriptor: %d", fd)));
151         lobj = cookies[fd];
152
153         /*
154          * Check state.  inv_read() would throw an error anyway, but we want the
155          * error to be about the FD's state not the underlying privilege; it might
156          * be that the privilege exists but user forgot to ask for read mode.
157          */
158         if ((lobj->flags & IFS_RDLOCK) == 0)
159                 ereport(ERROR,
160                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
161                                  errmsg("large object descriptor %d was not opened for reading",
162                                                 fd)));
163
164         status = inv_read(lobj, buf, len);
165
166         return status;
167 }
168
169 int
170 lo_write(int fd, const char *buf, int len)
171 {
172         int                     status;
173         LargeObjectDesc *lobj;
174
175         if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
176                 ereport(ERROR,
177                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
178                                  errmsg("invalid large-object descriptor: %d", fd)));
179         lobj = cookies[fd];
180
181         /* see comment in lo_read() */
182         if ((lobj->flags & IFS_WRLOCK) == 0)
183                 ereport(ERROR,
184                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
185                                  errmsg("large object descriptor %d was not opened for writing",
186                                                 fd)));
187
188         status = inv_write(lobj, buf, len);
189
190         return status;
191 }
192
193 Datum
194 be_lo_lseek(PG_FUNCTION_ARGS)
195 {
196         int32           fd = PG_GETARG_INT32(0);
197         int32           offset = PG_GETARG_INT32(1);
198         int32           whence = PG_GETARG_INT32(2);
199         int64           status;
200
201         if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
202                 ereport(ERROR,
203                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
204                                  errmsg("invalid large-object descriptor: %d", fd)));
205
206         status = inv_seek(cookies[fd], offset, whence);
207
208         /* guard against result overflow */
209         if (status != (int32) status)
210                 ereport(ERROR,
211                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
212                                  errmsg("lo_lseek result out of range for large-object descriptor %d",
213                                                 fd)));
214
215         PG_RETURN_INT32((int32) status);
216 }
217
218 Datum
219 be_lo_lseek64(PG_FUNCTION_ARGS)
220 {
221         int32           fd = PG_GETARG_INT32(0);
222         int64           offset = PG_GETARG_INT64(1);
223         int32           whence = PG_GETARG_INT32(2);
224         int64           status;
225
226         if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
227                 ereport(ERROR,
228                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
229                                  errmsg("invalid large-object descriptor: %d", fd)));
230
231         status = inv_seek(cookies[fd], offset, whence);
232
233         PG_RETURN_INT64(status);
234 }
235
236 Datum
237 be_lo_creat(PG_FUNCTION_ARGS)
238 {
239         Oid                     lobjId;
240
241         /*
242          * We don't actually need to store into fscxt, but create it anyway to
243          * ensure that AtEOXact_LargeObject knows there is state to clean up
244          */
245         CreateFSContext();
246
247         lobjId = inv_create(InvalidOid);
248
249         PG_RETURN_OID(lobjId);
250 }
251
252 Datum
253 be_lo_create(PG_FUNCTION_ARGS)
254 {
255         Oid                     lobjId = PG_GETARG_OID(0);
256
257         /*
258          * We don't actually need to store into fscxt, but create it anyway to
259          * ensure that AtEOXact_LargeObject knows there is state to clean up
260          */
261         CreateFSContext();
262
263         lobjId = inv_create(lobjId);
264
265         PG_RETURN_OID(lobjId);
266 }
267
268 Datum
269 be_lo_tell(PG_FUNCTION_ARGS)
270 {
271         int32           fd = PG_GETARG_INT32(0);
272         int64           offset;
273
274         if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
275                 ereport(ERROR,
276                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
277                                  errmsg("invalid large-object descriptor: %d", fd)));
278
279         offset = inv_tell(cookies[fd]);
280
281         /* guard against result overflow */
282         if (offset != (int32) offset)
283                 ereport(ERROR,
284                                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
285                                  errmsg("lo_tell result out of range for large-object descriptor %d",
286                                                 fd)));
287
288         PG_RETURN_INT32((int32) offset);
289 }
290
291 Datum
292 be_lo_tell64(PG_FUNCTION_ARGS)
293 {
294         int32           fd = PG_GETARG_INT32(0);
295         int64           offset;
296
297         if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
298                 ereport(ERROR,
299                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
300                                  errmsg("invalid large-object descriptor: %d", fd)));
301
302         offset = inv_tell(cookies[fd]);
303
304         PG_RETURN_INT64(offset);
305 }
306
307 Datum
308 be_lo_unlink(PG_FUNCTION_ARGS)
309 {
310         Oid                     lobjId = PG_GETARG_OID(0);
311
312         /*
313          * Must be owner of the large object.  It would be cleaner to check this
314          * in inv_drop(), but we want to throw the error before not after closing
315          * relevant FDs.
316          */
317         if (!lo_compat_privileges &&
318                 !pg_largeobject_ownercheck(lobjId, GetUserId()))
319                 ereport(ERROR,
320                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
321                                  errmsg("must be owner of large object %u", lobjId)));
322
323         /*
324          * If there are any open LO FDs referencing that ID, close 'em.
325          */
326         if (fscxt != NULL)
327         {
328                 int                     i;
329
330                 for (i = 0; i < cookies_size; i++)
331                 {
332                         if (cookies[i] != NULL && cookies[i]->id == lobjId)
333                         {
334                                 inv_close(cookies[i]);
335                                 deleteLOfd(i);
336                         }
337                 }
338         }
339
340         /*
341          * inv_drop does not create a need for end-of-transaction cleanup and
342          * hence we don't need to have created fscxt.
343          */
344         PG_RETURN_INT32(inv_drop(lobjId));
345 }
346
347 /*****************************************************************************
348  *      Read/Write using bytea
349  *****************************************************************************/
350
351 Datum
352 be_loread(PG_FUNCTION_ARGS)
353 {
354         int32           fd = PG_GETARG_INT32(0);
355         int32           len = PG_GETARG_INT32(1);
356         bytea      *retval;
357         int                     totalread;
358
359         if (len < 0)
360                 len = 0;
361
362         retval = (bytea *) palloc(VARHDRSZ + len);
363         totalread = lo_read(fd, VARDATA(retval), len);
364         SET_VARSIZE(retval, totalread + VARHDRSZ);
365
366         PG_RETURN_BYTEA_P(retval);
367 }
368
369 Datum
370 be_lowrite(PG_FUNCTION_ARGS)
371 {
372         int32           fd = PG_GETARG_INT32(0);
373         bytea      *wbuf = PG_GETARG_BYTEA_PP(1);
374         int                     bytestowrite;
375         int                     totalwritten;
376
377         bytestowrite = VARSIZE_ANY_EXHDR(wbuf);
378         totalwritten = lo_write(fd, VARDATA_ANY(wbuf), bytestowrite);
379         PG_RETURN_INT32(totalwritten);
380 }
381
382 /*****************************************************************************
383  *       Import/Export of Large Object
384  *****************************************************************************/
385
386 /*
387  * lo_import -
388  *        imports a file as an (inversion) large object.
389  */
390 Datum
391 be_lo_import(PG_FUNCTION_ARGS)
392 {
393         text       *filename = PG_GETARG_TEXT_PP(0);
394
395         PG_RETURN_OID(lo_import_internal(filename, InvalidOid));
396 }
397
398 /*
399  * lo_import_with_oid -
400  *        imports a file as an (inversion) large object specifying oid.
401  */
402 Datum
403 be_lo_import_with_oid(PG_FUNCTION_ARGS)
404 {
405         text       *filename = PG_GETARG_TEXT_PP(0);
406         Oid                     oid = PG_GETARG_OID(1);
407
408         PG_RETURN_OID(lo_import_internal(filename, oid));
409 }
410
411 static Oid
412 lo_import_internal(text *filename, Oid lobjOid)
413 {
414         int                     fd;
415         int                     nbytes,
416                                 tmp PG_USED_FOR_ASSERTS_ONLY;
417         char            buf[BUFSIZE];
418         char            fnamebuf[MAXPGPATH];
419         LargeObjectDesc *lobj;
420         Oid                     oid;
421
422         CreateFSContext();
423
424         /*
425          * open the file to be read in
426          */
427         text_to_cstring_buffer(filename, fnamebuf, sizeof(fnamebuf));
428         fd = OpenTransientFile(fnamebuf, O_RDONLY | PG_BINARY);
429         if (fd < 0)
430                 ereport(ERROR,
431                                 (errcode_for_file_access(),
432                                  errmsg("could not open server file \"%s\": %m",
433                                                 fnamebuf)));
434
435         /*
436          * create an inversion object
437          */
438         oid = inv_create(lobjOid);
439
440         /*
441          * read in from the filesystem and write to the inversion object
442          */
443         lobj = inv_open(oid, INV_WRITE, fscxt);
444
445         while ((nbytes = read(fd, buf, BUFSIZE)) > 0)
446         {
447                 tmp = inv_write(lobj, buf, nbytes);
448                 Assert(tmp == nbytes);
449         }
450
451         if (nbytes < 0)
452                 ereport(ERROR,
453                                 (errcode_for_file_access(),
454                                  errmsg("could not read server file \"%s\": %m",
455                                                 fnamebuf)));
456
457         inv_close(lobj);
458
459         if (CloseTransientFile(fd) != 0)
460                 ereport(ERROR,
461                                 (errcode_for_file_access(),
462                                  errmsg("could not close file \"%s\": %m",
463                                                 fnamebuf)));
464
465         return oid;
466 }
467
468 /*
469  * lo_export -
470  *        exports an (inversion) large object.
471  */
472 Datum
473 be_lo_export(PG_FUNCTION_ARGS)
474 {
475         Oid                     lobjId = PG_GETARG_OID(0);
476         text       *filename = PG_GETARG_TEXT_PP(1);
477         int                     fd;
478         int                     nbytes,
479                                 tmp;
480         char            buf[BUFSIZE];
481         char            fnamebuf[MAXPGPATH];
482         LargeObjectDesc *lobj;
483         mode_t          oumask;
484
485         CreateFSContext();
486
487         /*
488          * open the inversion object (no need to test for failure)
489          */
490         lobj = inv_open(lobjId, INV_READ, fscxt);
491
492         /*
493          * open the file to be written to
494          *
495          * Note: we reduce backend's normal 077 umask to the slightly friendlier
496          * 022. This code used to drop it all the way to 0, but creating
497          * world-writable export files doesn't seem wise.
498          */
499         text_to_cstring_buffer(filename, fnamebuf, sizeof(fnamebuf));
500         oumask = umask(S_IWGRP | S_IWOTH);
501         PG_TRY();
502         {
503                 fd = OpenTransientFilePerm(fnamebuf, O_CREAT | O_WRONLY | O_TRUNC | PG_BINARY,
504                                                                    S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
505         }
506         PG_CATCH();
507         {
508                 umask(oumask);
509                 PG_RE_THROW();
510         }
511         PG_END_TRY();
512         umask(oumask);
513         if (fd < 0)
514                 ereport(ERROR,
515                                 (errcode_for_file_access(),
516                                  errmsg("could not create server file \"%s\": %m",
517                                                 fnamebuf)));
518
519         /*
520          * read in from the inversion file and write to the filesystem
521          */
522         while ((nbytes = inv_read(lobj, buf, BUFSIZE)) > 0)
523         {
524                 tmp = write(fd, buf, nbytes);
525                 if (tmp != nbytes)
526                         ereport(ERROR,
527                                         (errcode_for_file_access(),
528                                          errmsg("could not write server file \"%s\": %m",
529                                                         fnamebuf)));
530         }
531
532         if (CloseTransientFile(fd) != 0)
533                 ereport(ERROR,
534                                 (errcode_for_file_access(),
535                                  errmsg("could not close file \"%s\": %m",
536                                                 fnamebuf)));
537
538         inv_close(lobj);
539
540         PG_RETURN_INT32(1);
541 }
542
543 /*
544  * lo_truncate -
545  *        truncate a large object to a specified length
546  */
547 static void
548 lo_truncate_internal(int32 fd, int64 len)
549 {
550         LargeObjectDesc *lobj;
551
552         if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
553                 ereport(ERROR,
554                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
555                                  errmsg("invalid large-object descriptor: %d", fd)));
556         lobj = cookies[fd];
557
558         /* see comment in lo_read() */
559         if ((lobj->flags & IFS_WRLOCK) == 0)
560                 ereport(ERROR,
561                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
562                                  errmsg("large object descriptor %d was not opened for writing",
563                                                 fd)));
564
565         inv_truncate(lobj, len);
566 }
567
568 Datum
569 be_lo_truncate(PG_FUNCTION_ARGS)
570 {
571         int32           fd = PG_GETARG_INT32(0);
572         int32           len = PG_GETARG_INT32(1);
573
574         lo_truncate_internal(fd, len);
575         PG_RETURN_INT32(0);
576 }
577
578 Datum
579 be_lo_truncate64(PG_FUNCTION_ARGS)
580 {
581         int32           fd = PG_GETARG_INT32(0);
582         int64           len = PG_GETARG_INT64(1);
583
584         lo_truncate_internal(fd, len);
585         PG_RETURN_INT32(0);
586 }
587
588 /*
589  * AtEOXact_LargeObject -
590  *               prepares large objects for transaction commit
591  */
592 void
593 AtEOXact_LargeObject(bool isCommit)
594 {
595         int                     i;
596
597         if (fscxt == NULL)
598                 return;                                 /* no LO operations in this xact */
599
600         /*
601          * Close LO fds and clear cookies array so that LO fds are no longer good.
602          * On abort we skip the close step.
603          */
604         for (i = 0; i < cookies_size; i++)
605         {
606                 if (cookies[i] != NULL)
607                 {
608                         if (isCommit)
609                                 inv_close(cookies[i]);
610                         deleteLOfd(i);
611                 }
612         }
613
614         /* Needn't actually pfree since we're about to zap context */
615         cookies = NULL;
616         cookies_size = 0;
617
618         /* Release the LO memory context to prevent permanent memory leaks. */
619         MemoryContextDelete(fscxt);
620         fscxt = NULL;
621
622         /* Give inv_api.c a chance to clean up, too */
623         close_lo_relation(isCommit);
624 }
625
626 /*
627  * AtEOSubXact_LargeObject
628  *              Take care of large objects at subtransaction commit/abort
629  *
630  * Reassign LOs created/opened during a committing subtransaction
631  * to the parent subtransaction.  On abort, just close them.
632  */
633 void
634 AtEOSubXact_LargeObject(bool isCommit, SubTransactionId mySubid,
635                                                 SubTransactionId parentSubid)
636 {
637         int                     i;
638
639         if (fscxt == NULL)                      /* no LO operations in this xact */
640                 return;
641
642         for (i = 0; i < cookies_size; i++)
643         {
644                 LargeObjectDesc *lo = cookies[i];
645
646                 if (lo != NULL && lo->subid == mySubid)
647                 {
648                         if (isCommit)
649                                 lo->subid = parentSubid;
650                         else
651                         {
652                                 /*
653                                  * Make sure we do not call inv_close twice if it errors out
654                                  * for some reason.  Better a leak than a crash.
655                                  */
656                                 deleteLOfd(i);
657                                 inv_close(lo);
658                         }
659                 }
660         }
661 }
662
663 /*****************************************************************************
664  *      Support routines for this file
665  *****************************************************************************/
666
667 static int
668 newLOfd(LargeObjectDesc *lobjCookie)
669 {
670         int                     i,
671                                 newsize;
672
673         /* Try to find a free slot */
674         for (i = 0; i < cookies_size; i++)
675         {
676                 if (cookies[i] == NULL)
677                 {
678                         cookies[i] = lobjCookie;
679                         return i;
680                 }
681         }
682
683         /* No free slot, so make the array bigger */
684         if (cookies_size <= 0)
685         {
686                 /* First time through, arbitrarily make 64-element array */
687                 i = 0;
688                 newsize = 64;
689                 cookies = (LargeObjectDesc **)
690                         MemoryContextAllocZero(fscxt, newsize * sizeof(LargeObjectDesc *));
691                 cookies_size = newsize;
692         }
693         else
694         {
695                 /* Double size of array */
696                 i = cookies_size;
697                 newsize = cookies_size * 2;
698                 cookies = (LargeObjectDesc **)
699                         repalloc(cookies, newsize * sizeof(LargeObjectDesc *));
700                 MemSet(cookies + cookies_size, 0,
701                            (newsize - cookies_size) * sizeof(LargeObjectDesc *));
702                 cookies_size = newsize;
703         }
704
705         Assert(cookies[i] == NULL);
706         cookies[i] = lobjCookie;
707         return i;
708 }
709
710 static void
711 deleteLOfd(int fd)
712 {
713         cookies[fd] = NULL;
714 }
715
716 /*****************************************************************************
717  *      Wrappers oriented toward SQL callers
718  *****************************************************************************/
719
720 /*
721  * Read [offset, offset+nbytes) within LO; when nbytes is -1, read to end.
722  */
723 static bytea *
724 lo_get_fragment_internal(Oid loOid, int64 offset, int32 nbytes)
725 {
726         LargeObjectDesc *loDesc;
727         int64           loSize;
728         int64           result_length;
729         int                     total_read PG_USED_FOR_ASSERTS_ONLY;
730         bytea      *result = NULL;
731
732         /*
733          * We don't actually need to store into fscxt, but create it anyway to
734          * ensure that AtEOXact_LargeObject knows there is state to clean up
735          */
736         CreateFSContext();
737
738         loDesc = inv_open(loOid, INV_READ, fscxt);
739
740         /*
741          * Compute number of bytes we'll actually read, accommodating nbytes == -1
742          * and reads beyond the end of the LO.
743          */
744         loSize = inv_seek(loDesc, 0, SEEK_END);
745         if (loSize > offset)
746         {
747                 if (nbytes >= 0 && nbytes <= loSize - offset)
748                         result_length = nbytes; /* request is wholly inside LO */
749                 else
750                         result_length = loSize - offset;        /* adjust to end of LO */
751         }
752         else
753                 result_length = 0;              /* request is wholly outside LO */
754
755         /*
756          * A result_length calculated from loSize may not fit in a size_t.  Check
757          * that the size will satisfy this and subsequently-enforced size limits.
758          */
759         if (result_length > MaxAllocSize - VARHDRSZ)
760                 ereport(ERROR,
761                                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
762                                  errmsg("large object read request is too large")));
763
764         result = (bytea *) palloc(VARHDRSZ + result_length);
765
766         inv_seek(loDesc, offset, SEEK_SET);
767         total_read = inv_read(loDesc, VARDATA(result), result_length);
768         Assert(total_read == result_length);
769         SET_VARSIZE(result, result_length + VARHDRSZ);
770
771         inv_close(loDesc);
772
773         return result;
774 }
775
776 /*
777  * Read entire LO
778  */
779 Datum
780 be_lo_get(PG_FUNCTION_ARGS)
781 {
782         Oid                     loOid = PG_GETARG_OID(0);
783         bytea      *result;
784
785         result = lo_get_fragment_internal(loOid, 0, -1);
786
787         PG_RETURN_BYTEA_P(result);
788 }
789
790 /*
791  * Read range within LO
792  */
793 Datum
794 be_lo_get_fragment(PG_FUNCTION_ARGS)
795 {
796         Oid                     loOid = PG_GETARG_OID(0);
797         int64           offset = PG_GETARG_INT64(1);
798         int32           nbytes = PG_GETARG_INT32(2);
799         bytea      *result;
800
801         if (nbytes < 0)
802                 ereport(ERROR,
803                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
804                                  errmsg("requested length cannot be negative")));
805
806         result = lo_get_fragment_internal(loOid, offset, nbytes);
807
808         PG_RETURN_BYTEA_P(result);
809 }
810
811 /*
812  * Create LO with initial contents given by a bytea argument
813  */
814 Datum
815 be_lo_from_bytea(PG_FUNCTION_ARGS)
816 {
817         Oid                     loOid = PG_GETARG_OID(0);
818         bytea      *str = PG_GETARG_BYTEA_PP(1);
819         LargeObjectDesc *loDesc;
820         int                     written PG_USED_FOR_ASSERTS_ONLY;
821
822         CreateFSContext();
823
824         loOid = inv_create(loOid);
825         loDesc = inv_open(loOid, INV_WRITE, fscxt);
826         written = inv_write(loDesc, VARDATA_ANY(str), VARSIZE_ANY_EXHDR(str));
827         Assert(written == VARSIZE_ANY_EXHDR(str));
828         inv_close(loDesc);
829
830         PG_RETURN_OID(loOid);
831 }
832
833 /*
834  * Update range within LO
835  */
836 Datum
837 be_lo_put(PG_FUNCTION_ARGS)
838 {
839         Oid                     loOid = PG_GETARG_OID(0);
840         int64           offset = PG_GETARG_INT64(1);
841         bytea      *str = PG_GETARG_BYTEA_PP(2);
842         LargeObjectDesc *loDesc;
843         int                     written PG_USED_FOR_ASSERTS_ONLY;
844
845         CreateFSContext();
846
847         loDesc = inv_open(loOid, INV_WRITE, fscxt);
848
849         /* Permission check */
850         if (!lo_compat_privileges &&
851                 pg_largeobject_aclcheck_snapshot(loDesc->id,
852                                                                                  GetUserId(),
853                                                                                  ACL_UPDATE,
854                                                                                  loDesc->snapshot) != ACLCHECK_OK)
855                 ereport(ERROR,
856                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
857                                  errmsg("permission denied for large object %u",
858                                                 loDesc->id)));
859
860         inv_seek(loDesc, offset, SEEK_SET);
861         written = inv_write(loDesc, VARDATA_ANY(str), VARSIZE_ANY_EXHDR(str));
862         Assert(written == VARSIZE_ANY_EXHDR(str));
863         inv_close(loDesc);
864
865         PG_RETURN_VOID();
866 }