在创建可观察量时将未知数量的可观察量组合/合并在一起



我想做的是:

  • 调用一个函数(DoWork(,作为其工作的一部分,它将通过多个Worker类订阅多个热输入
  • 在调用函数之前订阅DoWork订阅的所有更新
  • 完成后,释放所有订阅
  • DoWork完成之前,可能至少会触发一个传入事件。

问题:

  • Subject是正确的方法吗?感觉应该有更好的方法吗?
  • 如何确保一旦Main中的订阅被释放,所有incomingX订阅也会被释放 - 即 Main应控制所有订阅的生命周期。

    void Main()
    {
        var worker = new Worker();
        using (worker.UpdateEvents.Subscribe(x => Console.WriteLine()))
        {
            worker.DoWork();
        }
    }
    public class Worker1
    {
        private readonly Subject<string> updateEvents = new Subject<string>();
        public IObservable<string> UpdateEvents { get { return updateEvents; } }
        public void DoWork()
        {
            // Do some work
            // subscribe to a hot observable (events coming in over the network)
            incoming1.Subscribe(updateEvents);
            var worker2 = new Worker2();
            worker2.UpdateEvents.Subscribe(updateEvents);
            worker2.DoWork();
        }
    }
    public class Worker2
    {
        private readonly Subject<string> updateEvents = new Subject<string>();
        public IObservable<string> UpdateEvents { get { return updateEvents; } }
        public void DoWork()
        {
            // Do some work
            // subscribe to some more events
            incoming2.Subscribe(updateEvents);
            var workerN = new WorkerN();
            workerN.UpdateEvents.Subscribe(updateEvents);
            workerN.DoWork();
        }
    }
    

詹姆斯的回答(使用SubjectMerge(抓住了问题的本质。 这个答案提供了一种我发现在这种情况下有用的模式(基于您对詹姆斯答案的评论(。

从本质上讲,该模式是让您的工作人员公开调用者在调用DoWork之前将订阅的IObservable。 但是这种 API(在调用 B 之前调用 A(是有问题的,因为它引入了时间耦合。

为了消除时间耦合,您最终将工作线程本身变成了一个冷的可观察对象,当调用方订阅时隐式调用DoWork。 一旦你意识到冷可观察量的力量,以及在观察者订阅时使用Observable.Create采取行动的能力,天空就是你可以创建的Rx链的极限,而不需要伸手去拿Subject。 下面是一个基于原始代码的示例。

Worker很简单。 它只是订阅incoming1Worker2Worker2稍微复杂一些。 它订阅incoming2,执行一些额外的工作,然后最终订阅WorkerN

始终保持正确的OnErrorOnCompleted原始代码示例无法做到的逻辑。 这意味着在所有传入流和工作流完成之前,Main看到的可观察流不会Complete。 但是,一旦任何传入流或工作流失败,Main就会失败。 具有多个 Subscribe(someSubject) 调用的代码示例将导致Subject在任何incoming流完成后立即完成(从而完成 Main 的传入流(。

public class Worker1
{
    public IObservable<string> UpdateEvents { get; private set; };
    public Worker1()
    {
        // Each time someone subscribes, create a new worker2 and subscribe to the hot events as well as whatever worker2 produces.
        UpdateEvents = Observable.Create(observer =>
        {
            var worker2 = new Worker2();
            return incoming1.Merge(worker2.UpdateEvents).Subscribe(observer);
        });
    }
}
public class Worker2
{
    public IObservable<string> UpdateEvents { get; private set; };
    public Worker2()
    {
        // Each time someone subscribes, create a new worker and subscribe to the hot events as well as whatever worker2 produces.
        UpdateEvents = Observable.Create(observer =>
        {
            // maybe this version needs to do some stuff after it has subscribed to incoming2 but before it subscribes to workerN:
            var doWorkThenSubscribeToWorker = Observable.Create(o =>
            {
                DoWork(o);
                var worker = new WorkerN();
                return worker.UpdateEvents.Subscribe(o);
            }
            return incoming2.Merge(doWorkThenSubscribeToWorker).Subscribe(observer);
        });
    }
    private void DoWork(IObserver<string> observer)
    {
        // do some work
        observer.OnNext("result of work");
    }
}

void Main()
{
    var worker = new Worker();
    worker.UpdateEvents.Do(x => Console.WriteLine()).Wait();
}

很难完全遵循您的要求 - 我认为一个小而完整的程序会有所帮助。

也就是说,使用Subject将输入引入 Rx 管道并没有错 - 在 StackOverflow 和其他地方有很多关于它的文章,所以我不会重新讨论它。

仅凭您问题的标题,我想知道以下内容是否符合您的目的?

组合动态数量的流

为此,您可以在流流上使用Merge。您的流必须全部属于同一类型 - 如果不是,您可以创建一个合适的容器类型并使用 Select 将它们投影到该类型中。为简单起见,我假设统一类型为 long

要开始为流创建容器,请执行以下操作:

var container = new Subject<IObservable<long>>();

然后合并包含的流:

var combined = container.Merge();

订阅 to combined 以常规方式使用结果,并释放订阅以一次取消订阅所有流。

然后,您可以在创建流时添加流,如下所示:

// assume we got this from somewhere - e.g. a "worker" factory function
// Observable.Create may well be helpful to create an observable
// that initiates getting data from a network connection upon its subscription
IObservable<long> someNewStream;
// add the someNewStream to the container (it will be subscribed to once added)
container.OnNext(someNewStream);

使用示例

// dump out the combined results to the console,
// IRL you would subscribe to this to process the results
var subscription = combined.Subscribe(Console.WriteLine);
// add a stream of longs
container.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));    
Console.WriteLine("Stream 1 added");
Console.ReadLine();
// add another stream
container.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));    
Console.WriteLine("Step 2");
Console.ReadLine();
// this will unsubscribe from all the live streams
subscription.Dispose();

相关内容

  • 没有找到相关文章

最新更新