</variablelist>
</sect2>
+ <sect2 id="runtime-config-wal-summarization">
+ <title>WAL Summarization</title>
+
+  <para>
+   These settings control WAL summarization, a feature which must be
+   enabled in order to perform an
+   <link linkend="backup-incremental-backup">incremental backup</link>.
+  </para>
+
+ <variablelist>
+ <varlistentry id="guc-summarize-wal" xreflabel="summarize_wal">
+ <term><varname>summarize_wal</varname> (<type>boolean</type>)
+ <indexterm>
+ <primary><varname>summarize_wal</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Enables the WAL summarizer process. Note that WAL summarization can
+ be enabled either on a primary or on a standby. WAL summarization
+ cannot be enabled when <varname>wal_level</varname> is set to
+ <literal>minimal</literal>. This parameter can only be set in the
+ <filename>postgresql.conf</filename> file or on the server command line.
+ The default is <literal>off</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-wal-summary-keep-time" xreflabel="wal_summary_keep_time">
+ <term><varname>wal_summary_keep_time</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>wal_summary_keep_time</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Configures the amount of time after which the WAL summarizer
+ automatically removes old WAL summaries. The file timestamp is used to
+ determine which files are old enough to remove. Typically, you should set
+ this comfortably higher than the time that could pass between a backup
+ and a later incremental backup that depends on it. WAL summaries must
+ be available for the entire range of WAL records between the preceding
+ backup and the new one being taken; if not, the incremental backup will
+ fail. If this parameter is set to zero, WAL summaries will not be
+ automatically deleted, but it is safe to manually remove files that you
+ know will not be required for future incremental backups.
+ This parameter can only be set in the
+ <filename>postgresql.conf</filename> file or on the server command line.
+ The default is 10 days. If <literal>summarize_wal = off</literal>,
+ existing WAL summaries will not be removed regardless of the value of
+ this parameter, because the WAL summarizer will not run.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+
+ </sect2>
+
</sect1>
<sect1 id="runtime-config-replication">
#include "port/pg_iovec.h"
#include "postmaster/bgwriter.h"
#include "postmaster/startup.h"
+#include "postmaster/walsummarizer.h"
#include "postmaster/walwriter.h"
#include "replication/logical.h"
#include "replication/origin.h"
return lastRemovedSegNo;
}
+/*
+ * Return the oldest WAL segment on the given TLI that still exists in
+ * XLOGDIR, or 0 if none.
+ */
+XLogSegNo
+XLogGetOldestSegno(TimeLineID tli)
+{
+ DIR *xldir;
+ struct dirent *xlde;
+ XLogSegNo oldest_segno = 0;
+
+ xldir = AllocateDir(XLOGDIR);
+ while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
+ {
+ TimeLineID file_tli;
+ XLogSegNo file_segno;
+
+ /* Ignore files that are not XLOG segments. */
+ if (!IsXLogFileName(xlde->d_name))
+ continue;
+
+ /* Parse filename to get TLI and segno. */
+ XLogFromFileName(xlde->d_name, &file_tli, &file_segno,
+ wal_segment_size);
+
+ /* Ignore anything that's not from the TLI of interest. */
+ if (tli != file_tli)
+ continue;
+
+ /* If it's the oldest so far, update oldest_segno. */
+ if (oldest_segno == 0 || file_segno < oldest_segno)
+ oldest_segno = file_segno;
+ }
+
+ FreeDir(xldir);
+ return oldest_segno;
+}
/*
* Update the last removed segno pointer in shared memory, to reflect that the
}
/*
- * Verify whether pg_wal and pg_wal/archive_status exist.
- * If the latter does not exist, recreate it.
+ * Verify whether pg_wal, pg_wal/archive_status, and pg_wal/summaries exist.
+ * If the latter do not exist, recreate them.
*
* It is not the goal of this function to verify the contents of these
* directories, but to help in cases where someone has performed a cluster
(errmsg("could not create missing directory \"%s\": %m",
path)));
}
+
+ /* Check for summaries */
+ snprintf(path, MAXPGPATH, XLOGDIR "/summaries");
+ if (stat(path, &stat_buf) == 0)
+ {
+ /* Check for weird cases where it exists but isn't a directory */
+ if (!S_ISDIR(stat_buf.st_mode))
+ ereport(FATAL,
+ (errmsg("required WAL directory \"%s\" does not exist",
+ path)));
+ }
+ else
+ {
+ ereport(LOG,
+ (errmsg("creating missing WAL directory \"%s\"", path)));
+ if (MakePGDirectory(path) < 0)
+ ereport(FATAL,
+ (errmsg("could not create missing directory \"%s\": %m",
+ path)));
+ }
}
/*
#endif
/*
- * Verify that pg_wal and pg_wal/archive_status exist. In cases where
- * someone has performed a copy for PITR, these directories may have been
- * excluded and need to be re-created.
+ * Verify that pg_wal, pg_wal/archive_status, and pg_wal/summaries exist.
+ * In cases where someone has performed a copy for PITR, these directories
+ * may have been excluded and need to be re-created.
*/
ValidateXLOGDirectoryStructure();
*/
END_CRIT_SECTION();
+ /*
+ * WAL summaries end when the next XLOG_CHECKPOINT_REDO or
+ * XLOG_CHECKPOINT_SHUTDOWN record is reached. This is the first point
+ * where (a) we're not inside of a critical section and (b) we can be
+ * certain that the relevant record has been flushed to disk, which must
+ * happen before it can be summarized.
+ *
+ * If this is a shutdown checkpoint, then this happens reasonably
+ * promptly: we've only just inserted and flushed the
+ * XLOG_CHECKPOINT_SHUTDOWN record. If this is not a shutdown checkpoint,
+ * then this might not be very prompt at all: the XLOG_CHECKPOINT_REDO
+ * record was written before we began flushing data to disk, and that
+ * could be many minutes ago at this point. However, we don't XLogFlush()
+ * after inserting that record, so we're not guaranteed that it's on disk
+ * until after the above call that flushes the XLOG_CHECKPOINT_ONLINE
+ * record.
+ */
+ SetWalSummarizerLatch();
+
/*
* Let smgr do post-checkpoint cleanup (eg, deleting old files).
*/
}
}
+ /*
+ * If WAL summarization is in use, don't remove WAL that has yet to be
+ * summarized.
+ */
+ keep = GetOldestUnsummarizedLSN(NULL, NULL, false);
+ if (keep != InvalidXLogRecPtr)
+ {
+ XLogSegNo unsummarized_segno;
+
+ XLByteToSeg(keep, unsummarized_segno, wal_segment_size);
+ if (unsummarized_segno < segno)
+ segno = unsummarized_segno;
+ }
+
/* but, keep at least wal_keep_size if that's set */
if (wal_keep_size_mb > 0)
{
basebackup_server.o \
basebackup_sink.o \
basebackup_target.o \
- basebackup_throttle.o
+ basebackup_throttle.o \
+ walsummary.o \
+ walsummaryfuncs.o
include $(top_srcdir)/src/backend/common.mk
'basebackup_target.c',
'basebackup_throttle.c',
'basebackup_zstd.c',
+ 'walsummary.c',
+ 'walsummaryfuncs.c',
)
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * walsummary.c
+ * Functions for accessing and managing WAL summary data.
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * src/backend/backup/walsummary.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "access/xlog_internal.h"
+#include "backup/walsummary.h"
+#include "utils/wait_event.h"
+
+static bool IsWalSummaryFilename(char *filename);
+static int ListComparatorForWalSummaryFiles(const ListCell *a,
+ const ListCell *b);
+
+/*
+ * Get a list of WAL summaries.
+ *
+ * If tli != 0, only WAL summaries with the indicated TLI will be included.
+ *
+ * If start_lsn != InvalidXLogRecPtr, only summaries that end after the
+ * indicated LSN will be included.
+ *
+ * If end_lsn != InvalidXLogRecPtr, only summaries that start before the
+ * indicated LSN will be included.
+ *
+ * The intent is that you can call GetWalSummaries(tli, start_lsn, end_lsn)
+ * to get all WAL summaries on the indicated timeline that overlap the
+ * specified LSN range.
+ */
+List *
+GetWalSummaries(TimeLineID tli, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
+{
+ DIR *sdir;
+ struct dirent *dent;
+ List *result = NIL;
+
+ sdir = AllocateDir(XLOGDIR "/summaries");
+ while ((dent = ReadDir(sdir, XLOGDIR "/summaries")) != NULL)
+ {
+ WalSummaryFile *ws;
+ uint32 tmp[5];
+ TimeLineID file_tli;
+ XLogRecPtr file_start_lsn;
+ XLogRecPtr file_end_lsn;
+
+ /* Decode filename, or skip if it's not in the expected format. */
+ if (!IsWalSummaryFilename(dent->d_name))
+ continue;
+ sscanf(dent->d_name, "%08X%08X%08X%08X%08X",
+ &tmp[0], &tmp[1], &tmp[2], &tmp[3], &tmp[4]);
+ file_tli = tmp[0];
+ file_start_lsn = ((uint64) tmp[1]) << 32 | tmp[2];
+ file_end_lsn = ((uint64) tmp[3]) << 32 | tmp[4];
+
+ /* Skip if it doesn't match the filter criteria. */
+ if (tli != 0 && tli != file_tli)
+ continue;
+ if (!XLogRecPtrIsInvalid(start_lsn) && start_lsn >= file_end_lsn)
+ continue;
+ if (!XLogRecPtrIsInvalid(end_lsn) && end_lsn <= file_start_lsn)
+ continue;
+
+ /* Add it to the list. */
+ ws = palloc(sizeof(WalSummaryFile));
+ ws->tli = file_tli;
+ ws->start_lsn = file_start_lsn;
+ ws->end_lsn = file_end_lsn;
+ result = lappend(result, ws);
+ }
+ FreeDir(sdir);
+
+ return result;
+}
+
+/*
+ * Build a new list of WAL summaries based on an existing list, but filtering
+ * out summaries that don't match the search parameters.
+ *
+ * If tli != 0, only WAL summaries with the indicated TLI will be included.
+ *
+ * If start_lsn != InvalidXLogRecPtr, only summaries that end after the
+ * indicated LSN will be included.
+ *
+ * If end_lsn != InvalidXLogRecPtr, only summaries that start before the
+ * indicated LSN will be included.
+ */
+List *
+FilterWalSummaries(List *wslist, TimeLineID tli,
+ XLogRecPtr start_lsn, XLogRecPtr end_lsn)
+{
+ List *result = NIL;
+ ListCell *lc;
+
+ /* Loop over input. */
+ foreach(lc, wslist)
+ {
+ WalSummaryFile *ws = lfirst(lc);
+
+ /* Skip if it doesn't match the filter criteria. */
+ if (tli != 0 && tli != ws->tli)
+ continue;
+ if (!XLogRecPtrIsInvalid(start_lsn) && start_lsn > ws->end_lsn)
+ continue;
+ if (!XLogRecPtrIsInvalid(end_lsn) && end_lsn < ws->start_lsn)
+ continue;
+
+ /* Add it to the result list. */
+ result = lappend(result, ws);
+ }
+
+ return result;
+}
+
+/*
+ * Check whether the supplied list of WalSummaryFile objects covers the
+ * whole range of LSNs from start_lsn to end_lsn. This function ignores
+ * timelines, so the caller should probably filter using the appropriate
+ * timeline before calling this.
+ *
+ * If the whole range of LSNs is covered, returns true, otherwise false.
+ * If false is returned, *missing_lsn is set either to InvalidXLogRecPtr
+ * if there are no WAL summary files in the input list, or to the first LSN
+ * in the range that is not covered by a WAL summary file in the input list.
+ */
+bool
+WalSummariesAreComplete(List *wslist, XLogRecPtr start_lsn,
+ XLogRecPtr end_lsn, XLogRecPtr *missing_lsn)
+{
+ XLogRecPtr current_lsn = start_lsn;
+ ListCell *lc;
+
+ /* Special case for empty list. */
+ if (wslist == NIL)
+ {
+ *missing_lsn = InvalidXLogRecPtr;
+ return false;
+ }
+
+ /* Make a private copy of the list and sort it by start LSN. */
+ wslist = list_copy(wslist);
+ list_sort(wslist, ListComparatorForWalSummaryFiles);
+
+ /*
+ * Consider summary files in order of increasing start_lsn, advancing the
+ * known-summarized range from start_lsn toward end_lsn.
+ *
+ * Normally, the summary files should cover non-overlapping WAL ranges,
+ * but this algorithm is intended to be correct even in case of overlap.
+ */
+ foreach(lc, wslist)
+ {
+ WalSummaryFile *ws = lfirst(lc);
+
+ if (ws->start_lsn > current_lsn)
+ {
+ /* We found a gap. */
+ break;
+ }
+ if (ws->end_lsn > current_lsn)
+ {
+ /*
+ * Next summary extends beyond end of previous summary, so extend
+ * the end of the range known to be summarized.
+ */
+ current_lsn = ws->end_lsn;
+
+ /*
+ * If the range we know to be summarized has reached the required
+ * end LSN, we have proved completeness.
+ */
+ if (current_lsn >= end_lsn)
+ return true;
+ }
+ }
+
+ /*
+ * We either ran out of summary files without reaching the end LSN, or we
+ * hit a gap in the sequence that resulted in us bailing out of the loop
+ * above.
+ */
+ *missing_lsn = current_lsn;
+ return false;
+}
+
+/*
+ * Open a WAL summary file.
+ *
+ * This will throw an error in case of trouble. As an exception, if
+ * missing_ok = true and the trouble is specifically that the file does
+ * not exist, it will not throw an error and will return a value less than 0.
+ */
+File
+OpenWalSummaryFile(WalSummaryFile *ws, bool missing_ok)
+{
+ char path[MAXPGPATH];
+ File file;
+
+ snprintf(path, MAXPGPATH,
+ XLOGDIR "/summaries/%08X%08X%08X%08X%08X.summary",
+ ws->tli,
+ LSN_FORMAT_ARGS(ws->start_lsn),
+ LSN_FORMAT_ARGS(ws->end_lsn));
+
+ file = PathNameOpenFile(path, O_RDONLY);
+ if (file < 0 && (errno != EEXIST || !missing_ok))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\": %m", path)));
+
+ return file;
+}
+
+/*
+ * Remove a WAL summary file if the last modification time precedes the
+ * cutoff time.
+ */
+void
+RemoveWalSummaryIfOlderThan(WalSummaryFile *ws, time_t cutoff_time)
+{
+ char path[MAXPGPATH];
+ struct stat statbuf;
+
+ snprintf(path, MAXPGPATH,
+ XLOGDIR "/summaries/%08X%08X%08X%08X%08X.summary",
+ ws->tli,
+ LSN_FORMAT_ARGS(ws->start_lsn),
+ LSN_FORMAT_ARGS(ws->end_lsn));
+
+ if (lstat(path, &statbuf) != 0)
+ {
+ if (errno == ENOENT)
+ return;
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m", path)));
+ }
+ if (statbuf.st_mtime >= cutoff_time)
+ return;
+ if (unlink(path) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m", path)));
+ ereport(DEBUG2,
+ (errmsg_internal("removing file \"%s\"", path)));
+}
+
/*
 * Test whether a filename looks like a WAL summary file.
 *
 * A WAL summary filename is exactly 40 upper-case hex digits (the TLI,
 * start LSN, and end LSN) followed by a ".summary" suffix.
 */
static bool
IsWalSummaryFilename(char *filename)
{
	if (strspn(filename, "0123456789ABCDEF") != 40)
		return false;
	return strcmp(filename + 40, ".summary") == 0;
}
+
+/*
+ * Data read callback for use with CreateBlockRefTableReader.
+ */
+int
+ReadWalSummary(void *wal_summary_io, void *data, int length)
+{
+ WalSummaryIO *io = wal_summary_io;
+ int nbytes;
+
+ nbytes = FileRead(io->file, data, length, io->filepos,
+ WAIT_EVENT_WAL_SUMMARY_READ);
+ if (nbytes < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read file \"%s\": %m",
+ FilePathName(io->file))));
+
+ io->filepos += nbytes;
+ return nbytes;
+}
+
+/*
+ * Data write callback for use with WriteBlockRefTable.
+ */
+int
+WriteWalSummary(void *wal_summary_io, void *data, int length)
+{
+ WalSummaryIO *io = wal_summary_io;
+ int nbytes;
+
+ nbytes = FileWrite(io->file, data, length, io->filepos,
+ WAIT_EVENT_WAL_SUMMARY_WRITE);
+ if (nbytes < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write file \"%s\": %m",
+ FilePathName(io->file))));
+ if (nbytes != length)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write file \"%s\": wrote only %d of %d bytes at offset %u",
+ FilePathName(io->file), nbytes,
+ length, (unsigned) io->filepos),
+ errhint("Check free disk space.")));
+
+ io->filepos += nbytes;
+ return nbytes;
+}
+
/*
 * Error-reporting callback for use with CreateBlockRefTableReader.
 *
 * Formats the variadic message into a StringInfo and raises an ERROR with
 * ERRCODE_DATA_CORRUPTED. callback_arg is unused here.
 */
void
ReportWalSummaryError(void *callback_arg, char *fmt,...)
{
	StringInfoData buf;
	va_list		ap;
	int			needed;

	initStringInfo(&buf);

	/*
	 * appendStringInfoVA returns 0 on success, or a hint about how much
	 * additional space is needed. Loop, restarting the va_list each time
	 * (it is consumed by each attempt), until the message fits.
	 */
	for (;;)
	{
		va_start(ap, fmt);
		needed = appendStringInfoVA(&buf, fmt, ap);
		va_end(ap);
		if (needed == 0)
			break;
		enlargeStringInfo(&buf, needed);
	}
	ereport(ERROR,
			errcode(ERRCODE_DATA_CORRUPTED),
			errmsg_internal("%s", buf.data));
}
+
+/*
+ * Comparator to sort a List of WalSummaryFile objects by start_lsn.
+ */
+static int
+ListComparatorForWalSummaryFiles(const ListCell *a, const ListCell *b)
+{
+ WalSummaryFile *ws1 = lfirst(a);
+ WalSummaryFile *ws2 = lfirst(b);
+
+ if (ws1->start_lsn < ws2->start_lsn)
+ return -1;
+ if (ws1->start_lsn > ws2->start_lsn)
+ return 1;
+ return 0;
+}
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * walsummaryfuncs.c
+ * SQL-callable functions for accessing WAL summary data.
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * src/backend/backup/walsummaryfuncs.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "backup/walsummary.h"
+#include "common/blkreftable.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "utils/fmgrprotos.h"
+#include "utils/pg_lsn.h"
+
+#define NUM_WS_ATTS 3
+#define NUM_SUMMARY_ATTS 6
+#define MAX_BLOCKS_PER_CALL 256
+
+/*
+ * List the WAL summary files available in pg_wal/summaries.
+ */
+Datum
+pg_available_wal_summaries(PG_FUNCTION_ARGS)
+{
+ ReturnSetInfo *rsi;
+ List *wslist;
+ ListCell *lc;
+ Datum values[NUM_WS_ATTS];
+ bool nulls[NUM_WS_ATTS];
+
+ InitMaterializedSRF(fcinfo, 0);
+ rsi = (ReturnSetInfo *) fcinfo->resultinfo;
+
+ memset(nulls, 0, sizeof(nulls));
+
+ wslist = GetWalSummaries(0, InvalidXLogRecPtr, InvalidXLogRecPtr);
+ foreach(lc, wslist)
+ {
+ WalSummaryFile *ws = (WalSummaryFile *) lfirst(lc);
+ HeapTuple tuple;
+
+ CHECK_FOR_INTERRUPTS();
+
+ values[0] = Int64GetDatum((int64) ws->tli);
+ values[1] = LSNGetDatum(ws->start_lsn);
+ values[2] = LSNGetDatum(ws->end_lsn);
+
+ tuple = heap_form_tuple(rsi->setDesc, values, nulls);
+ tuplestore_puttuple(rsi->setResult, tuple);
+ }
+
+ return (Datum) 0;
+}
+
/*
 * List the contents of a WAL summary file identified by TLI, start LSN,
 * and end LSN.
 *
 * Emits one row per modified block, plus, per relation fork, possibly one
 * additional row carrying the fork's limit block with limit_block = true.
 */
Datum
pg_wal_summary_contents(PG_FUNCTION_ARGS)
{
	ReturnSetInfo *rsi;
	Datum		values[NUM_SUMMARY_ATTS];
	bool		nulls[NUM_SUMMARY_ATTS];
	WalSummaryFile ws;
	WalSummaryIO io;
	BlockRefTableReader *reader;
	int64		raw_tli;
	RelFileLocator rlocator;
	ForkNumber	forknum;
	BlockNumber limit_block;

	/* Set up to return a materialized result set; no column is ever NULL. */
	InitMaterializedSRF(fcinfo, 0);
	rsi = (ReturnSetInfo *) fcinfo->resultinfo;
	memset(nulls, 0, sizeof(nulls));

	/*
	 * Since the timeline could at least in theory be more than 2^31, and
	 * since we don't have unsigned types at the SQL level, it is passed as a
	 * 64-bit integer. Test whether it's out of range.
	 */
	raw_tli = PG_GETARG_INT64(0);
	if (raw_tli < 1 || raw_tli > PG_INT32_MAX)
		ereport(ERROR,
				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				errmsg("invalid timeline %lld", (long long) raw_tli));

	/* Prepare to read the specified WAL summary file. */
	ws.tli = (TimeLineID) raw_tli;
	ws.start_lsn = PG_GETARG_LSN(1);
	ws.end_lsn = PG_GETARG_LSN(2);
	io.filepos = 0;
	/* missing_ok = false: a nonexistent summary file raises an error. */
	io.file = OpenWalSummaryFile(&ws, false);
	reader = CreateBlockRefTableReader(ReadWalSummary, &io,
									   FilePathName(io.file),
									   ReportWalSummaryError, NULL);

	/* Loop over relation forks. */
	while (BlockRefTableReaderNextRelation(reader, &rlocator, &forknum,
										   &limit_block))
	{
		BlockNumber blocks[MAX_BLOCKS_PER_CALL];
		HeapTuple	tuple;

		CHECK_FOR_INTERRUPTS();

		/* These columns are constant for every row of this fork. */
		values[0] = ObjectIdGetDatum(rlocator.relNumber);
		values[1] = ObjectIdGetDatum(rlocator.spcOid);
		values[2] = ObjectIdGetDatum(rlocator.dbOid);
		values[3] = Int16GetDatum((int16) forknum);

		/* Loop over blocks within the current relation fork. */
		while (1)
		{
			unsigned	nblocks;
			unsigned	i;

			CHECK_FOR_INTERRUPTS();

			/* Fetch the next batch of up to MAX_BLOCKS_PER_CALL blocks. */
			nblocks = BlockRefTableReaderGetBlocks(reader, blocks,
												   MAX_BLOCKS_PER_CALL);
			if (nblocks == 0)
				break;

			/*
			 * For each block that we specifically know to have been modified,
			 * emit a row with that block number and limit_block = false.
			 */
			values[5] = BoolGetDatum(false);
			for (i = 0; i < nblocks; ++i)
			{
				values[4] = Int64GetDatum((int64) blocks[i]);

				tuple = heap_form_tuple(rsi->setDesc, values, nulls);
				tuplestore_puttuple(rsi->setResult, tuple);
			}

			/*
			 * If the limit block is not InvalidBlockNumber, emit an extra row
			 * with that block number and limit_block = true.
			 *
			 * There is no point in doing this when the limit_block is
			 * InvalidBlockNumber, because no block with that number or any
			 * higher number can ever exist.
			 */
			if (BlockNumberIsValid(limit_block))
			{
				values[4] = Int64GetDatum((int64) limit_block);
				values[5] = BoolGetDatum(true);

				tuple = heap_form_tuple(rsi->setDesc, values, nulls);
				tuplestore_puttuple(rsi->setResult, tuple);
			}
		}
	}

	/* Cleanup */
	DestroyBlockRefTableReader(reader);
	FileClose(io.file);

	return (Datum) 0;
}
postmaster.o \
startup.o \
syslogger.o \
+ walsummarizer.o \
walwriter.o
include $(top_srcdir)/src/backend/common.mk
#include "postmaster/auxprocess.h"
#include "postmaster/bgwriter.h"
#include "postmaster/startup.h"
+#include "postmaster/walsummarizer.h"
#include "postmaster/walwriter.h"
#include "replication/walreceiver.h"
#include "storage/bufmgr.h"
case WalReceiverProcess:
MyBackendType = B_WAL_RECEIVER;
break;
+ case WalSummarizerProcess:
+ MyBackendType = B_WAL_SUMMARIZER;
+ break;
default:
elog(PANIC, "unrecognized process type: %d", (int) MyAuxProcType);
MyBackendType = B_INVALID;
WalReceiverMain();
proc_exit(1);
+ case WalSummarizerProcess:
+ WalSummarizerMain();
+ proc_exit(1);
+
default:
elog(PANIC, "unrecognized process type: %d", (int) MyAuxProcType);
proc_exit(1);
'postmaster.c',
'startup.c',
'syslogger.c',
+ 'walsummarizer.c',
'walwriter.c',
)
#include "postmaster/pgarch.h"
#include "postmaster/postmaster.h"
#include "postmaster/syslogger.h"
+#include "postmaster/walsummarizer.h"
#include "replication/logicallauncher.h"
#include "replication/walsender.h"
#include "storage/fd.h"
CheckpointerPID = 0,
WalWriterPID = 0,
WalReceiverPID = 0,
+ WalSummarizerPID = 0,
AutoVacPID = 0,
PgArchPID = 0,
SysLoggerPID = 0;
static pid_t StartChildProcess(AuxProcType type);
static void StartAutovacuumWorker(void);
static void MaybeStartWalReceiver(void);
+static void MaybeStartWalSummarizer(void);
static void InitPostmasterDeathWatchHandle(void);
/*
#define StartCheckpointer() StartChildProcess(CheckpointerProcess)
#define StartWalWriter() StartChildProcess(WalWriterProcess)
#define StartWalReceiver() StartChildProcess(WalReceiverProcess)
+#define StartWalSummarizer() StartChildProcess(WalSummarizerProcess)
/* Macros to check exit status of a child process */
#define EXIT_STATUS_0(st) ((st) == 0)
if (max_wal_senders > 0 && wal_level == WAL_LEVEL_MINIMAL)
ereport(ERROR,
(errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"replica\" or \"logical\"")));
+ if (summarize_wal && wal_level == WAL_LEVEL_MINIMAL)
+ ereport(ERROR,
+ (errmsg("WAL cannot be summarized when wal_level is \"minimal\"")));
/*
* Other one-time internal sanity checks can go here, if they are fast.
if (WalReceiverRequested)
MaybeStartWalReceiver();
+ /* If we need to start a WAL summarizer, try to do that now */
+ MaybeStartWalSummarizer();
+
/* Get other worker processes running, if needed */
if (StartWorkerNeeded || HaveCrashedWorker)
maybe_start_bgworkers();
signal_child(WalWriterPID, SIGHUP);
if (WalReceiverPID != 0)
signal_child(WalReceiverPID, SIGHUP);
+ if (WalSummarizerPID != 0)
+ signal_child(WalSummarizerPID, SIGHUP);
if (AutoVacPID != 0)
signal_child(AutoVacPID, SIGHUP);
if (PgArchPID != 0)
BgWriterPID = StartBackgroundWriter();
if (WalWriterPID == 0)
WalWriterPID = StartWalWriter();
+ MaybeStartWalSummarizer();
/*
* Likewise, start other special children as needed. In a restart
continue;
}
+ /*
+ * Was it the wal summarizer? Normal exit can be ignored; we'll start
+ * a new one at the next iteration of the postmaster's main loop, if
+ * necessary. Any other exit condition is treated as a crash.
+ */
+ if (pid == WalSummarizerPID)
+ {
+ WalSummarizerPID = 0;
+ if (!EXIT_STATUS_0(exitstatus))
+ HandleChildCrash(pid, exitstatus,
+ _("WAL summarizer process"));
+ continue;
+ }
+
/*
* Was it the autovacuum launcher? Normal exit can be ignored; we'll
* start a new one at the next iteration of the postmaster's main
else if (WalReceiverPID != 0 && take_action)
sigquit_child(WalReceiverPID);
+ /* Take care of the walsummarizer too */
+ if (pid == WalSummarizerPID)
+ WalSummarizerPID = 0;
+ else if (WalSummarizerPID != 0 && take_action)
+ sigquit_child(WalSummarizerPID);
+
/* Take care of the autovacuum launcher too */
if (pid == AutoVacPID)
AutoVacPID = 0;
signal_child(StartupPID, SIGTERM);
if (WalReceiverPID != 0)
signal_child(WalReceiverPID, SIGTERM);
+ if (WalSummarizerPID != 0)
+ signal_child(WalSummarizerPID, SIGTERM);
/* checkpointer, archiver, stats, and syslogger may continue for now */
/* Now transition to PM_WAIT_BACKENDS state to wait for them to die */
if (CountChildren(BACKEND_TYPE_ALL - BACKEND_TYPE_WALSND) == 0 &&
StartupPID == 0 &&
WalReceiverPID == 0 &&
+ WalSummarizerPID == 0 &&
BgWriterPID == 0 &&
(CheckpointerPID == 0 ||
(!FatalError && Shutdown < ImmediateShutdown)) &&
/* These other guys should be dead already */
Assert(StartupPID == 0);
Assert(WalReceiverPID == 0);
+ Assert(WalSummarizerPID == 0);
Assert(BgWriterPID == 0);
Assert(CheckpointerPID == 0);
Assert(WalWriterPID == 0);
signal_child(WalWriterPID, signal);
if (WalReceiverPID != 0)
signal_child(WalReceiverPID, signal);
+ if (WalSummarizerPID != 0)
+ signal_child(WalSummarizerPID, signal);
if (AutoVacPID != 0)
signal_child(AutoVacPID, signal);
if (PgArchPID != 0)
ereport(LOG,
(errmsg("could not fork WAL receiver process: %m")));
break;
+ case WalSummarizerProcess:
+ ereport(LOG,
+ (errmsg("could not fork WAL summarizer process: %m")));
+ break;
default:
ereport(LOG,
(errmsg("could not fork process: %m")));
}
}
/*
 * MaybeStartWalSummarizer
 *		Start the WAL summarizer process, if not running and our state allows.
 *
 * Launching requires all of: the summarize_wal GUC enabled, no summarizer
 * currently running (WalSummarizerPID == 0), the postmaster in normal
 * running or hot-standby state, and no shutdown beyond a smart shutdown in
 * progress. Called both from the main loop and after starting other
 * auxiliary processes, so it must be cheap when there is nothing to do.
 */
static void
MaybeStartWalSummarizer(void)
{
	if (summarize_wal && WalSummarizerPID == 0 &&
		(pmState == PM_RUN || pmState == PM_HOT_STANDBY) &&
		Shutdown <= SmartShutdown)
		WalSummarizerPID = StartWalSummarizer();
}
+
/*
* Create the opts file
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * walsummarizer.c
+ *
+ * Background process to perform WAL summarization, if it is enabled.
+ * It continuously scans the write-ahead log and periodically emits a
+ * summary file which indicates which blocks in which relation forks
+ * were modified by WAL records in the LSN range covered by the summary
+ * file. See walsummary.c and blkreftable.c for more details on the
+ * naming and contents of WAL summary files.
+ *
+ * If configured to do so, this background process will also remove WAL
+ * summary files when the file timestamp is older than a configurable
+ * threshold (but only if the WAL has been removed first).
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/postmaster/walsummarizer.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/timeline.h"
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogutils.h"
+#include "backup/walsummary.h"
+#include "catalog/storage_xlog.h"
+#include "common/blkreftable.h"
+#include "libpq/pqsignal.h"
+#include "miscadmin.h"
+#include "postmaster/bgwriter.h"
+#include "postmaster/interrupt.h"
+#include "postmaster/walsummarizer.h"
+#include "replication/walreceiver.h"
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/lwlock.h"
+#include "storage/latch.h"
+#include "storage/proc.h"
+#include "storage/procsignal.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "utils/guc.h"
+#include "utils/memutils.h"
+#include "utils/wait_event.h"
+
/*
 * Data in shared memory related to WAL summarization.
 */
typedef struct
{
	/*
	 * These fields are protected by WALSummarizerLock.
	 *
	 * Until we've discovered what summary files already exist on disk and
	 * stored that information in shared memory, initialized is false and the
	 * other fields here contain no meaningful information. After that has
	 * been done, initialized is true.
	 *
	 * summarized_tli and summarized_lsn indicate the last LSN and TLI at
	 * which the next summary file will start. Normally, these are the LSN and
	 * TLI at which the last file ended; in such case, lsn_is_exact is true.
	 * If, however, the LSN is just an approximation, then lsn_is_exact is
	 * false. This can happen if, for example, there are no existing WAL
	 * summary files at startup. In that case, we have to derive the position
	 * at which to start summarizing from the WAL files that exist on disk,
	 * and so the LSN might point to the start of the next file even though
	 * that might happen to be in the middle of a WAL record.
	 *
	 * summarizer_pgprocno is the pgprocno value for the summarizer process,
	 * if one is running, or else INVALID_PGPROCNO.
	 *
	 * pending_lsn is used by the summarizer to advertise the ending LSN of a
	 * record it has recently read. It shouldn't ever be less than
	 * summarized_lsn, but might be greater, because the summarizer buffers
	 * data for a range of LSNs in memory before writing out a new file.
	 */
	bool		initialized;
	TimeLineID	summarized_tli;
	XLogRecPtr	summarized_lsn;
	bool		lsn_is_exact;
	int			summarizer_pgprocno;
	XLogRecPtr	pending_lsn;

	/*
	 * This field handles its own synchronization.
	 */
	ConditionVariable summary_file_cv;
} WalSummarizerData;
+
+/*
+ * Private data for our xlogreader's page read callback.
+ */
+typedef struct
+{
+ TimeLineID tli;
+ bool historic;
+ XLogRecPtr read_upto;
+ bool end_of_wal;
+} SummarizerReadLocalXLogPrivate;
+
+/* Pointer to shared memory state. */
+static WalSummarizerData *WalSummarizerCtl;
+
+/*
+ * When we reach end of WAL and need to read more, we sleep for a number of
+ * milliseconds that is an integer multiple of MS_PER_SLEEP_QUANTUM. This is
+ * the multiplier. It should vary between 1 and MAX_SLEEP_QUANTA, depending
+ * on system activity. See summarizer_wait_for_wal() for how we adjust this.
+ */
+static long sleep_quanta = 1;
+
+/*
+ * The sleep time will always be a multiple of 200ms and will not exceed
+ * thirty seconds (150 * 200 = 30 * 1000). Note that the timeout here needs
+ * to be substantially less than the maximum amount of time for which an
+ * incremental backup will wait for this process to catch up. Otherwise, an
+ * incremental backup might time out on an idle system just because we sleep
+ * for too long.
+ */
+#define MAX_SLEEP_QUANTA 150
+#define MS_PER_SLEEP_QUANTUM 200
+
+/*
+ * This is a count of the number of pages of WAL that we've read since the
+ * last time we waited for more WAL to appear.
+ */
+static long pages_read_since_last_sleep = 0;
+
+/*
+ * Most recent RedoRecPtr value observed by MaybeRemoveOldWalSummaries.
+ */
+static XLogRecPtr redo_pointer_at_last_summary_removal = InvalidXLogRecPtr;
+
+/*
+ * GUC parameters
+ */
+bool summarize_wal = false;
+int wal_summary_keep_time = 10 * 24 * 60;
+
+static XLogRecPtr GetLatestLSN(TimeLineID *tli);
+static void HandleWalSummarizerInterrupts(void);
+static XLogRecPtr SummarizeWAL(TimeLineID tli, XLogRecPtr start_lsn,
+ bool exact, XLogRecPtr switch_lsn,
+ XLogRecPtr maximum_lsn);
+static void SummarizeSmgrRecord(XLogReaderState *xlogreader,
+ BlockRefTable *brtab);
+static void SummarizeXactRecord(XLogReaderState *xlogreader,
+ BlockRefTable *brtab);
+static bool SummarizeXlogRecord(XLogReaderState *xlogreader);
+static int summarizer_read_local_xlog_page(XLogReaderState *state,
+ XLogRecPtr targetPagePtr,
+ int reqLen,
+ XLogRecPtr targetRecPtr,
+ char *cur_page);
+static void summarizer_wait_for_wal(void);
+static void MaybeRemoveOldWalSummaries(void);
+
/*
 * Amount of shared memory required for this module.
 */
Size
WalSummarizerShmemSize(void)
{
	/* Just the one fixed-size control struct; no variable-length part. */
	return sizeof(WalSummarizerData);
}
+
/*
 * Create or attach to shared memory segment for this module.
 *
 * Called during shared-memory setup; when the struct already exists
 * (found == true), a previous call has initialized it and we just attach.
 */
void
WalSummarizerShmemInit(void)
{
	bool		found;

	WalSummarizerCtl = (WalSummarizerData *)
		ShmemInitStruct("Wal Summarizer Ctl", WalSummarizerShmemSize(),
						&found);

	if (!found)
	{
		/*
		 * First time through, so initialize.
		 *
		 * We're just filling in dummy values here -- the real initialization
		 * will happen when GetOldestUnsummarizedLSN() is called for the first
		 * time.
		 */
		WalSummarizerCtl->initialized = false;
		WalSummarizerCtl->summarized_tli = 0;
		WalSummarizerCtl->summarized_lsn = InvalidXLogRecPtr;
		WalSummarizerCtl->lsn_is_exact = false;
		WalSummarizerCtl->summarizer_pgprocno = INVALID_PGPROCNO;
		WalSummarizerCtl->pending_lsn = InvalidXLogRecPtr;
		ConditionVariableInit(&WalSummarizerCtl->summary_file_cv);
	}
}
+
+/*
+ * Entry point for walsummarizer process.
+ *
+ * Loops forever (until shutdown is requested or summarize_wal is turned
+ * off), summarizing WAL from the oldest unsummarized LSN onward.  Errors
+ * are caught via sigsetjmp, logged, and followed by a delay and retry.
+ */
+void
+WalSummarizerMain(void)
+{
+ sigjmp_buf local_sigjmp_buf;
+ MemoryContext context;
+
+ /*
+ * Within this function, 'current_lsn' and 'current_tli' refer to the
+ * point from which the next WAL summary file should start. 'exact' is
+ * true if 'current_lsn' is known to be the start of a WAL record or WAL
+ * segment, and false if it might be in the middle of a record someplace.
+ *
+ * 'switch_lsn' and 'switch_tli', if set, are the LSN at which we need to
+ * switch to a new timeline and the timeline to which we need to switch.
+ * If not set, we either haven't figured out the answers yet or we're
+ * already on the latest timeline.
+ */
+ XLogRecPtr current_lsn;
+ TimeLineID current_tli;
+ bool exact;
+ XLogRecPtr switch_lsn = InvalidXLogRecPtr;
+ TimeLineID switch_tli = 0;
+
+ ereport(DEBUG1,
+ (errmsg_internal("WAL summarizer started")));
+
+ /*
+ * Properly accept or ignore signals the postmaster might send us
+ *
+ * We have no particular use for SIGINT at the moment, but seems
+ * reasonable to treat like SIGTERM.
+ */
+ pqsignal(SIGHUP, SignalHandlerForConfigReload);
+ pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+ pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
+ /* SIGQUIT handler was already set up by InitPostmasterChild */
+ pqsignal(SIGALRM, SIG_IGN);
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGUSR2, SIG_IGN); /* not used */
+
+ /* Advertise ourselves. */
+ LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE);
+ WalSummarizerCtl->summarizer_pgprocno = MyProc->pgprocno;
+ LWLockRelease(WALSummarizerLock);
+
+ /* Create and switch to a memory context that we can reset on error. */
+ context = AllocSetContextCreate(TopMemoryContext,
+ "Wal Summarizer",
+ ALLOCSET_DEFAULT_SIZES);
+ MemoryContextSwitchTo(context);
+
+ /*
+ * Reset some signals that are accepted by postmaster but not here
+ */
+ pqsignal(SIGCHLD, SIG_DFL);
+
+ /*
+ * If an exception is encountered, processing resumes here.
+ */
+ if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+ {
+ /* Since not using PG_TRY, must reset error stack by hand */
+ error_context_stack = NULL;
+
+ /* Prevent interrupts while cleaning up */
+ HOLD_INTERRUPTS();
+
+ /* Report the error to the server log */
+ EmitErrorReport();
+
+ /* Release resources we might have acquired. */
+ LWLockReleaseAll();
+ ConditionVariableCancelSleep();
+ pgstat_report_wait_end();
+ ReleaseAuxProcessResources(false);
+ AtEOXact_Files(false);
+ AtEOXact_HashTables(false);
+
+ /*
+ * Now return to normal top-level context and clear ErrorContext for
+ * next time.
+ */
+ MemoryContextSwitchTo(context);
+ FlushErrorState();
+
+ /* Flush any leaked data in the top-level context */
+ MemoryContextReset(context);
+
+ /* Now we can allow interrupts again */
+ RESUME_INTERRUPTS();
+
+ /*
+ * Sleep for 10 seconds before attempting to resume operations in
+ * order to avoid excessive logging.
+ *
+ * Many of the likely error conditions are things that will repeat
+ * every time. For example, if the WAL can't be read or the summary
+ * can't be written, only administrator action will cure the problem.
+ * So a really fast retry time doesn't seem to be especially
+ * beneficial, and it will clutter the logs.
+ */
+ (void) WaitLatch(MyLatch,
+ WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ 10000,
+ WAIT_EVENT_WAL_SUMMARIZER_ERROR);
+ }
+
+ /* We can now handle ereport(ERROR) */
+ PG_exception_stack = &local_sigjmp_buf;
+
+ /*
+ * Unblock signals (they were blocked when the postmaster forked us)
+ */
+ sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
+
+ /*
+ * Fetch information about previous progress from shared memory, and ask
+ * GetOldestUnsummarizedLSN to reset pending_lsn to summarized_lsn. We
+ * might be recovering from an error, and if so, pending_lsn might have
+ * advanced past summarized_lsn, but any WAL we read previously has been
+ * lost and will need to be reread.
+ *
+ * If we discover that WAL summarization is not enabled, just exit.
+ */
+ current_lsn = GetOldestUnsummarizedLSN(&current_tli, &exact, true);
+ if (XLogRecPtrIsInvalid(current_lsn))
+ proc_exit(0);
+
+ /*
+ * Loop forever
+ */
+ for (;;)
+ {
+ XLogRecPtr latest_lsn;
+ TimeLineID latest_tli;
+ XLogRecPtr end_of_summary_lsn;
+
+ /* Flush any leaked data in the top-level context */
+ MemoryContextReset(context);
+
+ /* Process any signals received recently. */
+ HandleWalSummarizerInterrupts();
+
+ /* If it's time to remove any old WAL summaries, do that now. */
+ MaybeRemoveOldWalSummaries();
+
+ /* Find the LSN and TLI up to which we can safely summarize. */
+ latest_lsn = GetLatestLSN(&latest_tli);
+
+ /*
+ * If we're summarizing a historic timeline and we haven't yet
+ * computed the point at which to switch to the next timeline, do that
+ * now.
+ *
+ * Note that if this is a standby, what was previously the current
+ * timeline could become historic at any time.
+ *
+ * We could try to make this more efficient by caching the results of
+ * readTimeLineHistory when latest_tli has not changed, but since we
+ * only have to do this once per timeline switch, we probably wouldn't
+ * save any significant amount of work in practice.
+ */
+ if (current_tli != latest_tli && XLogRecPtrIsInvalid(switch_lsn))
+ {
+ List *tles = readTimeLineHistory(latest_tli);
+
+ switch_lsn = tliSwitchPoint(current_tli, tles, &switch_tli);
+ ereport(DEBUG1,
+ errmsg("switch point from TLI %u to TLI %u is at %X/%X",
+ current_tli, switch_tli, LSN_FORMAT_ARGS(switch_lsn)));
+ }
+
+ /*
+ * If we've reached the switch LSN, we can't summarize anything else
+ * on this timeline. Switch to the next timeline and go around again.
+ */
+ if (!XLogRecPtrIsInvalid(switch_lsn) && current_lsn >= switch_lsn)
+ {
+ current_tli = switch_tli;
+ switch_lsn = InvalidXLogRecPtr;
+ switch_tli = 0;
+ continue;
+ }
+
+ /* Summarize WAL. */
+ end_of_summary_lsn = SummarizeWAL(current_tli,
+ current_lsn, exact,
+ switch_lsn, latest_lsn);
+ Assert(!XLogRecPtrIsInvalid(end_of_summary_lsn));
+ Assert(end_of_summary_lsn >= current_lsn);
+
+ /*
+ * Update state for next loop iteration.
+ *
+ * Next summary file should start from exactly where this one ended.
+ */
+ current_lsn = end_of_summary_lsn;
+ exact = true;
+
+ /* Update state in shared memory. */
+ LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE);
+ Assert(WalSummarizerCtl->pending_lsn <= end_of_summary_lsn);
+ WalSummarizerCtl->summarized_lsn = end_of_summary_lsn;
+ WalSummarizerCtl->summarized_tli = current_tli;
+ WalSummarizerCtl->lsn_is_exact = true;
+ WalSummarizerCtl->pending_lsn = end_of_summary_lsn;
+ LWLockRelease(WALSummarizerLock);
+
+ /* Wake up anyone waiting for more summary files to be written. */
+ ConditionVariableBroadcast(&WalSummarizerCtl->summary_file_cv);
+ }
+}
+
+/*
+ * Get the oldest LSN in this server's timeline history that has not yet been
+ * summarized.
+ *
+ * If tli != NULL, *tli will be set to the TLI for the LSN that is returned.
+ *
+ * If lsn_is_exact != NULL, *lsn_is_exact will be set to true if the returned
+ * LSN is necessarily the start of a WAL record and false if it's just the
+ * beginning of a WAL segment.
+ *
+ * If reset_pending_lsn is true, resets the pending_lsn in shared memory to
+ * be equal to the summarized_lsn.
+ *
+ * Returns InvalidXLogRecPtr if WAL summarization is not enabled.
+ */
+XLogRecPtr
+GetOldestUnsummarizedLSN(TimeLineID *tli, bool *lsn_is_exact,
+ bool reset_pending_lsn)
+{
+ TimeLineID latest_tli;
+ LWLockMode mode = reset_pending_lsn ? LW_EXCLUSIVE : LW_SHARED;
+ int n;
+ List *tles;
+ XLogRecPtr unsummarized_lsn;
+ TimeLineID unsummarized_tli = 0;
+ bool should_make_exact = false;
+ List *existing_summaries;
+ ListCell *lc;
+
+ /* If not summarizing WAL, do nothing. */
+ if (!summarize_wal)
+ return InvalidXLogRecPtr;
+
+ /*
+ * Unless we need to reset the pending_lsn, we initially acquire the lock
+ * in shared mode and try to fetch the required information. If we acquire
+ * in shared mode and find that the data structure hasn't been
+ * initialized, we reacquire the lock in exclusive mode so that we can
+ * initialize it. However, if someone else does that first before we get
+ * the lock, then we can just return the requested information after all.
+ */
+ while (1)
+ {
+ LWLockAcquire(WALSummarizerLock, mode);
+
+ if (WalSummarizerCtl->initialized)
+ {
+ unsummarized_lsn = WalSummarizerCtl->summarized_lsn;
+ if (tli != NULL)
+ *tli = WalSummarizerCtl->summarized_tli;
+ if (lsn_is_exact != NULL)
+ *lsn_is_exact = WalSummarizerCtl->lsn_is_exact;
+ if (reset_pending_lsn)
+ WalSummarizerCtl->pending_lsn =
+ WalSummarizerCtl->summarized_lsn;
+ LWLockRelease(WALSummarizerLock);
+ return unsummarized_lsn;
+ }
+
+ /* Already exclusive, so it's our job to initialize the struct. */
+ if (mode == LW_EXCLUSIVE)
+ break;
+
+ LWLockRelease(WALSummarizerLock);
+ mode = LW_EXCLUSIVE;
+ }
+
+ /*
+ * The data structure needs to be initialized, and we are the first to
+ * obtain the lock in exclusive mode, so it's our job to do that
+ * initialization.
+ *
+ * So, find the oldest timeline on which WAL still exists, and the
+ * earliest segment for which it exists.
+ */
+ (void) GetLatestLSN(&latest_tli);
+ tles = readTimeLineHistory(latest_tli);
+ for (n = list_length(tles) - 1; n >= 0; --n)
+ {
+ TimeLineHistoryEntry *tle = list_nth(tles, n);
+ XLogSegNo oldest_segno;
+
+ oldest_segno = XLogGetOldestSegno(tle->tli);
+ if (oldest_segno != 0)
+ {
+ /* Compute oldest LSN that still exists on disk. */
+ XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size,
+ unsummarized_lsn);
+
+ unsummarized_tli = tle->tli;
+ break;
+ }
+ }
+
+ /* It really should not be possible for us to find no WAL. */
+ /* NOTE(review): latest_tli is a TimeLineID (unsigned); %u would match
+ * the convention used elsewhere better than %d. */
+ if (unsummarized_tli == 0)
+ ereport(ERROR,
+ errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg_internal("no WAL found on timeline %d", latest_tli));
+
+ /*
+ * Don't try to summarize anything older than the end LSN of the newest
+ * summary file that exists for this timeline.
+ */
+ existing_summaries =
+ GetWalSummaries(unsummarized_tli,
+ InvalidXLogRecPtr, InvalidXLogRecPtr);
+ foreach(lc, existing_summaries)
+ {
+ WalSummaryFile *ws = lfirst(lc);
+
+ if (ws->end_lsn > unsummarized_lsn)
+ {
+ unsummarized_lsn = ws->end_lsn;
+ should_make_exact = true;
+ }
+ }
+
+ /* Update shared memory with the discovered values. */
+ WalSummarizerCtl->initialized = true;
+ WalSummarizerCtl->summarized_lsn = unsummarized_lsn;
+ WalSummarizerCtl->summarized_tli = unsummarized_tli;
+ WalSummarizerCtl->lsn_is_exact = should_make_exact;
+ WalSummarizerCtl->pending_lsn = unsummarized_lsn;
+
+ /* Also return the discovered values to the caller, as requested. */
+ if (tli != NULL)
+ *tli = WalSummarizerCtl->summarized_tli;
+ if (lsn_is_exact != NULL)
+ *lsn_is_exact = WalSummarizerCtl->lsn_is_exact;
+ LWLockRelease(WALSummarizerLock);
+
+ return unsummarized_lsn;
+}
+
+/*
+ * Attempt to set the WAL summarizer's latch.
+ *
+ * This might not work, because there's no guarantee that the WAL summarizer
+ * process was successfully started, and it also might have started but
+ * subsequently terminated. So, under normal circumstances, this will get the
+ * latch set, but there's no guarantee.
+ */
+void
+SetWalSummarizerLatch(void)
+{
+ int pgprocno;
+
+ /* Shared memory not set up yet; nothing to signal. */
+ if (WalSummarizerCtl == NULL)
+ return;
+
+ LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE);
+ pgprocno = WalSummarizerCtl->summarizer_pgprocno;
+ LWLockRelease(WALSummarizerLock);
+
+ /* INVALID_PGPROCNO means no summarizer has advertised itself. */
+ if (pgprocno != INVALID_PGPROCNO)
+ SetLatch(&ProcGlobal->allProcs[pgprocno].procLatch);
+}
+
+/*
+ * Wait until WAL summarization reaches the given LSN, but not longer than
+ * the given timeout.
+ *
+ * The return value is the first still-unsummarized LSN. If it's greater than
+ * or equal to the passed LSN, then that LSN was reached. If not, we timed out.
+ *
+ * Either way, *pending_lsn is set to the value taken from WalSummarizerCtl.
+ */
+XLogRecPtr
+WaitForWalSummarization(XLogRecPtr lsn, long timeout, XLogRecPtr *pending_lsn)
+{
+ TimestampTz start_time = GetCurrentTimestamp();
+ TimestampTz deadline = TimestampTzPlusMilliseconds(start_time, timeout);
+ XLogRecPtr summarized_lsn;
+
+ Assert(!XLogRecPtrIsInvalid(lsn));
+ Assert(timeout > 0);
+
+ while (1)
+ {
+ TimestampTz now;
+ long remaining_timeout;
+
+ /*
+ * If the LSN summarized on disk has reached the target value, stop.
+ *
+ * NOTE(review): this section only reads shared state; LW_SHARED
+ * would appear to suffice here -- confirm.
+ */
+ LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE);
+ summarized_lsn = WalSummarizerCtl->summarized_lsn;
+ *pending_lsn = WalSummarizerCtl->pending_lsn;
+ LWLockRelease(WALSummarizerLock);
+ if (summarized_lsn >= lsn)
+ break;
+
+ /* Timeout reached? If yes, stop. */
+ now = GetCurrentTimestamp();
+ remaining_timeout = TimestampDifferenceMilliseconds(now, deadline);
+ if (remaining_timeout <= 0)
+ break;
+
+ /* Wait and see. */
+ ConditionVariableTimedSleep(&WalSummarizerCtl->summary_file_cv,
+ remaining_timeout,
+ WAIT_EVENT_WAL_SUMMARY_READY);
+ }
+
+ return summarized_lsn;
+}
+
+/*
+ * Get the latest LSN that is eligible to be summarized, and set *tli to the
+ * corresponding timeline.
+ */
+static XLogRecPtr
+GetLatestLSN(TimeLineID *tli)
+{
+ if (!RecoveryInProgress())
+ {
+ /* Don't summarize WAL before it's flushed. GetFlushRecPtr fills *tli. */
+ return GetFlushRecPtr(tli);
+ }
+ else
+ {
+ XLogRecPtr flush_lsn;
+ TimeLineID flush_tli;
+ XLogRecPtr replay_lsn;
+ TimeLineID replay_tli;
+
+ /*
+ * What we really want to know is how much WAL has been flushed to
+ * disk, but the only flush position available is the one provided by
+ * the walreceiver, which may not be running, because this could be
+ * crash recovery or recovery via restore_command. So use either the
+ * WAL receiver's flush position or the replay position, whichever is
+ * further ahead, on the theory that if the WAL has been replayed then
+ * it must also have been flushed to disk.
+ */
+ flush_lsn = GetWalRcvFlushRecPtr(NULL, &flush_tli);
+ replay_lsn = GetXLogReplayRecPtr(&replay_tli);
+ if (flush_lsn > replay_lsn)
+ {
+ *tli = flush_tli;
+ return flush_lsn;
+ }
+ else
+ {
+ *tli = replay_tli;
+ return replay_lsn;
+ }
+ }
+}
+
+/*
+ * Interrupt handler for main loop of WAL summarizer process.
+ */
+static void
+HandleWalSummarizerInterrupts(void)
+{
+ if (ProcSignalBarrierPending)
+ ProcessProcSignalBarrier();
+
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ /*
+ * Exit if a shutdown was requested, or if the summarize_wal GUC was
+ * turned off (possibly by the config reload just above).
+ */
+ if (ShutdownRequestPending || !summarize_wal)
+ {
+ ereport(DEBUG1,
+ errmsg_internal("WAL summarizer shutting down"));
+ proc_exit(0);
+ }
+
+ /* Perform logging of memory contexts of this process */
+ if (LogMemoryContextPending)
+ ProcessLogMemoryContextInterrupt();
+}
+
+/*
+ * Summarize a range of WAL records on a single timeline.
+ *
+ * 'tli' is the timeline to be summarized.
+ *
+ * 'start_lsn' is the point at which we should start summarizing. If this
+ * value comes from the end LSN of the previous record as returned by the
+ * xlogreader machinery, 'exact' should be true; otherwise, 'exact' should
+ * be false, and this function will search forward for the start of a valid
+ * WAL record.
+ *
+ * 'switch_lsn' is the point at which we should switch to a later timeline,
+ * if we're summarizing a historic timeline.
+ *
+ * 'maximum_lsn' identifies the point beyond which we can't count on being
+ * able to read any more WAL. It should be the switch point when reading a
+ * historic timeline, or the most-recently-measured end of WAL when reading
+ * the current timeline.
+ *
+ * The return value is the LSN at which the WAL summary actually ends. Most
+ * often, a summary file ends because we notice that a checkpoint has
+ * occurred and reach the redo pointer of that checkpoint, but sometimes
+ * we stop for other reasons, such as a timeline switch.
+ */
+static XLogRecPtr
+SummarizeWAL(TimeLineID tli, XLogRecPtr start_lsn, bool exact,
+ XLogRecPtr switch_lsn, XLogRecPtr maximum_lsn)
+{
+ SummarizerReadLocalXLogPrivate *private_data;
+ XLogReaderState *xlogreader;
+ XLogRecPtr summary_start_lsn;
+ XLogRecPtr summary_end_lsn = switch_lsn;
+ char temp_path[MAXPGPATH];
+ char final_path[MAXPGPATH];
+ WalSummaryIO io;
+ BlockRefTable *brtab = CreateEmptyBlockRefTable();
+
+ /* Initialize private data for xlogreader. */
+ private_data = (SummarizerReadLocalXLogPrivate *)
+ palloc0(sizeof(SummarizerReadLocalXLogPrivate));
+ private_data->tli = tli;
+ private_data->historic = !XLogRecPtrIsInvalid(switch_lsn);
+ private_data->read_upto = maximum_lsn;
+
+ /* Create xlogreader. */
+ xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+ XL_ROUTINE(.page_read = &summarizer_read_local_xlog_page,
+ .segment_open = &wal_segment_open,
+ .segment_close = &wal_segment_close),
+ private_data);
+ if (xlogreader == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory"),
+ errdetail("Failed while allocating a WAL reading processor.")));
+
+ /*
+ * When exact = false, we're starting from an arbitrary point in the WAL
+ * and must search forward for the start of the next record.
+ *
+ * When exact = true, start_lsn should be either the LSN where a record
+ * begins, or the LSN of a page where the page header is immediately
+ * followed by the start of a new record. XLogBeginRead should tolerate
+ * either case.
+ *
+ * We need to allow for both cases because the behavior of xlogreader
+ * varies. When a record spans two or more xlog pages, the ending LSN
+ * reported by xlogreader will be the starting LSN of the following
+ * record, but when an xlog page boundary falls between two records, the
+ * end LSN for the first will be reported as the first byte of the
+ * following page. We can't know until we read that page how large the
+ * header will be, but we'll have to skip over it to find the next record.
+ */
+ if (exact)
+ {
+ /*
+ * Even if start_lsn is the beginning of a page rather than the
+ * beginning of the first record on that page, we should still use it
+ * as the start LSN for the summary file. That's because we detect
+ * missing summary files by looking for cases where the end LSN of one
+ * file is less than the start LSN of the next file. When only a page
+ * header is skipped, nothing has been missed.
+ */
+ XLogBeginRead(xlogreader, start_lsn);
+ summary_start_lsn = start_lsn;
+ }
+ else
+ {
+ summary_start_lsn = XLogFindNextRecord(xlogreader, start_lsn);
+ if (XLogRecPtrIsInvalid(summary_start_lsn))
+ {
+ /*
+ * If we hit end-of-WAL while trying to find the next valid
+ * record, we must be on a historic timeline that has no valid
+ * records that begin after start_lsn and before end of WAL.
+ */
+ if (private_data->end_of_wal)
+ {
+ ereport(DEBUG1,
+ errmsg_internal("could not read WAL from timeline %u at %X/%X: end of WAL at %X/%X",
+ tli,
+ LSN_FORMAT_ARGS(start_lsn),
+ LSN_FORMAT_ARGS(private_data->read_upto)));
+
+ /*
+ * The timeline ends at or after start_lsn, without containing
+ * any records. Thus, we must make sure the main loop does not
+ * iterate. If start_lsn is the end of the timeline, then we
+ * won't actually emit an empty summary file, but otherwise,
+ * we must, to capture the fact that the LSN range in question
+ * contains no interesting WAL records.
+ */
+ summary_start_lsn = start_lsn;
+ summary_end_lsn = private_data->read_upto;
+ switch_lsn = xlogreader->EndRecPtr;
+ }
+ else
+ ereport(ERROR,
+ (errmsg("could not find a valid record after %X/%X",
+ LSN_FORMAT_ARGS(start_lsn))));
+ }
+
+ /* We shouldn't go backward. */
+ Assert(summary_start_lsn >= start_lsn);
+ }
+
+ /*
+ * Main loop: read xlog records one by one.
+ */
+ while (1)
+ {
+ int block_id;
+ char *errormsg;
+ XLogRecord *record;
+ bool stop_requested = false;
+
+ HandleWalSummarizerInterrupts();
+
+ /* We shouldn't go backward. */
+ Assert(summary_start_lsn <= xlogreader->EndRecPtr);
+
+ /* Now read the next record. */
+ record = XLogReadRecord(xlogreader, &errormsg);
+ if (record == NULL)
+ {
+ if (private_data->end_of_wal)
+ {
+ /*
+ * This timeline must be historic and must end before we were
+ * able to read a complete record.
+ *
+ * NOTE(review): tli is unsigned; the %u used in the error
+ * messages below would be more correct than %d here.
+ */
+ ereport(DEBUG1,
+ errmsg_internal("could not read WAL from timeline %d at %X/%X: end of WAL at %X/%X",
+ tli,
+ LSN_FORMAT_ARGS(xlogreader->EndRecPtr),
+ LSN_FORMAT_ARGS(private_data->read_upto)));
+ /* Summary ends at end of WAL. */
+ summary_end_lsn = private_data->read_upto;
+ break;
+ }
+ if (errormsg)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read WAL from timeline %u at %X/%X: %s",
+ tli, LSN_FORMAT_ARGS(xlogreader->EndRecPtr),
+ errormsg)));
+ else
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read WAL from timeline %u at %X/%X",
+ tli, LSN_FORMAT_ARGS(xlogreader->EndRecPtr))));
+ }
+
+ /* We shouldn't go backward. */
+ Assert(summary_start_lsn <= xlogreader->EndRecPtr);
+
+ if (!XLogRecPtrIsInvalid(switch_lsn) &&
+ xlogreader->ReadRecPtr >= switch_lsn)
+ {
+ /*
+ * Woops! We've read a record that *starts* after the switch LSN,
+ * contrary to our goal of reading only until we hit the first
+ * record that ends at or after the switch LSN. Pretend we didn't
+ * read it after all by bailing out of this loop right here,
+ * before we do anything with this record.
+ *
+ * This can happen because the last record before the switch LSN
+ * might be continued across multiple pages, and then we might
+ * come to a page with XLP_FIRST_IS_OVERWRITE_CONTRECORD set. In
+ * that case, the record that was continued across multiple pages
+ * is incomplete and will be disregarded, and the read will
+ * restart from the beginning of the page that is flagged
+ * XLP_FIRST_IS_OVERWRITE_CONTRECORD.
+ *
+ * If this case occurs, we can fairly say that the current summary
+ * file ends at the switch LSN exactly. The first record on the
+ * page marked XLP_FIRST_IS_OVERWRITE_CONTRECORD will be
+ * discovered when generating the next summary file.
+ */
+ summary_end_lsn = switch_lsn;
+ break;
+ }
+
+ /* Special handling for particular types of WAL records. */
+ switch (XLogRecGetRmid(xlogreader))
+ {
+ case RM_SMGR_ID:
+ SummarizeSmgrRecord(xlogreader, brtab);
+ break;
+ case RM_XACT_ID:
+ SummarizeXactRecord(xlogreader, brtab);
+ break;
+ case RM_XLOG_ID:
+ stop_requested = SummarizeXlogRecord(xlogreader);
+ break;
+ default:
+ break;
+ }
+
+ /*
+ * If we've been told that it's time to end this WAL summary file, do
+ * so. As an exception, if there's nothing included in this WAL
+ * summary file yet, then stopping doesn't make any sense, and we
+ * should wait until the next stop point instead.
+ */
+ if (stop_requested && xlogreader->ReadRecPtr > summary_start_lsn)
+ {
+ summary_end_lsn = xlogreader->ReadRecPtr;
+ break;
+ }
+
+ /* Feed block references from xlog record to block reference table. */
+ for (block_id = 0; block_id <= XLogRecMaxBlockId(xlogreader);
+ block_id++)
+ {
+ RelFileLocator rlocator;
+ ForkNumber forknum;
+ BlockNumber blocknum;
+
+ if (!XLogRecGetBlockTagExtended(xlogreader, block_id, &rlocator,
+ &forknum, &blocknum, NULL))
+ continue;
+
+ /*
+ * As we do elsewhere, ignore the FSM fork, because it's not fully
+ * WAL-logged.
+ */
+ if (forknum != FSM_FORKNUM)
+ BlockRefTableMarkBlockModified(brtab, &rlocator, forknum,
+ blocknum);
+ }
+
+ /* Update our notion of where this summary file ends. */
+ summary_end_lsn = xlogreader->EndRecPtr;
+
+ /* Also update shared memory. */
+ LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE);
+ Assert(summary_end_lsn >= WalSummarizerCtl->pending_lsn);
+ Assert(summary_end_lsn >= WalSummarizerCtl->summarized_lsn);
+ WalSummarizerCtl->pending_lsn = summary_end_lsn;
+ LWLockRelease(WALSummarizerLock);
+
+ /*
+ * If we have a switch LSN and have reached it, stop before reading
+ * the next record.
+ */
+ if (!XLogRecPtrIsInvalid(switch_lsn) &&
+ xlogreader->EndRecPtr >= switch_lsn)
+ break;
+ }
+
+ /* Destroy xlogreader. */
+ pfree(xlogreader->private_data);
+ XLogReaderFree(xlogreader);
+
+ /*
+ * If a timeline switch occurs, we may fail to make any progress at all
+ * before exiting the loop above. If that happens, we don't write a WAL
+ * summary file at all.
+ */
+ if (summary_end_lsn > summary_start_lsn)
+ {
+ /* Generate temporary and final path name. */
+ snprintf(temp_path, MAXPGPATH,
+ XLOGDIR "/summaries/temp.summary");
+ snprintf(final_path, MAXPGPATH,
+ XLOGDIR "/summaries/%08X%08X%08X%08X%08X.summary",
+ tli,
+ LSN_FORMAT_ARGS(summary_start_lsn),
+ LSN_FORMAT_ARGS(summary_end_lsn));
+
+ /* Open the temporary file for writing. */
+ io.filepos = 0;
+ io.file = PathNameOpenFile(temp_path, O_WRONLY | O_CREAT | O_TRUNC);
+ if (io.file < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not create file \"%s\": %m", temp_path)));
+
+ /* Write the data. */
+ WriteBlockRefTable(brtab, WriteWalSummary, &io);
+
+ /* Close the temporary file. (The xlogreader was freed above.) */
+ FileClose(io.file);
+
+ /* Tell the user what we did. */
+ ereport(DEBUG1,
+ errmsg("summarized WAL on TLI %d from %X/%X to %X/%X",
+ tli,
+ LSN_FORMAT_ARGS(summary_start_lsn),
+ LSN_FORMAT_ARGS(summary_end_lsn)));
+
+ /* Durably rename the new summary into place. */
+ durable_rename(temp_path, final_path, ERROR);
+ }
+
+ return summary_end_lsn;
+}
+
+/*
+ * Special handling for WAL records with RM_SMGR_ID (relation fork create
+ * and truncate operations).
+ */
+static void
+SummarizeSmgrRecord(XLogReaderState *xlogreader, BlockRefTable *brtab)
+{
+ uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
+
+ if (info == XLOG_SMGR_CREATE)
+ {
+ xl_smgr_create *xlrec;
+
+ /*
+ * If a new relation fork is created on disk, there is no point
+ * tracking anything about which blocks have been modified, because
+ * the whole thing will be new. Hence, set the limit block for this
+ * fork to 0.
+ *
+ * Ignore the FSM fork, which is not fully WAL-logged.
+ */
+ xlrec = (xl_smgr_create *) XLogRecGetData(xlogreader);
+
+ if (xlrec->forkNum != FSM_FORKNUM)
+ BlockRefTableSetLimitBlock(brtab, &xlrec->rlocator,
+ xlrec->forkNum, 0);
+ }
+ else if (info == XLOG_SMGR_TRUNCATE)
+ {
+ xl_smgr_truncate *xlrec;
+
+ xlrec = (xl_smgr_truncate *) XLogRecGetData(xlogreader);
+
+ /*
+ * If a relation fork is truncated on disk, there is no point in
+ * tracking anything about block modifications beyond the truncation
+ * point.
+ *
+ * We ignore SMGR_TRUNCATE_FSM here because the FSM isn't fully
+ * WAL-logged and thus we can't track modified blocks for it anyway.
+ */
+ if ((xlrec->flags & SMGR_TRUNCATE_HEAP) != 0)
+ BlockRefTableSetLimitBlock(brtab, &xlrec->rlocator,
+ MAIN_FORKNUM, xlrec->blkno);
+ if ((xlrec->flags & SMGR_TRUNCATE_VM) != 0)
+ BlockRefTableSetLimitBlock(brtab, &xlrec->rlocator,
+ VISIBILITYMAP_FORKNUM, xlrec->blkno);
+ }
+}
+
+/*
+ * Special handling for WAL records with RM_XACT_ID (transaction commit and
+ * abort records, which can carry lists of relations to be dropped).
+ */
+static void
+SummarizeXactRecord(XLogReaderState *xlogreader, BlockRefTable *brtab)
+{
+ uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
+ uint8 xact_info = info & XLOG_XACT_OPMASK;
+
+ if (xact_info == XLOG_XACT_COMMIT ||
+ xact_info == XLOG_XACT_COMMIT_PREPARED)
+ {
+ xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(xlogreader);
+ xl_xact_parsed_commit parsed;
+ int i;
+
+ /*
+ * Don't track modified blocks for any relations that were removed on
+ * commit.
+ */
+ ParseCommitRecord(XLogRecGetInfo(xlogreader), xlrec, &parsed);
+ for (i = 0; i < parsed.nrels; ++i)
+ {
+ ForkNumber forknum;
+
+ /* Reset every fork except the (not fully WAL-logged) FSM. */
+ for (forknum = 0; forknum <= MAX_FORKNUM; ++forknum)
+ if (forknum != FSM_FORKNUM)
+ BlockRefTableSetLimitBlock(brtab, &parsed.xlocators[i],
+ forknum, 0);
+ }
+ }
+ else if (xact_info == XLOG_XACT_ABORT ||
+ xact_info == XLOG_XACT_ABORT_PREPARED)
+ {
+ xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(xlogreader);
+ xl_xact_parsed_abort parsed;
+ int i;
+
+ /*
+ * Don't track modified blocks for any relations that were removed on
+ * abort.
+ */
+ ParseAbortRecord(XLogRecGetInfo(xlogreader), xlrec, &parsed);
+ for (i = 0; i < parsed.nrels; ++i)
+ {
+ ForkNumber forknum;
+
+ /* Reset every fork except the (not fully WAL-logged) FSM. */
+ for (forknum = 0; forknum <= MAX_FORKNUM; ++forknum)
+ if (forknum != FSM_FORKNUM)
+ BlockRefTableSetLimitBlock(brtab, &parsed.xlocators[i],
+ forknum, 0);
+ }
+ }
+}
+
+/*
+ * Special handling for WAL records with RM_XLOG_ID.
+ *
+ * Returns true if the caller should end the current summary file just
+ * before this record, and false otherwise.
+ */
+static bool
+SummarizeXlogRecord(XLogReaderState *xlogreader)
+{
+ uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
+
+ if (info == XLOG_CHECKPOINT_REDO || info == XLOG_CHECKPOINT_SHUTDOWN)
+ {
+ /*
+ * This is an LSN at which redo might begin, so we'd like
+ * summarization to stop just before this WAL record.
+ */
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * Similar to read_local_xlog_page, but limited to read from one particular
+ * timeline. If the end of WAL is reached, it will wait for more if reading
+ * from the current timeline, or give up if reading from a historic timeline.
+ * In the latter case, it will also set private_data->end_of_wal = true.
+ *
+ * Caller must set private_data->tli to the TLI of interest,
+ * private_data->read_upto to the lowest LSN that is not known to be safe
+ * to read on that timeline, and private_data->historic to true if and only
+ * if the timeline is not the current timeline. This function will update
+ * private_data->read_upto and private_data->historic if more WAL appears
+ * on the current timeline or if the current timeline becomes historic.
+ */
+static int
+summarizer_read_local_xlog_page(XLogReaderState *state,
+ XLogRecPtr targetPagePtr, int reqLen,
+ XLogRecPtr targetRecPtr, char *cur_page)
+{
+ int count;
+ WALReadError errinfo;
+ SummarizerReadLocalXLogPrivate *private_data;
+
+ HandleWalSummarizerInterrupts();
+
+ private_data = (SummarizerReadLocalXLogPrivate *)
+ state->private_data;
+
+ /* Loop until we have a readable range, or report end-of-WAL. */
+ while (1)
+ {
+ if (targetPagePtr + XLOG_BLCKSZ <= private_data->read_upto)
+ {
+ /*
+ * more than one block available; read only that block, have
+ * caller come back if they need more.
+ */
+ count = XLOG_BLCKSZ;
+ break;
+ }
+ else if (targetPagePtr + reqLen > private_data->read_upto)
+ {
+ /* We don't seem to have enough data. */
+ if (private_data->historic)
+ {
+ /*
+ * This is a historic timeline, so there will never be any
+ * more data than we have currently.  -1 tells the xlogreader
+ * that the read failed.
+ */
+ private_data->end_of_wal = true;
+ return -1;
+ }
+ else
+ {
+ XLogRecPtr latest_lsn;
+ TimeLineID latest_tli;
+
+ /*
+ * This is - or at least was up until very recently - the
+ * current timeline, so more data might show up. Delay here
+ * so we don't tight-loop.
+ */
+ HandleWalSummarizerInterrupts();
+ summarizer_wait_for_wal();
+
+ /* Recheck end-of-WAL. */
+ latest_lsn = GetLatestLSN(&latest_tli);
+ if (private_data->tli == latest_tli)
+ {
+ /* Still the current timeline, update max LSN. */
+ Assert(latest_lsn >= private_data->read_upto);
+ private_data->read_upto = latest_lsn;
+ }
+ else
+ {
+ List *tles = readTimeLineHistory(latest_tli);
+ XLogRecPtr switchpoint;
+
+ /*
+ * The timeline we're scanning is no longer the latest
+ * one. Figure out when it ended.
+ */
+ private_data->historic = true;
+ switchpoint = tliSwitchPoint(private_data->tli, tles,
+ NULL);
+
+ /*
+ * Allow reads up to exactly the switch point.
+ *
+ * It's possible that this will cause read_upto to move
+ * backwards, because walreceiver might have read a
+ * partial record and flushed it to disk, and we'd view
+ * that data as safe to read. However, the
+ * XLOG_END_OF_RECOVERY record will be written at the end
+ * of the last complete WAL record, not at the end of the
+ * WAL that we've flushed to disk.
+ *
+ * So switchpoint < private_data->read_upto is possible
+ * here, but switchpoint < state->EndRecPtr should not be.
+ */
+ Assert(switchpoint >= state->EndRecPtr);
+ private_data->read_upto = switchpoint;
+
+ /* Debugging output. */
+ ereport(DEBUG1,
+ errmsg("timeline %u became historic, can read up to %X/%X",
+ private_data->tli, LSN_FORMAT_ARGS(private_data->read_upto)));
+ }
+
+ /* Go around and try again. */
+ }
+ }
+ else
+ {
+ /* enough bytes available to satisfy the request */
+ count = private_data->read_upto - targetPagePtr;
+ break;
+ }
+ }
+
+ /*
+ * Even though we just determined how much of the page can be validly read
+ * as 'count', read the whole page anyway. It's guaranteed to be
+ * zero-padded up to the page boundary if it's incomplete.
+ */
+ if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ,
+ private_data->tli, &errinfo))
+ WALReadRaiseError(&errinfo);
+
+ /* Track that we read a page, for sleep time calculation. */
+ ++pages_read_since_last_sleep;
+
+ /* number of valid bytes in the buffer */
+ return count;
+}
+
+/*
+ * Sleep for long enough that we believe it's likely that more WAL will
+ * be available afterwards.
+ *
+ * The sleep time is adaptive: it is measured in quanta tracked by the
+ * file-level variable sleep_quanta and tuned based on how many pages were
+ * read since the previous sleep (pages_read_since_last_sleep).  A latch
+ * set by another process can end the sleep early.
+ */
+static void
+summarizer_wait_for_wal(void)
+{
+ if (pages_read_since_last_sleep == 0)
+ {
+ /*
+ * No pages were read since the last sleep, so double the sleep time,
+ * but not beyond the maximum allowable value.
+ */
+ sleep_quanta = Min(sleep_quanta * 2, MAX_SLEEP_QUANTA);
+ }
+ else if (pages_read_since_last_sleep > 1)
+ {
+ /*
+ * Multiple pages were read since the last sleep, so reduce the sleep
+ * time.
+ *
+ * A large burst of activity should be able to quickly reduce the
+ * sleep time to the minimum, but we don't want a handful of extra WAL
+ * records to provoke a strong reaction. We choose to reduce the sleep
+ * time by 1 quantum for each page read beyond the first, which is a
+ * fairly arbitrary way of trying to be reactive without
+ * overreacting.
+ */
+ if (pages_read_since_last_sleep > sleep_quanta - 1)
+ sleep_quanta = 1;
+ else
+ sleep_quanta -= pages_read_since_last_sleep;
+ }
+
+ /* OK, now sleep.  WL_EXIT_ON_PM_DEATH exits if the postmaster dies. */
+ (void) WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ sleep_quanta * MS_PER_SLEEP_QUANTUM,
+ WAIT_EVENT_WAL_SUMMARIZER_WAL);
+ ResetLatch(MyLatch);
+
+ /* Reset count of pages read. */
+ pages_read_since_last_sleep = 0;
+}
+
+/*
+ * Remove WAL summary files that are old enough that we're allowed to remove
+ * them, but only if the redo pointer has advanced since we last did this,
+ * so that the cleanup work happens at most once per checkpoint cycle.
+ *
+ * If wal_summary_keep_time is 0, automatic removal is disabled entirely.
+ */
+static void
+MaybeRemoveOldWalSummaries(void)
+{
+ XLogRecPtr redo_pointer = GetRedoRecPtr();
+ List *wslist;
+ time_t cutoff_time;
+
+ /* If WAL summary removal is disabled, don't do anything. */
+ if (wal_summary_keep_time == 0)
+ return;
+
+ /*
+ * If the redo pointer has not advanced, don't do anything.
+ *
+ * This has the effect that we only try to remove old WAL summary files
+ * once per checkpoint cycle.
+ */
+ if (redo_pointer == redo_pointer_at_last_summary_removal)
+ return;
+ redo_pointer_at_last_summary_removal = redo_pointer;
+
+ /*
+ * Files should only be removed if the last modification time precedes the
+ * cutoff time we compute here.
+ *
+ * wal_summary_keep_time is in minutes, so convert to seconds.
+ */
+ cutoff_time = time(NULL) - 60 * wal_summary_keep_time;
+
+ /* Get all the summaries that currently exist. */
+ wslist = GetWalSummaries(0, InvalidXLogRecPtr, InvalidXLogRecPtr);
+
+ /*
+ * Loop until all summaries have been considered for removal.  Each pass
+ * of this outer loop handles the summaries for one timeline; entries for
+ * other timelines are left in the list for a later pass.
+ */
+ while (wslist != NIL)
+ {
+ ListCell *lc;
+ XLogSegNo oldest_segno;
+ XLogRecPtr oldest_lsn = InvalidXLogRecPtr;
+ TimeLineID selected_tli;
+
+ HandleWalSummarizerInterrupts();
+
+ /*
+ * Pick a timeline for which some summary files still exist on disk,
+ * and find the oldest LSN that still exists on disk for that
+ * timeline.
+ */
+ selected_tli = ((WalSummaryFile *) linitial(wslist))->tli;
+ oldest_segno = XLogGetOldestSegno(selected_tli);
+ if (oldest_segno != 0)
+ XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size,
+ oldest_lsn);
+
+
+ /* Consider each WAL file on the selected timeline in turn. */
+ foreach(lc, wslist)
+ {
+ WalSummaryFile *ws = lfirst(lc);
+
+ HandleWalSummarizerInterrupts();
+
+ /* If it's not on this timeline, it's not time to consider it. */
+ if (selected_tli != ws->tli)
+ continue;
+
+ /*
+ * If the WAL doesn't exist any more, we can remove it if the file
+ * modification time is old enough.
+ */
+ if (XLogRecPtrIsInvalid(oldest_lsn) || ws->end_lsn <= oldest_lsn)
+ RemoveWalSummaryIfOlderThan(ws, cutoff_time);
+
+ /*
+ * Whether we removed the file or not, we need not consider it
+ * again.
+ */
+ wslist = foreach_delete_current(wslist, lc);
+ pfree(ws);
+ }
+ }
+}
WrapLimitsVacuumLock 46
NotifyQueueTailLock 47
WaitEventExtensionLock 48
+WALSummarizerLock 49
* - Syslogger because it is not connected to shared memory
* - Archiver because most relevant archiving IO is delegated to a
* specialized command or module
-* - WAL Receiver and WAL Writer IO is not tracked in pg_stat_io for now
+* - WAL Receiver, WAL Writer, and WAL Summarizer IO are not tracked in
+* pg_stat_io for now
*
* Function returns true if BackendType participates in the cumulative stats
* subsystem for IO and false if it does not.
case B_LOGGER:
case B_WAL_RECEIVER:
case B_WAL_WRITER:
+ case B_WAL_SUMMARIZER:
return false;
case B_AUTOVAC_LAUNCHER:
SYSLOGGER_MAIN "Waiting in main loop of syslogger process."
WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process."
WAL_SENDER_MAIN "Waiting in main loop of WAL sender process."
+WAL_SUMMARIZER_WAL "Waiting in WAL summarizer for more WAL to be generated."
WAL_WRITER_MAIN "Waiting in main loop of WAL writer process."
SYNC_REP "Waiting for confirmation from a remote server during synchronous replication."
WAL_RECEIVER_EXIT "Waiting for the WAL receiver to exit."
WAL_RECEIVER_WAIT_START "Waiting for startup process to send initial data for streaming replication."
+WAL_SUMMARY_READY "Waiting for a new WAL summary to be generated."
XACT_GROUP_UPDATE "Waiting for the group leader to update transaction status at end of a parallel operation."
SPIN_DELAY "Waiting while acquiring a contended spinlock."
VACUUM_DELAY "Waiting in a cost-based vacuum delay point."
VACUUM_TRUNCATE "Waiting to acquire an exclusive lock to truncate off any empty pages at the end of a table vacuumed."
+WAL_SUMMARIZER_ERROR "Waiting after a WAL summarizer error."
#
WAL_INIT_SYNC "Waiting for a newly initialized WAL file to reach durable storage."
WAL_INIT_WRITE "Waiting for a write while initializing a new WAL file."
WAL_READ "Waiting for a read from a WAL file."
+WAL_SUMMARY_READ "Waiting for a read from a WAL summary file."
+WAL_SUMMARY_WRITE "Waiting for a write to a WAL summary file."
WAL_SYNC "Waiting for a WAL file to reach durable storage."
WAL_SYNC_METHOD_ASSIGN "Waiting for data to reach durable storage while assigning a new WAL sync method."
WAL_WRITE "Waiting for a write to a WAL file."
case B_WAL_SENDER:
backendDesc = "walsender";
break;
+ case B_WAL_SUMMARIZER:
+ backendDesc = "walsummarizer";
+ break;
case B_WAL_WRITER:
backendDesc = "walwriter";
break;
#include "postmaster/postmaster.h"
#include "postmaster/startup.h"
#include "postmaster/syslogger.h"
+#include "postmaster/walsummarizer.h"
#include "postmaster/walwriter.h"
#include "replication/logicallauncher.h"
#include "replication/slot.h"
gettext_noop("Write-Ahead Log / Archive Recovery"),
/* WAL_RECOVERY_TARGET */
gettext_noop("Write-Ahead Log / Recovery Target"),
+ /* WAL_SUMMARIZATION */
+ gettext_noop("Write-Ahead Log / Summarization"),
/* REPLICATION_SENDING */
gettext_noop("Replication / Sending Servers"),
/* REPLICATION_PRIMARY */
NULL, NULL, NULL
},
+ {
+ {"summarize_wal", PGC_SIGHUP, WAL_SUMMARIZATION,
+ gettext_noop("Starts the WAL summarizer process to enable incremental backup."),
+ NULL
+ },
+ &summarize_wal,
+ false,
+ NULL, NULL, NULL
+ },
+
{
{"hot_standby", PGC_POSTMASTER, REPLICATION_STANDBY,
gettext_noop("Allows connections and queries during recovery."),
check_wal_segment_size, NULL, NULL
},
+ {
+ {"wal_summary_keep_time", PGC_SIGHUP, WAL_SUMMARIZATION,
+ gettext_noop("Time for which WAL summary files should be kept."),
+ NULL,
+ GUC_UNIT_MIN,
+ },
+ &wal_summary_keep_time,
+ 10 * 24 * 60, /* 10 days */
+ 0,
+ INT_MAX,
+ NULL, NULL, NULL
+ },
+
{
{"autovacuum_naptime", PGC_SIGHUP, AUTOVACUUM,
gettext_noop("Time to sleep between autovacuum runs."),
#recovery_target_action = 'pause' # 'pause', 'promote', 'shutdown'
# (change requires restart)
+# - WAL Summarization -
+
+#summarize_wal = off # run WAL summarizer process?
+#wal_summary_keep_time = '10d' # when to remove old summary files, 0 = never
+
#------------------------------------------------------------------------------
# REPLICATION
static const char *const subdirs[] = {
"global",
"pg_wal/archive_status",
+ "pg_wal/summaries",
"pg_commit_ts",
"pg_dynshmem",
"pg_notify",
archive.o \
base64.o \
binaryheap.o \
+ blkreftable.o \
checksum_helper.o \
compression.o \
config_info.o \
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * blkreftable.c
+ * Block reference tables.
+ *
+ * A block reference table is used to keep track of which blocks have
+ * been modified by WAL records within a certain LSN range.
+ *
+ * For each relation fork, we keep track of all blocks that have appeared
+ * in block references in the WAL. We also keep track of the "limit block",
+ * which is the smallest relation length in blocks known to have occurred
+ * during that range of WAL records. This should be set to 0 if the relation
+ * fork is created or destroyed, and to the post-truncation length if
+ * truncated.
+ *
+ * Whenever we set the limit block, we also forget about any modified blocks
+ * beyond that point. Those blocks don't exist any more. Such blocks can
+ * later be marked as modified again; if that happens, it means the relation
+ * was re-extended.
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * src/common/blkreftable.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+
+#ifndef FRONTEND
+#include "postgres.h"
+#else
+#include "postgres_fe.h"
+#endif
+
+#ifdef FRONTEND
+#include "common/logging.h"
+#endif
+
+#include "common/blkreftable.h"
+#include "common/hashfn.h"
+#include "port/pg_crc32c.h"
+
+/*
+ * A block reference table keeps track of the status of each relation
+ * fork individually.
+ */
+typedef struct BlockRefTableKey
+{
+ RelFileLocator rlocator;
+ ForkNumber forknum;
+} BlockRefTableKey;
+
+/*
+ * We could need to store data either for a relation in which only a
+ * tiny fraction of the blocks have been modified or for a relation in
+ * which nearly every block has been modified, and we want a
+ * space-efficient representation in both cases. To accomplish this,
+ * we divide the relation into chunks of 2^16 blocks and choose between
+ * an array representation and a bitmap representation for each chunk.
+ *
+ * When the number of modified blocks in a given chunk is small, we
+ * essentially store an array of block numbers, but we need not store the
+ * entire block number: instead, we store each block number as a 2-byte
+ * offset from the start of the chunk.
+ *
+ * When the number of modified blocks in a given chunk is large, we switch
+ * to a bitmap representation.
+ *
+ * These same basic representational choices are used both when a block
+ * reference table is stored in memory and when it is serialized to disk.
+ *
+ * In the in-memory representation, we initially allocate each chunk with
+ * space for a number of entries given by INITIAL_ENTRIES_PER_CHUNK and
+ * increase that as necessary until we reach MAX_ENTRIES_PER_CHUNK.
+ * Any chunk whose allocated size reaches MAX_ENTRIES_PER_CHUNK is converted
+ * to a bitmap, and thus never needs to grow further.
+ */
+#define BLOCKS_PER_CHUNK (1 << 16)
+#define BLOCKS_PER_ENTRY (BITS_PER_BYTE * sizeof(uint16))
+#define MAX_ENTRIES_PER_CHUNK (BLOCKS_PER_CHUNK / BLOCKS_PER_ENTRY)
+#define INITIAL_ENTRIES_PER_CHUNK 16
+typedef uint16 *BlockRefTableChunk;
+
+/*
+ * State for one relation fork.
+ *
+ * 'rlocator' and 'forknum' identify the relation fork to which this entry
+ * pertains.
+ *
+ * 'limit_block' is the shortest known length of the relation in blocks
+ * within the LSN range covered by a particular block reference table.
+ * It should be set to 0 if the relation fork is created or dropped. If the
+ * relation fork is truncated, it should be set to the number of blocks that
+ * remain after truncation.
+ *
+ * 'nchunks' is the allocated length of each of the three arrays that follow.
+ * We can only represent the status of block numbers less than nchunks *
+ * BLOCKS_PER_CHUNK.
+ *
+ * 'chunk_size' is an array storing the allocated size of each chunk.
+ *
+ * 'chunk_usage' is an array storing the number of elements used in each
+ * chunk. If that value is less than MAX_ENTRIES_PER_CHUNK, the corresponding
+ * chunk is used as an array; else the corresponding chunk is used as a bitmap.
+ * When used as a bitmap, the least significant bit of the first array element
+ * is the status of the lowest-numbered block covered by this chunk.
+ *
+ * 'chunk_data' is the array of chunks.
+ */
+struct BlockRefTableEntry
+{
+ BlockRefTableKey key;
+ BlockNumber limit_block;
+ char status;
+ uint32 nchunks;
+ uint16 *chunk_size;
+ uint16 *chunk_usage;
+ BlockRefTableChunk *chunk_data;
+};
+
+/* Declare and define a hash table over type BlockRefTableEntry. */
+#define SH_PREFIX blockreftable
+#define SH_ELEMENT_TYPE BlockRefTableEntry
+#define SH_KEY_TYPE BlockRefTableKey
+#define SH_KEY key
+#define SH_HASH_KEY(tb, key) \
+ hash_bytes((const unsigned char *) &key, sizeof(BlockRefTableKey))
+#define SH_EQUAL(tb, a, b) (memcmp(&a, &b, sizeof(BlockRefTableKey)) == 0)
+#define SH_SCOPE static inline
+#ifdef FRONTEND
+#define SH_RAW_ALLOCATOR pg_malloc0
+#endif
+#define SH_DEFINE
+#define SH_DECLARE
+#include "lib/simplehash.h"
+
+/*
+ * A block reference table is basically just the hash table, but we don't
+ * want to expose that to outside callers.
+ *
+ * We keep track of the memory context in use explicitly too, so that it's
+ * easy to place all of our allocations in the same context.
+ */
+struct BlockRefTable
+{
+ blockreftable_hash *hash;
+#ifndef FRONTEND
+ MemoryContext mcxt;
+#endif
+};
+
+/*
+ * On-disk serialization format for block reference table entries.
+ */
+typedef struct BlockRefTableSerializedEntry
+{
+ RelFileLocator rlocator;
+ ForkNumber forknum;
+ BlockNumber limit_block;
+ uint32 nchunks;
+} BlockRefTableSerializedEntry;
+
+/*
+ * Buffer size, so that we avoid doing many small I/Os.
+ */
+#define BUFSIZE 65536
+
+/*
+ * Ad-hoc buffer for file I/O.
+ */
+typedef struct BlockRefTableBuffer
+{
+ io_callback_fn io_callback;
+ void *io_callback_arg;
+ char data[BUFSIZE];
+ int used;
+ int cursor;
+ pg_crc32c crc;
+} BlockRefTableBuffer;
+
+/*
+ * State for keeping track of progress while incrementally reading a block
+ * table reference file from disk.
+ *
+ * total_chunks means the number of chunks for the RelFileLocator/ForkNumber
+ * combination that is currently being read, and consumed_chunks is the number
+ * of those that have been read. (We always read all the information for
+ * a single chunk at one time, so we don't need to be able to represent the
+ * state where a chunk has been partially read.)
+ *
+ * chunk_size is the array of chunk sizes. The length is given by total_chunks.
+ *
+ * chunk_data holds the current chunk.
+ *
+ * chunk_position helps us figure out how much progress we've made in returning
+ * the block numbers for the current chunk to the caller. If the chunk is a
+ * bitmap, it's the number of bits we've scanned; otherwise, it's the number
+ * of chunk entries we've scanned.
+ */
+struct BlockRefTableReader
+{
+ BlockRefTableBuffer buffer;
+ char *error_filename;
+ report_error_fn error_callback;
+ void *error_callback_arg;
+ uint32 total_chunks;
+ uint32 consumed_chunks;
+ uint16 *chunk_size;
+ uint16 chunk_data[MAX_ENTRIES_PER_CHUNK];
+ uint32 chunk_position;
+};
+
+/*
+ * State for keeping track of progress while incrementally writing a block
+ * reference table file to disk.
+ */
+struct BlockRefTableWriter
+{
+ BlockRefTableBuffer buffer;
+};
+
+/* Function prototypes. */
+static int BlockRefTableComparator(const void *a, const void *b);
+static void BlockRefTableFlush(BlockRefTableBuffer *buffer);
+static void BlockRefTableRead(BlockRefTableReader *reader, void *data,
+ int length);
+static void BlockRefTableWrite(BlockRefTableBuffer *buffer, void *data,
+ int length);
+static void BlockRefTableFileTerminate(BlockRefTableBuffer *buffer);
+
+/*
+ * Create an empty block reference table.
+ *
+ * The table is allocated in the current memory context in backend builds;
+ * in frontend builds, simplehash uses the pg_malloc0 raw allocator.
+ */
+BlockRefTable *
+CreateEmptyBlockRefTable(void)
+{
+ BlockRefTable *brtab = palloc(sizeof(BlockRefTable));
+
+ /*
+ * Even a completely empty database has a few hundred relation forks, so it
+ * seems best to size the hash on the assumption that we're going to have
+ * at least a few thousand entries.
+ */
+#ifdef FRONTEND
+ brtab->hash = blockreftable_create(4096, NULL);
+#else
+ brtab->mcxt = CurrentMemoryContext;
+ brtab->hash = blockreftable_create(brtab->mcxt, 4096, NULL);
+#endif
+
+ return brtab;
+}
+
<a>
+/*
+ * Set the "limit block" for a relation fork and forget any modified blocks
+ * with equal or higher block numbers.
+ *
+ * The "limit block" is the shortest known length of the relation within the
+ * range of WAL records covered by this block reference table.
+ */
+void
+BlockRefTableSetLimitBlock(BlockRefTable *brtab,
+ const RelFileLocator *rlocator,
+ ForkNumber forknum,
+ BlockNumber limit_block)
+{
+ BlockRefTableEntry *brtentry;
+ BlockRefTableKey key = {0}; /* make sure any padding is zero */
+ bool found;
+
+ memcpy(&key.rlocator, rlocator, sizeof(RelFileLocator));
+ key.forknum = forknum;
+ brtentry = blockreftable_insert(brtab->hash, key, &found);
+
+ if (!found)
+ {
+ /*
+ * We have no existing data about this relation fork, so just record
+ * the limit_block value supplied by the caller, and make sure other
+ * parts of the entry are properly initialized.
+ */
+ brtentry->limit_block = limit_block;
+ brtentry->nchunks = 0;
+ brtentry->chunk_size = NULL;
+ brtentry->chunk_usage = NULL;
+ brtentry->chunk_data = NULL;
+ return;
+ }
+
+ /* Existing entry: delegate limit-block update (and block forgetting). */
+ BlockRefTableEntrySetLimitBlock(brtentry, limit_block);
+}
</a>
+
+/*
+ * Mark a block in a given relation fork as known to have been modified.
+ *
+ * In backend builds, all allocations performed here must happen in the
+ * table's own memory context, hence the context switch below.
+ */
+void
+BlockRefTableMarkBlockModified(BlockRefTable *brtab,
+ const RelFileLocator *rlocator,
+ ForkNumber forknum,
+ BlockNumber blknum)
+{
+ BlockRefTableEntry *brtentry;
+ BlockRefTableKey key = {0}; /* make sure any padding is zero */
+ bool found;
+#ifndef FRONTEND
+ MemoryContext oldcontext = MemoryContextSwitchTo(brtab->mcxt);
+#endif
+
+ memcpy(&key.rlocator, rlocator, sizeof(RelFileLocator));
+ key.forknum = forknum;
+ brtentry = blockreftable_insert(brtab->hash, key, &found);
+
+ if (!found)
+ {
+ /*
+ * We want to set the initial limit block value to something higher
+ * than any legal block number. InvalidBlockNumber fits the bill.
+ */
+ brtentry->limit_block = InvalidBlockNumber;
+ brtentry->nchunks = 0;
+ brtentry->chunk_size = NULL;
+ brtentry->chunk_usage = NULL;
+ brtentry->chunk_data = NULL;
+ }
+
+ BlockRefTableEntryMarkBlockModified(brtentry, forknum, blknum);
+
+#ifndef FRONTEND
+ MemoryContextSwitchTo(oldcontext);
+#endif
+}
+
+/*
+ * Get an entry from a block reference table.
+ *
+ * If the entry does not exist, this function returns NULL and leaves
+ * *limit_block untouched. Otherwise, it returns the entry and sets
+ * *limit_block to the value from the entry.
+ */
+BlockRefTableEntry *
+BlockRefTableGetEntry(BlockRefTable *brtab, const RelFileLocator *rlocator,
+ ForkNumber forknum, BlockNumber *limit_block)
+{
+ BlockRefTableKey key = {0}; /* make sure any padding is zero */
+ BlockRefTableEntry *entry;
+
+ Assert(limit_block != NULL);
+
+ memcpy(&key.rlocator, rlocator, sizeof(RelFileLocator));
+ key.forknum = forknum;
+ entry = blockreftable_lookup(brtab->hash, key);
+
+ if (entry != NULL)
+ *limit_block = entry->limit_block;
+
+ return entry;
+}
+
+/*
+ * Get block numbers from a table entry.
+ *
+ * 'blocks' must point to enough space to hold at least 'nblocks' block
+ * numbers, and any block numbers we manage to get will be written there.
+ * The return value is the number of block numbers actually written.
+ *
+ * We do not return block numbers unless they are greater than or equal to
+ * start_blkno and strictly less than stop_blkno.
+ */
+int
+BlockRefTableEntryGetBlocks(BlockRefTableEntry *entry,
+ BlockNumber start_blkno,
+ BlockNumber stop_blkno,
+ BlockNumber *blocks,
+ int nblocks)
+{
+ uint32 start_chunkno;
+ uint32 stop_chunkno;
+ uint32 chunkno;
+ int nresults = 0;
+
+ Assert(entry != NULL);
+
+ /*
+ * Figure out which chunks could potentially contain blocks of interest.
+ *
+ * We need to be careful about overflow here, because stop_blkno could be
+ * InvalidBlockNumber or something very close to it.
+ */
+ start_chunkno = start_blkno / BLOCKS_PER_CHUNK;
+ stop_chunkno = stop_blkno / BLOCKS_PER_CHUNK;
+ if ((stop_blkno % BLOCKS_PER_CHUNK) != 0)
+ ++stop_chunkno;
+ if (stop_chunkno > entry->nchunks)
+ stop_chunkno = entry->nchunks;
+
+ /*
+ * Loop over chunks.
+ */
+ for (chunkno = start_chunkno; chunkno < stop_chunkno; ++chunkno)
+ {
+ uint16 chunk_usage = entry->chunk_usage[chunkno];
+ BlockRefTableChunk chunk_data = entry->chunk_data[chunkno];
+ unsigned start_offset = 0;
+ unsigned stop_offset = BLOCKS_PER_CHUNK;
+
+ /*
+ * If the start and/or stop block number falls within this chunk, the
+ * whole chunk may not be of interest. Figure out which portion we
+ * care about, if it's not the whole thing.
+ *
+ * Note that we must compare against the chunk that actually contains
+ * the boundary block number, not against stop_chunkno - 1: when
+ * stop_blkno is an exact multiple of BLOCKS_PER_CHUNK, or when
+ * stop_chunkno was clamped to entry->nchunks above, the last chunk
+ * we iterate over lies entirely below stop_blkno and must be scanned
+ * in full.
+ */
+ if (chunkno == start_chunkno)
+ start_offset = start_blkno % BLOCKS_PER_CHUNK;
+ if (chunkno == stop_blkno / BLOCKS_PER_CHUNK)
+ stop_offset = stop_blkno % BLOCKS_PER_CHUNK;
+
+ /*
+ * Handling differs depending on whether this is an array of offsets
+ * or a bitmap.
+ */
+ if (chunk_usage == MAX_ENTRIES_PER_CHUNK)
+ {
+ unsigned i;
+
+ /* It's a bitmap, so test every relevant bit. */
+ for (i = start_offset; i < stop_offset; ++i)
+ {
+ uint16 w = chunk_data[i / BLOCKS_PER_ENTRY];
+
+ if ((w & (1 << (i % BLOCKS_PER_ENTRY))) != 0)
+ {
+ BlockNumber blkno = chunkno * BLOCKS_PER_CHUNK + i;
+
+ blocks[nresults++] = blkno;
+
+ /* Early exit if we run out of output space. */
+ if (nresults == nblocks)
+ return nresults;
+ }
+ }
+ }
+ else
+ {
+ unsigned i;
+
+ /* It's an array of offsets, so check each one. */
+ for (i = 0; i < chunk_usage; ++i)
+ {
+ uint16 offset = chunk_data[i];
+
+ if (offset >= start_offset && offset < stop_offset)
+ {
+ BlockNumber blkno = chunkno * BLOCKS_PER_CHUNK + offset;
+
+ blocks[nresults++] = blkno;
+
+ /* Early exit if we run out of output space. */
+ if (nresults == nblocks)
+ return nresults;
+ }
+ }
+ }
+ }
+
+ return nresults;
+}
+
+/*
+ * Serialize a block reference table to a file.
+ *
+ * Output format: magic number, then one serialized entry per relation fork
+ * in sorted order, each followed by its chunk-usage array and chunk data,
+ * then a zero-filled sentinel entry and a CRC of everything preceding it.
+ *
+ * All writes go through 'write_callback' with 'write_callback_arg'.
+ *
+ * NOTE(review): 'sdata' is palloc'd but not explicitly freed here; in
+ * backend use this is presumably reclaimed with the memory context --
+ * confirm frontend callers tolerate the allocation lingering.
+ */
+void
+WriteBlockRefTable(BlockRefTable *brtab,
+ io_callback_fn write_callback,
+ void *write_callback_arg)
+{
+ BlockRefTableSerializedEntry *sdata = NULL;
+ BlockRefTableBuffer buffer;
+ uint32 magic = BLOCKREFTABLE_MAGIC;
+
+ /* Prepare buffer. */
+ memset(&buffer, 0, sizeof(BlockRefTableBuffer));
+ buffer.io_callback = write_callback;
+ buffer.io_callback_arg = write_callback_arg;
+ INIT_CRC32C(buffer.crc);
+
+ /* Write magic number. */
+ BlockRefTableWrite(&buffer, &magic, sizeof(uint32));
+
+ /* Write the entries, assuming there are some. */
+ if (brtab->hash->members > 0)
+ {
+ unsigned i = 0;
+ blockreftable_iterator it;
+ BlockRefTableEntry *brtentry;
+
+ /* Extract entries into serializable format and sort them. */
+ sdata =
+ palloc(brtab->hash->members * sizeof(BlockRefTableSerializedEntry));
+ blockreftable_start_iterate(brtab->hash, &it);
+ while ((brtentry = blockreftable_iterate(brtab->hash, &it)) != NULL)
+ {
+ BlockRefTableSerializedEntry *sentry = &sdata[i++];
+
+ sentry->rlocator = brtentry->key.rlocator;
+ sentry->forknum = brtentry->key.forknum;
+ sentry->limit_block = brtentry->limit_block;
+ sentry->nchunks = brtentry->nchunks;
+
+ /* trim trailing zero entries */
+ while (sentry->nchunks > 0 &&
+ brtentry->chunk_usage[sentry->nchunks - 1] == 0)
+ sentry->nchunks--;
+ }
+ Assert(i == brtab->hash->members);
+ qsort(sdata, i, sizeof(BlockRefTableSerializedEntry),
+ BlockRefTableComparator);
+
+ /* Loop over entries in sorted order and serialize each one. */
+ for (i = 0; i < brtab->hash->members; ++i)
+ {
+ BlockRefTableSerializedEntry *sentry = &sdata[i];
+ BlockRefTableKey key = {0}; /* make sure any padding is zero */
+ unsigned j;
+
+ /* Write the serialized entry itself. */
+ BlockRefTableWrite(&buffer, sentry,
+ sizeof(BlockRefTableSerializedEntry));
+
+ /* Look up the original entry so we can access the chunks. */
+ memcpy(&key.rlocator, &sentry->rlocator, sizeof(RelFileLocator));
+ key.forknum = sentry->forknum;
+ brtentry = blockreftable_lookup(brtab->hash, key);
+ Assert(brtentry != NULL);
+
+ /* Write the untruncated portion of the chunk length array. */
+ if (sentry->nchunks != 0)
+ BlockRefTableWrite(&buffer, brtentry->chunk_usage,
+ sentry->nchunks * sizeof(uint16));
+
+ /* Write the contents of each chunk. */
+ for (j = 0; j < brtentry->nchunks; ++j)
+ {
+ if (brtentry->chunk_usage[j] == 0)
+ continue;
+ BlockRefTableWrite(&buffer, brtentry->chunk_data[j],
+ brtentry->chunk_usage[j] * sizeof(uint16));
+ }
+ }
+ }
+
+ /* Write out appropriate terminator and CRC and flush buffer. */
+ BlockRefTableFileTerminate(&buffer);
+}
+
+/*
+ * Prepare to incrementally read a block reference table file.
+ *
+ * 'read_callback' is a function that can be called to read data from the
+ * underlying file (or other data source) into our internal buffer.
+ *
+ * 'read_callback_arg' is an opaque argument to be passed to read_callback.
+ *
+ * 'error_filename' is the filename that should be included in error messages
+ * if the file is found to be malformed. The value is not copied, so the
+ * caller should ensure that it remains valid until done with this
+ * BlockRefTableReader.
+ *
+ * 'error_callback' is a function to be called if the file is found to be
+ * malformed. This is not used for I/O errors, which must be handled internally
+ * by read_callback.
+ *
+ * 'error_callback_arg' is an opaque argument to be passed to error_callback.
+ */
+BlockRefTableReader *
+CreateBlockRefTableReader(io_callback_fn read_callback,
+ void *read_callback_arg,
+ char *error_filename,
+ report_error_fn error_callback,
+ void *error_callback_arg)
+{
+ BlockRefTableReader *reader;
+ uint32 magic;
+
+ /* Initialize data structure. */
+ reader = palloc0(sizeof(BlockRefTableReader));
+ reader->buffer.io_callback = read_callback;
+ reader->buffer.io_callback_arg = read_callback_arg;
+ reader->error_filename = error_filename;
+ reader->error_callback = error_callback;
+ reader->error_callback_arg = error_callback_arg;
+ INIT_CRC32C(reader->buffer.crc);
+
+ /* Verify magic number. */
+ BlockRefTableRead(reader, &magic, sizeof(uint32));
+ if (magic != BLOCKREFTABLE_MAGIC)
+ error_callback(error_callback_arg,
+ "file \"%s\" has wrong magic number: expected %u, found %u",
+ error_filename,
+ BLOCKREFTABLE_MAGIC, magic);
+
+ return reader;
+}
+
+/*
+ * Read next relation fork covered by this block reference table file.
+ *
+ * Returns true and fills in *rlocator, *forknum, and *limit_block if another
+ * relation fork was found; returns false after verifying the trailing CRC
+ * when the zero-filled sentinel entry marking end-of-file is read.
+ *
+ * After calling this function, you must call BlockRefTableReaderGetBlocks
+ * until it returns 0 before calling it again.
+ */
+bool
+BlockRefTableReaderNextRelation(BlockRefTableReader *reader,
+ RelFileLocator *rlocator,
+ ForkNumber *forknum,
+ BlockNumber *limit_block)
+{
+ BlockRefTableSerializedEntry sentry;
+ BlockRefTableSerializedEntry zentry = {{0}};
+
+ /*
+ * Sanity check: caller must read all blocks from all chunks before moving
+ * on to the next relation.
+ */
+ Assert(reader->total_chunks == reader->consumed_chunks);
+
+ /* Read serialized entry. */
+ BlockRefTableRead(reader, &sentry,
+ sizeof(BlockRefTableSerializedEntry));
+
+ /*
+ * If we just read the sentinel entry indicating that we've reached the
+ * end, read and check the CRC.
+ */
+ if (memcmp(&sentry, &zentry, sizeof(BlockRefTableSerializedEntry)) == 0)
+ {
+ pg_crc32c expected_crc;
+ pg_crc32c actual_crc;
+
+ /*
+ * We want to know the CRC of the file excluding the 4-byte CRC
+ * itself, so copy the current value of the CRC accumulator before
+ * reading those bytes, and use the copy to finalize the calculation.
+ */
+ expected_crc = reader->buffer.crc;
+ FIN_CRC32C(expected_crc);
+
+ /* Now we can read the actual value. */
+ BlockRefTableRead(reader, &actual_crc, sizeof(pg_crc32c));
+
+ /* Throw an error if there is a mismatch. */
+ if (!EQ_CRC32C(expected_crc, actual_crc))
+ reader->error_callback(reader->error_callback_arg,
+ "file \"%s\" has wrong checksum: expected %08X, found %08X",
+ reader->error_filename, expected_crc, actual_crc);
+
+ return false;
+ }
+
+ /* Read chunk size array, replacing any array from a prior relation. */
+ if (reader->chunk_size != NULL)
+ pfree(reader->chunk_size);
+ reader->chunk_size = palloc(sentry.nchunks * sizeof(uint16));
+ BlockRefTableRead(reader, reader->chunk_size,
+ sentry.nchunks * sizeof(uint16));
+
+ /* Set up for chunk scan. */
+ reader->total_chunks = sentry.nchunks;
+ reader->consumed_chunks = 0;
+
+ /* Return data to caller. */
+ memcpy(rlocator, &sentry.rlocator, sizeof(RelFileLocator));
+ *forknum = sentry.forknum;
+ *limit_block = sentry.limit_block;
+ return true;
+}
+
+/*
+ * Get modified blocks associated with the relation fork returned by
+ * the most recent call to BlockRefTableReaderNextRelation.
+ *
+ * On return, block numbers will be written into the 'blocks' array, whose
+ * length should be passed via 'nblocks'. The return value is the number of
+ * entries actually written into the 'blocks' array, which may be less than
+ * 'nblocks' if we run out of modified blocks in the relation fork before
+ * we run out of room in the array.
+ */
+unsigned
+BlockRefTableReaderGetBlocks(BlockRefTableReader *reader,
+ BlockNumber *blocks,
+ int nblocks)
+{
+ unsigned blocks_found = 0;
+
+ /* Must provide space for at least one block number to be returned. */
+ Assert(nblocks > 0);
+
+ /* Loop collecting blocks to return to caller. */
+ for (;;)
+ {
+ uint16 next_chunk_size;
+
+ /*
+ * If we've read at least one chunk, maybe it contains some block
+ * numbers that could satisfy caller's request.
+ *
+ * chunk_position records how far we got last time: bit index for
+ * bitmap chunks, entry index for offset-array chunks.
+ */
+ if (reader->consumed_chunks > 0)
+ {
+ uint32 chunkno = reader->consumed_chunks - 1;
+ uint16 chunk_size = reader->chunk_size[chunkno];
+
+ if (chunk_size == MAX_ENTRIES_PER_CHUNK)
+ {
+ /* Bitmap format, so search for bits that are set. */
+ while (reader->chunk_position < BLOCKS_PER_CHUNK &&
+ blocks_found < nblocks)
+ {
+ uint16 chunkoffset = reader->chunk_position;
+ uint16 w;
+
+ w = reader->chunk_data[chunkoffset / BLOCKS_PER_ENTRY];
+ if ((w & (1u << (chunkoffset % BLOCKS_PER_ENTRY))) != 0)
+ blocks[blocks_found++] =
+ chunkno * BLOCKS_PER_CHUNK + chunkoffset;
+ ++reader->chunk_position;
+ }
+ }
+ else
+ {
+ /* Not in bitmap format, so each entry is a 2-byte offset. */
+ while (reader->chunk_position < chunk_size &&
+ blocks_found < nblocks)
+ {
+ blocks[blocks_found++] = chunkno * BLOCKS_PER_CHUNK
+ + reader->chunk_data[reader->chunk_position];
+ ++reader->chunk_position;
+ }
+ }
+ }
+
+ /* We found enough blocks, so we're done. */
+ if (blocks_found >= nblocks)
+ break;
+
+ /*
+ * We didn't find enough blocks, so we must need the next chunk. If
+ * there are none left, though, then we're done anyway.
+ */
+ if (reader->consumed_chunks == reader->total_chunks)
+ break;
+
+ /*
+ * Read data for next chunk and reset scan position to beginning of
+ * chunk. Note that the next chunk might be empty, in which case we
+ * consume the chunk without actually consuming any bytes from the
+ * underlying file.
+ */
+ next_chunk_size = reader->chunk_size[reader->consumed_chunks];
+ if (next_chunk_size > 0)
+ BlockRefTableRead(reader, reader->chunk_data,
+ next_chunk_size * sizeof(uint16));
+ ++reader->consumed_chunks;
+ reader->chunk_position = 0;
+ }
+
+ return blocks_found;
+}
+
+/*
+ * Release memory used while reading a block reference table from a file.
+ *
+ * Frees the reader's chunk-size array, if one was allocated, and then the
+ * reader object itself.  The pointer is cleared before freeing the reader
+ * purely as a defensive measure against stale references.
+ */
+void
+DestroyBlockRefTableReader(BlockRefTableReader *reader)
+{
+	if (reader->chunk_size != NULL)
+	{
+		pfree(reader->chunk_size);
+		reader->chunk_size = NULL;
+	}
+	pfree(reader);
+}
+
+/*
+ * Prepare to write a block reference table file incrementally.
+ *
+ * Caller must be able to supply BlockRefTableEntry objects sorted in the
+ * appropriate order.
+ *
+ * The write callback and its argument are stashed in the writer's buffer,
+ * and a running CRC is initialized; the magic number is written through
+ * the buffer so that it is included in the CRC computed over the file.
+ */
+BlockRefTableWriter *
+CreateBlockRefTableWriter(io_callback_fn write_callback,
+						  void *write_callback_arg)
+{
+	BlockRefTableWriter *writer;
+	uint32		magic = BLOCKREFTABLE_MAGIC;
+
+	/* Prepare buffer and CRC check and save callbacks. */
+	writer = palloc0(sizeof(BlockRefTableWriter));
+	writer->buffer.io_callback = write_callback;
+	writer->buffer.io_callback_arg = write_callback_arg;
+	INIT_CRC32C(writer->buffer.crc);
+
+	/* Write magic number. */
+	BlockRefTableWrite(&writer->buffer, &magic, sizeof(uint32));
+
+	return writer;
+}
+
+/*
+ * Append one entry to a block reference table file.
+ *
+ * Note that entries must be written in the proper order, that is, sorted by
+ * tablespace, then database, then relfilenumber, then fork number. Caller
+ * is responsible for supplying data in the correct order. If that seems hard,
+ * use an in-memory BlockRefTable instead.
+ */
+void
+BlockRefTableWriteEntry(BlockRefTableWriter *writer, BlockRefTableEntry *entry)
+{
+	BlockRefTableSerializedEntry sentry;
+	unsigned	j;
+
+	/* Convert to serialized entry format. */
+	sentry.rlocator = entry->key.rlocator;
+	sentry.forknum = entry->key.forknum;
+	sentry.limit_block = entry->limit_block;
+	sentry.nchunks = entry->nchunks;
+
+	/*
+	 * Trim trailing zero entries.  Chunks with zero usage carry no payload,
+	 * so dropping trailing ones shrinks the serialized chunk count without
+	 * losing information.
+	 */
+	while (sentry.nchunks > 0 && entry->chunk_usage[sentry.nchunks - 1] == 0)
+		sentry.nchunks--;
+
+	/* Write the serialized entry itself. */
+	BlockRefTableWrite(&writer->buffer, &sentry,
+					   sizeof(BlockRefTableSerializedEntry));
+
+	/* Write the untruncated portion of the chunk length array. */
+	if (sentry.nchunks != 0)
+		BlockRefTableWrite(&writer->buffer, entry->chunk_usage,
+						   sentry.nchunks * sizeof(uint16));
+
+	/* Write the contents of each chunk; empty chunks store no payload. */
+	for (j = 0; j < entry->nchunks; ++j)
+	{
+		if (entry->chunk_usage[j] == 0)
+			continue;
+		BlockRefTableWrite(&writer->buffer, entry->chunk_data[j],
+						   entry->chunk_usage[j] * sizeof(uint16));
+	}
+}
+
+/*
+ * Finalize an incremental write of a block reference table file.
+ *
+ * Writes the terminating sentinel and CRC via BlockRefTableFileTerminate,
+ * flushing any buffered data, then releases the writer.
+ */
+void
+DestroyBlockRefTableWriter(BlockRefTableWriter *writer)
+{
+	BlockRefTableFileTerminate(&writer->buffer);
+	pfree(writer);
+}
+
+/*
+ * Allocate a standalone BlockRefTableEntry.
+ *
+ * When we're manipulating a full in-memory BlockRefTable, the entries are
+ * part of the hash table and are allocated by simplehash. This routine is
+ * used by callers that want to write out a BlockRefTable to a file without
+ * needing to store the whole thing in memory at once.
+ *
+ * Entries allocated by this function can be manipulated using the functions
+ * BlockRefTableEntrySetLimitBlock and BlockRefTableEntryMarkBlockModified
+ * and then written using BlockRefTableWriteEntry and freed using
+ * BlockRefTableFreeEntry.
+ */
+BlockRefTableEntry *
+CreateBlockRefTableEntry(RelFileLocator rlocator, ForkNumber forknum)
+{
+	/* palloc0 leaves nchunks and the chunk arrays zeroed/NULL. */
+	BlockRefTableEntry *entry = palloc0(sizeof(BlockRefTableEntry));
+
+	memcpy(&entry->key.rlocator, &rlocator, sizeof(RelFileLocator));
+	entry->key.forknum = forknum;
+	/* No truncation seen yet: InvalidBlockNumber means "no limit block". */
+	entry->limit_block = InvalidBlockNumber;
+
+	return entry;
+}
+
+/*
+ * Update a BlockRefTableEntry with a new value for the "limit block" and
+ * forget any equal-or-higher-numbered modified blocks.
+ *
+ * The "limit block" is the shortest known length of the relation within the
+ * range of WAL records covered by this block reference table.
+ */
+void
+BlockRefTableEntrySetLimitBlock(BlockRefTableEntry *entry,
+								BlockNumber limit_block)
+{
+	unsigned	chunkno;
+	unsigned	limit_chunkno;
+	unsigned	limit_chunkoffset;
+	BlockRefTableChunk limit_chunk;
+
+	/*
+	 * If we already have an equal or lower limit block, do nothing.  (A
+	 * fresh entry has limit_block = InvalidBlockNumber, so any real block
+	 * number passes this test.)
+	 */
+	if (limit_block >= entry->limit_block)
+		return;
+
+	/* Record the new limit block value. */
+	entry->limit_block = limit_block;
+
+	/*
+	 * Figure out which chunk would store the state of the new limit block,
+	 * and which offset within that chunk.
+	 */
+	limit_chunkno = limit_block / BLOCKS_PER_CHUNK;
+	limit_chunkoffset = limit_block % BLOCKS_PER_CHUNK;
+
+	/*
+	 * If the number of chunks is not large enough for any blocks with equal
+	 * or higher block numbers to exist, then there is nothing further to do.
+	 */
+	if (limit_chunkno >= entry->nchunks)
+		return;
+
+	/* Discard entire contents of any higher-numbered chunks. */
+	for (chunkno = limit_chunkno + 1; chunkno < entry->nchunks; ++chunkno)
+		entry->chunk_usage[chunkno] = 0;
+
+	/*
+	 * Next, we need to discard any offsets within the chunk that would
+	 * contain the limit_block. We must handle this differently depending on
+	 * whether the chunk that would contain limit_block is a bitmap or an
+	 * array of offsets.
+	 */
+	limit_chunk = entry->chunk_data[limit_chunkno];
+	if (entry->chunk_usage[limit_chunkno] == MAX_ENTRIES_PER_CHUNK)
+	{
+		unsigned	chunkoffset;
+
+		/* It's a bitmap. Unset bits for the limit block and everything above. */
+		for (chunkoffset = limit_chunkoffset; chunkoffset < BLOCKS_PER_CHUNK;
+			 ++chunkoffset)
+			limit_chunk[chunkoffset / BLOCKS_PER_ENTRY] &=
+				~(1 << (chunkoffset % BLOCKS_PER_ENTRY));
+	}
+	else
+	{
+		unsigned	i,
+					j = 0;
+
+		/*
+		 * It's an offset array. Filter out large offsets by compacting the
+		 * surviving (strictly smaller) offsets toward the front in place.
+		 */
+		for (i = 0; i < entry->chunk_usage[limit_chunkno]; ++i)
+		{
+			Assert(j <= i);
+			if (limit_chunk[i] < limit_chunkoffset)
+				limit_chunk[j++] = limit_chunk[i];
+		}
+		Assert(j <= entry->chunk_usage[limit_chunkno]);
+		entry->chunk_usage[limit_chunkno] = j;
+	}
+}
+
+/*
+ * Mark a block in a given BlockRefTableEntry as known to have been modified.
+ *
+ * Within a chunk, block references are stored either as a sorted-insertion
+ * array of 2-byte offsets (space-efficient when few blocks are modified) or,
+ * once the array would reach MAX_ENTRIES_PER_CHUNK entries, as a bitmap.
+ *
+ * Note that forknum is not consulted here; the entry's fork was fixed when
+ * the entry was created.  (The parameter is retained for interface symmetry
+ * with BlockRefTableMarkBlockModified.)
+ */
+void
+BlockRefTableEntryMarkBlockModified(BlockRefTableEntry *entry,
+									ForkNumber forknum,
+									BlockNumber blknum)
+{
+	unsigned	chunkno;
+	unsigned	chunkoffset;
+	unsigned	i;
+
+	/*
+	 * Which chunk should store the state of this block? And what is the
+	 * offset of this block relative to the start of that chunk?
+	 */
+	chunkno = blknum / BLOCKS_PER_CHUNK;
+	chunkoffset = blknum % BLOCKS_PER_CHUNK;
+
+	/*
+	 * If 'nchunks' isn't big enough for us to be able to represent the state
+	 * of this block, we need to enlarge our arrays.
+	 */
+	if (chunkno >= entry->nchunks)
+	{
+		unsigned	max_chunks;
+		unsigned	extra_chunks;
+
+		/*
+		 * New array size is a power of 2, at least 16, big enough so that
+		 * chunkno will be a valid array index.  Note that we must double
+		 * max_chunks here, not chunkno: doubling chunkno would corrupt the
+		 * target index and loop forever, since the loop condition compares
+		 * against an unchanging max_chunks.
+		 */
+		max_chunks = Max(16, entry->nchunks);
+		while (max_chunks < chunkno + 1)
+			max_chunks *= 2;
+		Assert(max_chunks > chunkno);
+		extra_chunks = max_chunks - entry->nchunks;
+
+		if (entry->nchunks == 0)
+		{
+			/* First growth: allocate zero-initialized arrays from scratch. */
+			entry->chunk_size = palloc0(sizeof(uint16) * max_chunks);
+			entry->chunk_usage = palloc0(sizeof(uint16) * max_chunks);
+			entry->chunk_data =
+				palloc0(sizeof(BlockRefTableChunk) * max_chunks);
+		}
+		else
+		{
+			/* Enlarge the arrays and zero just the newly-added tail. */
+			entry->chunk_size = repalloc(entry->chunk_size,
+										 sizeof(uint16) * max_chunks);
+			memset(&entry->chunk_size[entry->nchunks], 0,
+				   extra_chunks * sizeof(uint16));
+			entry->chunk_usage = repalloc(entry->chunk_usage,
+										  sizeof(uint16) * max_chunks);
+			memset(&entry->chunk_usage[entry->nchunks], 0,
+				   extra_chunks * sizeof(uint16));
+			entry->chunk_data = repalloc(entry->chunk_data,
+										 sizeof(BlockRefTableChunk) * max_chunks);
+			memset(&entry->chunk_data[entry->nchunks], 0,
+				   extra_chunks * sizeof(BlockRefTableChunk));
+		}
+		entry->nchunks = max_chunks;
+	}
+
+	/*
+	 * If the chunk that covers this block number doesn't exist yet, create it
+	 * as an array and add the appropriate offset to it. We make it pretty
+	 * small initially, because there might only be 1 or a few block
+	 * references in this chunk and we don't want to use up too much memory.
+	 */
+	if (entry->chunk_size[chunkno] == 0)
+	{
+		entry->chunk_data[chunkno] =
+			palloc(sizeof(uint16) * INITIAL_ENTRIES_PER_CHUNK);
+		entry->chunk_size[chunkno] = INITIAL_ENTRIES_PER_CHUNK;
+		entry->chunk_data[chunkno][0] = chunkoffset;
+		entry->chunk_usage[chunkno] = 1;
+		return;
+	}
+
+	/*
+	 * If the number of entries in this chunk is already maximum, it must be a
+	 * bitmap. Just set the appropriate bit.
+	 */
+	if (entry->chunk_usage[chunkno] == MAX_ENTRIES_PER_CHUNK)
+	{
+		BlockRefTableChunk chunk = entry->chunk_data[chunkno];
+
+		chunk[chunkoffset / BLOCKS_PER_ENTRY] |=
+			1 << (chunkoffset % BLOCKS_PER_ENTRY);
+		return;
+	}
+
+	/*
+	 * There is an existing chunk and it's in array format. Let's find out
+	 * whether it already has an entry for this block. If so, we do not need
+	 * to do anything.
+	 */
+	for (i = 0; i < entry->chunk_usage[chunkno]; ++i)
+	{
+		if (entry->chunk_data[chunkno][i] == chunkoffset)
+			return;
+	}
+
+	/*
+	 * If the number of entries currently used is one less than the maximum,
+	 * it's time to convert to bitmap format.
+	 */
+	if (entry->chunk_usage[chunkno] == MAX_ENTRIES_PER_CHUNK - 1)
+	{
+		BlockRefTableChunk newchunk;
+		unsigned	j;
+
+		/* Allocate a new, zeroed bitmap chunk. */
+		newchunk = palloc0(MAX_ENTRIES_PER_CHUNK * sizeof(uint16));
+
+		/* Set the bit for each existing entry. */
+		for (j = 0; j < entry->chunk_usage[chunkno]; ++j)
+		{
+			unsigned	coff = entry->chunk_data[chunkno][j];
+
+			newchunk[coff / BLOCKS_PER_ENTRY] |=
+				1 << (coff % BLOCKS_PER_ENTRY);
+		}
+
+		/* Set the bit for the new entry. */
+		newchunk[chunkoffset / BLOCKS_PER_ENTRY] |=
+			1 << (chunkoffset % BLOCKS_PER_ENTRY);
+
+		/* Swap the new chunk into place and update metadata. */
+		pfree(entry->chunk_data[chunkno]);
+		entry->chunk_data[chunkno] = newchunk;
+		entry->chunk_size[chunkno] = MAX_ENTRIES_PER_CHUNK;
+		entry->chunk_usage[chunkno] = MAX_ENTRIES_PER_CHUNK;
+		return;
+	}
+
+	/*
+	 * OK, we currently have an array, and we don't need to convert to a
+	 * bitmap, but we do need to add a new element. If there's not enough
+	 * room, we'll have to expand the array.
+	 */
+	if (entry->chunk_usage[chunkno] == entry->chunk_size[chunkno])
+	{
+		unsigned	newsize = entry->chunk_size[chunkno] * 2;
+
+		Assert(newsize <= MAX_ENTRIES_PER_CHUNK);
+		entry->chunk_data[chunkno] = repalloc(entry->chunk_data[chunkno],
+											  newsize * sizeof(uint16));
+		entry->chunk_size[chunkno] = newsize;
+	}
+
+	/* Now we can add the new entry. */
+	entry->chunk_data[chunkno][entry->chunk_usage[chunkno]] =
+		chunkoffset;
+	entry->chunk_usage[chunkno]++;
+}
+
+/*
+ * Release memory for a BlockRefTableEntry that was created by
+ * CreateBlockRefTableEntry.
+ *
+ * NOTE(review): this frees the chunk_size, chunk_usage, and chunk_data
+ * arrays but not the individual chunk buffers pointed to by chunk_data[i];
+ * presumably those are reclaimed with the containing memory context —
+ * confirm against callers.
+ */
+void
+BlockRefTableFreeEntry(BlockRefTableEntry *entry)
+{
+	if (entry->chunk_size != NULL)
+	{
+		pfree(entry->chunk_size);
+		entry->chunk_size = NULL;
+	}
+
+	if (entry->chunk_usage != NULL)
+	{
+		pfree(entry->chunk_usage);
+		entry->chunk_usage = NULL;
+	}
+
+	if (entry->chunk_data != NULL)
+	{
+		pfree(entry->chunk_data);
+		entry->chunk_data = NULL;
+	}
+
+	pfree(entry);
+}
+
+/*
+ * Comparator for BlockRefTableSerializedEntry objects.
+ *
+ * We make the tablespace OID the first column of the sort key to match
+ * the on-disk tree structure.
+ *
+ * Compares by tablespace OID, then database OID, then relation number,
+ * then fork number, returning qsort-style -1/0/1.
+ */
+static int
+BlockRefTableComparator(const void *a, const void *b)
+{
+	const BlockRefTableSerializedEntry *sa = a;
+	const BlockRefTableSerializedEntry *sb = b;
+
+	if (sa->rlocator.spcOid > sb->rlocator.spcOid)
+		return 1;
+	if (sa->rlocator.spcOid < sb->rlocator.spcOid)
+		return -1;
+
+	if (sa->rlocator.dbOid > sb->rlocator.dbOid)
+		return 1;
+	if (sa->rlocator.dbOid < sb->rlocator.dbOid)
+		return -1;
+
+	if (sa->rlocator.relNumber > sb->rlocator.relNumber)
+		return 1;
+	if (sa->rlocator.relNumber < sb->rlocator.relNumber)
+		return -1;
+
+	if (sa->forknum > sb->forknum)
+		return 1;
+	if (sa->forknum < sb->forknum)
+		return -1;
+
+	return 0;
+}
+
+/*
+ * Flush any buffered data out of a BlockRefTableBuffer.
+ *
+ * Hands the buffered bytes to the configured write callback and resets the
+ * fill counter.  The running CRC is unaffected; it was updated when the
+ * data was buffered.
+ */
+static void
+BlockRefTableFlush(BlockRefTableBuffer *buffer)
+{
+	buffer->io_callback(buffer->io_callback_arg, buffer->data, buffer->used);
+	buffer->used = 0;
+}
+
+/*
+ * Read data from a BlockRefTableBuffer, and update the running CRC
+ * calculation for the returned data (but not any data that we may have
+ * buffered but not yet actually returned).
+ *
+ * Loops until exactly 'length' bytes have been delivered to 'data';
+ * an early end-of-file is reported through the reader's error callback,
+ * which does not return.
+ */
+static void
+BlockRefTableRead(BlockRefTableReader *reader, void *data, int length)
+{
+	BlockRefTableBuffer *buffer = &reader->buffer;
+
+	/* Loop until read is fully satisfied. */
+	while (length > 0)
+	{
+		if (buffer->cursor < buffer->used)
+		{
+			/*
+			 * If any buffered data is available, use that to satisfy as much
+			 * of the request as possible.
+			 */
+			int			bytes_to_copy = Min(length, buffer->used - buffer->cursor);
+
+			memcpy(data, &buffer->data[buffer->cursor], bytes_to_copy);
+			COMP_CRC32C(buffer->crc, &buffer->data[buffer->cursor],
+						bytes_to_copy);
+			buffer->cursor += bytes_to_copy;
+			data = ((char *) data) + bytes_to_copy;
+			length -= bytes_to_copy;
+		}
+		else if (length >= BUFSIZE)
+		{
+			/*
+			 * If the request length is long, read directly into caller's
+			 * buffer, bypassing our internal buffer.  Short reads are fine;
+			 * the outer loop will keep going until the request is satisfied.
+			 */
+			int			bytes_read;
+
+			bytes_read = buffer->io_callback(buffer->io_callback_arg,
+											 data, length);
+			COMP_CRC32C(buffer->crc, data, bytes_read);
+			data = ((char *) data) + bytes_read;
+			length -= bytes_read;
+
+			/* If we didn't get anything, that's bad. */
+			if (bytes_read == 0)
+				reader->error_callback(reader->error_callback_arg,
+									   "file \"%s\" ends unexpectedly",
+									   reader->error_filename);
+		}
+		else
+		{
+			/*
+			 * Refill our buffer.
+			 */
+			buffer->used = buffer->io_callback(buffer->io_callback_arg,
+											   buffer->data, BUFSIZE);
+			buffer->cursor = 0;
+
+			/* If we didn't get anything, that's bad. */
+			if (buffer->used == 0)
+				reader->error_callback(reader->error_callback_arg,
+									   "file \"%s\" ends unexpectedly",
+									   reader->error_filename);
+		}
+	}
+}
+
+/*
+ * Supply data to a BlockRefTableBuffer for write to the underlying File,
+ * and update the running CRC calculation for that data.
+ *
+ * Data smaller than the remaining buffer space is accumulated; anything
+ * that would overflow the buffer forces a flush first, and requests of at
+ * least BUFSIZE bytes are passed straight through to the write callback.
+ */
+static void
+BlockRefTableWrite(BlockRefTableBuffer *buffer, void *data, int length)
+{
+	/* Update running CRC calculation. */
+	COMP_CRC32C(buffer->crc, data, length);
+
+	/* If the new data can't fit into the buffer, flush the buffer. */
+	if (buffer->used + length > BUFSIZE)
+	{
+		buffer->io_callback(buffer->io_callback_arg, buffer->data,
+							buffer->used);
+		buffer->used = 0;
+	}
+
+	/* If the new data would fill the buffer, or more, write it directly. */
+	if (length >= BUFSIZE)
+	{
+		buffer->io_callback(buffer->io_callback_arg, data, length);
+		return;
+	}
+
+	/* Otherwise, copy the new data into the buffer. */
+	memcpy(&buffer->data[buffer->used], data, length);
+	buffer->used += length;
+	Assert(buffer->used <= BUFSIZE);
+}
+
+/*
+ * Generate the sentinel and CRC required at the end of a block reference
+ * table file and flush them out of our internal buffer.
+ *
+ * The sentinel is an all-zeroes serialized entry, which readers can
+ * recognize as "no more entries".  The CRC covers everything written so
+ * far, including the sentinel, but not the CRC bytes themselves.
+ */
+static void
+BlockRefTableFileTerminate(BlockRefTableBuffer *buffer)
+{
+	BlockRefTableSerializedEntry zentry = {{0}};
+	pg_crc32c	crc;
+
+	/* Write a sentinel indicating that there are no more entries. */
+	BlockRefTableWrite(buffer, &zentry,
+					   sizeof(BlockRefTableSerializedEntry));
+
+	/*
+	 * Writing the checksum will perturb the ongoing checksum calculation, so
+	 * copy the state first and finalize the computation using the copy.
+	 */
+	crc = buffer->crc;
+	FIN_CRC32C(crc);
+	BlockRefTableWrite(buffer, &crc, sizeof(pg_crc32c));
+
+	/* Flush any leftover data out of our buffer. */
+	BlockRefTableFlush(buffer);
+}
'archive.c',
'base64.c',
'binaryheap.c',
+ 'blkreftable.c',
'checksum_helper.c',
'compression.c',
'controldata_utils.c',
extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
extern XLogSegNo XLogGetLastRemovedSegno(void);
+extern XLogSegNo XLogGetOldestSegno(TimeLineID tli);
extern void XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN);
extern void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn);
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * walsummary.h
+ * WAL summary management
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * src/include/backup/walsummary.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WALSUMMARY_H
+#define WALSUMMARY_H
+
+#include <time.h>
+
+#include "access/xlogdefs.h"
+#include "nodes/pg_list.h"
+#include "storage/fd.h"
+
+typedef struct WalSummaryIO
+{
+	File		file;			/* file being read or written */
+	off_t		filepos;		/* current offset within the file */
+} WalSummaryIO;
+
+typedef struct WalSummaryFile
+{
+	XLogRecPtr	start_lsn;		/* start of WAL range covered by summary */
+	XLogRecPtr	end_lsn;		/* end of WAL range covered by summary */
+	TimeLineID	tli;			/* timeline of the summarized WAL */
+} WalSummaryFile;
+
+extern List *GetWalSummaries(TimeLineID tli, XLogRecPtr start_lsn,
+ XLogRecPtr end_lsn);
+extern List *FilterWalSummaries(List *wslist, TimeLineID tli,
+ XLogRecPtr start_lsn, XLogRecPtr end_lsn);
+extern bool WalSummariesAreComplete(List *wslist,
+ XLogRecPtr start_lsn, XLogRecPtr end_lsn,
+ XLogRecPtr *missing_lsn);
+extern File OpenWalSummaryFile(WalSummaryFile *ws, bool missing_ok);
+extern void RemoveWalSummaryIfOlderThan(WalSummaryFile *ws,
+ time_t cutoff_time);
+
+extern int ReadWalSummary(void *wal_summary_io, void *data, int length);
+extern int WriteWalSummary(void *wal_summary_io, void *data, int length);
+extern void ReportWalSummaryError(void *callback_arg, char *fmt,...) pg_attribute_printf(2, 3);
+
+#endif /* WALSUMMARY_H */
proname => 'any_value_transfn', prorettype => 'anyelement',
proargtypes => 'anyelement anyelement', prosrc => 'any_value_transfn' },
+{ oid => '8436',
+ descr => 'list of available WAL summary files',
+ proname => 'pg_available_wal_summaries', prorows => '100',
+ proretset => 't', provolatile => 'v', proparallel => 's',
+ prorettype => 'record', proargtypes => '',
+ proallargtypes => '{int8,pg_lsn,pg_lsn}',
+ proargmodes => '{o,o,o}',
+ proargnames => '{tli,start_lsn,end_lsn}',
+ prosrc => 'pg_available_wal_summaries' },
+{ oid => '8437',
+ descr => 'contents of a WAL summary file',
+ proname => 'pg_wal_summary_contents', prorows => '100',
+ proretset => 't', provolatile => 'v', proparallel => 's',
+ prorettype => 'record', proargtypes => 'int8 pg_lsn pg_lsn',
+ proallargtypes => '{int8,pg_lsn,pg_lsn,oid,oid,oid,int2,int8,bool}',
+ proargmodes => '{i,i,i,o,o,o,o,o,o}',
+ proargnames => '{tli,start_lsn,end_lsn,relfilenode,reltablespace,reldatabase,relforknumber,relblocknumber,is_limit_block}',
+ prosrc => 'pg_wal_summary_contents' },
+
]
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * blkreftable.h
+ * Block reference tables.
+ *
+ * A block reference table is used to keep track of which blocks have
+ * been modified by WAL records within a certain LSN range.
+ *
+ * For each relation fork, there is a "limit block number". All existing
+ * blocks greater than or equal to the limit block number must be
+ * considered modified; for those less than the limit block number,
+ * we maintain a bitmap. When a relation fork is created or dropped,
+ * the limit block number should be set to 0. When it's truncated,
+ * the limit block number should be set to the length in blocks to
+ * which it was truncated.
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * src/include/common/blkreftable.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef BLKREFTABLE_H
+#define BLKREFTABLE_H
+
+#include "storage/block.h"
+#include "storage/relfilelocator.h"
+
+/* Magic number for serialization file format. */
+#define BLOCKREFTABLE_MAGIC 0x652b137b
+
+typedef struct BlockRefTable BlockRefTable;
+typedef struct BlockRefTableEntry BlockRefTableEntry;
+typedef struct BlockRefTableReader BlockRefTableReader;
+typedef struct BlockRefTableWriter BlockRefTableWriter;
+
+/*
+ * The return value of io_callback_fn should be the number of bytes read
+ * or written. If an error occurs, the functions should report it and
+ * not return. When used as a write callback, short writes should be retried
+ * or treated as errors, so that if the callback returns, the return value
+ * is always the request length.
+ *
+ * report_error_fn should not return.
+ */
+typedef int (*io_callback_fn) (void *callback_arg, void *data, int length);
+typedef void (*report_error_fn) (void *callback_arg, char *msg,...) pg_attribute_printf(2, 3);
+
+
+/*
+ * Functions for manipulating an entire in-memory block reference table.
+ */
+extern BlockRefTable *CreateEmptyBlockRefTable(void);
+extern void BlockRefTableSetLimitBlock(BlockRefTable *brtab,
+ const RelFileLocator *rlocator,
+ ForkNumber forknum,
+ BlockNumber limit_block);
+extern void BlockRefTableMarkBlockModified(BlockRefTable *brtab,
+ const RelFileLocator *rlocator,
+ ForkNumber forknum,
+ BlockNumber blknum);
+extern void WriteBlockRefTable(BlockRefTable *brtab,
+ io_callback_fn write_callback,
+ void *write_callback_arg);
+
+extern BlockRefTableEntry *BlockRefTableGetEntry(BlockRefTable *brtab,
+ const RelFileLocator *rlocator,
+ ForkNumber forknum,
+ BlockNumber *limit_block);
+extern int BlockRefTableEntryGetBlocks(BlockRefTableEntry *entry,
+ BlockNumber start_blkno,
+ BlockNumber stop_blkno,
+ BlockNumber *blocks,
+ int nblocks);
+
+/*
+ * Functions for reading a block reference table incrementally from disk.
+ */
+extern BlockRefTableReader *CreateBlockRefTableReader(io_callback_fn read_callback,
+ void *read_callback_arg,
+ char *error_filename,
+ report_error_fn error_callback,
+ void *error_callback_arg);
+extern bool BlockRefTableReaderNextRelation(BlockRefTableReader *reader,
+ RelFileLocator *rlocator,
+ ForkNumber *forknum,
+ BlockNumber *limit_block);
+extern unsigned BlockRefTableReaderGetBlocks(BlockRefTableReader *reader,
+ BlockNumber *blocks,
+ int nblocks);
+extern void DestroyBlockRefTableReader(BlockRefTableReader *reader);
+
+/*
+ * Functions for writing a block reference table incrementally to disk.
+ *
+ * Note that entries must be written in the proper order, that is, sorted by
+ * tablespace, then database, then relfilenumber, then fork number. Caller
+ * is responsible for supplying data in the correct order. If that seems hard,
+ * use an in-memory BlockRefTable instead.
+ */
+extern BlockRefTableWriter *CreateBlockRefTableWriter(io_callback_fn write_callback,
+ void *write_callback_arg);
+extern void BlockRefTableWriteEntry(BlockRefTableWriter *writer,
+ BlockRefTableEntry *entry);
+extern void DestroyBlockRefTableWriter(BlockRefTableWriter *writer);
+
+extern BlockRefTableEntry *CreateBlockRefTableEntry(RelFileLocator rlocator,
+ ForkNumber forknum);
+extern void BlockRefTableEntrySetLimitBlock(BlockRefTableEntry *entry,
+ BlockNumber limit_block);
+extern void BlockRefTableEntryMarkBlockModified(BlockRefTableEntry *entry,
+ ForkNumber forknum,
+ BlockNumber blknum);
+extern void BlockRefTableFreeEntry(BlockRefTableEntry *entry);
+
+#endif /* BLKREFTABLE_H */
B_STARTUP,
B_WAL_RECEIVER,
B_WAL_SENDER,
+ B_WAL_SUMMARIZER,
B_WAL_WRITER,
} BackendType;
CheckpointerProcess,
WalWriterProcess,
WalReceiverProcess,
+ WalSummarizerProcess,
NUM_AUXPROCTYPES /* Must be last! */
} AuxProcType;
#define AmCheckpointerProcess() (MyAuxProcType == CheckpointerProcess)
#define AmWalWriterProcess() (MyAuxProcType == WalWriterProcess)
#define AmWalReceiverProcess() (MyAuxProcType == WalReceiverProcess)
+#define AmWalSummarizerProcess() (MyAuxProcType == WalSummarizerProcess)
/*****************************************************************************
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * walsummarizer.h
+ *
+ * Header file for background WAL summarization process.
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/include/postmaster/walsummarizer.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WALSUMMARIZER_H
+#define WALSUMMARIZER_H
+
+#include "access/xlogdefs.h"
+
+extern bool summarize_wal;
+extern int wal_summary_keep_time;
+
+extern Size WalSummarizerShmemSize(void);
+extern void WalSummarizerShmemInit(void);
+extern void WalSummarizerMain(void) pg_attribute_noreturn();
+
+extern XLogRecPtr GetOldestUnsummarizedLSN(TimeLineID *tli,
+ bool *lsn_is_exact,
+ bool reset_pending_lsn);
+extern void SetWalSummarizerLatch(void);
+extern XLogRecPtr WaitForWalSummarization(XLogRecPtr lsn, long timeout,
+ XLogRecPtr *pending_lsn);
+
+#endif
* We set aside some extra PGPROC structures for auxiliary processes,
* ie things that aren't full-fledged backends but need shmem access.
*
- * Background writer, checkpointer, WAL writer and archiver run during normal
- * operation. Startup process and WAL receiver also consume 2 slots, but WAL
- * writer is launched only after startup has exited, so we only need 5 slots.
+ * Background writer, checkpointer, WAL writer, WAL summarizer, and archiver
+ * run during normal operation. Startup process and WAL receiver also consume
+ * 2 slots, but WAL writer is launched only after startup has exited, so we
+ * only need 6 slots.
*/
-#define NUM_AUXILIARY_PROCS 5
+#define NUM_AUXILIARY_PROCS 6
/* configurable options */
extern PGDLLIMPORT int DeadlockTimeout;
WAL_RECOVERY,
WAL_ARCHIVE_RECOVERY,
WAL_RECOVERY_TARGET,
+ WAL_SUMMARIZATION,
REPLICATION_SENDING,
REPLICATION_PRIMARY,
REPLICATION_STANDBY,
z_stream
z_streamp
zic_t
+BlockRefTable
+BlockRefTableBuffer
+BlockRefTableEntry
+BlockRefTableKey
+BlockRefTableReader
+BlockRefTableSerializedEntry
+BlockRefTableWriter
+SummarizerReadLocalXLogPrivate
+WalSummarizerData
+WalSummaryFile
+WalSummaryIO