是否通过跨多个处方受试者的呼叫订单来保证订购



在下面的例子中,我有两个主题,并且_primarySubject总是在_secondarySubject之前调用。订阅者是否保证在二次回调之前接收主回调?测试简单的测试似乎是肯定的,还是侥幸?

如果这是侥幸,我如何才能更改代码以保证订单,正如我在这里描述的那样。

非常感谢。

public class EventGenerator
{
    ISubject<string> _primarySubject = new Subject<string>();
    ISubject<int> _secondarySubject = new Subject<int>();
    private int i;
    public void SendEvent(string message)
    {
        _primarySubject.OnNext(message);
        _secondarySubject.OnNext(i++);
    }
    public IObservable<string> GetPrimaryStream()
    {
        return _primarySubject;
    }
    public IObservable<int> GetSecondaryStream()
    {
        return _secondarySubject;
    }
}
public class EventSubscriber
{
    private static IScheduler StaticFakeGUIThread = new EventLoopScheduler(x => new Thread(x) { Name = "GUIThread" });
    private readonly int _id;
    public EventSubscriber(int id, EventGenerator eventGenerator)
    {
        _id = id;
        eventGenerator.GetPrimaryStream().ObserveOn(StaticFakeGUIThread).Subscribe(OnPrimaryEvent);
        eventGenerator.GetSecondaryStream().ObserveOn(StaticFakeGUIThread).Subscribe(OnSecondaryEvent);
    }
    private void OnPrimaryEvent(String eventMsg)
    {
        string msg = eventMsg;
    }
    private void OnSecondaryEvent(int i)
    {
        int msg = i;
    }
}
[TestFixture]
public class EventGeneratorTest
{
    [Test]
    public void Test()
    {
        EventGenerator eg = new EventGenerator();
        EventSubscriber s1 = new EventSubscriber(1,eg);
        EventSubscriber s2 = new EventSubscriber(2, eg);
        eg.SendEvent("A");
        eg.SendEvent("B");
    }
}

通常,Rx不会对不同可观察流的相对排序做出很多保证。排序取决于RX无法控制的许多因素。

在您的特定情况下,订单是有保证的:

  1. 由于您按顺序同步触发受试者,因此可以保证按顺序触发他们的即时订阅(.ObserveOn(StaticFakeUIThread)
  2. 由于对ObserveOn的两次调用都使用相同的调度程序实例,因此可以保证在观察次要流事件之前安排主要流事件的观察,并且两次观察都将使用相同的调度器进行安排
  3. 由于您的调度程序是EventLoopScheduler,因此可以保证它不会同时运行计划任务,而是按照计划的顺序运行这些任务

因此,你的观察确实会按顺序进行。

EventLoopScheduler替换为TaskPoolSchedulerThreadPoolSchedulerScheduler.Default,将不再保证两个流的相对顺序,因为允许这些调度器同时运行两个调度任务。

这一切都取决于你所说的"保证"是什么意思。Rx的当前实现和编写的代码是这样的,调用顺序将始终与您所观察到的一样——然而,没有发布的规则来管理这种行为并保证它

代码的编写方式对使用两个主题(或者实际上是两个订阅)没有特别的优势,因为处理程序是在同一个线程上调用的。只需使用一个组合事件的主题,就可以获得所需的保证——为此使用自定义类型。为了方便起见,我使用了一个Tuple:

ISubject<Tuple<string,int>> _subject = new Subject<Tuple<string,int>>();
private int i;
public void SendEvent(string message)
{
    _subject.OnNext(Tuple.Create(message,i++));
}

以及您的订阅:

private void OnEvent(Tuple<String,int> eventMsg)
{
    string msgText = eventMsg.Item1;
    int msgNum = eventMsg.Item2;
}

这给了你保证,无论日程安排等。我怀疑你的问题一定有更令人信服的原因——但除此之外,这更有效。

如果您只想使用计数器btw,Select的重载会为您引入一个基于0的事件索引计数器。

理论上一切都应该按顺序进行。在实践中,EventLoopScheduler中存在一个错误,如果队列不足,它会破坏秩序。

请在此处查看问题https://github.com/Reactive-Extensions/Rx.NET/issues/455

相关内容

  • 没有找到相关文章

最新更新