1 /*-------------------------------------------------------------------------
5 * Implements the custom output format.
7 * The comments with the routines in this code are a good place to
8 * understand how to write a new format.
10 * See the headers to pg_restore for more details.
12 * Copyright (c) 2000, Philip Warner
13 * Rights are granted to use this software in any way so long
14 * as this notice is not removed.
16 * The author is not responsible for loss or damages that may
17 * and any liability will be limited to the time taken to fix any
22 * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_custom.c,v 1.38 2007/03/18 16:50:44 neilc Exp $
24 *-------------------------------------------------------------------------
27 #include "pg_backup_archiver.h"
30 * Routines in the format interface
34 static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
35 static void _StartData(ArchiveHandle *AH, TocEntry *te);
36 static size_t _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
37 static void _EndData(ArchiveHandle *AH, TocEntry *te);
38 static int _WriteByte(ArchiveHandle *AH, const int i);
39 static int _ReadByte(ArchiveHandle *);
40 static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
41 static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
42 static void _CloseArchive(ArchiveHandle *AH);
43 static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
44 static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
45 static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
46 static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
48 static void _PrintData(ArchiveHandle *AH);
49 static void _skipData(ArchiveHandle *AH);
50 static void _skipBlobs(ArchiveHandle *AH);
52 static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
53 static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
54 static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
55 static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
56 static void _LoadBlobs(ArchiveHandle *AH);
59 * Buffers used in zlib compression and extra data stored in archive and
63 #define zlibOutSize 4096
64 #define zlibInSize 4096
88 static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
89 static void _StartDataCompressor(ArchiveHandle *AH, TocEntry *te);
90 static void _EndDataCompressor(ArchiveHandle *AH, TocEntry *te);
91 static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);
92 static int _DoDeflate(ArchiveHandle *AH, lclContext *ctx, int flush);
94 static const char *modulename = gettext_noop("custom archiver");
99 * Init routine required by ALL formats. This is a global routine
100 * and should be declared in pg_backup_archiver.h
102 * Its task is to create any extra archive context (using AH->formatData),
103 * and to initialize the supported function pointers.
105 * It should also prepare whatever its input source is for reading/writing,
106 * and in the case of a read mode connection, it should load the Header & TOC.
109 InitArchiveFmt_Custom(ArchiveHandle *AH)
113 /* Assuming static functions, this can be copied for each format. */
114 AH->ArchiveEntryPtr = _ArchiveEntry;
115 AH->StartDataPtr = _StartData;
116 AH->WriteDataPtr = _WriteData;
117 AH->EndDataPtr = _EndData;
118 AH->WriteBytePtr = _WriteByte;
119 AH->ReadBytePtr = _ReadByte;
120 AH->WriteBufPtr = _WriteBuf;
121 AH->ReadBufPtr = _ReadBuf;
122 AH->ClosePtr = _CloseArchive;
123 AH->PrintTocDataPtr = _PrintTocData;
124 AH->ReadExtraTocPtr = _ReadExtraToc;
125 AH->WriteExtraTocPtr = _WriteExtraToc;
126 AH->PrintExtraTocPtr = _PrintExtraToc;
128 AH->StartBlobsPtr = _StartBlobs;
129 AH->StartBlobPtr = _StartBlob;
130 AH->EndBlobPtr = _EndBlob;
131 AH->EndBlobsPtr = _EndBlobs;
134 * Set up some special context used in compressing data.
136 ctx = (lclContext *) calloc(1, sizeof(lclContext));
138 die_horribly(AH, modulename, "out of memory\n");
139 AH->formatData = (void *) ctx;
141 ctx->zp = (z_streamp) malloc(sizeof(z_stream));
143 die_horribly(AH, modulename, "out of memory\n");
145 /* Initialize LO buffering */
146 AH->lo_buf_size = LOBBUFSIZE;
147 AH->lo_buf = (void *) malloc(LOBBUFSIZE);
148 if (AH->lo_buf == NULL)
149 die_horribly(AH, modulename, "out of memory\n");
152 * zlibOutSize is the buffer size we tell zlib it can output to. We
153 * actually allocate one extra byte because some routines want to append a
154 * trailing zero byte to the zlib output. The input buffer is expansible
155 * and is always of size ctx->inSize; zlibInSize is just the initial
156 * default size for it.
158 ctx->zlibOut = (char *) malloc(zlibOutSize + 1);
159 ctx->zlibIn = (char *) malloc(zlibInSize);
160 ctx->inSize = zlibInSize;
163 if (ctx->zlibOut == NULL || ctx->zlibIn == NULL)
164 die_horribly(AH, modulename, "out of memory\n");
169 if (AH->mode == archModeWrite)
171 if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
172 AH->FH = fopen(AH->fSpec, PG_BINARY_W);
177 die_horribly(AH, modulename, "could not open output file \"%s\": %s\n", AH->fSpec, strerror(errno));
179 ctx->hasSeek = checkSeek(AH->FH);
183 if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
184 AH->FH = fopen(AH->fSpec, PG_BINARY_R);
188 die_horribly(AH, modulename, "could not open input file \"%s\": %s\n", AH->fSpec, strerror(errno));
190 ctx->hasSeek = checkSeek(AH->FH);
194 ctx->dataStart = _getFilePos(AH, ctx);
200 * Called by the Archiver when the dumper creates a new TOC entry.
204 * Set up extra format-related TOC data.
207 _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
211 ctx = (lclTocEntry *) calloc(1, sizeof(lclTocEntry));
213 ctx->dataState = K_OFFSET_POS_NOT_SET;
215 ctx->dataState = K_OFFSET_NO_DATA;
217 te->formatData = (void *) ctx;
221 * Called by the Archiver to save any extra format-related TOC entry
226 * Use the Archiver routines to write data - they are non-endian, and
227 * maintain other important file information.
230 _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
232 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
234 WriteOffset(AH, ctx->dataPos, ctx->dataState);
238 * Called by the Archiver to read any extra format-related TOC data.
242 * Needs to match the order defined in _WriteExtraToc, and should also
243 * use the Archiver input routines.
246 _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
249 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
253 ctx = (lclTocEntry *) calloc(1, sizeof(lclTocEntry));
254 te->formatData = (void *) ctx;
257 ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
260 * Prior to V1.7 (pg7.3), we dumped the data size as an int; now we don't
263 if (AH->version < K_VERS_1_7)
268 * Called by the Archiver when restoring an archive to output a comment
269 * that includes useful information about the TOC entry.
275 _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
277 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
279 if (AH->public.verbose)
280 ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
281 (int64) ctx->dataPos);
285 * Called by the archiver when saving TABLE DATA (not schema). This routine
286 * should save whatever format-specific information is needed to read
289 * It is called just prior to the dumper's 'DataDumper' routine being called.
291 * Optional, but strongly recommended.
295 _StartData(ArchiveHandle *AH, TocEntry *te)
297 lclContext *ctx = (lclContext *) AH->formatData;
298 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
300 tctx->dataPos = _getFilePos(AH, ctx);
301 tctx->dataState = K_OFFSET_POS_SET;
303 _WriteByte(AH, BLK_DATA); /* Block type */
304 WriteInt(AH, te->dumpId); /* For sanity check */
306 _StartDataCompressor(AH, te);
310 * Called by archiver when dumper calls WriteData. This routine is
311 * called for both BLOB and TABLE data; it is the responsibility of
312 * the format to manage each kind of data using StartBlob/StartData.
314 * It should only be called from within a DataDumper routine.
319 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
321 lclContext *ctx = (lclContext *) AH->formatData;
322 z_streamp zp = ctx->zp;
324 zp->next_in = (void *) data;
327 while (zp->avail_in != 0)
329 /* printf("Deflating %lu bytes\n", (unsigned long) dLen); */
330 _DoDeflate(AH, ctx, 0);
336 * Called by the archiver when a dumper's 'DataDumper' routine has
343 _EndData(ArchiveHandle *AH, TocEntry *te)
345 /* lclContext *ctx = (lclContext *) AH->formatData; */
346 /* lclTocEntry *tctx = (lclTocEntry *) te->formatData; */
348 _EndDataCompressor(AH, te);
352 * Called by the archiver when starting to save all BLOB DATA (not schema).
353 * This routine should save whatever format-specific information is needed
354 * to read the BLOBs back into memory.
356 * It is called just prior to the dumper's DataDumper routine.
358 * Optional, but strongly recommended.
361 _StartBlobs(ArchiveHandle *AH, TocEntry *te)
363 lclContext *ctx = (lclContext *) AH->formatData;
364 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
366 tctx->dataPos = _getFilePos(AH, ctx);
367 tctx->dataState = K_OFFSET_POS_SET;
369 _WriteByte(AH, BLK_BLOBS); /* Block type */
370 WriteInt(AH, te->dumpId); /* For sanity check */
374 * Called by the archiver when the dumper calls StartBlob.
378 * Must save the passed OID for retrieval at restore-time.
381 _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
384 die_horribly(AH, modulename, "invalid OID for large object\n");
387 _StartDataCompressor(AH, te);
391 * Called by the archiver when the dumper calls EndBlob.
396 _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
398 _EndDataCompressor(AH, te);
402 * Called by the archiver when finishing saving all BLOB DATA.
407 _EndBlobs(ArchiveHandle *AH, TocEntry *te)
409 /* Write out a fake zero OID to mark end-of-blobs. */
414 * Print data for a given TOC entry
417 _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
419 lclContext *ctx = (lclContext *) AH->formatData;
421 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
425 if (tctx->dataState == K_OFFSET_NO_DATA)
428 if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
430 /* Skip over unnecessary blocks until we get the one we want. */
434 _readBlockHeader(AH, &blkType, &id);
436 while (id != te->dumpId)
438 if ((TocIDRequired(AH, id, ropt) & REQ_DATA) != 0)
439 die_horribly(AH, modulename,
440 "dumping a specific TOC data block out of order is not supported"
441 " without ID on this input stream (fseek required)\n");
453 default: /* Always have a default */
454 die_horribly(AH, modulename,
455 "unrecognized data block type (%d) while searching archive\n",
459 _readBlockHeader(AH, &blkType, &id);
465 if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
466 die_horribly(AH, modulename, "error during file seek: %s\n", strerror(errno));
468 _readBlockHeader(AH, &blkType, &id);
472 if (id != te->dumpId)
473 die_horribly(AH, modulename, "found unexpected block ID (%d) when reading data -- expected %d\n",
486 default: /* Always have a default */
487 die_horribly(AH, modulename, "unrecognized data block type %d while restoring archive\n",
494 * Print data from current file position.
497 _PrintData(ArchiveHandle *AH)
499 lclContext *ctx = (lclContext *) AH->formatData;
500 z_streamp zp = ctx->zp;
502 char *in = ctx->zlibIn;
507 char *out = ctx->zlibOut;
514 if (AH->compression != 0)
520 if (inflateInit(zp) != Z_OK)
521 die_horribly(AH, modulename, "could not initialize compression library: %s\n", zp->msg);
525 blkLen = ReadInt(AH);
528 if (blkLen + 1 > ctx->inSize)
532 ctx->zlibIn = (char *) malloc(blkLen + 1);
534 die_horribly(AH, modulename, "out of memory\n");
536 ctx->inSize = blkLen + 1;
540 cnt = fread(in, 1, blkLen, AH->FH);
544 die_horribly(AH, modulename,
545 "could not read from input file: end of file\n");
547 die_horribly(AH, modulename,
548 "could not read from input file: %s\n", strerror(errno));
551 ctx->filePos += blkLen;
553 zp->next_in = (void *) in;
554 zp->avail_in = blkLen;
558 if (AH->compression != 0)
560 while (zp->avail_in != 0)
562 zp->next_out = (void *) out;
563 zp->avail_out = zlibOutSize;
564 res = inflate(zp, 0);
565 if (res != Z_OK && res != Z_STREAM_END)
566 die_horribly(AH, modulename, "could not uncompress data: %s\n", zp->msg);
568 out[zlibOutSize - zp->avail_out] = '\0';
569 ahwrite(out, 1, zlibOutSize - zp->avail_out, AH);
575 in[zp->avail_in] = '\0';
576 ahwrite(in, 1, zp->avail_in, AH);
582 blkLen = ReadInt(AH);
586 if (AH->compression != 0)
590 while (res != Z_STREAM_END)
592 zp->next_out = (void *) out;
593 zp->avail_out = zlibOutSize;
594 res = inflate(zp, 0);
595 if (res != Z_OK && res != Z_STREAM_END)
596 die_horribly(AH, modulename, "could not uncompress data: %s\n", zp->msg);
598 out[zlibOutSize - zp->avail_out] = '\0';
599 ahwrite(out, 1, zlibOutSize - zp->avail_out, AH);
601 if (inflateEnd(zp) != Z_OK)
602 die_horribly(AH, modulename, "could not close compression library: %s\n", zp->msg);
608 _LoadBlobs(ArchiveHandle *AH)
612 StartRestoreBlobs(AH);
617 StartRestoreBlob(AH, oid);
619 EndRestoreBlob(AH, oid);
627 * Skip the BLOBs from the current file position.
628 * BLOBS are written sequentially as data blocks (see below).
629 * Each BLOB is preceded by its original OID.
630 * A zero OID indicates the end of the BLOBs
633 _skipBlobs(ArchiveHandle *AH)
646 * Skip data from current file position.
647 * Data blocks are formatted as an integer length, followed by data.
648 * A zero length denotes the end of the block.
651 _skipData(ArchiveHandle *AH)
653 lclContext *ctx = (lclContext *) AH->formatData;
655 char *in = ctx->zlibIn;
658 blkLen = ReadInt(AH);
661 if (blkLen > ctx->inSize)
664 ctx->zlibIn = (char *) malloc(blkLen);
665 ctx->inSize = blkLen;
668 cnt = fread(in, 1, blkLen, AH->FH);
672 die_horribly(AH, modulename,
673 "could not read from input file: end of file\n");
675 die_horribly(AH, modulename,
676 "could not read from input file: %s\n", strerror(errno));
679 ctx->filePos += blkLen;
681 blkLen = ReadInt(AH);
686 * Write a byte of data to the archive.
690 * Called by the archiver to do integer & byte output to the archive.
691 * These routines are only used to read & write headers & TOC.
695 _WriteByte(ArchiveHandle *AH, const int i)
697 lclContext *ctx = (lclContext *) AH->formatData;
700 res = fputc(i, AH->FH);
704 die_horribly(AH, modulename, "could not write byte: %s\n", strerror(errno));
709 * Read a byte of data from the archive.
713 * Called by the archiver to read bytes & integers from the archive.
714 * These routines are only used to read & write headers & TOC.
718 _ReadByte(ArchiveHandle *AH)
720 lclContext *ctx = (lclContext *) AH->formatData;
730 * Write a buffer of data to the archive.
734 * Called by the archiver to write a block of bytes to the archive.
735 * These routines are only used to read & write headers & TOC.
739 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
741 lclContext *ctx = (lclContext *) AH->formatData;
744 res = fwrite(buf, 1, len, AH->FH);
747 die_horribly(AH, modulename,
748 "could not write to output file: %s\n", strerror(errno));
755 * Read a block of bytes from the archive.
759 * Called by the archiver to read a block of bytes from the archive
760 * These routines are only used to read & write headers & TOC.
764 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
766 lclContext *ctx = (lclContext *) AH->formatData;
769 res = fread(buf, 1, len, AH->FH);
780 * When writing the archive, this is the routine that actually starts
781 * the process of saving it to files. No data should be written prior
782 * to this point, since the user could sort the TOC after creating it.
784 * If an archive is to be written, this routine must call:
785 * WriteHead to save the archive header
786 * WriteToc to save the TOC entries
787 * WriteDataChunks to save all DATA & BLOBs.
791 _CloseArchive(ArchiveHandle *AH)
793 lclContext *ctx = (lclContext *) AH->formatData;
796 if (AH->mode == archModeWrite)
799 tpos = ftello(AH->FH);
801 ctx->dataStart = _getFilePos(AH, ctx);
805 * This is not an essential operation - it is really only needed if we
806 * expect to be doing seeks to read the data back - it may be ok to
807 * just use the existing self-consistent block formatting.
811 fseeko(AH->FH, tpos, SEEK_SET);
816 if (fclose(AH->FH) != 0)
817 die_horribly(AH, modulename, "could not close archive file: %s\n", strerror(errno));
822 /*--------------------------------------------------
823 * END OF FORMAT CALLBACKS
824 *--------------------------------------------------
828 * Get the current position in the archive file.
831 _getFilePos(ArchiveHandle *AH, lclContext *ctx)
837 pos = ftello(AH->FH);
838 if (pos != ctx->filePos)
840 write_msg(modulename, "WARNING: ftell mismatch with expected position -- ftell used\n");
843 * Prior to 1.7 (pg7.3) we relied on the internally maintained
844 * pointer. Now we rely on pgoff_t always. pos = ctx->filePos;
854 * Read a data block header. The format changed in V1.3, so we
855 * put the code here for simplicity.
858 _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
860 if (AH->version < K_VERS_1_3)
863 *type = _ReadByte(AH);
869 * If zlib is available, then start it up. This is called from
870 * StartData & StartBlob. The buffers are set up in the Init routine.
873 _StartDataCompressor(ArchiveHandle *AH, TocEntry *te)
875 lclContext *ctx = (lclContext *) AH->formatData;
876 z_streamp zp = ctx->zp;
880 if (AH->compression < 0 || AH->compression > 9)
881 AH->compression = Z_DEFAULT_COMPRESSION;
883 if (AH->compression != 0)
889 if (deflateInit(zp, AH->compression) != Z_OK)
890 die_horribly(AH, modulename, "could not initialize compression library: %s\n", zp->msg);
897 /* Just be paranoid - maybe End is called after Start, with no Write */
898 zp->next_out = (void *) ctx->zlibOut;
899 zp->avail_out = zlibOutSize;
903 * Send compressed data to the output stream (via ahwrite).
904 * Each data chunk is preceded by its length.
905 * In the case of Z0, or no zlib, just write the raw data.
909 _DoDeflate(ArchiveHandle *AH, lclContext *ctx, int flush)
911 z_streamp zp = ctx->zp;
914 char *out = ctx->zlibOut;
917 if (AH->compression != 0)
919 res = deflate(zp, flush);
920 if (res == Z_STREAM_ERROR)
921 die_horribly(AH, modulename, "could not compress data: %s\n", zp->msg);
923 if (((flush == Z_FINISH) && (zp->avail_out < zlibOutSize))
924 || (zp->avail_out == 0)
925 || (zp->avail_in != 0)
929 * Extra paranoia: avoid zero-length chunks since a zero length
930 * chunk is the EOF marker. This should never happen but...
932 if (zp->avail_out < zlibOutSize)
935 * printf("Wrote %lu byte deflated chunk\n", (unsigned long)
936 * (zlibOutSize - zp->avail_out));
938 WriteInt(AH, zlibOutSize - zp->avail_out);
939 if (fwrite(out, 1, zlibOutSize - zp->avail_out, AH->FH) != (zlibOutSize - zp->avail_out))
940 die_horribly(AH, modulename, "could not write to output file: %s\n", strerror(errno));
941 ctx->filePos += zlibOutSize - zp->avail_out;
943 zp->next_out = (void *) out;
944 zp->avail_out = zlibOutSize;
950 if (zp->avail_in > 0)
952 WriteInt(AH, zp->avail_in);
953 if (fwrite(zp->next_in, 1, zp->avail_in, AH->FH) != zp->avail_in)
954 die_horribly(AH, modulename, "could not write to output file: %s\n", strerror(errno));
955 ctx->filePos += zp->avail_in;
961 if (flush == Z_FINISH)
975 * Terminate zlib context and flush its buffers. If no zlib
980 _EndDataCompressor(ArchiveHandle *AH, TocEntry *te)
984 lclContext *ctx = (lclContext *) AH->formatData;
985 z_streamp zp = ctx->zp;
988 if (AH->compression != 0)
995 /* printf("Ending data output\n"); */
996 res = _DoDeflate(AH, ctx, Z_FINISH);
997 } while (res != Z_STREAM_END);
999 if (deflateEnd(zp) != Z_OK)
1000 die_horribly(AH, modulename, "could not close compression stream: %s\n", zp->msg);
1004 /* Send the end marker */