]> granicus.if.org Git - postgresql/blobdiff - src/bin/pg_dump/pg_backup_custom.c
Modify pg_dump to use error-free memory allocation macros. This avoids
[postgresql] / src / bin / pg_dump / pg_backup_custom.c
index 37597f7b064aeb233958039a837bb487fe0f3c6f..bfdf482a6b268a2cdfbdd88009c2c0aadb736348 100644 (file)
  *
  *
  * IDENTIFICATION
- *             $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_custom.c,v 1.21 2002/09/04 20:31:34 momjian Exp $
+ *             src/bin/pg_dump/pg_backup_custom.c
  *
  *-------------------------------------------------------------------------
  */
 
-#include "pg_backup.h"
-#include "pg_backup_archiver.h"
-
-#include <errno.h>
+#include "compress_io.h"
+#include "common.h"
 
 /*--------
  * Routines in the format interface
@@ -43,6 +41,7 @@ static int    _ReadByte(ArchiveHandle *);
 static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
 static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
 static void _CloseArchive(ArchiveHandle *AH);
+static void _ReopenArchive(ArchiveHandle *AH);
 static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
 static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
 static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
@@ -56,31 +55,22 @@ static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
 static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
 static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
 static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
-static void _LoadBlobs(ArchiveHandle *AH);
-
-/*------------
- * Buffers used in zlib compression and extra data stored in archive and
- * in TOC entries.
- *------------
- */
-#define zlibOutSize 4096
-#define zlibInSize     4096
+static void _LoadBlobs(ArchiveHandle *AH, bool drop);
+static void _Clone(ArchiveHandle *AH);
+static void _DeClone(ArchiveHandle *AH);
 
 typedef struct
 {
-       z_streamp       zp;
-       char       *zlibOut;
-       char       *zlibIn;
-       size_t          inSize;
+       CompressorState *cs;
        int                     hasSeek;
-       off_t           filePos;
-       off_t           dataStart;
+       pgoff_t         filePos;
+       pgoff_t         dataStart;
 } lclContext;
 
 typedef struct
 {
-       off_t           dataPos;
-       size_t          dataLen;
+       int                     dataState;
+       pgoff_t         dataPos;
 } lclTocEntry;
 
 
@@ -89,12 +79,12 @@ typedef struct
  *------
  */
 static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
-static void _StartDataCompressor(ArchiveHandle *AH, TocEntry *te);
-static void _EndDataCompressor(ArchiveHandle *AH, TocEntry *te);
-static off_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);
-static int     _DoDeflate(ArchiveHandle *AH, lclContext *ctx, int flush);
+static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);
+
+static size_t _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
+static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
 
-static char *modulename = gettext_noop("custom archiver");
+static const char *modulename = gettext_noop("custom archiver");
 
 
 
@@ -123,6 +113,7 @@ InitArchiveFmt_Custom(ArchiveHandle *AH)
        AH->WriteBufPtr = _WriteBuf;
        AH->ReadBufPtr = _ReadBuf;
        AH->ClosePtr = _CloseArchive;
+       AH->ReopenPtr = _ReopenArchive;
        AH->PrintTocDataPtr = _PrintTocData;
        AH->ReadExtraTocPtr = _ReadExtraToc;
        AH->WriteExtraTocPtr = _WriteExtraToc;
@@ -132,68 +123,59 @@ InitArchiveFmt_Custom(ArchiveHandle *AH)
        AH->StartBlobPtr = _StartBlob;
        AH->EndBlobPtr = _EndBlob;
        AH->EndBlobsPtr = _EndBlobs;
+       AH->ClonePtr = _Clone;
+       AH->DeClonePtr = _DeClone;
 
-       /*
-        * Set up some special context used in compressing data.
-        */
-       ctx = (lclContext *) malloc(sizeof(lclContext));
-       if (ctx == NULL)
-               die_horribly(AH, modulename, "out of memory\n");
+       /* Set up a private area. */
+       ctx = (lclContext *) pg_calloc(1, sizeof(lclContext));
        AH->formatData = (void *) ctx;
 
-       ctx->zp = (z_streamp) malloc(sizeof(z_stream));
-       if (ctx->zp == NULL)
-               die_horribly(AH, modulename, "out of memory\n");
-
        /* Initialize LO buffering */
        AH->lo_buf_size = LOBBUFSIZE;
-       AH->lo_buf = (void *) malloc(LOBBUFSIZE);
-       if (AH->lo_buf == NULL)
-               die_horribly(AH, modulename, "out of memory\n");
+       AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
 
-       /*
-        * zlibOutSize is the buffer size we tell zlib it can output to.  We
-        * actually allocate one extra byte because some routines want to
-        * append a trailing zero byte to the zlib output.      The input buffer
-        * is expansible and is always of size ctx->inSize; zlibInSize is just
-        * the initial default size for it.
-        */
-       ctx->zlibOut = (char *) malloc(zlibOutSize + 1);
-       ctx->zlibIn = (char *) malloc(zlibInSize);
-       ctx->inSize = zlibInSize;
        ctx->filePos = 0;
 
-       if (ctx->zlibOut == NULL || ctx->zlibIn == NULL)
-               die_horribly(AH, modulename, "out of memory\n");
-
        /*
         * Now open the file
         */
        if (AH->mode == archModeWrite)
        {
-
                if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
+               {
                        AH->FH = fopen(AH->fSpec, PG_BINARY_W);
+                       if (!AH->FH)
+                               die_horribly(AH, modulename, "could not open output file \"%s\": %s\n",
+                                                        AH->fSpec, strerror(errno));
+               }
                else
+               {
                        AH->FH = stdout;
+                       if (!AH->FH)
+                               die_horribly(AH, modulename, "could not open output file: %s\n",
+                                                        strerror(errno));
+               }
 
-               if (!AH->FH)
-                       die_horribly(AH, modulename, "could not open archive file %s: %s\n", AH->fSpec, strerror(errno));
-
-               ctx->hasSeek = (fseeko(AH->FH, 0, SEEK_CUR) == 0);
-
+               ctx->hasSeek = checkSeek(AH->FH);
        }
        else
        {
-
                if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
+               {
                        AH->FH = fopen(AH->fSpec, PG_BINARY_R);
+                       if (!AH->FH)
+                               die_horribly(AH, modulename, "could not open input file \"%s\": %s\n",
+                                                        AH->fSpec, strerror(errno));
+               }
                else
+               {
                        AH->FH = stdin;
-               if (!AH->FH)
-                       die_horribly(AH, modulename, "could not open archive file %s: %s\n", AH->fSpec, strerror(errno));
+                       if (!AH->FH)
+                               die_horribly(AH, modulename, "could not open input file: %s\n",
+                                                        strerror(errno));
+               }
 
-               ctx->hasSeek = (fseeko(AH->FH, 0, SEEK_CUR) == 0);
+               ctx->hasSeek = checkSeek(AH->FH);
 
                ReadHead(AH);
                ReadToc(AH);
@@ -214,14 +196,13 @@ _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
 {
        lclTocEntry *ctx;
 
-       ctx = (lclTocEntry *) calloc(1, sizeof(lclTocEntry));
+       ctx = (lclTocEntry *) pg_calloc(1, sizeof(lclTocEntry));
        if (te->dataDumper)
-               ctx->dataPos = -1;
+               ctx->dataState = K_OFFSET_POS_NOT_SET;
        else
-               ctx->dataPos = 0;
-       ctx->dataLen = 0;
-       te->formatData = (void *) ctx;
+               ctx->dataState = K_OFFSET_NO_DATA;
 
+       te->formatData = (void *) ctx;
 }
 
 /*
@@ -238,8 +219,7 @@ _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
 {
        lclTocEntry *ctx = (lclTocEntry *) te->formatData;
 
-       WriteInt(AH, ctx->dataPos);
-       WriteInt(AH, ctx->dataLen);
+       WriteOffset(AH, ctx->dataPos, ctx->dataState);
 }
 
 /*
@@ -257,12 +237,18 @@ _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
 
        if (ctx == NULL)
        {
-               ctx = (lclTocEntry *) malloc(sizeof(lclTocEntry));
+               ctx = (lclTocEntry *) pg_calloc(1, sizeof(lclTocEntry));
                te->formatData = (void *) ctx;
        }
 
-       ctx->dataPos = ReadInt(AH);
-       ctx->dataLen = ReadInt(AH);
+       ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
+
+       /*
+        * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
+        * dump it at all.
+        */
+       if (AH->version < K_VERS_1_7)
+               ReadInt(AH);
 }
 
 /*
@@ -277,8 +263,9 @@ _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
 {
        lclTocEntry *ctx = (lclTocEntry *) te->formatData;
 
-       ahprintf(AH, "-- Data Pos: " INT64_FORMAT " (Length %lu)\n",
-                        (int64) ctx->dataPos, (unsigned long) ctx->dataLen);
+       if (AH->public.verbose)
+               ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
+                                (int64) ctx->dataPos);
 }
 
 /*
@@ -298,12 +285,12 @@ _StartData(ArchiveHandle *AH, TocEntry *te)
        lclTocEntry *tctx = (lclTocEntry *) te->formatData;
 
        tctx->dataPos = _getFilePos(AH, ctx);
+       tctx->dataState = K_OFFSET_POS_SET;
 
        _WriteByte(AH, BLK_DATA);       /* Block type */
-       WriteInt(AH, te->id);           /* For sanity check */
-
-       _StartDataCompressor(AH, te);
+       WriteInt(AH, te->dumpId);       /* For sanity check */
 
+       ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
 }
 
 /*
@@ -311,26 +298,20 @@ _StartData(ArchiveHandle *AH, TocEntry *te)
  * called for both BLOB and TABLE data; it is the responsibility of
  * the format to manage each kind of data using StartBlob/StartData.
  *
- * It should only be called from withing a DataDumper routine.
+ * It should only be called from within a DataDumper routine.
  *
  * Mandatory.
- *
  */
 static size_t
 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
 {
        lclContext *ctx = (lclContext *) AH->formatData;
-       z_streamp       zp = ctx->zp;
+       CompressorState *cs = ctx->cs;
 
-       zp->next_in = (void *) data;
-       zp->avail_in = dLen;
+       if (dLen == 0)
+               return 0;
 
-       while (zp->avail_in != 0)
-       {
-               /* printf("Deflating %lu bytes\n", (unsigned long) dLen); */
-               _DoDeflate(AH, ctx, 0);
-       }
-       return dLen;
+       return WriteDataToArchive(AH, cs, data, dLen);
 }
 
 /*
@@ -344,11 +325,10 @@ static void
 _EndData(ArchiveHandle *AH, TocEntry *te)
 {
        lclContext *ctx = (lclContext *) AH->formatData;
-       lclTocEntry *tctx = (lclTocEntry *) te->formatData;
 
-       _EndDataCompressor(AH, te);
-
-       tctx->dataLen = _getFilePos(AH, ctx) - tctx->dataPos;
+       EndCompressor(AH, ctx->cs);
+       /* Send the end marker */
+       WriteInt(AH, 0);
 }
 
 /*
@@ -359,7 +339,6 @@ _EndData(ArchiveHandle *AH, TocEntry *te)
  * It is called just prior to the dumper's DataDumper routine.
  *
  * Optional, but strongly recommended.
- *
  */
 static void
 _StartBlobs(ArchiveHandle *AH, TocEntry *te)
@@ -368,10 +347,10 @@ _StartBlobs(ArchiveHandle *AH, TocEntry *te)
        lclTocEntry *tctx = (lclTocEntry *) te->formatData;
 
        tctx->dataPos = _getFilePos(AH, ctx);
+       tctx->dataState = K_OFFSET_POS_SET;
 
        _WriteByte(AH, BLK_BLOBS);      /* Block type */
-       WriteInt(AH, te->id);           /* For sanity check */
-
+       WriteInt(AH, te->dumpId);       /* For sanity check */
 }
 
 /*
@@ -384,30 +363,35 @@ _StartBlobs(ArchiveHandle *AH, TocEntry *te)
 static void
 _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
 {
+       lclContext *ctx = (lclContext *) AH->formatData;
+
        if (oid == 0)
                die_horribly(AH, modulename, "invalid OID for large object\n");
 
        WriteInt(AH, oid);
-       _StartDataCompressor(AH, te);
+
+       ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
 }
 
 /*
  * Called by the archiver when the dumper calls EndBlob.
  *
  * Optional.
- *
  */
 static void
 _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
 {
-       _EndDataCompressor(AH, te);
+       lclContext *ctx = (lclContext *) AH->formatData;
+
+       EndCompressor(AH, ctx->cs);
+       /* Send the end marker */
+       WriteInt(AH, 0);
 }
 
 /*
  * Called by the archiver when finishing saving all BLOB DATA.
  *
  * Optional.
- *
  */
 static void
 _EndBlobs(ArchiveHandle *AH, TocEntry *te)
@@ -417,104 +401,98 @@ _EndBlobs(ArchiveHandle *AH, TocEntry *te)
 }
 
 /*
- * Print data for a gievn TOC entry
-*/
+ * Print data for a given TOC entry
+ */
 static void
 _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
 {
        lclContext *ctx = (lclContext *) AH->formatData;
-       int                     id;
        lclTocEntry *tctx = (lclTocEntry *) te->formatData;
        int                     blkType;
-       int                     found = 0;
+       int                     id;
 
-       if (tctx->dataPos == 0)
+       if (tctx->dataState == K_OFFSET_NO_DATA)
                return;
 
-       if (!ctx->hasSeek || tctx->dataPos < 0)
+       if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
        {
-
-               /* Skip over unnecessary blocks until we get the one we want. */
-
-               found = 0;
-
+               /*
+                * We cannot seek directly to the desired block.  Instead, skip over
+                * block headers until we find the one we want.  This could fail if we
+                * are asked to restore items out-of-order.
+                */
                _readBlockHeader(AH, &blkType, &id);
 
-               while (id != te->id)
+               while (blkType != EOF && id != te->dumpId)
                {
-
-                       if ((TocIDRequired(AH, id, ropt) & 2) != 0)
-                               die_horribly(AH, modulename,
-                                                        "Dumping a specific TOC data block out of order is not supported"
-                                 " without id on this input stream (fseek required)\n");
-
                        switch (blkType)
                        {
-
                                case BLK_DATA:
-
                                        _skipData(AH);
                                        break;
 
                                case BLK_BLOBS:
-
                                        _skipBlobs(AH);
                                        break;
 
                                default:                /* Always have a default */
-
                                        die_horribly(AH, modulename,
                                                                 "unrecognized data block type (%d) while searching archive\n",
                                                                 blkType);
                                        break;
                        }
-
                        _readBlockHeader(AH, &blkType, &id);
-
                }
-
        }
        else
        {
-
-               /* Grab it */
-
+               /* We can just seek to the place we need to be. */
                if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
-                       die_horribly(AH, modulename, "error during file seek: %s\n", strerror(errno));
+                       die_horribly(AH, modulename, "error during file seek: %s\n",
+                                                strerror(errno));
 
                _readBlockHeader(AH, &blkType, &id);
+       }
 
+       /* Produce suitable failure message if we fell off end of file */
+       if (blkType == EOF)
+       {
+               if (tctx->dataState == K_OFFSET_POS_NOT_SET)
+                       die_horribly(AH, modulename, "could not find block ID %d in archive -- "
+                                                "possibly due to out-of-order restore request, "
+                                                "which cannot be handled due to lack of data offsets in archive\n",
+                                                te->dumpId);
+               else if (!ctx->hasSeek)
+                       die_horribly(AH, modulename, "could not find block ID %d in archive -- "
+                                                "possibly due to out-of-order restore request, "
+                                 "which cannot be handled due to non-seekable input file\n",
+                                                te->dumpId);
+               else    /* huh, the dataPos led us to EOF? */
+                       die_horribly(AH, modulename, "could not find block ID %d in archive -- "
+                                                "possibly corrupt archive\n",
+                                                te->dumpId);
        }
 
        /* Are we sane? */
-       if (id != te->id)
-               die_horribly(AH, modulename, "found unexpected block ID (%d) when reading data - expected %d\n",
-                                        id, te->id);
+       if (id != te->dumpId)
+               die_horribly(AH, modulename, "found unexpected block ID (%d) when reading data -- expected %d\n",
+                                        id, te->dumpId);
 
        switch (blkType)
        {
-
                case BLK_DATA:
-
                        _PrintData(AH);
                        break;
 
                case BLK_BLOBS:
-
-                       if (!AH->connection)
-                               die_horribly(AH, modulename, "large objects cannot be loaded without a database connection\n");
-
-                       _LoadBlobs(AH);
+                       _LoadBlobs(AH, ropt->dropSchema);
                        break;
 
                default:                                /* Always have a default */
-
                        die_horribly(AH, modulename, "unrecognized data block type %d while restoring archive\n",
                                                 blkType);
                        break;
        }
-
-       ahprintf(AH, "\n\n");
 }
 
 /*
@@ -523,115 +501,11 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
 static void
 _PrintData(ArchiveHandle *AH)
 {
-       lclContext *ctx = (lclContext *) AH->formatData;
-       z_streamp       zp = ctx->zp;
-       size_t          blkLen;
-       char       *in = ctx->zlibIn;
-       size_t          cnt;
-
-#ifdef HAVE_LIBZ
-       int                     res;
-       char       *out = ctx->zlibOut;
-#endif
-
-#ifdef HAVE_LIBZ
-
-       res = Z_OK;
-
-       if (AH->compression != 0)
-       {
-               zp->zalloc = Z_NULL;
-               zp->zfree = Z_NULL;
-               zp->opaque = Z_NULL;
-
-               if (inflateInit(zp) != Z_OK)
-                       die_horribly(AH, modulename, "could not initialize compression library: %s\n", zp->msg);
-       }
-#endif
-
-       blkLen = ReadInt(AH);
-       while (blkLen != 0)
-       {
-               if (blkLen + 1 > ctx->inSize)
-               {
-                       free(ctx->zlibIn);
-                       ctx->zlibIn = NULL;
-                       ctx->zlibIn = (char *) malloc(blkLen + 1);
-                       if (!ctx->zlibIn)
-                               die_horribly(AH, modulename, "out of memory\n");
-
-                       ctx->inSize = blkLen + 1;
-                       in = ctx->zlibIn;
-               }
-
-               cnt = fread(in, 1, blkLen, AH->FH);
-               if (cnt != blkLen)
-                       die_horribly(AH, modulename,
-                                  "could not read data block - expected %lu, got %lu\n",
-                                                (unsigned long) blkLen, (unsigned long) cnt);
-
-               ctx->filePos += blkLen;
-
-               zp->next_in = in;
-               zp->avail_in = blkLen;
-
-#ifdef HAVE_LIBZ
-
-               if (AH->compression != 0)
-               {
-
-                       while (zp->avail_in != 0)
-                       {
-                               zp->next_out = out;
-                               zp->avail_out = zlibOutSize;
-                               res = inflate(zp, 0);
-                               if (res != Z_OK && res != Z_STREAM_END)
-                                       die_horribly(AH, modulename, "unable to uncompress data: %s\n", zp->msg);
-
-                               out[zlibOutSize - zp->avail_out] = '\0';
-                               ahwrite(out, 1, zlibOutSize - zp->avail_out, AH);
-                       }
-               }
-               else
-               {
-#endif
-                       in[zp->avail_in] = '\0';
-                       ahwrite(in, 1, zp->avail_in, AH);
-                       zp->avail_in = 0;
-
-#ifdef HAVE_LIBZ
-               }
-#endif
-
-               blkLen = ReadInt(AH);
-
-       }
-
-#ifdef HAVE_LIBZ
-       if (AH->compression != 0)
-       {
-               zp->next_in = NULL;
-               zp->avail_in = 0;
-               while (res != Z_STREAM_END)
-               {
-                       zp->next_out = out;
-                       zp->avail_out = zlibOutSize;
-                       res = inflate(zp, 0);
-                       if (res != Z_OK && res != Z_STREAM_END)
-                               die_horribly(AH, modulename, "unable to uncompress data: %s\n", zp->msg);
-
-                       out[zlibOutSize - zp->avail_out] = '\0';
-                       ahwrite(out, 1, zlibOutSize - zp->avail_out, AH);
-               }
-               if (inflateEnd(zp) != Z_OK)
-                       die_horribly(AH, modulename, "could not close compression library: %s\n", zp->msg);
-       }
-#endif
-
+       ReadDataFromArchive(AH, AH->compression, _CustomReadFunc);
 }
 
 static void
-_LoadBlobs(ArchiveHandle *AH)
+_LoadBlobs(ArchiveHandle *AH, bool drop)
 {
        Oid                     oid;
 
@@ -640,14 +514,13 @@ _LoadBlobs(ArchiveHandle *AH)
        oid = ReadInt(AH);
        while (oid != 0)
        {
-               StartRestoreBlob(AH, oid);
+               StartRestoreBlob(AH, oid, drop);
                _PrintData(AH);
                EndRestoreBlob(AH, oid);
                oid = ReadInt(AH);
        }
 
        EndRestoreBlobs(AH);
-
 }
 
 /*
@@ -679,30 +552,38 @@ _skipData(ArchiveHandle *AH)
 {
        lclContext *ctx = (lclContext *) AH->formatData;
        size_t          blkLen;
-       char       *in = ctx->zlibIn;
+       char       *buf = NULL;
+       int                     buflen = 0;
        size_t          cnt;
 
        blkLen = ReadInt(AH);
        while (blkLen != 0)
        {
-               if (blkLen > ctx->inSize)
+               if (blkLen > buflen)
                {
-                       free(ctx->zlibIn);
-                       ctx->zlibIn = (char *) malloc(blkLen);
-                       ctx->inSize = blkLen;
-                       in = ctx->zlibIn;
+                       if (buf)
+                               free(buf);
+                       buf = (char *) pg_malloc(blkLen);
+                       buflen = blkLen;
                }
-               cnt = fread(in, 1, blkLen, AH->FH);
+               cnt = fread(buf, 1, blkLen, AH->FH);
                if (cnt != blkLen)
-                       die_horribly(AH, modulename,
-                                  "could not read data block - expected %lu, got %lu\n",
-                                                (unsigned long) blkLen, (unsigned long) cnt);
+               {
+                       if (feof(AH->FH))
+                               die_horribly(AH, modulename,
+                                                        "could not read from input file: end of file\n");
+                       else
+                               die_horribly(AH, modulename,
+                                       "could not read from input file: %s\n", strerror(errno));
+               }
 
                ctx->filePos += blkLen;
 
                blkLen = ReadInt(AH);
        }
 
+       if (buf)
+               free(buf);
 }
 
 /*
@@ -711,8 +592,6 @@ _skipData(ArchiveHandle *AH)
  * Mandatory.
  *
  * Called by the archiver to do integer & byte output to the archive.
- * These routines are only used to read & write headers & TOC.
- *
  */
 static int
 _WriteByte(ArchiveHandle *AH, const int i)
@@ -734,8 +613,7 @@ _WriteByte(ArchiveHandle *AH, const int i)
  * Mandatory
  *
  * Called by the archiver to read bytes & integers from the archive.
- * These routines are only used to read & write headers & TOC.
- *
+ * EOF should be treated as a fatal error.
  */
 static int
 _ReadByte(ArchiveHandle *AH)
@@ -743,9 +621,10 @@ _ReadByte(ArchiveHandle *AH)
        lclContext *ctx = (lclContext *) AH->formatData;
        int                     res;
 
-       res = fgetc(AH->FH);
-       if (res != EOF)
-               ctx->filePos += 1;
+       res = getc(AH->FH);
+       if (res == EOF)
+               die_horribly(AH, modulename, "unexpected end of file\n");
+       ctx->filePos += 1;
        return res;
 }
 
@@ -755,8 +634,6 @@ _ReadByte(ArchiveHandle *AH)
  * Mandatory.
  *
  * Called by the archiver to write a block of bytes to the archive.
- * These routines are only used to read & write headers & TOC.
- *
  */
 static size_t
 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
@@ -768,8 +645,7 @@ _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
 
        if (res != len)
                die_horribly(AH, modulename,
-                                        "write error in _WriteBuf (%lu != %lu)\n",
-                                        (unsigned long) res, (unsigned long) len);
+                                        "could not write to output file: %s\n", strerror(errno));
 
        ctx->filePos += res;
        return res;
@@ -781,8 +657,6 @@ _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
  * Mandatory.
  *
  * Called by the archiver to read a block of bytes from the archive
- * These routines are only used to read & write headers & TOC.
- *
  */
 static size_t
 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
@@ -815,7 +689,7 @@ static void
 _CloseArchive(ArchiveHandle *AH)
 {
        lclContext *ctx = (lclContext *) AH->formatData;
-       off_t           tpos;
+       pgoff_t         tpos;
 
        if (AH->mode == archModeWrite)
        {
@@ -826,16 +700,14 @@ _CloseArchive(ArchiveHandle *AH)
                WriteDataChunks(AH);
 
                /*
-                * This is not an essential operation - it is really only needed
-                * if we expect to be doing seeks to read the data back - it may
-                * be ok to just use the existing self-consistent block
-                * formatting.
+                * If possible, re-write the TOC in order to update the data offset
+                * information.  This is not essential, as pg_restore can cope in most
+                * cases without it; but it can make pg_restore significantly faster
+                * in some situations (especially parallel restore).
                 */
-               if (ctx->hasSeek)
-               {
-                       fseeko(AH->FH, tpos, SEEK_SET);
+               if (ctx->hasSeek &&
+                       fseeko(AH->FH, tpos, SEEK_SET) == 0)
                        WriteToc(AH);
-               }
        }
 
        if (fclose(AH->FH) != 0)
@@ -844,6 +716,85 @@ _CloseArchive(ArchiveHandle *AH)
        AH->FH = NULL;
 }
 
+/*
+ * Reopen the archive's file handle.
+ *
+ * We close the original file handle, except on Windows.  (The difference
+ * is because on Windows, this is used within a multithreading context,
+ * and we don't want a thread closing the parent file handle.)
+ */
+static void
+_ReopenArchive(ArchiveHandle *AH)
+{
+       lclContext *ctx = (lclContext *) AH->formatData;
+       pgoff_t         tpos;
+
+       if (AH->mode == archModeWrite)
+               die_horribly(AH, modulename, "can only reopen input archives\n");
+
+       /*
+        * These two cases are user-facing errors since they represent unsupported
+        * (but not invalid) use-cases.  Word the error messages appropriately.
+        */
+       if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
+               die_horribly(AH, modulename, "parallel restore from stdin is not supported\n");
+       if (!ctx->hasSeek)
+               die_horribly(AH, modulename, "parallel restore from non-seekable file is not supported\n");
+
+       errno = 0;
+       tpos = ftello(AH->FH);
+       if (errno)
+               die_horribly(AH, modulename, "could not determine seek position in archive file: %s\n",
+                                        strerror(errno));
+
+#ifndef WIN32
+       if (fclose(AH->FH) != 0)
+               die_horribly(AH, modulename, "could not close archive file: %s\n",
+                                        strerror(errno));
+#endif
+
+       AH->FH = fopen(AH->fSpec, PG_BINARY_R);
+       if (!AH->FH)
+               die_horribly(AH, modulename, "could not open input file \"%s\": %s\n",
+                                        AH->fSpec, strerror(errno));
+
+       if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
+               die_horribly(AH, modulename, "could not set seek position in archive file: %s\n",
+                                        strerror(errno));
+}
+
+/*
+ * Clone format-specific fields during parallel restoration.
+ */
+static void
+_Clone(ArchiveHandle *AH)
+{
+       lclContext *ctx = (lclContext *) AH->formatData;
+
+       AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
+       memcpy(AH->formatData, ctx, sizeof(lclContext));
+       ctx = (lclContext *) AH->formatData;
+
+       /* sanity check, shouldn't happen */
+       if (ctx->cs != NULL)
+               die_horribly(AH, modulename, "compressor active\n");
+
+       /*
+        * Note: we do not make a local lo_buf because we expect at most one BLOBS
+        * entry per archive, so no parallelism is possible.  Likewise,
+        * TOC-entry-local state isn't an issue because any one TOC entry is
+        * touched by just one worker child.
+        */
+}
+
+static void
+_DeClone(ArchiveHandle *AH)
+{
+       lclContext *ctx = (lclContext *) AH->formatData;
+
+       free(ctx);
+}
+
 /*--------------------------------------------------
  * END OF FORMAT CALLBACKS
  *--------------------------------------------------
@@ -852,18 +803,23 @@ _CloseArchive(ArchiveHandle *AH)
 /*
  * Get the current position in the archive file.
  */
-static off_t
+static pgoff_t
 _getFilePos(ArchiveHandle *AH, lclContext *ctx)
 {
-       off_t           pos;
+       pgoff_t         pos;
 
        if (ctx->hasSeek)
        {
                pos = ftello(AH->FH);
                if (pos != ctx->filePos)
                {
-                       write_msg(modulename, "WARNING: ftell mismatch with expected position -- ftell ignored\n");
-                       pos = ctx->filePos;
+                       write_msg(modulename, "WARNING: ftell mismatch with expected position -- ftell used\n");
+
+                       /*
+                        * Prior to 1.7 (pg7.3) we relied on the internally maintained
+                        * pointer. Now we rely on ftello() always, unless the file has
+                        * been found to not support it.
+                        */
                }
        }
        else
@@ -873,161 +829,86 @@ _getFilePos(ArchiveHandle *AH, lclContext *ctx)
 
 /*
  * Read a data block header. The format changed in V1.3, so we
- * put the code here for simplicity.
+ * centralize the code here for simplicity.  Returns *type = EOF
+ * if at EOF.
  */
 static void
 _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
 {
+       lclContext *ctx = (lclContext *) AH->formatData;
+       int                     byt;
+
+       /*
+        * Note: if we are at EOF with a pre-1.3 input file, we'll die_horribly
+        * inside ReadInt rather than returning EOF.  It doesn't seem worth
+        * jumping through hoops to deal with that case better, because no such
+        * files are likely to exist in the wild: only some 7.1 development
+        * versions of pg_dump ever generated such files.
+        */
        if (AH->version < K_VERS_1_3)
                *type = BLK_DATA;
        else
-               *type = _ReadByte(AH);;
+       {
+               byt = getc(AH->FH);
+               *type = byt;
+               if (byt == EOF)
+               {
+                       *id = 0;                        /* don't return an uninitialized value */
+                       return;
+               }
+               ctx->filePos += 1;
+       }
 
        *id = ReadInt(AH);
 }
 
 /*
- * If zlib is available, then startit up. This is called from
- * StartData & StartBlob. The buffers are setup in the Init routine.
- *
+ * Callback function for WriteDataToArchive. Writes one block of (compressed)
+ * data to the archive.
  */
-static void
-_StartDataCompressor(ArchiveHandle *AH, TocEntry *te)
+static size_t
+_CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
 {
-       lclContext *ctx = (lclContext *) AH->formatData;
-       z_streamp       zp = ctx->zp;
-
-#ifdef HAVE_LIBZ
-
-       if (AH->compression < 0 || AH->compression > 9)
-               AH->compression = Z_DEFAULT_COMPRESSION;
-
-       if (AH->compression != 0)
-       {
-               zp->zalloc = Z_NULL;
-               zp->zfree = Z_NULL;
-               zp->opaque = Z_NULL;
+       /* never write 0-byte blocks (this should not happen) */
+       if (len == 0)
+               return 0;
 
-               if (deflateInit(zp, AH->compression) != Z_OK)
-                       die_horribly(AH, modulename, "could not initialize compression library: %s\n", zp->msg);
-       }
-
-#else
-
-       AH->compression = 0;
-#endif
-
-       /* Just be paranoid - maybe End is called after Start, with no Write */
-       zp->next_out = ctx->zlibOut;
-       zp->avail_out = zlibOutSize;
+       WriteInt(AH, len);
+       return _WriteBuf(AH, buf, len);
 }
 
 /*
- * Send compressed data to the output stream (via ahwrite).
- * Each data chunk is preceded by it's length.
- * In the case of Z0, or no zlib, just write the raw data.
- *
+ * Callback function for ReadDataFromArchive. To keep things simple, we
+ * always read one compressed block at a time.
  */
-static int
-_DoDeflate(ArchiveHandle *AH, lclContext *ctx, int flush)
+static size_t
+_CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
 {
-       z_streamp       zp = ctx->zp;
+       size_t          blkLen;
+       size_t          cnt;
 
-#ifdef HAVE_LIBZ
-       char       *out = ctx->zlibOut;
-       int                     res = Z_OK;
+       /* Read length */
+       blkLen = ReadInt(AH);
+       if (blkLen == 0)
+               return 0;
 
-       if (AH->compression != 0)
+       /* If the caller's buffer is not large enough, allocate a bigger one */
+       if (blkLen > *buflen)
        {
-               res = deflate(zp, flush);
-               if (res == Z_STREAM_ERROR)
-                       die_horribly(AH, modulename, "could not compress data: %s\n", zp->msg);
-
-               if (((flush == Z_FINISH) && (zp->avail_out < zlibOutSize))
-                       || (zp->avail_out == 0)
-                       || (zp->avail_in != 0)
-                       )
-               {
-                       /*
-                        * Extra paranoia: avoid zero-length chunks since a zero
-                        * length chunk is the EOF marker. This should never happen
-                        * but...
-                        */
-                       if (zp->avail_out < zlibOutSize)
-                       {
-                               /*
-                                * printf("Wrote %lu byte deflated chunk\n", (unsigned
-                                * long) (zlibOutSize - zp->avail_out));
-                                */
-                               WriteInt(AH, zlibOutSize - zp->avail_out);
-                               if (fwrite(out, 1, zlibOutSize - zp->avail_out, AH->FH) != (zlibOutSize - zp->avail_out))
-                                       die_horribly(AH, modulename, "could not write compressed chunk\n");
-                               ctx->filePos += zlibOutSize - zp->avail_out;
-                       }
-                       zp->next_out = out;
-                       zp->avail_out = zlibOutSize;
-               }
+               free(*buf);
+               *buf = (char *) pg_malloc(blkLen);
+               *buflen = blkLen;
        }
-       else
-#endif
-       {
-               if (zp->avail_in > 0)
-               {
-                       WriteInt(AH, zp->avail_in);
-                       if (fwrite(zp->next_in, 1, zp->avail_in, AH->FH) != zp->avail_in)
-                               die_horribly(AH, modulename, "could not write uncompressed chunk\n");
-                       ctx->filePos += zp->avail_in;
-                       zp->avail_in = 0;
-               }
-               else
-               {
-#ifdef HAVE_LIBZ
-                       if (flush == Z_FINISH)
-                               res = Z_STREAM_END;
-#endif
-               }
-
-
-       }
-
-#ifdef HAVE_LIBZ
-       return res;
-#else
-       return 1;
-#endif
-
-}
-
-/*
- * Terminate zlib context and flush it's buffers. If no zlib
- * then just return.
- *
- */
-static void
-_EndDataCompressor(ArchiveHandle *AH, TocEntry *te)
-{
-
-#ifdef HAVE_LIBZ
-       lclContext *ctx = (lclContext *) AH->formatData;
-       z_streamp       zp = ctx->zp;
-       int                     res;
 
-       if (AH->compression != 0)
+       cnt = _ReadBuf(AH, *buf, blkLen);
+       if (cnt != blkLen)
        {
-               zp->next_in = NULL;
-               zp->avail_in = 0;
-
-               do
-               {
-                       /* printf("Ending data output\n"); */
-                       res = _DoDeflate(AH, ctx, Z_FINISH);
-               } while (res != Z_STREAM_END);
-
-               if (deflateEnd(zp) != Z_OK)
-                       die_horribly(AH, modulename, "could not close compression stream: %s\n", zp->msg);
+               if (feof(AH->FH))
+                       die_horribly(AH, modulename,
+                                                "could not read from input file: end of file\n");
+               else
+                       die_horribly(AH, modulename,
+                                       "could not read from input file: %s\n", strerror(errno));
        }
-#endif
-
-       /* Send the end marker */
-       WriteInt(AH, 0);
+       return cnt;
 }