异步支持

SQLAlchemy从1.4版本引入了对异步编程的支持,并在2.0版本中进一步完善。SQLAlchemy的异步支持基于Python内置的asyncio框架实现,适用于高并发的IO密集型应用程序,例如与FastAPI集成等。这篇笔记我们介绍SQLAlchemy的异步用法。

安装异步数据库驱动

SQLAlchemy的异步功能依赖于驱动实现,以MySQL数据库为例,我们需要安装异步版本的驱动包。

pip install aiomysql

除了aiomysql我们还可以使用asyncmy这个库,它是用C写的Python扩展模块,性能更强,但安装asyncmy可能需要系统安装C编译器,我们这里简单起见还是以轻量级的aiomysql为例进行演示。

使用异步引擎和会话

SQLAlchemy2.0中,异步API大多在sqlalchemy.ext.asyncio模块中,使用的引擎对象和会话对象也和同步方式不同,这里我们需要使用异步的AsyncEngineAsyncSession操作数据库,事务的开启、提交、回滚等都需要await关键字,下面是一个例子。

import asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

from models import Conf


async def main():
    DATABASE_URL = 'mysql+aiomysql://root:root@localhost:3306/netstore'
    engine = create_async_engine(DATABASE_URL, echo=True, future=True)
    async_session = async_sessionmaker(engine, expire_on_commit=False)

    async with async_session() as session:
        async with session.begin():
            stmt = select(Conf).where(Conf.key == 'enable_login')
            result = await session.scalars(stmt)
            conf = result.first()
            print(conf)

    # 异步模式下,主协程结束事件循环关闭但SQLAlchemy引擎却还在运行会抛出RuntimeError: Event loop is closed,这里手动将其关闭
    await engine.dispose()


if __name__ == '__main__':
    asyncio.run(main())

代码中,我们先创建了异步的AsyncEngineAsyncSession,对于会话和事务我们使用的是async with语句,它的作用和同步的with语句类似,但管理的是异步上下文。查询条件的构造和同步写法一致,但对会话对象的操作都需要使用await语法,因为在这里它们都是异步的API。注意异步和同步的操作不可混用!此外,前面我们介绍过,SQLAlchemy2.0引入了session.scalars()简化查询语句的编写,这个方法在异步模式下需要使用await关键字来获取结果。

另外要注意的一点是异步模式下需要设置expire_on_commit=False。在同步模式下,expire_on_commit取默认值True没有问题,但在异步编程上下文中,异步代码要求所有的I/O操作都必须是显式的并使用await关键字,如果一个已提交对象的属性被访问,而该对象已过期,SQLAlchemy默认会尝试在后台同步地隐式执行数据库查询,这在异步事件循环中是不被允许的。为了避免出现这样的错误,在异步模式下通常将expire_on_commit固定设置为False

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。