#include <zlib.h>
#endif
+#include "common/string.h"
#include "getopt_long.h"
#include "libpq-fe.h"
#include "pqexpbuffer.h"
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout,
- NULL, false))
+ NULL, false, true))
/*
* Any errors will already have been reported in the function process,
logstreamer_param *param;
uint32 hi,
lo;
+ char statusdir[MAXPGPATH];
param = pg_malloc0(sizeof(logstreamer_param));
param->timeline = timeline;
/* Error message already written in GetConnection() */
exit(1);
+ snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
+
/*
- * Always in plain format, so we can write to basedir/pg_xlog. But the
- * directory entry in the tar file may arrive later, so make sure it's
- * created before we start.
+ * Create pg_xlog/archive_status (and thus pg_xlog) so we can can write to
+ * basedir/pg_xlog as the directory entry in the tar file may arrive
+ * later.
*/
- snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
- verify_dir_is_empty_or_create(param->xlogdir);
+ snprintf(statusdir, sizeof(statusdir), "%s/pg_xlog/archive_status",
+ basedir);
+
+ if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
+ {
+ fprintf(stderr,
+ _("%s: could not create directory \"%s\": %s\n"),
+ progname, statusdir, strerror(errno));
+ disconnect_and_exit(1);
+ }
/*
* Start a child process and tell it to start streaming. On Unix, this is
* by the wal receiver process. Also, when transaction
* log directory location was specified, pg_xlog has
* already been created as a symbolic link before
- * starting the actual backup. So just ignore failure
- * on them.
+ * starting the actual backup. So just ignore creation
+ * failures on related directories.
*/
- if ((!streamwal && (strcmp(xlog_dir, "") == 0))
- || strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0)
+ if (!((pg_str_endswith(filename, "/pg_xlog") ||
+ pg_str_endswith(filename, "/archive_status")) &&
+ errno == EEXIST))
{
fprintf(stderr,
_("%s: could not create directory \"%s\": %s\n"),
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos,
- bool synchronous);
+ bool synchronous, bool mark_done);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
XLogRecPtr *blockpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
- char *partial_suffix);
+ char *partial_suffix, bool mark_done);
static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
XLogRecPtr blockpos, char *basedir, char *partial_suffix,
- XLogRecPtr *stoppos);
+ XLogRecPtr *stoppos, bool mark_done);
static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop,
- char *partial_suffix, XLogRecPtr *stoppos);
+ char *partial_suffix, XLogRecPtr *stoppos,
+ bool mark_done);
static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
int64 last_status);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
+static bool
+mark_file_as_archived(const char *basedir, const char *fname)
+{
+ int fd;
+ static char tmppath[MAXPGPATH];
+
+ snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done",
+ basedir, fname);
+
+ fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+ if (fd < 0)
+ {
+ fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
+ progname, tmppath, strerror(errno));
+ return false;
+ }
+
+ if (fsync(fd) != 0)
+ {
+ fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
+ progname, tmppath, strerror(errno));
+ return false;
+ }
+
+ close(fd);
+
+ return true;
+}
+
/*
* Open a new WAL file in the specified directory.
*
* and returns false, otherwise returns true.
*/
static bool
-close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
+close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_done)
{
off_t currpos;
_("%s: not renaming \"%s%s\", segment is not complete\n"),
progname, current_walfile_name, partial_suffix);
+ /*
+ * Mark file as archived if requested by the caller - pg_basebackup needs
+ * to do so as files can otherwise get archived again after promotion of a
+ * new node. This is in line with walreceiver.c always doing a
+ * XLogArchiveForceDone() after a complete segment.
+ */
+ if (currpos == XLOG_SEG_SIZE && mark_done)
+ {
+ /* writes error message if failed */
+ if (!mark_file_as_archived(basedir, current_walfile_name))
+ return false;
+ }
+
lastFlushPosition = pos;
return true;
}
}
static bool
-writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *content)
+writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
+ char *content, bool mark_done)
{
int size = strlen(content);
char path[MAXPGPATH];
return false;
}
+ /* Maintain archive_status, check close_walfile() for details. */
+ if (mark_done)
+ {
+ /* writes error message if failed */
+ if (!mark_file_as_archived(basedir, histfname))
+ return false;
+ }
+
return true;
}
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
- bool synchronous)
+ bool synchronous, bool mark_done)
{
char query[128];
char slotcmd[128];
/* Write the history file to disk */
writeTimeLineHistoryFile(basedir, timeline,
PQgetvalue(res, 0, 0),
- PQgetvalue(res, 0, 1));
+ PQgetvalue(res, 0, 1),
+ mark_done);
PQclear(res);
}
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
- &stoppos, synchronous);
+ &stoppos, synchronous, mark_done);
if (res == NULL)
goto error;
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
- XLogRecPtr *stoppos, bool synchronous)
+ XLogRecPtr *stoppos, bool synchronous, bool mark_done)
{
char *copybuf = NULL;
int64 last_status = -1;
* Check if we should continue streaming, or abort at this point.
*/
if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
- stream_stop, partial_suffix, stoppos))
+ stream_stop, partial_suffix, stoppos,
+ mark_done))
goto error;
now = feGetCurrentTimestamp();
if (r == -2)
{
PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
- basedir, partial_suffix, stoppos);
+ basedir, partial_suffix,
+ stoppos, mark_done);
if (res == NULL)
goto error;
else
else if (copybuf[0] == 'w')
{
if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
- timeline, basedir, stream_stop, partial_suffix))
+ timeline, basedir, stream_stop,
+ partial_suffix, true))
goto error;
/*
* Check if we should continue streaming, or abort at this point.
*/
if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
- stream_stop, partial_suffix, stoppos))
+ stream_stop, partial_suffix, stoppos,
+ mark_done))
goto error;
}
else
ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
XLogRecPtr *blockpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
- char *partial_suffix)
+ char *partial_suffix, bool mark_done)
{
int xlogoff;
int bytes_left;
/* Did we reach the end of a WAL segment? */
if (*blockpos % XLOG_SEG_SIZE == 0)
{
- if (!close_walfile(basedir, partial_suffix, *blockpos))
+ if (!close_walfile(basedir, partial_suffix, *blockpos, mark_done))
/* Error message written in close_walfile() */
return false;
static PGresult *
HandleEndOfCopyStream(PGconn *conn, char *copybuf,
XLogRecPtr blockpos, char *basedir, char *partial_suffix,
- XLogRecPtr *stoppos)
+ XLogRecPtr *stoppos, bool mark_done)
{
PGresult *res = PQgetResult(conn);
*/
if (still_sending)
{
- if (!close_walfile(basedir, partial_suffix, blockpos))
+ if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
{
/* Error message written in close_walfile() */
PQclear(res);
static bool
CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
- char *partial_suffix, XLogRecPtr *stoppos)
+ char *partial_suffix, XLogRecPtr *stoppos, bool mark_done)
{
if (still_sending && stream_stop(blockpos, timeline, false))
{
- if (!close_walfile(basedir, partial_suffix, blockpos))
+ if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
{
/* Potential error message is written by close_walfile */
return false;