vacuumdb: enable parallel mode
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 23 Jan 2015 18:02:45 +0000 (15:02 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 23 Jan 2015 18:02:45 +0000 (15:02 -0300)
This mode allows vacuumdb to open several server connections to vacuum
or analyze several tables simultaneously.

Author: Dilip Kumar.  Some reworking by Álvaro Herrera
Reviewed by: Jeff Janes, Amit Kapila, Magnus Hagander, Andres Freund

doc/src/sgml/ref/vacuumdb.sgml
src/bin/pg_dump/parallel.c
src/bin/scripts/common.c
src/bin/scripts/common.h
src/bin/scripts/vacuumdb.c

index 3ecd9999812080e41270540f402f2829227a193d..e38c34aea37890f57d8a49fd0f87de27715378e2 100644 (file)
@@ -203,6 +203,30 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>-j <replaceable class="parameter">njobs</replaceable></option></term>
+      <term><option>--jobs=<replaceable class="parameter">njobs</replaceable></option></term>
+      <listitem>
+       <para>
+        Execute the vacuum or analyze commands in parallel by running
+        <replaceable class="parameter">njobs</replaceable>
+        commands simultaneously.  This option reduces the processing time,
+        but it also increases the load on the database server.
+       </para>
+       <para>
+        <application>vacuumdb</application> will open
+        <replaceable class="parameter">njobs</replaceable> connections to the
+        database, so make sure your <xref linkend="guc-max-connections">
+        setting is high enough to accommodate all connections.
+       </para>
+       <para>
+        Note that using this mode together with the <option>-f</option>
+        (<literal>FULL</literal>) option might cause deadlock failures if
+        certain system catalogs are processed in parallel.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>--analyze-in-stages</option></term>
       <listitem>
index d942a75f7cdf0d30279fd81b2f32c94afb9779b2..1bf76114c09159cb91412fb5f593cf8a8f7c28c1 100644 (file)
@@ -1160,7 +1160,7 @@ select_loop(int maxFd, fd_set *workerset)
        i = select(maxFd + 1, workerset, NULL, NULL, NULL);
 
        /*
-        * If we Ctrl-C the master process , it's likely that we interrupt
+        * If we Ctrl-C the master process, it's likely that we interrupt
         * select() here. The signal handler will set wantAbort == true and
         * the shutdown journey starts from here. Note that we'll come back
         * here later when we tell all workers to terminate and read their
index 6bfe2e628b1a61bb4055951fd3196a9df9b7217e..da142aaa643eb19a9a9ca0d0ced3cacd6b7df4aa 100644 (file)
 
 #include "common.h"
 
-static void SetCancelConn(PGconn *conn);
-static void ResetCancelConn(void);
 
 static PGcancel *volatile cancelConn = NULL;
+bool CancelRequested = false;
 
 #ifdef WIN32
 static CRITICAL_SECTION cancelConnLock;
@@ -291,7 +290,7 @@ yesno_prompt(const char *question)
  *
  * Set cancelConn to point to the current database connection.
  */
-static void
+void
 SetCancelConn(PGconn *conn)
 {
    PGcancel   *oldCancelConn;
@@ -321,7 +320,7 @@ SetCancelConn(PGconn *conn)
  *
  * Free the current cancel connection, if any, and set to NULL.
  */
-static void
+void
 ResetCancelConn(void)
 {
    PGcancel   *oldCancelConn;
@@ -345,9 +344,8 @@ ResetCancelConn(void)
 
 #ifndef WIN32
 /*
- * Handle interrupt signals by canceling the current command,
- * if it's being executed through executeMaintenanceCommand(),
- * and thus has a cancelConn set.
+ * Handle interrupt signals by canceling the current command, if a cancelConn
+ * is set.
  */
 static void
 handle_sigint(SIGNAL_ARGS)
@@ -359,10 +357,15 @@ handle_sigint(SIGNAL_ARGS)
    if (cancelConn != NULL)
    {
        if (PQcancel(cancelConn, errbuf, sizeof(errbuf)))
+       {
+           CancelRequested = true;
            fprintf(stderr, _("Cancel request sent\n"));
+       }
        else
            fprintf(stderr, _("Could not send cancel request: %s"), errbuf);
    }
+   else
+       CancelRequested = true;
 
    errno = save_errno;         /* just in case the write changed it */
 }
@@ -392,10 +395,16 @@ consoleHandler(DWORD dwCtrlType)
        if (cancelConn != NULL)
        {
            if (PQcancel(cancelConn, errbuf, sizeof(errbuf)))
+           {
                fprintf(stderr, _("Cancel request sent\n"));
+               CancelRequested = true;
+           }
            else
                fprintf(stderr, _("Could not send cancel request: %s"), errbuf);
        }
+       else
+           CancelRequested = true;
+
        LeaveCriticalSection(&cancelConnLock);
 
        return TRUE;
index c0c1715bc16846335683b13f5326701acf2cf59b..b5ce1ed7444012921ec175ea2dbde00e7aebd432 100644 (file)
@@ -21,6 +21,8 @@ enum trivalue
    TRI_YES
 };
 
+extern bool CancelRequested;
+
 typedef void (*help_handler) (const char *progname);
 
 extern void handle_help_version_opts(int argc, char *argv[],
@@ -49,4 +51,8 @@ extern bool yesno_prompt(const char *question);
 
 extern void setup_cancel_handler(void);
 
+extern void SetCancelConn(PGconn *conn);
+extern void ResetCancelConn(void);
+
+
 #endif   /* COMMON_H */
index 957fdb6e189e1725f32de6cac725921d718792ae..506cdc7def27adbfa5a925b1becbfeca131eeefb 100644 (file)
  */
 
 #include "postgres_fe.h"
+
 #include "common.h"
 #include "dumputils.h"
 
 
-static void vacuum_one_database(const char *dbname, bool full, bool verbose,
-   bool and_analyze, bool analyze_only, bool analyze_in_stages, int stage, bool freeze,
-                   const char *table, const char *host, const char *port,
+#define ERRCODE_UNDEFINED_TABLE  "42P01"
+
+/* Parallel vacuuming stuff */
+typedef struct ParallelSlot
+{
+   PGconn     *connection;
+   pgsocket    sock;
+   bool        isFree;
+} ParallelSlot;
+
+/* vacuum options controlled by user flags */
+typedef struct vacuumingOptions
+{
+   bool        analyze_only;
+   bool        verbose;
+   bool        and_analyze;
+   bool        full;
+   bool        freeze;
+} vacuumingOptions;
+
+
+static void vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
+                   int stage,
+                   SimpleStringList *tables,
+                   const char *host, const char *port,
                    const char *username, enum trivalue prompt_password,
+                   int concurrentCons,
                    const char *progname, bool echo, bool quiet);
-static void vacuum_all_databases(bool full, bool verbose, bool and_analyze,
-                    bool analyze_only, bool analyze_in_stages, bool freeze,
+
+static void vacuum_all_databases(vacuumingOptions *vacopts,
+                    bool analyze_in_stages,
                     const char *maintenance_db,
                     const char *host, const char *port,
                     const char *username, enum trivalue prompt_password,
+                    int concurrentCons,
                     const char *progname, bool echo, bool quiet);
 
+static void prepare_vacuum_command(PQExpBuffer sql, PGconn *conn,
+                      vacuumingOptions *vacopts, const char *table);
+
+static void run_vacuum_command(PGconn *conn, const char *sql, bool echo,
+                  const char *dbname, const char *table,
+                  const char *progname, bool async);
+
+static ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots,
+           const char *dbname, const char *progname);
+
+static bool GetQueryResult(PGconn *conn, const char *dbname,
+              const char *progname);
+
+static void DisconnectDatabase(ParallelSlot *slot);
+
+static int select_loop(int maxFd, fd_set *workerset, bool *aborting);
+
+static void init_slot(ParallelSlot *slot, PGconn *conn);
+
 static void help(const char *progname);
 
+/* For analyze-in-stages mode */
+#define ANALYZE_NO_STAGE   -1
+#define ANALYZE_NUM_STAGES 3
+
 
 int
 main(int argc, char *argv[])
@@ -49,6 +98,7 @@ main(int argc, char *argv[])
        {"table", required_argument, NULL, 't'},
        {"full", no_argument, NULL, 'f'},
        {"verbose", no_argument, NULL, 'v'},
+       {"jobs", required_argument, NULL, 'j'},
        {"maintenance-db", required_argument, NULL, 2},
        {"analyze-in-stages", no_argument, NULL, 3},
        {NULL, 0, NULL, 0}
@@ -57,7 +107,6 @@ main(int argc, char *argv[])
    const char *progname;
    int         optindex;
    int         c;
-
    const char *dbname = NULL;
    const char *maintenance_db = NULL;
    char       *host = NULL;
@@ -66,21 +115,23 @@ main(int argc, char *argv[])
    enum trivalue prompt_password = TRI_DEFAULT;
    bool        echo = false;
    bool        quiet = false;
-   bool        and_analyze = false;
-   bool        analyze_only = false;
+   vacuumingOptions vacopts;
    bool        analyze_in_stages = false;
-   bool        freeze = false;
    bool        alldb = false;
-   bool        full = false;
-   bool        verbose = false;
    SimpleStringList tables = {NULL, NULL};
+   int         concurrentCons = 1;
+   int         tbl_count = 0;
+
+   /* initialize options to all false */
+   memset(&vacopts, 0, sizeof(vacopts));
 
    progname = get_progname(argv[0]);
+
    set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
 
    handle_help_version_opts(argc, argv, "vacuumdb", help);
 
-   while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fv", long_options, &optindex)) != -1)
+   while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fvj:", long_options, &optindex)) != -1)
    {
        switch (c)
        {
@@ -109,31 +160,49 @@ main(int argc, char *argv[])
                dbname = pg_strdup(optarg);
                break;
            case 'z':
-               and_analyze = true;
+               vacopts.and_analyze = true;
                break;
            case 'Z':
-               analyze_only = true;
+               vacopts.analyze_only = true;
                break;
            case 'F':
-               freeze = true;
+               vacopts.freeze = true;
                break;
            case 'a':
                alldb = true;
                break;
            case 't':
-               simple_string_list_append(&tables, optarg);
-               break;
+               {
+                   simple_string_list_append(&tables, optarg);
+                   tbl_count++;
+                   break;
+               }
            case 'f':
-               full = true;
+               vacopts.full = true;
                break;
            case 'v':
-               verbose = true;
+               vacopts.verbose = true;
+               break;
+           case 'j':
+               concurrentCons = atoi(optarg);
+               if (concurrentCons <= 0)
+               {
+                   fprintf(stderr, _("%s: number of parallel \"jobs\" must be at least 1\n"),
+                           progname);
+                   exit(1);
+               }
+               if (concurrentCons > FD_SETSIZE - 1)
+               {
+                   fprintf(stderr, _("%s: too many parallel jobs requested (maximum: %d)\n"),
+                           progname, FD_SETSIZE - 1);
+                   exit(1);
+               }
                break;
            case 2:
                maintenance_db = pg_strdup(optarg);
                break;
            case 3:
-               analyze_in_stages = analyze_only = true;
+               analyze_in_stages = vacopts.analyze_only = true;
                break;
            default:
                fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
@@ -141,7 +210,6 @@ main(int argc, char *argv[])
        }
    }
 
-
    /*
     * Non-option argument specifies database name as long as it wasn't
     * already specified with -d / --dbname
@@ -160,18 +228,18 @@ main(int argc, char *argv[])
        exit(1);
    }
 
-   if (analyze_only)
+   if (vacopts.analyze_only)
    {
-       if (full)
+       if (vacopts.full)
        {
-           fprintf(stderr, _("%s: cannot use the \"full\" option when performing only analyze\n"),
-                   progname);
+           fprintf(stderr, _("%s: cannot use the \"%s\" option when performing only analyze\n"),
+                   progname, "full");
            exit(1);
        }
-       if (freeze)
+       if (vacopts.freeze)
        {
-           fprintf(stderr, _("%s: cannot use the \"freeze\" option when performing only analyze\n"),
-                   progname);
+           fprintf(stderr, _("%s: cannot use the \"%s\" option when performing only analyze\n"),
+                   progname, "freeze");
            exit(1);
        }
        /* allow 'and_analyze' with 'analyze_only' */
@@ -179,6 +247,10 @@ main(int argc, char *argv[])
 
    setup_cancel_handler();
 
+   /* Avoid opening extra connections. */
+   if (tbl_count && (concurrentCons > tbl_count))
+       concurrentCons = tbl_count;
+
    if (alldb)
    {
        if (dbname)
@@ -194,9 +266,12 @@ main(int argc, char *argv[])
            exit(1);
        }
 
-       vacuum_all_databases(full, verbose, and_analyze, analyze_only, analyze_in_stages, freeze,
-                            maintenance_db, host, port, username,
-                            prompt_password, progname, echo, quiet);
+       vacuum_all_databases(&vacopts,
+                            analyze_in_stages,
+                            maintenance_db,
+                            host, port, username, prompt_password,
+                            concurrentCons,
+                            progname, echo, quiet);
    }
    else
    {
@@ -210,213 +285,628 @@ main(int argc, char *argv[])
                dbname = get_user_name_or_exit(progname);
        }
 
-       if (tables.head != NULL)
+       if (analyze_in_stages)
        {
-           SimpleStringListCell *cell;
+           int         stage;
 
-           for (cell = tables.head; cell; cell = cell->next)
+           for (stage = 0; stage < ANALYZE_NUM_STAGES; stage++)
            {
-               vacuum_one_database(dbname, full, verbose, and_analyze,
-                                   analyze_only, analyze_in_stages, -1,
-                                   freeze, cell->val,
+               vacuum_one_database(dbname, &vacopts,
+                                   stage,
+                                   &tables,
                                    host, port, username, prompt_password,
+                                   concurrentCons,
                                    progname, echo, quiet);
            }
        }
        else
-           vacuum_one_database(dbname, full, verbose, and_analyze,
-                               analyze_only, analyze_in_stages, -1,
-                               freeze, NULL,
+           vacuum_one_database(dbname, &vacopts,
+                               ANALYZE_NO_STAGE,
+                               &tables,
                                host, port, username, prompt_password,
+                               concurrentCons,
                                progname, echo, quiet);
    }
 
    exit(0);
 }
 
-
+/*
+ * vacuum_one_database
+ *
+ * Process tables in the given database.  If the 'tables' list is empty,
+ * process all tables in the database.
+ *
+ * Note that this function is only concerned with running exactly one stage
+ * when in analyze-in-stages mode; caller must iterate on us if necessary.
+ *
+ * If concurrentCons is > 1, multiple connections are used to vacuum tables
+ * in parallel.  In this case and if the table list is empty, we first obtain
+ * a list of tables from the database.
+ *
+ * If the run is cancelled, or a command fails, the program exits with
+ * status 1 instead of returning to the caller.
+ */
 static void
-run_vacuum_command(PGconn *conn, const char *sql, bool echo, const char *dbname, const char *table, const char *progname)
+vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
+                   int stage,
+                   SimpleStringList *tables,
+                   const char *host, const char *port,
+                   const char *username, enum trivalue prompt_password,
+                   int concurrentCons,
+                   const char *progname, bool echo, bool quiet)
 {
-   if (!executeMaintenanceCommand(conn, sql, echo))
+   PQExpBufferData sql;
+   PGconn     *conn;
+   SimpleStringListCell *cell;
+   ParallelSlot *slots = NULL;
+   SimpleStringList dbtables = {NULL, NULL};
+   int         i;
+   bool        failed = false;
+   bool        parallel = concurrentCons > 1;
+   const char *stage_commands[] = {
+       "SET default_statistics_target=1; SET vacuum_cost_delay=0;",
+       "SET default_statistics_target=10; RESET vacuum_cost_delay;",
+       "RESET default_statistics_target;"
+   };
+   const char *stage_messages[] = {
+       gettext_noop("Generating minimal optimizer statistics (1 target)"),
+       gettext_noop("Generating medium optimizer statistics (10 targets)"),
+       gettext_noop("Generating default (full) optimizer statistics")
+   };
+
+   Assert(stage == ANALYZE_NO_STAGE ||
+          (stage >= 0 && stage < ANALYZE_NUM_STAGES));
+
+   if (!quiet)
     {
-       if (table)
-           fprintf(stderr, _("%s: vacuuming of table \"%s\" in database \"%s\" failed: %s"),
-                   progname, table, dbname, PQerrorMessage(conn));
+       if (stage != ANALYZE_NO_STAGE)
+           printf(_("%s: processing database \"%s\": %s\n"), progname, dbname,
+                  stage_messages[stage]);
         else
-           fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
-                   progname, dbname, PQerrorMessage(conn));
-       PQfinish(conn);
-       exit(1);
+           printf(_("%s: vacuuming database \"%s\"\n"), progname, dbname);
+       fflush(stdout);
     }
-}
 
+   conn = connectDatabase(dbname, host, port, username, prompt_password,
+                          progname, false);
+
+   initPQExpBuffer(&sql);
+
+   /*
+    * If a table list is not provided and we're using multiple connections,
+    * prepare the list of tables by querying the catalogs.
+    */
+   if (parallel && (!tables || !tables->head))
+   {
+       PQExpBufferData buf;
+       PGresult   *res;
+       int         ntups;
+       int         i;
+
+       initPQExpBuffer(&buf);
 
+       res = executeQuery(conn,
+           "SELECT c.relname, ns.nspname FROM pg_class c, pg_namespace ns\n"
+            " WHERE relkind IN (\'r\', \'m\') AND c.relnamespace = ns.oid\n"
+                          " ORDER BY c.relpages DESC;",
+                          progname, echo);
+
+       ntups = PQntuples(res);
+       for (i = 0; i < ntups; i++)
+       {
+           appendPQExpBuffer(&buf, "%s",
+                             fmtQualifiedId(PQserverVersion(conn),
+                                            PQgetvalue(res, i, 1),
+                                            PQgetvalue(res, i, 0)));
+
+           simple_string_list_append(&dbtables, buf.data);
+           resetPQExpBuffer(&buf);
+       }
+
+       termPQExpBuffer(&buf);
+       /* the table names were copied into dbtables; release the result */
+       PQclear(res);
+       tables = &dbtables;
+
+       /*
+        * If there are more connections than vacuumable relations, we don't
+        * need to use them all.
+        */
+       if (concurrentCons > ntups)
+           concurrentCons = ntups;
+       if (concurrentCons <= 1)
+           parallel = false;
+   }
+
+   /*
+    * Setup the database connections. We reuse the connection we already have
+    * for the first slot.  If not in parallel mode, the first slot in the
+    * array contains the connection.
+    */
+   slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * concurrentCons);
+   init_slot(slots, conn);
+   if (parallel)
+   {
+       for (i = 1; i < concurrentCons; i++)
+       {
+           conn = connectDatabase(dbname, host, port, username, prompt_password,
+                                  progname, false);
+           init_slot(slots + i, conn);
+       }
+   }
+
+   /*
+    * Prepare all the connections to run the appropriate analyze stage, if
+    * caller requested that mode.
+    */
+   if (stage != ANALYZE_NO_STAGE)
+   {
+       int         j;
+
+       /* We already emitted the message above */
+
+       for (j = 0; j < concurrentCons; j++)
+           executeCommand((slots + j)->connection,
+                          stage_commands[stage], progname, echo);
+   }
+
+   cell = tables ? tables->head : NULL;
+   do
+   {
+       ParallelSlot *free_slot;
+       const char *tabname = cell ? cell->val : NULL;
+
+       prepare_vacuum_command(&sql, conn, vacopts, tabname);
+
+       if (CancelRequested)
+       {
+           failed = true;
+           goto finish;
+       }
+
+       /*
+        * Get the connection slot to use.  If in parallel mode, here we wait
+        * for one connection to become available if none already is.  In
+        * non-parallel mode we simply use the only slot we have, which we
+        * know to be free.
+        */
+       if (parallel)
+       {
+           /*
+            * Get a free slot, waiting until one becomes free if none
+            * currently is.
+            */
+           free_slot = GetIdleSlot(slots, concurrentCons, dbname, progname);
+           if (!free_slot)
+           {
+               failed = true;
+               goto finish;
+           }
+
+           free_slot->isFree = false;
+       }
+       else
+           free_slot = slots;
+
+       run_vacuum_command(free_slot->connection, sql.data,
+                          echo, dbname, tabname, progname, parallel);
+
+       if (cell)
+           cell = cell->next;
+   } while (cell != NULL);
+
+   if (parallel)
+   {
+       int         j;
+
+       for (j = 0; j < concurrentCons; j++)
+       {
+           /*
+            * Wait for all connections to return their results.  A failure
+            * here must be remembered so that we exit with an error status.
+            */
+           if (!GetQueryResult((slots + j)->connection, dbname, progname))
+           {
+               failed = true;
+               goto finish;
+           }
+
+           (slots + j)->isFree = true;
+       }
+   }
+
+finish:
+   for (i = 0; i < concurrentCons; i++)
+       DisconnectDatabase(slots + i);
+   pfree(slots);
+
+   termPQExpBuffer(&sql);
+
+   if (failed)
+       exit(1);
+}
+
+/*
+ * Vacuum/analyze all connectable databases.
+ *
+ * In analyze-in-stages mode, we process all databases in one stage before
+ * moving on to the next stage.  That ensures minimal stats are available
+ * quickly everywhere before generating more detailed ones.
+ */
 static void
-vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyze,
-   bool analyze_only, bool analyze_in_stages, int stage, bool freeze, const char *table,
-                   const char *host, const char *port,
-                   const char *username, enum trivalue prompt_password,
-                   const char *progname, bool echo, bool quiet)
+vacuum_all_databases(vacuumingOptions *vacopts,
+                    bool analyze_in_stages,
+                    const char *maintenance_db, const char *host,
+                    const char *port, const char *username,
+                    enum trivalue prompt_password,
+                    int concurrentCons,
+                    const char *progname, bool echo, bool quiet)
 {
-   PQExpBufferData sql;
-
    PGconn     *conn;
+   PGresult   *result;
+   int         stage;
+   int         i;
 
-   initPQExpBuffer(&sql);
+   conn = connectMaintenanceDatabase(maintenance_db, host, port,
+                                     username, prompt_password, progname);
+   result = executeQuery(conn,
+           "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;",
+                         progname, echo);
+   PQfinish(conn);
 
-   conn = connectDatabase(dbname, host, port, username, prompt_password,
-                          progname, false);
+   if (analyze_in_stages)
+   {
+       /*
+        * When analyzing all databases in stages, we analyze them all in the
+        * fastest stage first, so that initial statistics become available
+        * for all of them as soon as possible.
+        *
+        * This means we establish several times as many connections, but
+        * that's a secondary consideration.
+        */
+       for (stage = 0; stage < ANALYZE_NUM_STAGES; stage++)
+       {
+           for (i = 0; i < PQntuples(result); i++)
+           {
+               const char *dbname;
+
+               dbname = PQgetvalue(result, i, 0);
+               vacuum_one_database(dbname, vacopts,
+                                   stage,
+                                   NULL,
+                                   host, port, username, prompt_password,
+                                   concurrentCons,
+                                   progname, echo, quiet);
+           }
+       }
+   }
+   else
+   {
+       for (i = 0; i < PQntuples(result); i++)
+       {
+           const char *dbname;
 
-   if (analyze_only)
+           dbname = PQgetvalue(result, i, 0);
+           vacuum_one_database(dbname, vacopts,
+                               ANALYZE_NO_STAGE,
+                               NULL,
+                               host, port, username, prompt_password,
+                               concurrentCons,
+                               progname, echo, quiet);
+       }
+   }
+
+   PQclear(result);
+}
+
+/*
+ * Construct a vacuum/analyze command to run based on the given options, in the
+ * given string buffer, which may contain previous garbage.
+ *
+ * An optional table name can be passed; this must already be properly
+ * quoted.  The command is semicolon-terminated.
+ */
+static void
+prepare_vacuum_command(PQExpBuffer sql, PGconn *conn, vacuumingOptions *vacopts,
+                      const char *table)
+{
+   resetPQExpBuffer(sql);
+
+   if (vacopts->analyze_only)
    {
-       appendPQExpBufferStr(&sql, "ANALYZE");
-       if (verbose)
-           appendPQExpBufferStr(&sql, " VERBOSE");
+       appendPQExpBufferStr(sql, "ANALYZE");
+       if (vacopts->verbose)
+           appendPQExpBufferStr(sql, " VERBOSE");
    }
    else
    {
-       appendPQExpBufferStr(&sql, "VACUUM");
+       appendPQExpBufferStr(sql, "VACUUM");
        if (PQserverVersion(conn) >= 90000)
        {
            const char *paren = " (";
            const char *comma = ", ";
            const char *sep = paren;
 
-           if (full)
+           if (vacopts->full)
            {
-               appendPQExpBuffer(&sql, "%sFULL", sep);
+               appendPQExpBuffer(sql, "%sFULL", sep);
                sep = comma;
            }
-           if (freeze)
+           if (vacopts->freeze)
            {
-               appendPQExpBuffer(&sql, "%sFREEZE", sep);
+               appendPQExpBuffer(sql, "%sFREEZE", sep);
                sep = comma;
            }
-           if (verbose)
+           if (vacopts->verbose)
            {
-               appendPQExpBuffer(&sql, "%sVERBOSE", sep);
+               appendPQExpBuffer(sql, "%sVERBOSE", sep);
                sep = comma;
            }
-           if (and_analyze)
+           if (vacopts->and_analyze)
            {
-               appendPQExpBuffer(&sql, "%sANALYZE", sep);
+               appendPQExpBuffer(sql, "%sANALYZE", sep);
                sep = comma;
            }
            if (sep != paren)
-               appendPQExpBufferStr(&sql, ")");
+               appendPQExpBufferStr(sql, ")");
        }
        else
        {
-           if (full)
-               appendPQExpBufferStr(&sql, " FULL");
-           if (freeze)
-               appendPQExpBufferStr(&sql, " FREEZE");
-           if (verbose)
-               appendPQExpBufferStr(&sql, " VERBOSE");
-           if (and_analyze)
-               appendPQExpBufferStr(&sql, " ANALYZE");
+           if (vacopts->full)
+               appendPQExpBufferStr(sql, " FULL");
+           if (vacopts->freeze)
+               appendPQExpBufferStr(sql, " FREEZE");
+           if (vacopts->verbose)
+               appendPQExpBufferStr(sql, " VERBOSE");
+           if (vacopts->and_analyze)
+               appendPQExpBufferStr(sql, " ANALYZE");
        }
    }
+
    if (table)
-       appendPQExpBuffer(&sql, " %s", table);
-   appendPQExpBufferStr(&sql, ";");
+       appendPQExpBuffer(sql, " %s", table);
+   appendPQExpBufferChar(sql, ';');
+}
 
-   if (analyze_in_stages)
+/*
+ * Send a vacuum/analyze command to the server.
+ *
+ * If 'async' is false, the command is run to completion; on failure the
+ * error is reported, the connection is closed and the program exits.
+ *
+ * If 'async' is true, the command is only dispatched and its result must be
+ * collected later by the caller.  A failure to dispatch is reported here,
+ * but the connection is left open and we return to the caller.
+ *
+ * 'table' may be NULL; it is only used to word the error message.
+ */
+static void
+run_vacuum_command(PGconn *conn, const char *sql, bool echo,
+                  const char *dbname, const char *table,
+                  const char *progname, bool async)
+{
+   bool        status;
+
+   if (async)
+   {
+       if (echo)
+           printf("%s\n", sql);
+
+       /* PQsendQuery returns 1 if the command was dispatched, else 0 */
+       status = PQsendQuery(conn, sql) == 1;
+   }
+   else
+       status = executeMaintenanceCommand(conn, sql, echo);
+
+   if (!status)
+   {
+       if (table)
+           fprintf(stderr,
+           _("%s: vacuuming of table \"%s\" in database \"%s\" failed: %s"),
+                   progname, table, dbname, PQerrorMessage(conn));
+       else
+           fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
+                   progname, dbname, PQerrorMessage(conn));
+
+       if (!async)
+       {
+           PQfinish(conn);
+           exit(1);
+       }
+   }
+}
+
+/*
+ * GetIdleSlot
+ *     Return a connection slot that is ready to execute a command.
+ *
+ * We return the first slot we find that is marked isFree, if one is;
+ * otherwise, we loop on select() over all the slots' sockets until at least
+ * one connection has finished its command.  When that happens, we check the
+ * whole set, mark as free every slot whose connection is no longer busy, and
+ * return the first of them.
+ *
+ * If an error occurs, NULL is returned.
+ */
+static ParallelSlot *
+GetIdleSlot(ParallelSlot slots[], int numslots, const char *dbname,
+           const char *progname)
+{
+   int         i;
+   int         firstFree = -1;
+   fd_set      slotset;
+   pgsocket    maxFd;
+
+   for (i = 0; i < numslots; i++)
+       if ((slots + i)->isFree)
+           return slots + i;
+
+   /*
+    * No free slot found, so wait until one of the connections has finished
+    * its task and return the available slot.
+    */
+   for (firstFree = -1; firstFree < 0;)
     {
-       const char *stage_commands[] = {
-           "SET default_statistics_target=1; SET vacuum_cost_delay=0;",
-           "SET default_statistics_target=10; RESET vacuum_cost_delay;",
-           "RESET default_statistics_target;"
-       };
-       const char *stage_messages[] = {
-           gettext_noop("Generating minimal optimizer statistics (1 target)"),
-           gettext_noop("Generating medium optimizer statistics (10 targets)"),
-           gettext_noop("Generating default (full) optimizer statistics")
-       };
-
-       if (stage == -1)
+       bool        aborting;
+
+       /*
+        * select() overwrites the set it is given with the sockets that are
+        * ready, so the set to watch must be rebuilt before every wait.
+        * Otherwise, if we get here a second time (a socket was readable but
+        * its connection was still busy), we would no longer be watching the
+        * other connections.
+        */
+       FD_ZERO(&slotset);
+
+       maxFd = slots->sock;
+       for (i = 0; i < numslots; i++)
+       {
+           FD_SET((slots + i)->sock, &slotset);
+           if ((slots + i)->sock > maxFd)
+               maxFd = (slots + i)->sock;
+       }
+
+       SetCancelConn(slots->connection);
+       i = select_loop(maxFd, &slotset, &aborting);
+       ResetCancelConn();
+
+       if (aborting)
         {
-           int     i;
+           /*
+            * We set the cancel-receiving connection to the one in the zeroth
+            * slot above, so fetch the error from there.
+            */
+           GetQueryResult(slots->connection, dbname, progname);
+           return NULL;
+       }
+       Assert(i != 0);
 
-           /* Run all stages. */
-           for (i = 0; i < 3; i++)
-           {
-               if (!quiet)
-               {
-                   puts(gettext(stage_messages[i]));
-                   fflush(stdout);
-               }
-               executeCommand(conn, stage_commands[i], progname, echo);
-               run_vacuum_command(conn, sql.data, echo, dbname, table, progname);
-           }
+       for (i = 0; i < numslots; i++)
+       {
+           /* skip connections with nothing to read */
+           if (!FD_ISSET((slots + i)->sock, &slotset))
+               continue;
+
+           /* absorb the input; the command may still be running */
+           PQconsumeInput((slots + i)->connection);
+           if (PQisBusy((slots + i)->connection))
+               continue;
+
+           (slots + i)->isFree = true;
+
+           if (!GetQueryResult((slots + i)->connection, dbname, progname))
+               return NULL;
+
+           if (firstFree < 0)
+               firstFree = i;
         }
-       else
+   }
+
+   return slots + firstFree;
+}
+
+/*
+ * GetQueryResult
+ *
+ * Process the query result.  Returns true if there's no error, false
+ * otherwise -- but errors about trying to vacuum a missing relation are
+ * reported and subsequently ignored.
+ *
+ * Results are read from conn until PQgetResult() returns NULL or a fatal
+ * error is seen.  Failures are reported on stderr using progname and dbname.
+ * conn is registered as the cancel target while results are being read, and
+ * that registration is dropped again on every exit path.
+ */
+static bool
+GetQueryResult(PGconn *conn, const char *dbname, const char *progname)
+{
+   PGresult   *result;
+
+   SetCancelConn(conn);
+   while ((result = PQgetResult(conn)) != NULL)
+   {
+       /*
+        * If errors are found, report them.  Errors about a missing table are
+        * harmless so we continue processing; but die for other errors.
+        */
+       if (PQresultStatus(result) != PGRES_COMMAND_OK)
        {
-           /* Otherwise, we got a stage from vacuum_all_databases(), so run
-            * only that one. */
-           if (!quiet)
+           char       *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
+
+           fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
+                   progname, dbname, PQerrorMessage(conn));
+
+           if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
            {
-               puts(gettext(stage_messages[stage]));
-               fflush(stdout);
+               PQclear(result);
+               ResetCancelConn();  /* don't leave conn registered on failure */
+               return false;
            }
-           executeCommand(conn, stage_commands[stage], progname, echo);
-           run_vacuum_command(conn, sql.data, echo, dbname, table, progname);
        }
 
+       PQclear(result);
    }
-   else
-       run_vacuum_command(conn, sql.data, echo, dbname, NULL, progname);
+   ResetCancelConn();
 
-   PQfinish(conn);
-   termPQExpBuffer(&sql);
+   return true;
 }
 
-
+/*
+ * DisconnectDatabase
+ *     Disconnect the connection associated with the given slot
+ *
+ * Does nothing when the slot holds no connection.  If the connection's
+ * transaction status is PQTRANS_ACTIVE, a cancel request is sent first; the
+ * outcome of that request is not checked.  The connection is then closed
+ * with PQfinish() and the slot's connection pointer is reset to NULL, so a
+ * repeated call on the same slot returns immediately.
+ */
 static void
-vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_only,
-            bool analyze_in_stages, bool freeze, const char *maintenance_db,
-                    const char *host, const char *port,
-                    const char *username, enum trivalue prompt_password,
-                    const char *progname, bool echo, bool quiet)
+DisconnectDatabase(ParallelSlot *slot)
 {
-   PGconn     *conn;
-   PGresult   *result;
-   int         stage;
+   char        errbuf[256];
 
-   conn = connectMaintenanceDatabase(maintenance_db, host, port,
-                                     username, prompt_password, progname);
-   result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", progname, echo);
-   PQfinish(conn);
+   if (!slot->connection)
+       return;
 
-   /* If analyzing in stages, then run through all stages.  Otherwise just
-    * run once, passing -1 as the stage. */
-   for (stage = (analyze_in_stages ? 0 : -1);
-        stage < (analyze_in_stages ? 3 : 0);
-        stage++)
+   if (PQtransactionStatus(slot->connection) == PQTRANS_ACTIVE)
    {
-       int         i;
+       PGcancel   *cancel;
 
-       for (i = 0; i < PQntuples(result); i++)
+       if ((cancel = PQgetCancel(slot->connection)))
        {
-           char       *dbname = PQgetvalue(result, i, 0);
+           PQcancel(cancel, errbuf, sizeof(errbuf));
+           PQfreeCancel(cancel);
+       }
+   }
 
-           if (!quiet)
-           {
-               printf(_("%s: vacuuming database \"%s\"\n"), progname, dbname);
-               fflush(stdout);
-           }
+   PQfinish(slot->connection);
+   slot->connection = NULL;
+}
 
-           vacuum_one_database(dbname, full, verbose, and_analyze, analyze_only,
-                               analyze_in_stages, stage,
-                           freeze, NULL, host, port, username, prompt_password,
-                               progname, echo, quiet);
+/*
+ * Loop on select() until a descriptor from the given set becomes readable.
+ *
+ * maxFd is the highest descriptor in *workerset.  The set is restored from a
+ * saved copy before every select() call, so on a normal return *workerset
+ * holds only the descriptors that select() reported as readable.
+ *
+ * If we get a cancel request while we're waiting, we forego all further
+ * processing and set the *aborting flag to true.  The return value must be
+ * ignored in this case.  Otherwise, *aborting is set to false.  A select()
+ * failure other than an interruption also sets *aborting to true.
+ */
+static int
+select_loop(int maxFd, fd_set *workerset, bool *aborting)
+{
+   int         i;
+   fd_set      saveSet = *workerset;
+
+   if (CancelRequested)
+   {
+       *aborting = true;
+       return -1;
+   }
+   else
+       *aborting = false;
+
+   for (;;)
+   {
+       /*
+        * On Windows, we need to check once in a while for cancel requests;
+        * on other platforms we rely on select() returning when interrupted.
+        */
+       struct timeval *tvp;
+#ifdef WIN32
+       struct timeval tv = {1, 0};     /* one second, tv_usec kept in range */
+
+       tvp = &tv;
+#else
+       tvp = NULL;
+#endif
+
+       *workerset = saveSet;
+       i = select(maxFd + 1, workerset, NULL, NULL, tvp);
+
+#ifdef WIN32
+       if (i == SOCKET_ERROR)
+       {
+           i = -1;
+
+           if (WSAGetLastError() == WSAEINTR)
+               errno = EINTR;  /* assign, so the EINTR check below sees it */
        }
+#endif
+
+       if (i < 0 && errno == EINTR)
+           continue;           /* ignore this */
+       if (i < 0 || CancelRequested)
+           *aborting = true;   /* but not this */
+       if (i == 0)
+           continue;           /* timeout (Win32 only) */
+       break;
    }
 
-   PQclear(result);
+   return i;
 }
 
+/*
+ * init_slot
+ *     Attach a connection to a slot and mark the slot as free
+ *
+ * The slot also records the socket descriptor that PQsocket() reports for
+ * the connection.
+ */
+static void
+init_slot(ParallelSlot *slot, PGconn *conn)
+{
+   slot->sock = PQsocket(conn);
+   slot->isFree = true;
+   slot->connection = conn;
+}
 
 static void
 help(const char *progname)
@@ -436,6 +926,7 @@ help(const char *progname)
    printf(_("  -V, --version                   output version information, then exit\n"));
    printf(_("  -z, --analyze                   update optimizer statistics\n"));
    printf(_("  -Z, --analyze-only              only update optimizer statistics\n"));
+   printf(_("  -j, --jobs=NUM                  use this many concurrent connections to vacuum\n"));
    printf(_("      --analyze-in-stages         only update optimizer statistics, in multiple\n"
           "                                  stages for faster results\n"));
    printf(_("  -?, --help                      show this help, then exit\n"));