如何做跨服务分布式事务? #

一、CAP介绍 #

CAP 是一个在分布式系统中(SOA,MicroService)实现事件总线及最终一致性(分布式事务)的一个开源的 C# 库,具有轻量级,高性能,易使用等特点。我们可以轻松的在基于 .NET Core 技术的分布式系统中引入CAP,包括但限于 ASP.NET Core 和 ASP.NET Core on .NET Framework。

CAP 的应用场景主要有以下两个:

  • 分布式事务中的最终一致性(异步确保)的方案
  • 具有高可用性的 EventBus

CAP 同时支持使用 RabbitMQ 或 Kafka 进行底层之间的消息发送,我们不需要具备 RabbitMQ 或者 Kafka 的使用经验,仍然可以轻松的将CAP集成到项目中,目前我们项目中使用RabbitMQ。

二、CAP案例 #

1、OMS向WMS请求配货:

  1. OMS发送请求;
  2. WMS接收请求后处理,返回结果;
  3. OMS更新状态。

1.1、传统无一致性保证情况下,可能出现问题:

0、OMS发送失败,进行跳过;

1、OMS成功发送,但OMS接收响应超时,实际上WMS已配货,OMS未配货;

1.2、如使用CAP,可建立模型: #

  1. OMS通过CAP请求WMS配货;
  2. WMS接收到请求,返回callback;
  3. OMS根据callback更新状态。

期间,OMS请求WMS不需要立即收到结果,可以先假定成功,进行下一步流程,只需要等待成功或失败callback(目前设置失败自动重试5次);如成功,则正常更新,失败则人为介入补偿消息。这样,就算最终失败,也能通知到业务人员,最终人为介入,保证最终一致性,避免出现OMS和WMS状态和信息不同步的问题。

三、框架CAP使用 #

1、Startup.cs #

增加代码

services.AddMall3sCap(_configuration, "DemoConnectionStrings");//DemoConnectionStrings为CAP

持久化数据库,填写项目用到的数据库就行,对应nacos的配置netcore-datasource.json的数据库key

2、Service层 #

继承ICapSubscribe,并注入ICapPublisher,代码示例如下

2.1、发送与回调代码 #

注:如无需事务,则把事务相关删除即可

public class ExchangeRateService : IExchangeRateService, IDynamicApiController, ITransient, ICapSubscribe
{
    		private readonly ICapPublisher _capBus;
    		...
        ...
    		public ExchangeRateService(ICapPublisher capBus,
                ...,
                ...)
        {
        		_capBus = capBus;
        		...,
            ...
        }
            
    		/// <summary>
        /// cap测试,需要插入汇率和CAP消息持久同一事务示例
        /// </summary>
        /// <returns></returns>
        [HttpPost("cap/test")]
        public async Task CapTest()
        {
            using (var connection = _db.Ado.Connection)//获取数据库连接
            {
                using (var transation = connection.BeginTransaction(_capBus))//开始CAP事务
                {
                    _db.Ado.Transaction = (IDbTransaction)transation.DbTransaction;//CAP事务赋值给SqlSugar事务,为了实现事务ID统一
                    var entity = new ExchangeRateEntity()
                    { 
                        From = "USD",
                        To = "CNY",
                        Rate = 6.12M,
                        ExchangeDate = DateTime.Now,
                    };
                    entity.Creator();
                    var result = await _exchangeRateRepository.InsertAsync(entity);
                    
                    //参数1:CAP endpoint;参数2:参数(endpoint接收参数类型要一致);参数3:回调endpoint
                    await _capBus.PublishAsync("testcap.testcapbyid", entity.Id, "testcap.testcapbyid.callback");
                    if (result > 0)
                    {
                        await transation.CommitAsync();//提交事务,会把汇率一并插入
                    }
                    else
                    {
                        await transation.RollbackAsync();//回滚事务,汇率也不会插入
                    }
                }
            }
        }

        /// <summary>
        /// 回调endpoint,如有接收参数,接收参数要与返回值一致
        /// 如这里接收string,则 endpoint “testcap.testcapbyid”需要return string;
        /// </summary>
        /// <param name="str"></param>
        [NonAction]
        [CapSubscribe("testcap.testcapbyid.callback")]
        public void CapTestCallback(string str)
        { 
            Console.WriteLine(str);
        }
}

2.2、endpoint 代码 #

类的继承与上面代码一样,需要继承 ICapPublisher

				[NonAction]
        [CapSubscribe("testcap.testcapbyid"]
        public string TestCapById(string id)
        {
            Console.WriteLine(id);
            return "返回值" + id;
        }

四、Demo地址 #

http://110.41.156.31/mall3s-group/mall3s.template.git

五、CAP相关 #

1、持久化数据:项目数据库;

2、Dashboard:http://项目地址/cap/index.html

3、RabbitMQ:netcore-common.json配置的MQ地址。

4、CapSubscribe属性解释:CapSubscribeAttribute中,[CapSubscribe("aaa", Group = "bbb")],aaa对应RabbitMQ routeKey,bbb对应RabbitMQ队列名(如有,Group可为空)

六、CAP的监控面板 #

http://cap.mall3s.com/cap/index.html

img

其他教程文档 #

https://www.cnblogs.com/cmliu/p/11767343.html

上次更新: 3/8/2023, 4:23:32 PM