Skip to content

Commit f4aa9bf

Browse files
un-defelprans
authored andcommitted
Fix _StatementCache.clear() PS memory leak
See #416
1 parent e91e491 commit f4aa9bf

File tree

2 files changed

+85
-3
lines changed

2 files changed

+85
-3
lines changed

asyncpg/connection.py

+8-3
Original file line numberDiff line numberDiff line change
@@ -1792,13 +1792,18 @@ def iter_statements(self):
17921792
return (e._statement for e in self._entries.values())
17931793

17941794
def clear(self):
1795-
# First, make sure that we cancel all scheduled callbacks.
1796-
for entry in self._entries.values():
1797-
self._clear_entry_callback(entry)
1795+
# Store entries for later.
1796+
entries = tuple(self._entries.values())
17981797

17991798
# Clear the entries dict.
18001799
self._entries.clear()
18011800

1801+
# Make sure that we cancel all scheduled callbacks
1802+
# and call on_remove callback for each entry.
1803+
for entry in entries:
1804+
self._clear_entry_callback(entry)
1805+
self._on_remove(entry._statement)
1806+
18021807
def _set_entry_timeout(self, entry):
18031808
# Clear the existing timeout.
18041809
self._clear_entry_callback(entry)

tests/test_cache_invalidation.py

+77
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,20 @@
1313

1414

1515
class TestCacheInvalidation(tb.ConnectedTestCase):
16+
17+
def _get_cached_statements(self, connection=None):
18+
if connection is None:
19+
connection = self.con
20+
return list(connection._stmt_cache.iter_statements())
21+
22+
def _check_statements_are_not_closed(self, statements):
23+
self.assertGreater(len(statements), 0)
24+
self.assertTrue(all(not s.closed for s in statements))
25+
26+
def _check_statements_are_closed(self, statements):
27+
self.assertGreater(len(statements), 0)
28+
self.assertTrue(all(s.closed for s in statements))
29+
1630
async def test_prepare_cache_invalidation_silent(self):
1731
await self.con.execute('CREATE TABLE tab1(a int, b int)')
1832

@@ -21,11 +35,16 @@ async def test_prepare_cache_invalidation_silent(self):
2135
result = await self.con.fetchrow('SELECT * FROM tab1')
2236
self.assertEqual(result, (1, 2))
2337

38+
statements = self._get_cached_statements()
39+
self._check_statements_are_not_closed(statements)
40+
2441
await self.con.execute(
2542
'ALTER TABLE tab1 ALTER COLUMN b SET DATA TYPE text')
2643

2744
result = await self.con.fetchrow('SELECT * FROM tab1')
2845
self.assertEqual(result, (1, '2'))
46+
47+
self._check_statements_are_closed(statements)
2948
finally:
3049
await self.con.execute('DROP TABLE tab1')
3150

@@ -37,6 +56,9 @@ async def test_prepare_cache_invalidation_in_transaction(self):
3756
result = await self.con.fetchrow('SELECT * FROM tab1')
3857
self.assertEqual(result, (1, 2))
3958

59+
statements = self._get_cached_statements()
60+
self._check_statements_are_not_closed(statements)
61+
4062
await self.con.execute(
4163
'ALTER TABLE tab1 ALTER COLUMN b SET DATA TYPE text')
4264

@@ -45,6 +67,8 @@ async def test_prepare_cache_invalidation_in_transaction(self):
4567
async with self.con.transaction():
4668
result = await self.con.fetchrow('SELECT * FROM tab1')
4769

70+
self._check_statements_are_closed(statements)
71+
4872
# This is now OK,
4973
result = await self.con.fetchrow('SELECT * FROM tab1')
5074
self.assertEqual(result, (1, '2'))
@@ -69,6 +93,12 @@ async def test_prepare_cache_invalidation_in_pool(self):
6993
result = await con2.fetchrow('SELECT * FROM tab1')
7094
self.assertEqual(result, (1, 2))
7195

96+
statements1 = self._get_cached_statements(con1)
97+
self._check_statements_are_not_closed(statements1)
98+
99+
statements2 = self._get_cached_statements(con2)
100+
self._check_statements_are_not_closed(statements2)
101+
72102
await self.con.execute(
73103
'ALTER TABLE tab1 ALTER COLUMN b SET DATA TYPE text')
74104

@@ -77,6 +107,9 @@ async def test_prepare_cache_invalidation_in_pool(self):
77107
result = await con1.fetchrow('SELECT * FROM tab1')
78108
self.assertEqual(result, (1, '2'))
79109

110+
self._check_statements_are_closed(statements1)
111+
self._check_statements_are_closed(statements2)
112+
80113
async with con2.transaction():
81114
# This should work, as con1 should have invalidated
82115
# the plan cache.
@@ -98,11 +131,17 @@ async def test_type_cache_invalidation_in_transaction(self):
98131
result = await self.con.fetchrow('SELECT * FROM tab1')
99132
self.assertEqual(result, (1, (2, 3)))
100133

134+
statements = self._get_cached_statements()
135+
self._check_statements_are_not_closed(statements)
136+
101137
async with self.con.transaction():
102138
await self.con.execute('ALTER TYPE typ1 ADD ATTRIBUTE c text')
103139
with self.assertRaisesRegex(
104140
asyncpg.OutdatedSchemaCacheError, ERRNUM):
105141
await self.con.fetchrow('SELECT * FROM tab1')
142+
143+
self._check_statements_are_closed(statements)
144+
106145
# The second request must be correct (cache was dropped):
107146
result = await self.con.fetchrow('SELECT * FROM tab1')
108147
self.assertEqual(result, (1, (2, 3, None)))
@@ -123,13 +162,19 @@ async def test_type_cache_invalidation_in_cancelled_transaction(self):
123162
result = await self.con.fetchrow('SELECT * FROM tab1')
124163
self.assertEqual(result, (1, (2, 3)))
125164

165+
statements = self._get_cached_statements()
166+
self._check_statements_are_not_closed(statements)
167+
126168
try:
127169
async with self.con.transaction():
128170
await self.con.execute(
129171
'ALTER TYPE typ1 ADD ATTRIBUTE c text')
130172
with self.assertRaisesRegex(
131173
asyncpg.OutdatedSchemaCacheError, ERRNUM):
132174
await self.con.fetchrow('SELECT * FROM tab1')
175+
176+
self._check_statements_are_closed(statements)
177+
133178
# The second request must be correct (cache was dropped):
134179
result = await self.con.fetchrow('SELECT * FROM tab1')
135180
self.assertEqual(result, (1, (2, 3, None)))
@@ -158,13 +203,19 @@ async def test_prepared_type_cache_invalidation(self):
158203
result = await prep.fetchrow()
159204
self.assertEqual(result, (1, (2, 3)))
160205

206+
statements = self._get_cached_statements()
207+
self._check_statements_are_not_closed(statements)
208+
161209
try:
162210
async with self.con.transaction():
163211
await self.con.execute(
164212
'ALTER TYPE typ1 ADD ATTRIBUTE c text')
165213
with self.assertRaisesRegex(
166214
asyncpg.OutdatedSchemaCacheError, ERRNUM):
167215
await prep.fetchrow()
216+
217+
self._check_statements_are_closed(statements)
218+
168219
# PS has its local cache for types codecs, even after the
169220
# cache cleanup it is not possible to use it.
170221
# That's why it is marked as closed.
@@ -206,11 +257,16 @@ async def test_type_cache_invalidation_on_drop_type_attr(self):
206257
result = await self.con.fetchrow('SELECT * FROM tab1')
207258
self.assertEqual(result, (1, (2, 3, 'x')))
208259

260+
statements = self._get_cached_statements()
261+
self._check_statements_are_not_closed(statements)
262+
209263
await self.con.execute('ALTER TYPE typ1 DROP ATTRIBUTE x')
210264
with self.assertRaisesRegex(
211265
asyncpg.OutdatedSchemaCacheError, ERRNUM):
212266
await self.con.fetchrow('SELECT * FROM tab1')
213267

268+
self._check_statements_are_closed(statements)
269+
214270
# This is now OK, the cache is filled after being dropped.
215271
result = await self.con.fetchrow('SELECT * FROM tab1')
216272
self.assertEqual(result, (1, (3, 'x')))
@@ -228,6 +284,9 @@ async def test_type_cache_invalidation_on_change_attr(self):
228284
result = await self.con.fetchrow('SELECT * FROM tab1')
229285
self.assertEqual(result, (1, (2, 3)))
230286

287+
statements = self._get_cached_statements()
288+
self._check_statements_are_not_closed(statements)
289+
231290
# It is slightly artificial, but can take place in transactional
232291
# schema changing. Nevertheless, if the code checks and raises it
233292
# the most probable reason is a difference with the cache type.
@@ -237,6 +296,8 @@ async def test_type_cache_invalidation_on_change_attr(self):
237296
asyncpg.OutdatedSchemaCacheError, ERRTYP):
238297
await self.con.fetchrow('SELECT * FROM tab1')
239298

299+
self._check_statements_are_closed(statements)
300+
240301
# This is now OK, the cache is filled after being dropped.
241302
result = await self.con.fetchrow('SELECT * FROM tab1')
242303
self.assertEqual(result, (1, (2, None)))
@@ -265,9 +326,15 @@ async def test_type_cache_invalidation_in_pool(self):
265326
result = await con1.fetchrow('SELECT * FROM tab1')
266327
self.assertEqual(result, (1, (2, 3)))
267328

329+
statements1 = self._get_cached_statements(con1)
330+
self._check_statements_are_not_closed(statements1)
331+
268332
result = await con2.fetchrow('SELECT * FROM tab1')
269333
self.assertEqual(result, (1, (2, 3)))
270334

335+
statements2 = self._get_cached_statements(con2)
336+
self._check_statements_are_not_closed(statements2)
337+
271338
# Create the same schema in the "testdb", fetch data which caches
272339
# type info.
273340
con_chk = await pool_chk.acquire()
@@ -277,6 +344,9 @@ async def test_type_cache_invalidation_in_pool(self):
277344
result = await con_chk.fetchrow('SELECT * FROM tab1')
278345
self.assertEqual(result, (1, (2, 3)))
279346

347+
statements_chk = self._get_cached_statements(con_chk)
348+
self._check_statements_are_not_closed(statements_chk)
349+
280350
# Change schema in the databases.
281351
await self.con.execute('ALTER TYPE typ1 ADD ATTRIBUTE c text')
282352
await con_chk.execute('ALTER TYPE typ1 ADD ATTRIBUTE c text')
@@ -287,6 +357,9 @@ async def test_type_cache_invalidation_in_pool(self):
287357
asyncpg.OutdatedSchemaCacheError, ERRNUM):
288358
await con1.fetchrow('SELECT * FROM tab1')
289359

360+
self._check_statements_are_closed(statements1)
361+
self._check_statements_are_closed(statements2)
362+
290363
async with con2.transaction():
291364
# This should work, as con1 should have invalidated all caches.
292365
result = await con2.fetchrow('SELECT * FROM tab1')
@@ -298,10 +371,14 @@ async def test_type_cache_invalidation_in_pool(self):
298371

299372
# Check the invalidation is database-specific, i.e. cache entries
300373
# for pool_chk/con_chk was not dropped via pool/con1.
374+
375+
self._check_statements_are_not_closed(statements_chk)
376+
301377
with self.assertRaisesRegex(
302378
asyncpg.OutdatedSchemaCacheError, ERRNUM):
303379
await con_chk.fetchrow('SELECT * FROM tab1')
304380

381+
self._check_statements_are_closed(statements_chk)
305382
finally:
306383
await self.con.execute('DROP TABLE tab1')
307384
await self.con.execute('DROP TYPE typ1')

0 commit comments

Comments
 (0)