分布式事务 - AT模式Dubbo集成Seata

时间:2023-11-21 10:35:04  热度:0°C

本篇基于Dubbo集成Seata实现一个分布式事务的解决方案,在整个业务流程中,会涉及如下三个服务:

  • 订单服务:用于创建订单。

  • 账户服务:从账户中扣减余额。

  • 库存服务:扣减指定商品的库存数量。

下图是这三个微服务的整体架构图,用户执行下单请求时,会调用下单业务的REST接口,该接口会分别调用库存服务以及订单服务。另外,订单服务还会调用账户服务先进行资金冻结,整个流程涉及这三个服务的分布式事务问题。

项目准备

基于Spring Boot + Nacos + Dubbo构建项目,包含下面这些服务:

  • sample-order-service,订单服务

  • sample-repo-service,库存服务

  • sample-account-service,账户服务

  • sample-seata-common,公共服务组件

  • sample-rest-web,提供统一业务的REST接口 服务

其中sample-order-service、sample-repo-service、sample-account-service是基于Spring Boot + Dubbo构建的微服务,sample-rest-web提供统一的业务服务入口,sample-seata-common提供公共组件。

数据库准备

创建三个数据库:seata_order、seata_repo、seata_account,并分别在这三个数据库中创建对应的业务表。

--对应seata_order数据库CREATE TABLE tbl_order ( id int(11) NOT NULL AUTO_INCREMENT/ order_no varchar(255) DEFAULT NULL/ user_id varchar(255) DEFAULT NULL/ product_code varchar(255) DEFAULT NULL/ count int(11) DEFAULT 0/ amount int(11) DEFAULT 0/ PRIMARY KEY ( id )) ENGINE=InnoDB DEFAULT CHARSET=utf-8/--对应seata_repo数据库CREATE TABLE tbl_repo ( id int(11) NOT NULL AUTO_INCREMENT/ product_code varchar(255) DEFAULT NULL/ name varchar(255) DEFAULT NULL/ count int(11) DEFAULT 0/ PRIMARY KEY ( id )/ UNIQUE KEY ( product_code )) ENGINE=InnoDB DEFAULT CHARSET=utf-8/-- 初始数据INSERT INTO tbl_repo VALUES (1/ TEST20200606001 / 键盘 / 1000 )/INSERT INTO tbl_repo VALUES (2/ TEST20200606002 / 鼠标 / 100 )/-- 对应seata_account数据库CREATE TABLE tbl_account ( id int(11) NOT NULL AUTO_INCREMENT/ user_id varchar(255) DEFAULT NULL/ balance int(11) DEFAULT 0/ PRIMARY KEY ( id )) ENGINE=InnoDB DEFAULT CHARSET=utf-8/--初始数据INSERT INTO tbl_account VALUES (1/ 1001 / 10000 )

核心方法说明

下面介绍部分主要代码:sample-account-service:账户服务提供余额扣减的功能,具体代码如下:

@Slf4j@Servicepublic class AccountServiceImpl implements IAccountService{ @Autowired AccountMapper accountMapper/ @Override public ObjectResponse decreaseAccount(AccountDto accountDto) { ObjectResponse response=new ObjectResponse()/ try{ int rs=accountMapper/decreaseAccount(accountDto/getUserId()/accountDto/getBalance()/doubleValue())/ if(rs>/0){ response/setMsg(ResCode/SUCCESS/getMessage())/ response/setCode(ResCode/SUCCESS/getCode())/ return response/ } response/setMsg(ResCode/FAILED/getMessage())/ response/setCode(ResCode/FAILED/getCode())/ }catch (Exception e){ log/error( decreaseAccount Occur Exception/ +e)/ response/setCode(ResCode/SYSTEM_EXCEPTION/getCode())/ response/setMsg(ResCode/SYSTEM_EXCEPTION/getMessage()+ - +e/getMessage())/ } return response/ }}

sample-order-service:订单服务负责创建订单,并且在创建订单之前先基于Dubbo协议调用账户服务的资金扣减接口。

@Slf4j@Servicepublic class OrderServiceImpl implements IOrderService{ @Autowired OrderMapper orderMapper/ @Autowired OrderConvert orderConvert/ @Reference IAccountService accountService/ @Override public ObjectResponse</OrderDto>/ createOrder(OrderDto orderDto) { log/info( 全局事务ID: + RootContext/getXID())/ ObjectResponse response=new ObjectResponse()/ try { //账户扣款 AccountDto accountDto = new AccountDto()/ accountDto/setUserId(orderDto/getUserId())/ accountDto/setBalance(orderDto/getOrderAmount())/ ObjectResponse accountRes = accountService/decreaseAccount(accountDto)/ //创建订单 Order order=orderConvert/dto2Order(orderDto)/ order/setOrderNo(UUID/randomUUID()/toString())/ orderMapper/createOrder(order)/ //判断扣款状态(判断可以前置) if(accountRes/getCode()!=ResCode/SUCCESS/getCode()){ response/setMsg(ResCode/FAILED/getMessage())/ response/setCode(ResCode/FAILED/getCode())/ return response/ } response/setMsg(ResCode/SUCCESS/getMessage())/ response/setCode(ResCode/SUCCESS/getCode())/ }catch (Exception e){ log/error( createOrder Occur Exception/ +e)/ response/setCode(ResCode/SYSTEM_EXCEPTION/getCode())/ response/setMsg(ResCode/SYSTEM_EXCEPTION/getMessage()+ - +e/getMessage())/ } return response/ }}

sample-repo-service:库存服务提供库存扣减功能:

@Slf4j@Servicepublic class RepoServiceImpl implements IRepoService{ @Autowired RepoMapper repoMapper/ @Override public ObjectResponse decreaseRepo(ProductDto productDto) { ObjectResponse response=new ObjectResponse()/ try { int repo = repoMapper/decreaseRepo(productDto/getProductCode()/ productDto/getCount())/ if(repo>/0){ response/setMsg(ResCode/SUCCESS/getMessage())/ response/setCode(ResCode/SUCCESS/getCode())/ return response/ } response/setMsg(ResCode/FAILED/getMessage())/ response/setCode(ResCode/FAILED/getCode())/ }catch (Exception e){ log/error( decreaseRepo Occur Exception/ +e)/ response/setCode(ResCode/SYSTEM_EXCEPTION/getCode())/ response/setMsg(ResCode/SYSTEM_EXCEPTION/getMessage()+ - +e/getMessage())/ } return response/ }}

sample-rest-web/ 基于Spring Boot的web项目,主要用于对外提供以业务为维度的REST接口,会分别调用库存服务和订单服务,实现库存扣减及创建订单的功能。

@Slf4j@RestControllerpublic class OrderController { @Autowired IRestOrderService restOrderService/ @PostMapping( /order ) ObjectResponse order(@RequestBody OrderRequest orderRequest) throws Exception { return restOrderService/handleBusiness(orderRequest)/ }}

RestOrderServiceImpl的具体实现如下:

@Slf4j@Servicepublic class RestOrderServiceImpl implements IRestOrderService { @Reference IRepoService repoService/ @Reference IOrderService orderService/ @Override @GlobalTransactional(timeoutMills = 300000/ name = sample-rest-web ) public ObjectResponse handleBusiness(OrderRequest orderRequest) throws Exception { log/info( 开始全局事务/xid= + RootContext/getXID())/ log/info( begin order/ +orderRequest)/ //1/ 扣减库存 ProductDto productDto=new ProductDto()/ productDto/setProductCode(orderRequest/getProductCode())/ productDto/setCount(orderRequest/getCount())/ ObjectResponse repoRes=repoService/decreaseRepo(productDto)/ //2/ 创建订单 OrderDto orderDto=new OrderDto()/ orderDto/setUserId(orderRequest/getUserId())/ orderDto/setOrderAmount(orderRequest/getAmount())/ orderDto/setOrderCount(orderRequest/getCount())/ orderDto/setProductCode(orderRequest/getProductCode())/ ObjectResponse orderRes=orderService/createOrder(orderDto)/ if(orderRequest/getProductCode()/equals( GP20200202002 )){ throw new Exception( 系统异常 )/ } ObjectResponse response=new ObjectResponse()/ response/setMsg(ResCode/SUCCESS/getMessage())/ response/setCode(ResCode/SUCCESS/getCode())/ response/setData(orderRes/getData())/ return response/ }}

项目启动顺序及访问

这几个项目彼此之间存在依赖关系,项目的启动顺序为:

  • sample-seata-common为公共组件,需要先通过mvn install到本地仓库给其他服务依赖。

  • 启动sample-account-service,它会被订单服务调用。

  • 启动订单服务sample-order-service。

  • 启动库存服务sample-repo-service。

  • 启动sample-rest-web,它作为REST的业务入口。

整合Seata实现分布式事务

在上述流程中,加入库存扣减成功了,但是在创建订单的时候,入股由于账户资金不足导致失败,就会出现数据不一致的场景。按照正常的流程来说,被扣减的库存需要加回去,这就是一个分布式事务的场景。接下来我们在项目中整合Seata来解决该问题。

添加Seata Jar包依赖分别在4个项目中添加Seata的starter组件依赖:

</dependency>/ </groupId>/io/seata<//groupId>/ </artifactId>/seata-spring-boot-starter<//artifactId>/ </version>/1/0/0<//version>/<//dependency>/

添加Seata配置项目同样分别在4个项目中的application/yml文件中添加Seata的配置项:

seata/ enabled/ true tx-service-group/ sample-rest-web transport/ type/ TCP server/ NIO heartbeat/ true #client和server通信心跳检测开关(默认为true) enable-client-batch-send-request/ true thread-factory/ boss-thread-prefix/ NettyBoss worker-thread-prefix/ NettyServerNIOWorker server-executor-thread-prefix/ NettyServerBizHandler share-boss-worker/ false client-selector-thread-prefix/ NettyClientSelector client-selector-thread-size/ 1 client-worker-thread-prefix/ NettyClientWorkerThread boss-thread-size/ 1 worker-thread-size/ 8 shutdown/ wait/ 3 serialization/ seata #client和server通信编***方式 compressor/ none service/ vgroup-mapping/ default #TC集群,需要和Seata-Server保持一致 enable-degrade/ false #降级开关,默认为false,业务根据连续错误数自动降级,不走Seata事务 disable-global-transaction/ false #全局事务开关,默认为false,false为开启,true为关闭 #grouplist/ 192/168/216/128/8091 #TC服务列表,也就是Seata服务端地址,只有当注册中心为file时使用 client/ rm/ lock/ lock-retry-interval/ 10 lock-retry-policy-branch-rollback-on-conflict/ true lock-retry-times/ 30 rm-async-commit-buffer-limit/ 10000 rm-report-retry-count/ 5 rm-table-meta-check-enable/ false rm-report-success-enable/ true tm-commit-retry-count/ 5 tm-rollback-retry-count/ 5 undo/ undo-log-table/ undo_log undo-data-validation/ true undo-log-serialization/ jackson log/ exception-rate/ 100 support/ spring/ datasource-autoproxy/ false registry/ type/ nacos nacos/ cluster/ default server-addr/ 192/168/216/128/8848

上述配置中有几个配置项需要注意:

  • seata/support/spring/datasource-autoproxy/ true 属性表示数据源自动代理开关,在sample-order-service、sample-account-service、sample-repo-service中设置为true,在sample-rest-web中设置为false,因为该项目并没有访问数据源,不需要代理。

  • 如果注册中心为file,seata/service/grouplist需要填写Seata服务端连接地址,在默认情况下,注册中心为file,如果需要从注册中心上进行服务发现,可以增加如下配置:

seata/ registry/ type/ nacos nacos/ cluster/ default server-addr/ 192/168/216/127/8848
  • tx-service-group表示指定服务所属的事务分组,如果没有指定,默认使用spring/application/name加上字符串-seata-service-group。需要注意这两项配置必须要配置一项,否则会报错。

添加回滚日志表分别在3个数据库seata-account、seata-repo、seata-order中添加一张回滚日志表,用于记录每个数据库表操作的回滚日志,当某个服务的事务出现异常时会根据该日志进行回滚。

CREATE TABLE undo_log ( id bigint(20) NOT NULL AUTO_INCREMENT/ branch_id bigint(20) NOT NULL/ xid varchar(100) NOT NULL/ context varchar(128) NOT NULL/ rollback_info longblob NOT NULL/ log_status int(11) NOT NULL/ log_created datetime NOT NULL/ log_modified datetime NOT NULL/ PRIMARY KEY ( id )/ UNIQUE KEY ux_undo_log ( xid / branch_id )) ENGINE=InnoDB DEFAULT CHARSET=utf-8/

sample-rest-web增加全局事务控制修改sample-rest-web工程的RestOrderServiceImpl,做两件事情:

  • 增加@GlobalTransactional全局事务注解

  • 模拟一个异常处理,当商品编号等于某个指定的值时抛出异常,触发整个事务的回滚。

@Slf4j@Servicepublic class RestOrderServiceImpl implements IRestOrderService { @Reference IRepoService repoService/ @Reference IOrderService orderService/ @Override @GlobalTransactional(timeoutMills = 300000/ name = sample-rest-web ) public ObjectResponse handleBusiness(OrderRequest orderRequest) throws Exception { log/info( 开始全局事务/xid= + RootContext/getXID())/ log/info( begin order/ +orderRequest)/ //1/ 扣减库存 ProductDto productDto=new ProductDto()/ productDto/setProductCode(orderRequest/getProductCode())/ productDto/setCount(orderRequest/getCount())/ ObjectResponse repoRes=repoService/decreaseRepo(productDto)/ //2/ 创建订单 OrderDto orderDto=new OrderDto()/ orderDto/setUserId(orderRequest/getUserId())/ orderDto/setOrderAmount(orderRequest/getAmount())/ orderDto/setOrderCount(orderRequest/getCount())/ orderDto/setProductCode(orderRequest/getProductCode())/ ObjectResponse orderRes=orderService/createOrder(orderDto)/ if(orderRequest/getProductCode()/equals( GP20200202002 )){ throw new Exception( 系统异常 )/ } ObjectResponse response=new ObjectResponse()/ response/setMsg(ResCode/SUCCESS/getMessage())/ response/setCode(ResCode/SUCCESS/getCode())/ response/setData(orderRes/getData())/ return response/ }}

免责声明:
1. 《分布式事务 - AT模式Dubbo集成Seata》内容来源于互联网,版权归原著者或相关公司所有。
2. 若《86561825文库网》收录的文本内容侵犯了您的权益或隐私,请立即通知我们删除。