如何通过rabbitmq定义消息队列任务? #

通过官方脚手架,示例工程已经有例子,参考如下:

using Mall3s.Common.Helper;
using Mall3s.Oms.SDK.DTO;
using Mall3s.Oms.Library.Consts;
using Mall3s.Rabbit.Attributes;
using Mall3s.Rabbit.Enum;
using Mall3s.Rabbit.Model;
using Mall3s.Rabbit.Receiver;
using RabbitMQ.Client;

namespace Mall3s.Oms.Api.RabbitMQJob
{
    /// <summary>
    /// MQ 消费者
    /// </summary>
    //启动服务就会启动该消费者(注意,需注入服务AddMall3sRabbitMQ才会自动启动)
    [RabbitReceiver(RabbitMQConsts.DemoExchange, RabbitMQConsts.DemoQueue)]//参数含义请看描述
    ////存在环境变量为ReceiverRuningName,且值为demo、demo1 或demo2时启动该消费者
    //[RabbitReceiver(RabbitMQConsts.DemoExchange, RabbitMQConsts.DemoQueue, 10, "demo","demo1", "demo2")]
    //存在环境变量为TaskType,且值为demo时启动该消费者
    //[RabbitReceiver(RabbitMQConsts.DemoExchange, RabbitMQConsts.DemoQueue, RabbitMQConsts.DemoQueue, 10, ExchangeTypeEnum.DIRECT, "TaskType", "demo")]
    public class DemoQueue : AbstractReceiver<RabbitMQDemo>
    {
        /// <summary>
        /// 管道关闭时处理
        /// </summary>
        /// <param name="channel"></param>
        /// <returns></returns>
        public override Task ChannelClosed(IModel channel)
        {
            Console.WriteLine("异常,管道关闭,请注意");
            //可以发送企业微信通知
            return Task.CompletedTask;
        }

        /// <summary>
        /// 失败时处理
        /// </summary>
        /// <param name="message"></param>
        /// <param name="ex"></param>
        /// <returns></returns>
        public override Task Failed(string message, Exception ex)
        {
            Console.WriteLine($"消息消费失败,异常:{ex}");
            //失败后处理,如发送企业微信,重新进队,放到错误队列等等操作
            return Task.CompletedTask;
        }

        /// <summary>
        /// 重写初始化配置,重写会导致RabbitReceiverAttribute失效(如果有)
        /// </summary>
        /// <returns></returns>
        /*public override ReceiveConfig Init()
        {
            
            return ReceiveConfig.Init()
                .SetExchange(MqExchange.Init(RabbitMQConsts.DemoExchange))//交换机,可以配置交换机参数
                .SetQueue(MqQueue.Init(RabbitMQConsts.DemoQueue))//队列名,可以配置队列名参数
                .SetRoutingKey(RabbitMQConsts.DemoQueue)//路由键
                .SetPrefetchCount(1);//预取数
        }*/

        /// <summary>
        /// 接收,即消费该消息
        /// </summary>
        /// <param name="body"></param>
        /// <returns></returns>
        public override async Task Receive(RabbitMQDemo body)
        {
            Console.WriteLine(body.ToJson());
            await Task.Delay(1000);
        }
    }
}

在program中启动rabbitmq监听:

//可选,如需RabbitMQ,则注入,也可以仅注入生产者AddMall3sRabbitMQSender
builder.Services.AddMall3sRabbitMQ(builder.Configuration);
上次更新: 3/8/2023, 4:23:32 PM