如何通过rabbitmq定义消息队列任务? #
通过官方脚手架,示例工程已经有例子,参考如下:
using Mall3s.Common.Helper;
using Mall3s.Oms.SDK.DTO;
using Mall3s.Oms.Library.Consts;
using Mall3s.Rabbit.Attributes;
using Mall3s.Rabbit.Enum;
using Mall3s.Rabbit.Model;
using Mall3s.Rabbit.Receiver;
using RabbitMQ.Client;
namespace Mall3s.Oms.Api.RabbitMQJob
{
/// <summary>
/// MQ 消费者
/// </summary>
//启动服务就会启动该消费者(注意,需注入服务AddMall3sRabbitMQ才会自动启动)
[RabbitReceiver(RabbitMQConsts.DemoExchange, RabbitMQConsts.DemoQueue)]//参数含义请看描述
////存在环境变量为ReceiverRuningName,且值为demo、demo1 或demo2时启动该消费者
//[RabbitReceiver(RabbitMQConsts.DemoExchange, RabbitMQConsts.DemoQueue, 10, "demo","demo1", "demo2")]
//存在环境变量为TaskType,且值为demo时启动该消费者
//[RabbitReceiver(RabbitMQConsts.DemoExchange, RabbitMQConsts.DemoQueue, RabbitMQConsts.DemoQueue, 10, ExchangeTypeEnum.DIRECT, "TaskType", "demo")]
public class DemoQueue : AbstractReceiver<RabbitMQDemo>
{
/// <summary>
/// 管道关闭时处理
/// </summary>
/// <param name="channel"></param>
/// <returns></returns>
public override Task ChannelClosed(IModel channel)
{
Console.WriteLine("异常,管道关闭,请注意");
//可以发送企业微信通知
return Task.CompletedTask;
}
/// <summary>
/// 失败时处理
/// </summary>
/// <param name="message"></param>
/// <param name="ex"></param>
/// <returns></returns>
public override Task Failed(string message, Exception ex)
{
Console.WriteLine($"消息消费失败,异常:{ex}");
//失败后处理,如发送企业微信,重新进队,放到错误队列等等操作
return Task.CompletedTask;
}
/// <summary>
/// 重写初始化配置,重写会导致RabbitReceiverAttribute失效(如果有)
/// </summary>
/// <returns></returns>
/*public override ReceiveConfig Init()
{
return ReceiveConfig.Init()
.SetExchange(MqExchange.Init(RabbitMQConsts.DemoExchange))//交换机,可以配置交换机参数
.SetQueue(MqQueue.Init(RabbitMQConsts.DemoQueue))//队列名,可以配置队列名参数
.SetRoutingKey(RabbitMQConsts.DemoQueue)//路由键
.SetPrefetchCount(1);//预取数
}*/
/// <summary>
/// 接收,即消费该消息
/// </summary>
/// <param name="body"></param>
/// <returns></returns>
public override async Task Receive(RabbitMQDemo body)
{
Console.WriteLine(body.ToJson());
await Task.Delay(1000);
}
}
}
在program中启动rabbitmq监听:
//可选,如需RabbitMQ,则注入,也可以仅注入生产者AddMall3sRabbitMQSender
builder.Services.AddMall3sRabbitMQ(builder.Configuration);