pg_dump: allow multiple rows per insert
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 7 Mar 2019 12:26:14 +0000 (09:26 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 7 Mar 2019 12:34:17 +0000 (09:34 -0300)
This is useful to speed up loading data in a different database engine.

Authors: Surafel Temesgen and David Rowley.  Lightly edited by Álvaro.
Reviewed-by: Fabien Coelho
Discussion: https://postgr.es/m/CALAY4q9kumSdnRBzvRJvSRf2+BH20YmSvzqOkvwpEmodD-xv6g@mail.gmail.com

doc/src/sgml/ref/pg_dump.sgml
src/bin/pg_dump/pg_backup.h
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/t/001_basic.pl
src/bin/pg_dump/t/002_pg_dump.pl

index 033eae9b469ba855875586fc875c43a4fdf3f5ac..e0e65f9c21c463c00fd786f9534ea6d31b4dec65 100644 (file)
@@ -661,9 +661,9 @@ PostgreSQL documentation
         ...</literal>).  This will make restoration very slow; it is mainly
         useful for making dumps that can be loaded into
         non-<productname>PostgreSQL</productname> databases.
-        However, since this option generates a separate command for each row,
-        an error in reloading a row causes only that row to be lost rather
-        than the entire table contents.
+        Any error during reloading will cause only rows that are part of the
+        problematic <command>INSERT</command> to be lost, rather than the
+        entire table contents.
        </para>
       </listitem>
      </varlistentry>
@@ -775,13 +775,12 @@ PostgreSQL documentation
         than <command>COPY</command>).  This will make restoration very slow;
         it is mainly useful for making dumps that can be loaded into
         non-<productname>PostgreSQL</productname> databases.
-        However, since this option generates a separate command for each row,
-        an error in reloading a row causes only that row to be lost rather
-        than the entire table contents.
-        Note that
-        the restore might fail altogether if you have rearranged column order.
-        The <option>--column-inserts</option> option is safe against column
-        order changes, though even slower.
+        Any error during reloading will cause only rows that are part of the
+        problematic <command>INSERT</command> to be lost, rather than the
+        entire table contents.  Note that the restore might fail altogether if
+        you have rearranged column order.  The
+        <option>--column-inserts</option> option is safe against column order
+        changes, though even slower.
        </para>
       </listitem>
      </varlistentry>
@@ -925,8 +924,9 @@ PostgreSQL documentation
        <para>
         Add <literal>ON CONFLICT DO NOTHING</literal> to
         <command>INSERT</command> commands.
-        This option is not valid unless <option>--inserts</option> or
-        <option>--column-inserts</option> is also specified.
+        This option is not valid unless <option>--inserts</option>,
+        <option>--column-inserts</option> or
+        <option>--rows-per-insert</option> is also specified.
        </para>
       </listitem>
      </varlistentry>
@@ -949,6 +949,20 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--rows-per-insert=<replaceable class="parameter">nrows</replaceable></option></term>
+      <listitem>
+       <para>
+        Dump data as <command>INSERT</command> commands (rather than
+        <command>COPY</command>).  Controls the maximum number of rows per
+        <command>INSERT</command> command. The value specified must be a
+        number greater than zero.  Any error during reloading will cause only
+        rows that are part of the problematic <command>INSERT</command> to be
+        lost, rather than the entire table contents.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
        <term><option>--section=<replaceable class="parameter">sectionname</replaceable></option></term>
        <listitem>
index 4a2e122e2d22c6ea0d2f933790bb35d9691f5868..7ab27391fbaf1d9c41230aefe515ff91b7602bfc 100644 (file)
@@ -140,10 +140,10 @@ typedef struct _dumpOptions
        int                     dumpSections;   /* bitmask of chosen sections */
        bool            aclsSkip;
        const char *lockWaitTimeout;
+       int                     dump_inserts;   /* 0 = COPY, otherwise rows per INSERT */
 
        /* flags for various command-line long options */
        int                     disable_dollar_quoting;
-       int                     dump_inserts;
        int                     column_inserts;
        int                     if_exists;
        int                     no_comments;
index 679ff3365c7a9606a0a4b2e6905b1db08b941b36..082dc841e9b4c133e85a224dfaff78713feaead8 100644 (file)
@@ -138,6 +138,12 @@ static const CatalogId nilCatalogId = {0, 0};
 static bool have_extra_float_digits = false;
 static int extra_float_digits;
 
+/*
+ * The default number of rows per INSERT when
+ * --inserts is specified without --rows-per-insert
+ */
+#define DUMP_DEFAULT_ROWS_PER_INSERT 1
+
 /*
  * Macro for producing quoted, schema-qualified name of a dumpable object.
  */
@@ -306,11 +312,13 @@ main(int argc, char **argv)
        DumpableObject *boundaryObjs;
        int                     i;
        int                     optindex;
+       char       *endptr;
        RestoreOptions *ropt;
        Archive    *fout;                       /* the script file */
        const char *dumpencoding = NULL;
        const char *dumpsnapshot = NULL;
        char       *use_role = NULL;
+       long            rowsPerInsert;
        int                     numWorkers = 1;
        trivalue        prompt_password = TRI_DEFAULT;
        int                     compressLevel = -1;
@@ -363,7 +371,7 @@ main(int argc, char **argv)
                {"exclude-table-data", required_argument, NULL, 4},
                {"extra-float-digits", required_argument, NULL, 8},
                {"if-exists", no_argument, &dopt.if_exists, 1},
-               {"inserts", no_argument, &dopt.dump_inserts, 1},
+               {"inserts", no_argument, NULL, 9},
                {"lock-wait-timeout", required_argument, NULL, 2},
                {"no-tablespaces", no_argument, &dopt.outputNoTablespaces, 1},
                {"quote-all-identifiers", no_argument, &quote_all_identifiers, 1},
@@ -382,6 +390,7 @@ main(int argc, char **argv)
                {"no-subscriptions", no_argument, &dopt.no_subscriptions, 1},
                {"no-sync", no_argument, NULL, 7},
                {"on-conflict-do-nothing", no_argument, &dopt.do_nothing, 1},
+               {"rows-per-insert", required_argument, NULL, 10},
 
                {NULL, 0, NULL, 0}
        };
@@ -572,6 +581,31 @@ main(int argc, char **argv)
                                }
                                break;
 
+                       case 9:                         /* inserts */
+
+                               /*
+                                * dump_inserts also stores --rows-per-insert, careful not to
+                                * overwrite that.
+                                */
+                               if (dopt.dump_inserts == 0)
+                                       dopt.dump_inserts = DUMP_DEFAULT_ROWS_PER_INSERT;
+                               break;
+
+                       case 10:                        /* rows per insert */
+                               errno = 0;
+                               rowsPerInsert = strtol(optarg, &endptr, 10);
+
+                               if (endptr == optarg || *endptr != '\0' ||
+                                       rowsPerInsert <= 0 || rowsPerInsert > INT_MAX ||
+                                       errno == ERANGE)
+                               {
+                                       write_msg(NULL, "rows-per-insert must be in range %d..%d\n",
+                                                         1, INT_MAX);
+                                       exit_nicely(1);
+                               }
+                               dopt.dump_inserts = (int) rowsPerInsert;
+                               break;
+
                        default:
                                fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
                                exit_nicely(1);
@@ -596,8 +630,8 @@ main(int argc, char **argv)
        }
 
        /* --column-inserts implies --inserts */
-       if (dopt.column_inserts)
-               dopt.dump_inserts = 1;
+       if (dopt.column_inserts && dopt.dump_inserts == 0)
+               dopt.dump_inserts = DUMP_DEFAULT_ROWS_PER_INSERT;
 
        /*
         * Binary upgrade mode implies dumping sequence data even in schema-only
@@ -622,8 +656,12 @@ main(int argc, char **argv)
        if (dopt.if_exists && !dopt.outputClean)
                exit_horribly(NULL, "option --if-exists requires option -c/--clean\n");
 
-       if (dopt.do_nothing && !(dopt.dump_inserts || dopt.column_inserts))
-               exit_horribly(NULL, "option --on-conflict-do-nothing requires option --inserts or --column-inserts\n");
+       /*
+        * --inserts are already implied above if --column-inserts or
+        * --rows-per-insert were specified.
+        */
+       if (dopt.do_nothing && dopt.dump_inserts == 0)
+               exit_horribly(NULL, "option --on-conflict-do-nothing requires option --inserts, --rows-per-insert or --column-inserts\n");
 
        /* Identify archive format to emit */
        archiveFormat = parseArchiveFormat(format, &archiveMode);
@@ -993,6 +1031,7 @@ help(const char *progname)
        printf(_("  --no-unlogged-table-data     do not dump unlogged table data\n"));
        printf(_("  --on-conflict-do-nothing     add ON CONFLICT DO NOTHING to INSERT commands\n"));
        printf(_("  --quote-all-identifiers      quote all identifiers, even if not key words\n"));
+       printf(_("  --rows-per-insert=NROWS      number of rows per INSERT; implies --inserts\n"));
        printf(_("  --section=SECTION            dump named section (pre-data, data, or post-data)\n"));
        printf(_("  --serializable-deferrable    wait until the dump can run without anomalies\n"));
        printf(_("  --snapshot=SNAPSHOT          use given snapshot for the dump\n"));
@@ -1909,9 +1948,9 @@ dumpTableData_insert(Archive *fout, void *dcontext)
        PQExpBuffer q = createPQExpBuffer();
        PQExpBuffer insertStmt = NULL;
        PGresult   *res;
-       int                     tuple;
        int                     nfields;
-       int                     field;
+       int                     rows_per_statement = dopt->dump_inserts;
+       int                     rows_this_statement = 0;
 
        appendPQExpBuffer(q, "DECLARE _pg_dump_cursor CURSOR FOR "
                                          "SELECT * FROM ONLY %s",
@@ -1926,69 +1965,88 @@ dumpTableData_insert(Archive *fout, void *dcontext)
                res = ExecuteSqlQuery(fout, "FETCH 100 FROM _pg_dump_cursor",
                                                          PGRES_TUPLES_OK);
                nfields = PQnfields(res);
-               for (tuple = 0; tuple < PQntuples(res); tuple++)
+
+               /*
+                * First time through, we build as much of the INSERT statement as
+                * possible in "insertStmt", which we can then just print for each
+                * statement. If the table happens to have zero columns then this will
+                * be a complete statement, otherwise it will end in "VALUES" and be
+                * ready to have the row's column values printed.
+                */
+               if (insertStmt == NULL)
                {
-                       /*
-                        * First time through, we build as much of the INSERT statement as
-                        * possible in "insertStmt", which we can then just print for each
-                        * line. If the table happens to have zero columns then this will
-                        * be a complete statement, otherwise it will end in "VALUES(" and
-                        * be ready to have the row's column values appended.
-                        */
-                       if (insertStmt == NULL)
-                       {
-                               TableInfo  *targettab;
+                       TableInfo  *targettab;
 
-                               insertStmt = createPQExpBuffer();
+                       insertStmt = createPQExpBuffer();
 
-                               /*
-                                * When load-via-partition-root is set, get the root table
-                                * name for the partition table, so that we can reload data
-                                * through the root table.
-                                */
-                               if (dopt->load_via_partition_root && tbinfo->ispartition)
-                                       targettab = getRootTableInfo(tbinfo);
-                               else
-                                       targettab = tbinfo;
+                       /*
+                        * When load-via-partition-root is set, get the root table name
+                        * for the partition table, so that we can reload data through the
+                        * root table.
+                        */
+                       if (dopt->load_via_partition_root && tbinfo->ispartition)
+                               targettab = getRootTableInfo(tbinfo);
+                       else
+                               targettab = tbinfo;
 
-                               appendPQExpBuffer(insertStmt, "INSERT INTO %s ",
-                                                                 fmtQualifiedDumpable(targettab));
+                       appendPQExpBuffer(insertStmt, "INSERT INTO %s ",
+                                                         fmtQualifiedDumpable(targettab));
 
-                               /* corner case for zero-column table */
-                               if (nfields == 0)
-                               {
-                                       appendPQExpBufferStr(insertStmt, "DEFAULT VALUES;\n");
-                               }
-                               else
+                       /* corner case for zero-column table */
+                       if (nfields == 0)
+                       {
+                               appendPQExpBufferStr(insertStmt, "DEFAULT VALUES;\n");
+                       }
+                       else
+                       {
+                               /* append the list of column names if required */
+                               if (dopt->column_inserts)
                                {
-                                       /* append the list of column names if required */
-                                       if (dopt->column_inserts)
+                                       appendPQExpBufferChar(insertStmt, '(');
+                                       for (int field = 0; field < nfields; field++)
                                        {
-                                               appendPQExpBufferChar(insertStmt, '(');
-                                               for (field = 0; field < nfields; field++)
-                                               {
-                                                       if (field > 0)
-                                                               appendPQExpBufferStr(insertStmt, ", ");
-                                                       appendPQExpBufferStr(insertStmt,
-                                                                                                fmtId(PQfname(res, field)));
-                                               }
-                                               appendPQExpBufferStr(insertStmt, ") ");
+                                               if (field > 0)
+                                                       appendPQExpBufferStr(insertStmt, ", ");
+                                               appendPQExpBufferStr(insertStmt,
+                                                                                        fmtId(PQfname(res, field)));
                                        }
+                                       appendPQExpBufferStr(insertStmt, ") ");
+                               }
 
-                                       if (tbinfo->needs_override)
-                                               appendPQExpBufferStr(insertStmt, "OVERRIDING SYSTEM VALUE ");
+                               if (tbinfo->needs_override)
+                                       appendPQExpBufferStr(insertStmt, "OVERRIDING SYSTEM VALUE ");
 
-                                       appendPQExpBufferStr(insertStmt, "VALUES (");
-                               }
+                               appendPQExpBufferStr(insertStmt, "VALUES");
                        }
+               }
 
-                       archputs(insertStmt->data, fout);
+               for (int tuple = 0; tuple < PQntuples(res); tuple++)
+               {
+                       /* Write the INSERT if not in the middle of a multi-row INSERT. */
+                       if (rows_this_statement == 0)
+                               archputs(insertStmt->data, fout);
 
-                       /* if it is zero-column table then we're done */
+                       /*
+                        * If it is zero-column table then we've aleady written the
+                        * complete statement, which will mean we've disobeyed
+                        * --rows-per-insert when it's set greater than 1.  We do support
+                        * a way to make this multi-row with: SELECT UNION ALL SELECT
+                        * UNION ALL ... but that's non-standard so we should avoid it
+                        * given that using INSERTs is mostly only ever needed for
+                        * cross-database exports.
+                        */
                        if (nfields == 0)
                                continue;
 
-                       for (field = 0; field < nfields; field++)
+                       /* Emit a row heading */
+                       if (rows_per_statement == 1)
+                               archputs(" (", fout);
+                       else if (rows_this_statement > 0)
+                               archputs(",\n\t(", fout);
+                       else
+                               archputs("\n\t(", fout);
+
+                       for (int field = 0; field < nfields; field++)
                        {
                                if (field > 0)
                                        archputs(", ", fout);
@@ -2053,10 +2111,19 @@ dumpTableData_insert(Archive *fout, void *dcontext)
                                }
                        }
 
-                       if (!dopt->do_nothing)
-                               archputs(");\n", fout);
-                       else
-                               archputs(") ON CONFLICT DO NOTHING;\n", fout);
+                       /* Terminate the row ... */
+                       archputs(")", fout);
+
+                       /* ... and the statement, if the target no. of rows is reached */
+                       if (++rows_this_statement >= rows_per_statement)
+                       {
+                               if (dopt->do_nothing)
+                                       archputs(" ON CONFLICT DO NOTHING;\n", fout);
+                               else
+                                       archputs(";\n", fout);
+                               /* Reset the row counter */
+                               rows_this_statement = 0;
+                       }
                }
 
                if (PQntuples(res) <= 0)
@@ -2067,6 +2134,15 @@ dumpTableData_insert(Archive *fout, void *dcontext)
                PQclear(res);
        }
 
+       /* Terminate any statements that didn't make the row count. */
+       if (rows_this_statement > 0)
+       {
+               if (dopt->do_nothing)
+                       archputs(" ON CONFLICT DO NOTHING;\n", fout);
+               else
+                       archputs(";\n", fout);
+       }
+
        archputs("\n\n", fout);
 
        ExecuteSqlStatement(fout, "CLOSE _pg_dump_cursor");
index 1dad55373984070fe6428cb3ddd0a2e07902fc5d..3a58f9b8ceeeb199fb45aba07b6e4935fadba8ca 100644 (file)
@@ -118,8 +118,8 @@ command_fails_like(
 
 command_fails_like(
        [ 'pg_dump', '--on-conflict-do-nothing' ],
-       qr/\Qpg_dump: option --on-conflict-do-nothing requires option --inserts or --column-inserts\E/,
-       'pg_dump: option --on-conflict-do-nothing requires option --inserts or --column-inserts');
+       qr/pg_dump: option --on-conflict-do-nothing requires option --inserts, --rows-per-insert or --column-inserts/,
+       'pg_dump: --on-conflict-do-nothing requires --inserts, --rows-per-insert, --column-inserts');
 
 # pg_dumpall command-line argument checks
 command_fails_like(
index afaa26badbe449044c30dbb19ea40435b9c7d9dc..de6895122e51e9dcdea9eb6f54b55de18101071a 100644 (file)
@@ -295,6 +295,18 @@ my %pgdump_runs = (
                        "$tempdir/role_parallel",
                ],
        },
+       rows_per_insert => {
+               dump_cmd => [
+                       'pg_dump',
+                       '--no-sync',
+                       "--file=$tempdir/rows_per_insert.sql",
+                       '-a',
+                       '--rows-per-insert=4',
+                       '--table=dump_test.test_table',
+                       '--table=dump_test.test_fourth_table',
+                       'postgres',
+               ],
+       },
        schema_only => {
                dump_cmd => [
                        'pg_dump',                         '--format=plain',
@@ -1228,10 +1240,11 @@ my %tests = (
        'COPY test_fourth_table' => {
                create_order => 7,
                create_sql =>
-                 'INSERT INTO dump_test.test_fourth_table DEFAULT VALUES;',
+                 'INSERT INTO dump_test.test_fourth_table DEFAULT VALUES;'
+                 . 'INSERT INTO dump_test.test_fourth_table DEFAULT VALUES;',
                regexp => qr/^
                        \QCOPY dump_test.test_fourth_table  FROM stdin;\E
-                       \n\n\\\.\n
+                       \n\n\n\\\.\n
                        /xm,
                like => {
                        %full_runs,
@@ -1295,6 +1308,19 @@ my %tests = (
                like => { column_inserts => 1, },
        },
 
+       'test_table with 4-row INSERTs' => {
+               regexp => qr/^
+                       (?:
+                               INSERT\ INTO\ dump_test\.test_table\ VALUES\n
+                               (?:\t\(\d,\ NULL,\ NULL,\ NULL\),\n){3}
+                               \t\(\d,\ NULL,\ NULL,\ NULL\);\n
+                       ){2}
+                       INSERT\ INTO\ dump_test\.test_table\ VALUES\n
+                       \t\(\d,\ NULL,\ NULL,\ NULL\);
+                       /xm,
+               like => { rows_per_insert => 1, },
+       },
+
        'INSERT INTO test_second_table' => {
                regexp => qr/^
                        (?:INSERT\ INTO\ dump_test\.test_second_table\ \(col1,\ col2\)
@@ -1304,8 +1330,8 @@ my %tests = (
 
        'INSERT INTO test_fourth_table' => {
                regexp =>
-                 qr/^\QINSERT INTO dump_test.test_fourth_table DEFAULT VALUES;\E/m,
-               like => { column_inserts => 1, },
+                 qr/^(?:INSERT INTO dump_test\.test_fourth_table DEFAULT VALUES;\n){2}/m,
+               like => { column_inserts => 1, rows_per_insert => 1, },
        },
 
        'INSERT INTO test_fifth_table' => {