异步支持
SQLAlchemy从1.4版本引入了对异步编程的支持,并在2.0版本中进一步完善。SQLAlchemy的异步支持基于Python内置的asyncio框架实现,适用于高并发的IO密集型应用程序,例如与FastAPI集成等。这篇笔记我们介绍SQLAlchemy的异步用法。
安装异步数据库驱动
SQLAlchemy的异步功能依赖于驱动实现,以MySQL数据库为例,我们需要安装异步版本的驱动包。
pip install aiomysql
除了aiomysql
我们还可以使用asyncmy
这个库,它是用C写的Python扩展模块,性能更强,但安装asyncmy
可能需要系统安装C编译器,我们这里简单起见还是以轻量级的aiomysql
为例进行演示。
使用异步引擎和会话
SQLAlchemy2.0中,异步API大多在sqlalchemy.ext.asyncio
模块中,使用的引擎对象和会话对象也和同步方式不同,这里我们需要使用异步的AsyncEngine
和AsyncSession
操作数据库,事务的开启、提交、回滚等都需要await
关键字,下面是一个例子。
import asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from models import Conf
async def main():
DATABASE_URL = 'mysql+aiomysql://root:root@localhost:3306/netstore'
engine = create_async_engine(DATABASE_URL, echo=True, future=True)
async_session = async_sessionmaker(engine, expire_on_commit=False)
async with async_session() as session:
async with session.begin():
stmt = select(Conf).where(Conf.key == 'enable_login')
result = await session.scalars(stmt)
conf = result.first()
print(conf)
# 异步模式下,主协程结束事件循环关闭但SQLAlchemy引擎却还在运行会抛出RuntimeError: Event loop is closed,这里手动将其关闭
await engine.dispose()
if __name__ == '__main__':
asyncio.run(main())
代码中,我们先创建了异步的AsyncEngine
和AsyncSession
,对于会话和事务我们使用的是async with
语句,它的作用和同步的with
语句类似,但管理的是异步上下文。查询条件的构造和同步写法一致,但对会话对象的操作都需要使用await
语法,因为在这里它们都是异步的API。注意异步和同步的操作不可混用!此外,前面我们介绍过,SQLAlchemy2.0引入了session.scalars()
简化查询语句的编写,这个方法在异步模式下需要使用await
关键字来获取结果。
另外要注意的一点是异步模式下需要设置expire_on_commit=False
。在同步模式下,expire_on_commit
取默认值True
没有问题,但在异步编程上下文中,异步代码要求所有的I/O操作都必须是显式的并使用await
关键字,如果一个已提交对象的属性被访问,而该对象已过期,SQLAlchemy默认会尝试在后台同步地隐式执行数据库查询,这在异步事件循环中是不被允许的。为了避免出现这样的错误,在异步模式下通常将expire_on_commit
固定设置为False
。