streamutil.o \
walmethods.o
+BBOBJS = \
+ pg_basebackup.o \
+ bbstreamer_file.o \
+ bbstreamer_inject.o \
+ bbstreamer_tar.o
+
all: pg_basebackup pg_receivewal pg_recvlogical
-pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils
- $(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+pg_basebackup: $(BBOBJS) $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils
+ $(CC) $(CFLAGS) $(BBOBJS) $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
pg_receivewal: pg_receivewal.o $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils
$(CC) $(CFLAGS) pg_receivewal.o $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
clean distclean maintainer-clean:
rm -f pg_basebackup$(X) pg_receivewal$(X) pg_recvlogical$(X) \
- pg_basebackup.o pg_receivewal.o pg_recvlogical.o \
+ $(BBOBJS) pg_receivewal.o pg_recvlogical.o \
$(OBJS)
rm -rf tmp_check
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * bbstreamer.h
+ *
+ * Each tar archive returned by the server is passed to one or more
+ * bbstreamer objects for further processing. The bbstreamer may do
+ * something simple, like write the archive to a file, perhaps after
+ * compressing it, but it can also do more complicated things, like
+ * annotating the byte stream to indicate which parts of the data
+ * correspond to tar headers or trailing padding, vs. which parts are
+ * payload data. A subsequent bbstreamer may use this information to
+ * make further decisions about how to process the data; for example,
+ * it might choose to modify the archive contents.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/bbstreamer.h
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef BBSTREAMER_H
+#define BBSTREAMER_H
+
+#include "lib/stringinfo.h"
+#include "pqexpbuffer.h"
+
+struct bbstreamer;
+struct bbstreamer_ops;
+typedef struct bbstreamer bbstreamer;
+typedef struct bbstreamer_ops bbstreamer_ops;
+
+/*
+ * Each chunk of archive data passed to a bbstreamer is classified into one
+ * of these categories. When data is first received from the remote server,
+ * each chunk will be categorized as BBSTREAMER_UNKNOWN, and the chunks will
+ * be of whatever size the remote server chose to send.
+ *
+ * If the archive is parsed (e.g. see bbstreamer_tar_parser_new()), then all
+ * chunks should be labelled as one of the other types listed here. In
+ * addition, there should be exactly one BBSTREAMER_MEMBER_HEADER chunk and
+ * exactly one BBSTREAMER_MEMBER_TRAILER chunk per archive member, even if
+ * that means a zero-length call. There can be any number of
+ * BBSTREAMER_MEMBER_CONTENTS chunks in between those calls. There
+ * should exactly BBSTREAMER_ARCHIVE_TRAILER chunk, and it should follow the
+ * last BBSTREAMER_MEMBER_TRAILER chunk.
+ *
+ * In theory, we could need other classifications here, such as a way of
+ * indicating an archive header, but the "tar" format doesn't need anything
+ * else, so for the time being there's no point.
+ */
+typedef enum
+{
+ BBSTREAMER_UNKNOWN,
+ BBSTREAMER_MEMBER_HEADER,
+ BBSTREAMER_MEMBER_CONTENTS,
+ BBSTREAMER_MEMBER_TRAILER,
+ BBSTREAMER_ARCHIVE_TRAILER
+} bbstreamer_archive_context;
+
+/*
+ * Each chunk of data that is classified as BBSTREAMER_MEMBER_HEADER,
+ * BBSTREAMER_MEMBER_CONTENTS, or BBSTREAMER_MEMBER_TRAILER should also
+ * pass a pointer to an instance of this struct. The details are expected
+ * to be present in the archive header and used to fill the struct, after
+ * which all subsequent calls for the same archive member are expected to
+ * pass the same details.
+ */
+typedef struct
+{
+ char pathname[MAXPGPATH];
+ pgoff_t size;
+ mode_t mode;
+ uid_t uid;
+ gid_t gid;
+ bool is_directory;
+ bool is_link;
+ char linktarget[MAXPGPATH];
+} bbstreamer_member;
+
+/*
+ * Generally, each type of bbstreamer will define its own struct, but the
+ * first element should be 'bbstreamer base'. A bbstreamer that does not
+ * require any additional private data could use this structure directly.
+ *
+ * bbs_ops is a pointer to the bbstreamer_ops object which contains the
+ * function pointers appropriate to this type of bbstreamer.
+ *
+ * bbs_next is a pointer to the successor bbstreamer, for those types of
+ * bbstreamer which forward data to a successor. It need not be used and
+ * should be set to NULL when not relevant.
+ *
+ * bbs_buffer is a buffer for accumulating data for temporary storage. Each
+ * type of bbstreamer makes its own decisions about whether and how to use
+ * this buffer.
+ */
+struct bbstreamer
+{
+ const bbstreamer_ops *bbs_ops;
+ bbstreamer *bbs_next;
+ StringInfoData bbs_buffer;
+};
+
+/*
+ * There are three callbacks for a bbstreamer. The 'content' callback is
+ * called repeatedly, as described in the bbstreamer_archive_context comments.
+ * Then, the 'finalize' callback is called once at the end, to give the
+ * bbstreamer a chance to perform cleanup such as closing files. Finally,
+ * because this code is running in a frontend environment where, as of this
+ * writing, there are no memory contexts, the 'free' callback is called to
+ * release memory. These callbacks should always be invoked using the static
+ * inline functions defined below.
+ */
+struct bbstreamer_ops
+{
+ void (*content) (bbstreamer *streamer, bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context);
+ void (*finalize) (bbstreamer *streamer);
+ void (*free) (bbstreamer *streamer);
+};
+
+/* Send some content to a bbstreamer. */
+static inline void
+bbstreamer_content(bbstreamer *streamer, bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context)
+{
+ Assert(streamer != NULL);
+ streamer->bbs_ops->content(streamer, member, data, len, context);
+}
+
+/* Finalize a bbstreamer. */
+static inline void
+bbstreamer_finalize(bbstreamer *streamer)
+{
+ Assert(streamer != NULL);
+ streamer->bbs_ops->finalize(streamer);
+}
+
+/* Free a bbstreamer. */
+static inline void
+bbstreamer_free(bbstreamer *streamer)
+{
+ Assert(streamer != NULL);
+ streamer->bbs_ops->free(streamer);
+}
+
+/*
+ * This is a convenience method for use when implementing a bbstreamer; it is
+ * not for use by outside callers. It adds the amount of data specified by
+ * 'nbytes' to the bbstreamer's buffer and adjusts '*len' and '*data'
+ * accordingly.
+ */
+static inline void
+bbstreamer_buffer_bytes(bbstreamer *streamer, const char **data, int *len,
+ int nbytes)
+{
+ Assert(nbytes <= *len);
+
+ appendBinaryStringInfo(&streamer->bbs_buffer, *data, nbytes);
+ *len -= nbytes;
+ *data += nbytes;
+}
+
+/*
+ * This is a convenence method for use when implementing a bbstreamer; it is
+ * not for use by outsider callers. It attempts to add enough data to the
+ * bbstreamer's buffer to reach a length of target_bytes and adjusts '*len'
+ * and '*data' accordingly. It returns true if the target length has been
+ * reached and false otherwise.
+ */
+static inline bool
+bbstreamer_buffer_until(bbstreamer *streamer, const char **data, int *len,
+ int target_bytes)
+{
+ int buflen = streamer->bbs_buffer.len;
+
+ if (buflen >= target_bytes)
+ {
+ /* Target length already reached; nothing to do. */
+ return true;
+ }
+
+ if (buflen + *len < target_bytes)
+ {
+ /* Not enough data to reach target length; buffer all of it. */
+ bbstreamer_buffer_bytes(streamer, data, len, *len);
+ return false;
+ }
+
+ /* Buffer just enough to reach the target length. */
+ bbstreamer_buffer_bytes(streamer, data, len, target_bytes - buflen);
+ return true;
+}
+
+/*
+ * Functions for creating bbstreamer objects of various types. See the header
+ * comments for each of these functions for details.
+ */
+extern bbstreamer *bbstreamer_plain_writer_new(char *pathname, FILE *file);
+extern bbstreamer *bbstreamer_gzip_writer_new(char *pathname, FILE *file,
+ int compresslevel);
+extern bbstreamer *bbstreamer_extractor_new(const char *basepath,
+ const char *(*link_map) (const char *),
+ void (*report_output_file) (const char *));
+
+extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next);
+extern bbstreamer *bbstreamer_tar_archiver_new(bbstreamer *next);
+
+extern bbstreamer *bbstreamer_recovery_injector_new(bbstreamer *next,
+ bool is_recovery_guc_supported,
+ PQExpBuffer recoveryconfcontents);
+extern void bbstreamer_inject_file(bbstreamer *streamer, char *pathname,
+ char *data, int len);
+
+#endif
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * bbstreamer_file.c
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/bbstreamer_file.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#ifdef HAVE_LIBZ
+#include <zlib.h>
+#endif
+
+#include <unistd.h>
+
+#include "bbstreamer.h"
+#include "common/logging.h"
+#include "common/file_perm.h"
+#include "common/string.h"
+
+typedef struct bbstreamer_plain_writer
+{
+ bbstreamer base;
+ char *pathname;
+ FILE *file;
+ bool should_close_file;
+} bbstreamer_plain_writer;
+
+#ifdef HAVE_LIBZ
+typedef struct bbstreamer_gzip_writer
+{
+ bbstreamer base;
+ char *pathname;
+ gzFile gzfile;
+} bbstreamer_gzip_writer;
+#endif
+
+typedef struct bbstreamer_extractor
+{
+ bbstreamer base;
+ char *basepath;
+ const char *(*link_map) (const char *);
+ void (*report_output_file) (const char *);
+ char filename[MAXPGPATH];
+ FILE *file;
+} bbstreamer_extractor;
+
+static void bbstreamer_plain_writer_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context);
+static void bbstreamer_plain_writer_finalize(bbstreamer *streamer);
+static void bbstreamer_plain_writer_free(bbstreamer *streamer);
+
+const bbstreamer_ops bbstreamer_plain_writer_ops = {
+ .content = bbstreamer_plain_writer_content,
+ .finalize = bbstreamer_plain_writer_finalize,
+ .free = bbstreamer_plain_writer_free
+};
+
+#ifdef HAVE_LIBZ
+static void bbstreamer_gzip_writer_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context);
+static void bbstreamer_gzip_writer_finalize(bbstreamer *streamer);
+static void bbstreamer_gzip_writer_free(bbstreamer *streamer);
+static const char *get_gz_error(gzFile gzf);
+
+const bbstreamer_ops bbstreamer_gzip_writer_ops = {
+ .content = bbstreamer_gzip_writer_content,
+ .finalize = bbstreamer_gzip_writer_finalize,
+ .free = bbstreamer_gzip_writer_free
+};
+#endif
+
+static void bbstreamer_extractor_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context);
+static void bbstreamer_extractor_finalize(bbstreamer *streamer);
+static void bbstreamer_extractor_free(bbstreamer *streamer);
+static void extract_directory(const char *filename, mode_t mode);
+static void extract_link(const char *filename, const char *linktarget);
+static FILE *create_file_for_extract(const char *filename, mode_t mode);
+
+const bbstreamer_ops bbstreamer_extractor_ops = {
+ .content = bbstreamer_extractor_content,
+ .finalize = bbstreamer_extractor_finalize,
+ .free = bbstreamer_extractor_free
+};
+
+/*
+ * Create a bbstreamer that just writes data to a file.
+ *
+ * The caller must specify a pathname and may specify a file. The pathname is
+ * used for error-reporting purposes either way. If file is NULL, the pathname
+ * also identifies the file to which the data should be written: it is opened
+ * for writing and closed when done. If file is not NULL, the data is written
+ * there.
+ */
+bbstreamer *
+bbstreamer_plain_writer_new(char *pathname, FILE *file)
+{
+ bbstreamer_plain_writer *streamer;
+
+ streamer = palloc0(sizeof(bbstreamer_plain_writer));
+ *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+ &bbstreamer_plain_writer_ops;
+
+ streamer->pathname = pstrdup(pathname);
+ streamer->file = file;
+
+ if (file == NULL)
+ {
+ streamer->file = fopen(pathname, "wb");
+ if (streamer->file == NULL)
+ {
+ pg_log_error("could not create file \"%s\": %m", pathname);
+ exit(1);
+ }
+ streamer->should_close_file = true;
+ }
+
+ return &streamer->base;
+}
+
+/*
+ * Write archive content to file.
+ */
+static void
+bbstreamer_plain_writer_content(bbstreamer *streamer,
+ bbstreamer_member *member, const char *data,
+ int len, bbstreamer_archive_context context)
+{
+ bbstreamer_plain_writer *mystreamer;
+
+ mystreamer = (bbstreamer_plain_writer *) streamer;
+
+ if (len == 0)
+ return;
+
+ errno = 0;
+ if (fwrite(data, len, 1, mystreamer->file) != 1)
+ {
+ /* if write didn't set errno, assume problem is no disk space */
+ if (errno == 0)
+ errno = ENOSPC;
+ pg_log_error("could not write to file \"%s\": %m",
+ mystreamer->pathname);
+ exit(1);
+ }
+}
+
+/*
+ * End-of-archive processing when writing to a plain file consists of closing
+ * the file if we opened it, but not if the caller provided it.
+ */
+static void
+bbstreamer_plain_writer_finalize(bbstreamer *streamer)
+{
+ bbstreamer_plain_writer *mystreamer;
+
+ mystreamer = (bbstreamer_plain_writer *) streamer;
+
+ if (mystreamer->should_close_file && fclose(mystreamer->file) != 0)
+ {
+ pg_log_error("could not close file \"%s\": %m",
+ mystreamer->pathname);
+ exit(1);
+ }
+
+ mystreamer->file = NULL;
+ mystreamer->should_close_file = false;
+}
+
+/*
+ * Free memory associated with this bbstreamer.
+ */
+static void
+bbstreamer_plain_writer_free(bbstreamer *streamer)
+{
+ bbstreamer_plain_writer *mystreamer;
+
+ mystreamer = (bbstreamer_plain_writer *) streamer;
+
+ Assert(!mystreamer->should_close_file);
+ Assert(mystreamer->base.bbs_next == NULL);
+
+ pfree(mystreamer->pathname);
+ pfree(mystreamer);
+}
+
+/*
+ * Create a bbstreamer that just compresses data using gzip, and then writes
+ * it to a file.
+ *
+ * As in the case of bbstreamer_plain_writer_new, pathname is always used
+ * for error reporting purposes; if file is NULL, it is also the opened and
+ * closed so that the data may be written there.
+ */
+bbstreamer *
+bbstreamer_gzip_writer_new(char *pathname, FILE *file, int compresslevel)
+{
+#ifdef HAVE_LIBZ
+ bbstreamer_gzip_writer *streamer;
+
+ streamer = palloc0(sizeof(bbstreamer_gzip_writer));
+ *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+ &bbstreamer_gzip_writer_ops;
+
+ streamer->pathname = pstrdup(pathname);
+
+ if (file == NULL)
+ {
+ streamer->gzfile = gzopen(pathname, "wb");
+ if (streamer->gzfile == NULL)
+ {
+ pg_log_error("could not create compressed file \"%s\": %m",
+ pathname);
+ exit(1);
+ }
+ }
+ else
+ {
+ int fd = dup(fileno(file));
+
+ if (fd < 0)
+ {
+ pg_log_error("could not duplicate stdout: %m");
+ exit(1);
+ }
+
+ streamer->gzfile = gzdopen(fd, "wb");
+ if (streamer->gzfile == NULL)
+ {
+ pg_log_error("could not open output file: %m");
+ exit(1);
+ }
+ }
+
+ if (gzsetparams(streamer->gzfile, compresslevel,
+ Z_DEFAULT_STRATEGY) != Z_OK)
+ {
+ pg_log_error("could not set compression level %d: %s",
+ compresslevel, get_gz_error(streamer->gzfile));
+ exit(1);
+ }
+
+ return &streamer->base;
+#else
+ pg_log_error("this build does not support compression");
+ exit(1);
+#endif
+}
+
+#ifdef HAVE_LIBZ
+/*
+ * Write archive content to gzip file.
+ */
+static void
+bbstreamer_gzip_writer_content(bbstreamer *streamer,
+ bbstreamer_member *member, const char *data,
+ int len, bbstreamer_archive_context context)
+{
+ bbstreamer_gzip_writer *mystreamer;
+
+ mystreamer = (bbstreamer_gzip_writer *) streamer;
+
+ if (len == 0)
+ return;
+
+ errno = 0;
+ if (gzwrite(mystreamer->gzfile, data, len) != len)
+ {
+ /* if write didn't set errno, assume problem is no disk space */
+ if (errno == 0)
+ errno = ENOSPC;
+ pg_log_error("could not write to compressed file \"%s\": %s",
+ mystreamer->pathname, get_gz_error(mystreamer->gzfile));
+ exit(1);
+ }
+}
+
+/*
+ * End-of-archive processing when writing to a gzip file consists of just
+ * calling gzclose.
+ *
+ * It makes no difference whether we opened the file or the caller did it,
+ * because libz provides no way of avoiding a close on the underling file
+ * handle. Notice, however, that bbstreamer_gzip_writer_new() uses dup() to
+ * work around this issue, so that the behavior from the caller's viewpoint
+ * is the same as for bbstreamer_plain_writer.
+ */
+static void
+bbstreamer_gzip_writer_finalize(bbstreamer *streamer)
+{
+ bbstreamer_gzip_writer *mystreamer;
+
+ mystreamer = (bbstreamer_gzip_writer *) streamer;
+
+ if (gzclose(mystreamer->gzfile) != 0)
+ {
+ pg_log_error("could not close compressed file \"%s\": %s",
+ mystreamer->pathname,
+ get_gz_error(mystreamer->gzfile));
+ exit(1);
+ }
+
+ mystreamer->gzfile = NULL;
+}
+
+/*
+ * Free memory associated with this bbstreamer.
+ */
+static void
+bbstreamer_gzip_writer_free(bbstreamer *streamer)
+{
+ bbstreamer_gzip_writer *mystreamer;
+
+ mystreamer = (bbstreamer_gzip_writer *) streamer;
+
+ Assert(mystreamer->base.bbs_next == NULL);
+ Assert(mystreamer->gzfile == NULL);
+
+ pfree(mystreamer->pathname);
+ pfree(mystreamer);
+}
+
+/*
+ * Helper function for libz error reporting.
+ */
+static const char *
+get_gz_error(gzFile gzf)
+{
+ int errnum;
+ const char *errmsg;
+
+ errmsg = gzerror(gzf, &errnum);
+ if (errnum == Z_ERRNO)
+ return strerror(errno);
+ else
+ return errmsg;
+}
+#endif
+
+/*
+ * Create a bbstreamer that extracts an archive.
+ *
+ * All pathnames in the archive are interpreted relative to basepath.
+ *
+ * Unlike e.g. bbstreamer_plain_writer_new() we can't do anything useful here
+ * with untyped chunks; we need typed chunks which follow the rules described
+ * in bbstreamer.h. Assuming we have that, we don't need to worry about the
+ * original archive format; it's enough to just look at the member information
+ * provided and write to the corresponding file.
+ *
+ * 'link_map' is a function that will be applied to the target of any
+ * symbolic link, and which should return a replacement pathname to be used
+ * in its place. If NULL, the symbolic link target is used without
+ * modification.
+ *
+ * 'report_output_file' is a function that will be called each time we open a
+ * new output file. The pathname to that file is passed as an argument. If
+ * NULL, the call is skipped.
+ */
+bbstreamer *
+bbstreamer_extractor_new(const char *basepath,
+ const char *(*link_map) (const char *),
+ void (*report_output_file) (const char *))
+{
+ bbstreamer_extractor *streamer;
+
+ streamer = palloc0(sizeof(bbstreamer_extractor));
+ *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+ &bbstreamer_extractor_ops;
+ streamer->basepath = pstrdup(basepath);
+ streamer->link_map = link_map;
+ streamer->report_output_file = report_output_file;
+
+ return &streamer->base;
+}
+
+/*
+ * Extract archive contents to the filesystem.
+ */
+static void
+bbstreamer_extractor_content(bbstreamer *streamer, bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context)
+{
+ bbstreamer_extractor *mystreamer = (bbstreamer_extractor *) streamer;
+ int fnamelen;
+
+ Assert(member != NULL || context == BBSTREAMER_ARCHIVE_TRAILER);
+ Assert(context != BBSTREAMER_UNKNOWN);
+
+ switch (context)
+ {
+ case BBSTREAMER_MEMBER_HEADER:
+ Assert(mystreamer->file == NULL);
+
+ /* Prepend basepath. */
+ snprintf(mystreamer->filename, sizeof(mystreamer->filename),
+ "%s/%s", mystreamer->basepath, member->pathname);
+
+ /* Remove any trailing slash. */
+ fnamelen = strlen(mystreamer->filename);
+ if (mystreamer->filename[fnamelen - 1] == '/')
+ mystreamer->filename[fnamelen - 1] = '\0';
+
+ /* Dispatch based on file type. */
+ if (member->is_directory)
+ extract_directory(mystreamer->filename, member->mode);
+ else if (member->is_link)
+ {
+ const char *linktarget = member->linktarget;
+
+ if (mystreamer->link_map)
+ linktarget = mystreamer->link_map(linktarget);
+ extract_link(mystreamer->filename, linktarget);
+ }
+ else
+ mystreamer->file =
+ create_file_for_extract(mystreamer->filename,
+ member->mode);
+
+ /* Report output file change. */
+ if (mystreamer->report_output_file)
+ mystreamer->report_output_file(mystreamer->filename);
+ break;
+
+ case BBSTREAMER_MEMBER_CONTENTS:
+ if (mystreamer->file == NULL)
+ break;
+
+ errno = 0;
+ if (len > 0 && fwrite(data, len, 1, mystreamer->file) != 1)
+ {
+ /* if write didn't set errno, assume problem is no disk space */
+ if (errno == 0)
+ errno = ENOSPC;
+ pg_log_error("could not write to file \"%s\": %m",
+ mystreamer->filename);
+ exit(1);
+ }
+ break;
+
+ case BBSTREAMER_MEMBER_TRAILER:
+ if (mystreamer->file == NULL)
+ break;
+ fclose(mystreamer->file);
+ mystreamer->file = NULL;
+ break;
+
+ case BBSTREAMER_ARCHIVE_TRAILER:
+ break;
+
+ default:
+ /* Shouldn't happen. */
+ pg_log_error("unexpected state while extracting archive");
+ exit(1);
+ }
+}
+
+/*
+ * Create a directory.
+ */
+static void
+extract_directory(const char *filename, mode_t mode)
+{
+ if (mkdir(filename, pg_dir_create_mode) != 0)
+ {
+ /*
+ * When streaming WAL, pg_wal (or pg_xlog for pre-9.6 clusters) will
+ * have been created by the wal receiver process. Also, when the WAL
+ * directory location was specified, pg_wal (or pg_xlog) has already
+ * been created as a symbolic link before starting the actual backup.
+ * So just ignore creation failures on related directories.
+ */
+ if (!((pg_str_endswith(filename, "/pg_wal") ||
+ pg_str_endswith(filename, "/pg_xlog") ||
+ pg_str_endswith(filename, "/archive_status")) &&
+ errno == EEXIST))
+ {
+ pg_log_error("could not create directory \"%s\": %m",
+ filename);
+ exit(1);
+ }
+ }
+
+#ifndef WIN32
+ if (chmod(filename, mode))
+ {
+ pg_log_error("could not set permissions on directory \"%s\": %m",
+ filename);
+ exit(1);
+ }
+#endif
+}
+
+/*
+ * Create a symbolic link.
+ *
+ * It's most likely a link in pg_tblspc directory, to the location of a
+ * tablespace. Apply any tablespace mapping given on the command line
+ * (--tablespace-mapping). (We blindly apply the mapping without checking that
+ * the link really is inside pg_tblspc. We don't expect there to be other
+ * symlinks in a data directory, but if there are, you can call it an
+ * undocumented feature that you can map them too.)
+ */
+static void
+extract_link(const char *filename, const char *linktarget)
+{
+ if (symlink(linktarget, filename) != 0)
+ {
+ pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
+ filename, linktarget);
+ exit(1);
+ }
+}
+
+/*
+ * Create a regular file.
+ *
+ * Return the resulting handle so we can write the content to the file.
+ */
+static FILE *
+create_file_for_extract(const char *filename, mode_t mode)
+{
+ FILE *file;
+
+ file = fopen(filename, "wb");
+ if (file == NULL)
+ {
+ pg_log_error("could not create file \"%s\": %m", filename);
+ exit(1);
+ }
+
+#ifndef WIN32
+ if (chmod(filename, mode))
+ {
+ pg_log_error("could not set permissions on file \"%s\": %m",
+ filename);
+ exit(1);
+ }
+#endif
+
+ return file;
+}
+
+/*
+ * End-of-stream processing for extracting an archive.
+ *
+ * There's nothing to do here but sanity checking.
+ */
+static void
+bbstreamer_extractor_finalize(bbstreamer *streamer)
+{
+ bbstreamer_extractor *mystreamer = (bbstreamer_extractor *) streamer;
+
+ Assert(mystreamer->file == NULL);
+}
+
+/*
+ * Free memory.
+ */
+static void
+bbstreamer_extractor_free(bbstreamer *streamer)
+{
+ bbstreamer_extractor *mystreamer = (bbstreamer_extractor *) streamer;
+
+ pfree(mystreamer->basepath);
+ pfree(mystreamer);
+}
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * bbstreamer_inject.c
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/bbstreamer_inject.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include "bbstreamer.h"
+#include "common/file_perm.h"
+#include "common/logging.h"
+
+typedef struct bbstreamer_recovery_injector
+{
+ bbstreamer base;
+ bool skip_file;
+ bool is_recovery_guc_supported;
+ bool is_postgresql_auto_conf;
+ bool found_postgresql_auto_conf;
+ PQExpBuffer recoveryconfcontents;
+ bbstreamer_member member;
+} bbstreamer_recovery_injector;
+
+static void bbstreamer_recovery_injector_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context);
+static void bbstreamer_recovery_injector_finalize(bbstreamer *streamer);
+static void bbstreamer_recovery_injector_free(bbstreamer *streamer);
+
+const bbstreamer_ops bbstreamer_recovery_injector_ops = {
+ .content = bbstreamer_recovery_injector_content,
+ .finalize = bbstreamer_recovery_injector_finalize,
+ .free = bbstreamer_recovery_injector_free
+};
+
+/*
+ * Create a bbstreamer that can edit recoverydata into an archive stream.
+ *
+ * The input should be a series of typed chunks (not BBSTREAMER_UNKNOWN) as
+ * per the conventions described in bbstreamer.h; the chunks forwarded to
+ * the next bbstreamer will be similarly typed, but the
+ * BBSTREAMER_MEMBER_HEADER chunks may be zero-length in cases where we've
+ * edited the archive stream.
+ *
+ * Our goal is to do one of the following three things with the content passed
+ * via recoveryconfcontents: (1) if is_recovery_guc_supported is false, then
+ * put the content into recovery.conf, replacing any existing archive member
+ * by that name; (2) if is_recovery_guc_supported is true and
+ * postgresql.auto.conf exists in the archive, then append the content
+ * provided to the existing file; and (3) if is_recovery_guc_supported is
+ * true but postgresql.auto.conf does not exist in the archive, then create
+ * it with the specified content.
+ *
+ * In addition, if is_recovery_guc_supported is true, then we create a
+ * zero-length standby.signal file, dropping any file with that name from
+ * the archive.
+ */
+extern bbstreamer *
+bbstreamer_recovery_injector_new(bbstreamer *next,
+ bool is_recovery_guc_supported,
+ PQExpBuffer recoveryconfcontents)
+{
+ bbstreamer_recovery_injector *streamer;
+
+ streamer = palloc0(sizeof(bbstreamer_recovery_injector));
+ *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+ &bbstreamer_recovery_injector_ops;
+ streamer->base.bbs_next = next;
+ streamer->is_recovery_guc_supported = is_recovery_guc_supported;
+ streamer->recoveryconfcontents = recoveryconfcontents;
+
+ return &streamer->base;
+}
+
+/*
+ * Handle each chunk of tar content while injecting recovery configuration.
+ */
+static void
+bbstreamer_recovery_injector_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context)
+{
+ bbstreamer_recovery_injector *mystreamer;
+
+ mystreamer = (bbstreamer_recovery_injector *) streamer;
+ Assert(member != NULL || context == BBSTREAMER_ARCHIVE_TRAILER);
+
+ switch (context)
+ {
+ case BBSTREAMER_MEMBER_HEADER:
+ /* Must copy provided data so we have the option to modify it. */
+ memcpy(&mystreamer->member, member, sizeof(bbstreamer_member));
+
+ /*
+ * On v12+, skip standby.signal and edit postgresql.auto.conf; on
+ * older versions, skip recovery.conf.
+ */
+ if (mystreamer->is_recovery_guc_supported)
+ {
+ mystreamer->skip_file =
+ (strcmp(member->pathname, "standby.signal") == 0);
+ mystreamer->is_postgresql_auto_conf =
+ (strcmp(member->pathname, "postgresql.auto.conf") == 0);
+ if (mystreamer->is_postgresql_auto_conf)
+ {
+ /* Remember we saw it so we don't add it again. */
+ mystreamer->found_postgresql_auto_conf = true;
+
+ /* Increment length by data to be injected. */
+ mystreamer->member.size +=
+ mystreamer->recoveryconfcontents->len;
+
+ /*
+ * Zap data and len because the archive header is no
+ * longer valid; some subsequent bbstreamer must
+ * regenerate it if it's necessary.
+ */
+ data = NULL;
+ len = 0;
+ }
+ }
+ else
+ mystreamer->skip_file =
+ (strcmp(member->pathname, "recovery.conf") == 0);
+
+ /* Do not forward if the file is to be skipped. */
+ if (mystreamer->skip_file)
+ return;
+ break;
+
+ case BBSTREAMER_MEMBER_CONTENTS:
+ /* Do not forward if the file is to be skipped. */
+ if (mystreamer->skip_file)
+ return;
+ break;
+
+ case BBSTREAMER_MEMBER_TRAILER:
+ /* Do not forward it the file is to be skipped. */
+ if (mystreamer->skip_file)
+ return;
+
+ /* Append provided content to whatever we already sent. */
+ if (mystreamer->is_postgresql_auto_conf)
+ bbstreamer_content(mystreamer->base.bbs_next, member,
+ mystreamer->recoveryconfcontents->data,
+ mystreamer->recoveryconfcontents->len,
+ BBSTREAMER_MEMBER_CONTENTS);
+ break;
+
+ case BBSTREAMER_ARCHIVE_TRAILER:
+ if (mystreamer->is_recovery_guc_supported)
+ {
+ /*
+ * If we didn't already find (and thus modify)
+ * postgresql.auto.conf, inject it as an additional archive
+ * member now.
+ */
+ if (!mystreamer->found_postgresql_auto_conf)
+ bbstreamer_inject_file(mystreamer->base.bbs_next,
+ "postgresql.auto.conf",
+ mystreamer->recoveryconfcontents->data,
+ mystreamer->recoveryconfcontents->len);
+
+ /* Inject empty standby.signal file. */
+ bbstreamer_inject_file(mystreamer->base.bbs_next,
+ "standby.signal", "", 0);
+ }
+ else
+ {
+ /* Inject recovery.conf file with specified contents. */
+ bbstreamer_inject_file(mystreamer->base.bbs_next,
+ "recovery.conf",
+ mystreamer->recoveryconfcontents->data,
+ mystreamer->recoveryconfcontents->len);
+ }
+
+ /* Nothing to do here. */
+ break;
+
+ default:
+ /* Shouldn't happen. */
+ pg_log_error("unexpected state while injecting recovery settings");
+ exit(1);
+ }
+
+ bbstreamer_content(mystreamer->base.bbs_next, &mystreamer->member,
+ data, len, context);
+}
+
+/*
+ * End-of-stream processing for this bbstreamer.
+ */
+static void
+bbstreamer_recovery_injector_finalize(bbstreamer *streamer)
+{
+ bbstreamer_finalize(streamer->bbs_next);
+}
+
+/*
+ * Free memory associated with this bbstreamer.
+ */
+static void
+bbstreamer_recovery_injector_free(bbstreamer *streamer)
+{
+ bbstreamer_free(streamer->bbs_next);
+ pfree(streamer);
+}
+
+/*
+ * Inject a member into the archive with specified contents.
+ */
+void
+bbstreamer_inject_file(bbstreamer *streamer, char *pathname, char *data,
+ int len)
+{
+ bbstreamer_member member;
+
+ strlcpy(member.pathname, pathname, MAXPGPATH);
+ member.size = len;
+ member.mode = pg_file_create_mode;
+ member.is_directory = false;
+ member.is_link = false;
+ member.linktarget[0] = '\0';
+
+ /*
+ * There seems to be no principled argument for these values, but they are
+ * what PostgreSQL has historically used.
+ */
+ member.uid = 04000;
+ member.gid = 02000;
+
+ /*
+ * We don't know here how to generate valid member headers and trailers
+ * for the archiving format in use, so if those are needed, some successor
+ * bbstreamer will have to generate them using the data from 'member'.
+ */
+ bbstreamer_content(streamer, &member, NULL, 0,
+ BBSTREAMER_MEMBER_HEADER);
+ bbstreamer_content(streamer, &member, data, len,
+ BBSTREAMER_MEMBER_CONTENTS);
+ bbstreamer_content(streamer, &member, NULL, 0,
+ BBSTREAMER_MEMBER_TRAILER);
+}
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * bbstreamer_tar.c
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/bbstreamer_tar.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <time.h>
+
+#include "bbstreamer.h"
+#include "common/logging.h"
+#include "pgtar.h"
+
+typedef struct bbstreamer_tar_parser
+{
+ bbstreamer base;
+ bbstreamer_archive_context next_context;
+ bbstreamer_member member;
+ size_t file_bytes_sent;
+ size_t pad_bytes_expected;
+} bbstreamer_tar_parser;
+
+typedef struct bbstreamer_tar_archiver
+{
+ bbstreamer base;
+ bool rearchive_member;
+} bbstreamer_tar_archiver;
+
+static void bbstreamer_tar_parser_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context);
+static void bbstreamer_tar_parser_finalize(bbstreamer *streamer);
+static void bbstreamer_tar_parser_free(bbstreamer *streamer);
+static bool bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer);
+
+const bbstreamer_ops bbstreamer_tar_parser_ops = {
+ .content = bbstreamer_tar_parser_content,
+ .finalize = bbstreamer_tar_parser_finalize,
+ .free = bbstreamer_tar_parser_free
+};
+
+static void bbstreamer_tar_archiver_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context);
+static void bbstreamer_tar_archiver_finalize(bbstreamer *streamer);
+static void bbstreamer_tar_archiver_free(bbstreamer *streamer);
+
+const bbstreamer_ops bbstreamer_tar_archiver_ops = {
+ .content = bbstreamer_tar_archiver_content,
+ .finalize = bbstreamer_tar_archiver_finalize,
+ .free = bbstreamer_tar_archiver_free
+};
+
+/*
+ * Create a bbstreamer that can parse a stream of content as tar data.
+ *
+ * The input should be a series of BBSTREAMER_UNKNOWN chunks; the bbstreamer
+ * specified by 'next' will receive a series of typed chunks, as per the
+ * conventions described in bbstreamer.h.
+ */
+extern bbstreamer *
+bbstreamer_tar_parser_new(bbstreamer *next)
+{
+ bbstreamer_tar_parser *streamer;
+
+ streamer = palloc0(sizeof(bbstreamer_tar_parser));
+ *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+ &bbstreamer_tar_parser_ops;
+ streamer->base.bbs_next = next;
+ initStringInfo(&streamer->base.bbs_buffer);
+ streamer->next_context = BBSTREAMER_MEMBER_HEADER;
+
+ return &streamer->base;
+}
+
+/*
+ * Parse unknown content as tar data.
+ */
+static void
+bbstreamer_tar_parser_content(bbstreamer *streamer, bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context)
+{
+ bbstreamer_tar_parser *mystreamer = (bbstreamer_tar_parser *) streamer;
+ size_t nbytes;
+
+ /* Expect unparsed input. */
+ Assert(member == NULL);
+ Assert(context == BBSTREAMER_UNKNOWN);
+
+ while (len > 0)
+ {
+ switch (mystreamer->next_context)
+ {
+ case BBSTREAMER_MEMBER_HEADER:
+
+ /*
+ * If we're expecting an archive member header, accumulate a
+ * full block of data before doing anything further.
+ */
+ if (!bbstreamer_buffer_until(streamer, &data, &len,
+ TAR_BLOCK_SIZE))
+ return;
+
+ /*
+ * Now we can process the header and get ready to process the
+ * file contents; however, we might find out that what we
+ * thought was the next file header is actually the start of
+ * the archive trailer. Switch modes accordingly.
+ */
+ if (bbstreamer_tar_header(mystreamer))
+ {
+ if (mystreamer->member.size == 0)
+ {
+ /* No content; trailer is zero-length. */
+ bbstreamer_content(mystreamer->base.bbs_next,
+ &mystreamer->member,
+ NULL, 0,
+ BBSTREAMER_MEMBER_TRAILER);
+
+ /* Expect next header. */
+ mystreamer->next_context = BBSTREAMER_MEMBER_HEADER;
+ }
+ else
+ {
+ /* Expect contents. */
+ mystreamer->next_context = BBSTREAMER_MEMBER_CONTENTS;
+ }
+ mystreamer->base.bbs_buffer.len = 0;
+ mystreamer->file_bytes_sent = 0;
+ }
+ else
+ mystreamer->next_context = BBSTREAMER_ARCHIVE_TRAILER;
+ break;
+
+ case BBSTREAMER_MEMBER_CONTENTS:
+
+ /*
+ * Send as much content as we have, but not more than the
+ * remaining file length.
+ */
+ Assert(mystreamer->file_bytes_sent < mystreamer->member.size);
+ nbytes = mystreamer->member.size - mystreamer->file_bytes_sent;
+ nbytes = Min(nbytes, len);
+ Assert(nbytes > 0);
+ bbstreamer_content(mystreamer->base.bbs_next,
+ &mystreamer->member,
+ data, nbytes,
+ BBSTREAMER_MEMBER_CONTENTS);
+ mystreamer->file_bytes_sent += nbytes;
+ data += nbytes;
+ len -= nbytes;
+
+ /*
+ * If we've not yet sent the whole file, then there's more
+ * content to come; otherwise, it's time to expect the file
+ * trailer.
+ */
+ Assert(mystreamer->file_bytes_sent <= mystreamer->member.size);
+ if (mystreamer->file_bytes_sent == mystreamer->member.size)
+ {
+ if (mystreamer->pad_bytes_expected == 0)
+ {
+ /* Trailer is zero-length. */
+ bbstreamer_content(mystreamer->base.bbs_next,
+ &mystreamer->member,
+ NULL, 0,
+ BBSTREAMER_MEMBER_TRAILER);
+
+ /* Expect next header. */
+ mystreamer->next_context = BBSTREAMER_MEMBER_HEADER;
+ }
+ else
+ {
+ /* Trailer is not zero-length. */
+ mystreamer->next_context = BBSTREAMER_MEMBER_TRAILER;
+ }
+ mystreamer->base.bbs_buffer.len = 0;
+ }
+ break;
+
+ case BBSTREAMER_MEMBER_TRAILER:
+
+ /*
+ * If we're expecting an archive member trailer, accumulate
+ * the expected number of padding bytes before sending
+ * anything onward.
+ */
+ if (!bbstreamer_buffer_until(streamer, &data, &len,
+ mystreamer->pad_bytes_expected))
+ return;
+
+ /* OK, now we can send it. */
+ bbstreamer_content(mystreamer->base.bbs_next,
+ &mystreamer->member,
+ data, mystreamer->pad_bytes_expected,
+ BBSTREAMER_MEMBER_TRAILER);
+
+ /* Expect next file header. */
+ mystreamer->next_context = BBSTREAMER_MEMBER_HEADER;
+ mystreamer->base.bbs_buffer.len = 0;
+ break;
+
+ case BBSTREAMER_ARCHIVE_TRAILER:
+
+ /*
+ * We've seen an end-of-archive indicator, so anything more is
+ * buffered and sent as part of the archive trailer. But we
+ * don't expect more than 2 blocks.
+ */
+ bbstreamer_buffer_bytes(streamer, &data, &len, len);
+ if (len > 2 * TAR_BLOCK_SIZE)
+ {
+ pg_log_error("tar file trailer exceeds 2 blocks");
+ exit(1);
+ }
+ return;
+
+ default:
+ /* Shouldn't happen. */
+ pg_log_error("unexpected state while parsing tar archive");
+ exit(1);
+ }
+ }
+}
+
+/*
+ * Parse a file header within a tar stream.
+ *
+ * The return value is true if we found a file header and passed it on to the
+ * next bbstreamer; it is false if we have reached the archive trailer.
+ */
+static bool
+bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer)
+{
+ bool has_nonzero_byte = false;
+ int i;
+ bbstreamer_member *member = &mystreamer->member;
+ char *buffer = mystreamer->base.bbs_buffer.data;
+
+ Assert(mystreamer->base.bbs_buffer.len == TAR_BLOCK_SIZE);
+
+ /* Check whether we've got a block of all zero bytes. */
+ for (i = 0; i < TAR_BLOCK_SIZE; ++i)
+ {
+ if (buffer[i] != '\0')
+ {
+ has_nonzero_byte = true;
+ break;
+ }
+ }
+
+ /*
+ * If the entire block was zeros, this is the end of the archive, not the
+ * start of the next file.
+ */
+ if (!has_nonzero_byte)
+ return false;
+
+ /*
+ * Parse key fields out of the header.
+ *
+ * FIXME: It's terrible that we use hard-coded values here instead of some
+ * more principled approach. It's been like this for a long time, but we
+ * ought to do better.
+ */
+ strlcpy(member->pathname, &buffer[0], MAXPGPATH);
+ if (member->pathname[0] == '\0')
+ {
+ pg_log_error("tar member has empty name");
+ exit(1);
+ }
+ member->size = read_tar_number(&buffer[124], 12);
+ member->mode = read_tar_number(&buffer[100], 8);
+ member->uid = read_tar_number(&buffer[108], 8);
+ member->gid = read_tar_number(&buffer[116], 8);
+ member->is_directory = (buffer[156] == '5');
+ member->is_link = (buffer[156] == '2');
+ if (member->is_link)
+ strlcpy(member->linktarget, &buffer[157], 100);
+
+ /* Compute number of padding bytes. */
+ mystreamer->pad_bytes_expected = tarPaddingBytesRequired(member->size);
+
+ /* Forward the entire header to the next bbstreamer. */
+ bbstreamer_content(mystreamer->base.bbs_next, member,
+ buffer, TAR_BLOCK_SIZE,
+ BBSTREAMER_MEMBER_HEADER);
+
+ return true;
+}
+
+/*
+ * End-of-stream processing for a tar parser.
+ */
+static void
+bbstreamer_tar_parser_finalize(bbstreamer *streamer)
+{
+ bbstreamer_tar_parser *mystreamer = (bbstreamer_tar_parser *) streamer;
+
+ if (mystreamer->next_context != BBSTREAMER_ARCHIVE_TRAILER &&
+ (mystreamer->next_context != BBSTREAMER_MEMBER_HEADER ||
+ mystreamer->base.bbs_buffer.len > 0))
+ {
+ pg_log_error("COPY stream ended before last file was finished");
+ exit(1);
+ }
+
+ /* Send the archive trailer, even if empty. */
+ bbstreamer_content(streamer->bbs_next, NULL,
+ streamer->bbs_buffer.data, streamer->bbs_buffer.len,
+ BBSTREAMER_ARCHIVE_TRAILER);
+
+ /* Now finalize successor. */
+ bbstreamer_finalize(streamer->bbs_next);
+}
+
+/*
+ * Free memory associated with a tar parser.
+ */
+static void
+bbstreamer_tar_parser_free(bbstreamer *streamer)
+{
+ pfree(streamer->bbs_buffer.data);
+ bbstreamer_free(streamer->bbs_next);
+}
+
+/*
+ * Create an bbstreamer that can generate a tar archive.
+ *
+ * This is intended to be usable either for generating a brand-new tar archive
+ * or for modifying one on the fly. The input should be a series of typed
+ * chunks (i.e. not BBSTREAMER_UNKNOWN). See also the comments for
+ * bbstreamer_tar_parser_content.
+ */
+extern bbstreamer *
+bbstreamer_tar_archiver_new(bbstreamer *next)
+{
+ bbstreamer_tar_archiver *streamer;
+
+ streamer = palloc0(sizeof(bbstreamer_tar_archiver));
+ *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+ &bbstreamer_tar_archiver_ops;
+ streamer->base.bbs_next = next;
+
+ return &streamer->base;
+}
+
+/*
+ * Fix up the stream of input chunks to create a valid tar file.
+ *
+ * If a BBSTREAMER_MEMBER_HEADER chunk is of size 0, it is replaced with a
+ * newly-constructed tar header. If it is of size TAR_BLOCK_SIZE, it is
+ * passed through without change. Any other size is a fatal error (and
+ * indicates a bug).
+ *
+ * Whenever a new BBSTREAMER_MEMBER_HEADER chunk is constructed, the
+ * corresponding BBSTREAMER_MEMBER_TRAILER chunk is also constructed from
+ * scratch. Specifically, we construct a block of zero bytes sufficient to
+ * pad out to a block boundary, as required by the tar format. Other
+ * BBSTREAMER_MEMBER_TRAILER chunks are passed through without change.
+ *
+ * Any BBSTREAMER_MEMBER_CONTENTS chunks are passed through without change.
+ *
+ * The BBSTREAMER_ARCHIVE_TRAILER chunk is replaced with two
+ * blocks of zero bytes. Not all tar programs require this, but apparently
+ * some do. The server does not supply this trailer. If no archive trailer is
+ * present, one will be added by bbstreamer_tar_parser_finalize.
+ */
+static void
+bbstreamer_tar_archiver_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context)
+{
+ bbstreamer_tar_archiver *mystreamer = (bbstreamer_tar_archiver *) streamer;
+ char buffer[2 * TAR_BLOCK_SIZE];
+
+ Assert(context != BBSTREAMER_UNKNOWN);
+
+ if (context == BBSTREAMER_MEMBER_HEADER && len != TAR_BLOCK_SIZE)
+ {
+ Assert(len == 0);
+
+ /* Replace zero-length tar header with a newly constructed one. */
+ tarCreateHeader(buffer, member->pathname, NULL,
+ member->size, member->mode, member->uid, member->gid,
+ time(NULL));
+ data = buffer;
+ len = TAR_BLOCK_SIZE;
+
+ /* Also make a note to replace padding, in case size changed. */
+ mystreamer->rearchive_member = true;
+ }
+ else if (context == BBSTREAMER_MEMBER_TRAILER &&
+ mystreamer->rearchive_member)
+ {
+ int pad_bytes = tarPaddingBytesRequired(member->size);
+
+ /* Also replace padding, if we regenerated the header. */
+ memset(buffer, 0, pad_bytes);
+ data = buffer;
+ len = pad_bytes;
+
+ /* Don't do this agian unless we replace another header. */
+ mystreamer->rearchive_member = false;
+ }
+ else if (context == BBSTREAMER_ARCHIVE_TRAILER)
+ {
+ /* Trailer should always be two blocks of zero bytes. */
+ memset(buffer, 0, 2 * TAR_BLOCK_SIZE);
+ data = buffer;
+ len = 2 * TAR_BLOCK_SIZE;
+ }
+
+ bbstreamer_content(streamer->bbs_next, member, data, len, context);
+}
+
+/*
+ * End-of-stream processing for a tar archiver.
+ */
+static void
+bbstreamer_tar_archiver_finalize(bbstreamer *streamer)
+{
+ bbstreamer_finalize(streamer->bbs_next);
+}
+
+/*
+ * Free memory associated with a tar archiver.
+ */
+static void
+bbstreamer_tar_archiver_free(bbstreamer *streamer)
+{
+ bbstreamer_free(streamer->bbs_next);
+ pfree(streamer);
+}
#endif
#include "access/xlog_internal.h"
+#include "bbstreamer.h"
#include "common/file_perm.h"
#include "common/file_utils.h"
#include "common/logging.h"
-#include "common/string.h"
#include "fe_utils/option_utils.h"
#include "fe_utils/recovery_gen.h"
-#include "fe_utils/string_utils.h"
#include "getopt_long.h"
-#include "libpq-fe.h"
-#include "pgtar.h"
-#include "pgtime.h"
-#include "pqexpbuffer.h"
#include "receivelog.h"
#include "replication/basebackup.h"
#include "streamutil.h"
typedef struct WriteTarState
{
int tablespacenum;
- char filename[MAXPGPATH];
- FILE *tarfile;
- char tarhdr[TAR_BLOCK_SIZE];
- bool basetablespace;
- bool in_tarhdr;
- bool skip_file;
- bool is_recovery_guc_supported;
- bool is_postgresql_auto_conf;
- bool found_postgresql_auto_conf;
- int file_padding_len;
- size_t tarhdrsz;
- pgoff_t filesz;
-#ifdef HAVE_LIBZ
- gzFile ztarfile;
-#endif
+ bbstreamer *streamer;
} WriteTarState;
-typedef struct UnpackTarState
-{
- int tablespacenum;
- char current_path[MAXPGPATH];
- char filename[MAXPGPATH];
- const char *mapped_tblspc_path;
- pgoff_t current_len_left;
- int current_padding;
- FILE *file;
-} UnpackTarState;
-
typedef struct WriteManifestState
{
char filename[MAXPGPATH];
static bool made_tablespace_dirs = false;
static bool found_tablespace_dirs = false;
-/* Progress counters */
+/* Progress indicators */
static uint64 totalsize_kb;
static uint64 totaldone;
static int tablespacecount;
+static const char *progress_filename;
/* Pipe to communicate with background wal receiver process */
#ifndef WIN32
/* Function headers */
static void usage(void);
static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found);
-static void progress_report(int tablespacenum, const char *filename, bool force,
- bool finished);
-
-static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
+static void progress_update_filename(const char *filename);
+static void progress_report(int tablespacenum, bool force, bool finished);
+
+static bbstreamer *CreateBackupStreamer(char *archive_name, char *spclocation,
+ bbstreamer **manifest_inject_streamer_p,
+ bool is_recovery_guc_supported);
+static void ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation,
+ bool tablespacenum);
static void ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data);
-static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
-static void ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf,
- void *callback_data);
static void ReceiveBackupManifest(PGconn *conn);
static void ReceiveBackupManifestChunk(size_t r, char *copybuf,
void *callback_data);
}
-#ifdef HAVE_LIBZ
-static const char *
-get_gz_error(gzFile gzf)
-{
- int errnum;
- const char *errmsg;
-
- errmsg = gzerror(gzf, &errnum);
- if (errnum == Z_ERRNO)
- return strerror(errno);
- else
- return errmsg;
-}
-#endif
-
static void
usage(void)
{
}
}
+/*
+ * Callback to update our notion of the current filename.
+ */
+static void
+progress_update_filename(const char *filename)
+{
+ progress_filename = filename;
+}
/*
* Print a progress report based on the global variables. If verbose output
* is moved to the next line.
*/
static void
-progress_report(int tablespacenum, const char *filename,
- bool force, bool finished)
+progress_report(int tablespacenum, bool force, bool finished)
{
int percent;
char totaldone_str[32];
#define VERBOSE_FILENAME_LENGTH 35
if (verbose)
{
- if (!filename)
+ if (!progress_filename)
/*
* No filename given, so clear the status line (used for last
VERBOSE_FILENAME_LENGTH + 5, "");
else
{
- bool truncate = (strlen(filename) > VERBOSE_FILENAME_LENGTH);
+ bool truncate = (strlen(progress_filename) > VERBOSE_FILENAME_LENGTH);
fprintf(stderr,
ngettext("%*s/%s kB (%d%%), %d/%d tablespace (%s%-*.*s)",
truncate ? VERBOSE_FILENAME_LENGTH - 3 : VERBOSE_FILENAME_LENGTH,
truncate ? VERBOSE_FILENAME_LENGTH - 3 : VERBOSE_FILENAME_LENGTH,
/* Truncate filename at beginning if it's too long */
- truncate ? filename + strlen(filename) - VERBOSE_FILENAME_LENGTH + 3 : filename);
+ truncate ? progress_filename + strlen(progress_filename) - VERBOSE_FILENAME_LENGTH + 3 : progress_filename);
}
}
else
}
/*
- * Write a piece of tar data
+ * Figure out what to do with an archive received from the server based on
+ * the options selected by the user. We may just write the results directly
+ * to a file, or we might compress first, or we might extract the tar file
+ * and write each member separately. This function doesn't do any of that
+ * directly, but it works out what kind of bbstreamer we need to create so
+ * that the right stuff happens when, down the road, we actually receive
+ * the data.
*/
-static void
-writeTarData(WriteTarState *state, char *buf, int r)
+static bbstreamer *
+CreateBackupStreamer(char *archive_name, char *spclocation,
+ bbstreamer **manifest_inject_streamer_p,
+ bool is_recovery_guc_supported)
{
-#ifdef HAVE_LIBZ
- if (state->ztarfile != NULL)
- {
- errno = 0;
- if (gzwrite(state->ztarfile, buf, r) != r)
- {
- /* if write didn't set errno, assume problem is no disk space */
- if (errno == 0)
- errno = ENOSPC;
- pg_log_error("could not write to compressed file \"%s\": %s",
- state->filename, get_gz_error(state->ztarfile));
- exit(1);
- }
- }
- else
-#endif
- {
- errno = 0;
- if (fwrite(buf, r, 1, state->tarfile) != 1)
- {
- /* if write didn't set errno, assume problem is no disk space */
- if (errno == 0)
- errno = ENOSPC;
- pg_log_error("could not write to file \"%s\": %m",
- state->filename);
- exit(1);
- }
- }
-}
+ bbstreamer *streamer;
+ bbstreamer *manifest_inject_streamer = NULL;
+ bool inject_manifest;
+ bool must_parse_archive;
-/*
- * Receive a tar format file from the connection to the server, and write
- * the data from this file directly into a tar file. If compression is
- * enabled, the data will be compressed while written to the file.
- *
- * The file will be named base.tar[.gz] if it's for the main data directory
- * or <tablespaceoid>.tar[.gz] if it's for another tablespace.
- *
- * No attempt to inspect or validate the contents of the file is done.
- */
-static void
-ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
-{
- char zerobuf[TAR_BLOCK_SIZE * 2];
- WriteTarState state;
-
- memset(&state, 0, sizeof(state));
- state.tablespacenum = rownum;
- state.basetablespace = PQgetisnull(res, rownum, 0);
- state.in_tarhdr = true;
+ /*
+ * Normally, we emit the backup manifest as a separate file, but when
+ * we're writing a tarfile to stdout, we don't have that option, so
+ * include it in the one tarfile we've got.
+ */
+ inject_manifest = (format == 't' && strcmp(basedir, "-") == 0 && manifest);
- /* recovery.conf is integrated into postgresql.conf in 12 and newer */
- if (PQserverVersion(conn) >= MINIMUM_VERSION_FOR_RECOVERY_GUC)
- state.is_recovery_guc_supported = true;
+ /*
+ * We have to parse the archive if (1) we're suppose to extract it, or if
+ * (2) we need to inject backup_manifest or recovery configuration into it.
+ */
+ must_parse_archive = (format == 'p' || inject_manifest ||
+ (spclocation == NULL && writerecoveryconf));
- if (state.basetablespace)
+ if (format == 'p')
{
+ const char *directory;
+
/*
- * Base tablespaces
+ * In plain format, we must extract the archive. The data for the main
+ * tablespace will be written to the base directory, and the data for
+ * other tablespaces will be written to the directory where they're
+ * located on the server, after applying any user-specified tablespace
+ * mappings.
*/
- if (strcmp(basedir, "-") == 0)
- {
-#ifdef WIN32
- _setmode(fileno(stdout), _O_BINARY);
-#endif
-
-#ifdef HAVE_LIBZ
- if (compresslevel != 0)
- {
- int fd = dup(fileno(stdout));
-
- if (fd < 0)
- {
- pg_log_error("could not duplicate stdout: %m");
- exit(1);
- }
-
- state.ztarfile = gzdopen(fd, "wb");
- if (state.ztarfile == NULL)
- {
- pg_log_error("could not open output file: %m");
- exit(1);
- }
-
- if (gzsetparams(state.ztarfile, compresslevel,
- Z_DEFAULT_STRATEGY) != Z_OK)
- {
- pg_log_error("could not set compression level %d: %s",
- compresslevel, get_gz_error(state.ztarfile));
- exit(1);
- }
- }
- else
-#endif
- state.tarfile = stdout;
- strcpy(state.filename, "-");
- }
- else
- {
-#ifdef HAVE_LIBZ
- if (compresslevel != 0)
- {
- snprintf(state.filename, sizeof(state.filename),
- "%s/base.tar.gz", basedir);
- state.ztarfile = gzopen(state.filename, "wb");
- if (gzsetparams(state.ztarfile, compresslevel,
- Z_DEFAULT_STRATEGY) != Z_OK)
- {
- pg_log_error("could not set compression level %d: %s",
- compresslevel, get_gz_error(state.ztarfile));
- exit(1);
- }
- }
- else
-#endif
- {
- snprintf(state.filename, sizeof(state.filename),
- "%s/base.tar", basedir);
- state.tarfile = fopen(state.filename, "wb");
- }
- }
+ directory = spclocation == NULL ? basedir
+ : get_tablespace_mapping(spclocation);
+ streamer = bbstreamer_extractor_new(directory,
+ get_tablespace_mapping,
+ progress_update_filename);
}
else
{
+ FILE *archive_file;
+ char archive_filename[MAXPGPATH];
+
/*
- * Specific tablespace
+ * In tar format, we just write the archive without extracting it.
+ * Normally, we write it to the archive name provided by the caller,
+ * but when the base directory is "-" that means we need to write
+ * to standard output.
*/
-#ifdef HAVE_LIBZ
- if (compresslevel != 0)
+ if (strcmp(basedir, "-") == 0)
{
- snprintf(state.filename, sizeof(state.filename),
- "%s/%s.tar.gz",
- basedir, PQgetvalue(res, rownum, 0));
- state.ztarfile = gzopen(state.filename, "wb");
- if (gzsetparams(state.ztarfile, compresslevel,
- Z_DEFAULT_STRATEGY) != Z_OK)
- {
- pg_log_error("could not set compression level %d: %s",
- compresslevel, get_gz_error(state.ztarfile));
- exit(1);
- }
+ snprintf(archive_filename, sizeof(archive_filename), "-");
+ archive_file = stdout;
}
else
-#endif
{
- snprintf(state.filename, sizeof(state.filename), "%s/%s.tar",
- basedir, PQgetvalue(res, rownum, 0));
- state.tarfile = fopen(state.filename, "wb");
+ snprintf(archive_filename, sizeof(archive_filename),
+ "%s/%s", basedir, archive_name);
+ archive_file = NULL;
}
- }
#ifdef HAVE_LIBZ
- if (compresslevel != 0)
- {
- if (!state.ztarfile)
+ if (compresslevel != 0)
{
- /* Compression is in use */
- pg_log_error("could not create compressed file \"%s\": %s",
- state.filename, get_gz_error(state.ztarfile));
- exit(1);
+ strlcat(archive_filename, ".gz", sizeof(archive_filename));
+ streamer = bbstreamer_gzip_writer_new(archive_filename,
+ archive_file,
+ compresslevel);
}
- }
- else
+ else
#endif
- {
- /* Either no zlib support, or zlib support but compresslevel = 0 */
- if (!state.tarfile)
- {
- pg_log_error("could not create file \"%s\": %m", state.filename);
- exit(1);
- }
- }
+ streamer = bbstreamer_plain_writer_new(archive_filename,
+ archive_file);
- ReceiveCopyData(conn, ReceiveTarCopyChunk, &state);
+
+ /*
+ * If we need to parse the archive for whatever reason, then we'll
+ * also need to re-archive, because, if the output format is tar, the
+ * only point of parsing the archive is to be able to inject stuff
+ * into it.
+ */
+ if (must_parse_archive)
+ streamer = bbstreamer_tar_archiver_new(streamer);
+ progress_filename = archive_filename;
+ }
/*
- * End of copy data. If requested, and this is the base tablespace, write
- * configuration file into the tarfile. When done, close the file (but not
- * stdout).
- *
- * Also, write two completely empty blocks at the end of the tar file, as
- * required by some tar programs.
+ * If we're supposed to inject the backup manifest into the results,
+ * it should be done here, so that the file content can be injected
+ * directly, without worrying about the details of the tar format.
*/
+ if (inject_manifest)
+ manifest_inject_streamer = streamer;
- MemSet(zerobuf, 0, sizeof(zerobuf));
-
- if (state.basetablespace && writerecoveryconf)
+ /*
+ * If this is the main tablespace and we're supposed to write
+ * recovery information, arrange to do that.
+ */
+ if (spclocation == NULL && writerecoveryconf)
{
- char header[TAR_BLOCK_SIZE];
+ Assert(must_parse_archive);
+ streamer = bbstreamer_recovery_injector_new(streamer,
+ is_recovery_guc_supported,
+ recoveryconfcontents);
+ }
- /*
- * If postgresql.auto.conf has not been found in the streamed data,
- * add recovery configuration to postgresql.auto.conf if recovery
- * parameters are GUCs. If the instance connected to is older than
- * 12, create recovery.conf with this data otherwise.
- */
- if (!state.found_postgresql_auto_conf || !state.is_recovery_guc_supported)
- {
- int padding;
-
- tarCreateHeader(header,
- state.is_recovery_guc_supported ? "postgresql.auto.conf" : "recovery.conf",
- NULL,
- recoveryconfcontents->len,
- pg_file_create_mode, 04000, 02000,
- time(NULL));
-
- padding = tarPaddingBytesRequired(recoveryconfcontents->len);
-
- writeTarData(&state, header, sizeof(header));
- writeTarData(&state, recoveryconfcontents->data,
- recoveryconfcontents->len);
- if (padding)
- writeTarData(&state, zerobuf, padding);
- }
+ /*
+ * If we're doing anything that involves understanding the contents of
+ * the archive, we'll need to parse it.
+ */
+ if (must_parse_archive)
+ streamer = bbstreamer_tar_parser_new(streamer);
- /*
- * standby.signal is supported only if recovery parameters are GUCs.
- */
- if (state.is_recovery_guc_supported)
- {
- tarCreateHeader(header, "standby.signal", NULL,
- 0, /* zero-length file */
- pg_file_create_mode, 04000, 02000,
- time(NULL));
+ /* Return the results. */
+ *manifest_inject_streamer_p = manifest_inject_streamer;
+ return streamer;
+}
- writeTarData(&state, header, sizeof(header));
+/*
+ * Receive raw tar data from the server, and stream it to the appropriate
+ * location. If we're writing a single tarfile to standard output, also
+ * receive the backup manifest and inject it into that tarfile.
+ */
+static void
+ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation,
+ bool tablespacenum)
+{
+ WriteTarState state;
+ bbstreamer *manifest_inject_streamer;
+ bool is_recovery_guc_supported;
- /*
- * we don't need to pad out to a multiple of the tar block size
- * here, because the file is zero length, which is a multiple of
- * any block size.
- */
- }
- }
+ /* Pass all COPY data through to the backup streamer. */
+ memset(&state, 0, sizeof(state));
+ is_recovery_guc_supported =
+ PQserverVersion(conn) >= MINIMUM_VERSION_FOR_RECOVERY_GUC;
+ state.streamer = CreateBackupStreamer(archive_name, spclocation,
+ &manifest_inject_streamer,
+ is_recovery_guc_supported);
+ state.tablespacenum = tablespacenum;
+ ReceiveCopyData(conn, ReceiveTarCopyChunk, &state);
+ progress_filename = NULL;
/*
- * Normally, we emit the backup manifest as a separate file, but when
- * we're writing a tarfile to stdout, we don't have that option, so
- * include it in the one tarfile we've got.
+ * The decision as to whether we need to inject the backup manifest into
+ * the output at this stage is made by CreateBackupStreamer; if that is
+ * needed, manifest_inject_streamer will be non-NULL; otherwise, it will
+ * be NULL.
*/
- if (strcmp(basedir, "-") == 0 && manifest)
+ if (manifest_inject_streamer != NULL)
{
- char header[TAR_BLOCK_SIZE];
PQExpBufferData buf;
+ /* Slurp the entire backup manifest into a buffer. */
initPQExpBuffer(&buf);
ReceiveBackupManifestInMemory(conn, &buf);
if (PQExpBufferDataBroken(buf))
pg_log_error("out of memory");
exit(1);
}
- tarCreateHeader(header, "backup_manifest", NULL, buf.len,
- pg_file_create_mode, 04000, 02000,
- time(NULL));
- writeTarData(&state, header, sizeof(header));
- writeTarData(&state, buf.data, buf.len);
- termPQExpBuffer(&buf);
- }
- /* 2 * TAR_BLOCK_SIZE bytes empty data at end of file */
- writeTarData(&state, zerobuf, sizeof(zerobuf));
+ /* Inject it into the output tarfile. */
+ bbstreamer_inject_file(manifest_inject_streamer, "backup_manifest",
+ buf.data, buf.len);
-#ifdef HAVE_LIBZ
- if (state.ztarfile != NULL)
- {
- if (gzclose(state.ztarfile) != 0)
- {
- pg_log_error("could not close compressed file \"%s\": %s",
- state.filename, get_gz_error(state.ztarfile));
- exit(1);
- }
- }
- else
-#endif
- {
- if (strcmp(basedir, "-") != 0)
- {
- if (fclose(state.tarfile) != 0)
- {
- pg_log_error("could not close file \"%s\": %m",
- state.filename);
- exit(1);
- }
- }
+ /* Free memory. */
+ termPQExpBuffer(&buf);
}
- progress_report(rownum, state.filename, true, false);
+ /* Cleanup. */
+ bbstreamer_finalize(state.streamer);
+ bbstreamer_free(state.streamer);
+
+ progress_report(tablespacenum, true, false);
/*
* Do not sync the resulting tar file yet, all files are synced once at
{
WriteTarState *state = callback_data;
- if (!writerecoveryconf || !state->basetablespace)
- {
- /*
- * When not writing config file, or when not working on the base
- * tablespace, we never have to look for an existing configuration
- * file in the stream.
- */
- writeTarData(state, copybuf, r);
- }
- else
- {
- /*
- * Look for a config file in the existing tar stream. If it's there,
- * we must skip it so we can later overwrite it with our own version
- * of the file.
- *
- * To do this, we have to process the individual files inside the TAR
- * stream. The stream consists of a header and zero or more chunks,
- * each with a length equal to TAR_BLOCK_SIZE. The stream from the
- * server is broken up into smaller pieces, so we have to track the
- * size of the files to find the next header structure.
- */
- int rr = r;
- int pos = 0;
-
- while (rr > 0)
- {
- if (state->in_tarhdr)
- {
- /*
- * We're currently reading a header structure inside the TAR
- * stream, i.e. the file metadata.
- */
- if (state->tarhdrsz < TAR_BLOCK_SIZE)
- {
- /*
- * Copy the header structure into tarhdr in case the
- * header is not aligned properly or it's not returned in
- * whole by the last PQgetCopyData call.
- */
- int hdrleft;
- int bytes2copy;
-
- hdrleft = TAR_BLOCK_SIZE - state->tarhdrsz;
- bytes2copy = (rr > hdrleft ? hdrleft : rr);
-
- memcpy(&state->tarhdr[state->tarhdrsz], copybuf + pos,
- bytes2copy);
-
- rr -= bytes2copy;
- pos += bytes2copy;
- state->tarhdrsz += bytes2copy;
- }
- else
- {
- /*
- * We have the complete header structure in tarhdr, look
- * at the file metadata: we may want append recovery info
- * into postgresql.auto.conf and skip standby.signal file
- * if recovery parameters are integrated as GUCs, and
- * recovery.conf otherwise. In both cases we must
- * calculate tar padding.
- */
- if (state->is_recovery_guc_supported)
- {
- state->skip_file =
- (strcmp(&state->tarhdr[0], "standby.signal") == 0);
- state->is_postgresql_auto_conf =
- (strcmp(&state->tarhdr[0], "postgresql.auto.conf") == 0);
- }
- else
- state->skip_file =
- (strcmp(&state->tarhdr[0], "recovery.conf") == 0);
-
- state->filesz = read_tar_number(&state->tarhdr[124], 12);
- state->file_padding_len =
- tarPaddingBytesRequired(state->filesz);
-
- if (state->is_recovery_guc_supported &&
- state->is_postgresql_auto_conf &&
- writerecoveryconf)
- {
- /* replace tar header */
- char header[TAR_BLOCK_SIZE];
-
- tarCreateHeader(header, "postgresql.auto.conf", NULL,
- state->filesz + recoveryconfcontents->len,
- pg_file_create_mode, 04000, 02000,
- time(NULL));
-
- writeTarData(state, header, sizeof(header));
- }
- else
- {
- /* copy stream with padding */
- state->filesz += state->file_padding_len;
-
- if (!state->skip_file)
- {
- /*
- * If we're not skipping the file, write the tar
- * header unmodified.
- */
- writeTarData(state, state->tarhdr, TAR_BLOCK_SIZE);
- }
- }
-
- /* Next part is the file, not the header */
- state->in_tarhdr = false;
- }
- }
- else
- {
- /*
- * We're processing a file's contents.
- */
- if (state->filesz > 0)
- {
- /*
- * We still have data to read (and possibly write).
- */
- int bytes2write;
-
- bytes2write = (state->filesz > rr ? rr : state->filesz);
-
- if (!state->skip_file)
- writeTarData(state, copybuf + pos, bytes2write);
-
- rr -= bytes2write;
- pos += bytes2write;
- state->filesz -= bytes2write;
- }
- else if (state->is_recovery_guc_supported &&
- state->is_postgresql_auto_conf &&
- writerecoveryconf)
- {
- /* append recovery config to postgresql.auto.conf */
- int padding;
- int tailsize;
-
- tailsize = (TAR_BLOCK_SIZE - state->file_padding_len) + recoveryconfcontents->len;
- padding = tarPaddingBytesRequired(tailsize);
-
- writeTarData(state, recoveryconfcontents->data,
- recoveryconfcontents->len);
-
- if (padding)
- {
- char zerobuf[TAR_BLOCK_SIZE];
-
- MemSet(zerobuf, 0, sizeof(zerobuf));
- writeTarData(state, zerobuf, padding);
- }
+ bbstreamer_content(state->streamer, NULL, copybuf, r, BBSTREAMER_UNKNOWN);
- /* skip original file padding */
- state->is_postgresql_auto_conf = false;
- state->skip_file = true;
- state->filesz += state->file_padding_len;
-
- state->found_postgresql_auto_conf = true;
- }
- else
- {
- /*
- * No more data in the current file, the next piece of
- * data (if any) will be a new file header structure.
- */
- state->in_tarhdr = true;
- state->skip_file = false;
- state->is_postgresql_auto_conf = false;
- state->tarhdrsz = 0;
- state->filesz = 0;
- }
- }
- }
- }
totaldone += r;
- progress_report(state->tablespacenum, state->filename, false, false);
+ progress_report(state->tablespacenum, false, false);
}
return dir;
}
-
-/*
- * Receive a tar format stream from the connection to the server, and unpack
- * the contents of it into a directory. Only files, directories and
- * symlinks are supported, no other kinds of special files.
- *
- * If the data is for the main data directory, it will be restored in the
- * specified directory. If it's for another tablespace, it will be restored
- * in the original or mapped directory.
- */
-static void
-ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
-{
- UnpackTarState state;
- bool basetablespace;
-
- memset(&state, 0, sizeof(state));
- state.tablespacenum = rownum;
-
- basetablespace = PQgetisnull(res, rownum, 0);
- if (basetablespace)
- strlcpy(state.current_path, basedir, sizeof(state.current_path));
- else
- strlcpy(state.current_path,
- get_tablespace_mapping(PQgetvalue(res, rownum, 1)),
- sizeof(state.current_path));
-
- ReceiveCopyData(conn, ReceiveTarAndUnpackCopyChunk, &state);
-
-
- if (state.file)
- fclose(state.file);
-
- progress_report(rownum, state.filename, true, false);
-
- if (state.file != NULL)
- {
- pg_log_error("COPY stream ended before last file was finished");
- exit(1);
- }
-
- if (basetablespace && writerecoveryconf)
- WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
-
- /*
- * No data is synced here, everything is done for all tablespaces at the
- * end.
- */
-}
-
-static void
-ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, void *callback_data)
-{
- UnpackTarState *state = callback_data;
-
- if (state->file == NULL)
- {
-#ifndef WIN32
- int filemode;
-#endif
-
- /*
- * No current file, so this must be the header for a new file
- */
- if (r != TAR_BLOCK_SIZE)
- {
- pg_log_error("invalid tar block header size: %zu", r);
- exit(1);
- }
- totaldone += TAR_BLOCK_SIZE;
-
- state->current_len_left = read_tar_number(©buf[124], 12);
-
-#ifndef WIN32
- /* Set permissions on the file */
- filemode = read_tar_number(©buf[100], 8);
-#endif
-
- /*
- * All files are padded up to a multiple of TAR_BLOCK_SIZE
- */
- state->current_padding =
- tarPaddingBytesRequired(state->current_len_left);
-
- /*
- * First part of header is zero terminated filename
- */
- snprintf(state->filename, sizeof(state->filename),
- "%s/%s", state->current_path, copybuf);
- if (state->filename[strlen(state->filename) - 1] == '/')
- {
- /*
- * Ends in a slash means directory or symlink to directory
- */
- if (copybuf[156] == '5')
- {
- /*
- * Directory. Remove trailing slash first.
- */
- state->filename[strlen(state->filename) - 1] = '\0';
- if (mkdir(state->filename, pg_dir_create_mode) != 0)
- {
- /*
- * When streaming WAL, pg_wal (or pg_xlog for pre-9.6
- * clusters) will have been created by the wal receiver
- * process. Also, when the WAL directory location was
- * specified, pg_wal (or pg_xlog) has already been created
- * as a symbolic link before starting the actual backup.
- * So just ignore creation failures on related
- * directories.
- */
- if (!((pg_str_endswith(state->filename, "/pg_wal") ||
- pg_str_endswith(state->filename, "/pg_xlog") ||
- pg_str_endswith(state->filename, "/archive_status")) &&
- errno == EEXIST))
- {
- pg_log_error("could not create directory \"%s\": %m",
- state->filename);
- exit(1);
- }
- }
-#ifndef WIN32
- if (chmod(state->filename, (mode_t) filemode))
- {
- pg_log_error("could not set permissions on directory \"%s\": %m",
- state->filename);
- exit(1);
- }
-#endif
- }
- else if (copybuf[156] == '2')
- {
- /*
- * Symbolic link
- *
- * It's most likely a link in pg_tblspc directory, to the
- * location of a tablespace. Apply any tablespace mapping
- * given on the command line (--tablespace-mapping). (We
- * blindly apply the mapping without checking that the link
- * really is inside pg_tblspc. We don't expect there to be
- * other symlinks in a data directory, but if there are, you
- * can call it an undocumented feature that you can map them
- * too.)
- */
- state->filename[strlen(state->filename) - 1] = '\0'; /* Remove trailing slash */
-
- state->mapped_tblspc_path =
- get_tablespace_mapping(©buf[157]);
- if (symlink(state->mapped_tblspc_path, state->filename) != 0)
- {
- pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
- state->filename, state->mapped_tblspc_path);
- exit(1);
- }
- }
- else
- {
- pg_log_error("unrecognized link indicator \"%c\"",
- copybuf[156]);
- exit(1);
- }
- return; /* directory or link handled */
- }
-
- /*
- * regular file
- */
- state->file = fopen(state->filename, "wb");
- if (!state->file)
- {
- pg_log_error("could not create file \"%s\": %m", state->filename);
- exit(1);
- }
-
-#ifndef WIN32
- if (chmod(state->filename, (mode_t) filemode))
- {
- pg_log_error("could not set permissions on file \"%s\": %m",
- state->filename);
- exit(1);
- }
-#endif
-
- if (state->current_len_left == 0)
- {
- /*
- * Done with this file, next one will be a new tar header
- */
- fclose(state->file);
- state->file = NULL;
- return;
- }
- } /* new file */
- else
- {
- /*
- * Continuing blocks in existing file
- */
- if (state->current_len_left == 0 && r == state->current_padding)
- {
- /*
- * Received the padding block for this file, ignore it and close
- * the file, then move on to the next tar header.
- */
- fclose(state->file);
- state->file = NULL;
- totaldone += r;
- return;
- }
-
- errno = 0;
- if (fwrite(copybuf, r, 1, state->file) != 1)
- {
- /* if write didn't set errno, assume problem is no disk space */
- if (errno == 0)
- errno = ENOSPC;
- pg_log_error("could not write to file \"%s\": %m", state->filename);
- exit(1);
- }
- totaldone += r;
- progress_report(state->tablespacenum, state->filename, false, false);
-
- state->current_len_left -= r;
- if (state->current_len_left == 0 && state->current_padding == 0)
- {
- /*
- * Received the last block, and there is no padding to be
- * expected. Close the file and move on to the next tar header.
- */
- fclose(state->file);
- state->file = NULL;
- return;
- }
- } /* continuing data in existing file */
-}
-
/*
* Receive the backup manifest file and write it out to a file.
*/
StartLogStreamer(xlogstart, starttli, sysidentifier);
}
- /*
- * Start receiving chunks
- */
+ /* Receive a tar file for each tablespace in turn */
for (i = 0; i < PQntuples(res); i++)
{
- if (format == 't')
- ReceiveTarFile(conn, res, i);
+ char archive_name[MAXPGPATH];
+ char *spclocation;
+
+ /*
+ * If we write the data out to a tar file, it will be named base.tar
+ * if it's the main data directory or <tablespaceoid>.tar if it's for
+ * another tablespace. CreateBackupStreamer() will arrange to add .gz
+ * to the archive name if pg_basebackup is performing compression.
+ */
+ if (PQgetisnull(res, i, 0))
+ {
+ strlcpy(archive_name, "base.tar", sizeof(archive_name));
+ spclocation = NULL;
+ }
else
- ReceiveAndUnpackTarFile(conn, res, i);
- } /* Loop over all tablespaces */
+ {
+ snprintf(archive_name, sizeof(archive_name),
+ "%s.tar", PQgetvalue(res, i, 0));
+ spclocation = PQgetvalue(res, i, 1);
+ }
+
+ ReceiveTarFile(conn, archive_name, spclocation, i);
+ }
/*
* Now receive backup manifest, if appropriate.
ReceiveBackupManifest(conn);
if (showprogress)
- progress_report(PQntuples(res), NULL, true, true);
+ {
+ progress_filename = NULL;
+ progress_report(PQntuples(res), true, true);
+ }
PQclear(res);