我正在使用RX框架为消息写一个侦听器。
我面临的问题是,我使用的库使用了每当消息到达时发布事件的消费者。
我设法通过Observable.FromEventPattern
消耗了传入的消息,但是我对已经在服务器中的消息有问题。
目前我有以下命令链
- 创建消费者
- 使用
FromEventPattern
创建可观察的序列并应用所需的转换 - 告诉消费者开始
- 订阅序列
最简单的解决方案是交换步骤3。
理想情况下,我想在步骤4发生时执行步骤3(例如OnSubscribe
方法(。
感谢您的帮助:(
ps:为了添加更多详细信息,这些事件来自兔子队队,我正在使用rabbitmq.client软件包中的EventingBasicConsumer
类。
在这里您可以找到我正在工作的库。具体来说,这是给我问题的类/方法。
编辑
这是有问题代码的剥离版本
void Main()
{
var engine = new Engine();
var messages = engine.Start();
messages.Subscribe(m => m.Dump());
Console.ReadLine();
engine.Stop();
}
public class Engine
{
IConnection _connection;
IModel _channel;
public IObservable<Message> Start()
{
var connectionFactory = new ConnectionFactory();
_connection = connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
EventingBasicConsumer consumer = new EventingBasicConsumer(_channel);
var observable = Observable.FromEventPattern<BasicDeliverEventArgs>(
a => consumer.Received += a,
a => consumer.Received -= a)
.Select(e => e.EventArgs);
_channel.BasicConsume("a_queue", false, consumer);
return observable.Select(Transform);
}
private Message Transform(BasicDeliverEventArgs args) => new Message();
public void Stop()
{
_channel.Dispose();
_connection.Dispose();
}
}
public class Message { }
我遇到的症状是,自从我在订阅序列之前调用基本计算机以来,兔子队列中的任何消息都会被获取但未传递到管道中。
,由于我没有"自动",因此一旦程序停止,消息就会返回队列。
正如某些评论中指出的那样,正如您在问题中所指出的那样,问题是由于您使用RabbitMQ客户端的方式。
要解决其中一些问题,我实际上所做的就是创建一个观察力的班级类。这是当前正在使用的EventingBasicConsumer的替代方法。我这样做的一个原因是处理问题中描述的问题,但另一件事是让您重复使用单个连接/频道实例之外的消费者对象。这具有允许您的下游反应代码尽管有瞬时连接/频道特征。
。using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using RabbitMQ.Client;
namespace com.rabbitmq.consumers
{
public sealed class ObservableConsumer : IBasicConsumer
{
private readonly List<string> _consumerTags = new List<string>();
private readonly object _consumerTagsLock = new object();
private readonly Subject<Message> _subject = new Subject<Message>();
public ushort PrefetchCount { get; set; }
public IEnumerable<string> ConsumerTags { get { return new List<string>(_consumerTags); } }
/// <summary>
/// Registers this consumer on the given queue.
/// </summary>
/// <returns>The consumer tag assigned.</returns>
public string ConsumeFrom(IModel channel, string queueName)
{
Model = channel;
return Model.BasicConsume(queueName, false, this);
}
/// <summary>
/// Contains an observable of the incoming messages where messages are processed on a thread pool thread.
/// </summary>
public IObservable<Message> IncomingMessages
{
get { return _subject.ObserveOn(Scheduler.ThreadPool); }
}
///<summary>Retrieve the IModel instance this consumer is
///registered with.</summary>
public IModel Model { get; private set; }
///<summary>Returns true while the consumer is registered and
///expecting deliveries from the broker.</summary>
public bool IsRunning
{
get { return _consumerTags.Count > 0; }
}
/// <summary>
/// Run after a consumer is cancelled.
/// </summary>
/// <param name="consumerTag"></param>
private void OnConsumerCanceled(string consumerTag)
{
}
/// <summary>
/// Run after a consumer is added.
/// </summary>
/// <param name="consumerTag"></param>
private void OnConsumerAdded(string consumerTag)
{
}
public void HandleBasicConsumeOk(string consumerTag)
{
lock (_consumerTagsLock) {
if (!_consumerTags.Contains(consumerTag))
_consumerTags.Add(consumerTag);
}
}
public void HandleBasicCancelOk(string consumerTag)
{
lock (_consumerTagsLock) {
if (_consumerTags.Contains(consumerTag)) {
_consumerTags.Remove(consumerTag);
OnConsumerCanceled(consumerTag);
}
}
}
public void HandleBasicCancel(string consumerTag)
{
lock (_consumerTagsLock) {
if (_consumerTags.Contains(consumerTag)) {
_consumerTags.Remove(consumerTag);
OnConsumerCanceled(consumerTag);
}
}
}
public void HandleModelShutdown(IModel model, ShutdownEventArgs reason)
{
//Don't need to do anything.
}
public void HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
IBasicProperties properties,
byte[] body)
{
//Hack - prevents the broker from sending too many messages.
//if (PrefetchCount > 0 && _unackedMessages.Count > PrefetchCount) {
// Model.BasicReject(deliveryTag, true);
// return;
//}
var message = new Message(properties.HeaderFromBasicProperties()) { Content = body };
var deliveryData = new MessageDeliveryData()
{
ConsumerTag = consumerTag,
DeliveryTag = deliveryTag,
Redelivered = redelivered,
};
message.Tag = deliveryData;
if (AckMode != AcknowledgeMode.AckWhenReceived) {
message.Acknowledged += messageAcknowledged;
message.Failed += messageFailed;
}
_subject.OnNext(message);
}
void messageFailed(Message message, Exception ex, bool requeue)
{
try {
message.Acknowledged -= messageAcknowledged;
message.Failed -= messageFailed;
if (message.Tag is MessageDeliveryData) {
Model.BasicNack((message.Tag as MessageDeliveryData).DeliveryTag, false, requeue);
}
}
catch {}
}
void messageAcknowledged(Message message)
{
try {
message.Acknowledged -= messageAcknowledged;
message.Failed -= messageFailed;
if (message.Tag is MessageDeliveryData) {
var ackMultiple = AckMode == AcknowledgeMode.AckAfterAny;
Model.BasicAck((message.Tag as MessageDeliveryData).DeliveryTag, ackMultiple);
}
}
catch {}
}
}
}
我认为无需实际订阅兔子队列(通过BasicConsume
(,直到您对可观察到的订阅者为止。现在,您正在立即开始兔子订阅,即使没有人订阅它,也可以将物品推到可观察的。
假设我们有此样本类:
class Events {
public event Action<string> MessageArrived;
Timer _timer;
public void Start()
{
Console.WriteLine("Timer starting");
int i = 0;
_timer = new Timer(_ => {
this.MessageArrived?.Invoke(i.ToString());
i++;
}, null, TimeSpan.Zero, TimeSpan.FromSeconds(1));
}
public void Stop() {
_timer?.Dispose();
Console.WriteLine("Timer stopped");
}
}
您现在正在做的事情基本上是:
var ev = new Events();
var ob = Observable.FromEvent<string>(x => ev.MessageArrived += x, x => ev.MessageArrived -= x);
ev.Start();
return ob;
您需要的是可观察到的,只有在某人订阅时才可以做到这一点:
return Observable.Create<string>(observer =>
{
var ev = new Events();
var ob = Observable.FromEvent<string>(x => ev.MessageArrived += x, x => ev.MessageArrived -= x);
// first subsribe
var sub = ob.Subscribe(observer);
// then start
ev.Start();
// when subscription is disposed - unsubscribe from rabbit
return new CompositeDisposable(sub, Disposable.Create(() => ev.Stop()));
});
很好,但是现在每次订阅可观察的都会导致对兔子队的单独订阅,这不是我们所需要的。我们可以用Publish().RefCount()
:
return Observable.Create<string>(observer => {
var ev = new Events();
var ob = Observable.FromEvent<string>(x => ev.MessageArrived += x, x => ev.MessageArrived -= x);
var sub = ob.Subscribe(observer);
ev.Start();
return new CompositeDisposable(sub, Disposable.Create(() => ev.Stop()));
}).Publish().RefCount();
现在将发生的是,当订阅者首先订阅可观察到的时(ref计数从0到1( - 调用Observable.Create
主体的代码并订阅兔子队列。然后,所有后续订户共享此订阅。当上次取消订阅(REF计数为零( - 订阅被处置时,ev.Stop
被称为,我们退出兔子队列。
如果发生这种情况,请致电Start()
(在您的代码中可观察到(,并且永远不会订阅 - 什么也不会发生,也根本没有对兔子的订阅。