如何做跨服务分布式事务? #
一、CAP介绍 #
CAP 是一个在分布式系统中(SOA,MicroService)实现事件总线及最终一致性(分布式事务)的一个开源的 C# 库,具有轻量级,高性能,易使用等特点。我们可以轻松的在基于 .NET Core 技术的分布式系统中引入CAP,包括但限于 ASP.NET Core 和 ASP.NET Core on .NET Framework。
CAP 的应用场景主要有以下两个:
- 分布式事务中的最终一致性(异步确保)的方案
- 具有高可用性的 EventBus
CAP 同时支持使用 RabbitMQ 或 Kafka 进行底层之间的消息发送,我们不需要具备 RabbitMQ 或者 Kafka 的使用经验,仍然可以轻松的将CAP集成到项目中,目前我们项目中使用RabbitMQ。
二、CAP案例 #
1、OMS向WMS请求配货:
- OMS发送请求;
- WMS接收请求后处理,返回结果;
- OMS更新状态。
1.1、传统无一致性保证情况下,可能出现问题:
0、OMS发送失败,进行跳过;
1、OMS成功发送,但OMS接收响应超时,实际上WMS已配货,OMS未配货;
1.2、如使用CAP,可建立模型: #
- OMS通过CAP请求WMS配货;
- WMS接收到请求,返回callback;
- OMS根据callback更新状态。
期间,OMS请求WMS不需要立即收到结果,可以先假定成功,进行下一步流程,只需要等待成功或失败callback(目前设置失败自动重试5次);如成功,则正常更新,失败则人为介入补偿消息。这样,就算最终失败,也能通知到业务人员,最终人为介入,保证最终一致性,避免出现OMS和WMS状态和信息不同步的问题。
三、框架CAP使用 #
1、Startup.cs #
增加代码
services.AddMall3sCap(_configuration, "DemoConnectionStrings");//DemoConnectionStrings为CAP
持久化数据库,填写项目用到的数据库就行,对应nacos的配置netcore-datasource.json的数据库key;
2、Service层 #
继承ICapSubscribe,并注入ICapPublisher,代码示例如下
2.1、发送与回调代码 #
注:如无需事务,则把事务相关删除即可
public class ExchangeRateService : IExchangeRateService, IDynamicApiController, ITransient, ICapSubscribe
{
private readonly ICapPublisher _capBus;
...
...
public ExchangeRateService(ICapPublisher capBus,
...,
...)
{
_capBus = capBus;
...,
...
}
/// <summary>
/// cap测试,需要插入汇率和CAP消息持久同一事务示例
/// </summary>
/// <returns></returns>
[HttpPost("cap/test")]
public async Task CapTest()
{
using (var connection = _db.Ado.Connection)//获取数据库连接
{
using (var transation = connection.BeginTransaction(_capBus))//开始CAP事务
{
_db.Ado.Transaction = (IDbTransaction)transation.DbTransaction;//CAP事务赋值给SqlSugar事务,为了实现事务ID统一
var entity = new ExchangeRateEntity()
{
From = "USD",
To = "CNY",
Rate = 6.12M,
ExchangeDate = DateTime.Now,
};
entity.Creator();
var result = await _exchangeRateRepository.InsertAsync(entity);
//参数1:CAP endpoint;参数2:参数(endpoint接收参数类型要一致);参数3:回调endpoint
await _capBus.PublishAsync("testcap.testcapbyid", entity.Id, "testcap.testcapbyid.callback");
if (result > 0)
{
await transation.CommitAsync();//提交事务,会把汇率一并插入
}
else
{
await transation.RollbackAsync();//回滚事务,汇率也不会插入
}
}
}
}
/// <summary>
/// 回调endpoint,如有接收参数,接收参数要与返回值一致
/// 如这里接收string,则 endpoint “testcap.testcapbyid”需要return string;
/// </summary>
/// <param name="str"></param>
[NonAction]
[CapSubscribe("testcap.testcapbyid.callback")]
public void CapTestCallback(string str)
{
Console.WriteLine(str);
}
}
2.2、endpoint 代码 #
类的继承与上面代码一样,需要继承 ICapPublisher
[NonAction]
[CapSubscribe("testcap.testcapbyid"]
public string TestCapById(string id)
{
Console.WriteLine(id);
return "返回值" + id;
}
四、Demo地址 #
http://110.41.156.31/mall3s-group/mall3s.template.git
五、CAP相关 #
1、持久化数据:项目数据库;
2、Dashboard:http://项目地址/cap/index.html
3、RabbitMQ:netcore-common.json配置的MQ地址。
4、CapSubscribe属性解释:CapSubscribeAttribute中,[CapSubscribe("aaa", Group = "bbb")],aaa对应RabbitMQ routeKey,bbb对应RabbitMQ队列名(如有,Group可为空)
六、CAP的监控面板 #
http://cap.mall3s.com/cap/index.html
其他教程文档 #
https://www.cnblogs.com/cmliu/p/11767343.html