在下面的例子中,我有两个主题,并且_primarySubject总是在_secondarySubject之前调用。订阅者是否保证在二次回调之前接收主回调?测试简单的测试似乎是肯定的,还是侥幸?
如果这是侥幸,我如何才能更改代码以保证订单,正如我在这里描述的那样。
非常感谢。
public class EventGenerator
{
ISubject<string> _primarySubject = new Subject<string>();
ISubject<int> _secondarySubject = new Subject<int>();
private int i;
public void SendEvent(string message)
{
_primarySubject.OnNext(message);
_secondarySubject.OnNext(i++);
}
public IObservable<string> GetPrimaryStream()
{
return _primarySubject;
}
public IObservable<int> GetSecondaryStream()
{
return _secondarySubject;
}
}
public class EventSubscriber
{
private static IScheduler StaticFakeGUIThread = new EventLoopScheduler(x => new Thread(x) { Name = "GUIThread" });
private readonly int _id;
public EventSubscriber(int id, EventGenerator eventGenerator)
{
_id = id;
eventGenerator.GetPrimaryStream().ObserveOn(StaticFakeGUIThread).Subscribe(OnPrimaryEvent);
eventGenerator.GetSecondaryStream().ObserveOn(StaticFakeGUIThread).Subscribe(OnSecondaryEvent);
}
private void OnPrimaryEvent(String eventMsg)
{
string msg = eventMsg;
}
private void OnSecondaryEvent(int i)
{
int msg = i;
}
}
[TestFixture]
public class EventGeneratorTest
{
[Test]
public void Test()
{
EventGenerator eg = new EventGenerator();
EventSubscriber s1 = new EventSubscriber(1,eg);
EventSubscriber s2 = new EventSubscriber(2, eg);
eg.SendEvent("A");
eg.SendEvent("B");
}
}
通常,Rx不会对不同可观察流的相对排序做出很多保证。排序取决于RX无法控制的许多因素。
在您的特定情况下,订单是有保证的:
- 由于您按顺序同步触发受试者,因此可以保证按顺序触发他们的即时订阅(
.ObserveOn(StaticFakeUIThread)
) - 由于对
ObserveOn
的两次调用都使用相同的调度程序实例,因此可以保证在观察次要流事件之前安排主要流事件的观察,并且两次观察都将使用相同的调度器进行安排 - 由于您的调度程序是
EventLoopScheduler
,因此可以保证它不会同时运行计划任务,而是按照计划的顺序运行这些任务
因此,你的观察确实会按顺序进行。
将EventLoopScheduler
替换为TaskPoolScheduler
、ThreadPoolScheduler
或Scheduler.Default
,将不再保证两个流的相对顺序,因为允许这些调度器同时运行两个调度任务。
这一切都取决于你所说的"保证"是什么意思。Rx的当前实现和编写的代码是这样的,调用顺序将始终与您所观察到的一样——然而,没有发布的规则来管理这种行为并保证它
代码的编写方式对使用两个主题(或者实际上是两个订阅)没有特别的优势,因为处理程序是在同一个线程上调用的。只需使用一个组合事件的主题,就可以获得所需的保证——为此使用自定义类型。为了方便起见,我使用了一个Tuple:
ISubject<Tuple<string,int>> _subject = new Subject<Tuple<string,int>>();
private int i;
public void SendEvent(string message)
{
_subject.OnNext(Tuple.Create(message,i++));
}
以及您的订阅:
private void OnEvent(Tuple<String,int> eventMsg)
{
string msgText = eventMsg.Item1;
int msgNum = eventMsg.Item2;
}
这给了你保证,无论日程安排等。我怀疑你的问题一定有更令人信服的原因——但除此之外,这更有效。
如果您只想使用计数器btw,Select
的重载会为您引入一个基于0的事件索引计数器。
理论上一切都应该按顺序进行。在实践中,EventLoopScheduler中存在一个错误,如果队列不足,它会破坏秩序。
请在此处查看问题https://github.com/Reactive-Extensions/Rx.NET/issues/455