<!ENTITY pgCtl SYSTEM "pg_ctl-ref.sgml">
<!ENTITY pgDump SYSTEM "pg_dump.sgml">
<!ENTITY pgDumpall SYSTEM "pg_dumpall.sgml">
+<!ENTITY pgReceivexlog SYSTEM "pg_receivexlog.sgml">
<!ENTITY pgResetxlog SYSTEM "pg_resetxlog.sgml">
<!ENTITY pgRestore SYSTEM "pg_restore.sgml">
<!ENTITY postgres SYSTEM "postgres-ref.sgml">
</varlistentry>
<varlistentry>
- <term><option>-x</option></term>
- <term><option>--xlog</option></term>
+ <term><option>-x <replaceable class="parameter">method</replaceable></option></term>
+ <term><option>--xlog=<replaceable class="parameter">method</replaceable></option></term>
<listitem>
<para>
Includes the required transaction log files (WAL files) in the
to consult the log archive, thus making this a completely standalone
backup.
</para>
- <note>
- <para>
- The transaction log files are collected at the end of the backup.
- Therefore, it is necessary for the
- <xref linkend="guc-wal-keep-segments"> parameter to be set high
- enough that the log is not removed before the end of the backup.
- If the log has been rotated when it's time to transfer it, the
- backup will fail and be unusable.
- </para>
- </note>
+ <para>
+ The following methods for collecting the transaction logs are
+ supported:
+
+ <variablelist>
+ <varlistentry>
+ <term><literal>f</literal></term>
+ <term><literal>fetch</literal></term>
+ <listitem>
+ <para>
+ The transaction log files are collected at the end of the backup.
+ Therefore, it is necessary for the
+ <xref linkend="guc-wal-keep-segments"> parameter to be set high
+ enough that the log is not removed before the end of the backup.
+ If the log has been rotated when it's time to transfer it, the
+ backup will fail and be unusable.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>s</literal></term>
+ <term><literal>stream</literal></term>
+ <listitem>
+ <para>
+ Stream the transaction log while the backup is created. This will
+ open a second connection to the server and start streaming the
+ transaction log in parallel while running the backup. Therefore,
+ it will use up two slots configured by the
+ <xref linkend="guc-max-wal-senders"> parameter. As long as the
+ client can keep up with transaction log received, using this mode
+ requires no extra transaction logs to be saved on the master.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
</listitem>
</varlistentry>
The following command-line options control the database connection parameters.
<variablelist>
+ <varlistentry>
+ <term><option>-s <replaceable class="parameter">interval</replaceable></option></term>
+ <term><option>--statusint=<replaceable class="parameter">interval</replaceable></option></term>
+ <listitem>
+ <para>
+ Specifies the number of seconds between status packets sent back to the
+ server. This is required when streaming the transaction log (using
+ <literal>--xlog=stream</literal>) if replication timeout is configured
+ on the server, and allows for easier monitoring. The default value is
+ 10 seconds.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><option>-h <replaceable class="parameter">host</replaceable></option></term>
<term><option>--host=<replaceable class="parameter">host</replaceable></option></term>
--- /dev/null
+<!--
+doc/src/sgml/ref/pg_receivexlog.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="app-pgreceivexlog">
+ <refmeta>
+ <refentrytitle>pg_receivexlog</refentrytitle>
+ <manvolnum>1</manvolnum>
+ <refmiscinfo>Application</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+ <refname>pg_receivexlog</refname>
+ <refpurpose>streams transaction logs from a <productname>PostgreSQL</productname> cluster</refpurpose>
+ </refnamediv>
+
+ <indexterm zone="app-pgreceivexlog">
+ <primary>pg_receivexlog</primary>
+ </indexterm>
+
+ <refsynopsisdiv>
+ <cmdsynopsis>
+ <command>pg_receivexlog</command>
+ <arg rep="repeat"><replaceable>option</></arg>
+ </cmdsynopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+ <title>
+ Description
+ </title>
+ <para>
+ <application>pg_receivexlog</application> is used to stream transaction log
+ from a running <productname>PostgreSQL</productname> cluster. The transaction
+ log is streamed using the streaming replication protocol, and is written
+ to a local directory of files. This directory can be used as the archive
+ location for doing a restore using point-in-time recovery (see
+ <xref linkend="continuous-archiving">).
+ </para>
+
+ <para>
+ <application>pg_receivexlog</application> streams the transaction
+ log in real time as it's being generated on the server, and does not wait
+ for segments to complete like <xref linkend="guc-archive-command"> does.
+ For this reason, it is not necessary to set
+ <xref linkend="guc-archive-timeout"> when using
+ <application>pg_receivexlog</application>.
+ </para>
+
+ <para>
+ The transaction log is streamed over a regular
+ <productname>PostgreSQL</productname> connection, and uses the
+ replication protocol. The connection must be
+ made with a user having <literal>REPLICATION</literal> permissions (see
+ <xref linkend="role-attributes">), and the user must be granted explicit
+ permissions in <filename>pg_hba.conf</filename>. The server must also
+ be configured with <xref linkend="guc-max-wal-senders"> set high enough
+ to leave at least one session available for the stream.
+ </para>
+ </refsect1>
+
+ <refsect1>
+ <title>Options</title>
+
+ <para>
+ The following command-line options control the location and format of the
+ output.
+
+ <variablelist>
+ <varlistentry>
+ <term><option>-D <replaceable class="parameter">directory</replaceable></option></term>
+ <term><option>--dir=<replaceable class="parameter">directory</replaceable></option></term>
+ <listitem>
+ <para>
+ Directory to write the output to.
+ </para>
+ <para>
+ This parameter is required.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ <para>
+ The following command-line options control the running of the program.
+
+ <variablelist>
+ <varlistentry>
+ <term><option>-v</option></term>
+ <term><option>--verbose</option></term>
+ <listitem>
+ <para>
+ Enables verbose mode.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+ </para>
+
+ <para>
+ The following command-line options control the database connection parameters.
+
+ <variablelist>
+ <varlistentry>
+ <term><option>-s <replaceable class="parameter">interval</replaceable></option></term>
+ <term><option>--statusint=<replaceable class="parameter">interval</replaceable></option></term>
+ <listitem>
+ <para>
+ Specifies the number of seconds between status packets sent back to the
+ server. This is required if replication timeout is configured on the
+ server, and allows for easier monitoring. The default value is
+ 10 seconds.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><option>-h <replaceable class="parameter">host</replaceable></option></term>
+ <term><option>--host=<replaceable class="parameter">host</replaceable></option></term>
+ <listitem>
+ <para>
+ Specifies the host name of the machine on which the server is
+ running. If the value begins with a slash, it is used as the
+ directory for the Unix domain socket. The default is taken
+ from the <envar>PGHOST</envar> environment variable, if set,
+ else a Unix domain socket connection is attempted.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><option>-p <replaceable class="parameter">port</replaceable></option></term>
+ <term><option>--port=<replaceable class="parameter">port</replaceable></option></term>
+ <listitem>
+ <para>
+ Specifies the TCP port or local Unix domain socket file
+ extension on which the server is listening for connections.
+ Defaults to the <envar>PGPORT</envar> environment variable, if
+ set, or a compiled-in default.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><option>-U <replaceable>username</replaceable></option></term>
+ <term><option>--username=<replaceable class="parameter">username</replaceable></option></term>
+ <listitem>
+ <para>
+ User name to connect as.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><option>-w</></term>
+ <term><option>--no-password</></term>
+ <listitem>
+ <para>
+ Never issue a password prompt. If the server requires
+ password authentication and a password is not available by
+ other means such as a <filename>.pgpass</filename> file, the
+ connection attempt will fail. This option can be useful in
+ batch jobs and scripts where no user is present to enter a
+ password.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><option>-W</option></term>
+ <term><option>--password</option></term>
+ <listitem>
+ <para>
+ Force <application>pg_receivexlog</application> to prompt for a
+ password before connecting to a database.
+ </para>
+
+ <para>
+ This option is never essential, since
+ <application>pg_receivexlog</application> will automatically prompt
+ for a password if the server demands password authentication.
+ However, <application>pg_receivexlog</application> will waste a
+ connection attempt finding out that the server wants a password.
+ In some cases it is worth typing <option>-W</> to avoid the extra
+ connection attempt.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+
+ <para>
+ Other, less commonly used, parameters are also available:
+
+ <variablelist>
+ <varlistentry>
+ <term><option>-V</></term>
+ <term><option>--version</></term>
+ <listitem>
+ <para>
+ Print the <application>pg_receivexlog</application> version and exit.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><option>-?</></term>
+ <term><option>--help</></term>
+ <listitem>
+ <para>
+ Show help about <application>pg_receivexlog</application> command line
+ arguments, and exit.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+ </para>
+
+ </refsect1>
+
+ <refsect1>
+ <title>Environment</title>
+
+ <para>
+ This utility, like most other <productname>PostgreSQL</> utilities,
+ uses the environment variables supported by <application>libpq</>
+ (see <xref linkend="libpq-envars">).
+ </para>
+
+ </refsect1>
+
+ <refsect1>
+ <title>Notes</title>
+
+ <para>
+ When using <application>pg_receivexlog</application> instead of
+ <xref linkend="guc-archive-command">, the server will continue to
+ recycle transaction log files even if the backups are not properly
+ archived, since there is no command that fails. This can be worked
+ around by having an <xref linkend="guc-archive-command"> that fails
+ when the file has not been properly archived yet.
+ </para>
+
+ </refsect1>
+
+ <refsect1>
+ <title>Examples</title>
+
+ <para>
+ To stream the transaction log from the server at
+ <literal>mydbserver</literal> and store it in the local directory
+ <filename>/usr/local/pgsql/archive</filename>:
+ <screen>
+ <prompt>$</prompt> <userinput>pg_receivexlog -h mydbserver -D /home/pgbackup/archive</userinput>
+ </screen>
+ </para>
+ </refsect1>
+
+ <refsect1>
+ <title>See Also</title>
+
+ <simplelist type="inline">
+ <member><xref linkend="APP-PGBASEBACKUP"></member>
+ </simplelist>
+ </refsect1>
+
+</refentry>
&pgConfig;
&pgDump;
&pgDumpall;
+ &pgReceivexlog;
&pgRestore;
&psqlRef;
&reindexdb;
/pg_basebackup
+/pg_receivexlog
\ No newline at end of file
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
-OBJS= pg_basebackup.o $(WIN32RES)
+OBJS=receivelog.o streamutil.o $(WIN32RES)
-all: pg_basebackup
+all: pg_basebackup pg_receivexlog
-pg_basebackup: $(OBJS) | submake-libpq submake-libpgport
- $(CC) $(CFLAGS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport
+ $(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+
+pg_receivexlog: pg_receivexlog.o $(OBJS) | submake-libpq submake-libpgport
+ $(CC) $(CFLAGS) pg_receivexlog.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
install: all installdirs
$(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+ $(INSTALL_PROGRAM) pg_receivexlog$(X) '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
installdirs:
$(MKDIR_P) '$(DESTDIR)$(bindir)'
uninstall:
rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+ rm -f '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
clean distclean maintainer-clean:
- rm -f pg_basebackup$(X) $(OBJS)
+ rm -f pg_basebackup$(X) pg_receivexlog$(X) $(OBJS) pg_basebackup.o pg_receivexlog.o
*-------------------------------------------------------------------------
*/
-#include "postgres_fe.h"
+/*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need. But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+#define FRONTEND 1
+#include "postgres.h"
#include "libpq-fe.h"
#include <unistd.h>
#include <dirent.h>
#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/wait.h>
#ifdef HAVE_LIBZ
#include <zlib.h>
#include "getopt_long.h"
+#include "receivelog.h"
+#include "streamutil.h"
+
/* Global options */
-static const char *progname;
char *basedir = NULL;
char format = 'p'; /* p(lain)/t(ar) */
char *label = "pg_basebackup base backup";
int verbose = 0;
int compresslevel = 0;
bool includewal = false;
+bool streamwal = false;
bool fastcheckpoint = false;
-char *dbhost = NULL;
-char *dbuser = NULL;
-char *dbport = NULL;
-int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */
+int standby_message_timeout = 10; /* 10 sec = default */
/* Progress counters */
static uint64 totalsize;
static uint64 totaldone;
static int tablespacecount;
-/* Connection kept global so we can disconnect easily */
-static PGconn *conn = NULL;
+/* Pipe to communicate with background wal receiver process */
+#ifndef WIN32
+static int bgpipe[2] = {-1, -1};
+#endif
-#define disconnect_and_exit(code) \
- { \
- if (conn != NULL) PQfinish(conn); \
- exit(code); \
- }
+/* Handle to child process */
+static pid_t bgchild = -1;
+
+/* End position for xlog streaming, empty string if unknown yet */
+static XLogRecPtr xlogendptr;
+static int has_xlogendptr = 0;
/* Function headers */
-static char *xstrdup(const char *s);
-static void *xmalloc0(int size);
static void usage(void);
static void verify_dir_is_empty_or_create(char *dirname);
static void progress_report(int tablespacenum, const char *filename);
-static PGconn *GetConnection(void);
static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
static void BaseBackup(void);
+static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+
#ifdef HAVE_LIBZ
static const char *
get_gz_error(gzFile *gzf)
}
#endif
-/*
- * strdup() and malloc() replacements that prints an error and exits
- * if something goes wrong. Can never return NULL.
- */
-static char *
-xstrdup(const char *s)
-{
- char *result;
-
- result = strdup(s);
- if (!result)
- {
- fprintf(stderr, _("%s: out of memory\n"), progname);
- exit(1);
- }
- return result;
-}
-
-static void *
-xmalloc0(int size)
-{
- void *result;
-
- result = malloc(size);
- if (!result)
- {
- fprintf(stderr, _("%s: out of memory\n"), progname);
- exit(1);
- }
- MemSet(result, 0, size);
- return result;
-}
-
static void
usage(void)
printf(_("\nOptions controlling the output:\n"));
printf(_(" -D, --pgdata=DIRECTORY receive base backup into directory\n"));
printf(_(" -F, --format=p|t output format (plain, tar)\n"));
- printf(_(" -x, --xlog include required WAL files in backup\n"));
+ printf(_(" -x, --xlog=fetch|stream include required WAL files in backup\n"));
printf(_(" -z, --gzip compress tar output\n"));
printf(_(" -Z, --compress=0-9 compress tar output with given compression level\n"));
printf(_("\nGeneral options:\n"));
printf(_(" --help show this help, then exit\n"));
printf(_(" --version output version information, then exit\n"));
printf(_("\nConnection options:\n"));
+ printf(_(" -s, --statusint=INTERVAL time between status packets sent to server (in seconds)\n"));
printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
printf(_(" -p, --port=PORT database server port number\n"));
printf(_(" -U, --username=NAME connect as specified database user\n"));
}
+/*
+ * Called in the background process whenever a complete segment of WAL
+ * has been received.
+ * On Unix, we check to see if there is any data on our pipe
+ * (which would mean we have a stop position), and if it is, check if
+ * it is time to stop.
+ * On Windows, we are in a single process, so we can just check if it's
+ * time to stop.
+ */
+static bool
+segment_callback(XLogRecPtr segendpos, uint32 timeline)
+{
+ if (!has_xlogendptr)
+ {
+#ifndef WIN32
+ fd_set fds;
+ struct timeval tv;
+ int r;
+
+ /*
+ * Don't have the end pointer yet - check our pipe to see if it has
+ * been sent yet.
+ */
+ FD_ZERO(&fds);
+ FD_SET(bgpipe[0], &fds);
+
+ MemSet(&tv, 0, sizeof(tv));
+
+ r = select(bgpipe[0] + 1, &fds, NULL, NULL, &tv);
+ if (r == 1)
+ {
+ char xlogend[64];
+
+ MemSet(xlogend, 0, sizeof(xlogend));
+ r = piperead(bgpipe[0], xlogend, sizeof(xlogend));
+ if (r < 0)
+ {
+ fprintf(stderr, _("%s: could not read from ready pipe: %s\n"),
+ progname, strerror(errno));
+ exit(1);
+ }
+
+ if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2)
+ {
+ fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"),
+ progname, xlogend);
+ exit(1);
+ }
+ has_xlogendptr = 1;
+
+ /*
+ * Fall through to check if we've reached the point further
+ * already.
+ */
+ }
+ else
+ {
+ /*
+ * No data received on the pipe means we don't know the end
+ * position yet - so just say it's not time to stop yet.
+ */
+ return false;
+ }
+#else
+
+ /*
+ * On win32, has_xlogendptr is set by the main thread, so if it's not
+ * set here, we just go back and wait until it shows up.
+ */
+ return false;
+#endif
+ }
+
+ /*
+ * At this point we have an end pointer, so compare it to the current
+ * position to figure out if it's time to stop.
+ */
+ if (segendpos.xlogid > xlogendptr.xlogid ||
+ (segendpos.xlogid == xlogendptr.xlogid &&
+ segendpos.xrecoff >= xlogendptr.xrecoff))
+ return true;
+
+ /*
+ * Have end pointer, but haven't reached it yet - so tell the caller to
+ * keep streaming.
+ */
+ return false;
+}
+
+typedef struct
+{
+ PGconn *bgconn;
+ XLogRecPtr startptr;
+ char xlogdir[MAXPGPATH];
+ char *sysidentifier;
+ int timeline;
+} logstreamer_param;
+
+static int
+LogStreamerMain(logstreamer_param * param)
+{
+ if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
+ param->sysidentifier, param->xlogdir,
+ segment_callback, NULL, standby_message_timeout))
+
+ /*
+ * Any errors will already have been reported in the function process,
+ * but we need to tell the parent that we didn't shutdown in a nice
+ * way.
+ */
+ return 1;
+
+ PQfinish(param->bgconn);
+ return 0;
+}
+
+/*
+ * Initiate background process for receiving xlog during the backup.
+ * The background stream will use its own database connection so we can
+ * stream the logfile in parallel with the backups.
+ */
+static void
+StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
+{
+ logstreamer_param *param;
+
+ param = xmalloc0(sizeof(logstreamer_param));
+ param->timeline = timeline;
+ param->sysidentifier = sysidentifier;
+
+ /* Convert the starting position */
+ if (sscanf(startpos, "%X/%X", ¶m->startptr.xlogid, ¶m->startptr.xrecoff) != 2)
+ {
+ fprintf(stderr, _("%s: invalid format of xlog location: %s\n"),
+ progname, startpos);
+ disconnect_and_exit(1);
+ }
+ /* Round off to even segment position */
+ param->startptr.xrecoff -= param->startptr.xrecoff % XLOG_SEG_SIZE;
+
+#ifndef WIN32
+ /* Create our background pipe */
+ if (pgpipe(bgpipe) < 0)
+ {
+ fprintf(stderr, _("%s: could not create pipe for background process: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+#endif
+
+ /* Get a second connection */
+ param->bgconn = GetConnection();
+
+ /*
+ * Always in plain format, so we can write to basedir/pg_xlog. But the
+ * directory entry in the tar file may arrive later, so make sure it's
+ * created before we start.
+ */
+ snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
+ verify_dir_is_empty_or_create(param->xlogdir);
+
+ /*
+ * Start a child process and tell it to start streaming. On Unix, this is
+ * a fork(). On Windows, we create a thread.
+ */
+#ifndef WIN32
+ bgchild = fork();
+ if (bgchild == 0)
+ {
+ /* in child process */
+ exit(LogStreamerMain(param));
+ }
+ else if (bgchild < 0)
+ {
+ fprintf(stderr, _("%s: could not create background process: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ /*
+ * Else we are in the parent process and all is well.
+ */
+#else /* WIN32 */
+ bgchild = _beginthreadex(NULL, 0, (void *) LogStreamerMain, param, 0, NULL);
+ if (bgchild == 0)
+ {
+ fprintf(stderr, _("%s: could not create background thread: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+#endif
+}
+
/*
* Verify that the given directory exists and is empty. If it does not
* exist, it is created. If it exists but is not empty, an error will
else
strcpy(current_path, PQgetvalue(res, rownum, 1));
- /*
- * Make sure we're unpacking into an empty directory
- */
- verify_dir_is_empty_or_create(current_path);
-
/*
* Get the COPY data
*/
/*
* Directory
*/
- filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
+ filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
if (mkdir(filename, S_IRWXU) != 0)
{
- fprintf(stderr,
+ /*
+ * When streaming WAL, pg_xlog will have been created
+ * by the wal receiver process, so just ignore failure
+ * on that.
+ */
+ if (!streamwal || strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0)
+ {
+ fprintf(stderr,
_("%s: could not create directory \"%s\": %s\n"),
- progname, filename, strerror(errno));
- disconnect_and_exit(1);
+ progname, filename, strerror(errno));
+ disconnect_and_exit(1);
+ }
}
#ifndef WIN32
if (chmod(filename, (mode_t) filemode))
/*
* Symbolic link
*/
- filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
+ filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
if (symlink(©buf[157], filename) != 0)
{
fprintf(stderr,
_("%s: could not create symbolic link from \"%s\" to \"%s\": %s\n"),
- progname, filename, ©buf[157], strerror(errno));
+ progname, filename, ©buf[157], strerror(errno));
disconnect_and_exit(1);
}
}
}
-static PGconn *
-GetConnection(void)
-{
- PGconn *tmpconn;
- int argcount = 4; /* dbname, replication, fallback_app_name,
- * password */
- int i;
- const char **keywords;
- const char **values;
- char *password = NULL;
-
- if (dbhost)
- argcount++;
- if (dbuser)
- argcount++;
- if (dbport)
- argcount++;
-
- keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
- values = xmalloc0((argcount + 1) * sizeof(*values));
-
- keywords[0] = "dbname";
- values[0] = "replication";
- keywords[1] = "replication";
- values[1] = "true";
- keywords[2] = "fallback_application_name";
- values[2] = progname;
- i = 3;
- if (dbhost)
- {
- keywords[i] = "host";
- values[i] = dbhost;
- i++;
- }
- if (dbuser)
- {
- keywords[i] = "user";
- values[i] = dbuser;
- i++;
- }
- if (dbport)
- {
- keywords[i] = "port";
- values[i] = dbport;
- i++;
- }
-
- while (true)
- {
- if (dbgetpassword == 1)
- {
- /* Prompt for a password */
- password = simple_prompt(_("Password: "), 100, false);
- keywords[argcount - 1] = "password";
- values[argcount - 1] = password;
- }
-
- tmpconn = PQconnectdbParams(keywords, values, true);
- if (password)
- free(password);
-
- if (PQstatus(tmpconn) == CONNECTION_BAD &&
- PQconnectionNeedsPassword(tmpconn) &&
- dbgetpassword != -1)
- {
- dbgetpassword = 1; /* ask for password next time */
- PQfinish(tmpconn);
- continue;
- }
-
- if (PQstatus(tmpconn) != CONNECTION_OK)
- {
- fprintf(stderr, _("%s: could not connect to server: %s"),
- progname, PQerrorMessage(tmpconn));
- exit(1);
- }
-
- /* Connection ok! */
- free(values);
- free(keywords);
- return tmpconn;
- }
-}
-
static void
BaseBackup(void)
{
PGresult *res;
+ char *sysidentifier;
+ uint32 timeline;
char current_path[MAXPGPATH];
char escaped_label[MAXPGPATH];
int i;
*/
conn = GetConnection();
+ /*
+ * Run IDENTIFY_SYSTEM so we can get the timeline
+ */
+ res = PQexec(conn, "IDENTIFY_SYSTEM");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr, _("%s: could not identify system: %s\n"),
+ progname, PQerrorMessage(conn));
+ disconnect_and_exit(1);
+ }
+ if (PQntuples(res) != 1)
+ {
+ fprintf(stderr, _("%s: could not identify system, got %i rows\n"),
+ progname, PQntuples(res));
+ disconnect_and_exit(1);
+ }
+ sysidentifier = strdup(PQgetvalue(res, 0, 0));
+ timeline = atoi(PQgetvalue(res, 0, 1));
+ PQclear(res);
+
/*
* Start the actual backup
*/
snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s %s",
escaped_label,
showprogress ? "PROGRESS" : "",
- includewal ? "WAL" : "",
+ includewal && !streamwal ? "WAL" : "",
fastcheckpoint ? "FAST" : "",
includewal ? "NOWAIT" : "");
disconnect_and_exit(1);
}
+ /*
+ * If we're streaming WAL, start the streaming session before we start
+ * receiving the actual data chunks.
+ */
+ if (streamwal)
+ {
+ if (verbose)
+ fprintf(stderr, _("%s: starting background WAL receiver\n"),
+ progname);
+ StartLogStreamer(xlogstart, timeline, sysidentifier);
+ }
+
/*
* Start receiving chunks
*/
disconnect_and_exit(1);
}
+ if (bgchild > 0)
+ {
+ int status;
+
+#ifndef WIN32
+ int r;
+#endif
+
+ if (verbose)
+ fprintf(stderr, _("%s: waiting for background process to finish streaming...\n"), progname);
+
+#ifndef WIN32
+ if (pipewrite(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend))
+ {
+ fprintf(stderr, _("%s: could not send command to background pipe: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ /* Just wait for the background process to exit */
+ r = waitpid(bgchild, &status, 0);
+ if (r == -1)
+ {
+ fprintf(stderr, _("%s: could not wait for child process: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+ if (r != bgchild)
+ {
+ fprintf(stderr, _("%s: child %i died, expected %i\n"),
+ progname, r, bgchild);
+ disconnect_and_exit(1);
+ }
+ if (!WIFEXITED(status))
+ {
+ fprintf(stderr, _("%s: child process did not exit normally\n"),
+ progname);
+ disconnect_and_exit(1);
+ }
+ if (WEXITSTATUS(status) != 0)
+ {
+ fprintf(stderr, _("%s: child process exited with error %i\n"),
+ progname, WEXITSTATUS(status));
+ disconnect_and_exit(1);
+ }
+ /* Exited normally, we're happy! */
+#else /* WIN32 */
+
+ /*
+ * On Windows, since we are in the same process, we can just store the
+ * value directly in the variable, and then set the flag that says
+ * it's there.
+ */
+ if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2)
+ {
+ fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"),
+ progname, xlogend);
+ exit(1);
+ }
+ InterlockedIncrement(&has_xlogendptr);
+
+ /* First wait for the thread to exit */
+ if (WaitForSingleObjectEx((HANDLE) bgchild, INFINITE, FALSE) != WAIT_OBJECT_0)
+ {
+ _dosmaperr(GetLastError());
+ fprintf(stderr, _("%s: could not wait for child thread: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+ if (GetExitCodeThread((HANDLE) bgchild, &status) == 0)
+ {
+ _dosmaperr(GetLastError());
+ fprintf(stderr, _("%s: could not get child thread exit status: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+ if (status != 0)
+ {
+ fprintf(stderr, _("%s: child thread exited with error %u\n"),
+ progname, status);
+ disconnect_and_exit(1);
+ }
+ /* Exited normally, we're happy */
+#endif
+ }
+
/*
* End of copy data. Final result is already checked inside the loop.
*/
{"pgdata", required_argument, NULL, 'D'},
{"format", required_argument, NULL, 'F'},
{"checkpoint", required_argument, NULL, 'c'},
- {"xlog", no_argument, NULL, 'x'},
+ {"xlog", required_argument, NULL, 'x'},
{"gzip", no_argument, NULL, 'z'},
{"compress", required_argument, NULL, 'Z'},
{"label", required_argument, NULL, 'l'},
{"username", required_argument, NULL, 'U'},
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
+ {"statusint", required_argument, NULL, 's'},
{"verbose", no_argument, NULL, 'v'},
{"progress", no_argument, NULL, 'P'},
{NULL, 0, NULL, 0}
}
}
- while ((c = getopt_long(argc, argv, "D:F:xl:zZ:c:h:p:U:wWvP",
+ while ((c = getopt_long(argc, argv, "D:F:x:l:zZ:c:h:p:U:s:wWvP",
long_options, &option_index)) != -1)
{
switch (c)
break;
case 'x':
includewal = true;
+ if (strcmp(optarg, "f") == 0 ||
+ strcmp(optarg, "fetch") == 0)
+ streamwal = false;
+ else if (strcmp(optarg, "s") == 0 ||
+ strcmp(optarg, "stream") == 0)
+ streamwal = true;
+ else
+ {
+ fprintf(stderr, _("%s: invalid xlog option \"%s\", must be empty, \"fetch\" or \"stream\"\n"),
+ progname, optarg);
+ exit(1);
+ }
break;
case 'l':
label = xstrdup(optarg);
case 'W':
dbgetpassword = 1;
break;
+ case 's':
+ standby_message_timeout = atoi(optarg);
+ if (standby_message_timeout < 0)
+ {
+ fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
+ progname, optarg);
+ exit(1);
+ }
+ break;
case 'v':
verbose++;
break;
exit(1);
}
+ if (format != 'p' && streamwal)
+ {
+ fprintf(stderr,
+ _("%s: wal streaming can only be used in plain mode\n"),
+ progname);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
#ifndef HAVE_LIBZ
if (compresslevel != 0)
{
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * pg_receivexlog.c - receive streaming transaction log data and write it
+ * to a local file.
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/pg_receivexlog.c
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need. But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+#define FRONTEND 1
+#include "postgres.h"
+#include "libpq-fe.h"
+#include "libpq/pqsignal.h"
+#include "access/xlog_internal.h"
+
+#include "receivelog.h"
+#include "streamutil.h"
+
+#include <dirent.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "getopt_long.h"
+
+/* Global options */
+char *basedir = NULL;
+int verbose = 0;
+int standby_message_timeout = 10; /* 10 sec = default */
+volatile bool time_to_abort = false;
+
+
+static void usage(void);
+static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline);
+static void StreamLog();
+static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+
+static void
+usage(void)
+{
+ printf(_("%s receives PostgreSQL streaming transaction logs\n\n"),
+ progname);
+ printf(_("Usage:\n"));
+ printf(_(" %s [OPTION]...\n"), progname);
+ printf(_("\nOptions controlling the output:\n"));
+ printf(_(" -D, --dir=directory receive xlog files into this directory\n"));
+ printf(_("\nGeneral options:\n"));
+ printf(_(" -v, --verbose output verbose messages\n"));
+ printf(_(" -?, --help show this help, then exit\n"));
+ printf(_(" -V, --version output version information, then exit\n"));
+ printf(_("\nConnection options:\n"));
+ printf(_(" -s, --statusint=INTERVAL time between status packets sent to server (in seconds)\n"));
+ printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
+ printf(_(" -p, --port=PORT database server port number\n"));
+ printf(_(" -U, --username=NAME connect as specified database user\n"));
+ printf(_(" -w, --no-password never prompt for password\n"));
+ printf(_(" -W, --password force password prompt (should happen automatically)\n"));
+ printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
+}
+
+static bool
+segment_callback(XLogRecPtr segendpos, uint32 timeline)
+{
+ char fn[MAXPGPATH];
+ struct stat statbuf;
+
+ if (verbose)
+ fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
+ progname, segendpos.xlogid, segendpos.xrecoff, timeline);
+
+ /*
+ * Check if there is a partial file for the name we just finished, and if
+ * there is, remove it under the assumption that we have now got all the
+ * data we need.
+ */
+ segendpos.xrecoff /= XLOG_SEG_SIZE;
+ PrevLogSeg(segendpos.xlogid, segendpos.xrecoff);
+ snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial",
+ basedir, timeline,
+ segendpos.xlogid,
+ segendpos.xrecoff);
+ if (stat(fn, &statbuf) == 0)
+ {
+ /* File existed, get rid of it */
+ if (verbose)
+ fprintf(stderr, _("%s: removing file \"%s\"\n"),
+ progname, fn);
+ unlink(fn);
+ }
+
+ /*
+ * Never abort from this - we handle all aborting in continue_streaming()
+ */
+ return false;
+}
+
+static bool
+continue_streaming(void)
+{
+ if (time_to_abort)
+ {
+ fprintf(stderr, _("%s: received interrupt signal, exiting.\n"),
+ progname);
+ return true;
+ }
+ return false;
+}
+
+/*
+ * Determine starting location for streaming, based on:
+ * 1. If there are existing xlog segments, start at the end of the last one
+ * 2. If the last one is a partial segment, rename it and start over, since
+ * we don't sync after every write.
+ * 3. If no existing xlog exists, start from the beginning of the current
+ * WAL segment.
+ */
+static XLogRecPtr
+FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
+{
+ DIR *dir;
+ struct dirent *dirent;
+ int i;
+ bool b;
+ uint32 high_log = 0;
+ uint32 high_seg = 0;
+ bool partial = false;
+
+ dir = opendir(basedir);
+ if (dir == NULL)
+ {
+ fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
+ progname, basedir, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ while ((dirent = readdir(dir)) != NULL)
+ {
+ char fullpath[MAXPGPATH];
+ struct stat statbuf;
+ uint32 tli,
+ log,
+ seg;
+
+ if (!strcmp(dirent->d_name, ".") || !strcmp(dirent->d_name, ".."))
+ continue;
+
+ /* xlog files are always 24 characters */
+ if (strlen(dirent->d_name) != 24)
+ continue;
+
+ /* Filenames are always made out of 0-9 and A-F */
+ b = false;
+ for (i = 0; i < 24; i++)
+ {
+ if (!(dirent->d_name[i] >= '0' && dirent->d_name[i] <= '9') &&
+ !(dirent->d_name[i] >= 'A' && dirent->d_name[i] <= 'F'))
+ {
+ b = true;
+ break;
+ }
+ }
+ if (b)
+ continue;
+
+ /*
+ * Looks like an xlog file. Parse its position.
+ */
+ if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3)
+ {
+ fprintf(stderr, _("%s: could not parse xlog filename \"%s\"\n"),
+ progname, dirent->d_name);
+ disconnect_and_exit(1);
+ }
+
+ /* Ignore any files that are for another timeline */
+ if (tli != currenttimeline)
+ continue;
+
+ /* Check if this is a completed segment or not */
+ snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
+ if (stat(fullpath, &statbuf) != 0)
+ {
+ fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"),
+ progname, fullpath, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ if (statbuf.st_size == 16 * 1024 * 1024)
+ {
+ /* Completed segment */
+ if (log > high_log ||
+ (log == high_log && seg > high_seg))
+ {
+ high_log = log;
+ high_seg = seg;
+ continue;
+ }
+ }
+ else
+ {
+ /*
+ * This is a partial file. Rename it out of the way.
+ */
+ char newfn[MAXPGPATH];
+
+ fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"),
+ progname, dirent->d_name, dirent->d_name);
+
+ snprintf(newfn, sizeof(newfn), "%s/%s.partial",
+ basedir, dirent->d_name);
+
+ if (stat(newfn, &statbuf) == 0)
+ {
+ /*
+ * XXX: perhaps we should only error out if the existing file
+ * is larger?
+ */
+ fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"),
+ progname, newfn);
+ disconnect_and_exit(1);
+ }
+ if (rename(fullpath, newfn) != 0)
+ {
+ fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"),
+ progname, fullpath, newfn, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ /* Don't continue looking for more, we assume this is the last */
+ partial = true;
+ break;
+ }
+ }
+
+ closedir(dir);
+
+ if (high_log > 0 || high_seg > 0)
+ {
+ XLogRecPtr high_ptr;
+
+ if (!partial)
+ {
+ /*
+ * If the segment was partial, the pointer is already at the right
+ * location since we want to re-transmit that segment. If it was
+ * not, we need to move it to the next segment, since we are
+ * tracking the last one that was complete.
+ */
+ NextLogSeg(high_log, high_seg);
+ }
+
+ high_ptr.xlogid = high_log;
+ high_ptr.xrecoff = high_seg * XLOG_SEG_SIZE;
+
+ return high_ptr;
+ }
+ else
+ return currentpos;
+}
+
+/*
+ * Start the log streaming
+ */
+static void
+StreamLog(void)
+{
+ PGresult *res;
+ uint32 timeline;
+ XLogRecPtr startpos;
+
+ /*
+ * Connect in replication mode to the server
+ */
+ conn = GetConnection();
+
+ /*
+ * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
+ * position.
+ */
+ res = PQexec(conn, "IDENTIFY_SYSTEM");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr, _("%s: could not identify system: %s\n"),
+ progname, PQerrorMessage(conn));
+ disconnect_and_exit(1);
+ }
+ if (PQntuples(res) != 1)
+ {
+ fprintf(stderr, _("%s: could not identify system, got %i rows\n"),
+ progname, PQntuples(res));
+ disconnect_and_exit(1);
+ }
+ timeline = atoi(PQgetvalue(res, 0, 1));
+ if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &startpos.xlogid, &startpos.xrecoff) != 2)
+ {
+ fprintf(stderr, _("%s: could not parse log start position from value \"%s\"\n"),
+ progname, PQgetvalue(res, 0, 2));
+ disconnect_and_exit(1);
+ }
+ PQclear(res);
+
+ /*
+ * Figure out where to start streaming.
+ */
+ startpos = FindStreamingStart(startpos, timeline);
+
+ /*
+ * Always start streaming at the beginning of a segment
+ */
+ startpos.xrecoff -= startpos.xrecoff % XLOG_SEG_SIZE;
+
+ /*
+ * Start the replication
+ */
+ if (verbose)
+ fprintf(stderr, _("%s: starting log streaming at %X/%X (timeline %u)\n"),
+ progname, startpos.xlogid, startpos.xrecoff, timeline);
+
+ ReceiveXlogStream(conn, startpos, timeline, NULL, basedir,
+ segment_callback, continue_streaming,
+ standby_message_timeout);
+}
+
+/*
+ * When sigint is called, just tell the system to exit at the next possible
+ * moment.
+ */
+static void
+sigint_handler(int signum)
+{
+ time_to_abort = true;
+}
+
+int
+main(int argc, char **argv)
+{
+ static struct option long_options[] = {
+ {"help", no_argument, NULL, '?'},
+ {"version", no_argument, NULL, 'V'},
+ {"dir", required_argument, NULL, 'D'},
+ {"host", required_argument, NULL, 'h'},
+ {"port", required_argument, NULL, 'p'},
+ {"username", required_argument, NULL, 'U'},
+ {"no-password", no_argument, NULL, 'w'},
+ {"password", no_argument, NULL, 'W'},
+ {"statusint", required_argument, NULL, 's'},
+ {"verbose", no_argument, NULL, 'v'},
+ {NULL, 0, NULL, 0}
+ };
+ int c;
+
+ int option_index;
+
+ progname = get_progname(argv[0]);
+ set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_receivexlog"));
+
+ if (argc > 1)
+ {
+ if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+ {
+ usage();
+ exit(0);
+ }
+ else if (strcmp(argv[1], "-V") == 0
+ || strcmp(argv[1], "--version") == 0)
+ {
+ puts("pg_receivexlog (PostgreSQL) " PG_VERSION);
+ exit(0);
+ }
+ }
+
+ while ((c = getopt_long(argc, argv, "D:h:p:U:s:wWv",
+ long_options, &option_index)) != -1)
+ {
+ switch (c)
+ {
+ case 'D':
+ basedir = xstrdup(optarg);
+ break;
+ case 'h':
+ dbhost = xstrdup(optarg);
+ break;
+ case 'p':
+ if (atoi(optarg) <= 0)
+ {
+ fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
+ progname, optarg);
+ exit(1);
+ }
+ dbport = xstrdup(optarg);
+ break;
+ case 'U':
+ dbuser = xstrdup(optarg);
+ break;
+ case 'w':
+ dbgetpassword = -1;
+ break;
+ case 'W':
+ dbgetpassword = 1;
+ break;
+ case 's':
+ standby_message_timeout = atoi(optarg);
+ if (standby_message_timeout < 0)
+ {
+ fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
+ progname, optarg);
+ exit(1);
+ }
+ break;
+ case 'v':
+ verbose++;
+ break;
+ default:
+
+ /*
+ * getopt_long already emitted a complaint
+ */
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+ }
+
+ /*
+ * Any non-option arguments?
+ */
+ if (optind < argc)
+ {
+ fprintf(stderr,
+ _("%s: too many command-line arguments (first is \"%s\")\n"),
+ progname, argv[optind]);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
+ /*
+ * Required arguments
+ */
+ if (basedir == NULL)
+ {
+ fprintf(stderr, _("%s: no target directory specified\n"), progname);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
+#ifndef WIN32
+ pqsignal(SIGINT, sigint_handler);
+#endif
+
+ StreamLog();
+
+ exit(0);
+}
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * receivelog.c - receive transaction log files using the streaming
+ * replication protocol.
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/receivelog.c
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need. But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+#define FRONTEND 1
+#include "postgres.h"
+#include "libpq-fe.h"
+#include "access/xlog_internal.h"
+#include "replication/walprotocol.h"
+#include "utils/datetime.h"
+
+#include "receivelog.h"
+#include "streamutil.h"
+
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+
+/* Size of the streaming replication protocol header */
+#define STREAMING_HEADER_SIZE (1+8+8+8)
+
+const XLogRecPtr InvalidXLogRecPtr = {0, 0};
+
+/*
+ * Open a new WAL file in the specified directory. Store the name
+ * (not including the full directory) in namebuf. Assumes there is
+ * enough room in this buffer...
+ */
+static int
+open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
+{
+ int f;
+ char fn[MAXPGPATH];
+
+ XLogFileName(namebuf, timeline, startpoint.xlogid,
+ startpoint.xrecoff / XLOG_SEG_SIZE);
+
+ snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf);
+ f = open(fn, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, 0666);
+ if (f == -1)
+ fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
+ progname, namebuf, strerror(errno));
+ return f;
+}
+
+/*
+ * Local version of GetCurrentTimestamp(), since we are not linked with
+ * backend code.
+ */
+static TimestampTz
+localGetCurrentTimestamp(void)
+{
+ TimestampTz result;
+ struct timeval tp;
+
+ gettimeofday(&tp, NULL);
+
+ result = (TimestampTz) tp.tv_sec -
+ ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
+
+#ifdef HAVE_INT64_TIMESTAMP
+ result = (result * USECS_PER_SEC) + tp.tv_usec;
+#else
+ result = result + (tp.tv_usec / 1000000.0);
+#endif
+
+ return result;
+}
+
+/*
+ * Receive a log stream starting at the specified position.
+ *
+ * If sysidentifier is specified, validate that both the system
+ * identifier and the timeline matches the specified ones
+ * (by sending an extra IDENTIFY_SYSTEM command)
+ *
+ * All received segments will be written to the directory
+ * specified by basedir.
+ *
+ * The segment_finish callback will be called after each segment
+ * has been finished, and the stream_continue callback will be
+ * called every time data is received. If either of these callbacks
+ * return true, the streaming will stop and the function
+ * return. As long as they return false, streaming will continue
+ * indefinitely.
+ *
+ * standby_message_timeout controls how often we send a message
+ * back to the master letting it know our progress, in seconds.
+ * This message will only contain the write location, and never
+ * flush or replay.
+ *
+ * Note: The log position *must* be at a log segment start!
+ */
+bool
+ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, segment_finish_callback segment_finish, stream_continue_callback stream_continue, int standby_message_timeout)
+{
+ char query[128];
+ char current_walfile_name[MAXPGPATH];
+ PGresult *res;
+ char *copybuf = NULL;
+ int walfile = -1;
+ int64 last_status = -1;
+ XLogRecPtr blockpos = InvalidXLogRecPtr;
+
+ if (sysidentifier != NULL)
+ {
+ /* Validate system identifier and timeline hasn't changed */
+ res = PQexec(conn, "IDENTIFY_SYSTEM");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr, _("%s: could not identify system: %s\n"),
+ progname, PQerrorMessage(conn));
+ PQclear(res);
+ return false;
+ }
+ if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0)
+ {
+ fprintf(stderr, _("%s: system identifier does not match between base backup and streaming connection\n"), progname);
+ PQclear(res);
+ return false;
+ }
+ if (timeline != atoi(PQgetvalue(res, 0, 1)))
+ {
+ fprintf(stderr, _("%s: timeline does not match between base backup and streaming connection\n"), progname);
+ PQclear(res);
+ return false;
+ }
+ PQclear(res);
+ }
+
+ /* Initiate the replication stream at specified location */
+ snprintf(query, sizeof(query), "START_REPLICATION %X/%X", startpos.xlogid, startpos.xrecoff);
+ res = PQexec(conn, query);
+ if (PQresultStatus(res) != PGRES_COPY_BOTH)
+ {
+ fprintf(stderr, _("%s: could not start replication: %s\n"),
+ progname, PQresultErrorMessage(res));
+ return false;
+ }
+ PQclear(res);
+
+ /*
+ * Receive the actual xlog data
+ */
+ while (1)
+ {
+ int r;
+ int xlogoff;
+ int bytes_left;
+ int bytes_written;
+ int64 now;
+
+ if (copybuf != NULL)
+ {
+ PQfreemem(copybuf);
+ copybuf = NULL;
+ }
+
+ /*
+ * Check if we should continue streaming, or abort at this point.
+ */
+ if (stream_continue && stream_continue())
+ {
+ if (walfile != -1)
+ {
+ fsync(walfile);
+ close(walfile);
+ }
+ return true;
+ }
+
+ /*
+ * Potentially send a status message to the master
+ */
+ now = localGetCurrentTimestamp();
+ if (standby_message_timeout > 0 &&
+ last_status < now - standby_message_timeout * 1000000)
+ {
+ /* Time to send feedback! */
+ char replybuf[sizeof(StandbyReplyMessage) + 1];
+ StandbyReplyMessage *replymsg = (StandbyReplyMessage *) (replybuf + 1);
+
+ replymsg->write = blockpos;
+ replymsg->flush = InvalidXLogRecPtr;
+ replymsg->apply = InvalidXLogRecPtr;
+ replymsg->sendTime = now;
+ replybuf[0] = 'r';
+
+ if (PQputCopyData(conn, replybuf, sizeof(replybuf)) <= 0 ||
+ PQflush(conn))
+ {
+ fprintf(stderr, _("%s: could not send feedback packet: %s"),
+ progname, PQerrorMessage(conn));
+ return false;
+ }
+
+ last_status = now;
+ }
+
+ r = PQgetCopyData(conn, ©buf, 1);
+ if (r == 0)
+ {
+ /*
+ * In async mode, and no data available. We block on reading but
+ * not more than the specified timeout, so that we can send a
+ * response back to the client.
+ */
+ fd_set input_mask;
+ struct timeval timeout;
+ struct timeval *timeoutptr;
+
+ FD_ZERO(&input_mask);
+ FD_SET(PQsocket(conn), &input_mask);
+ if (standby_message_timeout)
+ {
+ timeout.tv_sec = last_status + standby_message_timeout - now - 1;
+ if (timeout.tv_sec <= 0)
+ timeout.tv_sec = 1; /* Always sleep at least 1 sec */
+ timeout.tv_usec = 0;
+ timeoutptr = &timeout;
+ }
+ else
+ timeoutptr = NULL;
+
+ r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+ if (r == 0 || (r < 0 && errno == EINTR))
+ {
+ /*
+ * Got a timeout or signal. Continue the loop and either
+ * deliver a status packet to the server or just go back into
+ * blocking.
+ */
+ continue;
+ }
+ else if (r < 0)
+ {
+ fprintf(stderr, _("%s: select() failed: %m\n"), progname);
+ return false;
+ }
+ /* Else there is actually data on the socket */
+ if (PQconsumeInput(conn) == 0)
+ {
+ fprintf(stderr, _("%s: could not receive data from WAL stream: %s\n"),
+ progname, PQerrorMessage(conn));
+ return false;
+ }
+ continue;
+ }
+ if (r == -1)
+ /* End of copy stream */
+ break;
+ if (r == -2)
+ {
+ fprintf(stderr, _("%s: could not read copy data: %s\n"),
+ progname, PQerrorMessage(conn));
+ return false;
+ }
+ if (r < STREAMING_HEADER_SIZE + 1)
+ {
+ fprintf(stderr, _("%s: streaming header too small: %i\n"),
+ progname, r);
+ return false;
+ }
+ if (copybuf[0] != 'w')
+ {
+ fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+ progname, copybuf[0]);
+ return false;
+ }
+
+ /* Extract WAL location for this block */
+ memcpy(&blockpos, copybuf + 1, 8);
+ xlogoff = blockpos.xrecoff % XLOG_SEG_SIZE;
+
+ /*
+ * Verify that the initial location in the stream matches where we
+ * think we are.
+ */
+ if (walfile == -1)
+ {
+ /* No file open yet */
+ if (xlogoff != 0)
+ {
+ fprintf(stderr, _("%s: received xlog record for offset %u with no file open\n"),
+ progname, xlogoff);
+ return false;
+ }
+ }
+ else
+ {
+ /* More data in existing segment */
+ /* XXX: store seek value don't reseek all the time */
+ if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+ {
+ fprintf(stderr, _("%s: got WAL data offset %08x, expected %08x\n"),
+ progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+ return false;
+ }
+ }
+
+ bytes_left = r - STREAMING_HEADER_SIZE;
+ bytes_written = 0;
+
+ while (bytes_left)
+ {
+ int bytes_to_write;
+
+ /*
+ * If crossing a WAL boundary, only write up until we reach
+ * XLOG_SEG_SIZE.
+ */
+ if (xlogoff + bytes_left > XLOG_SEG_SIZE)
+ bytes_to_write = XLOG_SEG_SIZE - xlogoff;
+ else
+ bytes_to_write = bytes_left;
+
+ if (walfile == -1)
+ {
+ walfile = open_walfile(blockpos, timeline,
+ basedir, current_walfile_name);
+ if (walfile == -1)
+ /* Error logged by open_walfile */
+ return false;
+ }
+
+ if (write(walfile,
+ copybuf + STREAMING_HEADER_SIZE + bytes_written,
+ bytes_to_write) != bytes_to_write)
+ {
+ fprintf(stderr, _("%s: could not write %u bytes to WAL file %s: %s\n"),
+ progname,
+ bytes_to_write,
+ current_walfile_name,
+ strerror(errno));
+ return false;
+ }
+
+ /* Write was successful, advance our position */
+ bytes_written += bytes_to_write;
+ bytes_left -= bytes_to_write;
+ XLByteAdvance(blockpos, bytes_to_write);
+ xlogoff += bytes_to_write;
+
+ /* Did we reach the end of a WAL segment? */
+ if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
+ {
+ fsync(walfile);
+ close(walfile);
+ walfile = -1;
+ xlogoff = 0;
+
+ if (segment_finish != NULL)
+ {
+ /*
+ * Callback when the segment finished, and return if it
+ * told us to.
+ */
+ if (segment_finish(blockpos, timeline))
+ return true;
+ }
+ }
+ }
+ /* No more data left to write, start receiving next copy packet */
+ }
+
+ /*
+ * The only way to get out of the loop is if the server shut down the
+ * replication stream. If it's a controlled shutdown, the server will send
+ * a shutdown message, and we'll return the latest xlog location that has
+ * been streamed.
+ */
+
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ fprintf(stderr, _("%s: unexpected termination of replication stream: %s\n"),
+ progname, PQresultErrorMessage(res));
+ return false;
+ }
+ PQclear(res);
+ return true;
+}
--- /dev/null
+#include "access/xlogdefs.h"
+
+/*
+ * Called whenever a segment is finished, return true to stop
+ * the streaming at this point.
+ */
+typedef bool (*segment_finish_callback)(XLogRecPtr segendpos, uint32 timeline);
+
+/*
+ * Called before trying to read more data. Return true to stop
+ * the streaming at this point.
+ */
+typedef bool (*stream_continue_callback)(void);
+
+extern bool ReceiveXlogStream(PGconn *conn,
+ XLogRecPtr startpos,
+ uint32 timeline,
+ char *sysidentifier,
+ char *basedir,
+ segment_finish_callback segment_finish,
+ stream_continue_callback stream_continue,
+ int standby_message_timeout);
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * streamutil.c - utility functions for pg_basebackup and pg_receivelog
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/streamutil.c
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need. But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+#define FRONTEND 1
+#include "postgres.h"
+#include "streamutil.h"
+
+#include <stdio.h>
+#include <string.h>
+
+const char *progname;
+char *dbhost = NULL;
+char *dbuser = NULL;
+char *dbport = NULL;
+int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */
+static char *dbpassword = NULL;
+PGconn *conn = NULL;
+
+/*
+ * strdup() and malloc() replacements that prints an error and exits
+ * if something goes wrong. Can never return NULL.
+ */
+char *
+xstrdup(const char *s)
+{
+ char *result;
+
+ result = strdup(s);
+ if (!result)
+ {
+ fprintf(stderr, _("%s: out of memory\n"), progname);
+ exit(1);
+ }
+ return result;
+}
+
+void *
+xmalloc0(int size)
+{
+ void *result;
+
+ result = malloc(size);
+ if (!result)
+ {
+ fprintf(stderr, _("%s: out of memory\n"), progname);
+ exit(1);
+ }
+ MemSet(result, 0, size);
+ return result;
+}
+
+
+PGconn *
+GetConnection(void)
+{
+ PGconn *tmpconn;
+ int argcount = 4; /* dbname, replication, fallback_app_name,
+ * password */
+ int i;
+ const char **keywords;
+ const char **values;
+ char *password = NULL;
+
+ if (dbhost)
+ argcount++;
+ if (dbuser)
+ argcount++;
+ if (dbport)
+ argcount++;
+
+ keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
+ values = xmalloc0((argcount + 1) * sizeof(*values));
+
+ keywords[0] = "dbname";
+ values[0] = "replication";
+ keywords[1] = "replication";
+ values[1] = "true";
+ keywords[2] = "fallback_application_name";
+ values[2] = progname;
+ i = 3;
+ if (dbhost)
+ {
+ keywords[i] = "host";
+ values[i] = dbhost;
+ i++;
+ }
+ if (dbuser)
+ {
+ keywords[i] = "user";
+ values[i] = dbuser;
+ i++;
+ }
+ if (dbport)
+ {
+ keywords[i] = "port";
+ values[i] = dbport;
+ i++;
+ }
+
+ while (true)
+ {
+ if (password)
+ free(password);
+
+ if (dbpassword)
+ {
+ /*
+ * We've saved a password when a previous connection succeeded,
+ * meaning this is the call for a second session to the same
+ * database, so just forcibly reuse that password.
+ */
+ keywords[argcount - 1] = "password";
+ values[argcount - 1] = dbpassword;
+ dbgetpassword = -1; /* Don't try again if this fails */
+ }
+ else if (dbgetpassword == 1)
+ {
+ password = simple_prompt(_("Password: "), 100, false);
+ keywords[argcount - 1] = "password";
+ values[argcount - 1] = password;
+ }
+
+ tmpconn = PQconnectdbParams(keywords, values, true);
+
+ if (PQstatus(tmpconn) == CONNECTION_BAD &&
+ PQconnectionNeedsPassword(tmpconn) &&
+ dbgetpassword != -1)
+ {
+ dbgetpassword = 1; /* ask for password next time */
+ PQfinish(tmpconn);
+ continue;
+ }
+
+ if (PQstatus(tmpconn) != CONNECTION_OK)
+ {
+ fprintf(stderr, _("%s: could not connect to server: %s\n"),
+ progname, PQerrorMessage(tmpconn));
+ exit(1);
+ }
+
+ /* Connection ok! */
+ free(values);
+ free(keywords);
+
+ /* Store the password for next run */
+ if (password)
+ dbpassword = password;
+ return tmpconn;
+ }
+}
--- /dev/null
+#include "libpq-fe.h"
+
+extern const char *progname;
+extern char *dbhost;
+extern char *dbuser;
+extern char *dbport;
+extern int dbgetpassword;
+
+/* Connection kept global so we can disconnect easily */
+extern PGconn *conn;
+
+#define disconnect_and_exit(code) \
+ { \
+ if (conn != NULL) PQfinish(conn); \
+ exit(code); \
+ }
+
+
+char *xstrdup(const char *s);
+void *xmalloc0(int size);
+
+PGconn *GetConnection(void);
$initdb->AddLibrary('ws2_32.lib');
my $pgbasebackup = AddSimpleFrontend('pg_basebackup', 1);
+ $pgbasebackup->AddFile('src\bin\pg_basebackup\pg_basebackup.c');
+ $pgbasebackup->AddLibrary('ws2_32.lib');
+
+ my $pgreceivexlog = AddSimpleFrontend('pg_basebackup', 1);
+ $pgreceivexlog->{name} = 'pg_receivexlog';
+ $pgreceivexlog->AddFile('src\bin\pg_basebackup\pg_receivexlog.c');
+ $pgreceivexlog->AddLibrary('ws2_32.lib');
my $pgconfig = AddSimpleFrontend('pg_config');