1 /*-------------------------------------------------------------------------
5 * Implements the custom output format.
7 * The comments with the routines in this code are a good place to
8 * understand how to write a new format.
10 * See the headers to pg_restore for more details.
12 * Copyright (c) 2000, Philip Warner
13 * Rights are granted to use this software in any way so long
14 * as this notice is not removed.
16 * The author is not responsible for loss or damages that may
17 * and any liability will be limited to the time taken to fix any
22 * src/bin/pg_dump/pg_backup_custom.c
24 *-------------------------------------------------------------------------
27 #include "compress_io.h"
31 * Routines in the format interface
35 static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
36 static void _StartData(ArchiveHandle *AH, TocEntry *te);
37 static size_t _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
38 static void _EndData(ArchiveHandle *AH, TocEntry *te);
39 static int _WriteByte(ArchiveHandle *AH, const int i);
40 static int _ReadByte(ArchiveHandle *);
41 static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
42 static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
43 static void _CloseArchive(ArchiveHandle *AH);
44 static void _ReopenArchive(ArchiveHandle *AH);
45 static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
46 static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
47 static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
48 static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
/* Helpers that print or skip data starting at the current file position. */
50 static void _PrintData(ArchiveHandle *AH);
51 static void _skipData(ArchiveHandle *AH);
52 static void _skipBlobs(ArchiveHandle *AH);
/* Large-object (BLOB) routines, followed by the clone hooks used during parallel restore. */
54 static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
55 static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
56 static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
57 static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
58 static void _LoadBlobs(ArchiveHandle *AH, bool drop);
59 static void _Clone(ArchiveHandle *AH);
60 static void _DeClone(ArchiveHandle *AH);
/* Internal helpers: block-header parsing and file-position lookup. */
81 static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
82 static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);
/* Callbacks passed to AllocateCompressor() and ReadDataFromArchive() respectively. */
84 static size_t _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
85 static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
/* Passed to die_horribly() and write_msg() to identify this module. */
87 static const char *modulename = gettext_noop("custom archiver");
92 * Init routine required by ALL formats. This is a global routine
93 * and should be declared in pg_backup_archiver.h
95 * Its task is to create any extra archive context (using AH->formatData),
96 * and to initialize the supported function pointers.
98 * It should also prepare whatever its input source is for reading/writing,
99 * and in the case of a read mode connection, it should load the Header & TOC.
102 InitArchiveFmt_Custom(ArchiveHandle *AH)
/*
 * Install this format's callbacks on AH, allocate the zeroed private
 * lclContext (stored in AH->formatData), allocate the large-object buffer,
 * and open the archive file according to AH->mode.  In both modes the
 * result of checkSeek() on the opened stream is kept in ctx->hasSeek.
 * A failed fopen() is fatal via die_horribly().
 */
106 /* Assuming static functions, this can be copied for each format. */
107 AH->ArchiveEntryPtr = _ArchiveEntry;
108 AH->StartDataPtr = _StartData;
109 AH->WriteDataPtr = _WriteData;
110 AH->EndDataPtr = _EndData;
111 AH->WriteBytePtr = _WriteByte;
112 AH->ReadBytePtr = _ReadByte;
113 AH->WriteBufPtr = _WriteBuf;
114 AH->ReadBufPtr = _ReadBuf;
115 AH->ClosePtr = _CloseArchive;
116 AH->ReopenPtr = _ReopenArchive;
117 AH->PrintTocDataPtr = _PrintTocData;
118 AH->ReadExtraTocPtr = _ReadExtraToc;
119 AH->WriteExtraTocPtr = _WriteExtraToc;
120 AH->PrintExtraTocPtr = _PrintExtraToc;
122 AH->StartBlobsPtr = _StartBlobs;
123 AH->StartBlobPtr = _StartBlob;
124 AH->EndBlobPtr = _EndBlob;
125 AH->EndBlobsPtr = _EndBlobs;
126 AH->ClonePtr = _Clone;
127 AH->DeClonePtr = _DeClone;
129 /* Set up a private area. */
130 ctx = (lclContext *) pg_calloc(1, sizeof(lclContext));
131 AH->formatData = (void *) ctx;
133 /* Initialize LO buffering */
134 AH->lo_buf_size = LOBBUFSIZE;
135 AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
/* From here on the work depends on AH->mode; the output (write) side comes first. */
142 if (AH->mode == archModeWrite)
144 if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
146 AH->FH = fopen(AH->fSpec, PG_BINARY_W);
148 die_horribly(AH, modulename, "could not open output file \"%s\": %s\n",
149 AH->fSpec, strerror(errno));
155 die_horribly(AH, modulename, "could not open output file: %s\n",
159 ctx->hasSeek = checkSeek(AH->FH);
/* Input side: same pattern, but the named file is opened with PG_BINARY_R. */
163 if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
165 AH->FH = fopen(AH->fSpec, PG_BINARY_R);
167 die_horribly(AH, modulename, "could not open input file \"%s\": %s\n",
168 AH->fSpec, strerror(errno));
174 die_horribly(AH, modulename, "could not open input file: %s\n",
178 ctx->hasSeek = checkSeek(AH->FH);
/*
 * Record the current file position in ctx->dataStart.
 * NOTE(review): presumably this runs after the header and TOC have been
 * read; those calls are not visible in this excerpt -- confirm.
 */
182 ctx->dataStart = _getFilePos(AH, ctx);
188 * Called by the Archiver when the dumper creates a new TOC entry.
192 * Set up extra format-related TOC data.
195 _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
/*
 * Allocate a zeroed lclTocEntry for te and attach it as te->formatData.
 * dataState starts out as either K_OFFSET_POS_NOT_SET or K_OFFSET_NO_DATA;
 * the condition that chooses between them is not visible in this excerpt.
 */
199 ctx = (lclTocEntry *) pg_calloc(1, sizeof(lclTocEntry));
201 ctx->dataState = K_OFFSET_POS_NOT_SET;
203 ctx->dataState = K_OFFSET_NO_DATA;
205 te->formatData = (void *) ctx;
209 * Called by the Archiver to save any extra format-related TOC entry
214 * Use the Archiver routines to write data - they are non-endian, and
215 * maintain other important file information.
218 _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
/*
 * Write this entry's format-specific TOC data: the data offset together
 * with its state flag, both handed to WriteOffset().
 */
220 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
222 WriteOffset(AH, ctx->dataPos, ctx->dataState);
226 * Called by the Archiver to read any extra format-related TOC data.
230 * Needs to match the order defined in _WriteExtraToc, and should also
231 * use the Archiver input routines.
234 _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
/*
 * Read back what _WriteExtraToc stored: ReadOffset() fills ctx->dataPos and
 * its return value becomes ctx->dataState.  Archives older than K_VERS_1_7
 * get extra handling whose body is not visible in this excerpt.
 */
236 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
/*
 * A zeroed per-entry context is allocated and attached to te here; the
 * condition guarding the allocation is not visible in this excerpt.
 */
240 ctx = (lclTocEntry *) pg_calloc(1, sizeof(lclTocEntry));
241 te->formatData = (void *) ctx;
244 ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
247 * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
250 if (AH->version < K_VERS_1_7)
255 * Called by the Archiver when restoring an archive to output a comment
256 * that includes useful information about the TOC entry.
262 _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
/*
 * When AH->public.verbose is set, emit a "-- Data Pos:" line carrying the
 * entry's data offset through ahprintf().  The offset is cast to int64 so
 * that the argument matches INT64_FORMAT.
 */
264 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
266 if (AH->public.verbose)
267 ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
268 (int64) ctx->dataPos);
272 * Called by the archiver when saving TABLE DATA (not schema). This routine
273 * should save whatever format-specific information is needed to read
276 * It is called just prior to the dumper's 'DataDumper' routine being called.
278 * Optional, but strongly recommended.
282 _StartData(ArchiveHandle *AH, TocEntry *te)
/*
 * Begin a table-data block: remember where it starts, write the block
 * header (type byte plus dump ID), and create the compressor that
 * _WriteData later feeds through ctx->cs.
 */
284 lclContext *ctx = (lclContext *) AH->formatData;
285 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
/* Record the block's starting offset in the TOC entry and flag it as set. */
287 tctx->dataPos = _getFilePos(AH, ctx);
288 tctx->dataState = K_OFFSET_POS_SET;
290 _WriteByte(AH, BLK_DATA); /* Block type */
291 WriteInt(AH, te->dumpId); /* For sanity check */
/* The compressor is given _CustomWriteFunc as its output callback. */
293 ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
297 * Called by archiver when dumper calls WriteData. This routine is
298 * called for both BLOB and TABLE data; it is the responsibility of
299 * the format to manage each kind of data using StartBlob/StartData.
301 * It should only be called from within a DataDumper routine.
306 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
/*
 * Hand dLen bytes at data to the compressor held in ctx->cs.  On the
 * visible path the return value is whatever WriteDataToArchive() returns.
 */
308 lclContext *ctx = (lclContext *) AH->formatData;
309 CompressorState *cs = ctx->cs;
314 return WriteDataToArchive(AH, cs, data, dLen);
318 * Called by the archiver when a dumper's 'DataDumper' routine has
325 _EndData(ArchiveHandle *AH, TocEntry *te)
/*
 * Finish a table-data block: EndCompressor() is called on the compressor
 * in ctx->cs, then the end marker is sent (the statement that writes the
 * marker is not visible in this excerpt).
 */
327 lclContext *ctx = (lclContext *) AH->formatData;
329 EndCompressor(AH, ctx->cs);
330 /* Send the end marker */
335 * Called by the archiver when starting to save all BLOB DATA (not schema).
336 * This routine should save whatever format-specific information is needed
337 * to read the BLOBs back into memory.
339 * It is called just prior to the dumper's DataDumper routine.
341 * Optional, but strongly recommended.
344 _StartBlobs(ArchiveHandle *AH, TocEntry *te)
/*
 * Begin the BLOBS block: record its starting offset in the TOC entry and
 * write the block header (BLK_BLOBS type byte plus dump ID).  Unlike
 * _StartData, no compressor is allocated in the visible lines; _StartBlob
 * allocates one per large object.
 */
346 lclContext *ctx = (lclContext *) AH->formatData;
347 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
349 tctx->dataPos = _getFilePos(AH, ctx);
350 tctx->dataState = K_OFFSET_POS_SET;
352 _WriteByte(AH, BLK_BLOBS); /* Block type */
353 WriteInt(AH, te->dumpId); /* For sanity check */
357 * Called by the archiver when the dumper calls StartBlob.
361 * Must save the passed OID for retrieval at restore-time.
364 _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
/*
 * Start one large object.  An invalid OID is fatal (the test itself is not
 * visible in this excerpt), and a fresh compressor writing through
 * _CustomWriteFunc is stored in ctx->cs.
 * NOTE(review): the statement that saves oid to the archive is not visible
 * here -- confirm against the full source.
 */
366 lclContext *ctx = (lclContext *) AH->formatData;
369 die_horribly(AH, modulename, "invalid OID for large object\n");
373 ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
377 * Called by the archiver when the dumper calls EndBlob.
382 _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
/*
 * Finish one large object: EndCompressor() is called on the compressor in
 * ctx->cs, then the end marker is sent (the statement that writes the
 * marker is not visible in this excerpt).
 */
384 lclContext *ctx = (lclContext *) AH->formatData;
386 EndCompressor(AH, ctx->cs);
387 /* Send the end marker */
392 * Called by the archiver when finishing saving all BLOB DATA.
397 _EndBlobs(ArchiveHandle *AH, TocEntry *te)
/*
 * Terminates the list of large objects.  The write statement itself is not
 * visible in this excerpt; the comment above _skipBlobs describes a zero
 * OID as the end-of-BLOBs indicator.
 */
399 /* Write out a fake zero OID to mark end-of-blobs. */
404 * Print data for a given TOC entry
407 _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
/*
 * Restore the data belonging to te.  When the input is not seekable or the
 * entry has no recorded offset, block headers are read one after another
 * until te->dumpId (or EOF) is reached; otherwise the code seeks directly
 * to tctx->dataPos and reads the header found there.  Reaching EOF yields
 * one of three fatal messages depending on why the block could not be
 * located, and a header whose ID differs from te->dumpId is also fatal.
 * BLOB blocks are restored through _LoadBlobs(), which receives
 * ropt->dropSchema.
 */
409 lclContext *ctx = (lclContext *) AH->formatData;
410 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
/*
 * Entries flagged K_OFFSET_NO_DATA are tested first; what is done for them
 * is not visible in this excerpt.
 */
414 if (tctx->dataState == K_OFFSET_NO_DATA)
417 if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
420 * We cannot seek directly to the desired block. Instead, skip over
421 * block headers until we find the one we want. This could fail if we
422 * are asked to restore items out-of-order.
424 _readBlockHeader(AH, &blkType, &id);
/* Keep going until EOF or until the header for te->dumpId turns up. */
426 while (blkType != EOF && id != te->dumpId)
438 default: /* Always have a default */
439 die_horribly(AH, modulename,
440 "unrecognized data block type (%d) while searching archive\n",
444 _readBlockHeader(AH, &blkType, &id);
449 /* We can just seek to the place we need to be. */
450 if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
451 die_horribly(AH, modulename, "error during file seek: %s\n",
454 _readBlockHeader(AH, &blkType, &id);
457 /* Produce suitable failure message if we fell off end of file */
460 if (tctx->dataState == K_OFFSET_POS_NOT_SET)
461 die_horribly(AH, modulename, "could not find block ID %d in archive -- "
462 "possibly due to out-of-order restore request, "
463 "which cannot be handled due to lack of data offsets in archive\n",
465 else if (!ctx->hasSeek)
466 die_horribly(AH, modulename, "could not find block ID %d in archive -- "
467 "possibly due to out-of-order restore request, "
468 "which cannot be handled due to non-seekable input file\n",
470 else /* huh, the dataPos led us to EOF? */
471 die_horribly(AH, modulename, "could not find block ID %d in archive -- "
472 "possibly corrupt archive\n",
477 if (id != te->dumpId)
478 die_horribly(AH, modulename, "found unexpected block ID (%d) when reading data -- expected %d\n",
488 _LoadBlobs(AH, ropt->dropSchema);
491 default: /* Always have a default */
492 die_horribly(AH, modulename, "unrecognized data block type %d while restoring archive\n",
499 * Print data from current file position.
502 _PrintData(ArchiveHandle *AH)
/*
 * Delegate to ReadDataFromArchive(), passing the archive's compression
 * setting and _CustomReadFunc as the callback that supplies input blocks.
 */
504 ReadDataFromArchive(AH, AH->compression, _CustomReadFunc);
508 _LoadBlobs(ArchiveHandle *AH, bool drop)
/*
 * Restore large objects: StartRestoreBlobs() once, then a
 * StartRestoreBlob()/EndRestoreBlob() pair for an OID, with drop forwarded
 * unchanged to StartRestoreBlob().  How the OIDs are read, the loop around
 * the pair, and any closing call are not visible in this excerpt.
 */
512 StartRestoreBlobs(AH);
517 StartRestoreBlob(AH, oid, drop);
519 EndRestoreBlob(AH, oid);
527 * Skip the BLOBs from the current file position.
528 * BLOBS are written sequentially as data blocks (see below).
529 * Each BLOB is preceded by its original OID.
530 * A zero OID indicates the end of the BLOBS
533 _skipBlobs(ArchiveHandle *AH)
546 * Skip data from current file position.
547 * Data blocks are formatted as an integer length, followed by data.
548 * A zero length denotes the end of the block.
551 _skipData(ArchiveHandle *AH)
/*
 * Consume length-prefixed chunks without using their contents: each
 * chunk's length comes from ReadInt(), the bytes are read with fread()
 * into a scratch buffer, and ctx->filePos is advanced by the chunk size.
 * The loop condition is not visible in this excerpt.
 */
553 lclContext *ctx = (lclContext *) AH->formatData;
559 blkLen = ReadInt(AH);
/*
 * Scratch buffer for the bytes being skipped; any sizing check around
 * this allocation is not visible in this excerpt.
 */
566 buf = (char *) pg_malloc(blkLen);
569 cnt = fread(buf, 1, blkLen, AH->FH);
573 die_horribly(AH, modulename,
574 "could not read from input file: end of file\n");
576 die_horribly(AH, modulename,
577 "could not read from input file: %s\n", strerror(errno));
/* Keep the internally tracked position in step with what was consumed. */
580 ctx->filePos += blkLen;
/* Fetch the length of the next chunk. */
582 blkLen = ReadInt(AH);
590 * Write a byte of data to the archive.
594 * Called by the archiver to do integer & byte output to the archive.
597 _WriteByte(ArchiveHandle *AH, const int i)
/*
 * Write the byte i to the archive file with fputc().  A failure is
 * reported through die_horribly() with strerror(errno); the test on res
 * and the return statement are not visible in this excerpt.
 */
599 lclContext *ctx = (lclContext *) AH->formatData;
602 res = fputc(i, AH->FH);
606 die_horribly(AH, modulename, "could not write byte: %s\n", strerror(errno));
611 * Read a byte of data from the archive.
615 * Called by the archiver to read bytes & integers from the archive.
616 * EOF should be treated as a fatal error.
619 _ReadByte(ArchiveHandle *AH)
/*
 * Running out of input is fatal here ("unexpected end of file").  The read
 * itself and the return statement are not visible in this excerpt.
 */
621 lclContext *ctx = (lclContext *) AH->formatData;
626 die_horribly(AH, modulename, "unexpected end of file\n");
632 * Write a buffer of data to the archive.
636 * Called by the archiver to write a block of bytes to the archive.
639 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
/*
 * Write len bytes from buf to the archive file with fwrite().  A failed
 * write is fatal via die_horribly(); the test on res and the return
 * statement are not visible in this excerpt.
 */
641 lclContext *ctx = (lclContext *) AH->formatData;
644 res = fwrite(buf, 1, len, AH->FH);
647 die_horribly(AH, modulename,
648 "could not write to output file: %s\n", strerror(errno));
655 * Read a block of bytes from the archive.
659 * Called by the archiver to read a block of bytes from the archive
662 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
/*
 * Read up to len bytes from the archive file into buf with fread().
 * What is done with res afterwards is not visible in this excerpt.
 */
664 lclContext *ctx = (lclContext *) AH->formatData;
667 res = fread(buf, 1, len, AH->FH);
678 * When writing the archive, this is the routine that actually starts
679 * the process of saving it to files. No data should be written prior
680 * to this point, since the user could sort the TOC after creating it.
682 * If an archive is to be written, this routine must call:
683 * WriteHead to save the archive header
684 * WriteToc to save the TOC entries
685 * WriteDataChunks to save all DATA & BLOBs.
689 _CloseArchive(ArchiveHandle *AH)
/*
 * Close the archive.  In write mode the visible lines save a file position
 * in tpos and set ctx->dataStart from _getFilePos(); when the stream can
 * be repositioned to tpos the TOC is rewritten so that it carries data
 * offsets.  A failing fclose() is fatal.
 * NOTE(review): tpos is used as the seek target for the TOC rewrite, so it
 * presumably marks where the TOC begins; the header/TOC/data writing calls
 * are not visible in this excerpt -- confirm against the full source.
 */
691 lclContext *ctx = (lclContext *) AH->formatData;
694 if (AH->mode == archModeWrite)
697 tpos = ftello(AH->FH);
699 ctx->dataStart = _getFilePos(AH, ctx);
703 * If possible, re-write the TOC in order to update the data offset
704 * information. This is not essential, as pg_restore can cope in most
705 * cases without it; but it can make pg_restore significantly faster
706 * in some situations (especially parallel restore).
709 fseeko(AH->FH, tpos, SEEK_SET) == 0)
713 if (fclose(AH->FH) != 0)
714 die_horribly(AH, modulename, "could not close archive file: %s\n", strerror(errno));
720 * Reopen the archive's file handle.
722 * We close the original file handle, except on Windows. (The difference
723 * is because on Windows, this is used within a multithreading context,
724 * and we don't want a thread closing the parent file handle.)
727 _ReopenArchive(ArchiveHandle *AH)
/*
 * Replace AH->FH with a freshly opened handle on the same file, positioned
 * at the offset the old handle had.  Write-mode archives, archives without
 * a file name (stdin), and non-seekable files are rejected with a fatal
 * error; so is any failure of ftello(), fclose(), fopen() or fseeko().
 */
729 lclContext *ctx = (lclContext *) AH->formatData;
732 if (AH->mode == archModeWrite)
733 die_horribly(AH, modulename, "can only reopen input archives\n");
736 * These two cases are user-facing errors since they represent unsupported
737 * (but not invalid) use-cases. Word the error messages appropriately.
739 if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
740 die_horribly(AH, modulename, "parallel restore from stdin is not supported\n");
742 die_horribly(AH, modulename, "parallel restore from non-seekable file is not supported\n");
/* Remember the current offset so the new handle can be put back there. */
745 tpos = ftello(AH->FH);
747 die_horribly(AH, modulename, "could not determine seek position in archive file: %s\n",
751 if (fclose(AH->FH) != 0)
752 die_horribly(AH, modulename, "could not close archive file: %s\n",
756 AH->FH = fopen(AH->fSpec, PG_BINARY_R);
758 die_horribly(AH, modulename, "could not open input file \"%s\": %s\n",
759 AH->fSpec, strerror(errno));
/* Restore the saved offset on the new handle. */
761 if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
762 die_horribly(AH, modulename, "could not set seek position in archive file: %s\n",
767 * Clone format-specific fields during parallel restoration.
770 _Clone(ArchiveHandle *AH)
/*
 * Give AH its own lclContext: a new allocation filled with a byte-for-byte
 * copy of the context AH->formatData pointed at on entry.  An active
 * compressor at this point is treated as a fatal error (the test itself is
 * not visible in this excerpt).
 */
772 lclContext *ctx = (lclContext *) AH->formatData;
/* ctx still refers to the old context; it is the source of the copy. */
774 AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
775 memcpy(AH->formatData, ctx, sizeof(lclContext));
776 ctx = (lclContext *) AH->formatData;
778 /* sanity check, shouldn't happen */
780 die_horribly(AH, modulename, "compressor active\n");
783 * Note: we do not make a local lo_buf because we expect at most one BLOBS
784 * entry per archive, so no parallelism is possible. Likewise,
785 * TOC-entry-local state isn't an issue because any one TOC entry is
786 * touched by just one worker child.
791 _DeClone(ArchiveHandle *AH)
/*
 * Installed as AH->DeClonePtr alongside _Clone.  The statements that act
 * on ctx are not visible in this excerpt.
 * NOTE(review): presumably releases the context allocated by _Clone --
 * confirm against the full source.
 */
793 lclContext *ctx = (lclContext *) AH->formatData;
798 /*--------------------------------------------------
799 * END OF FORMAT CALLBACKS
800 *--------------------------------------------------
804 * Get the current position in the archive file.
807 _getFilePos(ArchiveHandle *AH, lclContext *ctx)
/*
 * Report the current position in the archive file.  In the visible lines
 * ftello()'s answer is compared with the internally tracked ctx->filePos
 * and a warning is printed when the two differ.  The surrounding
 * conditions and the return statement are not visible in this excerpt.
 */
813 pos = ftello(AH->FH);
814 if (pos != ctx->filePos)
816 write_msg(modulename, "WARNING: ftell mismatch with expected position -- ftell used\n");
819 * Prior to 1.7 (pg7.3) we relied on the internally maintained
820 * pointer. Now we rely on ftello() always, unless the file has
821 * been found to not support it.
831 * Read a data block header. The format changed in V1.3, so we
832 * centralize the code here for simplicity. Returns *type = EOF
836 _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
/*
 * Fill *type and *id from the next data block header.  Archives older than
 * K_VERS_1_3 take a separate path, and on one path *id is set to 0 so the
 * caller never sees an uninitialized value.  The reads themselves are not
 * visible in this excerpt.  _PrintTocData compares the returned *type
 * against EOF to detect the end of the file.
 */
838 lclContext *ctx = (lclContext *) AH->formatData;
842 * Note: if we are at EOF with a pre-1.3 input file, we'll die_horribly
843 * inside ReadInt rather than returning EOF. It doesn't seem worth
844 * jumping through hoops to deal with that case better, because no such
845 * files are likely to exist in the wild: only some 7.1 development
846 * versions of pg_dump ever generated such files.
848 if (AH->version < K_VERS_1_3)
856 *id = 0; /* don't return an uninitialized value */
866 * Callback function for WriteDataToArchive. Writes one block of (compressed)
867 * data to the archive.
870 _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
/*
 * Output callback given to AllocateCompressor(): forwards the block to
 * _WriteBuf() and returns its result.
 * NOTE(review): the zero-length guard and any length prefix written before
 * the block are not visible in this excerpt -- confirm against the full
 * source.
 */
872 /* never write 0-byte blocks (this should not happen) */
877 return _WriteBuf(AH, buf, len);
881 * Callback function for ReadDataFromArchive. To keep things simple, we
882 * always read one compressed block at a time.
885 _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
/*
 * Input callback given to ReadDataFromArchive(): read one length-prefixed
 * block into *buf.  The length comes from ReadInt(); when it exceeds
 * *buflen a new buffer of blkLen bytes is allocated for *buf.  The failure
 * branches after the _ReadBuf() call report either end of file or
 * strerror(errno).  The handling of a zero length, the update of *buflen
 * and the return value are not visible in this excerpt.
 */
891 blkLen = ReadInt(AH);
895 /* If the caller's buffer is not large enough, allocate a bigger one */
896 if (blkLen > *buflen)
899 *buf = (char *) pg_malloc(blkLen);
903 cnt = _ReadBuf(AH, *buf, blkLen);
907 die_horribly(AH, modulename,
908 "could not read from input file: end of file\n");
910 die_horribly(AH, modulename,
911 "could not read from input file: %s\n", strerror(errno));