数据增删改查

前一篇笔记我们学习了SQLAlchemy中如何定义数据模型以及声明关联关系等内容,这篇笔记我们继续学习如何实现单表增删改查以及复杂的关联查询。

创建会话

SQLAlchemy ORM中操作数据需要会话(Session)对象,会话需要通过工厂函数sessionmaker()获取,下面是一个比较通用的创建SQLAlchemy ORM会话的写法。

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

if __name__ == '__main__':
    DATABASE_URL = 'mysql+pymysql://root:root@localhost:3306/netstore2'
    engine = create_engine(DATABASE_URL, echo=True, future=True)
    Session = sessionmaker(autocommit=False, autoflush=True, bind=engine)

    with Session() as session:
        with session.begin():
            # ... 继续操作数据
            pass

创建会话需要很多配置参数,这些配置参数都被sessionmaker()函数封装到了函数参数中便于我们使用。

  • autocommit:是否开启自动提交模式,默认值为False,SQLAlchemy推荐采用显式的事务管理,即显式调用begin()commit()rollback()
  • autoflush:是否在查询前自动刷新挂起的更改,默认值为True
  • expire_on_commit:提交后是否使会话中的对象过期(强制下次访问时重新查询),默认值为True
  • bind:绑定的数据库客户端Engine对象

注:虽然我们设置了autocommitFalse,但Python语言提供了with语句,session.begin()会返回上下文管理器,with语句能依赖上下文管理器自动调用commit()rollback()。如果我们不使用with语句,也可以手动调用这两个方法控制事务。

基础CRUD操作

插入数据

下面例子演示如何使用SQLAlchemy ORM插入数据,Conf是一个数据模型,它有keyvalue两个属性以及自增主键,主键的值我们不需要手动设置。创建好数据模型对象后,我们将其添加到会话即可。

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import *

if __name__ == '__main__':
    DATABASE_URL = 'mysql+pymysql://root:root@localhost:3306/netstore2'
    engine = create_engine(DATABASE_URL, echo=True, future=True)
    Session = sessionmaker(autocommit=False, autoflush=True, bind=engine)

    with Session() as session:
        with session.begin():
            # 创建数据对象
            conf = Conf(key='enable_login', value='1')
            # 添加数据到ORM会话,with语句结束后自动commit
            session.add(conf)

代码中,由于我们用了with语句,因此代码的最后其实调用了session.commit()方法,只不过这一调用被with这个语法糖给省略了,这种写法仍是显式的事务管理方式;假如创建数据对象或是某些数据操作出错了,with语句会自动帮我们调用session.rollback()回滚事务。

插入数据时,一个常见的需求是获取自增主键的值,但事务提交之前数据还未写入数据库,因此直接获取主键的结果通常是None。如果不想提交事务,我们可以调用session.flush(),它会将操作写入数据库但暂不提交事务,此时我们是可以获取主键值的。

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import *

if __name__ == '__main__':
    DATABASE_URL = 'mysql+pymysql://root:root@localhost:3306/netstore2'
    engine = create_engine(DATABASE_URL, echo=True, future=True)
    Session = sessionmaker(autocommit=False, autoflush=True, bind=engine)

    with Session() as session:
        with session.begin():
            conf = Conf(key='enable_login', value='1')
            session.add(conf)
            session.flush()
            print(conf.id)

此外,插入也可以使用add_all()方法插入多条,不过值得注意的是这个方法并不会批量插入,它仍会生成多条INSERT语句,逐条插入数据。如果想实现真正的批量插入,可以使用session.bulk_save_objects([conf1, conf2])方法,但注意这个方法会跳过ORM会话级别的对象状态管理,不会触发SQLAlchemy事件,也不能自动刷新主键值,因此仅限于大批量数据插入场景。

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import *

if __name__ == '__main__':
    DATABASE_URL = 'mysql+pymysql://root:root@localhost:3306/netstore2'
    engine = create_engine(DATABASE_URL, echo=True, future=True)
    Session = sessionmaker(autocommit=False, autoflush=True, bind=engine)

    with Session() as session:
        with session.begin():
            conf1 = Conf(key='enable_login', value='1')
            conf2 = Conf(key='enable_register', value='1')
            session.add_all([conf1, conf2])

查询数据

最基础的查询方式是根据主键查询,这可以使用session.get()方法,他的参数是数据模型类以及主键的值,如果没查到会返回None

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import *

if __name__ == '__main__':
    DATABASE_URL = 'mysql+pymysql://root:root@localhost:3306/netstore2'
    engine = create_engine(DATABASE_URL, echo=True, future=True)
    Session = sessionmaker(autocommit=False, autoflush=True, bind=engine)

    with Session() as session:
        with session.begin():
            conf = session.get(Conf, 1)
            print(conf)

更新数据

ORM框架中,更新数据的流程通常是先查询再更新,对于这类需求,SQLAlchemy里我们直接修改查询得到的数据模型对象即可,session.commit()时修改会自动同步到数据库。

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import *

if __name__ == '__main__':
    DATABASE_URL = 'mysql+pymysql://root:root@localhost:3306/netstore2'
    engine = create_engine(DATABASE_URL, echo=True, future=True)
    Session = sessionmaker(autocommit=False, autoflush=True, bind=engine)

    with Session() as session:
        with session.begin():
            conf = session.get(Conf, 1)
            if conf is not None:
                conf.value = '0'

代码中,我们先调用session.get()基于主键获取数据记录对象,然后修改了它的value属性。with语句会自动帮我们调用session.commit()提交事务。

删除数据

和更新数据类似,删除数据我们也通常是先查询再删除,下面是一个例子。

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import *

if __name__ == '__main__':
    DATABASE_URL = 'mysql+pymysql://root:root@localhost:3306/netstore2'
    engine = create_engine(DATABASE_URL, echo=True, future=True)
    Session = sessionmaker(autocommit=False, autoflush=True, bind=engine)

    with Session() as session:
        with session.begin():
            conf = session.get(Conf, 1)
            if conf is not None:
                session.delete(conf)

删除数据需要调用session.delete()方法。

构建查询语句

前面我们介绍了如何根据主键查询数据模型对象。对于更复杂一点的情况,我们可以构造条件查询,下面是一个例子。

from sqlalchemy import create_engine, select
from sqlalchemy.orm import sessionmaker
from models import *

if __name__ == '__main__':
    DATABASE_URL = 'mysql+pymysql://root:root@localhost:3306/netstore2'
    engine = create_engine(DATABASE_URL, echo=True, future=True)
    Session = sessionmaker(autocommit=False, autoflush=True, bind=engine)

    with Session() as session:
        with session.begin():
            stmt = select(Conf).where(Conf.key == 'enable_login')
            conf = session.scalars(stmt).first()
            print(conf)

select().where()构造了一个条件查询语句,where的参数是一个判断表达式,它会在数据库中查询key字段值为enable_login的所有数据记录,随后我们调用session.scalars()执行这个构建好的查询语句。这条SQL语句理论上查询可能返回多个结果,不过在业务逻辑上它只会有一个结果,因此最后我们拼接了first()方法,它会取出查询结果的第一个元素或是当没有查询结果时返回None

关于session.scalars(),它是SQLAlchemy 2.0新增加的方法,它用于替代SQLAlchemy 1.x时代操作结果集的复杂性。“scalar”这个单词是标量的意思,SQLAlchemy 1.x中查询数据需要调用session.execute()方法获取结果集对象,结果集中的一条结果通常是元组类型,它可以包含一个标量也可以包含多个标量,不过由于ORM查询中这个结果元组通常只有一个标量,因此SQLAlchemy 1.x的结果集对象提供了scalars()方法来仅取每个元组的第一个元素。SQLAlchemy 2.0更进一步的在会话对象上直接提供了scalars()方法进行仅单个标量的查询,因此新版本的ORM查询通常都是直接使用session.scalars(),而不是之前的session.execute()方法。

基础比较运算符

where()的参数中我们可以编写用于数据筛选的表达式,具体来说它是SQLAlchemy的表达式对象,通常可能是BinaryExpression类型或BooleanClauseList等,至于为什么Conf.id == 1BinaryExpression类型,这是SQLAlchemy通过复杂的运算符重载实现的,有兴趣可以查看SQLAlchemy的源码。

# 查询id等于1的对象
stmt = select(Conf).where(Conf.id == 1)
# 查询id大于1的对象
stmt = select(Conf).where(Conf.id > 1)
# 查询id大于等于1且小于等于5的对象
stmt = select(Conf).where(Conf.id.between(1, 5))

NULL检查

对于数据记录对应字段是否为NULL可以用is_()is_not()方法进行筛选。

stmt = select(Conf).where(Conf.value.is_(None))
stmt = select(Conf).where(Conf.value.is_not(None))

AND、OR和NOT条件组合

对于AND条件,我们其实可以直接传多个表达式对象。

stmt = select(Conf).where(Conf.key == 'enable_login', Conf.value.is_(None))

串联多个where()也是一样的效果。

stmt = select(Conf).where(Conf.key == 'enable_login').where(Conf.value.is_(None))

除此之外,我们也可以显式的使用and_()函数连接多个条件,不过这种写法比较麻烦,除非条件需要动态组合,否则我们一般都不这么写。

stmt = select(Conf).where(and_(Conf.key == 'enable_login', Conf.value.is_(None)))

类似的,OR条件组合使用or_()函数实现。

stmt = select(Conf).where(or_(Conf.id == 1, Conf.id == 2))

取反的NOT条件使用not_()函数实现。

stmt = select(Conf).where(not_(Conf.id == 1))

组合使用AND、OR、NOT和表达式对象可以构建非常复杂的查询逻辑。

IN查询

数据模型属性的in_()方法可以用于实现IN语句范围查询,同理not_in()可以实现NOT IN语句查询。

stmt = select(Conf).where(Conf.key.in_(['a', 'b', 'c']))
stmt = select(Conf).where(Conf.key.not_in(['a', 'b', 'c']))

SQL中IN的范围还可以是一个子查询,SQLAlchemy支持这种写法。下面例子中,subq就是一个子查询,它作为了in_()方法的参数。

subq = select(Order.user_id).where(Order.amount > 100)
stmt = select(User).where(User.id.in_(subq))

LIKE查询

like()notlike()方法可以实现字符串的模糊匹配查询。

stmt = select(Conf).where(Conf.key.like('%login%'))
stmt = select(Conf).where(Conf.key.notlike('%login%'))

标量查询

SQL中可以使用各种统计函数,SQLAlchemy中大部分通用的函数封装在了sqlalchemy.func对象中,下面例子查询了数据库中Conf数据模型的记录数。

stmt = select(func.count(Conf.id))

构造动态查询

SQLAlchemy构造动态查询非常简单,我们根据用户的查询条件使用and_()or_()not_()等函数拼接动态查询条件即可,下面是一个例子。

filters = []
if conf_key:
    filters.append(Conf.key == conf_key)
filters.append(Conf.value.is_not(None))
stmt = select(Conf).where(*filters)

分页查询

SQLAlchemy中分页查询主要使用limit()offset()实现,前者控制返回的结果数(即分页大小),后者控制查询的结果偏移,这可通过分页大小和页码计算,通常可以按照offset = (page_num - 1) * page_size设置。不过实际开发中,对于分页查询,除了分页的元素,我们通常还需要查询总记录数和总页数,下面是一个例子。

offset = (page_num - 1) * page_size
stmt = select(Conf).limit(page_size).offset(offset)
# 分页结果元素
items = session.scalars(stmt).all()
# 总记录数
total = session.scalar(select(func.count(Conf.id)))
# 总页数
total_pages = (total + page_size - 1) // page_size

关联查询

最简单的关联查询就是直接访问数据模型对象的关联属性。前面在数据模型章节我们介绍过关联映射的配置,其中有一个lazy属性能够设置关联映射的懒加载配置,默认情况下它的配置是select,即默认懒加载,访问关联对象时再执行SELECT查询。运行下面例子代码,我们可以观察到SQLAlchemy总共生成了两条SQL语句,第一条查询User对象,第二条再根据外键查询Order对象。

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import *

if __name__ == '__main__':
    DATABASE_URL = 'mysql+pymysql://root:root@localhost:3306/netstore2'
    engine = create_engine(DATABASE_URL, echo=True, future=True)
    Session = sessionmaker(autocommit=False, autoflush=True, bind=engine)

    with Session() as session:
        with session.begin():
            user = session.get(User, 1)
            for order in user.orders:
                print(order)

此外,在一些比较复杂的查询场景,我们也可以显式的构建JOIN查询,下面例子构建了一个INNER JOIN内连接查询,查询符合VIP条件的用户的所有订单。

stmt = select(Order).join(User, Order.user_id == User.id).where(User.is_vip == True)
作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。
Copyright © 2017-2024 Gacfox All Rights Reserved.
Build with NextJS | Sitemap