static void init_slot(ParallelSlot *slot, PGconn *conn);
static int select_loop(int maxFd, fd_set *workerset);
-static bool processQueryResult(PGconn *conn, PGresult *result);
+static bool processQueryResult(ParallelSlot *slot, PGresult *result);
static void
init_slot(ParallelSlot *slot, PGconn *conn)
slot->connection = conn;
/* Initially assume connection is idle */
slot->isFree = true;
+ ParallelSlotClearHandler(slot);
}
/*
- * Process (and delete) a query result. Returns true if there's no error,
- * false otherwise -- but errors about trying to work on a missing relation
- * are reported and subsequently ignored.
+ * Process (and delete) a query result. Returns true if there's no problem,
+ * false otherwise. It's up to the handler to decide what cosntitutes a
+ * problem.
*/
static bool
-processQueryResult(PGconn *conn, PGresult *result)
+processQueryResult(ParallelSlot *slot, PGresult *result)
{
- /*
- * If it's an error, report it. Errors about a missing table are harmless
- * so we continue processing; but die for other errors.
- */
- if (PQresultStatus(result) != PGRES_COMMAND_OK)
- {
- char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
+ Assert(slot->handler != NULL);
- pg_log_error("processing of database \"%s\" failed: %s",
- PQdb(conn), PQerrorMessage(conn));
-
- if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
- {
- PQclear(result);
- return false;
- }
- }
+ /* On failure, the handler should return NULL after freeing the result */
+ if (!slot->handler(result, slot->connection, slot->handler_context))
+ return false;
+ /* Ok, we have to free it ourself */
PQclear(result);
return true;
}
* Note that this will block if the connection is busy.
*/
static bool
-consumeQueryResult(PGconn *conn)
+consumeQueryResult(ParallelSlot *slot)
{
bool ok = true;
PGresult *result;
- SetCancelConn(conn);
- while ((result = PQgetResult(conn)) != NULL)
+ SetCancelConn(slot->connection);
+ while ((result = PQgetResult(slot->connection)) != NULL)
{
- if (!processQueryResult(conn, result))
+ if (!processQueryResult(slot, result))
ok = false;
}
ResetCancelConn();
if (result != NULL)
{
- /* Check and discard the command result */
- if (!processQueryResult(slots[i].connection, result))
+ /* Handle and discard the command result */
+ if (!processQueryResult(slots + i, result))
return NULL;
}
else
{
/* This connection has become idle */
slots[i].isFree = true;
+ ParallelSlotClearHandler(slots + i);
if (firstFree < 0)
firstFree = i;
break;
for (i = 0; i < numslots; i++)
{
- if (!consumeQueryResult((slots + i)->connection))
+ if (!consumeQueryResult(slots + i))
+ return false;
+ }
+
+ return true;
+}
+
+/*
+ * TableCommandResultHandler
+ *
+ * ParallelSlotResultHandler for results of commands (not queries) against
+ * tables.
+ *
+ * Requires that the result status is either PGRES_COMMAND_OK or an error about
+ * a missing table. This is useful for utilities that compile a list of tables
+ * to process and then run commands (vacuum, reindex, or whatever) against
+ * those tables, as there is a race condition between the time the list is
+ * compiled and the time the command attempts to open the table.
+ *
+ * For missing tables, logs an error but allows processing to continue.
+ *
+ * For all other errors, logs an error and terminates further processing.
+ *
+ * res: PGresult from the query executed on the slot's connection
+ * conn: connection belonging to the slot
+ * context: unused
+ */
+bool
+TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
+{
+ /*
+ * If it's an error, report it. Errors about a missing table are harmless
+ * so we continue processing; but die for other errors.
+ */
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+
+ pg_log_error("processing of database \"%s\" failed: %s",
+ PQdb(conn), PQerrorMessage(conn));
+
+ if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
+ {
+ PQclear(res);
return false;
+ }
}
return true;
#include "fe_utils/connect_utils.h"
#include "libpq-fe.h"
+typedef bool (*ParallelSlotResultHandler) (PGresult *res, PGconn *conn,
+ void *context);
+
typedef struct ParallelSlot
{
PGconn *connection; /* One connection */
bool isFree; /* Is it known to be idle? */
+
+ /*
+ * Prior to issuing a command or query on 'connection', a handler callback
+ * function may optionally be registered to be invoked to process the
+ * results, and context information may optionally be registered for use
+ * by the handler. If unset, these fields should be NULL.
+ */
+ ParallelSlotResultHandler handler;
+ void *handler_context;
} ParallelSlot;
+static inline void
+ParallelSlotSetHandler(ParallelSlot *slot, ParallelSlotResultHandler handler,
+ void *context)
+{
+ slot->handler = handler;
+ slot->handler_context = context;
+}
+
+static inline void
+ParallelSlotClearHandler(ParallelSlot *slot)
+{
+ slot->handler = NULL;
+ slot->handler_context = NULL;
+}
+
extern ParallelSlot *ParallelSlotsGetIdle(ParallelSlot *slots, int numslots);
extern ParallelSlot *ParallelSlotsSetup(const ConnParams *cparams,
extern bool ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots);
+extern bool TableCommandResultHandler(PGresult *res, PGconn *conn,
+ void *context);
#endif /* PARALLEL_SLOT_H */