#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/pg_locale.h"
+#include "utils/relmapper.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
+/*
+ * Create database strategy.
+ *
+ * CREATEDB_WAL_LOG will copy the database at the block level and WAL log each
+ * copied block.
+ *
+ * CREATEDB_FILE_COPY will simply perform a file system level copy of the
+ * database and log a single record for each tablespace copied. To make this
+ * safe, it also triggers checkpoints before and after the operation.
+ */
+typedef enum CreateDBStrategy
+{
+ CREATEDB_WAL_LOG,
+ CREATEDB_FILE_COPY
+} CreateDBStrategy;
+
typedef struct
{
Oid src_dboid; /* source (template) DB */
Oid dest_dboid; /* DB we are trying to create */
+ CreateDBStrategy strategy; /* create db strategy */
} createdb_failure_params;
typedef struct
Oid dest_tsoid; /* tablespace we are trying to move to */
} movedb_failure_params;
+/*
+ * Information about a relation to be copied when creating a database.
+ */
+typedef struct CreateDBRelInfo
+{
+ RelFileNode rnode; /* physical relation identifier */
+ Oid reloid; /* relation oid */
+ bool permanent; /* relation is permanent or unlogged */
+} CreateDBRelInfo;
+
+
/* non-export function prototypes */
static void createdb_failure_callback(int code, Datum arg);
static void movedb(const char *dbname, const char *tblspcname);
static void remove_dbtablespaces(Oid db_id);
static bool check_db_file_conflict(Oid db_id);
static int errdetail_busy_db(int notherbackends, int npreparedxacts);
+static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dboid, Oid src_tsid,
+ Oid dst_tsid);
+static List *ScanSourceDatabasePgClass(Oid srctbid, Oid srcdbid, char *srcpath);
+static List *ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid,
+ Oid dbid, char *srcpath,
+ List *rnodelist, Snapshot snapshot);
+static CreateDBRelInfo *ScanSourceDatabasePgClassTuple(HeapTupleData *tuple,
+ Oid tbid, Oid dbid,
+ char *srcpath);
+static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
+ bool isRedo);
+static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dboid, Oid src_tsid,
+ Oid dst_tsid);
+
+/*
+ * Create a new database using the WAL_LOG strategy.
+ *
+ * Each copied block is separately written to the write-ahead log.
+ */
+static void
+CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
+ Oid src_tsid, Oid dst_tsid)
+{
+ char *srcpath;
+ char *dstpath;
+ List *rnodelist = NULL;
+ ListCell *cell;
+ LockRelId srcrelid;
+ LockRelId dstrelid;
+ RelFileNode srcrnode;
+ RelFileNode dstrnode;
+ CreateDBRelInfo *relinfo;
+
+ /* Get source and destination database paths. */
+ srcpath = GetDatabasePath(src_dboid, src_tsid);
+ dstpath = GetDatabasePath(dst_dboid, dst_tsid);
+
+ /* Create database directory and write PG_VERSION file. */
+ CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
+
+ /* Copy relmap file from source database to the destination database. */
+ RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath);
+
+ /* Get list of relfilenodes to copy from the source database. */
+ rnodelist = ScanSourceDatabasePgClass(src_tsid, src_dboid, srcpath);
+ Assert(rnodelist != NIL);
+
+ /*
+ * Database IDs will be the same for all relations so set them before
+ * entering the loop.
+ */
+ srcrelid.dbId = src_dboid;
+ dstrelid.dbId = dst_dboid;
+
+ /* Loop over our list of relfilenodes and copy each one. */
+ foreach(cell, rnodelist)
+ {
+ relinfo = lfirst(cell);
+ srcrnode = relinfo->rnode;
+
+ /*
+ * If the relation is from the source db's default tablespace then we
+ * need to create it in the destinations db's default tablespace.
+ * Otherwise, we need to create in the same tablespace as it is in the
+ * source database.
+ */
+ if (srcrnode.spcNode == src_tsid)
+ dstrnode.spcNode = dst_tsid;
+ else
+ dstrnode.spcNode = srcrnode.spcNode;
+
+ dstrnode.dbNode = dst_dboid;
+ dstrnode.relNode = srcrnode.relNode;
+
+ /*
+ * Acquire locks on source and target relations before copying.
+ *
+ * We typically do not read relation data into shared_buffers without
+ * holding a relation lock. It's unclear what could go wrong if we
+ * skipped it in this case, because nobody can be modifying either
+ * the source or destination database at this point, and we have locks
+ * on both databases, too, but let's take the conservative route.
+ */
+ dstrelid.relId = srcrelid.relId = relinfo->reloid;
+ LockRelationId(&srcrelid, AccessShareLock);
+ LockRelationId(&dstrelid, AccessShareLock);
+
+ /* Copy relation storage from source to the destination. */
+ CreateAndCopyRelationData(srcrnode, dstrnode, relinfo->permanent);
+
+ /* Release the relation locks. */
+ UnlockRelationId(&srcrelid, AccessShareLock);
+ UnlockRelationId(&dstrelid, AccessShareLock);
+ }
+
+ list_free_deep(rnodelist);
+}
+
+/*
+ * Scan the pg_class table in the source database to identify the relations
+ * that need to be copied to the destination database.
+ *
+ * This is an exception to the usual rule that cross-database access is
+ * not possible. We can make it work here because we know that there are no
+ * connections to the source database and (since there can't be prepared
+ * transactions touching that database) no in-doubt tuples either. This
+ * means that we don't need to worry about pruning removing anything from
+ * under us, and we don't need to be too picky about our snapshot either.
+ * As long as it sees all previously-committed XIDs as committed and all
+ * aborted XIDs as aborted, we should be fine: nothing else is possible
+ * here.
+ *
+ * We can't rely on the relcache for anything here, because that only knows
+ * about the database to which we are connected, and can't handle access to
+ * other databases. That also means we can't rely on the heap scan
+ * infrastructure, which would be a bad idea anyway since it might try
+ * to do things like HOT pruning which we definitely can't do safely in
+ * a database to which we're not even connected.
+ */
+static List *
+ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
+{
+ RelFileNode rnode;
+ BlockNumber nblocks;
+ BlockNumber blkno;
+ Buffer buf;
+ Oid relfilenode;
+ Page page;
+ List *rnodelist = NIL;
+ LockRelId relid;
+ Relation rel;
+ Snapshot snapshot;
+ BufferAccessStrategy bstrategy;
+
+ /* Get pg_class relfilenode. */
+ relfilenode = RelationMapOidToFilenodeForDatabase(srcpath,
+ RelationRelationId);
+
+ /* Don't read data into shared_buffers without holding a relation lock. */
+ relid.dbId = dbid;
+ relid.relId = RelationRelationId;
+ LockRelationId(&relid, AccessShareLock);
+
+ /* Prepare a RelFileNode for the pg_class relation. */
+ rnode.spcNode = tbid;
+ rnode.dbNode = dbid;
+ rnode.relNode = relfilenode;
+
+ /*
+ * We can't use a real relcache entry for a relation in some other
+ * database, but since we're only going to access the fields related
+ * to physical storage, a fake one is good enough. If we didn't do this
+ * and used the smgr layer directly, we would have to worry about
+ * invalidations.
+ */
+ rel = CreateFakeRelcacheEntry(rnode);
+ nblocks = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
+ FreeFakeRelcacheEntry(rel);
+
+ /* Use a buffer access strategy since this is a bulk read operation. */
+ bstrategy = GetAccessStrategy(BAS_BULKREAD);
+
+ /*
+ * As explained in the function header comments, we need a snapshot that
+ * will see all committed transactions as committed, and our transaction
+ * snapshot - or the active snapshot - might not be new enough for that,
+ * but the return value of GetLatestSnapshot() should work fine.
+ */
+ snapshot = GetLatestSnapshot();
+
+ /* Process the relation block by block. */
+ for (blkno = 0; blkno < nblocks; blkno++)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ buf = ReadBufferWithoutRelcache(rnode, MAIN_FORKNUM, blkno,
+ RBM_NORMAL, bstrategy, false);
+
+ LockBuffer(buf, BUFFER_LOCK_SHARE);
+ page = BufferGetPage(buf);
+ if (PageIsNew(page) || PageIsEmpty(page))
+ {
+ UnlockReleaseBuffer(buf);
+ continue;
+ }
+
+ /* Append relevant pg_class tuples for current page to rnodelist. */
+ rnodelist = ScanSourceDatabasePgClassPage(page, buf, tbid, dbid,
+ srcpath, rnodelist,
+ snapshot);
+
+ UnlockReleaseBuffer(buf);
+ }
+
+ /* Release relation lock. */
+ UnlockRelationId(&relid, AccessShareLock);
+
+ return rnodelist;
+}
+
+/*
+ * Scan one page of the source database's pg_class relation and add relevant
+ * entries to rnodelist. The return value is the updated list.
+ */
+static List *
+ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, Oid dbid,
+ char *srcpath, List *rnodelist,
+ Snapshot snapshot)
+{
+ BlockNumber blkno = BufferGetBlockNumber(buf);
+ OffsetNumber offnum;
+ OffsetNumber maxoff;
+ HeapTupleData tuple;
+
+ maxoff = PageGetMaxOffsetNumber(page);
+
+ /* Loop over offsets. */
+ for (offnum = FirstOffsetNumber;
+ offnum <= maxoff;
+ offnum = OffsetNumberNext(offnum))
+ {
+ ItemId itemid;
+
+ itemid = PageGetItemId(page, offnum);
+
+ /* Nothing to do if slot is empty or already dead. */
+ if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
+ ItemIdIsRedirected(itemid))
+ continue;
+
+ Assert(ItemIdIsNormal(itemid));
+ ItemPointerSet(&(tuple.t_self), blkno, offnum);
+
+ /* Initialize a HeapTupleData structure. */
+ tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
+ tuple.t_len = ItemIdGetLength(itemid);
+ tuple.t_tableOid = RelationRelationId;
+
+ /* Skip tuples that are not visible to this snapshot. */
+ if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
+ {
+ CreateDBRelInfo *relinfo;
+
+ /*
+ * ScanSourceDatabasePgClassTuple is in charge of constructing
+ * a CreateDBRelInfo object for this tuple, but can also decide
+ * that this tuple isn't something we need to copy. If we do need
+ * to copy the relation, add it to the list.
+ */
+ relinfo = ScanSourceDatabasePgClassTuple(&tuple, tbid, dbid,
+ srcpath);
+ if (relinfo != NULL)
+ rnodelist = lappend(rnodelist, relinfo);
+ }
+ }
+ return rnodelist;
+}
+
+/*
+ * Decide whether a certain pg_class tuple represents something that
+ * needs to be copied from the source database to the destination database,
+ * and if so, construct a CreateDBRelInfo for it.
+ *
+ * Visbility checks are handled by the caller, so our job here is just
+ * to assess the data stored in the tuple.
+ */
+CreateDBRelInfo *
+ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
+ char *srcpath)
+{
+ CreateDBRelInfo *relinfo;
+ Form_pg_class classForm;
+ Oid relfilenode = InvalidOid;
+
+ classForm = (Form_pg_class) GETSTRUCT(tuple);
+
+ /*
+ * Return NULL if this object does not need to be copied.
+ *
+ * Shared objects don't need to be copied, because they are shared.
+ * Objects without storage can't be copied, because there's nothing to
+ * copy. Temporary relations don't need to be copied either, because
+ * they are inaccessible outside of the session that created them,
+ * which must be gone already, and couldn't connect to a different database
+ * if it still existed. autovacuum will eventually remove the pg_class
+ * entries as well.
+ */
+ if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
+ !RELKIND_HAS_STORAGE(classForm->relkind) ||
+ classForm->relpersistence == RELPERSISTENCE_TEMP)
+ return NULL;
+
+ /*
+ * If relfilenode is valid then directly use it. Otherwise, consult the
+ * relmap.
+ */
+ if (OidIsValid(classForm->relfilenode))
+ relfilenode = classForm->relfilenode;
+ else
+ relfilenode = RelationMapOidToFilenodeForDatabase(srcpath,
+ classForm->oid);
+
+ /* We must have a valid relfilenode oid. */
+ if (!OidIsValid(relfilenode))
+ elog(ERROR, "relation with OID %u does not have a valid relfilenode",
+ classForm->oid);
+
+ /* Prepare a rel info element and add it to the list. */
+ relinfo = (CreateDBRelInfo *) palloc(sizeof(CreateDBRelInfo));
+ if (OidIsValid(classForm->reltablespace))
+ relinfo->rnode.spcNode = classForm->reltablespace;
+ else
+ relinfo->rnode.spcNode = tbid;
+
+ relinfo->rnode.dbNode = dbid;
+ relinfo->rnode.relNode = relfilenode;
+ relinfo->reloid = classForm->oid;
+
+ /* Temporary relations were rejected above. */
+ Assert(classForm->relpersistence != RELPERSISTENCE_TEMP);
+ relinfo->permanent =
+ (classForm->relpersistence == RELPERSISTENCE_PERMANENT) ? true : false;
+
+ return relinfo;
+}
+
+/*
+ * Create database directory and write out the PG_VERSION file in the database
+ * path. If isRedo is true, it's okay for the database directory to exist
+ * already.
+ */
+static void
+CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
+{
+ int fd;
+ int nbytes;
+ char versionfile[MAXPGPATH];
+ char buf[16];
+
+ /*
+ * Prepare version data before starting a critical section.
+ *
+ * Note that we don't have to copy this from the source database; there's
+ * only one legal value.
+ */
+ sprintf(buf, "%s\n", PG_MAJORVERSION);
+ nbytes = strlen(PG_MAJORVERSION) + 1;
+
+ /* If we are not in WAL replay then write the WAL. */
+ if (!isRedo)
+ {
+ xl_dbase_create_wal_log_rec xlrec;
+ XLogRecPtr lsn;
+
+ START_CRIT_SECTION();
+
+ xlrec.db_id = dbid;
+ xlrec.tablespace_id = tsid;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&xlrec),
+ sizeof(xl_dbase_create_wal_log_rec));
+
+ lsn = XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE_WAL_LOG);
+
+ /* As always, WAL must hit the disk before the data update does. */
+ XLogFlush(lsn);
+ }
+
+ /* Create database directory. */
+ if (MakePGDirectory(dbpath) < 0)
+ {
+ /* Failure other than already exists or not in WAL replay? */
+ if (errno != EEXIST || !isRedo)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not create directory \"%s\": %m", dbpath)));
+ }
+
+ /*
+ * Create PG_VERSION file in the database path. If the file already
+ * exists and we are in WAL replay then try again to open it in write
+ * mode.
+ */
+ snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
+
+ fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
+ if (fd < 0 && errno == EEXIST && isRedo)
+ fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
+
+ if (fd < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not create file \"%s\": %m", versionfile)));
+
+ /* Write PG_MAJORVERSION in the PG_VERSION file. */
+ pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_WRITE);
+ errno = 0;
+ if ((int) write(fd, buf, nbytes) != nbytes)
+ {
+ /* If write didn't set errno, assume problem is no disk space. */
+ if (errno == 0)
+ errno = ENOSPC;
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write to file \"%s\": %m", versionfile)));
+ }
+ pgstat_report_wait_end();
+
+ /* Close the version file. */
+ CloseTransientFile(fd);
+
+ /* Critical section done. */
+ if (!isRedo)
+ END_CRIT_SECTION();
+}
+
+/*
+ * Create a new database using the FILE_COPY strategy.
+ *
+ * Copy each tablespace at the filesystem level, and log a single WAL record
+ * for each tablespace copied. This requires a checkpoint before and after the
+ * copy, which may be expensive, but it does greatly reduce WAL generation
+ * if the copied database is large.
+ */
+static void
+CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
+ Oid dst_tsid)
+{
+ TableScanDesc scan;
+ Relation rel;
+ HeapTuple tuple;
+
+ /*
+ * Force a checkpoint before starting the copy. This will force all dirty
+ * buffers, including those of unlogged tables, out to disk, to ensure
+ * source database is up-to-date on disk for the copy.
+ * FlushDatabaseBuffers() would suffice for that, but we also want to
+ * process any pending unlink requests. Otherwise, if a checkpoint
+ * happened while we're copying files, a file might be deleted just when
+ * we're about to copy it, causing the lstat() call in copydir() to fail
+ * with ENOENT.
+ */
+ RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE |
+ CHECKPOINT_WAIT | CHECKPOINT_FLUSH_ALL);
+
+ /*
+ * Iterate through all tablespaces of the template database, and copy each
+ * one to the new database.
+ */
+ rel = table_open(TableSpaceRelationId, AccessShareLock);
+ scan = table_beginscan_catalog(rel, 0, NULL);
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
+ Oid srctablespace = spaceform->oid;
+ Oid dsttablespace;
+ char *srcpath;
+ char *dstpath;
+ struct stat st;
+
+ /* No need to copy global tablespace */
+ if (srctablespace == GLOBALTABLESPACE_OID)
+ continue;
+
+ srcpath = GetDatabasePath(src_dboid, srctablespace);
+
+ if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
+ directory_is_empty(srcpath))
+ {
+ /* Assume we can ignore it */
+ pfree(srcpath);
+ continue;
+ }
+
+ if (srctablespace == src_tsid)
+ dsttablespace = dst_tsid;
+ else
+ dsttablespace = srctablespace;
+
+ dstpath = GetDatabasePath(dst_dboid, dsttablespace);
+
+ /*
+ * Copy this subdirectory to the new location
+ *
+ * We don't need to copy subdirectories
+ */
+ copydir(srcpath, dstpath, false);
+
+ /* Record the filesystem change in XLOG */
+ {
+ xl_dbase_create_file_copy_rec xlrec;
+
+ xlrec.db_id = dst_dboid;
+ xlrec.tablespace_id = dsttablespace;
+ xlrec.src_db_id = src_dboid;
+ xlrec.src_tablespace_id = srctablespace;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec,
+ sizeof(xl_dbase_create_file_copy_rec));
+
+ (void) XLogInsert(RM_DBASE_ID,
+ XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
+ }
+ }
+ table_endscan(scan);
+ table_close(rel, AccessShareLock);
+
+ /*
+ * We force a checkpoint before committing. This effectively means that
+ * committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be
+ * replayed (at least not in ordinary crash recovery; we still have to
+ * make the XLOG entry for the benefit of PITR operations). This avoids
+ * two nasty scenarios:
+ *
+ * #1: When PITR is off, we don't XLOG the contents of newly created
+ * indexes; therefore the drop-and-recreate-whole-directory behavior of
+ * DBASE_CREATE replay would lose such indexes.
+ *
+ * #2: Since we have to recopy the source database during DBASE_CREATE
+ * replay, we run the risk of copying changes in it that were committed
+ * after the original CREATE DATABASE command but before the system crash
+ * that led to the replay. This is at least unexpected and at worst could
+ * lead to inconsistencies, eg duplicate table names.
+ *
+ * (Both of these were real bugs in releases 8.0 through 8.0.3.)
+ *
+ * In PITR replay, the first of these isn't an issue, and the second is
+ * only a risk if the CREATE DATABASE and subsequent template database
+ * change both occur while a base backup is being taken. There doesn't
+ * seem to be much we can do about that except document it as a
+ * limitation.
+ *
+ * See CreateDatabaseUsingWalLog() for a less cheesy CREATE DATABASE
+ * strategy that avoids these problems.
+ */
+ RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
+}
/*
* CREATE DATABASE
Oid
createdb(ParseState *pstate, const CreatedbStmt *stmt)
{
- TableScanDesc scan;
- Relation rel;
Oid src_dboid;
Oid src_owner;
int src_encoding = -1;
DefElem *dallowconnections = NULL;
DefElem *dconnlimit = NULL;
DefElem *dcollversion = NULL;
+ DefElem *dstrategy = NULL;
char *dbname = stmt->dbname;
char *dbowner = NULL;
const char *dbtemplate = NULL;
char *dbcollversion = NULL;
int notherbackends;
int npreparedxacts;
+ CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG;
createdb_failure_params fparms;
/* Extract options from the statement node tree */
(errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId));
}
+ else if (strcmp(defel->defname, "strategy") == 0)
+ {
+ if (dstrategy)
+ errorConflictingDefElem(defel, pstate);
+ dstrategy = defel;
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
dbtemplate)));
}
+ /* Validate the database creation strategy. */
+ if (dstrategy && dstrategy->arg)
+ {
+ char *strategy;
+
+ strategy = defGetString(dstrategy);
+ if (strcmp(strategy, "wal_log") == 0)
+ dbstrategy = CREATEDB_WAL_LOG;
+ else if (strcmp(strategy, "file_copy") == 0)
+ dbstrategy = CREATEDB_FILE_COPY;
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid create database strategy %s", strategy),
+ errhint("Valid strategies are \"wal_log\", and \"file_copy\".")));
+ }
+
/* If encoding or locales are defaulted, use source's setting */
if (encoding < 0)
encoding = src_encoding;
InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
/*
- * Force a checkpoint before starting the copy. This will force all dirty
- * buffers, including those of unlogged tables, out to disk, to ensure
- * source database is up-to-date on disk for the copy.
- * FlushDatabaseBuffers() would suffice for that, but we also want to
- * process any pending unlink requests. Otherwise, if a checkpoint
- * happened while we're copying files, a file might be deleted just when
- * we're about to copy it, causing the lstat() call in copydir() to fail
- * with ENOENT.
+ * If we're going to be reading data for the to-be-created database
+ * into shared_buffers, take a lock on it. Nobody should know that this
+ * database exists yet, but it's good to maintain the invariant that a
+ * lock an AccessExclusiveLock on the database is sufficient to drop all
+ * of its buffers without worrying about more being read later.
+ *
+ * Note that we need to do this before entering the PG_ENSURE_ERROR_CLEANUP
+ * block below, because createdb_failure_callback expects this lock to
+ * be held already.
*/
- RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
- | CHECKPOINT_FLUSH_ALL);
+ if (dbstrategy == CREATEDB_WAL_LOG)
+ LockSharedObject(DatabaseRelationId, dboid, 0, AccessShareLock);
/*
* Once we start copying subdirectories, we need to be able to clean 'em
*/
fparms.src_dboid = src_dboid;
fparms.dest_dboid = dboid;
+ fparms.strategy = dbstrategy;
+
PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
PointerGetDatum(&fparms));
{
/*
- * Iterate through all tablespaces of the template database, and copy
- * each one to the new database.
- */
- rel = table_open(TableSpaceRelationId, AccessShareLock);
- scan = table_beginscan_catalog(rel, 0, NULL);
- while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
- {
- Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
- Oid srctablespace = spaceform->oid;
- Oid dsttablespace;
- char *srcpath;
- char *dstpath;
- struct stat st;
-
- /* No need to copy global tablespace */
- if (srctablespace == GLOBALTABLESPACE_OID)
- continue;
-
- srcpath = GetDatabasePath(src_dboid, srctablespace);
-
- if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
- directory_is_empty(srcpath))
- {
- /* Assume we can ignore it */
- pfree(srcpath);
- continue;
- }
-
- if (srctablespace == src_deftablespace)
- dsttablespace = dst_deftablespace;
- else
- dsttablespace = srctablespace;
-
- dstpath = GetDatabasePath(dboid, dsttablespace);
-
- /*
- * Copy this subdirectory to the new location
- *
- * We don't need to copy subdirectories
- */
- copydir(srcpath, dstpath, false);
-
- /* Record the filesystem change in XLOG */
- {
- xl_dbase_create_rec xlrec;
-
- xlrec.db_id = dboid;
- xlrec.tablespace_id = dsttablespace;
- xlrec.src_db_id = src_dboid;
- xlrec.src_tablespace_id = srctablespace;
-
- XLogBeginInsert();
- XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
-
- (void) XLogInsert(RM_DBASE_ID,
- XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
- }
- }
- table_endscan(scan);
- table_close(rel, AccessShareLock);
-
- /*
- * We force a checkpoint before committing. This effectively means
- * that committed XLOG_DBASE_CREATE operations will never need to be
- * replayed (at least not in ordinary crash recovery; we still have to
- * make the XLOG entry for the benefit of PITR operations). This
- * avoids two nasty scenarios:
- *
- * #1: When PITR is off, we don't XLOG the contents of newly created
- * indexes; therefore the drop-and-recreate-whole-directory behavior
- * of DBASE_CREATE replay would lose such indexes.
- *
- * #2: Since we have to recopy the source database during DBASE_CREATE
- * replay, we run the risk of copying changes in it that were
- * committed after the original CREATE DATABASE command but before the
- * system crash that led to the replay. This is at least unexpected
- * and at worst could lead to inconsistencies, eg duplicate table
- * names.
- *
- * (Both of these were real bugs in releases 8.0 through 8.0.3.)
- *
- * In PITR replay, the first of these isn't an issue, and the second
- * is only a risk if the CREATE DATABASE and subsequent template
- * database change both occur while a base backup is being taken.
- * There doesn't seem to be much we can do about that except document
- * it as a limitation.
- *
- * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
- * we can avoid this.
+ * If the user has asked to create a database with WAL_LOG strategy
+ * then call CreateDatabaseUsingWalLog, which will copy the database
+ * at the block level and it will WAL log each copied block.
+ * Otherwise, call CreateDatabaseUsingFileCopy that will copy the
+ * database file by file.
*/
- RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
+ if (dbstrategy == CREATEDB_WAL_LOG)
+ CreateDatabaseUsingWalLog(src_dboid, dboid, src_deftablespace,
+ dst_deftablespace);
+ else
+ CreateDatabaseUsingFileCopy(src_dboid, dboid, src_deftablespace,
+ dst_deftablespace);
/*
* Close pg_database, but keep lock till commit.
{
createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
+ /*
+ * If we were copying database at block levels then drop pages for the
+ * destination database that are in the shared buffer cache. And tell
+ * checkpointer to forget any pending fsync and unlink requests for files
+ * in the database. The reasoning behind doing this is same as explained
+ * in dropdb function. But unlike dropdb we don't need to call
+ * pgstat_drop_database because this database is still not created so
+ * there should not be any stat for this.
+ */
+ if (fparms->strategy == CREATEDB_WAL_LOG)
+ {
+ DropDatabaseBuffers(fparms->dest_dboid);
+ ForgetDatabaseSyncRequests(fparms->dest_dboid);
+
+ /* Release lock on the target database. */
+ UnlockSharedObject(DatabaseRelationId, fparms->dest_dboid, 0,
+ AccessShareLock);
+ }
+
/*
* Release lock on source database before doing recursive remove. This is
* not essential but it seems desirable to release the lock as soon as
* Record the filesystem change in XLOG
*/
{
- xl_dbase_create_rec xlrec;
+ xl_dbase_create_file_copy_rec xlrec;
xlrec.db_id = db_id;
xlrec.tablespace_id = dst_tblspcoid;
xlrec.src_tablespace_id = src_tblspcoid;
XLogBeginInsert();
- XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
+ XLogRegisterData((char *) &xlrec,
+ sizeof(xl_dbase_create_file_copy_rec));
(void) XLogInsert(RM_DBASE_ID,
- XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
+ XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
}
/*
/*
* Force another checkpoint here. As in CREATE DATABASE, this is to
- * ensure that we don't have to replay a committed XLOG_DBASE_CREATE
- * operation, which would cause us to lose any unlogged operations
- * done in the new DB tablespace before the next checkpoint.
+ * ensure that we don't have to replay a committed
+ * XLOG_DBASE_CREATE_FILE_COPY operation, which would cause us to lose
+ * any unlogged operations done in the new DB tablespace before the
+ * next checkpoint.
*/
RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
/* Backup blocks are not used in dbase records */
Assert(!XLogRecHasAnyBlockRefs(record));
- if (info == XLOG_DBASE_CREATE)
+ if (info == XLOG_DBASE_CREATE_FILE_COPY)
{
- xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
+ xl_dbase_create_file_copy_rec *xlrec =
+ (xl_dbase_create_file_copy_rec *) XLogRecGetData(record);
char *src_path;
char *dst_path;
struct stat st;
*/
copydir(src_path, dst_path, false);
}
+ else if (info == XLOG_DBASE_CREATE_WAL_LOG)
+ {
+ xl_dbase_create_wal_log_rec *xlrec =
+ (xl_dbase_create_wal_log_rec *) XLogRecGetData(record);
+ char *dbpath;
+
+ dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
+
+ /* Create the database directory with the version file. */
+ CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id,
+ true);
+ }
else if (info == XLOG_DBASE_DROP)
{
xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
#include "access/xlogutils.h"
#include "catalog/catalog.h"
#include "catalog/storage.h"
+#include "catalog/storage_xlog.h"
#include "executor/instrument.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
ForkNumber forkNum,
BlockNumber nForkBlock,
BlockNumber firstDelBlock);
+static void RelationCopyStorageUsingBuffer(Relation src, Relation dst,
+ ForkNumber forkNum,
+ bool isunlogged);
static void AtProcExit_Buffers(int code, Datum arg);
static void CheckForBufferLeaks(void);
static int rnode_comparator(const void *p1, const void *p2);
* ReadBufferWithoutRelcache -- like ReadBufferExtended, but doesn't require
* a relcache entry for the relation.
*
- * NB: At present, this function may only be used on permanent relations, which
- * is OK, because we only use it during XLOG replay. If in the future we
- * want to use it on temporary or unlogged relations, we could pass additional
- * parameters.
+ * Pass permanent = true for a RELPERSISTENCE_PERMANENT relation, and
+ * permanent = false for a RELPERSISTENCE_UNLOGGED relation. This function
+ * cannot be used for temporary relations (and making that work might be
+ * difficult, unless we only want to read temporary relations for our own
+ * BackendId).
*/
Buffer
ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
BlockNumber blockNum, ReadBufferMode mode,
- BufferAccessStrategy strategy)
+ BufferAccessStrategy strategy, bool permanent)
{
bool hit;
SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
- Assert(InRecovery);
-
- return ReadBuffer_common(smgr, RELPERSISTENCE_PERMANENT, forkNum, blockNum,
+ return ReadBuffer_common(smgr, permanent ? RELPERSISTENCE_PERMANENT :
+ RELPERSISTENCE_UNLOGGED, forkNum, blockNum,
mode, strategy, &hit);
}
pfree(srels);
}
+/* ---------------------------------------------------------------------
+ * RelationCopyStorageUsingBuffer
+ *
+ * Copy fork's data using bufmgr. Same as RelationCopyStorage but instead
+ * of using smgrread and smgrextend this will copy using bufmgr APIs.
+ *
+ * Refer comments atop CreateAndCopyRelationData() for details about
+ * 'permanent' parameter.
+ * --------------------------------------------------------------------
+ */
+static void
+RelationCopyStorageUsingBuffer(Relation src, Relation dst, ForkNumber forkNum,
+ bool permanent)
+{
+ Buffer srcBuf;
+ Buffer dstBuf;
+ Page srcPage;
+ Page dstPage;
+ bool use_wal;
+ BlockNumber nblocks;
+ BlockNumber blkno;
+ BufferAccessStrategy bstrategy_src;
+ BufferAccessStrategy bstrategy_dst;
+
+ /*
+ * In general, we want to write WAL whenever wal_level > 'minimal', but
+ * we can skip it when copying any fork of an unlogged relation other
+ * than the init fork.
+ */
+ use_wal = XLogIsNeeded() && (permanent || forkNum == INIT_FORKNUM);
+
+ /* Get number of blocks in the source relation. */
+ nblocks = smgrnblocks(RelationGetSmgr(src), forkNum);
+
+ /* Nothing to copy; just return. */
+ if (nblocks == 0)
+ return;
+
+ /* This is a bulk operation, so use buffer access strategies. */
+ bstrategy_src = GetAccessStrategy(BAS_BULKREAD);
+ bstrategy_dst = GetAccessStrategy(BAS_BULKWRITE);
+
+ /* Iterate over each block of the source relation file. */
+ for (blkno = 0; blkno < nblocks; blkno++)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ /* Read block from source relation. */
+ srcBuf = ReadBufferWithoutRelcache(src->rd_node, forkNum, blkno,
+ RBM_NORMAL, bstrategy_src,
+ permanent);
+ srcPage = BufferGetPage(srcBuf);
+ if (PageIsNew(srcPage) || PageIsEmpty(srcPage))
+ {
+ ReleaseBuffer(srcBuf);
+ continue;
+ }
+
+ /* Use P_NEW to extend the destination relation. */
+ dstBuf = ReadBufferWithoutRelcache(dst->rd_node, forkNum, P_NEW,
+ RBM_NORMAL, bstrategy_dst,
+ permanent);
+ LockBuffer(dstBuf, BUFFER_LOCK_EXCLUSIVE);
+
+ START_CRIT_SECTION();
+
+ /* Copy page data from the source to the destination. */
+ dstPage = BufferGetPage(dstBuf);
+ memcpy(dstPage, srcPage, BLCKSZ);
+ MarkBufferDirty(dstBuf);
+
+ /* WAL-log the copied page. */
+ if (use_wal)
+ log_newpage_buffer(dstBuf, true);
+
+ END_CRIT_SECTION();
+
+ UnlockReleaseBuffer(dstBuf);
+ ReleaseBuffer(srcBuf);
+ }
+}
+
+/* ---------------------------------------------------------------------
+ * CreateAndCopyRelationData
+ *
+ * Create destination relation storage and copy all forks from the
+ * source relation to the destination.
+ *
+ * Pass permanent as true for permanent relations and false for
+ * unlogged relations. Currently this API is not supported for
+ * temporary relations.
+ * --------------------------------------------------------------------
+ */
+void
+CreateAndCopyRelationData(RelFileNode src_rnode, RelFileNode dst_rnode,
+ bool permanent)
+{
+ Relation src_rel;
+ Relation dst_rel;
+ char relpersistence;
+
+ /* Set the relpersistence. */
+ relpersistence = permanent ?
+ RELPERSISTENCE_PERMANENT : RELPERSISTENCE_UNLOGGED;
+
+ /*
+ * We can't use a real relcache entry for a relation in some other
+ * database, but since we're only going to access the fields related
+ * to physical storage, a fake one is good enough. If we didn't do this
+ * and used the smgr layer directly, we would have to worry about
+ * invalidations.
+ */
+ src_rel = CreateFakeRelcacheEntry(src_rnode);
+ dst_rel = CreateFakeRelcacheEntry(dst_rnode);
+
+ /*
+ * Create and copy all forks of the relation. During create database we
+ * have a separate cleanup mechanism which deletes complete database
+ * directory. Therefore, each individual relation doesn't need to be
+ * registered for cleanup.
+ */
+ RelationCreateStorage(dst_rnode, relpersistence, false);
+
+ /* copy main fork. */
+ RelationCopyStorageUsingBuffer(src_rel, dst_rel, MAIN_FORKNUM, permanent);
+
+ /* copy those extra forks that exist */
+ for (ForkNumber forkNum = MAIN_FORKNUM + 1;
+ forkNum <= MAX_FORKNUM; forkNum++)
+ {
+ if (smgrexists(RelationGetSmgr(src_rel), forkNum))
+ {
+ smgrcreate(RelationGetSmgr(dst_rel), forkNum, false);
+
+ /*
+ * WAL log creation if the relation is persistent, or this is the
+ * init fork of an unlogged relation.
+ */
+ if (permanent || forkNum == INIT_FORKNUM)
+ log_smgrcreate(&dst_rnode, forkNum);
+
+ /* Copy a fork's data, block by block. */
+ RelationCopyStorageUsingBuffer(src_rel, dst_rel, forkNum,
+ permanent);
+ }
+ }
+
+ /* Release fake relcache entries. */
+ FreeFakeRelcacheEntry(src_rel);
+ FreeFakeRelcacheEntry(dst_rel);
+}
+
/* ---------------------------------------------------------------------
* FlushDatabaseBuffers
*