Generalize parallel slot result handling.
authorRobert Haas <rhaas@postgresql.org>
Fri, 5 Feb 2021 21:08:45 +0000 (16:08 -0500)
committerRobert Haas <rhaas@postgresql.org>
Fri, 5 Feb 2021 21:08:45 +0000 (16:08 -0500)
Instead of having a hard-coded behavior that we ignore missing
tables and report all other errors, let the caller decide what
to do by setting a callback.

Mark Dilger, reviewed and somewhat revised by me. The larger patch
series of which this is a part has also had review from Peter
Geoghegan, Andres Freund, Álvaro Herrera, Michael Paquier, and Amul
Sul, but I don't know whether any of them have reviewed this bit
specifically.

Discussion: http://postgr.es/m/12ED3DA8-25F0-4B68-937D-D907CFBF08E7@enterprisedb.com
Discussion: http://postgr.es/m/5F743835-3399-419C-8324-2D424237E999@enterprisedb.com
Discussion: http://postgr.es/m/70655DF3-33CE-4527-9A4D-DDEB582B6BA0@enterprisedb.com

src/bin/scripts/reindexdb.c
src/bin/scripts/vacuumdb.c
src/fe_utils/parallel_slot.c
src/include/fe_utils/parallel_slot.h

index 7781fb1151ab4942ced4f1a5b7466556a10f21f7..9f072ac49aea95f854e28510b44661a4f51c5adf 100644 (file)
@@ -466,6 +466,7 @@ reindex_one_database(const ConnParams *cparams, ReindexType type,
            goto finish;
        }
 
+       ParallelSlotSetHandler(free_slot, TableCommandResultHandler, NULL);
        run_reindex_command(free_slot->connection, process_type, objname,
                            echo, verbose, concurrently, true);
 
index ed320817bc4feac6e90cac8f2bdaff6572d91810..9dc8aca29f351fb08a2cec8dcfd8a35d8055bc0e 100644 (file)
@@ -713,6 +713,7 @@ vacuum_one_database(const ConnParams *cparams,
         * Execute the vacuum.  All errors are handled in processQueryResult
         * through ParallelSlotsGetIdle.
         */
+       ParallelSlotSetHandler(free_slot, TableCommandResultHandler, NULL);
        run_vacuum_command(free_slot->connection, sql.data,
                           echo, tabname);
 
index 3987a4702b5255a6258d1b4ade4a86cf810b7c23..b625deb2545a12fec6e22e2b913ebe4ce44878f8 100644 (file)
@@ -30,7 +30,7 @@
 
 static void init_slot(ParallelSlot *slot, PGconn *conn);
 static int select_loop(int maxFd, fd_set *workerset);
-static bool processQueryResult(PGconn *conn, PGresult *result);
+static bool processQueryResult(ParallelSlot *slot, PGresult *result);
 
 static void
 init_slot(ParallelSlot *slot, PGconn *conn)
@@ -38,34 +38,24 @@ init_slot(ParallelSlot *slot, PGconn *conn)
    slot->connection = conn;
    /* Initially assume connection is idle */
    slot->isFree = true;
+   ParallelSlotClearHandler(slot);
 }
 
 /*
- * Process (and delete) a query result.  Returns true if there's no error,
- * false otherwise -- but errors about trying to work on a missing relation
- * are reported and subsequently ignored.
+ * Process (and delete) a query result.  Returns true if there's no problem,
+ * false otherwise. It's up to the handler to decide what cosntitutes a
+ * problem.
  */
 static bool
-processQueryResult(PGconn *conn, PGresult *result)
+processQueryResult(ParallelSlot *slot, PGresult *result)
 {
-   /*
-    * If it's an error, report it.  Errors about a missing table are harmless
-    * so we continue processing; but die for other errors.
-    */
-   if (PQresultStatus(result) != PGRES_COMMAND_OK)
-   {
-       char       *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
+   Assert(slot->handler != NULL);
 
-       pg_log_error("processing of database \"%s\" failed: %s",
-                    PQdb(conn), PQerrorMessage(conn));
-
-       if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
-       {
-           PQclear(result);
-           return false;
-       }
-   }
+   /* On failure, the handler should return NULL after freeing the result */
+   if (!slot->handler(result, slot->connection, slot->handler_context))
+       return false;
 
+   /* Ok, we have to free it ourself */
    PQclear(result);
    return true;
 }
@@ -76,15 +66,15 @@ processQueryResult(PGconn *conn, PGresult *result)
  * Note that this will block if the connection is busy.
  */
 static bool
-consumeQueryResult(PGconn *conn)
+consumeQueryResult(ParallelSlot *slot)
 {
    bool        ok = true;
    PGresult   *result;
 
-   SetCancelConn(conn);
-   while ((result = PQgetResult(conn)) != NULL)
+   SetCancelConn(slot->connection);
+   while ((result = PQgetResult(slot->connection)) != NULL)
    {
-       if (!processQueryResult(conn, result))
+       if (!processQueryResult(slot, result))
            ok = false;
    }
    ResetCancelConn();
@@ -227,14 +217,15 @@ ParallelSlotsGetIdle(ParallelSlot *slots, int numslots)
 
                if (result != NULL)
                {
-                   /* Check and discard the command result */
-                   if (!processQueryResult(slots[i].connection, result))
+                   /* Handle and discard the command result */
+                   if (!processQueryResult(slots + i, result))
                        return NULL;
                }
                else
                {
                    /* This connection has become idle */
                    slots[i].isFree = true;
+                   ParallelSlotClearHandler(slots + i);
                    if (firstFree < 0)
                        firstFree = i;
                    break;
@@ -329,8 +320,52 @@ ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots)
 
    for (i = 0; i < numslots; i++)
    {
-       if (!consumeQueryResult((slots + i)->connection))
+       if (!consumeQueryResult(slots + i))
+           return false;
+   }
+
+   return true;
+}
+
+/*
+ * TableCommandResultHandler
+ *
+ * ParallelSlotResultHandler for results of commands (not queries) against
+ * tables.
+ *
+ * Requires that the result status is either PGRES_COMMAND_OK or an error about
+ * a missing table.  This is useful for utilities that compile a list of tables
+ * to process and then run commands (vacuum, reindex, or whatever) against
+ * those tables, as there is a race condition between the time the list is
+ * compiled and the time the command attempts to open the table.
+ *
+ * For missing tables, logs an error but allows processing to continue.
+ *
+ * For all other errors, logs an error and terminates further processing.
+ *
+ * res: PGresult from the query executed on the slot's connection
+ * conn: connection belonging to the slot
+ * context: unused
+ */
+bool
+TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
+{
+   /*
+    * If it's an error, report it.  Errors about a missing table are harmless
+    * so we continue processing; but die for other errors.
+    */
+   if (PQresultStatus(res) != PGRES_COMMAND_OK)
+   {
+       char       *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+
+       pg_log_error("processing of database \"%s\" failed: %s",
+                    PQdb(conn), PQerrorMessage(conn));
+
+       if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
+       {
+           PQclear(res);
            return false;
+       }
    }
 
    return true;
index 99eeb3328d60879cc9185f39de38c689e2ad9e65..8902f8d4f4816a13346a37bbd233d0d6f609e158 100644 (file)
 #include "fe_utils/connect_utils.h"
 #include "libpq-fe.h"
 
+typedef bool (*ParallelSlotResultHandler) (PGresult *res, PGconn *conn,
+                                          void *context);
+
 typedef struct ParallelSlot
 {
    PGconn     *connection;     /* One connection */
    bool        isFree;         /* Is it known to be idle? */
+
+   /*
+    * Prior to issuing a command or query on 'connection', a handler callback
+    * function may optionally be registered to be invoked to process the
+    * results, and context information may optionally be registered for use
+    * by the handler.  If unset, these fields should be NULL.
+    */
+   ParallelSlotResultHandler handler;
+   void       *handler_context;
 } ParallelSlot;
 
+static inline void
+ParallelSlotSetHandler(ParallelSlot *slot, ParallelSlotResultHandler handler,
+                      void *context)
+{
+   slot->handler = handler;
+   slot->handler_context = context;
+}
+
+static inline void
+ParallelSlotClearHandler(ParallelSlot *slot)
+{
+   slot->handler = NULL;
+   slot->handler_context = NULL;
+}
+
 extern ParallelSlot *ParallelSlotsGetIdle(ParallelSlot *slots, int numslots);
 
 extern ParallelSlot *ParallelSlotsSetup(const ConnParams *cparams,
@@ -31,5 +58,7 @@ extern void ParallelSlotsTerminate(ParallelSlot *slots, int numslots);
 
 extern bool ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots);
 
+extern bool TableCommandResultHandler(PGresult *res, PGconn *conn,
+                                     void *context);
 
 #endif                         /* PARALLEL_SLOT_H */