#include "receivelog.h"
#include "streamutil.h"
-/* fd and filename for currently open WAL file */
+/* currently open WAL file */
static Walfile *walfile = NULL;
-static char current_walfile_name[MAXPGPATH] = "";
static bool reportFlushPosition = false;
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
* Open a new WAL file in the specified directory.
*
* Returns true if OK; on failure, returns false after printing an error msg.
- * On success, 'walfile' is set to the FD for the file, and the base filename
- * (without partial_suffix) is stored in 'current_walfile_name'.
+ * On success, 'walfile' is set to the opened WAL file.
*
* The file will be padded to 16Mb with zeroes.
*/
char *fn;
ssize_t size;
XLogSegNo segno;
+ char walfile_name[MAXPGPATH];
XLByteToSeg(startpoint, segno, WalSegSz);
- XLogFileName(current_walfile_name, stream->timeline, segno, WalSegSz);
+ XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
/* Note that this considers the compression used if necessary */
- fn = stream->walmethod->get_file_name(current_walfile_name,
+ fn = stream->walmethod->get_file_name(walfile_name,
stream->partial_suffix);
/*
if (size == WalSegSz)
{
/* Already padded file. Open it for use */
- f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0);
+ f = stream->walmethod->open_for_write(walfile_name, stream->partial_suffix, 0);
if (f == NULL)
{
pg_log_error("could not open existing write-ahead log file \"%s\": %s",
/* No file existed, so create one */
- f = stream->walmethod->open_for_write(current_walfile_name,
+ f = stream->walmethod->open_for_write(walfile_name,
stream->partial_suffix, WalSegSz);
if (f == NULL)
{
char *fn;
off_t currpos;
int r;
+ char walfile_name[MAXPGPATH];
if (walfile == NULL)
return true;
+ strlcpy(walfile_name, walfile->pathname, MAXPGPATH);
+ currpos = walfile->currpos;
+
/* Note that this considers the compression used if necessary */
- fn = stream->walmethod->get_file_name(current_walfile_name,
+ fn = stream->walmethod->get_file_name(walfile_name,
stream->partial_suffix);
- currpos = stream->walmethod->get_current_pos(walfile);
-
- if (currpos == -1)
- {
- pg_log_error("could not determine seek position in file \"%s\": %s",
- fn, stream->walmethod->getlasterror());
- stream->walmethod->close(walfile, CLOSE_UNLINK);
- walfile = NULL;
-
- pg_free(fn);
- return false;
- }
-
if (stream->partial_suffix)
{
if (currpos == WalSegSz)
if (currpos == WalSegSz && stream->mark_done)
{
/* writes error message if failed */
- if (!mark_file_as_archived(stream, current_walfile_name))
+ if (!mark_file_as_archived(stream, walfile_name))
return false;
}
error:
if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
pg_log_error("could not close file \"%s\": %s",
- current_walfile_name, stream->walmethod->getlasterror());
+ walfile->pathname, stream->walmethod->getlasterror());
walfile = NULL;
return false;
}
{
if (stream->walmethod->sync(walfile) != 0)
pg_fatal("could not fsync file \"%s\": %s",
- current_walfile_name, stream->walmethod->getlasterror());
+ walfile->pathname, stream->walmethod->getlasterror());
lastFlushPosition = blockpos;
/*
*/
if (stream->walmethod->sync(walfile) != 0)
pg_fatal("could not fsync file \"%s\": %s",
- current_walfile_name, stream->walmethod->getlasterror());
+ walfile->pathname, stream->walmethod->getlasterror());
lastFlushPosition = blockpos;
}
else
{
/* More data in existing segment */
- if (stream->walmethod->get_current_pos(walfile) != xlogoff)
+ if (walfile->currpos != xlogoff)
{
pg_log_error("got WAL data offset %08x, expected %08x",
- xlogoff, (int) stream->walmethod->get_current_pos(walfile));
+ xlogoff, (int) walfile->currpos);
return false;
}
}
bytes_to_write) != bytes_to_write)
{
pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
- bytes_to_write, current_walfile_name,
+ bytes_to_write, walfile->pathname,
stream->walmethod->getlasterror());
return false;
}
*/
typedef struct DirectoryMethodFile
{
+ Walfile base;
int fd;
- off_t currpos;
- char *pathname;
char *fullpath;
char *temp_suffix;
#ifdef HAVE_LIBZ
return filename;
}
-static Walfile
+static Walfile *
dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
{
char tmppath[MAXPGPATH];
}
#endif
+ f->base.currpos = 0;
+ f->base.pathname = pg_strdup(pathname);
f->fd = fd;
- f->currpos = 0;
- f->pathname = pg_strdup(pathname);
f->fullpath = pg_strdup(tmppath);
if (temp_suffix)
f->temp_suffix = pg_strdup(temp_suffix);
- return f;
+ return &f->base;
}
static ssize_t
-dir_write(Walfile f, const void *buf, size_t count)
+dir_write(Walfile *f, const void *buf, size_t count)
{
ssize_t r;
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
}
}
if (r > 0)
- df->currpos += r;
+ df->base.currpos += r;
return r;
}
-static off_t
-dir_get_current_pos(Walfile f)
-{
- Assert(f != NULL);
- dir_clear_error();
-
- /* Use a cached value to prevent lots of reseeks */
- return ((DirectoryMethodFile *) f)->currpos;
-}
-
static int
-dir_close(Walfile f, WalCloseMethod method)
+dir_close(Walfile *f, WalCloseMethod method)
{
int r;
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
* If we have a temp prefix, normal operation is to rename the
* file.
*/
- filename = dir_get_file_name(df->pathname, df->temp_suffix);
+ filename = dir_get_file_name(df->base.pathname, df->temp_suffix);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, filename);
pg_free(filename);
/* permanent name, so no need for the prefix */
- filename2 = dir_get_file_name(df->pathname, NULL);
+ filename2 = dir_get_file_name(df->base.pathname, NULL);
snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
dir_data->basedir, filename2);
pg_free(filename2);
char *filename;
/* Unlink the file once it's closed */
- filename = dir_get_file_name(df->pathname, df->temp_suffix);
+ filename = dir_get_file_name(df->base.pathname, df->temp_suffix);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, filename);
pg_free(filename);
LZ4F_freeCompressionContext(df->ctx);
#endif
- pg_free(df->pathname);
+ pg_free(df->base.pathname);
pg_free(df->fullpath);
pg_free(df->temp_suffix);
pg_free(df);
}
static int
-dir_sync(Walfile f)
+dir_sync(Walfile *f)
{
int r;
method = pg_malloc0(sizeof(WalWriteMethod));
method->open_for_write = dir_open_for_write;
method->write = dir_write;
- method->get_current_pos = dir_get_current_pos;
method->get_file_size = dir_get_file_size;
method->get_file_name = dir_get_file_name;
method->compression_algorithm = dir_compression_algorithm;
typedef struct TarMethodFile
{
+ Walfile base;
off_t ofs_start; /* Where does the *header* for this file start */
- off_t currpos;
char header[TAR_BLOCK_SIZE];
- char *pathname;
size_t pad_to_size;
} TarMethodFile;
#endif
static ssize_t
-tar_write(Walfile f, const void *buf, size_t count)
+tar_write(Walfile *f, const void *buf, size_t count)
{
ssize_t r;
tar_data->lasterrno = errno ? errno : ENOSPC;
return -1;
}
- ((TarMethodFile *) f)->currpos += r;
+ f->currpos += r;
return r;
}
#ifdef HAVE_LIBZ
{
if (!tar_write_compressed_data(unconstify(void *, buf), count, false))
return -1;
- ((TarMethodFile *) f)->currpos += count;
+ f->currpos += count;
return count;
}
#endif
while (bytesleft)
{
size_t bytestowrite = Min(bytesleft, XLOG_BLCKSZ);
- ssize_t r = tar_write(f, zerobuf.data, bytestowrite);
+ ssize_t r = tar_write(&f->base, zerobuf.data, bytestowrite);
if (r < 0)
return false;
return filename;
}
-static Walfile
+static Walfile *
tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
{
char *tmppath;
tar_data->currentfile = NULL;
return NULL;
}
- tar_data->currentfile->currpos = 0;
+ tar_data->currentfile->base.currpos = 0;
if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
{
Assert(false);
}
- tar_data->currentfile->pathname = pg_strdup(pathname);
+ tar_data->currentfile->base.pathname = pg_strdup(pathname);
/*
* Uncompressed files are padded on creation, but for compression we can't
return NULL;
}
- tar_data->currentfile->currpos = 0;
+ tar_data->currentfile->base.currpos = 0;
}
}
- return tar_data->currentfile;
+ return &tar_data->currentfile->base;
}
static ssize_t
return tar_data->compression_algorithm;
}
-static off_t
-tar_get_current_pos(Walfile f)
-{
- Assert(f != NULL);
- tar_clear_error();
-
- return ((TarMethodFile *) f)->currpos;
-}
-
static int
-tar_sync(Walfile f)
+tar_sync(Walfile *f)
{
int r;
}
static int
-tar_close(Walfile f, WalCloseMethod method)
+tar_close(Walfile *f, WalCloseMethod method)
{
ssize_t filesize;
int padding;
return -1;
}
- pg_free(tf->pathname);
+ pg_free(tf->base.pathname);
pg_free(tf);
tar_data->currentfile = NULL;
* A compressed tarfile is padded on close since we cannot know
* the size of the compressed output until the end.
*/
- size_t sizeleft = tf->pad_to_size - tf->currpos;
+ size_t sizeleft = tf->pad_to_size - tf->base.currpos;
if (sizeleft)
{
* An uncompressed tarfile was padded on creation, so just adjust
* the current position as if we seeked to the end.
*/
- tf->currpos = tf->pad_to_size;
+ tf->base.currpos = tf->pad_to_size;
}
}
* Get the size of the file, and pad out to a multiple of the tar block
* size.
*/
- filesize = tar_get_current_pos(f);
+ filesize = f->currpos;
padding = tarPaddingBytesRequired(filesize);
if (padding)
{
* We overwrite it with what it was before if we have no tempname,
* since we're going to write the buffer anyway.
*/
- strlcpy(&(tf->header[0]), tf->pathname, 100);
+ strlcpy(&(tf->header[0]), tf->base.pathname, 100);
print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
{
/* XXX this seems pretty bogus; why is only this case fatal? */
pg_fatal("could not fsync file \"%s\": %s",
- tf->pathname, tar_getlasterror());
+ tf->base.pathname, tar_getlasterror());
}
/* Clean up and done */
- pg_free(tf->pathname);
+ pg_free(tf->base.pathname);
pg_free(tf);
tar_data->currentfile = NULL;
if (tar_data->currentfile)
{
- if (tar_close(tar_data->currentfile, CLOSE_NORMAL) != 0)
+ if (tar_close(&tar_data->currentfile->base, CLOSE_NORMAL) != 0)
return false;
}
method = pg_malloc0(sizeof(WalWriteMethod));
method->open_for_write = tar_open_for_write;
method->write = tar_write;
- method->get_current_pos = tar_get_current_pos;
method->get_file_size = tar_get_file_size;
method->get_file_name = tar_get_file_name;
method->compression_algorithm = tar_compression_algorithm;
#include "common/compression.h"
-typedef void *Walfile;
+struct WalWriteMethod;
+typedef struct WalWriteMethod WalWriteMethod;
+
+typedef struct
+{
+ off_t currpos;
+ char *pathname;
+ /*
+ * MORE DATA FOLLOWS AT END OF STRUCT
+ *
+ * Each WalWriteMethod is expected to embed this as the first member of
+ * a larger struct with method-specific fields following.
+ */
+} Walfile;
typedef enum
{
* care not to clobber errno between a failed method call and use of
* getlasterror() to retrieve the message.
*/
-typedef struct WalWriteMethod WalWriteMethod;
struct WalWriteMethod
{
/*
* automatically renamed in close(). If pad_to_size is specified, the file
* will be padded with NUL up to that size, if supported by the Walmethod.
*/
- Walfile (*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size);
+ Walfile *(*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size);
/*
* Close an open Walfile, using one or more methods for handling automatic
* unlinking etc. Returns 0 on success, other values for error.
*/
- int (*close) (Walfile f, WalCloseMethod method);
+ int (*close) (Walfile *f, WalCloseMethod method);
/* Check if a file exist */
bool (*existsfile) (const char *pathname);
* Write count number of bytes to the file, and return the number of bytes
* actually written or -1 for error.
*/
- ssize_t (*write) (Walfile f, const void *buf, size_t count);
-
- /* Return the current position in a file or -1 on error */
- off_t (*get_current_pos) (Walfile f);
+ ssize_t (*write) (Walfile *f, const void *buf, size_t count);
/*
* fsync the contents of the specified file. Returns 0 on success.
*/
- int (*sync) (Walfile f);
+ int (*sync) (Walfile *f);
/*
* Clean up the Walmethod, closing any shared resources. For methods like