分布式事务
分布式事务问题
在微服务中,由于数据库的垂直拆分和 RPC 远程调用,导致当 A 服务的数据源发生回滚,不会影响到 B 服务的数据源回滚,产生分布式事务问题。
解决方案
- 2PC
- 3PC
- CAP
- 柔性事务
- 刚性事务
- LCN
Seata
Seata(Simple Extensible Autonomous Transaction Architecture)简单可扩展自治事务框架是阿里巴巴开源的分布式事务解决方案。
Seata 中有三大模块:
- TC:事务协调器
- TM:事务管理器
- RM:资源管理器
其中 TM 和 RM 作为 Seata 的客户端与业务系统集成在一起,TC 作为 Seata 的服务端独立部署,负责协调各个分支事务。
Seata 有 4 种分布式事务解决方案: AT 模式、TCC 模式、Saga 模式和 XA 模式。
使用Seata
准备TC服务
核心配置
注册中心的配置:在 ${seata_home}/conf/
目录中,一般是 registry.conf
文件。
主要配置两个内容:
注册中心的类型及地址:
如选择Eureka
做注册中心。
eureka.serviceUrl:Eureka地址,如http://localhost:8761/eureka
application:TC 注册到 Eureka 时的服务名称,如seata_tc_server
.
当前服务的配置,两种配置方式:分布式服务的统一配置中心或本地文件。
registry {
# 指定注册中心类型,这里使用eureka类型
type = "eureka"
# 各种注册中心的配置。。这里省略,只保留了eureka和Zookeeper
eureka {
serviceUrl = "http://localhost:8761/eureka"
application = "seata_tc_server"
weight = "1"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
}
config {
# 配置文件方式,可以支持 file、nacos 、apollo、zk、consul、etcd3
type = "file"
nacos {
serverAddr = "localhost"
namespace = ""
group = "SEATA_GROUP"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
file {
name = "file.conf"
}
}
关键配置:
- store:TC的服务端数据存储配置
- mode:数据存储方式,支持两种:file和db
- file:将数据存储在本地文件中,性能比较好,但不支持水平扩展
- db:将数据保存在指定的数据库中,需要指定数据库连接信息
- mode:数据存储方式,支持两种:file和db
## transaction log store, only used in seata-server
store {
## store mode: file、db
mode = "file"
## file store property
file {
## store location dir
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
maxBranchSessionSize = 16384
# globe session size , if exceeded throws exceptions
maxGlobalSessionSize = 512
# file buffer size , if exceeded allocate new buffer
fileWriteBufferCacheSize = 16384
# when recover batch read size
sessionReloadReadSize = 100
# async, sync
flushDiskMode = async
}
## database store property
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
dbType = "mysql"
driverClassName = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seata_demo"
user = "root"
password = "123"
minConn = 1
maxConn = 10
globalTable = "global_table"
branchTable = "branch_table"
lockTable = "lock_table"
queryLimit = 100
}
}
如果用文件作为存储介质,不需要其它配置了,直接运行即可。 但是如果使用db作为存储介质,还需要在数据库中创建3张表:
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(96),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
启动TC服务。
改造事务发起者服务
微服务的改造,不管是哪一个微服务,只要是事务的参与者,步骤基本一致。
- 引入依赖。
- 添加配置文件。
- 代理DataSource
- 添加事务注解
在父工程seata-demo中已经对依赖做了管理:
<alibaba.seata.version>2.1.0.RELEASE</alibaba.seata.version>
<seata.version>1.1.0</seata.version>
因此,我们在子项目的pom文件中,引入依赖坐标即可:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<version>${alibaba.seata.version}</version>
<exclusions>
<exclusion>
<artifactId>seata-all</artifactId>
<groupId>io.seata</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<artifactId>seata-all</artifactId>
<groupId>io.seata</groupId>
<version>${seata.version}</version>
</dependency>
首先在application.yml中添加一行配置:定义事务组的名称,接下来会用到。
spring:
cloud:
alibaba:
seata:
tx-service-group: test_tx_group # 定义事务组的名称
然后是在resources目录下放两个配置文件:file.conf和registry.conf
其中,registry.conf与TC服务端的一样。
file.conf:
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = true
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
vgroup_mapping.test_tx_group = "seata_tc_server"
#only support when registry.type=file, please don't set multiple addresses
seata_tc_server.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
}
undo {
dataValidation = true
logSerialization = "jackson"
logTable = "undo_log"
}
log {
exceptionRate = 100
}
}
配置解读:
- transport:与TC交互的一些配置
- heartbeat:client和server通信心跳检测开关
- enableClientBatchSendRequest:客户端事务消息请求是否批量合并发送
- service:TC的地址配置,用于获取TC的地址
- vgroup_mapping.test_tx_group = "seata_tc_server":
- test_tx_group:是事务组名称,要与application.yml中配置一致,
- seata_tc_server:是TC服务端在注册中心的id,将来通过注册中心获取TC地址
- enableDegrade:服务降级开关,默认关闭。如果开启,当业务重试多次失败后会放弃全局事务
- disableGlobalTransaction:全局事务开关,默认false。false为开启,true为关闭
- default.grouplist:这个当注册中心为file的时候,才用到
- vgroup_mapping.test_tx_group = "seata_tc_server":
- client:客户端配置
- rm:资源管理器配
- asynCommitBufferLimit:二阶段提交默认是异步执行,这里指定异步队列的大小
- lock:全局锁配置
- retryInterval:校验或占用全局锁重试间隔,默认10,单位毫秒
- retryTimes:校验或占用全局锁重试次数,默认30次
- retryPolicyBranchRollbackOnConflict:分支事务与其它全局回滚事务冲突时锁策略,默认true,优先释放本地锁让回滚成功
- reportRetryCount:一阶段结果上报TC失败后重试次数,默认5次
- tm:事务管理器配置
- commitRetryCount:一阶段全局提交结果上报TC重试次数,默认1
- rollbackRetryCount:一阶段全局回滚结果上报TC重试次数,默认1
- undo:undo_log的配置
- dataValidation:是否开启二阶段回滚镜像校验,默认true
- logSerialization:undo序列化方式,默认Jackson
- logTable:自定义undo表名,默认是undo_log
- log:日志配置
- exceptionRate:出现回滚异常时的日志记录频率,默认100,百分之一概率。回滚失败基本是脏数据,无需输出堆栈占用硬盘空间
- rm:资源管理器配
Seata的二阶段执行是通过拦截sql语句,分析语义来指定回滚策略,因此需要对DataSource做代理。
在项目的config包中,添加一个配置类: 注意,这里因为订单服务使用了mybatis-plus这个框架(这是一个mybatis集成框架,自动生成单表Sql),因此我们需要用mybatis-plus的MybatisSqlSessionFactoryBean代替SqlSessionFactoryBean如果用的是原生的mybatis,请使用SqlSessionFactoryBean。
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
@Configuration
public class DataSourceProxyConfig {
@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSource dataSource) throws Exception {
// 订单服务中引入了mybatis-plus,所以要使用特殊的SqlSessionFactoryBean
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
// 代理数据源
sqlSessionFactoryBean.setDataSource(new DataSourceProxy(dataSource));
// 生成SqlSessionFactory
return sqlSessionFactoryBean.getObject();
}
}
给事务发起者的服务实现类中的方法添加@GlobalTransactional注解,开启全局事务。
改造事务被调用服务
与发起者类似,这里也要经过下面的步骤:
- 引入依赖:与order-service一致,略
- 添加配置文件:与order-service一致,略
- 代理DataSource,我们的storage-service和account-service都没有用mybatis-plus,所以配置要使用SqlSessionFactory:
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
@Configuration
public class DataSourceProxyConfig {
@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSource dataSource) throws Exception {
// 因为使用的是mybatis,这里定义SqlSessionFactoryBean
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
// 配置数据源代理
sqlSessionFactoryBean.setDataSource(new DataSourceProxy(dataSource));
return sqlSessionFactoryBean.getObject();
}
}
另外,事务注解可以使用 @Transactionnal
,而不是 @GlobalTransactional
,事务发起者才需要添加 @GlobalTransactional
.
AT模式
比较常用的是 AT 模式。
LCN分布式事务框架
LCN 分布式事务框架其本身并不创建事务,而是基于对本地事务的协调从而达到事务一致性的效果。
- 启动redis数据库用于tm-manager处理过程中事务补偿。
- 在mysql数据库中建立表用于TM-manager在处理过程中记录相关的事务组信息,创建tx-manager数据库并且创建一个表。
- 新建一个模块,作为事务管理TM-manager项目。导入依赖。
- 编写配置文件,LCN官方建议TM微服务的端口为8069,TM客户端请求TM服务器的端口为8070.
- 在TM服务器的启动类上添加两个注解。
- TM客户端配置,导入依赖
- 每个客户端的yml文件中增加配置,用于连接TM服务器。
- 客户端启动类添加注解开启分布式事务的支持。
- 客户端在需要添加事务处理的方法上添加注解。
CREATE TABLE `t_tx_exception` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`group_id` varchar(64) DEFAULT NULL,
`unit_id` varchar(32) DEFAULT NULL,
`mod_id` varchar(128) DEFAULT NULL,
`transaction_state` tinyint(4) DEFAULT NULL,
`registrar` tinyint(4) DEFAULT NULL,
`remark` varchar(4096) DEFAULT NULL,
`ex_state` tinyint(4) DEFAULT NULL COMMENT '0 未解决 1已解决',
`create_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC;
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.codingapi.txlcn</groupId>
<artifactId>txlcn-tm</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
spring.application.name=txlcn-tm
server.port=8069
eureka.client.serviceUrl.defaultZone=http://127.0.0.1:8050/eureka/
#健康检查(需要spring-boot-starter-actuator依赖)
eureka.client.healthcheck.enabled=true
# 续约更新时间间隔(默认30秒)
eureka.instance.lease-renewal-interval-in-seconds=10
# 续约到期时间(默认90秒)
eureka.instance.lease-expiration-duration-in-seconds=10
#eureka服务列表显示ip+端口
eureka.instance.prefer-ip-address=true
eureka.instance.instance-id=http://${spring.cloud.client.ip-address}:${server.port}
eureka.instance.hostname= ${spring.cloud.client.ip-address}
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/tx-manager?serverTimezone=UTC&characterEncoding=UTF-8
spring.datasource.password=123456
spring.datasource.username=root
# TM后台登陆密码
tx-lcn.manager.admin-key=123456
tx-lcn.manager.host=127.0.0.1
tx-lcn.manager.port=8070
# 开启日志,默认为false
tx-lcn.logger.enabled=true
tx-lcn.logger.driver-class-name=${spring.datasource.driver-class-name}
tx-lcn.logger.jdbc-url=${spring.datasource.url}
tx-lcn.logger.username=${spring.datasource.username}
tx-lcn.logger.password=${spring.datasource.password}
#redis
spring.redis.host=127.0.0.1
spring.redis.port=6379
@EnableTransactionManagerServer @EnableEurekaClient
<!--lcn事务客户端开始-->
<dependency>
<groupId>com.codingapi.txlcn</groupId>
<artifactId>txlcn-tc</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>com.codingapi.txlcn</groupId>
<artifactId>txlcn-txmsg-netty</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<!--lcn事务客户端结束-->
#lcn事务管理器ip端口
tx-lcn:
client:
manager-address: 127.0.0.1:8070
@EnableDistributedTransaction
@LcnTransaction//分布式事务 @Transactional//本地事务注解
分阶段提交
DTP和XA
分布式事务解决手段之一,两阶段提交协议(2PC:Two-Phase Commit) 该模式包括四个角色:
- 应用程序(AP):微服务
- 事务管理器(TM):全局事务管理者
- 资源管理器(RM):数据库
- 通信资源管理器(CRM):TM和RM的通信中间件
在该模型中,一个分布式事务可以被拆分成许多个本地事务,运行在不同的AP和RM上。每个本地事务的ACID很好实现,当全局事务必须保证其中每一个事务都能同时成功,若有一个事务失败,则其他事务都必须回滚。问题是本地事务处理时,不知道其他事务的运行状态。因此需要CRM来通知各个本地事务,同步事务执行的状态。
因此各个事务间的通信必须有统一的标准,否则不同数据库间就无法通信,XA就是X/Open DTP中通信中间件与TM间联系的接口规范,定义了用于通知事务开始、提交、终止、回滚等接口,各个数据库厂商都必须实现这些接口。
二阶段提交
二阶提交协议,将全局事务拆分位两个阶段来执行:
- 一阶段:准备阶段,各个本地事务完成本地事务的准备工作。
- 二阶段:执行阶段,各个本地事务根据上一阶段的执行结果,进行提交或回滚。