Seata是阿里开源的分布式事务框架,分布式事务是一个相对复杂和问题,Seata框架实现分布式事务具有高可用和简单灵活的特点。Seata为用户提供了XA、AT、TCC和SAGA事务模式,能够满足企业级的分布式事务需求。分布式事务问题有很多解决方案,实际开发中,我们不是一定要使用Seata这个框架,这个框架目前版本本身设计的很粗劣而且有很多Bug,不知道后续官方是否还会持续优化,不过它的思想还是非常值得学习的,这篇笔记我们介绍分布式事务的概念以及Seata框架的使用。
官方文档:https://seata.apache.org/
项目Github地址:https://github.com/apache/incubator-seata
了解什么是分布式事务前,我们还得回顾一下什么是事务。简单来说,事务是并发系统中处理数据的工作单元。事务必须满足ACID原则:
原子性(Atomicity):原子性确保事务中的所有操作要么全部完成,要么全部不完成。换句话说,事务是一个不可分割的工作单元,如果事务中的任何部分失败,整个事务将回滚到初始状态,确保数据库不会处于不一致的状态。
一致性(Consistency):一致性确保事务在完成时,数据库从一个一致状态转换到另一个一致状态。事务必须遵守所有的数据库规则(如约束、触发器等),以确保数据的完整性。
隔离性(Isolation):隔离性确保多个事务并发执行时,它们的执行结果与这些事务按某一顺序串行执行的结果相同。隔离性防止事务之间的相互干扰,确保每个事务的中间状态对其他事务不可见。
持久性(Durability):持久性确保一旦事务提交,其对数据库的更改将永久保存,即使系统发生故障也不会丢失。持久性通常通过将事务日志写入非易失性存储来实现。
对于单体系统,事务由数据库保证,这很容易理解。然而,在分布式系统中情况就复杂很多了。微服务架构中,我们的一个操作可能跨越多个微服务,每个微服务后端都有自己的数据库,仅能保证事务在一个微服务内部有效是不够的,我们必须保证整个操作都在一次事务内,否则将会出现数据不一致的情况。
下面是一个例子,假设我们有一个订单服务,用户下单共涉及3个操作,生成订单、扣减账户余额、扣减库存,这三个操作跨越了3个微服务。
假设现在扣减库存时出现了问题,商品服务报错了,但订单已经插入了,而且账户余额也已经扣减了,这种跨越3个微服务的事务性用常规手段是无法保证的,此时就出现了数据一致性问题!
那么如何解决这类分布式事务问题呢?实际上,有一种观点认为,“分布式事务”本身是个伪命题,它是Java开发领域过度设计的一个案例,属于没有问题就创造问题的典型,它本质上是由于不合理的微服务划分引起的,我们完全可以把所有事务操作放在一个微服务里,这样就不存在“分布式事务”了,自然也就没有任何问题了!当然,这也是一种解决问题的思路,不过现实世界总是复杂的,有时就是由于种种原因我们必须解决一个分布式事务问题,Seata分布式事务框架就是为了解决此类问题而出现的。
解决分布式事务问题有很多种方式,例如2PC、TCC等,这里我们简单介绍一下分布式事务相关的概念。
在2PC或TCC中,分布式事务都有协调者(Coordinator)和参与者(Participants)的概念。
协调者:协调者是负责管理和协调分布式事务的组件,它的主要职责是确保所有参与者在事务的各个阶段都能达成一致。
参与者:参与者是实际执行事务操作的组件,它们通常是微服务、数据库、消息队列或其他资源管理器。
2PC是一种确保分布式系统中事务一致性的协议,它将事务的提交过程分为两个阶段:准备阶段(Prepare Phase)和提交阶段(Commit Phase)。2PC依赖于数据库的事务实现,2PC理解起来很简单,准备阶段所有子任务执行但不提交,所有子任务执行成功后,协调者再统一通知所有子任务提交;如果某个子任务处理失败则协调者将通知所有子任务回滚。
当事务正常执行时,流程如下图。
当某个步骤出错时,流程如下图。
执行过程:
参与者执行本地事务操作但先不提交事务,并将结果(同意或拒绝)反馈给协调者。
提交阶段:
TCC是Try、Confirm和Cancel的缩写,TCC不依赖于数据库实现的事务,它将事务分为三个阶段:尝试(Try)、确认(Confirm)和取消(Cancel)。在尝试阶段,TCC会采用一种冻结策略而非直接操作数据,例如扣减余额在尝试阶段会冻结余额中的一部分(可以理解为将余额数据减少但新增了一个冻结的余额记录),这样操作后不影响其它并发进行的事务继续对余额进行操作。尝试阶段所有子任务报告尝试成功后,那么就删除冻结记录,确认事务完成;如果某一个子任务失败就解冻资源取消操作,例如将冻结的余额加回余额中。
当事务正常执行时,流程如下图。
当某个步骤出错时,流程如下图。
执行过程:
进行业务检查并预留资源。例如,冻结库存或预留资金。
确认阶段:
如果所有尝试操作成功,执行确认操作,正式提交事务,例如确认扣减库存或扣款。
取消阶段:
前面我们介绍过分布式事务中的协调者和参与者的概念,Seata内部实际上就是按照类似的架构来设计的。
事务协调者 TC(Transaction Coordinator):管理和协调整个分布式事务。
事务管理器 TM(Transaction Manager):定义全局事务的范围,开始、提交或回滚全局事务。
资源管理器 RM(Resource Manager):管理子任务处理的资源,向TC注册子任务或报告状态。
我们这里以SpringCloud工程为例进行介绍,我们使用的是最新版本的SpringBoot 2.7.18,SpringCloud 2021.0.8和SpringCloud Alibaba 2021.0.5.0版本,对应的Seata版本为1.6.1。
注意:在所有操作之前,我们需要先准备一个SpringCloud工程和对应的Nacos注册中心以及MySQL数据库服务,我们这里将创建3个微服务作为例子,分别是order
服务、product
服务和account
服务,它们用于订单管理、商品管理和账户余额管理,所有微服务注册到Nacos注册中心。有关工程搭建由于过于复杂,这里就不过多演示了,我们可以参考笔记库中其它关于SpringBoot、SpringCloud和SpringCloud Alibaba的相关章节进行搭建。后续的所有操作步骤都假定你已经搭建完成并成功启动了微服务、注册中心和数据库。
在具体修改代码之前,我们需要先部署Seata事务协调者TC服务,我们可以在Seata官方网站找到对应版本的安装包。下载后我们可以看到它是一个Java编写的服务端程序,我们首先需要修改它的配置文件。
conf/application.yml
server:
port: 7091
spring:
application:
name: seata-server
logging:
config: classpath:logback-spring.xml
file:
path: ${user.home}/logs/seata
extend:
logstash-appender:
destination: 127.0.0.1:4560
kafka-appender:
bootstrap-servers: 127.0.0.1:9092
topic: logback_to_logstash
console:
user:
username: seata
password: seata
seata:
config:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: public
group: SEATA_GROUP
registry:
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
group: SEATA_GROUP
namespace: public
cluster: default
store:
mode: db
db:
datasource: druid
db-type: mysql
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true
user: root
password: root
min-conn: 10
max-conn: 100
global-table: global_table
branch-table: branch_table
lock-table: lock_table
distributed-lock-table: distributed_lock
query-limit: 1000
max-wait: 5000
# server:
# service-port: 8091 #If not configured, the default is '${server.port} + 1000'
security:
secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
tokenValidityInMilliseconds: 1800000
ignore:
urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/api/v1/auth/login
我们可以看到配置分为几大板块,其中最重要的就是config
、registry
和store
配置,它们分别是Seata服务的配置中心、注册中心和存储数据的位置,一般来说在SpringCloud工程中,配置中心和注册中心选择Nacos,数据持久化选择数据库是比较常见的方式。具体的配置值这里就不逐一说明了,我们参考官方文档即可。
配置修改好后,我们即可运行bin/seata-server.sh
启动Seata服务,启动成功后我们可以看到Nacos界面中Seata服务已经注册成功。
我们首先需要在pom.xml
中添加依赖。
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
然后在applications.properties
中添加Seata相关的配置。
seata.data-source-proxy-mode=XA
seata.registry.type=nacos
seata.registry.nacos.server-addr=127.0.0.1:8848
seata.registry.nacos.namespace=public
seata.registry.nacos.group=SEATA_GROUP
seata.registry.nacos.application=seata-server
seata.tx-service-group=default_tx_group
seata.service.vgroup-mapping.default_tx_group=default
配置中,seata.data-source-proxy-mode
是分布式事务模式,我们这里暂时设置为XA模式,其余配置都是和Seata服务的注册位置相关的,前面我们介绍过例子中的Seata注册在Nacos注册中心上,因此这里指定Nacos的地址、命名空间、组、服务名等信息,其中tx-service-group
是事务分组名,它用于和异地容灾的集群名相关联,这里我们设置一个默认值。
我们后面代码中实际操作的数据表结构如图。
下面我们展示分布式事务涉及3个微服务中的核心方法。
order
服务的添加订单方法如下。
package com.gacfox.demo.order.service;
import com.gacfox.demo.account.client.AccountClient;
import com.gacfox.demo.account.model.AccountBalanceReq;
import com.gacfox.demo.order.model.Order;
import com.gacfox.demo.product.client.ProductClient;
import com.gacfox.demo.product.model.Product;
import com.gacfox.demo.product.model.ProductStorageReq;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.UUID;
@Service("orderService")
public class OrderServiceImpl implements OrderService {
@Resource
private ProductClient productClient;
@Resource
private AccountClient accountClient;
@Override
@GlobalTransactional
public Order addOrder(Long userId, Long productId, int count) {
// 计算订单总金额
Product product = productClient.getProductById(productId);
if (product == null) {
throw new RuntimeException("产品不存在!");
}
BigDecimal money = product.getPrice().multiply(BigDecimal.valueOf(count));
// 扣减库存
productClient.deductStorage(
ProductStorageReq.builder()
.productId(productId).amount(count)
.build()
);
// 扣减账户余额
accountClient.deductBalance(
AccountBalanceReq.builder()
.userId(userId)
.amount(money)
.build()
);
// 插入订单表
return Order.builder()
.orderCode(UUID.randomUUID().toString().replaceAll("-", ""))
.userId(userId)
.productId(productId)
.money(money)
.build();
}
}
product
服务的查询商品和扣减库存代码如下。
package com.gacfox.demo.product.service;
import com.gacfox.demo.product.model.Product;
import com.gacfox.demo.product.model.ProductStorage;
import com.gacfox.demo.product.model.ProductStorageReq;
import com.gacfox.demo.product.repository.ProductRepository;
import com.gacfox.demo.product.repository.ProductStorageRepository;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
@Service("productService")
public class ProductServiceImpl implements ProductService {
@Resource
private ProductRepository productRepository;
@Resource
private ProductStorageRepository productStorageRepository;
@Override
@Transactional(rollbackFor = Exception.class)
public Product getProductById(Long productId) {
return productRepository.findById(productId).orElse(null);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void deductStorage(ProductStorageReq productStorageReq) {
ProductStorage productStorage = productStorageRepository.findByProductId(productStorageReq.getProductId());
if (productStorage.getAmount() < productStorageReq.getAmount()) {
throw new RuntimeException("库存不足!");
}
productStorage.setAmount(productStorage.getAmount() - productStorageReq.getAmount());
productStorageRepository.save(productStorage);
}
}
account
服务的扣减账户余额代码如下。
package com.gacfox.demo.account.service;
import com.gacfox.demo.account.model.AccountBalance;
import com.gacfox.demo.account.model.AccountBalanceReq;
import com.gacfox.demo.account.repository.AccountBalanceRepository;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
@Service("accountService")
public class AccountServiceImpl implements AccountService {
@Resource
private AccountBalanceRepository accountBalanceRepository;
@Override
@Transactional(rollbackFor = Exception.class)
public void deductBalance(AccountBalanceReq accountBalanceReq) {
AccountBalance accountBalance = accountBalanceRepository.findByUserId(accountBalanceReq.getUserId());
if (accountBalance.getBalance().compareTo(accountBalanceReq.getAmount()) < 0) {
throw new RuntimeException("余额不足");
}
accountBalance.setBalance(accountBalance.getBalance().subtract(accountBalanceReq.getAmount()));
accountBalanceRepository.save(accountBalance);
}
}
代码很简单,其实就是创建订单时会调用扣减库存和扣减账户余额两个位于其它微服务的操作,然而由于跨越了微服务因此产生了分布式事务问题。先不考虑分布式事务,按照代码的逻辑,扣减库存、扣减余额、插入订单表是依次发生的,如果扣减库存成功而扣减余额却失败,此时就出现了数据一致性问题,库存数据莫名的减少了!
不过上面和我们熟悉的代码之间唯一的不同就是order
服务的方法上添加了一个@GlobalTransactional
注解,它是Seata提供的注解,用于开启全局分布式事务。这里我们使用的是XA事务模式,它采用两阶段提交,具有强分布式一致性,因此实际上当扣减余额失败时,所有事务都会回滚。由此可见,Seata的使用实在太简单了,我们甚至只修改了一行代码就实现了复杂的分布式事务!
注意:Seata是通过拦截JDBC方法实现的,因此无论我们使用MyBatis还是Hibernate都可以正常使用Seata。
Seata中有几种常用的分布式事务模式,这里我们简单介绍一下。
XA(X/Open XA,一个分布式事务标准名)模式和我们前面介绍的2PC方案流程完全一致。开启XA模式需要修改application.properties
中的seata.data-source-proxy-mode
,除此之外不需要额外代码。
seata.data-source-proxy-mode=XA
前面例子我们使用的就是XA模式,它采用标准的两阶段提交,具备强分布式一致性。
AT(Automatic Transaction)模式是Seata是默认分布式事务模式,它也是一种两阶段提交实现的分布式事务,但和XA模式有一些区别。XA模式中,第一阶段所有的微服务执行数据操作但不提交,而AT模式则是直接提交数据,AT模式的回滚依靠被操作数据的两个快照,执行前快照和执行后的快照。
举例来说,假设我们扣减库存将执行SQL语句update t_product_storage set amount=9 where product_id=1
,在第一阶段AT模式的事务会直接执行并提交这个SQL语句,然而它同时还会自动保存两个快照,执行前的amount
值例如10,以及执行后的amount
值9。在第二阶段,如果全部子任务执行成功,那么分布式事务成功结束,对原始数据不需要任何修改,删除快照即可;但如果某个子任务执行失败,此时就要回滚了,Seata将首先比对数据是否仍是9(如果不是9说明数据还被其它事务修改了,此时就无法处理了,必须人工介入),如果仍是9就可以放心大胆的用10这个快照值来回滚数据。
由此可见,AT模式不存在XA模式长时间锁定数据的缺点,在高并发的场景下能够提供更好的性能,然而AT模式不具备强分布式一致性,它只能说具备最终一致性,事务执行过程中具有那么一瞬间的不一致状态(也被称为软状态),且数据更新的过程中是有小概率出现不可恢复失败的。对于那些不太重要的数据一般来说都推荐使用AT模式,只有处理金额等敏感数据时才会考虑使用最严格的XA模式。
使用AT模式配置方式也是类似的,除此之外不需要额外代码。
seata.data-source-proxy-mode=AT
TCC的具体流程前面我们已经介绍过,通过前面流程图我们可以知道,TCC有3个重要操作:Try、Commit和Cancel,显然这些操作是不能通过代码框架帮我们生成的,此时我们就必须手动编写代码了。TCC模式使用起来非常复杂,我们这里以一个例子的形式介绍如何实现TCC模式。
在具体编码前,我们还需要再额外了解两个概念。
空回滚:空回滚问题是指如果子任务的Try阶段没执行,我们的Cancel应该是个空操作,而如果Try确实执行了,Cancel才是正常的回滚操作,例子如下图所示。
图中,TM通知微服务B执行任务时阻塞了,任务超时开始回滚,此时TC通知执行Cancel操作,由于微服务B的Try操作并未执行,因此它的Cancel操作应该只记录“Cancel已执行”,而不对真正的数据做任何更新。
业务悬挂:还是上面的图,假如Cancel已经执行完成,这意味事务的生命周期已经结束,然而此时之前阻塞的请求突然又发到了微服务B,如果B执行Try操作冻结资源,数据一致性就出问题了,B必须通过一定的机制判断是否要执行这个Try操作。
幂等性:在不稳定的网络环境中,Try、Cancel等操作可能被多次重试,微服务可能接到多次指令,幂等性要求即使这些操作被调用多次,也应保持数据的一致性。
我们实现TCC具体代码时,必须严格考虑空回滚、业务悬挂和幂等性,否则很容易导致难以定位的复杂问题。
在Seata中,AT和TCC其实可以混合使用,它们都具有分布式最终一致性,这里我们修改之前例子中product
微服务的扣减库存操作,将其改为TCC模式。TCC模式的Try操作是一个冻结操作,我们这里需要新建一个t_product_storage_freeze
表来记录冻结数据,不同业务逻辑的TCC做法可能不同,同一种逻辑也可能有不同的实现方式,因此不能一概而论,这里我们的写法仅供参考。
表结构中,xid
是事务ID,由Seata框架提供,state
是TCC状态,其中0表示Try,1表示Confirm,2表示Cancel,子任务执行了对应操作后会将状态设置为对应值。
TCC要求对于子事务逻辑创建一个接口,接口的实现分别是Try、Confirm和Cancel逻辑。
package com.gacfox.demo.product.service;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
@LocalTCC
public interface ProductStorageService {
@TwoPhaseBusinessAction(name = "prepare", commitMethod = "confirm", rollbackMethod = "cancel")
void prepare(@BusinessActionContextParameter(paramName = "productId") Long productId,
@BusinessActionContextParameter(paramName = "amount") Integer amount
);
boolean confirm(BusinessActionContext context);
boolean cancel(BusinessActionContext context);
}
代码中prepare()
方法就是Try阶段时调用的,它有两个参数,分别是锁定库存需要用到的产品ID和库存数,这些参数可以使用@BusinessActionContextParameter
注解标注,被标注的参数会被存储在Seata内部的ActionContext
中,在后续的Confirm和Cancel阶段我们还可以将其取出来。
注意:ActionContext
底层会使用阿里的FastJSON对数据进行序列化和反序列化,不知道这算不算一个Bug,上面代码其实就存在一个Integer
和Long
类型的问题,JSON反序列化时productId
其实会被设置为整型,这会造成后续类型转换错误,不知道后续版本会不会改进这一点。
下面代码是ProductStorageService
的实现类,代码中对前面的空回滚、业务悬挂和幂等性都有相对应的处理方式。
package com.gacfox.demo.product.service;
import com.gacfox.demo.product.model.ProductStorage;
import com.gacfox.demo.product.model.ProductStorageFreeze;
import com.gacfox.demo.product.repository.ProductStorageFreezeRepository;
import com.gacfox.demo.product.repository.ProductStorageRepository;
import io.seata.core.context.RootContext;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
@Service("productStorageService")
public class ProductStorageServiceImpl implements ProductStorageService {
@Resource
private ProductStorageRepository productStorageRepository;
@Resource
private ProductStorageFreezeRepository productStorageFreezeRepository;
@Override
public void prepare(@BusinessActionContextParameter(paramName = "productId") Long productId,
@BusinessActionContextParameter(paramName = "amount") Integer amount) {
String xid = RootContext.getXID();
if (StringUtils.hasText(xid)) {
// 判断业务悬挂
ProductStorageFreeze freeze = productStorageFreezeRepository.findById(xid).orElse(null);
if (freeze != null) {
// CANCEL执行过,拒绝执行
return;
}
// 扣减库存
ProductStorage productStorage = productStorageRepository.findByProductId(productId);
if (productStorage.getAmount() < amount) {
throw new RuntimeException("库存不足!");
}
productStorage.setAmount(productStorage.getAmount() - amount);
productStorageRepository.save(productStorage);
// 插入冻结记录
ProductStorageFreeze newFreeze = ProductStorageFreeze.builder()
.xid(xid)
.productId(productId)
.amount(amount)
.state(ProductStorageFreeze.STATE_TRY)
.build();
productStorageFreezeRepository.save(newFreeze);
}
}
@Override
public boolean confirm(BusinessActionContext context) {
boolean result = false;
String xid = RootContext.getXID();
if (StringUtils.hasText(xid)) {
// CONFIRM,直接删除冻结记录即可
return productStorageFreezeRepository.deleteByXid(xid) == 1;
}
return result;
}
@Override
public boolean cancel(BusinessActionContext context) {
boolean result = false;
String xid = RootContext.getXID();
Long productId = (Long) context.getActionContext("productId");
Integer amount = (Integer) context.getActionContext("amount");
if (StringUtils.hasText(xid) && productId != null && amount != null) {
// 判断是否空回滚,如果TRY未执行则需要空回滚
ProductStorageFreeze freeze = productStorageFreezeRepository.findById(xid).orElse(null);
if (freeze == null) {
// 是空回滚,记录CANCEL执行信息,冻结字段置为0
ProductStorageFreeze newFreeze = ProductStorageFreeze.builder()
.xid(xid)
.productId(productId)
.amount(0)
.state(ProductStorageFreeze.STATE_CANCEL)
.build();
productStorageFreezeRepository.save(newFreeze);
return true;
}
// 幂等性判断
if (freeze.getState() == ProductStorageFreeze.STATE_CANCEL) {
// 重复执行
return true;
}
// 回滚逻辑,恢复扣减的库存,冻结字段清零,状态改为CANCEL
ProductStorage productStorage = productStorageRepository.findByProductId(productId);
productStorage.setAmount(productStorage.getAmount() + amount);
productStorageRepository.save(productStorage);
freeze.setAmount(0);
freeze.setState(ProductStorageFreeze.STATE_CANCEL);
productStorageFreezeRepository.save(freeze);
return true;
}
return result;
}
}
注:我使用的这个版本Seata似乎还有另一个Bug,我使用SpringCloud OpenFeign进行微服务调用,但Cancel阶段的xid
会丢失,而且是只有Cancel阶段才有问题,因此实际上回滚是一直失败的,不知道官方后续会不会解决这个问题。
此外,我们要知道的是TCC模式也具有较好的并发性能,TCC是基于巧妙的“资源冻结”实现的数据提交和回滚,它不需要数据库支持事务,甚至可以用于读写非数据库的场景,不锁定数据意味着TCC有最好的并发性能;当然和AT类似,TCC也不具有强分布式一致性,只能说具有最终一致性。