Add new block-by-block strategy for CREATE DATABASE.
authorRobert Haas <rhaas@postgresql.org>
Tue, 29 Mar 2022 15:31:43 +0000 (11:31 -0400)
committerRobert Haas <rhaas@postgresql.org>
Tue, 29 Mar 2022 15:48:36 +0000 (11:48 -0400)
Because this strategy logs changes on a block-by-block basis, it
avoids the need to checkpoint before and after the operation.
However, because it logs each changed block individually, it might
generate a lot of extra write-ahead logging if the template database
is large. Therefore, the older strategy remains available via a new
STRATEGY parameter to CREATE DATABASE, and a corresponding --strategy
option to createdb.

Somewhat controversially, this patch assembles the list of relations
to be copied to the new database by reading the pg_class relation of
the template database. Cross-database access like this isn't normally
possible, but it can be made to work here because there can't be any
connections to the database being copied, nor can it contain any
in-doubt transactions. Even so, we have to use lower-level interfaces
than normal, since the table scan and relcache interfaces will not
work for a database to which we're not connected. The advantage of
this approach is that we do not need to rely on the filesystem to
determine what ought to be copied, but instead on PostgreSQL's own
knowledge of the database structure. This avoids, for example,
copying stray files that happen to be located in the source database
directory.

Dilip Kumar, with a fairly large number of cosmetic changes by me.
Reviewed and tested by Ashutosh Sharma, Andres Freund, John Naylor,
Greg Nancarrow, Neha Sharma. Additional feedback from Bruce Momjian,
Heikki Linnakangas, Julien Rouhaud, Adam Brusselback, Kyotaro
Horiguchi, Tomas Vondra, Andrew Dunstan, Álvaro Herrera, and others.

Discussion: http://postgr.es/m/CA+TgmoYtcdxBjLh31DLxUXHxFVMPGzrU5_T=CYCvRyFHywSBUQ@mail.gmail.com

28 files changed:
contrib/bloom/blinsert.c
doc/src/sgml/monitoring.sgml
doc/src/sgml/ref/create_database.sgml
doc/src/sgml/ref/createdb.sgml
src/backend/access/heap/heapam_handler.c
src/backend/access/nbtree/nbtree.c
src/backend/access/rmgrdesc/dbasedesc.c
src/backend/access/transam/xlogutils.c
src/backend/catalog/heap.c
src/backend/catalog/storage.c
src/backend/commands/dbcommands.c
src/backend/commands/tablecmds.c
src/backend/storage/buffer/bufmgr.c
src/backend/storage/lmgr/lmgr.c
src/backend/utils/activity/wait_event.c
src/backend/utils/cache/relcache.c
src/backend/utils/cache/relmapper.c
src/bin/pg_rewind/parsexlog.c
src/bin/psql/tab-complete.c
src/bin/scripts/createdb.c
src/bin/scripts/t/020_createdb.pl
src/include/catalog/storage.h
src/include/commands/dbcommands_xlog.h
src/include/storage/bufmgr.h
src/include/storage/lmgr.h
src/include/utils/relmapper.h
src/include/utils/wait_event.h
src/tools/pgindent/typedefs.list

index c94cf34e698e2c6069c5c5fb13506d8631503f5f..82378db441a9a9cc682050dfc96f8d3c110af5d5 100644 (file)
@@ -173,7 +173,7 @@ blbuildempty(Relation index)
     * Write the page and log it.  It might seem that an immediate sync would
     * be sufficient to guarantee that the file exists on disk, but recovery
     * itself might remove it while replaying, for example, an
-    * XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE record.  Therefore, we need
+    * XLOG_DBASE_CREATE* or XLOG_TBLSPC_CREATE record.  Therefore, we need
     * this even when wal_level=minimal.
     */
    PageSetChecksumInplace(metapage, BLOOM_METAPAGE_BLKNO);
index 6a6b09dc4560203761528de99ec253a26831ca81..3b9172f65bd9ab52b07812082bd77cf13bd49d5d 100644 (file)
@@ -1502,6 +1502,10 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       <entry><literal>TwophaseFileWrite</literal></entry>
       <entry>Waiting for a write of a two phase state file.</entry>
      </row>
+     <row>
+      <entry><literal>VersionFileWrite</literal></entry>
+      <entry>Waiting for the version file to be written while creating a database.</entry>
+     </row>
      <row>
       <entry><literal>WALBootstrapSync</literal></entry>
       <entry>Waiting for WAL to reach durable storage during
index 5ae785ab95aaa988df572883bca6a567c08ff652..255ad3a1ce04b8a1df6133c640eaa6dd8ddc6609 100644 (file)
@@ -25,6 +25,7 @@ CREATE DATABASE <replaceable class="parameter">name</replaceable>
     [ [ WITH ] [ OWNER [=] <replaceable class="parameter">user_name</replaceable> ]
            [ TEMPLATE [=] <replaceable class="parameter">template</replaceable> ]
            [ ENCODING [=] <replaceable class="parameter">encoding</replaceable> ]
+           [ STRATEGY [=] <replaceable class="parameter">strategy</replaceable> ] ]
            [ LOCALE [=] <replaceable class="parameter">locale</replaceable> ]
            [ LC_COLLATE [=] <replaceable class="parameter">lc_collate</replaceable> ]
            [ LC_CTYPE [=] <replaceable class="parameter">lc_ctype</replaceable> ]
@@ -118,6 +119,27 @@ CREATE DATABASE <replaceable class="parameter">name</replaceable>
        </para>
       </listitem>
      </varlistentry>
+     <varlistentry id="create-database-strategy" xreflabel="CREATE DATABASE STRATEGY">
+      <term><replaceable class="parameter">strategy</replaceable></term>
+      <listitem>
+       <para>
+        Strategy to be used in creating the new database.  If
+        the <literal>WAL_LOG</literal> strategy is used, the database will be
+        copied block by block and each block will be separately written
+        to the write-ahead log. This is the most efficient strategy in
+        cases where the template database is small, and therefore it is the
+        default. The older <literal>FILE_COPY</literal> strategy is also
+        available. This strategy writes a small record to the write-ahead log
+        for each tablespace used by the target database. Each such record
+        represents copying an entire directory to a new location at the
+        filesystem level. While this does reduce the write-ahed
+        log volume substantially, especially if the template database is large,
+        it also forces the system to perform a checkpoint both before and
+        after the creation of the new database. In some situations, this may
+        have a noticeable negative impact on overall system performance.
+       </para>
+      </listitem>
+     </varlistentry>
      <varlistentry>
       <term><replaceable class="parameter">locale</replaceable></term>
       <listitem>
index be42e502d69925dc2adab1cee07a39b56deeaf66..671cd362d94a4cb866bce32d9bd72050513af7e7 100644 (file)
@@ -177,6 +177,17 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>-S <replaceable class="parameter">template</replaceable></option></term>
+      <term><option>--strategy=<replaceable class="parameter">strategy</replaceable></option></term>
+      <listitem>
+       <para>
+        Specifies the database creation strategy.  See
+        <xref linkend="create-database-strategy" /> for more details.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-T <replaceable class="parameter">template</replaceable></option></term>
       <term><option>--template=<replaceable class="parameter">template</replaceable></option></term>
index 39ef8a0b77ded9040570d58f2ddc675251357cb9..dee264e8596b11e02dda18907068ec38eb5f8ba6 100644 (file)
@@ -593,7 +593,7 @@ heapam_relation_set_new_filenode(Relation rel,
     */
    *minmulti = GetOldestMultiXactId();
 
-   srel = RelationCreateStorage(*newrnode, persistence);
+   srel = RelationCreateStorage(*newrnode, persistence, true);
 
    /*
     * If required, set up an init fork for an unlogged table so that it can
@@ -601,7 +601,7 @@ heapam_relation_set_new_filenode(Relation rel,
     * even if the page has been logged, because the write did not go through
     * shared_buffers and therefore a concurrent checkpoint may have moved the
     * redo pointer past our xlog record.  Recovery may as well remove it
-    * while replaying, for example, XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE
+    * while replaying, for example, XLOG_DBASE_CREATE* or XLOG_TBLSPC_CREATE
     * record. Therefore, logging is necessary even if wal_level=minimal.
     */
    if (persistence == RELPERSISTENCE_UNLOGGED)
@@ -645,7 +645,7 @@ heapam_relation_copy_data(Relation rel, const RelFileNode *newrnode)
     * NOTE: any conflict in relfilenode value will be caught in
     * RelationCreateStorage().
     */
-   RelationCreateStorage(*newrnode, rel->rd_rel->relpersistence);
+   RelationCreateStorage(*newrnode, rel->rd_rel->relpersistence, true);
 
    /* copy main fork */
    RelationCopyStorage(RelationGetSmgr(rel), dstrel, MAIN_FORKNUM,
index c9b4964c1e809df0677cbcf78ca45373fadb1860..dacf3f7a587d63abb78a101416a660faeee4c3c3 100644 (file)
@@ -161,7 +161,7 @@ btbuildempty(Relation index)
     * Write the page and log it.  It might seem that an immediate sync would
     * be sufficient to guarantee that the file exists on disk, but recovery
     * itself might remove it while replaying, for example, an
-    * XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE record.  Therefore, we need
+    * XLOG_DBASE_CREATE* or XLOG_TBLSPC_CREATE record.  Therefore, we need
     * this even when wal_level=minimal.
     */
    PageSetChecksumInplace(metapage, BTREE_METAPAGE);
index 03af3fdbcfdecbbccc38de3b9a39b6bdeb2a7073..523d0b3c1daf7c603815e70007743aa37e8446a2 100644 (file)
@@ -24,14 +24,23 @@ dbase_desc(StringInfo buf, XLogReaderState *record)
    char       *rec = XLogRecGetData(record);
    uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
 
-   if (info == XLOG_DBASE_CREATE)
+   if (info == XLOG_DBASE_CREATE_FILE_COPY)
    {
-       xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
+       xl_dbase_create_file_copy_rec *xlrec =
+       (xl_dbase_create_file_copy_rec *) rec;
 
        appendStringInfo(buf, "copy dir %u/%u to %u/%u",
                         xlrec->src_tablespace_id, xlrec->src_db_id,
                         xlrec->tablespace_id, xlrec->db_id);
    }
+   else if (info == XLOG_DBASE_CREATE_WAL_LOG)
+   {
+       xl_dbase_create_wal_log_rec *xlrec =
+       (xl_dbase_create_wal_log_rec *) rec;
+
+       appendStringInfo(buf, "create dir %u/%u",
+                        xlrec->tablespace_id, xlrec->db_id);
+   }
    else if (info == XLOG_DBASE_DROP)
    {
        xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
@@ -51,8 +60,11 @@ dbase_identify(uint8 info)
 
    switch (info & ~XLR_INFO_MASK)
    {
-       case XLOG_DBASE_CREATE:
-           id = "CREATE";
+       case XLOG_DBASE_CREATE_FILE_COPY:
+           id = "CREATE_FILE_COPY";
+           break;
+       case XLOG_DBASE_CREATE_WAL_LOG:
+           id = "CREATE_WAL_LOG";
            break;
        case XLOG_DBASE_DROP:
            id = "DROP";
index 511f2f186f56fe06dbcb9808481d71537ae4411a..a4dedc58b71e0c6710a22c1005b80cb66b2671d4 100644 (file)
@@ -484,7 +484,7 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
    {
        /* page exists in file */
        buffer = ReadBufferWithoutRelcache(rnode, forknum, blkno,
-                                          mode, NULL);
+                                          mode, NULL, true);
    }
    else
    {
@@ -509,7 +509,7 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
                ReleaseBuffer(buffer);
            }
            buffer = ReadBufferWithoutRelcache(rnode, forknum,
-                                              P_NEW, mode, NULL);
+                                              P_NEW, mode, NULL, true);
        }
        while (BufferGetBlockNumber(buffer) < blkno);
        /* Handle the corner case that P_NEW returns non-consecutive pages */
@@ -519,7 +519,7 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
                LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
            ReleaseBuffer(buffer);
            buffer = ReadBufferWithoutRelcache(rnode, forknum, blkno,
-                                              mode, NULL);
+                                              mode, NULL, true);
        }
    }
 
index 696fd5977e032a270d2028bf04365cd370ecdd88..6eb78a9c0ff1e74859dcb1327e69e272b3bd192c 100644 (file)
@@ -387,7 +387,7 @@ heap_create(const char *relname,
                                            relpersistence,
                                            relfrozenxid, relminmxid);
        else if (RELKIND_HAS_STORAGE(rel->rd_rel->relkind))
-           RelationCreateStorage(rel->rd_node, relpersistence);
+           RelationCreateStorage(rel->rd_node, relpersistence, true);
        else
            Assert(false);
    }
index ce5568ff0849c60974367dc64d47834e9f0a1b1b..9898701a4387b2006a2e9e27cf9a7f36757d8270 100644 (file)
@@ -112,12 +112,14 @@ AddPendingSync(const RelFileNode *rnode)
  * modules that need them.
  *
  * This function is transactional. The creation is WAL-logged, and if the
- * transaction aborts later on, the storage will be destroyed.
+ * transaction aborts later on, the storage will be destroyed.  A caller
+ * that does not want the storage to be destroyed in case of an abort may
+ * pass register_delete = false.
  */
 SMgrRelation
-RelationCreateStorage(RelFileNode rnode, char relpersistence)
+RelationCreateStorage(RelFileNode rnode, char relpersistence,
+                     bool register_delete)
 {
-   PendingRelDelete *pending;
    SMgrRelation srel;
    BackendId   backend;
    bool        needs_wal;
@@ -149,15 +151,23 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
    if (needs_wal)
        log_smgrcreate(&srel->smgr_rnode.node, MAIN_FORKNUM);
 
-   /* Add the relation to the list of stuff to delete at abort */
-   pending = (PendingRelDelete *)
-       MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
-   pending->relnode = rnode;
-   pending->backend = backend;
-   pending->atCommit = false;  /* delete if abort */
-   pending->nestLevel = GetCurrentTransactionNestLevel();
-   pending->next = pendingDeletes;
-   pendingDeletes = pending;
+   /*
+    * Add the relation to the list of stuff to delete at abort, if we are
+    * asked to do so.
+    */
+   if (register_delete)
+   {
+       PendingRelDelete *pending;
+
+       pending = (PendingRelDelete *)
+           MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+       pending->relnode = rnode;
+       pending->backend = backend;
+       pending->atCommit = false;  /* delete if abort */
+       pending->nestLevel = GetCurrentTransactionNestLevel();
+       pending->next = pendingDeletes;
+       pendingDeletes = pending;
+   }
 
    if (relpersistence == RELPERSISTENCE_PERMANENT && !XLogIsNeeded())
    {
index 623e5ec77895d161d51a074f6679e42a54f5ad84..df16533901e8239cb64638234080da34d47754a7 100644 (file)
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/pg_locale.h"
+#include "utils/relmapper.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 
+/*
+ * Create database strategy.
+ *
+ * CREATEDB_WAL_LOG will copy the database at the block level and WAL log each
+ * copied block.
+ *
+ * CREATEDB_FILE_COPY will simply perform a file system level copy of the
+ * database and log a single record for each tablespace copied. To make this
+ * safe, it also triggers checkpoints before and after the operation.
+ */
+typedef enum CreateDBStrategy
+{
+   CREATEDB_WAL_LOG,
+   CREATEDB_FILE_COPY
+} CreateDBStrategy;
+
 typedef struct
 {
    Oid         src_dboid;      /* source (template) DB */
    Oid         dest_dboid;     /* DB we are trying to create */
+   CreateDBStrategy strategy;  /* create db strategy */
 } createdb_failure_params;
 
 typedef struct
@@ -78,6 +96,17 @@ typedef struct
    Oid         dest_tsoid;     /* tablespace we are trying to move to */
 } movedb_failure_params;
 
+/*
+ * Information about a relation to be copied when creating a database.
+ */
+typedef struct CreateDBRelInfo
+{
+   RelFileNode rnode;          /* physical relation identifier */
+   Oid         reloid;         /* relation oid */
+   bool        permanent;      /* relation is permanent or unlogged */
+} CreateDBRelInfo;
+
+
 /* non-export function prototypes */
 static void createdb_failure_callback(int code, Datum arg);
 static void movedb(const char *dbname, const char *tblspcname);
@@ -93,7 +122,546 @@ static bool have_createdb_privilege(void);
 static void remove_dbtablespaces(Oid db_id);
 static bool check_db_file_conflict(Oid db_id);
 static int errdetail_busy_db(int notherbackends, int npreparedxacts);
+static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dboid, Oid src_tsid,
+                                     Oid dst_tsid);
+static List *ScanSourceDatabasePgClass(Oid srctbid, Oid srcdbid, char *srcpath);
+static List *ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid,
+                                          Oid dbid, char *srcpath,
+                                          List *rnodelist, Snapshot snapshot);
+static CreateDBRelInfo *ScanSourceDatabasePgClassTuple(HeapTupleData *tuple,
+                                                      Oid tbid, Oid dbid,
+                                                      char *srcpath);
+static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
+                                   bool isRedo);
+static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dboid, Oid src_tsid,
+                                       Oid dst_tsid);
+
+/*
+ * Create a new database using the WAL_LOG strategy.
+ *
+ * Each copied block is separately written to the write-ahead log.
+ */
+static void
+CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
+                         Oid src_tsid, Oid dst_tsid)
+{
+   char       *srcpath;
+   char       *dstpath;
+   List       *rnodelist = NULL;
+   ListCell   *cell;
+   LockRelId   srcrelid;
+   LockRelId   dstrelid;
+   RelFileNode srcrnode;
+   RelFileNode dstrnode;
+   CreateDBRelInfo *relinfo;
+
+   /* Get source and destination database paths. */
+   srcpath = GetDatabasePath(src_dboid, src_tsid);
+   dstpath = GetDatabasePath(dst_dboid, dst_tsid);
+
+   /* Create database directory and write PG_VERSION file. */
+   CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
+
+   /* Copy relmap file from source database to the destination database. */
+   RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath);
+
+   /* Get list of relfilenodes to copy from the source database. */
+   rnodelist = ScanSourceDatabasePgClass(src_tsid, src_dboid, srcpath);
+   Assert(rnodelist != NIL);
+
+   /*
+    * Database IDs will be the same for all relations so set them before
+    * entering the loop.
+    */
+   srcrelid.dbId = src_dboid;
+   dstrelid.dbId = dst_dboid;
+
+   /* Loop over our list of relfilenodes and copy each one. */
+   foreach(cell, rnodelist)
+   {
+       relinfo = lfirst(cell);
+       srcrnode = relinfo->rnode;
+
+       /*
+        * If the relation is from the source db's default tablespace then we
+        * need to create it in the destinations db's default tablespace.
+        * Otherwise, we need to create in the same tablespace as it is in the
+        * source database.
+        */
+       if (srcrnode.spcNode == src_tsid)
+           dstrnode.spcNode = dst_tsid;
+       else
+           dstrnode.spcNode = srcrnode.spcNode;
+
+       dstrnode.dbNode = dst_dboid;
+       dstrnode.relNode = srcrnode.relNode;
+
+       /*
+        * Acquire locks on source and target relations before copying.
+        *
+        * We typically do not read relation data into shared_buffers without
+        * holding a relation lock. It's unclear what could go wrong if we
+        * skipped it in this case, because nobody can be modifying either
+        * the source or destination database at this point, and we have locks
+        * on both databases, too, but let's take the conservative route.
+        */
+       dstrelid.relId = srcrelid.relId = relinfo->reloid;
+       LockRelationId(&srcrelid, AccessShareLock);
+       LockRelationId(&dstrelid, AccessShareLock);
+
+       /* Copy relation storage from source to the destination. */
+       CreateAndCopyRelationData(srcrnode, dstrnode, relinfo->permanent);
+
+       /* Release the relation locks. */
+       UnlockRelationId(&srcrelid, AccessShareLock);
+       UnlockRelationId(&dstrelid, AccessShareLock);
+   }
+
+   list_free_deep(rnodelist);
+}
+
+/*
+ * Scan the pg_class table in the source database to identify the relations
+ * that need to be copied to the destination database.
+ *
+ * This is an exception to the usual rule that cross-database access is
+ * not possible. We can make it work here because we know that there are no
+ * connections to the source database and (since there can't be prepared
+ * transactions touching that database) no in-doubt tuples either. This
+ * means that we don't need to worry about pruning removing anything from
+ * under us, and we don't need to be too picky about our snapshot either.
+ * As long as it sees all previously-committed XIDs as committed and all
+ * aborted XIDs as aborted, we should be fine: nothing else is possible
+ * here.
+ *
+ * We can't rely on the relcache for anything here, because that only knows
+ * about the database to which we are connected, and can't handle access to
+ * other databases. That also means we can't rely on the heap scan
+ * infrastructure, which would be a bad idea anyway since it might try
+ * to do things like HOT pruning which we definitely can't do safely in
+ * a database to which we're not even connected.
+ */
+static List *
+ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
+{
+   RelFileNode rnode;
+   BlockNumber nblocks;
+   BlockNumber blkno;
+   Buffer      buf;
+   Oid         relfilenode;
+   Page        page;
+   List       *rnodelist = NIL;
+   LockRelId   relid;
+   Relation    rel;
+   Snapshot    snapshot;
+   BufferAccessStrategy bstrategy;
+
+   /* Get pg_class relfilenode. */
+   relfilenode = RelationMapOidToFilenodeForDatabase(srcpath,
+                                                     RelationRelationId);
+
+   /* Don't read data into shared_buffers without holding a relation lock. */
+   relid.dbId = dbid;
+   relid.relId = RelationRelationId;
+   LockRelationId(&relid, AccessShareLock);
+
+   /* Prepare a RelFileNode for the pg_class relation. */
+   rnode.spcNode = tbid;
+   rnode.dbNode = dbid;
+   rnode.relNode = relfilenode;
+
+   /*
+    * We can't use a real relcache entry for a relation in some other
+    * database, but since we're only going to access the fields related
+    * to physical storage, a fake one is good enough. If we didn't do this
+    * and used the smgr layer directly, we would have to worry about
+    * invalidations.
+    */
+   rel = CreateFakeRelcacheEntry(rnode);
+   nblocks = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
+   FreeFakeRelcacheEntry(rel);
+
+   /* Use a buffer access strategy since this is a bulk read operation. */
+   bstrategy = GetAccessStrategy(BAS_BULKREAD);
+
+   /*
+    * As explained in the function header comments, we need a snapshot that
+    * will see all committed transactions as committed, and our transaction
+    * snapshot - or the active snapshot - might not be new enough for that,
+    * but the return value of GetLatestSnapshot() should work fine.
+    */
+   snapshot = GetLatestSnapshot();
+
+   /* Process the relation block by block. */
+   for (blkno = 0; blkno < nblocks; blkno++)
+   {
+       CHECK_FOR_INTERRUPTS();
+
+       buf = ReadBufferWithoutRelcache(rnode, MAIN_FORKNUM, blkno,
+                                       RBM_NORMAL, bstrategy, false);
+
+       LockBuffer(buf, BUFFER_LOCK_SHARE);
+       page = BufferGetPage(buf);
+       if (PageIsNew(page) || PageIsEmpty(page))
+       {
+           UnlockReleaseBuffer(buf);
+           continue;
+       }
+
+       /* Append relevant pg_class tuples for current page to rnodelist. */
+       rnodelist = ScanSourceDatabasePgClassPage(page, buf, tbid, dbid,
+                                                 srcpath, rnodelist,
+                                                 snapshot);
+
+       UnlockReleaseBuffer(buf);
+   }
+
+   /* Release relation lock. */
+   UnlockRelationId(&relid, AccessShareLock);
+
+   return rnodelist;
+}
+
+/*
+ * Scan one page of the source database's pg_class relation and add relevant
+ * entries to rnodelist. The return value is the updated list.
+ */
+static List *
+ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, Oid dbid,
+                             char *srcpath, List *rnodelist,
+                             Snapshot snapshot)
+{
+   BlockNumber     blkno = BufferGetBlockNumber(buf);
+   OffsetNumber    offnum;
+   OffsetNumber    maxoff;
+   HeapTupleData   tuple;
+
+   maxoff = PageGetMaxOffsetNumber(page);
+
+   /* Loop over offsets. */
+   for (offnum = FirstOffsetNumber;
+        offnum <= maxoff;
+        offnum = OffsetNumberNext(offnum))
+   {
+       ItemId      itemid;
+
+       itemid = PageGetItemId(page, offnum);
+
+       /* Nothing to do if slot is empty or already dead. */
+       if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
+           ItemIdIsRedirected(itemid))
+           continue;
+
+       Assert(ItemIdIsNormal(itemid));
+       ItemPointerSet(&(tuple.t_self), blkno, offnum);
+
+       /* Initialize a HeapTupleData structure. */
+       tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
+       tuple.t_len = ItemIdGetLength(itemid);
+       tuple.t_tableOid = RelationRelationId;
+
+       /* Skip tuples that are not visible to this snapshot. */
+       if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
+       {
+           CreateDBRelInfo *relinfo;
+
+           /*
+            * ScanSourceDatabasePgClassTuple is in charge of constructing
+            * a CreateDBRelInfo object for this tuple, but can also decide
+            * that this tuple isn't something we need to copy. If we do need
+            * to copy the relation, add it to the list.
+            */
+           relinfo = ScanSourceDatabasePgClassTuple(&tuple, tbid, dbid,
+                                                    srcpath);
+           if (relinfo != NULL)
+               rnodelist = lappend(rnodelist, relinfo);
+       }
+   }
 
+   return rnodelist;
+}
+
+/*
+ * Decide whether a certain pg_class tuple represents something that
+ * needs to be copied from the source database to the destination database,
+ * and if so, construct a CreateDBRelInfo for it.
+ *
+ * Visbility checks are handled by the caller, so our job here is just
+ * to assess the data stored in the tuple.
+ */
+CreateDBRelInfo *
+ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
+                              char *srcpath)
+{
+   CreateDBRelInfo    *relinfo;
+   Form_pg_class       classForm;
+   Oid                 relfilenode = InvalidOid;
+
+   classForm = (Form_pg_class) GETSTRUCT(tuple);
+
+   /*
+    * Return NULL if this object does not need to be copied.
+    *
+    * Shared objects don't need to be copied, because they are shared.
+    * Objects without storage can't be copied, because there's nothing to
+    * copy. Temporary relations don't need to be copied either, because
+    * they are inaccessible outside of the session that created them,
+    * which must be gone already, and couldn't connect to a different database
+    * if it still existed. autovacuum will eventually remove the pg_class
+    * entries as well.
+    */
+   if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
+       !RELKIND_HAS_STORAGE(classForm->relkind) ||
+       classForm->relpersistence == RELPERSISTENCE_TEMP)
+       return NULL;
+
+   /*
+    * If relfilenode is valid then directly use it.  Otherwise, consult the
+    * relmap.
+    */
+   if (OidIsValid(classForm->relfilenode))
+       relfilenode = classForm->relfilenode;
+   else
+       relfilenode = RelationMapOidToFilenodeForDatabase(srcpath,
+                                                         classForm->oid);
+
+   /* We must have a valid relfilenode oid. */
+   if (!OidIsValid(relfilenode))
+       elog(ERROR, "relation with OID %u does not have a valid relfilenode",
+            classForm->oid);
+
+   /* Prepare a rel info element and add it to the list. */
+   relinfo = (CreateDBRelInfo *) palloc(sizeof(CreateDBRelInfo));
+   if (OidIsValid(classForm->reltablespace))
+       relinfo->rnode.spcNode = classForm->reltablespace;
+   else
+       relinfo->rnode.spcNode = tbid;
+
+   relinfo->rnode.dbNode = dbid;
+   relinfo->rnode.relNode = relfilenode;
+   relinfo->reloid = classForm->oid;
+
+   /* Temporary relations were rejected above. */
+   Assert(classForm->relpersistence != RELPERSISTENCE_TEMP);
+   relinfo->permanent =
+       (classForm->relpersistence == RELPERSISTENCE_PERMANENT) ? true : false;
+
+   return relinfo;
+}
+
+/*
+ * Create database directory and write out the PG_VERSION file in the database
+ * path.  If isRedo is true, it's okay for the database directory to exist
+ * already.
+ */
+static void
+CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
+{
+   int         fd;
+   int         nbytes;
+   char        versionfile[MAXPGPATH];
+   char        buf[16];
+
+   /*
+    * Prepare version data before starting a critical section.
+    *
+    * Note that we don't have to copy this from the source database; there's
+    * only one legal value.
+    */
+   sprintf(buf, "%s\n", PG_MAJORVERSION);
+   nbytes = strlen(PG_MAJORVERSION) + 1;
+
+   /* If we are not in WAL replay then write the WAL. */
+   if (!isRedo)
+   {
+       xl_dbase_create_wal_log_rec xlrec;
+       XLogRecPtr  lsn;
+
+       START_CRIT_SECTION();
+
+       xlrec.db_id = dbid;
+       xlrec.tablespace_id = tsid;
+
+       XLogBeginInsert();
+       XLogRegisterData((char *) (&xlrec),
+                        sizeof(xl_dbase_create_wal_log_rec));
+
+       lsn = XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE_WAL_LOG);
+
+       /* As always, WAL must hit the disk before the data update does. */
+       XLogFlush(lsn);
+   }
+
+   /* Create database directory. */
+   if (MakePGDirectory(dbpath) < 0)
+   {
+       /* Failure other than already exists or not in WAL replay? */
+       if (errno != EEXIST || !isRedo)
+           ereport(ERROR,
+                   (errcode_for_file_access(),
+                    errmsg("could not create directory \"%s\": %m", dbpath)));
+   }
+
+   /*
+    * Create PG_VERSION file in the database path.  If the file already
+    * exists and we are in WAL replay then try again to open it in write
+    * mode.
+    */
+   snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
+
+   fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
+   if (fd < 0 && errno == EEXIST && isRedo)
+       fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
+
+   if (fd < 0)
+       ereport(ERROR,
+               (errcode_for_file_access(),
+                errmsg("could not create file \"%s\": %m", versionfile)));
+
+   /* Write PG_MAJORVERSION in the PG_VERSION file. */
+   pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_WRITE);
+   errno = 0;
+   if ((int) write(fd, buf, nbytes) != nbytes)
+   {
+       /* If write didn't set errno, assume problem is no disk space. */
+       if (errno == 0)
+           errno = ENOSPC;
+       ereport(ERROR,
+               (errcode_for_file_access(),
+                errmsg("could not write to file \"%s\": %m", versionfile)));
+   }
+   pgstat_report_wait_end();
+
+   /* Close the version file. */
+   CloseTransientFile(fd);
+
+   /* Critical section done. */
+   if (!isRedo)
+       END_CRIT_SECTION();
+}
+
+/*
+ * Create a new database using the FILE_COPY strategy.
+ *
+ * Copy each tablespace at the filesystem level, and log a single WAL record
+ * for each tablespace copied.  This requires a checkpoint before and after the
+ * copy, which may be expensive, but it does greatly reduce WAL generation
+ * if the copied database is large.
+ */
+static void
+CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
+                           Oid dst_tsid)
+{
+   TableScanDesc scan;
+   Relation    rel;
+   HeapTuple   tuple;
+
+   /*
+    * Force a checkpoint before starting the copy. This will force all dirty
+    * buffers, including those of unlogged tables, out to disk, to ensure
+    * source database is up-to-date on disk for the copy.
+    * FlushDatabaseBuffers() would suffice for that, but we also want to
+    * process any pending unlink requests. Otherwise, if a checkpoint
+    * happened while we're copying files, a file might be deleted just when
+    * we're about to copy it, causing the lstat() call in copydir() to fail
+    * with ENOENT.
+    */
+   RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE |
+                     CHECKPOINT_WAIT | CHECKPOINT_FLUSH_ALL);
+
+   /*
+    * Iterate through all tablespaces of the template database, and copy each
+    * one to the new database.
+    */
+   rel = table_open(TableSpaceRelationId, AccessShareLock);
+   scan = table_beginscan_catalog(rel, 0, NULL);
+   while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+   {
+       Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
+       Oid         srctablespace = spaceform->oid;
+       Oid         dsttablespace;
+       char       *srcpath;
+       char       *dstpath;
+       struct stat st;
+
+       /* No need to copy global tablespace */
+       if (srctablespace == GLOBALTABLESPACE_OID)
+           continue;
+
+       srcpath = GetDatabasePath(src_dboid, srctablespace);
+
+       if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
+           directory_is_empty(srcpath))
+       {
+           /* Assume we can ignore it */
+           pfree(srcpath);
+           continue;
+       }
+
+       if (srctablespace == src_tsid)
+           dsttablespace = dst_tsid;
+       else
+           dsttablespace = srctablespace;
+
+       dstpath = GetDatabasePath(dst_dboid, dsttablespace);
+
+       /*
+        * Copy this subdirectory to the new location
+        *
+        * We don't need to copy subdirectories
+        */
+       copydir(srcpath, dstpath, false);
+
+       /* Record the filesystem change in XLOG */
+       {
+           xl_dbase_create_file_copy_rec xlrec;
+
+           xlrec.db_id = dst_dboid;
+           xlrec.tablespace_id = dsttablespace;
+           xlrec.src_db_id = src_dboid;
+           xlrec.src_tablespace_id = srctablespace;
+
+           XLogBeginInsert();
+           XLogRegisterData((char *) &xlrec,
+                            sizeof(xl_dbase_create_file_copy_rec));
+
+           (void) XLogInsert(RM_DBASE_ID,
+                             XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
+       }
+   }
+   table_endscan(scan);
+   table_close(rel, AccessShareLock);
+
+   /*
+    * We force a checkpoint before committing.  This effectively means that
+    * committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be
+    * replayed (at least not in ordinary crash recovery; we still have to
+    * make the XLOG entry for the benefit of PITR operations). This avoids
+    * two nasty scenarios:
+    *
+    * #1: When PITR is off, we don't XLOG the contents of newly created
+    * indexes; therefore the drop-and-recreate-whole-directory behavior of
+    * DBASE_CREATE replay would lose such indexes.
+    *
+    * #2: Since we have to recopy the source database during DBASE_CREATE
+    * replay, we run the risk of copying changes in it that were committed
+    * after the original CREATE DATABASE command but before the system crash
+    * that led to the replay.  This is at least unexpected and at worst could
+    * lead to inconsistencies, eg duplicate table names.
+    *
+    * (Both of these were real bugs in releases 8.0 through 8.0.3.)
+    *
+    * In PITR replay, the first of these isn't an issue, and the second is
+    * only a risk if the CREATE DATABASE and subsequent template database
+    * change both occur while a base backup is being taken. There doesn't
+    * seem to be much we can do about that except document it as a
+    * limitation.
+    *
+    * See CreateDatabaseUsingWalLog() for a less cheesy CREATE DATABASE
+    * strategy that avoids these problems.
+    */
+   RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
+}
 
 /*
  * CREATE DATABASE
@@ -101,8 +669,6 @@ static int  errdetail_busy_db(int notherbackends, int npreparedxacts);
 Oid
 createdb(ParseState *pstate, const CreatedbStmt *stmt)
 {
-   TableScanDesc scan;
-   Relation    rel;
    Oid         src_dboid;
    Oid         src_owner;
    int         src_encoding = -1;
@@ -137,6 +703,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
    DefElem    *dallowconnections = NULL;
    DefElem    *dconnlimit = NULL;
    DefElem    *dcollversion = NULL;
+   DefElem    *dstrategy = NULL;
    char       *dbname = stmt->dbname;
    char       *dbowner = NULL;
    const char *dbtemplate = NULL;
@@ -152,6 +719,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
    char       *dbcollversion = NULL;
    int         notherbackends;
    int         npreparedxacts;
+   CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG;
    createdb_failure_params fparms;
 
    /* Extract options from the statement node tree */
@@ -269,6 +837,12 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
                        errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId));
        }
+       else if (strcmp(defel->defname, "strategy") == 0)
+       {
+           if (dstrategy)
+               errorConflictingDefElem(defel, pstate);
+           dstrategy = defel;
+       }
        else
            ereport(ERROR,
                    (errcode(ERRCODE_SYNTAX_ERROR),
@@ -413,6 +987,23 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
                            dbtemplate)));
    }
 
+   /* Validate the database creation strategy. */
+   if (dstrategy && dstrategy->arg)
+   {
+       char       *strategy;
+
+       strategy = defGetString(dstrategy);
+       if (strcmp(strategy, "wal_log") == 0)
+           dbstrategy = CREATEDB_WAL_LOG;
+       else if (strcmp(strategy, "file_copy") == 0)
+           dbstrategy = CREATEDB_FILE_COPY;
+       else
+           ereport(ERROR,
+                   (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                    errmsg("invalid create database strategy %s", strategy),
+                    errhint("Valid strategies are \"wal_log\", and \"file_copy\".")));
+   }
+
    /* If encoding or locales are defaulted, use source's setting */
    if (encoding < 0)
        encoding = src_encoding;
@@ -753,17 +1344,18 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
    InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
 
    /*
-    * Force a checkpoint before starting the copy. This will force all dirty
-    * buffers, including those of unlogged tables, out to disk, to ensure
-    * source database is up-to-date on disk for the copy.
-    * FlushDatabaseBuffers() would suffice for that, but we also want to
-    * process any pending unlink requests. Otherwise, if a checkpoint
-    * happened while we're copying files, a file might be deleted just when
-    * we're about to copy it, causing the lstat() call in copydir() to fail
-    * with ENOENT.
+    * If we're going to be reading data for the to-be-created database
+    * into shared_buffers, take a lock on it. Nobody should know that this
+    * database exists yet, but it's good to maintain the invariant that a
+    * lock an AccessExclusiveLock on the database is sufficient to drop all
+    * of its buffers without worrying about more being read later.
+    *
+    * Note that we need to do this before entering the PG_ENSURE_ERROR_CLEANUP
+    * block below, because createdb_failure_callback expects this lock to
+    * be held already.
     */
-   RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
-                     | CHECKPOINT_FLUSH_ALL);
+   if (dbstrategy == CREATEDB_WAL_LOG)
+       LockSharedObject(DatabaseRelationId, dboid, 0, AccessShareLock);
 
    /*
     * Once we start copying subdirectories, we need to be able to clean 'em
@@ -774,101 +1366,24 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
     */
    fparms.src_dboid = src_dboid;
    fparms.dest_dboid = dboid;
+   fparms.strategy = dbstrategy;
+
    PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
                            PointerGetDatum(&fparms));
    {
        /*
-        * Iterate through all tablespaces of the template database, and copy
-        * each one to the new database.
-        */
-       rel = table_open(TableSpaceRelationId, AccessShareLock);
-       scan = table_beginscan_catalog(rel, 0, NULL);
-       while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
-       {
-           Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
-           Oid         srctablespace = spaceform->oid;
-           Oid         dsttablespace;
-           char       *srcpath;
-           char       *dstpath;
-           struct stat st;
-
-           /* No need to copy global tablespace */
-           if (srctablespace == GLOBALTABLESPACE_OID)
-               continue;
-
-           srcpath = GetDatabasePath(src_dboid, srctablespace);
-
-           if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
-               directory_is_empty(srcpath))
-           {
-               /* Assume we can ignore it */
-               pfree(srcpath);
-               continue;
-           }
-
-           if (srctablespace == src_deftablespace)
-               dsttablespace = dst_deftablespace;
-           else
-               dsttablespace = srctablespace;
-
-           dstpath = GetDatabasePath(dboid, dsttablespace);
-
-           /*
-            * Copy this subdirectory to the new location
-            *
-            * We don't need to copy subdirectories
-            */
-           copydir(srcpath, dstpath, false);
-
-           /* Record the filesystem change in XLOG */
-           {
-               xl_dbase_create_rec xlrec;
-
-               xlrec.db_id = dboid;
-               xlrec.tablespace_id = dsttablespace;
-               xlrec.src_db_id = src_dboid;
-               xlrec.src_tablespace_id = srctablespace;
-
-               XLogBeginInsert();
-               XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
-
-               (void) XLogInsert(RM_DBASE_ID,
-                                 XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
-           }
-       }
-       table_endscan(scan);
-       table_close(rel, AccessShareLock);
-
-       /*
-        * We force a checkpoint before committing.  This effectively means
-        * that committed XLOG_DBASE_CREATE operations will never need to be
-        * replayed (at least not in ordinary crash recovery; we still have to
-        * make the XLOG entry for the benefit of PITR operations). This
-        * avoids two nasty scenarios:
-        *
-        * #1: When PITR is off, we don't XLOG the contents of newly created
-        * indexes; therefore the drop-and-recreate-whole-directory behavior
-        * of DBASE_CREATE replay would lose such indexes.
-        *
-        * #2: Since we have to recopy the source database during DBASE_CREATE
-        * replay, we run the risk of copying changes in it that were
-        * committed after the original CREATE DATABASE command but before the
-        * system crash that led to the replay.  This is at least unexpected
-        * and at worst could lead to inconsistencies, eg duplicate table
-        * names.
-        *
-        * (Both of these were real bugs in releases 8.0 through 8.0.3.)
-        *
-        * In PITR replay, the first of these isn't an issue, and the second
-        * is only a risk if the CREATE DATABASE and subsequent template
-        * database change both occur while a base backup is being taken.
-        * There doesn't seem to be much we can do about that except document
-        * it as a limitation.
-        *
-        * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
-        * we can avoid this.
+        * If the user has asked to create a database with WAL_LOG strategy
+        * then call CreateDatabaseUsingWalLog, which will copy the database
+        * at the block level and it will WAL log each copied block.
+        * Otherwise, call CreateDatabaseUsingFileCopy that will copy the
+        * database file by file.
         */
-       RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
+       if (dbstrategy == CREATEDB_WAL_LOG)
+           CreateDatabaseUsingWalLog(src_dboid, dboid, src_deftablespace,
+                                     dst_deftablespace);
+       else
+           CreateDatabaseUsingFileCopy(src_dboid, dboid, src_deftablespace,
+                                       dst_deftablespace);
 
        /*
         * Close pg_database, but keep lock till commit.
@@ -954,6 +1469,25 @@ createdb_failure_callback(int code, Datum arg)
 {
    createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
 
+   /*
+    * If we were copying database at block levels then drop pages for the
+    * destination database that are in the shared buffer cache.  And tell
+    * checkpointer to forget any pending fsync and unlink requests for files
+    * in the database.  The reasoning behind doing this is same as explained
+    * in dropdb function.  But unlike dropdb we don't need to call
+    * pgstat_drop_database because this database is still not created so
+    * there should not be any stat for this.
+    */
+   if (fparms->strategy == CREATEDB_WAL_LOG)
+   {
+       DropDatabaseBuffers(fparms->dest_dboid);
+       ForgetDatabaseSyncRequests(fparms->dest_dboid);
+
+       /* Release lock on the target database. */
+       UnlockSharedObject(DatabaseRelationId, fparms->dest_dboid, 0,
+                          AccessShareLock);
+   }
+
    /*
     * Release lock on source database before doing recursive remove. This is
     * not essential but it seems desirable to release the lock as soon as
@@ -1478,7 +2012,7 @@ movedb(const char *dbname, const char *tblspcname)
         * Record the filesystem change in XLOG
         */
        {
-           xl_dbase_create_rec xlrec;
+           xl_dbase_create_file_copy_rec xlrec;
 
            xlrec.db_id = db_id;
            xlrec.tablespace_id = dst_tblspcoid;
@@ -1486,10 +2020,11 @@ movedb(const char *dbname, const char *tblspcname)
            xlrec.src_tablespace_id = src_tblspcoid;
 
            XLogBeginInsert();
-           XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
+           XLogRegisterData((char *) &xlrec,
+                            sizeof(xl_dbase_create_file_copy_rec));
 
            (void) XLogInsert(RM_DBASE_ID,
-                             XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
+                             XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
        }
 
        /*
@@ -1525,9 +2060,10 @@ movedb(const char *dbname, const char *tblspcname)
 
        /*
         * Force another checkpoint here.  As in CREATE DATABASE, this is to
-        * ensure that we don't have to replay a committed XLOG_DBASE_CREATE
-        * operation, which would cause us to lose any unlogged operations
-        * done in the new DB tablespace before the next checkpoint.
+        * ensure that we don't have to replay a committed
+        * XLOG_DBASE_CREATE_FILE_COPY operation, which would cause us to lose
+        * any unlogged operations done in the new DB tablespace before the
+        * next checkpoint.
         */
        RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
 
@@ -2478,9 +3014,10 @@ dbase_redo(XLogReaderState *record)
    /* Backup blocks are not used in dbase records */
    Assert(!XLogRecHasAnyBlockRefs(record));
 
-   if (info == XLOG_DBASE_CREATE)
+   if (info == XLOG_DBASE_CREATE_FILE_COPY)
    {
-       xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
+       xl_dbase_create_file_copy_rec *xlrec =
+       (xl_dbase_create_file_copy_rec *) XLogRecGetData(record);
        char       *src_path;
        char       *dst_path;
        struct stat st;
@@ -2515,6 +3052,18 @@ dbase_redo(XLogReaderState *record)
         */
        copydir(src_path, dst_path, false);
    }
+   else if (info == XLOG_DBASE_CREATE_WAL_LOG)
+   {
+       xl_dbase_create_wal_log_rec *xlrec =
+       (xl_dbase_create_wal_log_rec *) XLogRecGetData(record);
+       char       *dbpath;
+
+       dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
+
+       /* Create the database directory with the version file. */
+       CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id,
+                               true);
+   }
    else if (info == XLOG_DBASE_DROP)
    {
        xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
index 124b9961dc98781be7dbf4928a8adb8936d8d5f2..51b4a00d50d52caef864dfa31dd828cbc3f4c975 100644 (file)
@@ -14626,7 +14626,7 @@ index_copy_data(Relation rel, RelFileNode newrnode)
     * NOTE: any conflict in relfilenode value will be caught in
     * RelationCreateStorage().
     */
-   RelationCreateStorage(newrnode, rel->rd_rel->relpersistence);
+   RelationCreateStorage(newrnode, rel->rd_rel->relpersistence, true);
 
    /* copy main fork */
    RelationCopyStorage(RelationGetSmgr(rel), dstrel, MAIN_FORKNUM,
index 11005edc7359787e0aa19d141640a71a8aaa261f..d73a40c1bc63384a36f816fd5809d797a62179af 100644 (file)
@@ -38,6 +38,7 @@
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
 #include "catalog/storage.h"
+#include "catalog/storage_xlog.h"
 #include "executor/instrument.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
@@ -486,6 +487,9 @@ static void FindAndDropRelFileNodeBuffers(RelFileNode rnode,
                                          ForkNumber forkNum,
                                          BlockNumber nForkBlock,
                                          BlockNumber firstDelBlock);
+static void RelationCopyStorageUsingBuffer(Relation src, Relation dst,
+                                          ForkNumber forkNum,
+                                          bool isunlogged);
 static void AtProcExit_Buffers(int code, Datum arg);
 static void CheckForBufferLeaks(void);
 static int rnode_comparator(const void *p1, const void *p2);
@@ -772,23 +776,23 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
  * ReadBufferWithoutRelcache -- like ReadBufferExtended, but doesn't require
  *     a relcache entry for the relation.
  *
- * NB: At present, this function may only be used on permanent relations, which
- * is OK, because we only use it during XLOG replay.  If in the future we
- * want to use it on temporary or unlogged relations, we could pass additional
- * parameters.
+ * Pass permanent = true for a RELPERSISTENCE_PERMANENT relation, and
+ * permanent = false for a RELPERSISTENCE_UNLOGGED relation. This function
+ * cannot be used for temporary relations (and making that work might be
+ * difficult, unless we only want to read temporary relations for our own
+ * BackendId).
  */
 Buffer
 ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
                          BlockNumber blockNum, ReadBufferMode mode,
-                         BufferAccessStrategy strategy)
+                         BufferAccessStrategy strategy, bool permanent)
 {
    bool        hit;
 
    SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
 
-   Assert(InRecovery);
-
-   return ReadBuffer_common(smgr, RELPERSISTENCE_PERMANENT, forkNum, blockNum,
+   return ReadBuffer_common(smgr, permanent ? RELPERSISTENCE_PERMANENT :
+                            RELPERSISTENCE_UNLOGGED, forkNum, blockNum,
                             mode, strategy, &hit);
 }
 
@@ -3676,6 +3680,158 @@ FlushRelationsAllBuffers(SMgrRelation *smgrs, int nrels)
    pfree(srels);
 }
 
+/* ---------------------------------------------------------------------
+ *     RelationCopyStorageUsingBuffer
+ *
+ *     Copy fork's data using bufmgr.  Same as RelationCopyStorage but instead
+ *     of using smgrread and smgrextend this will copy using bufmgr APIs.
+ *
+ *     Refer comments atop CreateAndCopyRelationData() for details about
+ *     'permanent' parameter.
+ * --------------------------------------------------------------------
+ */
+static void
+RelationCopyStorageUsingBuffer(Relation src, Relation dst, ForkNumber forkNum,
+                              bool permanent)
+{
+   Buffer      srcBuf;
+   Buffer      dstBuf;
+   Page        srcPage;
+   Page        dstPage;
+   bool        use_wal;
+   BlockNumber nblocks;
+   BlockNumber blkno;
+   BufferAccessStrategy bstrategy_src;
+   BufferAccessStrategy bstrategy_dst;
+
+   /*
+    * In general, we want to write WAL whenever wal_level > 'minimal', but
+    * we can skip it when copying any fork of an unlogged relation other
+    * than the init fork.
+    */
+   use_wal = XLogIsNeeded() && (permanent || forkNum == INIT_FORKNUM);
+
+   /* Get number of blocks in the source relation. */
+   nblocks = smgrnblocks(RelationGetSmgr(src), forkNum);
+
+   /* Nothing to copy; just return. */
+   if (nblocks == 0)
+       return;
+
+   /* This is a bulk operation, so use buffer access strategies. */
+   bstrategy_src = GetAccessStrategy(BAS_BULKREAD);
+   bstrategy_dst = GetAccessStrategy(BAS_BULKWRITE);
+
+   /* Iterate over each block of the source relation file. */
+   for (blkno = 0; blkno < nblocks; blkno++)
+   {
+       CHECK_FOR_INTERRUPTS();
+
+       /* Read block from source relation. */
+       srcBuf = ReadBufferWithoutRelcache(src->rd_node, forkNum, blkno,
+                                          RBM_NORMAL, bstrategy_src,
+                                          permanent);
+       srcPage = BufferGetPage(srcBuf);
+       if (PageIsNew(srcPage) || PageIsEmpty(srcPage))
+       {
+           ReleaseBuffer(srcBuf);
+           continue;
+       }
+
+       /* Use P_NEW to extend the destination relation. */
+       dstBuf = ReadBufferWithoutRelcache(dst->rd_node, forkNum, P_NEW,
+                                          RBM_NORMAL, bstrategy_dst,
+                                          permanent);
+       LockBuffer(dstBuf, BUFFER_LOCK_EXCLUSIVE);
+
+       START_CRIT_SECTION();
+
+       /* Copy page data from the source to the destination. */
+       dstPage = BufferGetPage(dstBuf);
+       memcpy(dstPage, srcPage, BLCKSZ);
+       MarkBufferDirty(dstBuf);
+
+       /* WAL-log the copied page. */
+       if (use_wal)
+           log_newpage_buffer(dstBuf, true);
+
+       END_CRIT_SECTION();
+
+       UnlockReleaseBuffer(dstBuf);
+       ReleaseBuffer(srcBuf);
+   }
+}
+
+/* ---------------------------------------------------------------------
+ *     CreateAndCopyRelationData
+ *
+ *     Create destination relation storage and copy all forks from the
+ *     source relation to the destination.
+ *
+ *     Pass permanent as true for permanent relations and false for
+ *     unlogged relations.  Currently this API is not supported for
+ *     temporary relations.
+ * --------------------------------------------------------------------
+ */
+void
+CreateAndCopyRelationData(RelFileNode src_rnode, RelFileNode dst_rnode,
+                         bool permanent)
+{
+   Relation        src_rel;
+   Relation        dst_rel;
+   char            relpersistence;
+
+   /* Set the relpersistence. */
+   relpersistence = permanent ?
+       RELPERSISTENCE_PERMANENT : RELPERSISTENCE_UNLOGGED;
+
+   /*
+    * We can't use a real relcache entry for a relation in some other
+    * database, but since we're only going to access the fields related
+    * to physical storage, a fake one is good enough. If we didn't do this
+    * and used the smgr layer directly, we would have to worry about
+    * invalidations.
+    */
+   src_rel = CreateFakeRelcacheEntry(src_rnode);
+   dst_rel = CreateFakeRelcacheEntry(dst_rnode);
+
+   /*
+    * Create and copy all forks of the relation.  During create database we
+    * have a separate cleanup mechanism which deletes complete database
+    * directory.  Therefore, each individual relation doesn't need to be
+    * registered for cleanup.
+    */
+   RelationCreateStorage(dst_rnode, relpersistence, false);
+
+   /* copy main fork. */
+   RelationCopyStorageUsingBuffer(src_rel, dst_rel, MAIN_FORKNUM, permanent);
+
+   /* copy those extra forks that exist */
+   for (ForkNumber forkNum = MAIN_FORKNUM + 1;
+        forkNum <= MAX_FORKNUM; forkNum++)
+   {
+       if (smgrexists(RelationGetSmgr(src_rel), forkNum))
+       {
+           smgrcreate(RelationGetSmgr(dst_rel), forkNum, false);
+
+           /*
+            * WAL log creation if the relation is persistent, or this is the
+            * init fork of an unlogged relation.
+            */
+           if (permanent || forkNum == INIT_FORKNUM)
+               log_smgrcreate(&dst_rnode, forkNum);
+
+           /* Copy a fork's data, block by block. */
+           RelationCopyStorageUsingBuffer(src_rel, dst_rel, forkNum,
+                                          permanent);
+       }
+   }
+
+   /* Release fake relcache entries. */
+   FreeFakeRelcacheEntry(src_rel);
+   FreeFakeRelcacheEntry(dst_rel);
+}
+
 /* ---------------------------------------------------------------------
  *     FlushDatabaseBuffers
  *
index 5ae52dd14db0dabbb8075a9b4de95954851dd6f9..1543da61620db2071546cb30d8852cf8a02269dd 100644 (file)
@@ -175,6 +175,34 @@ ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
    return true;
 }
 
+/*
+ *     LockRelationId
+ *
+ * Lock, given a LockRelId.  Same as LockRelationOid but take LockRelId as an
+ * input.
+ */
+void
+LockRelationId(LockRelId *relid, LOCKMODE lockmode)
+{
+   LOCKTAG     tag;
+   LOCALLOCK  *locallock;
+   LockAcquireResult res;
+
+   SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
+
+   res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
+
+   /*
+    * Now that we have the lock, check for invalidation messages; see notes
+    * in LockRelationOid.
+    */
+   if (res != LOCKACQUIRE_ALREADY_CLEAR)
+   {
+       AcceptInvalidationMessages();
+       MarkLockClear(locallock);
+   }
+}
+
 /*
  *     UnlockRelationId
  *
index ff46a0e3c717ec01329e7ce5fba8015038345d34..1c8aba492591e62e9dabcb1bdd1b54c75ad4a32e 100644 (file)
@@ -705,6 +705,9 @@ pgstat_get_wait_io(WaitEventIO w)
        case WAIT_EVENT_TWOPHASE_FILE_WRITE:
            event_name = "TwophaseFileWrite";
            break;
+       case WAIT_EVENT_VERSION_FILE_WRITE:
+           event_name = "VersionFileWrite";
+           break;
        case WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ:
            event_name = "WALSenderTimelineHistoryRead";
            break;
index d47fac7bb98db9f748679961e9ed9362bc600b78..a15ce9edb13e0d486d441a68f96092bf5165816f 100644 (file)
@@ -3746,7 +3746,7 @@ RelationSetNewRelfilenode(Relation relation, char persistence)
        /* handle these directly, at least for now */
        SMgrRelation srel;
 
-       srel = RelationCreateStorage(newrnode, persistence);
+       srel = RelationCreateStorage(newrnode, persistence, true);
        smgrclose(srel);
    }
    else
index 4d0718f0018fd60cf34c42ea7e115722a372d4b2..dee3387d0265dca4e50af4b49c8c828698c7650d 100644 (file)
@@ -251,6 +251,63 @@ RelationMapFilenodeToOid(Oid filenode, bool shared)
    return InvalidOid;
 }
 
+/*
+ * RelationMapOidToFilenodeForDatabase
+ *
+ * Like RelationMapOidToFilenode, but reads the mapping from the indicated
+ * path instead of using the one for the current database.
+ */
+Oid
+RelationMapOidToFilenodeForDatabase(char *dbpath, Oid relationId)
+{
+   RelMapFile  map;
+   int         i;
+
+   /* Read the relmap file from the source database. */
+   read_relmap_file(&map, dbpath, false, ERROR);
+
+   /* Iterate over the relmap entries to find the input relation OID. */
+   for (i = 0; i < map.num_mappings; i++)
+   {
+       if (relationId == map.mappings[i].mapoid)
+           return map.mappings[i].mapfilenode;
+   }
+
+   return InvalidOid;
+}
+
+/*
+ * RelationMapCopy
+ *
+ * Copy relmapfile from source db path to the destination db path and WAL log
+ * the operation. This is intended for use in creating a new relmap file
+ * for a database that doesn't have one yet, not for replacing an existing
+ * relmap file.
+ */
+void
+RelationMapCopy(Oid dbid, Oid tsid, char *srcdbpath, char *dstdbpath)
+{
+   RelMapFile map;
+
+   /*
+    * Read the relmap file from the source database.
+    */
+   read_relmap_file(&map, srcdbpath, false, ERROR);
+
+   /*
+    * Write the same data into the destination database's relmap file.
+    *
+    * No sinval is needed because no one can be connected to the destination
+    * database yet. For the same reason, there is no need to acquire
+    * RelationMappingLock.
+    *
+    * There's no point in trying to preserve files here. The new database
+    * isn't usable yet anyway, and won't ever be if we can't install a
+    * relmap file.
+    */
+   write_relmap_file(&map, true, false, false, dbid, tsid, dstdbpath);
+}
+
 /*
  * RelationMapUpdateMap
  *
@@ -1031,6 +1088,13 @@ relmap_redo(XLogReaderState *record)
         *
         * There shouldn't be anyone else updating relmaps during WAL replay,
         * but grab the lock to interlock against load_relmap_file().
+        *
+        * Note that we use the same WAL record for updating the relmap of
+        * an existing database as we do for creating a new database. In
+        * the latter case, taking the relmap log and sending sinval messages
+        * is unnecessary, but harmless. If we wanted to avoid it, we could
+        * add a flag to the WAL record to indicate which opration is being
+        * performed.
         */
        LWLockAcquire(RelationMappingLock, LW_EXCLUSIVE);
        write_relmap_file(&newmap, false, true, false,
index 3ed2a2e811e85645677e47d491b1a77fc314a426..49966e7b7fd6b42decac794b8f71567d706a928c 100644 (file)
@@ -372,7 +372,7 @@ extractPageInfo(XLogReaderState *record)
 
    /* Is this a special record type that I recognize? */
 
-   if (rmid == RM_DBASE_ID && rminfo == XLOG_DBASE_CREATE)
+   if (rmid == RM_DBASE_ID && rminfo == XLOG_DBASE_CREATE_FILE_COPY)
    {
        /*
         * New databases can be safely ignored. It won't be present in the
@@ -384,6 +384,13 @@ extractPageInfo(XLogReaderState *record)
         * overwriting the database created in the target system.
         */
    }
+   else if (rmid == RM_DBASE_ID && rminfo == XLOG_DBASE_CREATE_WAL_LOG)
+   {
+       /*
+        * New databases can be safely ignored. It won't be present in the
+        * source system, so it will be deleted.
+        */
+   }
    else if (rmid == RM_DBASE_ID && rminfo == XLOG_DBASE_DROP)
    {
        /*
index c97d3e87f0ca5bb4fe5da6c8aaa09bfd551a4353..3f9dfffd57fa18c0072f4229a961846559f897cd 100644 (file)
@@ -2791,13 +2791,15 @@ psql_completion(const char *text, int start, int end)
    /* CREATE DATABASE */
    else if (Matches("CREATE", "DATABASE", MatchAny))
        COMPLETE_WITH("OWNER", "TEMPLATE", "ENCODING", "TABLESPACE",
-                     "IS_TEMPLATE",
+                     "IS_TEMPLATE", "STRATEGY",
                      "ALLOW_CONNECTIONS", "CONNECTION LIMIT",
                      "LC_COLLATE", "LC_CTYPE", "LOCALE", "OID",
                      "LOCALE_PROVIDER", "ICU_LOCALE");
 
    else if (Matches("CREATE", "DATABASE", MatchAny, "TEMPLATE"))
        COMPLETE_WITH_QUERY(Query_for_list_of_template_databases);
+   else if (Matches("CREATE", "DATABASE", MatchAny, "STRATEGY"))
+       COMPLETE_WITH("WAL_LOG", "FILE_COPY");
 
    /* CREATE DOMAIN */
    else if (Matches("CREATE", "DOMAIN", MatchAny))
index 6f612abf7c6b27fcd05e54fd0c365f1fa3f6c392..0bffa2f3ee4e1d4e0d203dac971cf877f28a5231 100644 (file)
@@ -34,6 +34,7 @@ main(int argc, char *argv[])
        {"tablespace", required_argument, NULL, 'D'},
        {"template", required_argument, NULL, 'T'},
        {"encoding", required_argument, NULL, 'E'},
+       {"strategy", required_argument, NULL, 'S'},
        {"lc-collate", required_argument, NULL, 1},
        {"lc-ctype", required_argument, NULL, 2},
        {"locale", required_argument, NULL, 'l'},
@@ -60,6 +61,7 @@ main(int argc, char *argv[])
    char       *tablespace = NULL;
    char       *template = NULL;
    char       *encoding = NULL;
+   char       *strategy = NULL;
    char       *lc_collate = NULL;
    char       *lc_ctype = NULL;
    char       *locale = NULL;
@@ -77,7 +79,7 @@ main(int argc, char *argv[])
 
    handle_help_version_opts(argc, argv, "createdb", help);
 
-   while ((c = getopt_long(argc, argv, "h:p:U:wWeO:D:T:E:l:", long_options, &optindex)) != -1)
+   while ((c = getopt_long(argc, argv, "h:p:U:wWeO:D:T:E:l:S:", long_options, &optindex)) != -1)
    {
        switch (c)
        {
@@ -111,6 +113,9 @@ main(int argc, char *argv[])
            case 'E':
                encoding = pg_strdup(optarg);
                break;
+           case 'S':
+               strategy = pg_strdup(optarg);
+               break;
            case 1:
                lc_collate = pg_strdup(optarg);
                break;
@@ -215,6 +220,8 @@ main(int argc, char *argv[])
        appendPQExpBufferStr(&sql, " ENCODING ");
        appendStringLiteralConn(&sql, encoding, conn);
    }
+   if (strategy)
+       appendPQExpBuffer(&sql, " STRATEGY %s", fmtId(strategy));
    if (template)
        appendPQExpBuffer(&sql, " TEMPLATE %s", fmtId(template));
    if (lc_collate)
@@ -294,6 +301,7 @@ help(const char *progname)
    printf(_("      --locale-provider={libc|icu}\n"
             "                               locale provider for the database's default collation\n"));
    printf(_("  -O, --owner=OWNER            database user to own the new database\n"));
+   printf(_("  -S, --strategy=STRATEGY      database creation strategy wal_log or file_copy\n"));
    printf(_("  -T, --template=TEMPLATE      template database to copy\n"));
    printf(_("  -V, --version                output version information, then exit\n"));
    printf(_("  -?, --help                   show this help, then exit\n"));
index 35deec9a929bb86a245d132e6bd5b12261dd60c6..14d3a9563d1ffd204890f7bd007ffb84a5dfe172 100644 (file)
@@ -104,4 +104,24 @@ $node->command_checks_all(
    ],
    'createdb with incorrect --lc-ctype');
 
+$node->command_checks_all(
+   [ 'createdb', '--strategy', "foo", 'foobar2' ],
+   1,
+   [qr/^$/],
+   [
+       qr/^createdb: error: database creation failed: ERROR:  invalid create database strategy|^createdb: error: database creation failed: ERROR:  invalid create database strategy foo/s
+   ],
+   'createdb with incorrect --strategy');
+
+# Check database creation strategy
+$node->issues_sql_like(
+   [ 'createdb', '-T', 'foobar2', 'foobar6', '-S', 'wal_log'],
+   qr/statement: CREATE DATABASE foobar6 STRATEGY wal_log TEMPLATE foobar2/,
+   'create database with WAL_LOG strategy');
+
+$node->issues_sql_like(
+   [ 'createdb', '-T', 'foobar2', 'foobar7', '-S', 'file_copy'],
+   qr/statement: CREATE DATABASE foobar7 STRATEGY file_copy TEMPLATE foobar2/,
+   'create database with FILE_COPY strategy');
+
 done_testing();
index 9ffc74191312aebad46616434b5ea5d90cd43d07..844a023b2ce68bbe0606652c709bceceba773d9c 100644 (file)
@@ -22,7 +22,9 @@
 /* GUC variables */
 extern int wal_skip_threshold;
 
-extern SMgrRelation RelationCreateStorage(RelFileNode rnode, char relpersistence);
+extern SMgrRelation RelationCreateStorage(RelFileNode rnode,
+                                         char relpersistence,
+                                         bool register_delete);
 extern void RelationDropStorage(Relation rel);
 extern void RelationPreserveStorage(RelFileNode rnode, bool atCommit);
 extern void RelationPreTruncate(Relation rel);
index 593a8578a41ce872a6b266c876f7da6d21782083..0ee2452febacc112b5cdf7453701a27dc0c4dc7b 100644 (file)
 #include "lib/stringinfo.h"
 
 /* record types */
-#define XLOG_DBASE_CREATE      0x00
-#define XLOG_DBASE_DROP            0x10
+#define XLOG_DBASE_CREATE_FILE_COPY        0x00
+#define XLOG_DBASE_CREATE_WAL_LOG      0x10
+#define XLOG_DBASE_DROP                    0x20
 
-typedef struct xl_dbase_create_rec
+/*
+ * Single WAL record for an entire CREATE DATABASE operation. This is used
+ * by the FILE_COPY strategy.
+ */
+typedef struct xl_dbase_create_file_copy_rec
 {
-   /* Records copying of a single subdirectory incl. contents */
    Oid         db_id;
    Oid         tablespace_id;
    Oid         src_db_id;
    Oid         src_tablespace_id;
-} xl_dbase_create_rec;
+} xl_dbase_create_file_copy_rec;
+
+/*
+ * WAL record for the beginning of a CREATE DATABASE operation, when the
+ * WAL_LOG strategy is used. Each individual block will be logged separately
+ * afterward.
+ */
+typedef struct xl_dbase_create_wal_log_rec
+{
+   Oid         db_id;
+   Oid         tablespace_id;
+} xl_dbase_create_wal_log_rec;
 
 typedef struct xl_dbase_drop_rec
 {
index dd01841c30078c5d2ad01707afabd5d538bba0aa..a6b657f0ba50f9cb57453fe975835f8aa4e20a37 100644 (file)
@@ -184,7 +184,8 @@ extern Buffer ReadBufferExtended(Relation reln, ForkNumber forkNum,
                                 BufferAccessStrategy strategy);
 extern Buffer ReadBufferWithoutRelcache(RelFileNode rnode,
                                        ForkNumber forkNum, BlockNumber blockNum,
-                                       ReadBufferMode mode, BufferAccessStrategy strategy);
+                                       ReadBufferMode mode, BufferAccessStrategy strategy,
+                                       bool permanent);
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
 extern void MarkBufferDirty(Buffer buffer);
@@ -203,6 +204,9 @@ extern BlockNumber RelationGetNumberOfBlocksInFork(Relation relation,
 extern void FlushOneBuffer(Buffer buffer);
 extern void FlushRelationBuffers(Relation rel);
 extern void FlushRelationsAllBuffers(struct SMgrRelationData **smgrs, int nrels);
+extern void CreateAndCopyRelationData(RelFileNode src_rnode,
+                                     RelFileNode dst_rnode,
+                                     bool permanent);
 extern void FlushDatabaseBuffers(Oid dbid);
 extern void DropRelFileNodeBuffers(struct SMgrRelationData *smgr_reln, ForkNumber *forkNum,
                                   int nforks, BlockNumber *firstDelBlock);
index 49edbcc81bea87a331c21afb90f2a86d0051aa65..be1d2c99a95055d10021d7a252d3ac0981f99afa 100644 (file)
@@ -38,6 +38,7 @@ extern void RelationInitLockInfo(Relation relation);
 
 /* Lock a relation */
 extern void LockRelationOid(Oid relid, LOCKMODE lockmode);
+extern void LockRelationId(LockRelId *relid, LOCKMODE lockmode);
 extern bool ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode);
 extern void UnlockRelationId(LockRelId *relid, LOCKMODE lockmode);
 extern void UnlockRelationOid(Oid relid, LOCKMODE lockmode);
index 9fbb5a7f9b554d6486a1bc273f54f3a7a4b9b5c0..f10353e1390e79ad564378033099de59aa3fb750 100644 (file)
@@ -38,7 +38,9 @@ typedef struct xl_relmap_update
 extern Oid RelationMapOidToFilenode(Oid relationId, bool shared);
 
 extern Oid RelationMapFilenodeToOid(Oid relationId, bool shared);
-
+extern Oid RelationMapOidToFilenodeForDatabase(char *dbpath, Oid relationId);
+extern void RelationMapCopy(Oid dbid, Oid tsid, char *srcdbpath,
+                           char *dstdbpath);
 extern void RelationMapUpdateMap(Oid relationId, Oid fileNode, bool shared,
                                 bool immediate);
 
index 1c39ce031a797db4683a4359d6a2f4f98d81ad14..d870c592632385aeec2735a532a5566b963432b8 100644 (file)
@@ -218,6 +218,7 @@ typedef enum
    WAIT_EVENT_TWOPHASE_FILE_READ,
    WAIT_EVENT_TWOPHASE_FILE_SYNC,
    WAIT_EVENT_TWOPHASE_FILE_WRITE,
+   WAIT_EVENT_VERSION_FILE_WRITE,
    WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ,
    WAIT_EVENT_WAL_BOOTSTRAP_SYNC,
    WAIT_EVENT_WAL_BOOTSTRAP_WRITE,
index 410c9f6b0d183756c16d72002fd1ba50ed40e66c..6b77cc64ef4fd510199bf25783d929683f2d8d0b 100644 (file)
@@ -461,6 +461,8 @@ CoverPos
 CreateAmStmt
 CreateCastStmt
 CreateConversionStmt
+CreateDBRelInfo
+CreateDBStrategy
 CreateDomainStmt
 CreateEnumStmt
 CreateEventTrigStmt
@@ -3705,7 +3707,8 @@ xl_btree_update
 xl_btree_vacuum
 xl_clog_truncate
 xl_commit_ts_truncate
-xl_dbase_create_rec
+xl_dbase_create_file_copy_rec
+xl_dbase_create_wal_log_rec
 xl_dbase_drop_rec
 xl_end_of_recovery
 xl_hash_add_ovfl_page