GetConnection(UserMapping *user, bool will_prep_stmt)
{
bool found;
+ volatile bool retry_conn = false;
ConnCacheEntry *entry;
ConnCacheKey key;
/* Reject further use of connections which failed abort cleanup. */
pgfdw_reject_incomplete_xact_state_change(entry);
+retry:
+
/*
* If the connection needs to be remade due to invalidation, disconnect as
- * soon as we're out of all transactions.
+ * soon as we're out of all transactions. Also, if the previous attempt
+ * to start a new remote transaction failed on the cached connection,
+ * disconnect it so that a new connection can be established.
*/
- if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
+ if ((entry->conn != NULL && entry->invalidated &&
+ entry->xact_depth == 0) || retry_conn)
{
- elog(DEBUG3, "closing connection %p for option changes to take effect",
- entry->conn);
+ if (retry_conn)
+ elog(DEBUG3, "closing connection %p to reestablish a new one",
+ entry->conn);
+ else
+ elog(DEBUG3, "closing connection %p for option changes to take effect",
+ entry->conn);
disconnect_pg_server(entry);
}
- /*
- * We don't check the health of cached connection here, because it would
- * require some overhead. Broken connection will be detected when the
- * connection is actually used.
- */
-
/*
* If cache entry doesn't have a connection, we have to establish a new
* connection. (If connect_pg_server throws an error, the cache entry
}
/*
- * Start a new transaction or subtransaction if needed.
+ * We check the health of the cached connection here when starting a new
+ * remote transaction. If a broken connection is detected on the first
+ * attempt, we try to establish a new connection. If a broken connection
+ * is detected again on the retry, we give up and re-throw the error.
*/
- begin_remote_xact(entry);
+ PG_TRY();
+ {
+ /* Start a new transaction or subtransaction if needed. */
+ begin_remote_xact(entry);
+ retry_conn = false;
+ }
+ PG_CATCH();
+ {
+ if (PQstatus(entry->conn) != CONNECTION_BAD ||
+ entry->xact_depth > 0 ||
+ retry_conn)
+ PG_RE_THROW();
+ retry_conn = true;
+ }
+ PG_END_TRY();
+
+ if (retry_conn)
+ {
+ ereport(DEBUG3,
+ (errmsg_internal("could not start remote transaction on connection %p",
+ entry->conn)),
+ errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
+ goto retry;
+ }
/* Remember if caller will prepare statements */
entry->have_prep_stmt |= will_prep_stmt;
ERROR: cannot PREPARE a transaction that has operated on postgres_fdw foreign tables
ROLLBACK;
WARNING: there is no transaction in progress
+-- ===================================================================
+-- reestablish new connection
+-- ===================================================================
+-- Terminate the backend having the specified application_name and wait for
+-- the termination to complete.
+CREATE OR REPLACE PROCEDURE terminate_backend_and_wait(appname text) AS $$
+BEGIN
+ PERFORM pg_terminate_backend(pid) FROM pg_stat_activity
+ WHERE application_name = appname;
+ LOOP
+ PERFORM * FROM pg_stat_activity WHERE application_name = appname;
+ EXIT WHEN NOT FOUND;
+ PERFORM pg_sleep(1), pg_stat_clear_snapshot();
+ END LOOP;
+END;
+$$ LANGUAGE plpgsql;
+-- Change application_name of remote connection to special one
+-- so that we can easily terminate the connection later.
+ALTER SERVER loopback OPTIONS (application_name 'fdw_retry_check');
+SELECT 1 FROM ft1 LIMIT 1;
+ ?column?
+----------
+ 1
+(1 row)
+
+-- Terminate the remote connection.
+CALL terminate_backend_and_wait('fdw_retry_check');
+-- This query should detect the broken connection when starting new remote
+-- transaction, reestablish new connection, and then succeed.
+BEGIN;
+SELECT 1 FROM ft1 LIMIT 1;
+ ?column?
+----------
+ 1
+(1 row)
+
+-- If the query detects the broken connection when starting new remote
+-- subtransaction, it doesn't reestablish new connection and should fail.
+CALL terminate_backend_and_wait('fdw_retry_check');
+SAVEPOINT s;
+SELECT 1 FROM ft1 LIMIT 1; -- should fail
+ERROR: server closed the connection unexpectedly
+ This probably means the server terminated abnormally
+ before or while processing the request.
+CONTEXT: remote SQL command: SAVEPOINT s2
+COMMIT;
+-- Clean up
+DROP PROCEDURE terminate_backend_and_wait(text);
-- error here
PREPARE TRANSACTION 'fdw_tpc';
ROLLBACK;
+
+-- ===================================================================
+-- reestablish new connection
+-- ===================================================================
+
+-- Terminate the backend having the specified application_name and wait for
+-- the termination to complete.
+CREATE OR REPLACE PROCEDURE terminate_backend_and_wait(appname text) AS $$
+BEGIN
+ PERFORM pg_terminate_backend(pid) FROM pg_stat_activity
+ WHERE application_name = appname;
+ LOOP
+ PERFORM * FROM pg_stat_activity WHERE application_name = appname;
+ EXIT WHEN NOT FOUND;
+ PERFORM pg_sleep(1), pg_stat_clear_snapshot();
+ END LOOP;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Change application_name of remote connection to special one
+-- so that we can easily terminate the connection later.
+ALTER SERVER loopback OPTIONS (application_name 'fdw_retry_check');
+SELECT 1 FROM ft1 LIMIT 1;
+
+-- Terminate the remote connection.
+CALL terminate_backend_and_wait('fdw_retry_check');
+
+-- This query should detect the broken connection when starting new remote
+-- transaction, reestablish new connection, and then succeed.
+BEGIN;
+SELECT 1 FROM ft1 LIMIT 1;
+
+-- If the query detects the broken connection when starting new remote
+-- subtransaction, it doesn't reestablish new connection and should fail.
+CALL terminate_backend_and_wait('fdw_retry_check');
+SAVEPOINT s;
+SELECT 1 FROM ft1 LIMIT 1; -- should fail
+COMMIT;
+
+-- Clean up
+DROP PROCEDURE terminate_backend_and_wait(text);