Skip to content

Commit 4ddb039

Browse files
authored
Remove connection parameter caching in Pool (#1053)
Currently, `asyncpt.Pool` will cache various aspects of the connection, like the selected host and connection parameters in an attempt to make subsequent connection attempts somewhat faster. This behavior is dubious because server host availability and role may change, such as when a primary becomes a standby and vice-versa or when a host becomes unavailable permanently, but another host from the DSN can be picked up. Just remove it.
1 parent deea86c commit 4ddb039

File tree

2 files changed

+75
-37
lines changed

2 files changed

+75
-37
lines changed

asyncpg/pool.py

+9-37
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
from . import compat
1616
from . import connection
17-
from . import connect_utils
1817
from . import exceptions
1918
from . import protocol
2019

@@ -311,7 +310,6 @@ class Pool:
311310
__slots__ = (
312311
'_queue', '_loop', '_minsize', '_maxsize',
313312
'_init', '_connect_args', '_connect_kwargs',
314-
'_working_addr', '_working_config', '_working_params',
315313
'_holders', '_initialized', '_initializing', '_closing',
316314
'_closed', '_connection_class', '_record_class', '_generation',
317315
'_setup', '_max_queries', '_max_inactive_connection_lifetime'
@@ -377,10 +375,6 @@ def __init__(self, *connect_args,
377375
self._initializing = False
378376
self._queue = None
379377

380-
self._working_addr = None
381-
self._working_config = None
382-
self._working_params = None
383-
384378
self._connection_class = connection_class
385379
self._record_class = record_class
386380

@@ -430,9 +424,8 @@ async def _initialize(self):
430424
# first few connections in the queue, therefore we want to walk
431425
# `self._holders` in reverse.
432426

433-
# Connect the first connection holder in the queue so that it
434-
# can record `_working_addr` and `_working_opts`, which will
435-
# speed up successive connection attempts.
427+
# Connect the first connection holder in the queue so that
428+
# any connection issues are visible early.
436429
first_ch = self._holders[-1] # type: PoolConnectionHolder
437430
await first_ch.connect()
438431

@@ -504,36 +497,15 @@ def set_connect_args(self, dsn=None, **connect_kwargs):
504497

505498
self._connect_args = [dsn]
506499
self._connect_kwargs = connect_kwargs
507-
self._working_addr = None
508-
self._working_config = None
509-
self._working_params = None
510500

511501
async def _get_new_connection(self):
512-
if self._working_addr is None:
513-
# First connection attempt on this pool.
514-
con = await connection.connect(
515-
*self._connect_args,
516-
loop=self._loop,
517-
connection_class=self._connection_class,
518-
record_class=self._record_class,
519-
**self._connect_kwargs)
520-
521-
self._working_addr = con._addr
522-
self._working_config = con._config
523-
self._working_params = con._params
524-
525-
else:
526-
# We've connected before and have a resolved address,
527-
# and parsed options and config.
528-
con = await connect_utils._connect_addr(
529-
loop=self._loop,
530-
addr=self._working_addr,
531-
timeout=self._working_params.connect_timeout,
532-
config=self._working_config,
533-
params=self._working_params,
534-
connection_class=self._connection_class,
535-
record_class=self._record_class,
536-
)
502+
con = await connection.connect(
503+
*self._connect_args,
504+
loop=self._loop,
505+
connection_class=self._connection_class,
506+
record_class=self._record_class,
507+
**self._connect_kwargs,
508+
)
537509

538510
if self._init is not None:
539511
try:

tests/test_pool.py

+66
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import asyncio
99
import inspect
1010
import os
11+
import pathlib
1112
import platform
1213
import random
1314
import textwrap
@@ -18,6 +19,7 @@
1819
from asyncpg import _testbase as tb
1920
from asyncpg import connection as pg_connection
2021
from asyncpg import pool as pg_pool
22+
from asyncpg import cluster as pg_cluster
2123

2224
_system = platform.uname().system
2325

@@ -969,6 +971,70 @@ async def worker():
969971
await pool.release(conn)
970972

971973

974+
@unittest.skipIf(os.environ.get('PGHOST'), 'unmanaged cluster')
975+
class TestPoolReconnectWithTargetSessionAttrs(tb.ClusterTestCase):
976+
977+
@classmethod
978+
def setup_cluster(cls):
979+
cls.cluster = cls.new_cluster(pg_cluster.TempCluster)
980+
cls.start_cluster(cls.cluster)
981+
982+
async def simulate_cluster_recovery_mode(self):
983+
port = self.cluster.get_connection_spec()['port']
984+
await self.loop.run_in_executor(
985+
None,
986+
lambda: self.cluster.stop()
987+
)
988+
989+
# Simulate recovery mode
990+
(pathlib.Path(self.cluster._data_dir) / 'standby.signal').touch()
991+
992+
await self.loop.run_in_executor(
993+
None,
994+
lambda: self.cluster.start(
995+
port=port,
996+
server_settings=self.get_server_settings(),
997+
)
998+
)
999+
1000+
async def test_full_reconnect_on_node_change_role(self):
1001+
if self.cluster.get_pg_version() < (12, 0):
1002+
self.skipTest("PostgreSQL < 12 cannot support standby.signal")
1003+
return
1004+
1005+
pool = await self.create_pool(
1006+
min_size=1,
1007+
max_size=1,
1008+
target_session_attrs='primary'
1009+
)
1010+
1011+
# Force a new connection to be created
1012+
await pool.fetchval('SELECT 1')
1013+
1014+
await self.simulate_cluster_recovery_mode()
1015+
1016+
# current pool connection info cache is expired,
1017+
# but we don't know it yet
1018+
with self.assertRaises(asyncpg.TargetServerAttributeNotMatched) as cm:
1019+
await pool.execute('SELECT 1')
1020+
1021+
self.assertEqual(
1022+
cm.exception.args[0],
1023+
"None of the hosts match the target attribute requirement "
1024+
"<SessionAttribute.primary: 'primary'>"
1025+
)
1026+
1027+
# force reconnect
1028+
with self.assertRaises(asyncpg.TargetServerAttributeNotMatched) as cm:
1029+
await pool.execute('SELECT 1')
1030+
1031+
self.assertEqual(
1032+
cm.exception.args[0],
1033+
"None of the hosts match the target attribute requirement "
1034+
"<SessionAttribute.primary: 'primary'>"
1035+
)
1036+
1037+
9721038
@unittest.skipIf(os.environ.get('PGHOST'), 'using remote cluster for testing')
9731039
class TestHotStandby(tb.HotStandbyTestCase):
9741040
def create_pool(self, **kwargs):

0 commit comments

Comments
 (0)