#define STREAMING_HEADER_SIZE (1+sizeof(WalDataMessageHeader))
#define STREAMING_KEEPALIVE_SIZE (1+sizeof(PrimaryKeepaliveMessage))
+/* fd for currently open WAL file */
+static int walfile = -1;
+
+
/*
* Open a new WAL file in the specified directory. Store the name
* (not including the full directory) in namebuf. Assumes there is
{
fprintf(stderr, _("%s: could not pad WAL segment %s: %s\n"),
progname, fn, strerror(errno));
+ free(zerobuf);
close(f);
unlink(fn);
return -1;
* completed writing the whole segment.
*/
static bool
-close_walfile(int walfile, char *basedir, char *walname, bool segment_complete)
+close_walfile(char *basedir, char *walname, bool segment_complete)
{
off_t currpos = lseek(walfile, 0, SEEK_CUR);
{
fprintf(stderr, _("%s: could not close file %s: %s\n"),
progname, walname, strerror(errno));
+ walfile = -1;
return false;
}
+ walfile = -1;
/*
* Rename the .partial file only if we've completed writing the whole
char current_walfile_name[MAXPGPATH];
PGresult *res;
char *copybuf = NULL;
- int walfile = -1;
int64 last_status = -1;
XLogRecPtr blockpos = InvalidXLogRecPtr;
{
fprintf(stderr, _("%s: could not start replication: %s\n"),
progname, PQresultErrorMessage(res));
+ PQclear(res);
return false;
}
PQclear(res);
*/
if (stream_stop && stream_stop(blockpos, timeline, false))
{
- if (walfile != -1)
+ if (walfile != -1 && !close_walfile(basedir, current_walfile_name, rename_partial))
/* Potential error message is written by close_walfile */
- return close_walfile(walfile, basedir, current_walfile_name, rename_partial);
+ goto error;
return true;
}
{
fprintf(stderr, _("%s: could not send feedback packet: %s"),
progname, PQerrorMessage(conn));
- return false;
+ goto error;
}
last_status = now;
{
fprintf(stderr, _("%s: select() failed: %s\n"),
progname, strerror(errno));
- return false;
+ goto error;
}
/* Else there is actually data on the socket */
if (PQconsumeInput(conn) == 0)
{
fprintf(stderr, _("%s: could not receive data from WAL stream: %s\n"),
progname, PQerrorMessage(conn));
- return false;
+ goto error;
}
continue;
}
{
fprintf(stderr, _("%s: could not read copy data: %s\n"),
progname, PQerrorMessage(conn));
- return false;
+ goto error;
}
if (copybuf[0] == 'k')
{
{
fprintf(stderr, _("%s: keepalive message is incorrect size: %d\n"),
progname, r);
- return false;
+ goto error;
}
continue;
}
{
fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
progname, copybuf[0]);
- return false;
+ goto error;
}
if (r < STREAMING_HEADER_SIZE + 1)
{
fprintf(stderr, _("%s: streaming header too small: %d\n"),
progname, r);
- return false;
+ goto error;
}
/* Extract WAL location for this block */
{
fprintf(stderr, _("%s: received xlog record for offset %u with no file open\n"),
progname, xlogoff);
- return false;
+ goto error;
}
}
else
{
fprintf(stderr, _("%s: got WAL data offset %08x, expected %08x\n"),
progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
- return false;
+ goto error;
}
}
basedir, current_walfile_name);
if (walfile == -1)
/* Error logged by open_walfile */
- return false;
+ goto error;
}
if (write(walfile,
bytes_to_write,
current_walfile_name,
strerror(errno));
- return false;
+ goto error;
}
/* Write was successful, advance our position */
/* Did we reach the end of a WAL segment? */
if (blockpos % XLOG_SEG_SIZE == 0)
{
- if (!close_walfile(walfile, basedir, current_walfile_name, false))
+ if (!close_walfile(basedir, current_walfile_name, false))
/* Error message written in close_walfile() */
- return false;
+ goto error;
- walfile = -1;
xlogoff = 0;
if (stream_stop != NULL)
{
fprintf(stderr, _("%s: unexpected termination of replication stream: %s\n"),
progname, PQresultErrorMessage(res));
- return false;
+ goto error;
}
PQclear(res);
+
+ if (copybuf != NULL)
+ PQfreemem(copybuf);
+ if (walfile != -1 && close(walfile) != 0)
+ fprintf(stderr, _("%s: could not close file %s: %s\n"),
+ progname, current_walfile_name, strerror(errno));
return true;
+
+error:
+ if (copybuf != NULL)
+ PQfreemem(copybuf);
+ if (walfile != -1 && close(walfile) != 0)
+ fprintf(stderr, _("%s: could not close file %s: %s\n"),
+ progname, current_walfile_name, strerror(errno));
+ return false;
}