这两个可观测操作等价吗



我不知道为什么,但由于某种原因,当使用通过concat创建的可观察对象时,我总是会从列表中获取所有推送的值(按预期工作)。其中,与正常订阅一样,似乎有些值永远不会提供给那些订阅了可观测值的人(仅在特定条件下)。

这是我正在使用的两个案例。有人能解释一下为什么在订阅第二个版本的某些情况下,并没有收到所有的值吗?它们不对等吗?这里的意图是让水流倒回。有什么原因可以解释为什么案例2失败,而案例1没有。

此处的重播只是正在进行的流的列表。

案例1。

let observable = 
Observable.Create(fun (o:IObserver<'a>) ->
let next b =
for v in replay do
o.OnNext(v.Head)
o.OnNext(b)
o.OnCompleted()
someOtherObs.Subscribe(next, o.OnError, o.OnCompleted))
let toReturn = observable.Concat(someOtherObs).Publish().RefCount()

案例2。

let toReturn = 
Observable.Create(fun (o:IObserver<'a>) ->
for v in replay do
o.OnNext(v.Head)
someOtherObs.Subscribe(o)
).Publish().RefCount()

小心!我没有经常使用F#来100%适应语法,但我想我知道发生了什么。

也就是说,这两种情况在我看来都很奇怪,这在很大程度上取决于一些OtherObs是如何实现的,以及在哪里(就线程而言)运行。

案例1分析

你将concat应用于一个看起来像这样工作的源流:

  • 它订阅someOtherObs,并响应第一个事件(a),将重播元素推送给观察者
  • 然后它将事件(a)发送给观测者
  • 然后它就完成了。此时,流已结束,不再发送任何其他事件
  • 如果someOtherObs为空或只有一个错误,则会将其传播到观察器

现在,当这个流完成时,someOtherObs被连接到它上。现在发生的事情有点不可编辑-如果someOtherOb是冷的,那么第一个事件将被第二次发送,如果someOtherObs是热的,那么第一个事件不会被重新发送,但剩余的事件中的哪个事件下一个将进行存在潜在的竞争条件,这取决于someOtherObs的实现方式。如果天气炎热,你很容易错过活动。

案例2分析

您重播所有重播事件,然后发送someOtherObs的所有事件-但是,如果someOtherOb很热,也会有比赛条件,因为您只在推送重播后订阅,因此可能会错过一些事件。

评论

无论哪种情况,在我看来都很混乱。

这看起来像是试图将世界状态(sotw)和直播相结合。在这种情况下,您需要首先订阅直播流,并缓存任何事件,然后获取和推送sotw事件。推送sotw后,您将推送缓存的事件-小心对可能在sotw中读取的事件进行重复数据消除-直到您赶上实时事件,此时您可以通过实时事件。

您通常可以通过简单的实现来刷新实时流订阅的OnNext处理程序中的实时缓存,从而在刷新时有效地阻塞源代码,但如果您有大量历史记录和/或快速移动的实时流,则会面临对实时源代码施加过多背压的风险。

让你思考的一些考虑因素有望让你走上正确的道路。

作为参考,这里是一个非常天真和简单的C#实现,我用rx-main-nuget包在LINQPad中编译。我过去做过的准备生产的实现可能会变得相当复杂:

void Main()
{
// asynchronously produce a list from 1 to 10
Func<Task<List<int>>> sotw =
() => Task<List<int>>.Run(() => Enumerable.Range(1, 10).ToList());

// a stream of 5 to 15
var live = Observable.Range(5, 10);

// outputs 1 to 15
live.MergeSotwWithLive(sotw).Subscribe(Console.WriteLine);
}
// Define other methods and classes here
public static class ObservableExtensions
{
public static IObservable<TSource> MergeSotwWithLive<TSource>(
this IObservable<TSource> live,
Func<Task<List<TSource>>> sotwFactory)
{
return Observable.Create<TSource>(async o =>
{       
// Naïve indefinite caching, no error checking anywhere             
var liveReplay = new ReplaySubject<TSource>();
live.Subscribe(liveReplay);
// No error checking, no timeout, no cancellation support
var sotw = await sotwFactory();
foreach(var evt in sotw)
{
o.OnNext(evt);
}                               

// note naive disposal
// and extremely naive de-duping (it really needs to compare
// on some unique id)
// we are only supporting disposal once the sotw is sent            
return liveReplay.Where(evt => !sotw.Any(s => s.Equals(evt)))
.Subscribe(o);                  
});
}
}

相关内容

  • 没有找到相关文章

最新更新