{
PGresult *res;
char *sysidentifier;
- uint32 latesttli;
- uint32 starttli;
+ TimeLineID latesttli;
+ TimeLineID starttli;
char *basebkp;
char escaped_label[MAXPGPATH];
char *maxrate_clause = NULL;
/*
* Run IDENTIFY_SYSTEM so we can get the timeline
*/
- res = PQexec(conn, "IDENTIFY_SYSTEM");
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- {
- fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
- progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
+ if (!RunIdentifySystem(conn, &sysidentifier, &latesttli, NULL, NULL))
disconnect_and_exit(1);
- }
- if (PQntuples(res) != 1 || PQnfields(res) < 3)
- {
- fprintf(stderr,
- _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
- progname, PQntuples(res), PQnfields(res), 1, 3);
- disconnect_and_exit(1);
- }
- sysidentifier = pg_strdup(PQgetvalue(res, 0, 0));
- latesttli = atoi(PQgetvalue(res, 0, 1));
- PQclear(res);
/*
* Start the actual backup
static void
StreamLog(void)
{
- PGresult *res;
- XLogRecPtr startpos;
- uint32 starttli;
- XLogRecPtr serverpos;
- uint32 servertli;
- uint32 hi,
- lo;
+ XLogRecPtr startpos, serverpos;
+ TimeLineID starttli, servertli;
/*
* Connect in replication mode to the server
}
/*
- * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
- * position.
+ * Identify server, obtaining start LSN position and current timeline ID
+ * at the same time, necessary if not valid data can be found in the
+ * existing output directory.
*/
- res = PQexec(conn, "IDENTIFY_SYSTEM");
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- {
- fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
- progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
- disconnect_and_exit(1);
- }
- if (PQntuples(res) != 1 || PQnfields(res) < 3)
- {
- fprintf(stderr,
- _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
- progname, PQntuples(res), PQnfields(res), 1, 3);
+ if (!RunIdentifySystem(conn, NULL, &servertli, &serverpos, NULL))
disconnect_and_exit(1);
- }
- servertli = atoi(PQgetvalue(res, 0, 1));
- if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
- {
- fprintf(stderr,
- _("%s: could not parse transaction log location \"%s\"\n"),
- progname, PQgetvalue(res, 0, 2));
- disconnect_and_exit(1);
- }
- serverpos = ((uint64) hi) << 32 | lo;
- PQclear(res);
/*
* Figure out where to start streaming.
int
main(int argc, char **argv)
{
- PGresult *res;
static struct option long_options[] = {
/* general options */
{"file", required_argument, NULL, 'f'},
int option_index;
uint32 hi,
lo;
+ char *db_name;
progname = get_progname(argv[0]);
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_recvlogical"));
#endif
/*
- * don't really need this but it actually helps to get more precise error
- * messages about authentication, required GUCs and such without starting
- * to loop around connection attempts lateron.
+ * Obtain a connection to server. This is not really necessary but it
+ * helps to get more precise error messages about authentification,
+ * required GUC parameters and such.
*/
- {
- conn = GetConnection();
- if (!conn)
- /* Error message already written in GetConnection() */
- exit(1);
+ conn = GetConnection();
+ if (!conn)
+ /* Error message already written in GetConnection() */
+ exit(1);
- /*
- * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
- * position.
- */
- res = PQexec(conn, "IDENTIFY_SYSTEM");
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- {
- fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
- progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
- disconnect_and_exit(1);
- }
+ /*
+ * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
+ * replication connection.
+ */
+ if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
+ disconnect_and_exit(1);
- if (PQntuples(res) != 1 || PQnfields(res) < 4)
- {
- fprintf(stderr,
- _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
- progname, PQntuples(res), PQnfields(res), 1, 4);
- disconnect_and_exit(1);
- }
- PQclear(res);
+ if (db_name == NULL)
+ {
+ fprintf(stderr,
+ _("%s: failed to establish database specific replication connection\n"),
+ progname);
+ disconnect_and_exit(1);
}
-
- /*
- * drop a replication slot
- */
+ /* Drop a replication slot. */
if (do_drop_slot)
{
- char query[256];
-
if (verbose)
fprintf(stderr,
_("%s: dropping replication slot \"%s\"\n"),
progname, replication_slot);
- snprintf(query, sizeof(query), "DROP_REPLICATION_SLOT \"%s\"",
- replication_slot);
- res = PQexec(conn, query);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- {
- fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
- progname, query, PQerrorMessage(conn));
+ if (!DropReplicationSlot(conn, replication_slot))
disconnect_and_exit(1);
- }
-
- if (PQntuples(res) != 0 || PQnfields(res) != 0)
- {
- fprintf(stderr,
- _("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
- progname, replication_slot, PQntuples(res), PQnfields(res), 0, 0);
- disconnect_and_exit(1);
- }
-
- PQclear(res);
- disconnect_and_exit(0);
}
- /*
- * create a replication slot
- */
+ /* Create a replication slot. */
if (do_create_slot)
{
- char query[256];
-
if (verbose)
fprintf(stderr,
_("%s: creating replication slot \"%s\"\n"),
progname, replication_slot);
- snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
- replication_slot, plugin);
-
- res = PQexec(conn, query);
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- {
- fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
- progname, query, PQerrorMessage(conn));
+ if (!CreateReplicationSlot(conn, replication_slot, plugin,
+ &startpos, false))
disconnect_and_exit(1);
- }
-
- if (PQntuples(res) != 1 || PQnfields(res) != 4)
- {
- fprintf(stderr,
- _("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
- progname, replication_slot, PQntuples(res), PQnfields(res), 1, 4);
- disconnect_and_exit(1);
- }
-
- if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
- {
- fprintf(stderr,
- _("%s: could not parse transaction log location \"%s\"\n"),
- progname, PQgetvalue(res, 0, 1));
- disconnect_and_exit(1);
- }
- startpos = ((uint64) hi) << 32 | lo;
-
- replication_slot = strdup(PQgetvalue(res, 0, 0));
- PQclear(res);
}
-
if (!do_start_slot)
disconnect_and_exit(0);
+ /* Stream loop */
while (true)
{
- StreamLog();
+ StreamLogicalLog();
if (time_to_abort)
{
/*
#include "receivelog.h"
#include "streamutil.h"
+#include "pqexpbuffer.h"
#include "common/fe_memutils.h"
#include "datatype/timestamp.h"
return tmpconn;
}
+/*
+ * Run IDENTIFY_SYSTEM through a given connection and give back to caller
+ * some result information if requested:
+ * - Start LSN position
+ * - Current timeline ID
+ * - System identifier
+ * - Plugin name
+ */
+bool
+RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
+ XLogRecPtr *startpos, char **db_name)
+{
+ PGresult *res;
+ uint32 hi, lo;
+
+ /* Check connection existence */
+ Assert(conn != NULL);
+
+ res = PQexec(conn, "IDENTIFY_SYSTEM");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+ progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
+ return false;
+ }
+ if (PQntuples(res) != 1 || PQnfields(res) < 3)
+ {
+ fprintf(stderr,
+ _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
+ progname, PQntuples(res), PQnfields(res), 1, 3);
+ return false;
+ }
+
+ /* Get system identifier */
+ if (sysid != NULL)
+ *sysid = pg_strdup(PQgetvalue(res, 0, 0));
+
+ /* Get timeline ID to start streaming from */
+ if (starttli != NULL)
+ *starttli = atoi(PQgetvalue(res, 0, 1));
+
+ /* Get LSN start position if necessary */
+ if (startpos != NULL)
+ {
+ if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
+ {
+ fprintf(stderr,
+ _("%s: could not parse transaction log location \"%s\"\n"),
+ progname, PQgetvalue(res, 0, 2));
+ return false;
+ }
+ *startpos = ((uint64) hi) << 32 | lo;
+ }
+
+ /* Get database name, only available in 9.4 and newer versions */
+ if (db_name != NULL)
+ {
+ if (PQnfields(res) < 4)
+ fprintf(stderr,
+ _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
+ progname, PQntuples(res), PQnfields(res), 1, 4);
+
+ if (PQgetisnull(res, 0, 3))
+ *db_name = NULL;
+ else
+ *db_name = pg_strdup(PQgetvalue(res, 0, 3));
+ }
+
+ PQclear(res);
+ return true;
+}
+
+/*
+ * Create a replication slot for the given connection. This function
+ * returns true in case of success as well as the start position
+ * obtained after the slot creation.
+ */
+bool
+CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
+ XLogRecPtr *startpos, bool is_physical)
+{
+ PQExpBuffer query;
+ PGresult *res;
+
+ query = createPQExpBuffer();
+
+ Assert((is_physical && plugin == NULL) ||
+ (!is_physical && plugin != NULL));
+ Assert(slot_name != NULL);
+
+ /* Build query */
+ if (is_physical)
+ appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
+ slot_name);
+ else
+ appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
+ slot_name, plugin);
+
+ res = PQexec(conn, query->data);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+ progname, query->data, PQerrorMessage(conn));
+ return false;
+ }
+
+ if (PQntuples(res) != 1 || PQnfields(res) != 4)
+ {
+ fprintf(stderr,
+ _("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
+ progname, slot_name,
+ PQntuples(res), PQnfields(res), 1, 4);
+ return false;
+ }
+
+ /* Get LSN start position if necessary */
+ if (startpos != NULL)
+ {
+ uint32 hi, lo;
+
+ if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+ {
+ fprintf(stderr,
+ _("%s: could not parse transaction log location \"%s\"\n"),
+ progname, PQgetvalue(res, 0, 1));
+ return false;
+ }
+ *startpos = ((uint64) hi) << 32 | lo;
+ }
+
+ PQclear(res);
+ return true;
+}
+
+/*
+ * Drop a replication slot for the given connection. This function
+ * returns true in case of success.
+ */
+bool
+DropReplicationSlot(PGconn *conn, const char *slot_name)
+{
+ PQExpBuffer query;
+ PGresult *res;
+
+ Assert(slot_name != NULL);
+
+ query = createPQExpBuffer();
+
+ /* Build query */
+ appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"",
+ slot_name);
+ res = PQexec(conn, query->data);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+ progname, query->data, PQerrorMessage(conn));
+ return false;
+ }
+
+ if (PQntuples(res) != 0 || PQnfields(res) != 0)
+ {
+ fprintf(stderr,
+ _("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
+ progname, slot_name,
+ PQntuples(res), PQnfields(res), 0, 0);
+ return false;
+ }
+
+ PQclear(res);
+ return true;
+}
+
/*
* Frontend version of GetCurrentTimestamp(), since we are not linked with
- * backend code. The protocol always uses integer timestamps, regardless of
- * server setting.
+ * backend code. The replication protocol always uses integer timestamps,
+ * regardless of the server setting.
*/
int64
feGetCurrentTimestamp(void)
#include "libpq-fe.h"
+#include "access/xlogdefs.h"
+
extern const char *progname;
extern char *connection_string;
extern char *dbhost;
extern PGconn *GetConnection(void);
+/* Replication commands */
+extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name,
+ const char *plugin, XLogRecPtr *startpos,
+ bool is_physical);
+extern bool DropReplicationSlot(PGconn *conn, const char *slot_name);
+extern bool RunIdentifySystem(PGconn *conn, char **sysid,
+ TimeLineID *starttli,
+ XLogRecPtr *startpos,
+ char **db_name);
extern int64 feGetCurrentTimestamp(void);
extern void feTimestampDifference(int64 start_time, int64 stop_time,
long *secs, int *microsecs);