响应式扩展——引发异步事件并订阅特定线程



我有一系列使用RX发布/订阅模型的模块。

这是事件注册码(每个订阅模块重复):

_publisher.GetEvent<DataEvent>()                 
    .Where(sde => sde.SourceName == source.Name) 
    .ObserveOn(Scheduler.TaskPool)
    .Subscribe(module.OnDataEvent);  

出版商很简单,多亏了jos Romaniello的代码:

public class EventPublisher : IEventPublisher
{
    private readonly ConcurrentDictionary<Type, object> _subjects = 
        new ConcurrentDictionary<Type, object>(); public IObservable<TEvent> GetEvent<TEvent>() 
    { 
        var subject = (ISubject<TEvent>)_subjects.GetOrAdd(typeof(TEvent), t => new Subject<TEvent>()); 
        return subject.AsObservable(); 
    }
    public void Publish<TEvent>(TEvent sampleEvent)
    {
        object subject; 
        if (_subjects.TryGetValue(typeof(TEvent), out subject)) 
        { 
            ((ISubject<TEvent>)subject).OnNext(sampleEvent); 
        }
    }
}

现在我的问题:正如你在上面看到的,我使用。observeon (Scheduler.TaskPool)方法从每个模块的池中为每个事件派生一个新线程。这是因为我有很多事件和模块。当然,问题是事件在时间顺序上混淆了,因为一些事件被彼此接近地触发,然后以错误的顺序调用OnDataEvent回调(每个OnDataEvent都携带一个时间戳)。

是否有一种简单的方法来使用RX来确保事件的正确顺序?或者我可以编写自己的Scheduler来确保每个模块按顺序获得事件吗?

事件当然是按照正确的顺序发布的。

尝试使用EventPublisher:

public class EventPublisher : IEventPublisher
{
    private readonly EventLoopScheduler _scheduler = new EventLoopScheduler(); 
    private readonly Subject<object> _subject = new Subject<object>();
    public IObservable<TEvent> GetEvent<TEvent>() 
    { 
        return _subject
            .Where(o => o is TEvent)
            .Select(o => (TEvent)o)
            .ObserveOn(_scheduler);
    }
    public void Publish<TEvent>(TEvent sampleEvent)
    {
        _subject.OnNext(sampleEvent);
    }
}

它使用EventLoopScheduler来确保所有事件在同一个后台线程中有序发生。

从订阅中删除ObserveOn,因为如果您在另一个线程上观察,您可能会再次出现事件以错误顺序发生的风险。

这解决你的问题了吗?

尝试如下同步方法:

_publisher.GetEvent<DataEvent>()                 
    .Where(sde => sde.SourceName == source.Name) 
    .ObserveOn(Scheduler.TaskPool).Synchronize()
    .Subscribe(module.OnDataEvent);

虽然我用相同的代码尝试了你的场景,发现数据按顺序到达,不重叠。

相关内容

  • 没有找到相关文章

最新更新