我正在努力建立一个制作人。使用 .NET 反应式的使用者模式。生产者从 Kafka 消息总线读取消息。读取消息后,需要将其移交给消费者来处理消息。
我能够使用 .NET 反应式来做到这一点。但是,我注意到消费者正在与生产者相同的线程上处理消息。请参阅下面的代码。目标是:有一个从总线读取消息的生产者线程。然后,在单独的线程上将其交给使用者来处理消息。我拥有的代码是:
// Producer Code
private Subject<LGMessage> _onMessageSubject = new Subject<LGMessage>();
private IObserver<LGMessage> messageBusObserver;
protected IObservable<LGMessage> _onMessageObservable
{
get
{
return _onMessageSubject.AsObservable();
}
}
public void AddObserver(IObserver<LGMessage> observer)
{
_onMessageObservable.ObserveOn(NewThreadScheduler.Default).Subscribe(observer);
}
// Read is called when the message is read from the bus
public bool Read(Message<string, string> msg)
{
// add the message to the observable
_onMessageSubject.OnNext(msg.Value);
}
// Consumer Code
public virtual void OnNext(string value)
{
Console.WriteLine("Thread {0} Consuming",
Thread.CurrentThread.ManagedThreadId);
Console.WriteLine("{1}: Message I got from bus: {0}", value.Key,
this.Name);
// Take Action
}
很难从你的代码中分辨出来,但看起来你没有公开可观察量。这将拒绝下游使用 Rx 运算符。在您的情况下,您希望使用线程运算符。
在制作人中,与其公开AddObserver(IObserver<string> observer)
,不如公开这样的东西:
public IObservable<string> Messages => _onMessageSubject.AsObservable();
然后,消费者可以执行类似操作
Messages
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(consumerObserver);
编辑:
以下代码按预期对我有用:
var subject = new Subject<int>();
var observer1 = new AnonymousObserver<int>(i => Console.WriteLine($"Observer1: Observed {i} on thread {Thread.CurrentThread.ManagedThreadId}."));
var observer2 = new AnonymousObserver<int>(i => Console.WriteLine($"Observer2: Observed {i} on thread {Thread.CurrentThread.ManagedThreadId}."));
var observer3 = new AnonymousObserver<int>(i => Console.WriteLine($"Observer3: Observed {i} on thread {Thread.CurrentThread.ManagedThreadId}."));
subject.AsObservable().ObserveOn(NewThreadScheduler.Default).Subscribe(observer1);
subject.AsObservable().ObserveOn(NewThreadScheduler.Default).Subscribe(observer2);
subject.AsObservable().ObserveOn(NewThreadScheduler.Default).Subscribe(observer3);
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnCompleted();
下面是输出(观察者 1 得到线程 14,观察者 2 得到线程 15,观察者 3 得到线程 16(:
Observer1: Observed 1 on thread 14.
Observer2: Observed 1 on thread 15.
Observer1: Observed 2 on thread 14.
Observer1: Observed 3 on thread 14.
Observer2: Observed 2 on thread 15.
Observer2: Observed 3 on thread 15.
Observer3: Observed 1 on thread 16.
Observer3: Observed 2 on thread 16.
Observer3: Observed 3 on thread 16.