Improve connection-failure error handling in contrib/postgres_fdw.
authorTom Lane <tgl@sss.pgh.pa.us>
Tue, 4 Feb 2014 02:30:02 +0000 (21:30 -0500)
committerTom Lane <tgl@sss.pgh.pa.us>
Tue, 4 Feb 2014 02:30:20 +0000 (21:30 -0500)
postgres_fdw tended to say "unknown error" if it tried to execute a command
on an already-dead connection, because some paths in libpq just return a
null PGresult for such cases.  Out-of-memory might result in that, too.
To fix, pass the PGconn to pgfdw_report_error, and look at its
PQerrorMessage() string if we can't get anything out of the PGresult.

Also, fix the transaction-exit logic to reliably drop a dead connection.
It was attempting to do that already, but it assumed that only connection
cache entries with xact_depth > 0 needed to be examined.  The folly in that
is that if we fail while issuing START TRANSACTION, we'll not have bumped
xact_depth.  (At least for the case I was testing, this fix masks the
other problem; but it still seems like a good idea to have the PGconn
fallback logic.)

Per investigation of bug #9087 from Craig Lucas.  Backpatch to 9.3 where
this code was introduced.

contrib/postgres_fdw/connection.c
contrib/postgres_fdw/postgres_fdw.c
contrib/postgres_fdw/postgres_fdw.h

index 93ea498012609c1fdba918f3a29d964b14e0baf5..b688241283805d7d0c57375c90ccd610967def5e 100644 (file)
@@ -355,7 +355,7 @@ do_sql_command(PGconn *conn, const char *sql)
 
    res = PQexec(conn, sql);
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
-       pgfdw_report_error(ERROR, res, true, sql);
+       pgfdw_report_error(ERROR, res, conn, true, sql);
    PQclear(res);
 }
 
@@ -454,6 +454,7 @@ GetPrepStmtNumber(PGconn *conn)
  *
  * elevel: error level to use (typically ERROR, but might be less)
  * res: PGresult containing the error
+ * conn: connection we did the query on
  * clear: if true, PQclear the result (otherwise caller will handle it)
  * sql: NULL, or text of remote command we tried to execute
  *
@@ -462,7 +463,8 @@ GetPrepStmtNumber(PGconn *conn)
  * marked with have_error = true.
  */
 void
-pgfdw_report_error(int elevel, PGresult *res, bool clear, const char *sql)
+pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+                  bool clear, const char *sql)
 {
    /* If requested, PGresult must be released before leaving this function. */
    PG_TRY();
@@ -483,6 +485,14 @@ pgfdw_report_error(int elevel, PGresult *res, bool clear, const char *sql)
        else
            sqlstate = ERRCODE_CONNECTION_FAILURE;
 
+       /*
+        * If we don't get a message from the PGresult, try the PGconn.  This
+        * is needed because for connection-level failures, PQexec may just
+        * return NULL, not a PGresult at all.
+        */
+       if (message_primary == NULL)
+           message_primary = PQerrorMessage(conn);
+
        ereport(elevel,
                (errcode(sqlstate),
                 message_primary ? errmsg_internal("%s", message_primary) :
@@ -525,74 +535,37 @@ pgfdw_xact_callback(XactEvent event, void *arg)
    {
        PGresult   *res;
 
-       /* We only care about connections with open remote transactions */
-       if (entry->conn == NULL || entry->xact_depth == 0)
+       /* Ignore cache entry if no open connection right now */
+       if (entry->conn == NULL)
            continue;
 
-       elog(DEBUG3, "closing remote transaction on connection %p",
-            entry->conn);
-
-       switch (event)
+       /* If it has an open remote transaction, try to close it */
+       if (entry->xact_depth > 0)
        {
-           case XACT_EVENT_PRE_COMMIT:
-               /* Commit all remote transactions during pre-commit */
-               do_sql_command(entry->conn, "COMMIT TRANSACTION");
-
-               /*
-                * If there were any errors in subtransactions, and we made
-                * prepared statements, do a DEALLOCATE ALL to make sure we
-                * get rid of all prepared statements.  This is annoying and
-                * not terribly bulletproof, but it's probably not worth
-                * trying harder.
-                *
-                * DEALLOCATE ALL only exists in 8.3 and later, so this
-                * constrains how old a server postgres_fdw can communicate
-                * with.  We intentionally ignore errors in the DEALLOCATE, so
-                * that we can hobble along to some extent with older servers
-                * (leaking prepared statements as we go; but we don't really
-                * support update operations pre-8.3 anyway).
-                */
-               if (entry->have_prep_stmt && entry->have_error)
-               {
-                   res = PQexec(entry->conn, "DEALLOCATE ALL");
-                   PQclear(res);
-               }
-               entry->have_prep_stmt = false;
-               entry->have_error = false;
-               break;
-           case XACT_EVENT_PRE_PREPARE:
-
-               /*
-                * We disallow remote transactions that modified anything,
-                * since it's not really reasonable to hold them open until
-                * the prepared transaction is committed.  For the moment,
-                * throw error unconditionally; later we might allow read-only
-                * cases.  Note that the error will cause us to come right
-                * back here with event == XACT_EVENT_ABORT, so we'll clean up
-                * the connection state at that point.
-                */
-               ereport(ERROR,
-                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                        errmsg("cannot prepare a transaction that modified remote tables")));
-               break;
-           case XACT_EVENT_COMMIT:
-           case XACT_EVENT_PREPARE:
-               /* Should not get here -- pre-commit should have handled it */
-               elog(ERROR, "missed cleaning up connection during pre-commit");
-               break;
-           case XACT_EVENT_ABORT:
-               /* Assume we might have lost track of prepared statements */
-               entry->have_error = true;
-               /* If we're aborting, abort all remote transactions too */
-               res = PQexec(entry->conn, "ABORT TRANSACTION");
-               /* Note: can't throw ERROR, it would be infinite loop */
-               if (PQresultStatus(res) != PGRES_COMMAND_OK)
-                   pgfdw_report_error(WARNING, res, true,
-                                      "ABORT TRANSACTION");
-               else
-               {
-                   PQclear(res);
-                   /* As above, make sure we've cleared any prepared stmts */
+           elog(DEBUG3, "closing remote transaction on connection %p",
+                entry->conn);
+
+           switch (event)
+           {
+               case XACT_EVENT_PRE_COMMIT:
+                   /* Commit all remote transactions during pre-commit */
+                   do_sql_command(entry->conn, "COMMIT TRANSACTION");
+
+                   /*
+                    * If there were any errors in subtransactions, and we
+                    * made prepared statements, do a DEALLOCATE ALL to make
+                    * sure we get rid of all prepared statements. This is
+                    * annoying and not terribly bulletproof, but it's
+                    * probably not worth trying harder.
+                    *
+                    * DEALLOCATE ALL only exists in 8.3 and later, so this
+                    * constrains how old a server postgres_fdw can
+                    * communicate with.  We intentionally ignore errors in
+                    * the DEALLOCATE, so that we can hobble along to some
+                    * extent with older servers (leaking prepared statements
+                    * as we go; but we don't really support update operations
+                    * pre-8.3 anyway).
+                    */
                    if (entry->have_prep_stmt && entry->have_error)
                    {
                        res = PQexec(entry->conn, "DEALLOCATE ALL");
@@ -600,8 +573,50 @@ pgfdw_xact_callback(XactEvent event, void *arg)
                    }
                    entry->have_prep_stmt = false;
                    entry->have_error = false;
-               }
-               break;
+                   break;
+               case XACT_EVENT_PRE_PREPARE:
+
+                   /*
+                    * We disallow remote transactions that modified anything,
+                    * since it's not very reasonable to hold them open until
+                    * the prepared transaction is committed.  For the moment,
+                    * throw error unconditionally; later we might allow
+                    * read-only cases.  Note that the error will cause us to
+                    * come right back here with event == XACT_EVENT_ABORT, so
+                    * we'll clean up the connection state at that point.
+                    */
+                   ereport(ERROR,
+                           (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                            errmsg("cannot prepare a transaction that modified remote tables")));
+                   break;
+               case XACT_EVENT_COMMIT:
+               case XACT_EVENT_PREPARE:
+                   /* Pre-commit should have closed the open transaction */
+                   elog(ERROR, "missed cleaning up connection during pre-commit");
+                   break;
+               case XACT_EVENT_ABORT:
+                   /* Assume we might have lost track of prepared statements */
+                   entry->have_error = true;
+                   /* If we're aborting, abort all remote transactions too */
+                   res = PQexec(entry->conn, "ABORT TRANSACTION");
+                   /* Note: can't throw ERROR, it would be infinite loop */
+                   if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                       pgfdw_report_error(WARNING, res, entry->conn, true,
+                                          "ABORT TRANSACTION");
+                   else
+                   {
+                       PQclear(res);
+                       /* As above, make sure to clear any prepared stmts */
+                       if (entry->have_prep_stmt && entry->have_error)
+                       {
+                           res = PQexec(entry->conn, "DEALLOCATE ALL");
+                           PQclear(res);
+                       }
+                       entry->have_prep_stmt = false;
+                       entry->have_error = false;
+                   }
+                   break;
+           }
        }
 
        /* Reset state to show we're out of a transaction */
@@ -689,7 +704,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
                     curlevel, curlevel);
            res = PQexec(entry->conn, sql);
            if (PQresultStatus(res) != PGRES_COMMAND_OK)
-               pgfdw_report_error(WARNING, res, true, sql);
+               pgfdw_report_error(WARNING, res, entry->conn, true, sql);
            else
                PQclear(res);
        }
index ae3ab000e1f34a4a665676d869713b7f396d5dcf..fde1ec13617d80ead676860a57a07b7c648575a4 100644 (file)
@@ -1040,7 +1040,7 @@ postgresReScanForeignScan(ForeignScanState *node)
     */
    res = PQexec(fsstate->conn, sql);
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
-       pgfdw_report_error(ERROR, res, true, sql);
+       pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
    PQclear(res);
 
    /* Now force a fresh FETCH. */
@@ -1374,7 +1374,7 @@ postgresExecForeignInsert(EState *estate,
                         0);
    if (PQresultStatus(res) !=
        (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-       pgfdw_report_error(ERROR, res, true, fmstate->query);
+       pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
 
    /* Check number of rows affected, and fetch RETURNING tuple if any */
    if (fmstate->has_returning)
@@ -1444,7 +1444,7 @@ postgresExecForeignUpdate(EState *estate,
                         0);
    if (PQresultStatus(res) !=
        (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-       pgfdw_report_error(ERROR, res, true, fmstate->query);
+       pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
 
    /* Check number of rows affected, and fetch RETURNING tuple if any */
    if (fmstate->has_returning)
@@ -1514,7 +1514,7 @@ postgresExecForeignDelete(EState *estate,
                         0);
    if (PQresultStatus(res) !=
        (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-       pgfdw_report_error(ERROR, res, true, fmstate->query);
+       pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
 
    /* Check number of rows affected, and fetch RETURNING tuple if any */
    if (fmstate->has_returning)
@@ -1563,7 +1563,7 @@ postgresEndForeignModify(EState *estate,
         */
        res = PQexec(fmstate->conn, sql);
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
-           pgfdw_report_error(ERROR, res, true, sql);
+           pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
        PQclear(res);
        fmstate->p_name = NULL;
    }
@@ -1800,7 +1800,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
         */
        res = PQexec(conn, sql);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
-           pgfdw_report_error(ERROR, res, false, sql);
+           pgfdw_report_error(ERROR, res, conn, false, sql);
 
        /*
         * Extract cost numbers for topmost plan node.  Note we search for a
@@ -1934,7 +1934,7 @@ create_cursor(ForeignScanState *node)
    res = PQexecParams(conn, buf.data, numParams, NULL, values,
                       NULL, NULL, 0);
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
-       pgfdw_report_error(ERROR, res, true, fsstate->query);
+       pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
    PQclear(res);
 
    /* Mark the cursor as created, and show no tuples have been retrieved */
@@ -1985,7 +1985,7 @@ fetch_more_data(ForeignScanState *node)
        res = PQexec(conn, sql);
        /* On error, report the original query, not the FETCH. */
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
-           pgfdw_report_error(ERROR, res, false, fsstate->query);
+           pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
        /* Convert the data into HeapTuples */
        numrows = PQntuples(res);
@@ -2091,7 +2091,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
     */
    res = PQexec(conn, sql);
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
-       pgfdw_report_error(ERROR, res, true, sql);
+       pgfdw_report_error(ERROR, res, conn, true, sql);
    PQclear(res);
 }
 
@@ -2128,7 +2128,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
                    NULL);
 
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
-       pgfdw_report_error(ERROR, res, true, fmstate->query);
+       pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
    PQclear(res);
 
    /* This action shows that the prepare has been done. */
@@ -2278,7 +2278,7 @@ postgresAnalyzeForeignTable(Relation relation,
    {
        res = PQexec(conn, sql.data);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
-           pgfdw_report_error(ERROR, res, false, sql.data);
+           pgfdw_report_error(ERROR, res, conn, false, sql.data);
 
        if (PQntuples(res) != 1 || PQnfields(res) != 1)
            elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
@@ -2372,7 +2372,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
    {
        res = PQexec(conn, sql.data);
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
-           pgfdw_report_error(ERROR, res, false, sql.data);
+           pgfdw_report_error(ERROR, res, conn, false, sql.data);
        PQclear(res);
        res = NULL;
 
@@ -2403,7 +2403,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
            res = PQexec(conn, fetch_sql);
            /* On error, report the original query, not the FETCH. */
            if (PQresultStatus(res) != PGRES_TUPLES_OK)
-               pgfdw_report_error(ERROR, res, false, sql.data);
+               pgfdw_report_error(ERROR, res, conn, false, sql.data);
 
            /* Process whatever we got. */
            numrows = PQntuples(res);
index 85fc25aaef4a1bcd0050bd1e679efb1abc2d46ee..228345d78644df7972d95e5d4129e4920d6e6874 100644 (file)
@@ -30,8 +30,8 @@ extern PGconn *GetConnection(ForeignServer *server, UserMapping *user,
 extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
-extern void pgfdw_report_error(int elevel, PGresult *res, bool clear,
-                  const char *sql);
+extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+                  bool clear, const char *sql);
 
 /* in option.c */
 extern int ExtractConnectionOptions(List *defelems,