前一篇笔记我们学习了SQLAlchemy中如何定义数据模型以及声明关联关系等内容,这篇笔记我们继续学习如何实现单表增删改查以及复杂的关联查询。
SQLAlchemy ORM中操作数据需要会话(Session)对象,会话需要通过工厂函数sessionmaker()
获取,下面是一个比较通用的创建SQLAlchemy ORM会话的写法。
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
if __name__ == '__main__':
DATABASE_URL = 'mysql+pymysql://root:root@localhost:3306/netstore2'
engine = create_engine(DATABASE_URL, echo=True, future=True)
Session = sessionmaker(autocommit=False, autoflush=True, bind=engine)
with Session() as session:
with session.begin():
# ... 继续操作数据
pass
创建会话需要很多配置参数,这些配置参数都被sessionmaker()
函数封装到了函数参数中便于我们使用。
False
,SQLAlchemy推荐采用显式的事务管理,即显式调用begin()
、commit()
和rollback()
True
True
注:虽然我们设置了autocommit
为False
,但Python语言提供了with
语句,session.begin()
会返回上下文管理器,with
语句能依赖上下文管理器自动调用commit()
和rollback()
。如果我们不使用with
语句,也可以手动调用这两个方法控制事务。
下面例子演示如何使用SQLAlchemy ORM插入数据,Conf
是一个数据模型,它有key
和value
两个属性以及自增主键,主键的值我们不需要手动设置。创建好数据模型对象后,我们将其添加到会话即可。
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import *
if __name__ == '__main__':
DATABASE_URL = 'mysql+pymysql://root:root@localhost:3306/netstore2'
engine = create_engine(DATABASE_URL, echo=True, future=True)
Session = sessionmaker(autocommit=False, autoflush=True, bind=engine)
with Session() as session:
with session.begin():
# 创建数据对象
conf = Conf(key='enable_login', value='1')
# 添加数据到ORM会话,with语句结束后自动commit
session.add(conf)
代码中,由于我们用了with
语句,因此代码的最后其实调用了session.commit()
方法,只不过这一调用被with
这个语法糖给省略了,这种写法仍是显式的事务管理方式;假如创建数据对象或是某些数据操作出错了,with
语句会自动帮我们调用session.rollback()
回滚事务。
插入数据时,一个常见的需求是获取自增主键的值,但事务提交之前数据还未写入数据库,因此直接获取主键的结果通常是None
。如果不想提交事务,我们可以调用session.flush()
,它会将操作写入数据库但暂不提交事务,此时我们是可以获取主键值的。
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import *
if __name__ == '__main__':
DATABASE_URL = 'mysql+pymysql://root:root@localhost:3306/netstore2'
engine = create_engine(DATABASE_URL, echo=True, future=True)
Session = sessionmaker(autocommit=False, autoflush=True, bind=engine)
with Session() as session:
with session.begin():
conf = Conf(key='enable_login', value='1')
session.add(conf)
session.flush()
print(conf.id)
此外,插入也可以使用add_all()
方法插入多条,不过值得注意的是这个方法并不会批量插入,它仍会生成多条INSERT语句,逐条插入数据。如果想实现真正的批量插入,可以使用session.bulk_save_objects([conf1, conf2])
方法,但注意这个方法会跳过ORM会话级别的对象状态管理,不会触发SQLAlchemy事件,也不能自动刷新主键值,因此仅限于大批量数据插入场景。
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import *
if __name__ == '__main__':
DATABASE_URL = 'mysql+pymysql://root:root@localhost:3306/netstore2'
engine = create_engine(DATABASE_URL, echo=True, future=True)
Session = sessionmaker(autocommit=False, autoflush=True, bind=engine)
with Session() as session:
with session.begin():
conf1 = Conf(key='enable_login', value='1')
conf2 = Conf(key='enable_register', value='1')
session.add_all([conf1, conf2])
最基础的查询方式是根据主键查询,这可以使用session.get()
方法,他的参数是数据模型类以及主键的值,如果没查到会返回None
。
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import *
if __name__ == '__main__':
DATABASE_URL = 'mysql+pymysql://root:root@localhost:3306/netstore2'
engine = create_engine(DATABASE_URL, echo=True, future=True)
Session = sessionmaker(autocommit=False, autoflush=True, bind=engine)
with Session() as session:
with session.begin():
conf = session.get(Conf, 1)
print(conf)
ORM框架中,更新数据的流程通常是先查询再更新,对于这类需求,SQLAlchemy里我们直接修改查询得到的数据模型对象即可,session.commit()
时修改会自动同步到数据库。
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import *
if __name__ == '__main__':
DATABASE_URL = 'mysql+pymysql://root:root@localhost:3306/netstore2'
engine = create_engine(DATABASE_URL, echo=True, future=True)
Session = sessionmaker(autocommit=False, autoflush=True, bind=engine)
with Session() as session:
with session.begin():
conf = session.get(Conf, 1)
if conf is not None:
conf.value = '0'
代码中,我们先调用session.get()
基于主键获取数据记录对象,然后修改了它的value
属性。with
语句会自动帮我们调用session.commit()
提交事务。
和更新数据类似,删除数据我们也通常是先查询再删除,下面是一个例子。
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import *
if __name__ == '__main__':
DATABASE_URL = 'mysql+pymysql://root:root@localhost:3306/netstore2'
engine = create_engine(DATABASE_URL, echo=True, future=True)
Session = sessionmaker(autocommit=False, autoflush=True, bind=engine)
with Session() as session:
with session.begin():
conf = session.get(Conf, 1)
if conf is not None:
session.delete(conf)
删除数据需要调用session.delete()
方法。
前面我们介绍了如何根据主键查询数据模型对象。对于更复杂一点的情况,我们可以构造条件查询,下面是一个例子。
from sqlalchemy import create_engine, select
from sqlalchemy.orm import sessionmaker
from models import *
if __name__ == '__main__':
DATABASE_URL = 'mysql+pymysql://root:root@localhost:3306/netstore2'
engine = create_engine(DATABASE_URL, echo=True, future=True)
Session = sessionmaker(autocommit=False, autoflush=True, bind=engine)
with Session() as session:
with session.begin():
stmt = select(Conf).where(Conf.key == 'enable_login')
conf = session.scalars(stmt).first()
print(conf)
select().where()
构造了一个条件查询语句,where
的参数是一个判断表达式,它会在数据库中查询key
字段值为enable_login
的所有数据记录,随后我们调用session.scalars()
执行这个构建好的查询语句。这条SQL语句理论上查询可能返回多个结果,不过在业务逻辑上它只会有一个结果,因此最后我们拼接了first()
方法,它会取出查询结果的第一个元素或是当没有查询结果时返回None
。
关于session.scalars()
,它是SQLAlchemy 2.0新增加的方法,它用于替代SQLAlchemy 1.x时代操作结果集的复杂性。“scalar”这个单词是标量的意思,SQLAlchemy 1.x中查询数据需要调用session.execute()
方法获取结果集对象,结果集中的一条结果通常是元组类型,它可以包含一个标量也可以包含多个标量,不过由于ORM查询中这个结果元组通常只有一个标量,因此SQLAlchemy 1.x的结果集对象提供了scalars()
方法来仅取每个元组的第一个元素。SQLAlchemy 2.0更进一步的在会话对象上直接提供了scalars()
方法进行仅单个标量的查询,因此新版本的ORM查询通常都是直接使用session.scalars()
,而不是之前的session.execute()
方法。
where()
的参数中我们可以编写用于数据筛选的表达式,具体来说它是SQLAlchemy的表达式对象,通常可能是BinaryExpression
类型或BooleanClauseList
等,至于为什么Conf.id == 1
是BinaryExpression
类型,这是SQLAlchemy通过复杂的运算符重载实现的,有兴趣可以查看SQLAlchemy的源码。
# 查询id等于1的对象
stmt = select(Conf).where(Conf.id == 1)
# 查询id大于1的对象
stmt = select(Conf).where(Conf.id > 1)
# 查询id大于等于1且小于等于5的对象
stmt = select(Conf).where(Conf.id.between(1, 5))
对于数据记录对应字段是否为NULL
可以用is_()
和is_not()
方法进行筛选。
stmt = select(Conf).where(Conf.value.is_(None))
stmt = select(Conf).where(Conf.value.is_not(None))
对于AND条件,我们其实可以直接传多个表达式对象。
stmt = select(Conf).where(Conf.key == 'enable_login', Conf.value.is_(None))
串联多个where()
也是一样的效果。
stmt = select(Conf).where(Conf.key == 'enable_login').where(Conf.value.is_(None))
除此之外,我们也可以显式的使用and_()
函数连接多个条件,不过这种写法比较麻烦,除非条件需要动态组合,否则我们一般都不这么写。
stmt = select(Conf).where(and_(Conf.key == 'enable_login', Conf.value.is_(None)))
类似的,OR条件组合使用or_()
函数实现。
stmt = select(Conf).where(or_(Conf.id == 1, Conf.id == 2))
取反的NOT条件使用not_()
函数实现。
stmt = select(Conf).where(not_(Conf.id == 1))
组合使用AND、OR、NOT和表达式对象可以构建非常复杂的查询逻辑。
数据模型属性的in_()
方法可以用于实现IN语句范围查询,同理not_in()
可以实现NOT IN语句查询。
stmt = select(Conf).where(Conf.key.in_(['a', 'b', 'c']))
stmt = select(Conf).where(Conf.key.not_in(['a', 'b', 'c']))
SQL中IN的范围还可以是一个子查询,SQLAlchemy支持这种写法。下面例子中,subq
就是一个子查询,它作为了in_()
方法的参数。
subq = select(Order.user_id).where(Order.amount > 100)
stmt = select(User).where(User.id.in_(subq))
like()
和notlike()
方法可以实现字符串的模糊匹配查询。
stmt = select(Conf).where(Conf.key.like('%login%'))
stmt = select(Conf).where(Conf.key.notlike('%login%'))
SQL中可以使用各种统计函数,SQLAlchemy中大部分通用的函数封装在了sqlalchemy.func
对象中,下面例子查询了数据库中Conf
数据模型的记录数。
stmt = select(func.count(Conf.id))
SQLAlchemy构造动态查询非常简单,我们根据用户的查询条件使用and_()
、or_()
、not_()
等函数拼接动态查询条件即可,下面是一个例子。
filters = []
if conf_key:
filters.append(Conf.key == conf_key)
filters.append(Conf.value.is_not(None))
stmt = select(Conf).where(*filters)
SQLAlchemy中分页查询主要使用limit()
和offset()
实现,前者控制返回的结果数(即分页大小),后者控制查询的结果偏移,这可通过分页大小和页码计算,通常可以按照offset = (page_num - 1) * page_size
设置。不过实际开发中,对于分页查询,除了分页的元素,我们通常还需要查询总记录数和总页数,下面是一个例子。
offset = (page_num - 1) * page_size
stmt = select(Conf).limit(page_size).offset(offset)
# 分页结果元素
items = session.scalars(stmt).all()
# 总记录数
total = session.scalar(select(func.count(Conf.id)))
# 总页数
total_pages = (total + page_size - 1) // page_size
最简单的关联查询就是直接访问数据模型对象的关联属性。前面在数据模型章节我们介绍过关联映射的配置,其中有一个lazy
属性能够设置关联映射的懒加载配置,默认情况下它的配置是select
,即默认懒加载,访问关联对象时再执行SELECT查询。运行下面例子代码,我们可以观察到SQLAlchemy总共生成了两条SQL语句,第一条查询User
对象,第二条再根据外键查询Order
对象。
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import *
if __name__ == '__main__':
DATABASE_URL = 'mysql+pymysql://root:root@localhost:3306/netstore2'
engine = create_engine(DATABASE_URL, echo=True, future=True)
Session = sessionmaker(autocommit=False, autoflush=True, bind=engine)
with Session() as session:
with session.begin():
user = session.get(User, 1)
for order in user.orders:
print(order)
此外,在一些比较复杂的查询场景,我们也可以显式的构建JOIN查询,下面例子构建了一个INNER JOIN内连接查询,查询符合VIP条件的用户的所有订单。
stmt = select(Order).join(User, Order.user_id == User.id).where(User.is_vip == True)