Skip to content

Commit d1483c2

Browse files
committed
Support timeouts in Connection.close() and Pool.release()
Connection.close() and Pool.release() each gained the new timeout parameter. The pool.acquire() context manager now applies the passed timeout to __aexit__() as well. Connection.close() is now actually graceful. Instead of simply dropping the connection, it attempts to cancel the running query (if any), asks the server to terminate the connection and waits for the connection to terminate. To test all this properly, implement a TCP proxy, which emulates sudden connectivity loss (i.e. packets not reaching the server). Closes: #220
1 parent d4c1b0f commit d1483c2

12 files changed

+582
-49
lines changed

.ci/appveyor.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ build_script:
3232
- "%PYTHON% setup.py build_ext --inplace"
3333

3434
test_script:
35-
- "%PYTHON% -m unittest discover -s tests"
35+
- "%PYTHON% setup.py test"
3636

3737
after_test:
3838
- "%PYTHON% setup.py bdist_wheel"

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*.ymlc~
66
*.scssc
77
*.so
8+
*.pyd
89
*~
910
.#*
1011
.DS_Store

asyncpg/_testbase.py renamed to asyncpg/_testbase/__init__.py

+66-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
from asyncpg import connection as pg_connection
2222
from asyncpg import pool as pg_pool
2323

24+
from . import fuzzer
25+
2426

2527
@contextlib.contextmanager
2628
def silence_asyncio_long_exec_warning():
@@ -36,7 +38,16 @@ def flt(log_record):
3638
logger.removeFilter(flt)
3739

3840

41+
def with_timeout(timeout):
42+
def wrap(func):
43+
func.__timeout__ = timeout
44+
return func
45+
46+
return wrap
47+
48+
3949
class TestCaseMeta(type(unittest.TestCase)):
50+
TEST_TIMEOUT = None
4051

4152
@staticmethod
4253
def _iter_methods(bases, ns):
@@ -64,7 +75,16 @@ def __new__(mcls, name, bases, ns):
6475
for methname, meth in mcls._iter_methods(bases, ns):
6576
@functools.wraps(meth)
6677
def wrapper(self, *args, __meth__=meth, **kwargs):
67-
self.loop.run_until_complete(__meth__(self, *args, **kwargs))
78+
coro = __meth__(self, *args, **kwargs)
79+
timeout = getattr(__meth__, '__timeout__', mcls.TEST_TIMEOUT)
80+
if timeout:
81+
coro = asyncio.wait_for(coro, timeout, loop=self.loop)
82+
try:
83+
self.loop.run_until_complete(coro)
84+
except asyncio.TimeoutError:
85+
raise self.failureException(
86+
'test timed out after {} seconds'.format(
87+
timeout)) from None
6888
ns[methname] = wrapper
6989

7090
return super().__new__(mcls, name, bases, ns)
@@ -169,7 +189,8 @@ def _start_default_cluster(server_settings={}, initdb_options=None):
169189

170190

171191
def _shutdown_cluster(cluster):
172-
cluster.stop()
192+
if cluster.get_status() == 'running':
193+
cluster.stop()
173194
cluster.destroy()
174195

175196

@@ -238,6 +259,49 @@ def start_cluster(cls, ClusterCls, *,
238259
server_settings, _get_initdb_options(initdb_options))
239260

240261

262+
class ProxiedClusterTestCase(ClusterTestCase):
263+
@classmethod
264+
def get_server_settings(cls):
265+
settings = dict(super().get_server_settings())
266+
settings['listen_addresses'] = '127.0.0.1'
267+
return settings
268+
269+
@classmethod
270+
def get_proxy_settings(cls):
271+
return {'fuzzing-mode': None}
272+
273+
@classmethod
274+
def setUpClass(cls):
275+
super().setUpClass()
276+
conn_spec = cls.cluster.get_connection_spec()
277+
host = conn_spec.get('host')
278+
if not host:
279+
host = '127.0.0.1'
280+
elif host.startswith('/'):
281+
host = '127.0.0.1'
282+
cls.proxy = fuzzer.TCPFuzzingProxy(
283+
backend_host=host,
284+
backend_port=conn_spec['port'],
285+
)
286+
cls.proxy.start()
287+
288+
@classmethod
289+
def tearDownClass(cls):
290+
cls.proxy.stop()
291+
super().tearDownClass()
292+
293+
@classmethod
294+
def get_connection_spec(cls, kwargs):
295+
conn_spec = super().get_connection_spec(kwargs)
296+
conn_spec['host'] = cls.proxy.listening_addr
297+
conn_spec['port'] = cls.proxy.listening_port
298+
return conn_spec
299+
300+
def tearDown(self):
301+
self.proxy.reset()
302+
super().tearDown()
303+
304+
241305
def with_connection_options(**options):
242306
if not options:
243307
raise ValueError('no connection options were specified')

0 commit comments

Comments
 (0)