MassTransit MSMQ Pub with multi-sub.运行时服务何时准备就绪



>我设置了一个简单的测试,其中包含一个具有两个订阅者的发布者,所有这些订阅者都在使用MSMQ和MassTransit (2.1.1) RuntimeServices的一台计算机上运行,该计算机使用本地SQL Server数据库。

我在下面包含了总线设置代码,以便您可以看到设置的内容。我正在手动和独立地启动每个组件,以尝试计算如果订阅者不运行会发生什么。

我首先运行了两个订阅者,以便队列和订阅都设置好,然后退出它们,而不取消订阅消息。如果我然后自行运行发布服务器,以最快的速度在队列中转储 400 条消息,我可以看到两个订阅者队列等待的消息数量不同。

我的假设是,我在 RuntimeServices 能够设置自己的两个目标队列之前发布。在总线设置和发布之间有 5 秒的延迟,我得到了我期望的 400 条消息在两个订阅者队列中等待,即某些消息未发布到两个队列。

我的问题是这个;有没有办法在发布者启动时判断 RuntimeServices 是否已准备好在其数据库中已有订阅者?

这是发布者代码

Bus.Initialize(sbc =>
        {
            sbc.SetCreateTransactionalQueues(true);
            sbc.ReceiveFrom("msmq://localhost/andy_publisher");
            sbc.UseSubscriptionService("msmq://localhost/mt_subscriptions");
            sbc.UseMsmq();
            sbc.VerifyMsmqConfiguration();
        });
        var bus = Bus.Instance;

        Thread.Sleep(5000); // this makes it all work :)
        int i = 0;
        foreach (string filename in System.IO.Directory.EnumerateFiles(@"C:Usersandy.bakerPictures", "*.*", SearchOption.AllDirectories))
        {
            Console.WriteLine(filename);
            bus.Publish(new Messages.FileRegistered {FilePath = filename});
            i++;
        }
        Console.WriteLine("Published {0} messages", i);
        Console.ReadLine();

订阅者是这样配置的;

Bus.Initialize(sbc => {
                     sbc.UseMsmq();
                     sbc.VerifyMsmqConfiguration();
                     sbc.ReceiveFrom("msmq://localhost/andy_subscriber1");
                              sbc.UseSubscriptionService("msmq://localhost/mt_subscriptions");
                           }
            );

。和第二个订户...

Bus.Initialize(sbc =>
        {
            sbc.UseMsmq();
            sbc.VerifyMsmqConfiguration();
            sbc.ReceiveFrom("msmq://localhost/andy_subscriber2");
            sbc.UseSubscriptionService("msmq://localhost/mt_subscriptions");
        }

提前感谢您的任何建议。

你如何订阅你的消费者?为了在重新启动后幸存下来,它们应该是永久性的。http://docs.masstransit-project.com/en/latest/configuration/sub_config_api.html

s.Consumer<TConsumer>().Permanent();

最新更新