我有一系列使用RX发布/订阅模型的模块。
这是事件注册码(每个订阅模块重复):
_publisher.GetEvent<DataEvent>()
.Where(sde => sde.SourceName == source.Name)
.ObserveOn(Scheduler.TaskPool)
.Subscribe(module.OnDataEvent);
出版商很简单,多亏了jos
public class EventPublisher : IEventPublisher
{
private readonly ConcurrentDictionary<Type, object> _subjects =
new ConcurrentDictionary<Type, object>(); public IObservable<TEvent> GetEvent<TEvent>()
{
var subject = (ISubject<TEvent>)_subjects.GetOrAdd(typeof(TEvent), t => new Subject<TEvent>());
return subject.AsObservable();
}
public void Publish<TEvent>(TEvent sampleEvent)
{
object subject;
if (_subjects.TryGetValue(typeof(TEvent), out subject))
{
((ISubject<TEvent>)subject).OnNext(sampleEvent);
}
}
}
现在我的问题:正如你在上面看到的,我使用。observeon (Scheduler.TaskPool)方法从每个模块的池中为每个事件派生一个新线程。这是因为我有很多事件和模块。当然,问题是事件在时间顺序上混淆了,因为一些事件被彼此接近地触发,然后以错误的顺序调用OnDataEvent回调(每个OnDataEvent都携带一个时间戳)。
是否有一种简单的方法来使用RX来确保事件的正确顺序?或者我可以编写自己的Scheduler来确保每个模块按顺序获得事件吗?
事件当然是按照正确的顺序发布的。
尝试使用EventPublisher
:
public class EventPublisher : IEventPublisher
{
private readonly EventLoopScheduler _scheduler = new EventLoopScheduler();
private readonly Subject<object> _subject = new Subject<object>();
public IObservable<TEvent> GetEvent<TEvent>()
{
return _subject
.Where(o => o is TEvent)
.Select(o => (TEvent)o)
.ObserveOn(_scheduler);
}
public void Publish<TEvent>(TEvent sampleEvent)
{
_subject.OnNext(sampleEvent);
}
}
它使用EventLoopScheduler
来确保所有事件在同一个后台线程中有序发生。
从订阅中删除ObserveOn
,因为如果您在另一个线程上观察,您可能会再次出现事件以错误顺序发生的风险。
这解决你的问题了吗?
尝试如下同步方法:
_publisher.GetEvent<DataEvent>()
.Where(sde => sde.SourceName == source.Name)
.ObserveOn(Scheduler.TaskPool).Synchronize()
.Subscribe(module.OnDataEvent);
虽然我用相同的代码尝试了你的场景,发现数据按顺序到达,不重叠。