我想做的是:
- 调用一个函数(
DoWork
(,作为其工作的一部分,它将通过多个Worker
类订阅多个热输入
。 - 在调用函数之前订阅
DoWork
订阅的所有更新 - 完成后,释放所有订阅 在
DoWork
完成之前,可能至少会触发一个传入事件。
问题:
Subject
是正确的方法吗?感觉应该有更好的方法吗?-
如何确保一旦
Main
中的订阅被释放,所有incomingX
订阅也会被释放 - 即Main
应控制所有订阅的生命周期。void Main() { var worker = new Worker(); using (worker.UpdateEvents.Subscribe(x => Console.WriteLine())) { worker.DoWork(); } } public class Worker1 { private readonly Subject<string> updateEvents = new Subject<string>(); public IObservable<string> UpdateEvents { get { return updateEvents; } } public void DoWork() { // Do some work // subscribe to a hot observable (events coming in over the network) incoming1.Subscribe(updateEvents); var worker2 = new Worker2(); worker2.UpdateEvents.Subscribe(updateEvents); worker2.DoWork(); } } public class Worker2 { private readonly Subject<string> updateEvents = new Subject<string>(); public IObservable<string> UpdateEvents { get { return updateEvents; } } public void DoWork() { // Do some work // subscribe to some more events incoming2.Subscribe(updateEvents); var workerN = new WorkerN(); workerN.UpdateEvents.Subscribe(updateEvents); workerN.DoWork(); } }
詹姆斯的回答(使用Subject
和Merge
(抓住了问题的本质。 这个答案提供了一种我发现在这种情况下有用的模式(基于您对詹姆斯答案的评论(。
从本质上讲,该模式是让您的工作人员公开调用者在调用DoWork
之前将订阅的IObservable
。 但是这种 API(在调用 B 之前调用 A(是有问题的,因为它引入了时间耦合。
为了消除时间耦合,您最终将工作线程本身变成了一个冷的可观察对象,当调用方订阅时隐式调用DoWork
。 一旦你意识到冷可观察量的力量,以及在观察者订阅时使用Observable.Create
采取行动的能力,天空就是你可以创建的Rx链的极限,而不需要伸手去拿Subject
。 下面是一个基于原始代码的示例。
Worker
很简单。 它只是订阅incoming1
和Worker2
。 Worker2
稍微复杂一些。 它订阅incoming2
,执行一些额外的工作,然后最终订阅WorkerN
。
始终保持正确的OnError
,OnCompleted
原始代码示例无法做到的逻辑。 这意味着在所有传入流和工作流完成之前,Main
看到的可观察流不会Complete
。 但是,一旦任何传入流或工作流失败,Main
就会失败。 具有多个 Subscribe(someSubject)
调用的代码示例将导致Subject
在任何incoming
流完成后立即完成(从而完成 Main 的传入流(。
public class Worker1
{
public IObservable<string> UpdateEvents { get; private set; };
public Worker1()
{
// Each time someone subscribes, create a new worker2 and subscribe to the hot events as well as whatever worker2 produces.
UpdateEvents = Observable.Create(observer =>
{
var worker2 = new Worker2();
return incoming1.Merge(worker2.UpdateEvents).Subscribe(observer);
});
}
}
public class Worker2
{
public IObservable<string> UpdateEvents { get; private set; };
public Worker2()
{
// Each time someone subscribes, create a new worker and subscribe to the hot events as well as whatever worker2 produces.
UpdateEvents = Observable.Create(observer =>
{
// maybe this version needs to do some stuff after it has subscribed to incoming2 but before it subscribes to workerN:
var doWorkThenSubscribeToWorker = Observable.Create(o =>
{
DoWork(o);
var worker = new WorkerN();
return worker.UpdateEvents.Subscribe(o);
}
return incoming2.Merge(doWorkThenSubscribeToWorker).Subscribe(observer);
});
}
private void DoWork(IObserver<string> observer)
{
// do some work
observer.OnNext("result of work");
}
}
void Main()
{
var worker = new Worker();
worker.UpdateEvents.Do(x => Console.WriteLine()).Wait();
}
很难完全遵循您的要求 - 我认为一个小而完整的程序会有所帮助。
也就是说,使用Subject
将输入引入 Rx 管道并没有错 - 在 StackOverflow 和其他地方有很多关于它的文章,所以我不会重新讨论它。
仅凭您问题的标题,我想知道以下内容是否符合您的目的?
组合动态数量的流
为此,您可以在流流上使用Merge
。您的流必须全部属于同一类型 - 如果不是,您可以创建一个合适的容器类型并使用 Select
将它们投影到该类型中。为简单起见,我假设统一类型为 long
。
要开始为流创建容器,请执行以下操作:
var container = new Subject<IObservable<long>>();
然后合并包含的流:
var combined = container.Merge();
订阅 to combined 以常规方式使用结果,并释放订阅以一次取消订阅所有流。
然后,您可以在创建流时添加流,如下所示:
// assume we got this from somewhere - e.g. a "worker" factory function
// Observable.Create may well be helpful to create an observable
// that initiates getting data from a network connection upon its subscription
IObservable<long> someNewStream;
// add the someNewStream to the container (it will be subscribed to once added)
container.OnNext(someNewStream);
使用示例
// dump out the combined results to the console,
// IRL you would subscribe to this to process the results
var subscription = combined.Subscribe(Console.WriteLine);
// add a stream of longs
container.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
Console.WriteLine("Stream 1 added");
Console.ReadLine();
// add another stream
container.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
Console.WriteLine("Step 2");
Console.ReadLine();
// this will unsubscribe from all the live streams
subscription.Dispose();