.NET - 使用 .NET 反应式的多线程生产者使用者



我正在努力建立一个制作人。使用 .NET 反应式的使用者模式。生产者从 Kafka 消息总线读取消息。读取消息后,需要将其移交给消费者来处理消息。

我能够使用 .NET 反应式来做到这一点。但是,我注意到消费者正在与生产者相同的线程上处理消息。请参阅下面的代码。目标是:有一个从总线读取消息的生产者线程。然后,在单独的线程上将其交给使用者来处理消息。我拥有的代码是:

 // Producer Code
 private Subject<LGMessage> _onMessageSubject = new Subject<LGMessage>();
 private IObserver<LGMessage> messageBusObserver;
 protected IObservable<LGMessage> _onMessageObservable
    {
        get
        {
            return _onMessageSubject.AsObservable();
        }
    }

public void AddObserver(IObserver<LGMessage> observer)
    {
       _onMessageObservable.ObserveOn(NewThreadScheduler.Default).Subscribe(observer);

    }

// Read is called when the message is read from the bus
public bool Read(Message<string, string> msg)
    {
            // add the message to the observable
            _onMessageSubject.OnNext(msg.Value);

    }
// Consumer Code
public virtual void OnNext(string value)
    {
        Console.WriteLine("Thread {0} Consuming",          
        Thread.CurrentThread.ManagedThreadId);
        Console.WriteLine("{1}: Message I got from bus: {0}", value.Key, 
         this.Name);
        // Take Action
    }

很难从你的代码中分辨出来,但看起来你没有公开可观察量。这将拒绝下游使用 Rx 运算符。在您的情况下,您希望使用线程运算符。

在制作人中,与其公开AddObserver(IObserver<string> observer),不如公开这样的东西:

public IObservable<string> Messages => _onMessageSubject.AsObservable();

然后,消费者可以执行类似操作

Messages
    .ObserveOn(NewThreadScheduler.Default)
    .Subscribe(consumerObserver);

编辑

以下代码按预期对我有用:

var subject = new Subject<int>();
var observer1 = new AnonymousObserver<int>(i => Console.WriteLine($"Observer1: Observed {i} on thread {Thread.CurrentThread.ManagedThreadId}."));
var observer2 = new AnonymousObserver<int>(i => Console.WriteLine($"Observer2: Observed {i} on thread {Thread.CurrentThread.ManagedThreadId}."));
var observer3 = new AnonymousObserver<int>(i => Console.WriteLine($"Observer3: Observed {i} on thread {Thread.CurrentThread.ManagedThreadId}."));
subject.AsObservable().ObserveOn(NewThreadScheduler.Default).Subscribe(observer1);
subject.AsObservable().ObserveOn(NewThreadScheduler.Default).Subscribe(observer2);
subject.AsObservable().ObserveOn(NewThreadScheduler.Default).Subscribe(observer3);
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnCompleted();
下面是输出(观察者 1 得到线程 14,观察者 2 得到线程

15,观察者 3 得到线程 16(:

Observer1: Observed 1 on thread 14.
Observer2: Observed 1 on thread 15.
Observer1: Observed 2 on thread 14.
Observer1: Observed 3 on thread 14.
Observer2: Observed 2 on thread 15.
Observer2: Observed 3 on thread 15.
Observer3: Observed 1 on thread 16.
Observer3: Observed 2 on thread 16.
Observer3: Observed 3 on thread 16.

相关内容

  • 没有找到相关文章

最新更新