This issue was moved to a discussion.
You can continue the conversation there. Go to discussion →
Using statement_cache_size asyncpg setting / prepared statement name for asyncpg w pgbouncer #6467
Comments
SQLAlchemy doesn't make use of "statement_cache_size", as it necessarily uses the asyncpg.connection.prepare() method directly, so there is no way to disable the use of prepared statements across the board. However, you can disable the caching of prepared statements itself, so that a new prepared statement is made each time, using prepared_statement_cache_size=0. If you can try that and see if it works, we can mention this in the docs: https://docs.sqlalchemy.org/en/14/dialects/postgresql.html#prepared-statement-cache |
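As a quick aside (my own check, not from the thread): that parameter travels as a query-string argument on the database URL, which the dialect pops off before connecting; the URL shape can be verified with nothing but the standard library:

```python
from urllib.parse import parse_qs, urlsplit

url = "postgresql+asyncpg://user:pass@hostname/dbname?prepared_statement_cache_size=0"
query = parse_qs(urlsplit(url).query)

# the dialect-level option rides in the query string, not in connect_args
assert query["prepared_statement_cache_size"] == ["0"]
```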
Thank you, @zzzeek. In my case it does not work. I found that in await_only(self.asyncpg.connect(*arg, **kw)), statement_cache_size is not passed in **kw. If I add:

```python
if 'prepared_statement_cache_size' in kw:
    kw['statement_cache_size'] = kw['prepared_statement_cache_size']
```

it helps sometimes, but sometimes it does not (I don't know why...) |
hi - did you try just direct usage of the prepared_statement_cache_size parameter as given? If that doesn't work, we are out of luck. SQLAlchemy is forced to use connection.prepare(), which means we have to use prepared statements. |
Yes, I use this:

```python
engine = create_async_engine("postgresql+asyncpg://user:pass@hostname/dbname?prepared_statement_cache_size=0")
```
|
all of our SELECT statements have to use connection.prepare() because we need to be able to call get_attributes(). If asyncpg could be convinced to give us this API without necessitating the use of connection.prepare(), we could begin to think about how to support that. |
the doc at https://magicstack.github.io/asyncpg/current/api/index.html?highlight=statement_cache_size doesn't claim this disables prepared statements, just that it doesn't cache them:
oh perhaps this is needed for when we do INSERT/UPDATE/DELETE, OK.... the patch you tried at |
that is, if you use prepared_statement_cache_size=0 and add code like this:

```diff
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index 4a191cd286..415862a17b 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -735,6 +735,7 @@ class AsyncAdapt_asyncpg_dbapi:
         prepared_statement_cache_size = kw.pop(
             "prepared_statement_cache_size", 100
         )
+        kw["statement_cache_size"] = prepared_statement_cache_size
         if util.asbool(async_fallback):
             return AsyncAdaptFallback_asyncpg_connection(
                 self,
```

that's the best we can do. If there are still problems, then we need support from asyncpg |
I tried this, but for some reason it does not help. (Sometimes an error appears, but sometimes not...) |
this might be related to the fact that we still use connection.prepare(). I don't know the internals of asyncpg well enough to advise further on what might be going on. |
Yes, this is the reason. Also, in AsyncAdapt_asyncpg_cursor._prepare_and_execute we actively use prepared statements for executing queries. In asyncpg, as far as I can see, non-prepared queries do not use named prepared statements:

```python
# Pseudo code: in asyncpg, for statement_cache_size = 0
stmt = Connection._get_statement(
    self,
    query,
    named=False  # named=True is used for the connection.prepare() call
)
# deep inside this function, if named=False and statement_cache_size == 0,
# asyncpg uses an empty name for the prepared statement
result = Connection._protocol.bind_execute(stmt, args, ...)  # slightly simplified, but that's the essence
```
|
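The empty-name detail matters because the server treats the unnamed statement differently from named ones. A toy model of that distinction (my own simplification for illustration, not asyncpg or PostgreSQL code):

```python
# Toy model: a PostgreSQL session keeps named prepared statements in a
# per-session namespace, plus one "unnamed" statement ("") that each new
# Parse message simply overwrites.
session = {}

def parse(name, query):
    if name != "" and name in session:
        raise ValueError(f'prepared statement "{name}" already exists')
    session[name] = query

parse("", "select 1")
parse("", "select 2")                    # unnamed slot overwritten, no error
parse("__asyncpg_stmt_1__", "select 1")  # named statement persists

try:
    parse("__asyncpg_stmt_1__", "select 1")
    collided = False
except ValueError:
    collided = True

assert collided
assert session[""] == "select 2"
```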
We had a similar problem due to multiple web workers. They generated prepared statements with the same names; the original function to generate IDs looks like this:

```python
def _get_unique_id(self, prefix):
    global _uid
    _uid += 1
    return '__asyncpg_{}_{:x}__'.format(prefix, _uid)
```

So we just changed the Connection class a bit:

```python
from uuid import uuid4

from asyncpg import Connection

class CConnection(Connection):
    def _get_unique_id(self, prefix: str) -> str:
        return f'__asyncpg_{prefix}_{uuid4()}__'
```

You need to provide it when you create the engine:

```python
engine = create_async_engine(
    settings.database_url,
    connect_args={
        'connection_class': CConnection,
    },
)
```
|
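To make the collision mechanism concrete, here is a toy demonstration (the Worker class is hypothetical, standing in for separate web-worker processes; the naming scheme mirrors the counter-based one quoted above):

```python
# Each process starts its own _uid counter at zero, so separate workers
# independently generate identical prepared statement names.
class Worker:
    def __init__(self):
        self._uid = 0

    def _get_unique_id(self, prefix):
        # same scheme as the counter-based naming shown above
        self._uid += 1
        return '__asyncpg_{}_{:x}__'.format(prefix, self._uid)

w1, w2 = Worker(), Worker()

# both "processes" produce the same name for their first statement; behind
# pgbouncer these can land on the same server session and conflict
assert w1._get_unique_id('stmt') == w2._get_unique_id('stmt') == '__asyncpg_stmt_1__'
```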
@SlavaSkvortsov that seems to be something we cannot change on our end, other than subclassing the connection class to change a private method as in your example. You may want to open an issue on the asyncpg repo for an out-of-the-box solution, imho. |
> engine = create_async_engine("postgresql+asyncpg://user:pass@hostname/dbname?prepared_statement_cache_size=0")

Should this configuration pass the prepared statement cache size parameter to the asyncpg connection? I managed to set the cache size on the asyncpg side like this:

```python
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@hostname/dbname?prepared_statement_cache_size=0",
    poolclass=NullPool,
    future=True,
    connect_args={'statement_cache_size': 0},
)
```

Note the connect_args. And in combination with @SlavaSkvortsov's unique-ids suggestion, it seems that I got rid of prepared statement errors with pgbouncer. |
When using asyncpg only, you can use statement_cache_size=0 and it won't use prepared statements at all, thus working behind pgbouncer in transaction mode. My understanding is that sqlalchemy/asyncpg will use prepared statements no matter what the prepared_statement_cache_size value is. This means that it will create prepared statements in any case. Using the unique-ID workaround only hides the problem: you will create a lot of prepared statements in your open sessions to the database (each with a unique name), and each transaction will get a random session including some subset of those statements. Since you are disabling the cache, it won't be a real issue, as it will keep creating new ones, and they will never conflict because the name is random, but they are still there and created. I'm not sure what the mechanism to delete them is; maybe they just get deleted whenever the backend session is closed, once in a while? As for why sqlalchemy needs to use prepared statements in any case, I have no idea. |
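The accumulation described above can be sketched with a toy pooling model (my own simulation for illustration, not pgbouncer or asyncpg code): with uuid names and the cache disabled, every transaction parses a fresh statement on whichever pooled server session it lands on, and nothing ever removes them:

```python
import random
from uuid import uuid4

random.seed(0)
server_sessions = [dict(), dict()]  # two pooled PostgreSQL sessions

def run_transaction():
    # transaction mode: each transaction may land on any server session
    session = random.choice(server_sessions)
    name = f"__asyncpg_{uuid4()}__"  # unique name: never conflicts...
    session[name] = "select 1"       # ...but also never gets cleaned up

for _ in range(100):
    run_transaction()

# all 100 statements are still sitting in the pooled sessions
assert sum(len(s) for s in server_sessions) == 100
```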
asyncpg has no other way of returning the type information of the selected columns in the query when using a normal query, so sqlalchemy needs to use prepared statements:

sqlalchemy/lib/sqlalchemy/dialects/postgresql/asyncpg.py Lines 613 to 616 in 990eb3d

and

sqlalchemy/lib/sqlalchemy/dialects/postgresql/asyncpg.py Lines 392 to 404 in 990eb3d |
assuming we keep using prepared statements: there's a bunch of people on this issue now. Can we come up with a way to fix what's wrong here and get this closed? thanks |
MagicStack/asyncpg#837 is closed via MagicStack/asyncpg#846. We will want to expose this and then add a documentation section noting that this is a means of using our asyncpg driver with pgbouncer, along with a recipe based on UUID or similar. |
Is there any update on how use sqlalchemy+asyncpg+pgbouncer? I'm new to sqlalchemy's async engine and stumbled onto this issue while trying out Prefect's v2 beta. Their package uses EDIT: I should add -- |
yes, something like that. But I don't think it should be necessary: once the transaction is released back to pgbouncer, that prepared statement is useless anyway, so just DEALLOCATE them all; at the start of a transaction is best. Then, we also can't rely on any kind of caching of these prepared statements, because with transactional mode they are similarly lost to us every transaction, hence the prepared statement cache needs to be zero. Those two steps should solve the problem. If not, then I don't understand what's going on. |
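The two steps (DEALLOCATE at transaction start, cache size zero) can be illustrated with a self-contained toy model (my own simulation, not pgbouncer or asyncpg code); the reset keeps each pooled session from accumulating statements:

```python
import random
from uuid import uuid4

random.seed(0)
server_sessions = [dict(), dict()]  # two pooled PostgreSQL sessions

def run_transaction():
    session = random.choice(server_sessions)
    session.clear()                  # "DEALLOCATE ALL" at transaction start
    name = f"__asyncpg_{uuid4()}__"  # cache size 0: always a fresh statement
    session[name] = "select 1"

for _ in range(100):
    run_transaction()

# each session holds at most the single statement of its last transaction
assert all(len(s) <= 1 for s in server_sessions)
```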
we could probably support statement mode also if the engine is run with autocommit |
confirmed that prepared_statement_cache_size works when setting it to zero; test case:

```python
import asyncio

from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine


async def async_main():
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
        # when commenting this out, we get
        # 'prepared statement "__asyncpg_stmt_7__" does not exist'
        # setting the cache to zero allows the below statement to invoke
        connect_args=dict(prepared_statement_cache_size=0),
    )

    for i in range(3):
        async with engine.begin() as conn:
            await conn.execute(text("select 1"))
            await conn.exec_driver_sql("DEALLOCATE ALL")


asyncio.run(async_main())
```
|
Just want to chime in and say that setting |
@vamshiaruru-virgodesigns can you confirm some points for me?
The solution of just naming them all randomly seems very much like it would fill up the PostgreSQL session with thousands of unused prepared statements, which we would assume uses memory. |
@zzzeek, I'm using transaction mode in pgbouncer, and just setting

```python
@event.listens_for(engine.sync_engine, "begin")
def clear_prepared_statements_on_begin(conn):
    conn.exec_driver_sql("DEALLOCATE ALL")
```

I can quickly try with that and see if it works without the custom connector. |
thanks for the quick reply. If you keep working with the other approach, can you check on the memory use of your postgresql workers, if you are doing things at scale? It intuitively seems like it would fill up memory, but I don't really know the specifics. |
Update: with this code

```python
engine = create_async_engine(
    settings.database_dsn,
    pool_size=settings.sqlalchemy_pool_size,
    pool_pre_ping=True,
    connect_args={
        "statement_cache_size": 0,
        "prepared_statement_cache_size": 0,
        # "connection_class": CConnection,
    },
)

@event.listens_for(engine.sync_engine, "begin")
def clear_prepared_statements_on_begin(conn):
    conn.exec_driver_sql("DEALLOCATE ALL")
```

it works. But I am not sure how expensive it is to run DEALLOCATE ALL on every transaction. Here's how asyncpg constructs the name:

```python
def _get_unique_id(self, prefix):
    global _uid
    _uid += 1
    return '__asyncpg_{}_{:x}__'.format(prefix, _uid)
```

I don't think they are keeping track of this |
pgbouncer itself, when using session mode (which I still think is a much better idea here) uses DISCARD ALL by default which is more "heavy", although it does this after connection release so I suppose this is something it can do in the background on its end.
it actually gives a hint:
we are using connection.prepare() for all statements. We have no choice in that regard due to limitations in asyncpg's API.
no, because PGBouncer is pooling the connections, hence those sessions are still open. Per the docs, the server reset query isn't used. There is also server_reset_query_always, which as they advertise is for "broken" apps. From my POV, using PGBouncer transaction mode with asyncpg's prepared statements is "broken", I would agree. I don't see the wisdom at all in using a different connection for every transaction, when SQLAlchemy already has a clear system of distinguishing the scope of a "connection" from that of a "transaction" and should not be worked around.
they don't need to make it globally unique because asyncpg assumes its connection corresponds to a single PostgreSQL session that did not exist beforehand. |
I mean performance-wise, the prepared_statement_cache_size=0 setting definitely degrades performance very measurably. I really think someone should do some benchmarks here with pgbouncer session vs. transaction mode; I think the former will perform better in every way. Transaction mode seems like it is intended for applications that are mis-designed to hold onto single connections for long idle spans, or something (Edit: yes, like when you still want to have client-side pooling in place also. OK). |
I'm actually going to document this. I see zero advantage at all to transaction pooling mode (Edit: OK, it can reduce latency on connect by keeping QueuePool in place). |
Thanks for your explanations. In our requirements, we have a django application connecting to the same DB, and I have read that session pooling with django is not very performant compared to transaction-level pooling. But I haven't personally performed any benchmarks against it. |
OK yes, thinking some more: using transaction mode, you can keep using QueuePool, and that will save you on the connectivity overhead as well, if PGBouncer is able to keep many client connections persistently linked to a smaller set of PG server connections. OK, I get it then. From all of this it seems like the best approach would be to use server_reset_query_always; that way the server reset query is kept out of the client and is done on the server after the connection has been released. |
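For reference, a sketch of what that might look like in pgbouncer.ini (hedged: server_reset_query defaults to DISCARD ALL, and the DEALLOCATE ALL value here is an assumption matching the recipe discussed in this thread):

```ini
[pgbouncer]
pool_mode = transaction
; run the reset query after every transaction release,
; not only when a session-mode client disconnects
server_reset_query = DEALLOCATE ALL
server_reset_query_always = 1
```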
I wanted to chime in here, as I recently faced this problem and solved an edge case that wasn't discussed above. While the solutions from #6467 (comment) and #6467 (comment) work in most scenarios, there is a further problem if one wants to use transaction isolation levels. If you need

```sql
SET TRANSACTION ISOLATION LEVEL <level>
```

then you need to run this as the first statement of a connection. In such a scenario the event listener from #6467 (comment) doesn't work, as it only runs after the transaction has already begun. However, you can hook into the pool's connect event:

```python
async_engine = ...

@event.listens_for(async_engine.sync_engine.pool, "connect")
def clear_prepared_statements_on_begin_and_isolate(conn, branch):
    conn.run_async(
        lambda con: con.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;")
    )
    conn.run_async(lambda con: con.execute("DEALLOCATE ALL;"))
```

As I had the situation that the transaction isolation is only necessary in some cases, I've created this gist to allow acquiring a session with and without the isolation level. Note that, if you use the uuid approach (#6467 (comment)), this is probably not necessary, but it was not an option in my scenario. |
@mimre25 I don't think that gist works at all:

this returns you a Session, not an AsyncSession. You would need to use |
Just so you know, for the isolation level part of the above, this feature is built in. Simply use the engine-level isolation level; it does the exact same thing. |
I faced an issue related to pgbouncer and the prepared statement cache flow in the asyncpg dialect. Regarding this discussion (#6467) I prepared a PR to support an optional parameter `name` in prepared statements, which is allowed since version 0.25.0 of `asyncpg` (MagicStack/asyncpg#846).

**UPD:** the issue with the proposal: #9608

### Description

Added an optional parameter `name_func` to the `AsyncAdapt_asyncpg_connection` class which will be called on the `self._connection.prepare()` function and populate a unique name. So in general, instead of this:

```python
from uuid import uuid4

from asyncpg import Connection

class CConnection(Connection):
    def _get_unique_id(self, prefix: str) -> str:
        return f'__asyncpg_{prefix}_{uuid4()}__'

engine = create_async_engine(
    ...,
    connect_args={
        'connection_class': CConnection,
    },
)
```

this would be enough:

```python
from uuid import uuid4

engine = create_async_engine(
    ...,
    connect_args={
        'name_func': lambda: f'__asyncpg_{uuid4()}__',
    },
)
```

### Checklist

This pull request is:

- [ ] A documentation / typographical error fix
	- Good to go, no issue or tests are needed
- [ ] A short code fix
	- please include the issue number, and create an issue if none exists, which must include a complete example of the issue. One-line code fixes without an issue and demonstration will not be accepted.
	- Please include: `Fixes: #<issue number>` in the commit message
	- please include tests. One-line code fixes without tests will not be accepted.
- [x] A new feature implementation
	- please include the issue number, and create an issue if none exists, which must include a complete example of how the feature would look.
	- Please include: `Fixes: #<issue number>` in the commit message
	- please include tests.

**Have a nice day!**

Fixes: #9608
Closes: #9607
Pull-request: #9607
Pull-request-sha: b4bc8d3

Change-Id: Icd753366cba166b8a60d1c8566377ec8335cd828 |
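The shape of such a name_func can be sanity-checked standalone (a trivial check added for illustration; no engine or database needed):

```python
from uuid import uuid4

# the same callable shape as passed via connect_args above
def name_func() -> str:
    return f"__asyncpg_{uuid4()}__"

names = {name_func() for _ in range(10_000)}
assert len(names) == 10_000  # uuid4-based names are effectively collision-free
assert all(n.startswith("__asyncpg_") for n in names)
```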
why is this issue open? You can use prepared statements with pgbouncer + asyncpg; it's working here right in

Is there anyone here still unclear on how to use asyncpg with pgbouncer? |
it may make sense to move to a discussion? |
I think so? |
ok, so once fixed we can add a new post with the link to the fix |
Hi!
I use sqlalchemy 1.4 with asyncpg driver with pgbouncer.
I have an error:
How can I pass this setting (statement_cache_size=0) to the asyncpg connection object?