什么是事务?

  • 事务是应用程序中一系列严密的操作,所有操作必须成功完成,否则在每个操作中所作的所有更改都会被撤消。也就是事务具有原子性,一个事务中的一系列的操作要么全部成功,要么一个都不做。

  • 数据库事务提供了一种机制,可用来将一系列数据库更改归入一个逻辑操作。更改数据库后,所做的更改可以作为一个单元进行提交或取消。

  • 事务可确保遵循原子性、一致性、隔离性和持续性(ACID)这几种属性,以使数据能够正确地提交到数据库中

事务的四大特性

  • 事务应该具有4个属性:原子性、一致性、隔离性、持久性。这四个属性通常称为ACID特性。

  • 原子性(Atomicity)

    • 一个事务是一个不可分割的工作单位,事务中包括的诸操作要么都做,要么都不做
  • 一致性(Consistency)

    • 事务必须是使数据库从一个一致性状态变到另一个一致性状态。一致性与原子性是密切相关的

    • 当事务成功完成,系统中所有的改变都将正确应用,都处于成功状态

    • 当事务出现错误,系统中所有改变都将回滚,回滚到初始状态

      1
      2
      3
      如:转帐前,A账户有400元,B账户有300元,A向B转账200元
      如果转账事务成功,则A账户必然还剩200元,而B账户有500元,同为成功后状态;
      如果转账事务失败,则A账户必然还是400元,B账户还有300元,同为初始状态;
  • 隔离性(Isolation)

    • 并发环境下,当不同的事务同时操作相同数据时,每个事务都有各自的完整数据空间。

    • 即并发事务所做修改必须与任何其他并发事务修改隔离。其他事务的修改只会在另一事务修改成功之前或之后,而不存在中间状态的数据

  • 持久性(Durability)

    • 只要事务完成,状态将得到永久保存,崩溃可修复

什么是分布式事务?

  • 分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点上

为什么需要分布式事务?

  • 随着技术和数据的发展,数据量和访问量的急剧暴涨,单一的服务体系已经无法满足日益增长的压力,所以分布式系统应运而生。而分布式系统不可避免地一个设计就是跨数据库。

  • 在大型互联网公司中,数据库一般都会采取垂直分割的方式,按照业务需求划分具体数据库,分库。而在跨库甚至跨服务器节点的情况下,如何保证操作事务的一致性和准确性,显得更为复杂,也显得更为重要。

  • 简单来说,分布式事务的出现是为了保证分布式系统的事务一致性。

分布式事务的理论知识

CAP理论—–分布式系统的基础核心理论

  • C—- 数据一致性(consistency)

    • 如果系统对一个写操作返回成功,那么后续所有读请求都应该读取到这个写操作的新数据。
    • 如果系统对一个写操作返回失败,那么后续所有读请求都应该读取不到这个写操作的数据。
  • A—- 服务可用性(availability)

    • 所有读写操作都会在一定时间内得到响应,可终止,不会一直阻塞等待。
  • P—- 分区容错性(partition-tolerance)

    • 在网络分区的情况下,被分隔的节点仍能正常对外服务
  • 注:仅能选择CP架构或AP架构,因为如果放弃了分区容错性(P),则当某一分区节点故障,为了保证一致性(C),则必须拒绝连接,不满足服务可用性(A)。然而P的情况并不多见,故而尽量保证CA才是合理的

Bash理论—-基于CAP定理演化而来,对AP的扩展

  • Basically Available(基本可用):分布式系统在出现故障的时候,允许损失部分可用性,保证核心可用。[不代表不可用,如允许延迟和降级等]

  • Soft state(软状态):允许系统中存在中间状态,这个状态不影响系统可用性,这里指的是 CAP 中的不一致[允许时延]

  • Eventually consistent (最终一致性): 经过一定时间后,所有节点数据都会达到一致

  • 注:Bash理论完全不同于ACID的强一致模型,允许存在一定时间的数据不一致性,并在最终保证数据一致性。即牺牲一致性换取可用性。

分布式事务与本地事务的区别(个人理解)?

  • 本地事务:负责保证同一个服务的同一个数据库的操作同时成功与失败

  • 分布式事务:负责保证不同服务的同一个或不同数据库的操作的成功与失败(可以允许不同步)

  • 总的来说,分布式事务相当于作为管理者角色站在服务的层面去管理各个服务的本地事务提交

分布式事务解决方案

  • 目前,分布式事务的解决方案主要有

    • 全局消息【DTP模型】

    • 基于可靠消息服务的分布式事务【消息中间件扮演分布式事务协调者的角色】

    • TCC【Try Confirm Cancel,补偿型分布式事务】

    • 最大努力通知【定期校对】

基于XA协议的两阶段提交

  • 分布式事务通常采用2PC协议,全称Two Phase Commitment Protocol。该协议主要为了解决在分布式数据库场景下,所有节点间数据一致性的问题。

  • 分布式事务通过2PC协议将提交分成两个阶段:

    • prepare
    • commit/rollback
  • 阶段一为准备(prepare)阶段。即所有的参与者准备执行事务并锁住需要的资源。参与者ready时,向transaction manager报告已准备就绪。

  • 阶段二为提交阶段(commit)。当transaction manager确认所有参与者都ready后,向所有参与者发送commit命令。

消息事务+最终一致性

  • 所谓的消息事务就是基于消息中间件的两阶段提交,本质上是对消息中间件的一种特殊利用,它是将本地事务和发消息放在了一个分布式事务里,保证要么本地操作成功并且对外发消息成功,要么两者都失败

  • 分布式事务,本质上是对多个数据库的事务进行统一控制

    • 按照控制力度可以分为:不控制、部分控制和完全控制。
      • 不控制就是不引入分布式事务,部分控制就是各种变种的两阶段提交,包括上面提到的消息事务+最终一致性、TCC模式,而完全控制就是完全实现两阶段提交。
    • 部分控制的好处是并发量和性能很好,缺点是数据一致性减弱了,完全控制则是牺牲了性能,保障了一致性,具体用哪种方式,最终还是取决于业务场景。

分布式事务框架—TX-LCN

  • LCN并不生产事务,LCN只是本地事务的协调工【引自官网http://www.txlcn.org/zh-cn/index.html】

  • TX-LCN定位于一款事务协调性框架,框架其本身并不操作事务,而是基于对事务的协调从而达到事务一致性的效果。

命名由来

  • L—-锁定事务单元(Lock)

  • C—-确认事务模块状态(Confirm)

  • N—-通知事务(Notify)

兼容模式

  • LCN框架从5.0开始兼容了LCN、TCC、TXC三种事务模式,为了和LCN框架区分,从5.0开始把LCN框架更名为:TX-LCN分布式事务框架。

    • LCN模式:通过代理Connection的方式实现对本地事务的操作,然后在由TxManager统一协调控制事务。当本地事务提交回滚或者关闭连接时将会执行假操作,该代理的连接将由LCN连接池管理
    • TCC模式:不依赖资源管理器(RM)对XA的支持,而是通过对(由业务系统提供的)业务逻辑的调度来实现分布式事务。主要由三步操作,Try: 尝试执行业务、 Confirm:确认执行业务、 Cancel: 取消执行业务。
    • TXC模式:在执行SQL之前,先查询SQL的影响数据,然后保存执行的SQL快走信息和创建锁。当需要回滚的时候就采用这些记录数据回滚数据库,目前锁实现依赖redis分布式锁控制。
  • 每种模式在实际使用时有着自己对应的注解。

    • LCN:@LcnTransaction

    • TCC:@TccTransaction

    • TXC:@TxcTransaction

框架原理

  • TX-LCN由两大组件组成:

    • TxClient:作为模块的依赖框架,提供TX-LCN的支持,用于控制事务的发起方和参与方;
    • TxManager:作为分布式事务的控制方。

核心步骤

  1. 创建事务组 —— 事务发起方开始执行业务代码之前先调用TxManager创建事务组对象,然后拿到事务标示GroupId的过程。
  2. 加入事务组 —— 事务参与方执行完业务方法以后,将该模块的事务信息通知给TxManager的操作
  3. 通知事务组 —— 事务发起方执行完业务代码以后,将发起方执行结果状态通知给TxManager,TxManager将根据事务最终状态和事务组的信息来通知相应的事务参与方提交或回滚事务,并返回结果给事务发起方。

整合分布式事务

引依赖

  • 新建模块:system-tx-manager,作为事务管理器(协调器,控制方)

  • 核心依赖:txlcn-tm,该依赖包内置了JPA依赖,所以只需要再引入数据库连接池即可实现数据库操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    <dependencies>
    <!--分布式事务管理器-->
    <dependency>
    <groupId>com.codingapi.txlcn</groupId>
    <artifactId>txlcn-tm</artifactId>
    </dependency>
    <!--druid连接池-->
    <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    </dependency>
    <!--缓存依赖-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!--eureka 客户端-->
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <!--实时健康监控-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!--配置客户端-->
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-config-client</artifactId>
    </dependency>
    <!--作为web项目存在-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    </dependencies>

加配置

  • 在resource目录下添加application.propertise(貌似只支持propertise文件)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    spring.application.name=tx-manager
    server.port=7970
    spring.datasource.driver-class-name=com.mysql.jdbc.Driver
    spring.datasource.url=jdbc:mysql://127.0.0.1:3306/spring?characterEncoding=UTF-8
    spring.datasource.username=spring
    spring.datasource.password=spring
    mybatis.configuration.map-underscore-to-camel-case=true
    mybatis.configuration.use-generated-keys=true
    # 数据库方言
    spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5InnoDBDialect
    # 第一次运行可以设置为: create, 为TM创建持久化数据库表
    #spring.jpa.hibernate.ddl-auto=validate
    # TM后台登陆密码,默认值为codingapi;访问路径http://127.0.0.1:7970/admin/index.html
    tx-lcn.manager.admin-key=admin
    # TM监听IP. 默认为 127.0.0.1
    tx-lcn.manager.host=127.0.0.1
    # TM监听Socket端口. 默认为 ${server.port} - 100
    tx-lcn.manager.port=8070
    # 心跳检测时间(ms)
    tx-lcn.manager.heart-time=15000
    # 分布式事务执行总时间
    tx-lcn.manager.dtx-time=30000
    #参数延迟删除时间单位ms
    tx-lcn.message.netty.attr-delay-time=10000
    tx-lcn.manager.concurrent-level=128
    # 开启日志,记录在数据库的t_logger表格中
    tx-lcn.logger.enabled=true
    logging.level.com.codingapi=debug
    tx-lcn.logger.driver-class-name=${spring.datasource.driver-class-name}
    tx-lcn.logger.jdbc-url=${spring.datasource.url}
    tx-lcn.logger.username=${spring.datasource.username}
    tx-lcn.logger.password=${spring.datasource.password}
    #redis 主机
    #spring.redis.host=127.0.0.1
    #redis 端口
    #spring.redis.port=6379
    #redis 密码
    #spring.redis.password=

启动类

  • 使用@EnableTransactionManagerServer开启事务管理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
    * @description 启动类
    * @EnableTransactionManagerServer 开启事务管理器
    * @date: 2019-06-19 15:08
    */
    @EnableEurekaClient
    @SpringBootApplication
    @EnableTransactionManagerServer
    public class SpringCloudTMApplication {
    public static void main(String[] args) {
    SpringApplication.run(SpringCloudTMApplication.class, args);
    }
    }

测试

  • 此时启动服务,访问http://127.0.0.1:7970/admin/index.html 查看TM的管理页面,使用上述配置的密码admin进行登录

  • 系统后台提供了TxManager的配置信息查看,异常信息记录以及系统日志的展示

  • 其中的异常信息记录对应于自动生成的数据库表t_tx_exception

  • 系统日志对应于自动生成的数据库表t_logger

构建TxClient(TC)

  • 首先创造两个服务的调用环境,现有两个服务模块:springcloud-client和springcloud-client2

  • springcloud-client有一张表user结构如下:

    1
    2
    3
    4
    5
    6
    7
    CREATE TABLE `user` (
    `id` int NOT NULL AUTO_INCREMENT,
    `name` varchar(255) DEFAULT NULL,
    `age` int DEFAULT NULL,
    `is_delete` bit(1) DEFAULT b'0',
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8
  • springcloud-client2有一张表user_total结构如下:

    1
    2
    3
    4
    5
    6
    7
    CREATE TABLE `user_total` (
    `id` int NOT NULL,
    `total` bigint DEFAULT NULL,
    `create_date` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
    `create_by` varchar(255) DEFAULT 'DBA',
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

springcloud-client

  • 实现如下功能:通过springcloud-client模块新增user记录的时候,同步更改client_total对应客户端路径下的user_total总数,故而涉及服务间调用,为方便演示,使用同一个数据库

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    /**
    * 新增User,用于测试分布式事务
    * @param user
    * @return
    */
    @PostMapping("/test/feign/user")
    public Boolean addUser(@RequestBody User user) {
    this.userService.addUser(user);
    return Boolean.TRUE;
    }
  • 其中,ClientService.addUser(User user)接口实现如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Service
    public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {
    //内部调用接口的feignclient
    @Autowired
    private Client2FeignIntercept client2FeignIntercept;

    @Transactional //本地事务注解
    public void addUser(User user) {
    this.baseMapper.insert(user);
    //调用内部接口更新client_total
    this.client2FeignIntercept.addUser(user.getName());
    }
    }
  • Client2FeignIntercept..addUser(String name)接口如下:

    1
    2
    3
    4
    5
    6
    7
    //指向注册名为springcloud-client2的服务
    @FeignClient(value = "springcloud-client2", name = "springcloud-client2")
    public interface Client2FeignIntercept {
    //指向springcloud-client2中接口路径为/addUserTotal的POST接口
    @PostMapping("/addUserTotal")
    Boolean addUser(@RequestParam String name);
    }
  • 在springcloud-client的启动类加上@EnableFeignClients注解开启内部调用

    1
    2
    3
    4
    5
    6
    7
    8
    @EnableFeignClients
    @EnableEurekaClient
    @SpringBootApplication
    public class SpringCloudClientApplication {
    public static void main(String[] args) {
    SpringApplication.run(SpringCloudClientApplication.class, args);
    }
    }

springcloud-client2

  • 在springcloud-client2提供接口更新相应的UsertTotal信息的接口/addUserTotal

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @RestController
    public class ClientTotalController {
    private ClientTotalService clientTotalService;
    /**
    * 新增用户的总数
    * @param name 用户名
    * @return
    */
    @PostMapping("/addUserTotal")
    public Boolean addClient(String name) {
    this.userTotalService.incr(name);
    return Boolean.TRUE;
    }
    }
  • 其中userTotalService.incr(String name)接口实现如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @Service
    public class UserTotalServiceImpl extends ServiceImpl<UserTotalMapper, UserTotal> implements UserTotalService {
    @Transactional //本地事务注解
    public void incr(String name) {
    UserTotal userTotal = new UserTotal();
    userTotal.setCreateBy(name);
    userTotal.setTotal(1L);
    this.baseMapper.insert(userTotal);
    }
    }

测试

正常事务提交

  • 清空数据库后, 启动客户端,调用接口增加用户 Test001

  • 查看数据库

两端事务不一致

  • 尝试结果:springcloud-client提交失败,springcloud-client2事务成功

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Transactional //本地事务注解
    public void incr(String name) {
    try {
    Thread.sleep(5000); //线程休眠5秒,造成访问超时
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    UserTotal userTotal = new UserTotal();
    userTotal.setCreateBy(name);
    userTotal.setTotal(1L);
    this.baseMapper.insert(userTotal);
    }
  • 发送请求,可以看到超时错误

  • 查询数据库,测试结果:springcloud-client事务提交后回滚,springcloud-client2事务提交成功

保证事务一致

  • 正式构建TC,引依赖,在springcloud-client和springcloud-client2中引入依赖:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    <!--分布式事务客户端-->
    <dependency>
    <groupId>com.codingapi.txlcn</groupId>
    <artifactId>txlcn-tc</artifactId>
    </dependency>
    <dependency>
    <groupId>com.codingapi.txlcn</groupId>
    <artifactId>txlcn-txmsg-netty</artifactId>
    </dependency>
  • 加注解;在启动类添加@EnableDistributedTransaction注解,开启TC

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    @SpringBootApplication
    @EnableEurekaClient
    @EnableFeignClients
    @EnableDistributedTransaction // 开启TC
    public class SpringCloudClientApplication {
    public static void main(String[] args){
    SpringApplication.run(SpringCloudClientApplication.class, args);
    }

    }
    ---------------------------------------------
    @EnableFeignClients
    @EnableEurekaClient
    @SpringBootApplication
    @EnableDistributedTransaction // 开启TC
    public class SpringCloudClient2Application {
    public static void main(String[] args) {
    SpringApplication.run(SpringCloudClient2Application.class, args);
    }
    }
  • 在service接口中添加 @LcnTransaction 分布式事务注解

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    @Transactional //本地事务注解
    @LcnTransaction //分布式事务注解
    public void addUser(User user) {
    this.baseMapper.insert(user);
    //调用内部接口更新client_total
    this.client2FeignIntercept.addUser(user.getName());
    }
    ---------------------------------------------
    @Transactional //本地事务注解
    @LcnTransaction //分布式事务注解
    public void incr(String name) {
    try {
    Thread.sleep(5000); //线程休眠5秒,造成访问超时
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    UserTotal userTotal = new UserTotal();
    userTotal.setCreateBy(name);
    userTotal.setTotal(1L);
    this.baseMapper.insert(userTotal);
    }
  • 此时再去访问接口新增客户端,clinet访问超时,但是 client2中的事务也会失败。