tableam: Add pg_dump support.
authorAndres Freund <andres@anarazel.de>
Wed, 6 Mar 2019 17:54:38 +0000 (09:54 -0800)
committerAndres Freund <andres@anarazel.de>
Wed, 6 Mar 2019 17:54:38 +0000 (09:54 -0800)
This adds pg_dump support for table AMs in a similar manner to how
tablespaces are handled. That is, instead of specifying the AM for
every CREATE TABLE etc, emit SET default_table_access_method
statements. That makes it easier to change the AM for all/most tables
in a dump, and allows restore to succeed even if some AM is not
available.

This increases the dump archive version, as a tables/matview's AM
needs to be tracked therein.

Author: Dimitri Dolgov, Andres Freund
Discussion:
    https://postgr.es/m/20180703070645.wchpu5muyto5n647@alap3.anarazel.de
    https://postgr.es/m/20190304234700.w5tmhducs5wxgzls@alap3.anarazel.de

src/bin/pg_dump/pg_backup_archiver.c
src/bin/pg_dump/pg_backup_archiver.h
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/pg_dump.h
src/bin/pg_dump/t/002_pg_dump.pl

index 0f1afeacf79a29044e8722488edf551eac76b199..62bf1493aa232f8a1abd47a8aecd825902953182 100644 (file)
@@ -85,6 +85,7 @@ static void _becomeUser(ArchiveHandle *AH, const char *user);
 static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
 static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
 static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
+static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
 static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
 static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
 static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
@@ -1090,6 +1091,7 @@ ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
        newToc->tag = pg_strdup(opts->tag);
        newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
        newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
+       newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
        newToc->owner = pg_strdup(opts->owner);
        newToc->desc = pg_strdup(opts->description);
        newToc->defn = pg_strdup(opts->createStmt);
@@ -2350,6 +2352,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
        AH->currUser = NULL;            /* unknown */
        AH->currSchema = NULL;          /* ditto */
        AH->currTablespace = NULL;      /* ditto */
+       AH->currTableAm = NULL; /* ditto */
 
        AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
 
@@ -2576,6 +2579,7 @@ WriteToc(ArchiveHandle *AH)
                WriteStr(AH, te->copyStmt);
                WriteStr(AH, te->namespace);
                WriteStr(AH, te->tablespace);
+               WriteStr(AH, te->tableam);
                WriteStr(AH, te->owner);
                WriteStr(AH, "false");
 
@@ -2678,6 +2682,9 @@ ReadToc(ArchiveHandle *AH)
                if (AH->version >= K_VERS_1_10)
                        te->tablespace = ReadStr(AH);
 
+               if (AH->version >= K_VERS_1_14)
+                       te->tableam = ReadStr(AH);
+
                te->owner = ReadStr(AH);
                if (AH->version < K_VERS_1_9 || strcmp(ReadStr(AH), "true") == 0)
                        write_msg(modulename,
@@ -3431,6 +3438,48 @@ _selectTablespace(ArchiveHandle *AH, const char *tablespace)
        destroyPQExpBuffer(qry);
 }
 
+/*
+ * Set the proper default_table_access_method value for the table.
+ */
+static void
+_selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
+{
+       PQExpBuffer cmd;
+       const char *want, *have;
+
+       have = AH->currTableAm;
+       want = tableam;
+
+       if (!want)
+               return;
+
+       if (have && strcmp(want, have) == 0)
+               return;
+
+       cmd = createPQExpBuffer();
+       appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
+
+       if (RestoringToDB(AH))
+       {
+               PGresult   *res;
+
+               res = PQexec(AH->connection, cmd->data);
+
+               if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
+                       warn_or_exit_horribly(AH, modulename,
+                                                                 "could not set default_table_access_method: %s",
+                                                                 PQerrorMessage(AH->connection));
+
+               PQclear(res);
+       }
+       else
+               ahprintf(AH, "%s\n\n", cmd->data);
+
+       destroyPQExpBuffer(cmd);
+
+       AH->currTableAm = pg_strdup(want);
+}
+
 /*
  * Extract an object description for a TOC entry, and append it to buf.
  *
@@ -3526,10 +3575,11 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
 {
        RestoreOptions *ropt = AH->public.ropt;
 
-       /* Select owner, schema, and tablespace as necessary */
+       /* Select owner, schema, tablespace and default AM as necessary */
        _becomeOwner(AH, te);
        _selectOutputSchema(AH, te->namespace);
        _selectTablespace(AH, te->tablespace);
+       _selectTableAccessMethod(AH, te->tableam);
 
        /* Emit header comment for item */
        if (!AH->noTocComments)
@@ -4006,6 +4056,9 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
        if (AH->currTablespace)
                free(AH->currTablespace);
        AH->currTablespace = NULL;
+       if (AH->currTableAm)
+               free(AH->currTableAm);
+       AH->currTableAm = NULL;
 }
 
 /*
@@ -4891,6 +4944,8 @@ DeCloneArchive(ArchiveHandle *AH)
                free(AH->currSchema);
        if (AH->currTablespace)
                free(AH->currTablespace);
+       if (AH->currTableAm)
+               free(AH->currTableAm);
        if (AH->savedPassword)
                free(AH->savedPassword);
 
index ebf3d209ea18b6d6dd9d2a47b09b9f589b3cffed..2015b735ae306e4a32170d3d6a68763a991c9f30 100644 (file)
@@ -94,6 +94,7 @@ typedef z_stream *z_streamp;
                                                                                                         * entries */
 #define K_VERS_1_13 MAKE_ARCHIVE_VERSION(1, 13, 0)     /* change search_path
                                                                                                         * behavior */
+#define K_VERS_1_14 MAKE_ARCHIVE_VERSION(1, 14, 0)     /* add tableam */
 
 /*
  * Current archive version number (the format we can output)
@@ -102,7 +103,7 @@ typedef z_stream *z_streamp;
  * https://postgr.es/m/20190227123217.GA27552@alvherre.pgsql
  */
 #define K_VERS_MAJOR 1
-#define K_VERS_MINOR 13
+#define K_VERS_MINOR 14
 #define K_VERS_REV 0
 #define K_VERS_SELF MAKE_ARCHIVE_VERSION(K_VERS_MAJOR, K_VERS_MINOR, K_VERS_REV);
 
@@ -352,6 +353,7 @@ struct _archiveHandle
        char       *currUser;           /* current username, or NULL if unknown */
        char       *currSchema;         /* current schema, or NULL */
        char       *currTablespace; /* current tablespace, or NULL */
+       char       *currTableAm;        /* current table access method, or NULL */
 
        void       *lo_buf;
        size_t          lo_buf_used;
@@ -378,6 +380,7 @@ struct _tocEntry
        char       *namespace;          /* null or empty string if not in a schema */
        char       *tablespace;         /* null if not in a tablespace; empty string
                                                                 * means use database default */
+       char       *tableam;            /* table access method, only for TABLE tags */
        char       *owner;
        char       *desc;
        char       *defn;
@@ -416,6 +419,7 @@ typedef struct _archiveOpts
        const char *tag;
        const char *namespace;
        const char *tablespace;
+       const char *tableam;
        const char *owner;
        const char *description;
        teSection       section;
index 5d830383485dc3e1b5f144efd4efe84d6e04e24e..679ff3365c7a9606a0a4b2e6905b1db08b941b36 100644 (file)
@@ -5856,6 +5856,7 @@ getTables(Archive *fout, int *numTables)
        int                     i_partkeydef;
        int                     i_ispartition;
        int                     i_partbound;
+       int                     i_amname;
 
        /*
         * Find all the tables and table-like objects.
@@ -5941,7 +5942,7 @@ getTables(Archive *fout, int *numTables)
                                                  "tc.relfrozenxid AS tfrozenxid, "
                                                  "tc.relminmxid AS tminmxid, "
                                                  "c.relpersistence, c.relispopulated, "
-                                                 "c.relreplident, c.relpages, "
+                                                 "c.relreplident, c.relpages, am.amname, "
                                                  "CASE WHEN c.reloftype <> 0 THEN c.reloftype::pg_catalog.regtype ELSE NULL END AS reloftype, "
                                                  "d.refobjid AS owning_tab, "
                                                  "d.refobjsubid AS owning_col, "
@@ -5972,6 +5973,7 @@ getTables(Archive *fout, int *numTables)
                                                  "d.objsubid = 0 AND "
                                                  "d.refclassid = c.tableoid AND d.deptype IN ('a', 'i')) "
                                                  "LEFT JOIN pg_class tc ON (c.reltoastrelid = tc.oid) "
+                                                 "LEFT JOIN pg_am am ON (c.relam = am.oid) "
                                                  "LEFT JOIN pg_init_privs pip ON "
                                                  "(c.oid = pip.objoid "
                                                  "AND pip.classoid = 'pg_class'::regclass "
@@ -6439,6 +6441,7 @@ getTables(Archive *fout, int *numTables)
        i_partkeydef = PQfnumber(res, "partkeydef");
        i_ispartition = PQfnumber(res, "ispartition");
        i_partbound = PQfnumber(res, "partbound");
+       i_amname = PQfnumber(res, "amname");
 
        if (dopt->lockWaitTimeout)
        {
@@ -6508,6 +6511,10 @@ getTables(Archive *fout, int *numTables)
                else
                        tblinfo[i].checkoption = pg_strdup(PQgetvalue(res, i, i_checkoption));
                tblinfo[i].toast_reloptions = pg_strdup(PQgetvalue(res, i, i_toastreloptions));
+               if (PQgetisnull(res, i, i_amname))
+                       tblinfo[i].amname = NULL;
+               else
+                       tblinfo[i].amname = pg_strdup(PQgetvalue(res, i, i_amname));
 
                /* other fields were zeroed above */
 
@@ -12632,6 +12639,9 @@ dumpAccessMethod(Archive *fout, AccessMethodInfo *aminfo)
                case AMTYPE_INDEX:
                        appendPQExpBuffer(q, "TYPE INDEX ");
                        break;
+               case AMTYPE_TABLE:
+                       appendPQExpBuffer(q, "TYPE TABLE ");
+                       break;
                default:
                        write_msg(NULL, "WARNING: invalid type \"%c\" of access method \"%s\"\n",
                                          aminfo->amtype, qamname);
@@ -16067,18 +16077,26 @@ dumpTableSchema(Archive *fout, TableInfo *tbinfo)
                                                                                tbinfo->dobj.namespace->dobj.name);
 
        if (tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+       {
+               char *tableam = NULL;
+
+               if (tbinfo->relkind == RELKIND_RELATION ||
+                       tbinfo->relkind == RELKIND_MATVIEW)
+                       tableam = tbinfo->amname;
+
                ArchiveEntry(fout, tbinfo->dobj.catId, tbinfo->dobj.dumpId,
                                         ARCHIVE_OPTS(.tag = tbinfo->dobj.name,
                                                                  .namespace = tbinfo->dobj.namespace->dobj.name,
                                                                  .tablespace = (tbinfo->relkind == RELKIND_VIEW) ?
                                                                  NULL : tbinfo->reltablespace,
+                                                                 .tableam = tableam,
                                                                  .owner = tbinfo->rolname,
                                                                  .description = reltypename,
                                                                  .section = tbinfo->postponed_def ?
                                                                  SECTION_POST_DATA : SECTION_PRE_DATA,
                                                                  .createStmt = q->data,
                                                                  .dropStmt = delq->data));
-
+       }
 
        /* Dump Table Comments */
        if (tbinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
index 21d2ab05b0e29f50e6dc2f9dd73c9bd03c5cf895..2e1b90acd0a2f20e5a5d661760d1736a3524952e 100644 (file)
@@ -324,6 +324,7 @@ typedef struct _tableInfo
        char       *partkeydef;         /* partition key definition */
        char       *partbound;          /* partition bound definition */
        bool            needs_override; /* has GENERATED ALWAYS AS IDENTITY */
+       char       *amname;                     /* relation access method */
 
        /*
         * Stuff computed only for dumpable tables.
index 668cb0f803f4d74c781186618cf5f54a01289bce..afaa26badbe449044c30dbb19ea40435b9c7d9dc 100644 (file)
@@ -3070,6 +3070,66 @@ my %tests = (
                unlike => { no_privs => 1, },
        },
 
+
+       'CREATE ACCESS METHOD regress_test_table_am' => {
+               create_order => 11,
+               create_sql   => 'CREATE ACCESS METHOD regress_table_am TYPE TABLE HANDLER heap_tableam_handler;',
+               regexp => qr/^
+                       \QCREATE ACCESS METHOD regress_table_am TYPE TABLE HANDLER heap_tableam_handler;\E
+                       \n/xm,
+               like => {
+                       %full_runs,
+                       section_pre_data        => 1,
+               },
+       },
+
+       # It's a bit tricky to ensure that the proper SET of default table
+       # AM occurs. To achieve that we create a table with the standard
+       # AM, test AM, standard AM. That guarantees that there needs to be
+       # a SET interspersed.  Then use a regex that prevents interspersed
+       # SET ...; statements, followed by the exptected CREATE TABLE. Not
+       # pretty, but seems hard to do better in this framework.
+       'CREATE TABLE regress_pg_dump_table_am' => {
+               create_order => 12,
+               create_sql => '
+                       CREATE TABLE dump_test.regress_pg_dump_table_am_0() USING heap;
+                       CREATE TABLE dump_test.regress_pg_dump_table_am_1 (col1 int) USING regress_table_am;
+                       CREATE TABLE dump_test.regress_pg_dump_table_am_2() USING heap;',
+               regexp => qr/^
+                       \QSET default_table_access_method = regress_table_am;\E
+                       (\n(?!SET[^;]+;)[^\n]*)*
+                       \n\QCREATE TABLE dump_test.regress_pg_dump_table_am_1 (\E
+                       \n\s+\Qcol1 integer\E
+                       \n\);/xm,
+               like => {
+                       %full_runs,
+                       %dump_test_schema_runs,
+                       section_pre_data => 1,
+               },
+               unlike => { exclude_dump_test_schema => 1},
+       },
+
+       'CREATE MATERIALIZED VIEW regress_pg_dump_matview_am' => {
+               create_order => 13,
+               create_sql => '
+                       CREATE MATERIALIZED VIEW dump_test.regress_pg_dump_matview_am_0 USING heap AS SELECT 1;
+                       CREATE MATERIALIZED VIEW dump_test.regress_pg_dump_matview_am_1
+                               USING regress_table_am AS SELECT count(*) FROM pg_class;
+                       CREATE MATERIALIZED VIEW dump_test.regress_pg_dump_matview_am_2 USING heap AS SELECT 1;',
+               regexp => qr/^
+                       \QSET default_table_access_method = regress_table_am;\E
+                       (\n(?!SET[^;]+;)[^\n]*)*
+                       \QCREATE MATERIALIZED VIEW dump_test.regress_pg_dump_matview_am_1 AS\E
+                       \n\s+\QSELECT count(*) AS count\E
+                       \n\s+\QFROM pg_class\E
+                       \n\s+\QWITH NO DATA;\E\n/xm,
+               like => {
+                       %full_runs,
+                       %dump_test_schema_runs,
+                       section_pre_data => 1,
+               },
+               unlike => { exclude_dump_test_schema => 1},
+       }
 );
 
 #########################################