分布式事务框架-CAP #

前言 #

很多同学想对CAP的机制以及用法等想有一个详细的了解,所以花了将近两周时间写了这份中文的CAP文档,对 CAP 还不知道的同学可以先看一下这篇文章 (opens new window)

本文档为 CAP 文献(Wiki),本文献同时提供中文和英文版本,英文版本目前还在翻译中,会放到Github Wiki 中。

1、Getting Started #

1.1 介绍 #

CAP 是一个遵循 .NET Standard 标准库的C#库,用来处理分布式事务以及提供EventBus的功能,她具有轻量级,高性能,易使用等特点。

目前 CAP 使用的是 .NET Standard 1.6 的标准进行开发,目前最新预览版本已经支持 .NET Standard 2.0.

1.2 应用场景 #

CAP 的应用场景主要有以下两个:

    1. 分布式事务中的最终一致性(异步确保)的方案。

分布式事务是在分布式系统中不可避免的一个硬性需求,而目前的分布式事务的解决方案也无外乎就那么几种,在了解 CAP 的分布式事务方案前,可以阅读以下 这篇文章 (opens new window)

CAP 没有采用两阶段提交(2PC)这种事务机制,而是采用的 本地消息表+MQ 这种经典的实现方式,这种方式又叫做 异步确保。

    1. 具有高可用性的 EventBus。

CAP 实现了 EventBus 中的发布/订阅,它具有 EventBus 的所有功能。也就是说你可以像使用 EventBus 一样来使用 CAP,另外 CAP 的 EventBus 是具有高可用性的,这是什么意思呢?

CAP 借助于本地消息表来对 EventBus 中的消息进行了持久化,这样可以保证 EventBus 发出的消息是可靠的,当消息队列出现宕机或者连接失败的情况时,消息也不会丢失。

1.3 Quick Start #

  • 引用 NuGet 包

使用一下命令来引用CAP的NuGet包:

PM> Install-Package DotNetCore.CAP

根据使用的不同类型的消息队列,来引入不同的扩展包:

PM> Install-Package DotNetCore.CAP.RabbitMQ
PM> Install-Package DotNetCore.CAP.Kafka

根据使用的不同类型的数据库,来引入不同的扩展包:

PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
  • 启动配置

在 ASP.NET Core 程序中,你可以在 Startup.cs 文件 ConfigureServices() 中配置 CAP 使用到的服务:

public void ConfigureServices(IServiceCollection services)
{
    services.AddDbContext<AppDbContext>();

    services.AddCap(x =>
    {
        // If your SqlServer is using EF for data operations, you need to add the following configuration:
        // Notice: You don't need to config x.UseSqlServer(""") again!
        x.UseEntityFramework<AppDbContext>();
        
        // If you are using Dapper,you need to add the config:
        x.UseSqlServer("Your ConnectionStrings");
        
        // If your Message Queue is using RabbitMQ you need to add the config:
        x.UseRabbitMQ("localhost");
        
        // If your Message Queue is using Kafka you need to add the config:
        x.UseKafka("localhost");
    });
}

Configure() 中配置启动 CAP :

public void Configure(IApplicationBuilder app)
{
    app.UseCap();
}

2、API接口 #

CAP 的 API 接口只有一个,就是 ICapPublisher 接口,你可以从 DI 容器中获取到该接口的实例进行调用。

2.1 发布/发送 #

你可以使用 ICapPublisher 接口中的 Publish<T> 或者 PublishAsync<T> 方法来发送消息:

public class PublishController : Controller
{
    private readonly ICapPublisher _publisher;
    
    //在构造函数中获取接口实例
    public PublishController(ICapPublisher publisher)
    {
    	_publisher = publisher;
    }
    
    
    [Route("~/checkAccount")]
    public async Task<IActionResult> PublishMessage()
    {
    	await _publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 });
    
    	return Ok();
    }
}

下面是PublishAsync这个接口的签名:

PublishAsync<T>(string name,T object)

默认情况下,在调用此方法的时候 CAP 将在内部创建事务,然后将消息写入到 Cap.Published 这个消息表。

2.1.1 事务 #

事务在 CAP 具有重要作用,它是保证消息可靠性的一个基石。 在发送一条消息到消息队列的过程中,如果不使用事务,我们是没有办法保证我们的业务代码在执行成功后消息已经成功的发送到了消息队列,或者是消息成功的发送到了消息队列,但是业务代码确执行失败。

这里的失败原因可能是多种多样的,比如连接异常,网络故障等等。

只有业务代码和CAP的Publish代码必须在同一个事务中,才能够保证业务代码和消息代码同时成功或者失败。

以下是两种使用事务进行Publish的代码:

  • EntityFramework
 using (var transaction = dbContext.Database.BeginTransaction())
 {
    
    await _publisher.PublishAsync("xxx.services.account.check",
        new Person { Name = "Foo", Age = 11 });
        
    // 你的业务代码。
    
    transaction.Commit();
 }

你的业务代码可以位于 Publish 之前或者之后,只需要保证在同一个事务。

当CAP检测到 Publish 是在EF事务区域内的时候,将使用当前的事务上下文进行消息的存储。

其中,发送的内容会序列化为Json存储到消息表中。

  • Dapper
var connString = "数据库连接字符串";
using (var connection = new MySqlConnection(connString))
{
    connection.Open();
    using (var transaction = connection.BeginTransaction())
    {
        await _publisher.PublishAsync("xxx.services.bar",
            new Person { Name = "Foo", Age = 11 }, 
            connection,
            transaction);
            
        // 你的业务代码。
        
        transaction.Commit();
    }
}

在 Dapper 中,由于不能获取到事务上下文,所以需要用户手动的传递事务上下文到CAP中。

2.2 订阅/消费 #

注意:消息端在方法实现的过程中需要实现幂等性。

使用 CapSubscribeAttribute 来订阅 CAP 发布出去的消息。

[CapSubscribe("xxx.services.bar")]
public void BarMessageProcessor()
{
    
}

这里,你也可以使用多个 CapSubscribe[""] 来同时订阅多个不同的消息 :

[CapSubscribe("xxx.services.bar")]
[CapSubscribe("xxx.services.foo")]
public void BarAndFooMessageProcessor()
{
    
}

其中,xxx.services.bar 为订阅的消息名称,内部实现上,这个名称在不同的消息队列具有不同的代表。 在 Kafka 中,这个名称即为 Topic Name。 在RabbitMQ 中,为 RouteKey。

RabbitMQ 中的 RouteKey 支持绑定键表达式写法,有两种主要的绑定键:

*(星号)可以代替一个单词.

# (井号) 可以代替0个或多个单词.

比如在下面这个图中(P为发送者,X为RabbitMQ中的Exchange,C为消费者,Q为队列)

img

在这个示例中,我们将发送一条关于动物描述的消息,也就是说 Name(routeKey) 字段中的内容包含 3 个单词。第一个单词是描述速度的(celerity),第二个单词是描述颜色的(colour),第三个是描述哪种动物的(species),它们组合起来类似:“..”。

然后在使用 CapSubscribe 绑定的时候,Q1绑定为 CapSubscribe["*.orange.*"], Q2 绑定为 CapSubscribe["*.*.rabbit"][CapSubscribe["lazy.#]

那么,当发送一个名为 "quick.orange.rabbit" 消息的时候,这两个队列将会同时收到该消息。同样名为 lazy.orange.elephant的消息也会被同时收到。另外,名为 "quick.orange.fox" 的消息将仅会被发送到Q1队列,名为 "lazy.brown.fox" 的消息仅会被发送到Q2。"lazy.pink.rabbit" 仅会被发送到Q2一次,即使它被绑定了2次。"quick.brown.fox" 没有匹配到任何绑定的队列,所以它将会被丢弃。

另外一种情况,如果你违反约定,比如使用 4个单词进行组合,例如 "quick.orange.male.rabbit",那么它将匹配不到任何的队列,消息将会被丢弃。

但是,假如你的消息名为 "lazy.orange.male.rabbit",那么他们将会被发送到Q2,因为 #(井号)可以匹配 0 或者多个单词。

在 CAP 中,我们把每一个拥有 CapSubscribe[]标记的方法叫做订阅者,你可以把订阅者进行分组。

组(Group),是订阅者的一个集合,每一组可以有一个或者多个消费者,但是一个订阅者只能属于某一个组。同一个组内的订阅者订阅的消息只能被消费一次。

如果你在订阅的时候没有指定组,CAP会将订阅者设置到一个默认的组 cap.default.group

以下是使用组进行订阅的示例:

[CapSubscribe("xxx.services.foo", Group = "moduleA")]
public void FooMessageProcessor()
{
    
}

2.2.1 例外情况 #

这里有几种情况可能需要知道:

① 消息发布的时候订阅方还未启动

Kafka:

当 Kafka 中,发布的消息存储于持久化的日志文件中,所以消息不会丢失,当订阅者所在的程序启动的时候会消费掉这些消息。

RabbitMQ:

在 RabbitMQ 中,应用程序首次启动会创建具有持久化的 Exchange 和 Queue,CAP 会针对每一个订阅者Group会新建一个消费者队列,由于首次启动时候订阅者未启动的所以是没有队列的,消息无法进行持久化,这个时候生产者发的消息会丢失

针对RabbitMQ的消息丢失的问题,有两种解决方式:

i. 部署应用程序之前,在RabbitMQ中手动创建具有durable特性的Exchange和Queue,默认情况他们的名字分别是(cap.default.topic, cap.default.group)。

ii. 提前运行一遍所有实例,让Exchange和Queue初始化。

我们建议采用第 ii 种方案,因为很容易做到。

② 消息没有任何订阅者

如果你发送了一条个没有被任何订阅者订阅的消息,那么此消息将会被丢弃。

3、配置 #

Cap 使用 Microsoft.Extensions.DependencyInjection 进行配置的注入,你也可以依赖于 DI 从json文件中读取配置。

3.1 Cap Options #

你可以使用如下方式来配置 CAP 中的一些配置项,例如

services.AddCap(capOptions => {
    capOptions.FailedCallback = //...
});

CapOptions 提供了一下配置项:

NAME DESCRIPTION TYPE DEFAULT
PollingDelay 处理消息的线程默认轮询等待时间(秒) int 15 秒
QueueProcessorCount 启动队列中消息的处理器个数 int 2
FailedMessageWaitingInterval 轮询失败消息的间隔(秒) int 180 秒
FailedCallback 执行失败消息时的回调函数,详情见下文 Action NULL

CapOptions 提供了 FailedCallback 为处理失败的消息时的回调函数。当消息多次发送失败后,CAP会将消息状态标记为Failed,CAP有一个专门的处理者用来处理这种失败的消息,针对失败的消息会重新放入到队列中发送到MQ,在这之前如果FailedCallback具有值,那么将首先调用此回调函数来告诉客户端。

FailedCallback 的类型为 Action<MessageType,string,string>,第一个参数为消息类型(发送的还是接收到),第二个参数为消息的名称(name),第三个参数为消息的内容(content)。

3.2 RabbitMQ Options #

CAP 采用的是针对 CapOptions 进行扩展来实现RabbitMQ的配置功能,所以针对 RabbitMQ 的配置用法如下:

services.AddCap(capOptions => {
    capOptions.UseRabbitMQ(rabbitMQOption=>{
        // rabbitmq options.
    });
});

RabbitMQOptions 提供了有关RabbitMQ相关的配置:

NAME DESCRIPTION TYPE DEFAULT
HostName 宿主地址 string localhost
UserName 用户名 string guest
Password 密码 string guest
VirtualHost 虚拟主机 string /
Port 端口号 int -1
TopicExchangeName CAP默认Exchange名称 string cap.default.topic
RequestedConnectionTimeout RabbitMQ连接超时时间 int 30,000 毫秒
SocketReadTimeout RabbitMQ消息读取超时时间 int 30,000 毫秒
SocketWriteTimeout RabbitMQ消息写入超时时间 int 30,000 毫秒
QueueMessageExpires 队列中消息自动删除时间 int (10天) 毫秒

3.3 Kafka Options #

CAP 采用的是针对 CapOptions 进行扩展来实现 Kafka 的配置功能,所以针对 Kafka 的配置用法如下:

services.AddCap(capOptions => {
    capOptions.UseKafka(kafkaOption=>{
        // kafka options.
        // kafkaOptions.MainConfig.Add("", "");
    });
});

KafkaOptions 提供了有关 Kafka 相关的配置,由于Kafka的配置比较多,所以此处使用的是提供的 MainConfig 字典来支持进行自定义配置,你可以查看这里来获取对配置项的支持信息。

https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

3.4 SqlServer Options #

如果你使用的是 EntityFramewrok,你用不到该配置项下的内容。

CAP 采用的是针对 CapOptions 进行扩展来实现 SqlServer 的配置功能,所以针对 SqlServer 的配置用法如下:

services.AddCap(capOptions => {
    capOptions.UseSqlServer(sqlserverOptions => {
       // sqlserverOptions.ConnectionString
    });
});
NAME DESCRIPTION TYPE DEFAULT
Schema Cap表架构 string Cap
ConnectionString 数据库连接字符串 string null

3.5 MySql Options #

如果你使用的是 EntityFramewrok,你用不到该配置项下的内容。

CAP 采用的是针对 CapOptions 进行扩展来实现 MySql 的配置功能,所以针对 MySql 的配置用法如下:

services.AddCap(capOptions => {
    capOptions.UseMySql(mysqlOptions => {
       // mysqlOptions.ConnectionString
    });
});
NAME DESCRIPTION TYPE DEFAULT
TableNamePrefix Cap表名前缀 string cap
ConnectionString 数据库连接字符串 string null

4、设计原理 #

4.1 动机 #

随着微服务架构的流行,越来越多的人在尝试使用微服务来架构他们的系统,而在这其中我们会遇到例如分布式事务的问题,为了解决这些问题,我没有发现简单并且易于使用的解决方案,所以我决定来打造这样一个库来解决这个问题。

最初 CAP 是为了解决分布式系统中的事务问题,她采用的是 异步确保 这种机制实现了分布式事务的最终一致性,更多这方面的信息可以查看第6节。

现在 CAP 除了解决分布式事务的问题外,她另外一个重要的功能就是作为 EventBus 来使用,她具有 EventBus 的所有功能,并且提供了更加简化的方式来处理EventBus中的发布/订阅。

4.2 持久化 #

CAP 依靠本地数据库实现消息的持久化,CAP 使用这种方式来应对一切环境或者网络异常导致消息丢失的情况,消息的可靠性是分布式事务的基石,所以在任何情况下消息都不能丢失。

对于消息的持久化分为两种:

① 消息进入消息队列之前的持久化

在消息进入到消息队列之前,CAP使用本地数据库表对消息进行持久化,这样可以保证当消息队列出现异常或者网络错误时候消息是没有丢失的。

为了保证这种机制的可靠性,CAP使用和业务代码相同的数据库事务来保证业务操作和CAP的消息在持久化的过程中是强一致的。也就是说在进行消息持久化的过程中,任何一方发生异常情况数据库都会进行回滚操作。

② 消息进入到消息队列之后的持久化

消息进入到消息队列之后,CAP会启动消息队列的持久化功能,我们需要说明一下在 RabbitMQ 和 Kafka 中CAP的消息是如何持久化的。

针对于 RabbitMQ 中的消息持久化,CAP 使用的是具有消息持久化功能的消费者队列,但是这里面可能有例外情况,参加 2.2.1 章节。

由于 Kafka 天生设计的就是使用文件进行的消息持久化,在所以在消息进入到Kafka之后,Kafka会保证消息能够正确被持久化而不丢失。

4.3 通讯数据流 #

CAP 中消息的流转过程大致如下:

img

“ P ” 代表消息发送者(生产者)。 “ C ” 代表消息消费者(订阅者)。

4.4 一致性 #

CAP 采用最终一致性作为的一致性方案,此方案是遵循 CAP 理论,以下是CAP理论的描述。

C(一致性)一致性是指数据的原子性,在经典的数据库中通过事务来保障,事务完成时,无论成功或回滚,数据都会处于一致的状态,在分布式环境下,一致性是指多个节点数据是否一致;

A(可用性)服务一直保持可用的状态,当用户发出一个请求,服务能在一定的时间内返回结果;

P(分区容忍性)在分布式应用中,可能因为一些分布式的原因导致系统无法运转,好的分区容忍性,使应用虽然是一个分布式系统,但是好像一个可以正常运转的整体

根据 “CAP”分布式理论 (opens new window), 在一个分布式系统中,我们往往为了可用性和分区容错性,忍痛放弃强一致支持,转而追求最终一致性。大部分业务场景下,我们是可以接受短暂的不一致的。

第 6 节将对此做进一步介绍。

5、实现 #

CAP 封装了在 ASP.NET Core 中的使用依赖注入来获取 Publisher (ICapPublisher)的接口。而启动方式类似于 “中间件” 的形式,通过在 Startup.cs 配置 ConfigureServicesConfigure 进行启动。

5.1 消息表 #

当系统引入CAP之后并首次启动后,CAP会在客户端生成 3 个表,分别是 Cap.Published, Cap.Received, Cap.Queue。注意表名可能在不同的数据库具有不同的大小写区分,如果你在运行项目的时候没有显式的指定数据库生成架构(SQL Server)或者表名前缀(MySql)的话,默认情况下就是以上3个名字。

Cap.Published:这个表主要是用来存储 CAP 发送到MQ(Message Queue)的客户端消息,也就是说你使用 ICapPublisher 接口 Publish 的消息内容。

Cap.Received:这个表主要是用来存储 CAP 接收到 MQ(Message Queue) 的客户端订阅的消息,也就是使用 CapSubscribe[] 订阅的那些消息。

Cap.Queue: 这个表主要是CAP内部用来处理发送和接收消息的一个临时表,通常情况下,如果系统不出现问题,这个表将是空的。

PublishedReceived 表具有 StatusName 字段,这个字段用来标识当前消息的状态。目前共有 Scheduled,Enqueued,Processing,Successed,Failed 等几个状态。CAP 在处理消息的过程中会依次从 Scheduled 到 Successed 来改变这些消息状态的值。如果是状态值为 Successed,代表该消息已经成功的发送到了 MQ 中。如果为 Failed 则代表消息发送失败,消息发送失败后 CAP 会对消息进行重试,直到成功。

关于数据清理: CAP 默认情况下会每隔一个小时将消息表的数据进行清理删除,避免数据量过多导致性能的降低。清理规则为 ExpiresAt 不为空并且小于当前时间的数据。

5.2 消息格式 #

CAP 采用 JSON 格式进行消息传输,以下是消息的对象模型:

NAME DESCRIPTION TYPE
Id 消息编号 int
Name 消息名称 string
Content 内容 string
Group 所属消费组 string
Added 创建时间 DateTime
ExpiresAt 过期时间 DateTime
Retries 重试次数 int
StatusName 状态 string

对于 Cap.Received 中的消息,会多一个 Group 字段来标记所属的消费者组。

5.3 EventBus #

EventBus 采用 发布-订阅 风格进行组件之间的通讯,它不需要显式在组件中进行注册。

img

上图是EventBus的一个Event的流程,关于 EventBus 的更多信息就不在这里介绍了...

在 CAP 中,为什么说 CAP 实现了 EventBus 中的全部特性,因为 EventBus 具有的两个大功能就是发布和订阅, 在 CAP 中 使用了另外一种优雅的方式来实现的,另外一个 CAP 提供的强大功能就是消息的持久化,以及在任何异常情况下消息的可靠性,这是EventBus不具有的功能。

img

CAP 里面发送一个消息可以看做是一个 “Event”,一个使用了CAP的ASP.NET Core 应用程序既可以进行发送也可以进行订阅接收。

5.4 重试 #

重试在实现分布式事务中具有重要作用,CAP 中会针对发送失败或者执行失败的消息进行重试。在整个 CAP 的设计过程中有以下几处采用的重试策略。

① 消息发送重试

在消息发送过程中,当出现 Broker 宕机或者连接失败的情况亦或者出现异常的情况下,这个时候 CAP 会对发送的重试,重试策略为默认 15 次失败重试,当15次过后仍然失败时,CAP会将此消息状态标记为失败。

② 消息消费重试

当 Consumer 接收到消息时,会执行消费者方法,在执行消费者方法出现异常时,会进行重试。这个重试策略和 ① 是相同的。

③ 失败消息重试

CAP 会定期针对 ① 和 ② 中状态为“失败的”消息进行重试,CAP会对他们进行重新“入队(Enqueue)”,入队时会将消息中的重试次数标记为0,状态置为 Enqueued。

6、分布式事务 #

针对于分布式事务的处理,CAP 采用的是“异步确保”这种方案。

6.1 异步确保 #

异步确保这种方案又叫做本地消息表,这是一种经典的方案,方案最初来源于 eBay,参考资料见段末链接。这种方案目前也是企业中使用最多的方案之一。

相对于 TCC 或者 2PC/3PC 来说,这个方案对于分布式事务来说是最简单的,而且它是去中心化的。在TCC 或者 2PC 的方案中,必须具有事务协调器来处理每个不同服务之间的状态,而此种方案不需要事务协调器。 另外 2PC/TCC 这种方案如果服务依赖过多,会带来管理复杂性增加和稳定性风险增大的问题。试想如果我们强依赖 10 个服务,9 个都执行成功了,最后一个执行失败了,那么是不是前面 9 个都要回滚掉?这个成本还是非常高的。

但是,并不是说 2PC 或者 TCC 这种方案不好,因为每一种方案都有其相对优势的使用场景和优缺点,这里就不做过多介绍了。

中文:http://www.cnblogs.com/savorboard/p/base-an-acid-alternative.html 英文:http://queue.acm.org/detail.cfm?id=1394128

7、转载说明 #

本文转载自《http://www.cnblogs.com/savorboard/p/cap-document.html》

上次更新: 3/10/2023, 5:33:48 PM