在使用FromeventPattern订阅之前捕捉事件



我正在使用RX框架为消息写一个侦听器。

我面临的问题是,我使用的库使用了每当消息到达时发布事件的消费者。

我设法通过Observable.FromEventPattern消耗了传入的消息,但是我对已经在服务器中的消息有问题。

目前我有以下命令链

  1. 创建消费者
  2. 使用FromEventPattern创建可观察的序列并应用所需的转换
  3. 告诉消费者开始
  4. 订阅序列

最简单的解决方案是交换步骤3。

理想情况下,我想在步骤4发生时执行步骤3(例如OnSubscribe方法(。

感谢您的帮助:(

ps:为了添加更多详细信息,这些事件来自兔子队队,我正在使用rabbitmq.client软件包中的EventingBasicConsumer类。

在这里您可以找到我正在工作的库。具体来说,这是给我问题的类/方法。

编辑

这是有问题代码的剥离版本

void Main()
{
    var engine = new Engine();
    var messages = engine.Start();
    messages.Subscribe(m => m.Dump());
    Console.ReadLine();
    engine.Stop();
}
public class Engine
{
    IConnection _connection;
    IModel _channel;
    public IObservable<Message> Start()
    {
        var connectionFactory = new ConnectionFactory();
        _connection = connectionFactory.CreateConnection();
        _channel = _connection.CreateModel();
        EventingBasicConsumer consumer = new EventingBasicConsumer(_channel);
        var observable = Observable.FromEventPattern<BasicDeliverEventArgs>(
                                        a => consumer.Received += a, 
                                        a => consumer.Received -= a)
                                    .Select(e => e.EventArgs);
        _channel.BasicConsume("a_queue", false, consumer);
        return observable.Select(Transform);
    }
    private Message Transform(BasicDeliverEventArgs args) => new Message();
    public void Stop()
    {
        _channel.Dispose();
        _connection.Dispose();
    }
}
public class Message { }

我遇到的症状是,自从我在订阅序列之前调用基本计算机以来,兔子队列中的任何消息都会被获取但未传递到管道中。

,由于我没有"自动",因此一旦程序停止,消息就会返回队列。

正如某些评论中指出的那样,正如您在问题中所指出的那样,问题是由于您使用RabbitMQ客户端的方式。

要解决其中一些问题,我实际上所做的就是创建一个观察力的班级类。这是当前正在使用的EventingBasicConsumer的替代方法。我这样做的一个原因是处理问题中描述的问题,但另一件事是让您重复使用单个连接/频道实例之外的消费者对象。这具有允许您的下游反应代码尽管有瞬时连接/频道特征。

using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using RabbitMQ.Client;
namespace com.rabbitmq.consumers
{
    public sealed class ObservableConsumer : IBasicConsumer
    {
        private readonly List<string> _consumerTags = new List<string>();
        private readonly object _consumerTagsLock = new object();
        private readonly Subject<Message> _subject = new Subject<Message>();
        public ushort PrefetchCount { get; set; }
        public IEnumerable<string> ConsumerTags { get { return new List<string>(_consumerTags); } }
        /// <summary>
        /// Registers this consumer on the given queue. 
        /// </summary>
        /// <returns>The consumer tag assigned.</returns>
        public string ConsumeFrom(IModel channel, string queueName)
        {
            Model = channel;
            return Model.BasicConsume(queueName, false, this);
        }
        /// <summary>
        /// Contains an observable of the incoming messages where messages are processed on a thread pool thread.
        /// </summary>
        public IObservable<Message> IncomingMessages
        {
            get { return _subject.ObserveOn(Scheduler.ThreadPool); }
        }
        ///<summary>Retrieve the IModel instance this consumer is
        ///registered with.</summary>
        public IModel Model { get; private set; }
        ///<summary>Returns true while the consumer is registered and
        ///expecting deliveries from the broker.</summary>
        public bool IsRunning
        {
            get { return _consumerTags.Count > 0; }
        }
        /// <summary>
        /// Run after a consumer is cancelled.
        /// </summary>
        /// <param name="consumerTag"></param>
        private void OnConsumerCanceled(string consumerTag)
        {
        }
        /// <summary>
        /// Run after a consumer is added.
        /// </summary>
        /// <param name="consumerTag"></param>
        private void OnConsumerAdded(string consumerTag)
        {
        }
        public void HandleBasicConsumeOk(string consumerTag)
        {
            lock (_consumerTagsLock) {
                if (!_consumerTags.Contains(consumerTag))
                    _consumerTags.Add(consumerTag);
            }
        }
        public void HandleBasicCancelOk(string consumerTag)
        {
            lock (_consumerTagsLock) {
                if (_consumerTags.Contains(consumerTag)) {
                    _consumerTags.Remove(consumerTag);
                    OnConsumerCanceled(consumerTag);
                }
            }
        }
        public void HandleBasicCancel(string consumerTag)
        {
            lock (_consumerTagsLock) {
                if (_consumerTags.Contains(consumerTag)) {
                    _consumerTags.Remove(consumerTag);
                    OnConsumerCanceled(consumerTag);
                }
            }
        }
        public void HandleModelShutdown(IModel model, ShutdownEventArgs reason)
        {
            //Don't need to do anything.
        }
        public void HandleBasicDeliver(string consumerTag,
                                       ulong deliveryTag,
                                       bool redelivered,
                                       string exchange,
                                       string routingKey,
                                       IBasicProperties properties,
                                       byte[] body)
        {
            //Hack - prevents the broker from sending too many messages.
            //if (PrefetchCount > 0 && _unackedMessages.Count > PrefetchCount) {
            //    Model.BasicReject(deliveryTag, true);
            //    return;
            //}
            var message = new Message(properties.HeaderFromBasicProperties()) { Content = body };
            var deliveryData = new MessageDeliveryData()
            {
                ConsumerTag = consumerTag,
                DeliveryTag = deliveryTag,
                Redelivered = redelivered,
            };
            message.Tag = deliveryData;
            if (AckMode != AcknowledgeMode.AckWhenReceived) {
                message.Acknowledged += messageAcknowledged;
                message.Failed += messageFailed;
            }
            _subject.OnNext(message);
        }
        void messageFailed(Message message, Exception ex, bool requeue)
        {
            try {
                message.Acknowledged -= messageAcknowledged;
                message.Failed -= messageFailed;
                if (message.Tag is MessageDeliveryData) {
                    Model.BasicNack((message.Tag as MessageDeliveryData).DeliveryTag, false, requeue);
                }
            }
            catch {}
        }
        void messageAcknowledged(Message message)
        {
            try {
                message.Acknowledged -= messageAcknowledged;
                message.Failed -= messageFailed;
                if (message.Tag is MessageDeliveryData) {
                    var ackMultiple = AckMode == AcknowledgeMode.AckAfterAny;
                    Model.BasicAck((message.Tag as MessageDeliveryData).DeliveryTag, ackMultiple);
                }
            }
            catch {}
        }
    }
}

我认为无需实际订阅兔子队列(通过BasicConsume(,直到您对可观察到的订阅者为止。现在,您正在立即开始兔子订阅,即使没有人订阅它,也可以将物品推到可观察的。

假设我们有此样本类:

class Events {
    public event Action<string> MessageArrived;
    Timer _timer;
    public void Start()
    {
        Console.WriteLine("Timer starting");
        int i = 0;
        _timer = new Timer(_ => {
            this.MessageArrived?.Invoke(i.ToString());
            i++;
        }, null, TimeSpan.Zero, TimeSpan.FromSeconds(1));
    }
    public void Stop() {
        _timer?.Dispose();
        Console.WriteLine("Timer stopped");
    }
}

您现在正在做的事情基本上是:

var ev = new Events();
var ob = Observable.FromEvent<string>(x => ev.MessageArrived += x, x => ev.MessageArrived -= x);               
ev.Start();    
return ob;

您需要的是可观察到的,只有在某人订阅时才可以做到这一点:

return Observable.Create<string>(observer =>
{
    var ev = new Events();
    var ob = Observable.FromEvent<string>(x => ev.MessageArrived += x, x => ev.MessageArrived -= x);
    // first subsribe
    var sub = ob.Subscribe(observer);
    // then start
    ev.Start();
    // when subscription is disposed - unsubscribe from rabbit
    return new CompositeDisposable(sub, Disposable.Create(() => ev.Stop()));
}); 

很好,但是现在每次订阅可观察的都会导致对兔子队的单独订阅,这不是我们所需要的。我们可以用Publish().RefCount()

解决该问题
return Observable.Create<string>(observer => {
    var ev = new Events();
    var ob = Observable.FromEvent<string>(x => ev.MessageArrived += x, x => ev.MessageArrived -= x);
    var sub = ob.Subscribe(observer);                    
    ev.Start();                
    return new CompositeDisposable(sub, Disposable.Create(() => ev.Stop()));
}).Publish().RefCount(); 

现在将发生的是,当订阅者首先订阅可观察到的时(ref计数从0到1( - 调用Observable.Create主体的代码并订阅兔子队列。然后,所有后续订户共享此订阅。当上次取消订阅(REF计数为零( - 订阅被处置时,ev.Stop被称为,我们退出兔子队列。

如果发生这种情况,请致电Start()(在您的代码中可观察到(,并且永远不会订阅 - 什么也不会发生,也根本没有对兔子的订阅。

相关内容

  • 没有找到相关文章

最新更新