为了账号安全,请及时绑定邮箱和手机立即绑定

MassTransit RabbitMq 调度消息未发送给消费者

MassTransit RabbitMq 调度消息未发送给消费者

C#
慕沐林林 2021-10-24 17:36:29
我想使用 MassTransit + RabbitMq 总线来调度来自总线的消息。我编写了两个 C# 控制台应用程序,一个用于消息创建者并将消息发送到调度程序,另一个用于消息使用者。以下代码用于在总线中进行调度,以便每秒向调度程序发送一条消息,然后调度程序以 10 秒的延迟发送给消费者。我的问题是在rabbitMq 客户端没有消息发送到消费者或消费者队列。我的错误在哪里?注意: UseInMemoryScheduler 工作正常,但 UseMessageScheduler 不起作用。总线消息创建器class Program{    public static void Main(string[] args)    {        MainAsync(args).GetAwaiter().GetResult();        Console.ReadKey();    }    static async Task MainAsync(string[] args)    {        var busControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>        {            var host = rabbit.Host(new Uri("rabbitmq://localhost:5672"), settings =>            {                settings.Username("guest");                settings.Password("guest");            });            //rabbit.UseInMemoryScheduler(); // This works            rabbit.UseMessageScheduler(new Uri("rabbitmq://localhost/quartz"));// This doesn't work,        });        busControl.Start();        var sendEndpoint = await busControl.GetSendEndpoint(new Uri("rabbitmq://localhost/quartz"));        for (int i = 0; i < 1000000; i++)        {            await sendEndpoint.ScheduleSend(new Uri("rabbitmq://localhost/publisher"),                 DateTime.Now.AddSeconds(10),                 new MessageCreated()                {                    Text = $"message {i}"                });            Thread.Sleep(1000);        }        Console.ReadKey();        busControl.Stop();    }}消息消费者。class Program{    static void Main(string[] args)    {        var busControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>        {            var host = rabbit.Host(new Uri("rabbitmq://localhost:5672"), settings =>            {                settings.Password("guest");                settings.Username("guest");            });            rabbit.ReceiveEndpoint(host, "publisher", conf =>            {                conf.Consumer<Consumer>();            });        });        busControl.Start();        Console.ReadKey();        busControl.Stop();    }}
查看完整描述

2 回答

?
杨__羊羊

TA贡献1943条经验 获得超7个赞

你快到了,只需用这个更新逻辑:


busControl.Start();

scheduler.JobFactory = new MassTransitJobFactory(busControl);

scheduler.Start().Wait();


Console.ReadKey();

busControl.Stop();


查看完整回答
反对 回复 2021-10-24
?
Cats萌萌

TA贡献1805条经验 获得超9个赞

所以你没有提到你是否有第三个消费者正在运行(在第三个控制台应用程序中)。为了调度使用 Quartz,您需要专门用于 Quartz 的第三个使用者。它必须正在运行,在这种情况下,石英接收端点将侦听“石英”队列。


[更新]


这是第三个控制台应用程序(石英服务)所需的配置示例:


var scheduler = CreateScheduler();

configurator.ReceiveEndpoint("quartz", e =>

{

    configurator.UseMessageScheduler(e.InputAddress);


    e.Consumer(() => new ScheduleMessageConsumer(scheduler));

    e.Consumer(() => new CancelScheduledMessageConsumer(scheduler));

});


...


private static IScheduler CreateScheduler()

{

    ISchedulerFactory schedulerFactory = new StdSchedulerFactory();


    var scheduler = schedulerFactory.GetScheduler();


    return scheduler;

}

并且您还需要为我们配置石英存储(如果您想在内存中进行测试,则为 SQLite、MSSql、RAM)。请在此处查看示例配置。


[/更新]


[更新2]


有人在群里发过类似的问题。幸运的是,他们提供了一个示例 github,其中包含一系列不同的 MT 功能,其中之一是单独的调度程序。请看一下,那里应该有你需要的一切。


[更新2]


如果您想在不运行完整的石英调度程序的情况下进行测试,那么您可以使用InMemory 调度程序。但这仅用于测试目的,您不应该在生产中使用它。


此外,在您的第一个代码片段中,您不需要获取调度程序的发送端点: var sendEndpoint = await busControl.GetSendEndpoint(new Uri("rabbitmq://localhost/quartz"));


因为,在总线配置中,rabbit.UseMessageScheduler(new Uri("rabbitmq://localhost/quartz"));表明无论何时您调用 ScheduleSend(从 ConsumeContext 或 IBus/IBusControl),它将始终使用该 /quartz 地址。


最后,这一行await sendEndpoint.ScheduleSend(new Uri("rabbitmq://localhost/publisher"),,您可以更改为 busControl.ScheduleSend(...)


查看完整回答
反对 回复 2021-10-24
  • 2 回答
  • 0 关注
  • 155 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信