RX对不同的触发器做出反应,然后同步处理动作



我正在编写一个基本同步两个数据源的服务。要求它定期同步,但管理员也可以在某些情况下手动触发它同步。

我们正在使用IPC的MassTransit消息,所以web界面将通过MassTransit发送(说)TriggerSyncMessage,服务将看到这些。

然后使用响应式扩展,我们订阅了MT消息,并设置了一个计时器可观察对象来响应——非常好。

但是,由于这些可观察对象的类型不同,我们不能Merge()它们。我们希望两个(重)动作触发相同的事情(同步的东西),所以我目前使用一个主题绑定在一起的2个可观察对象,然后我订阅了最后一部分。

像这样:

var syncMessageObserver = Bus.Instance.AsObservable<TriggerSyncMessage>();
var subject = new Subject<string>();
syncMessageObserver.Subscribe(msg =>
{
    subject.OnNext("from message");
});
var timerSyncer = Observable.Timer(TimeSpan.FromSeconds(0), TimeSpan.FromSeconds(60000)).Timestamp();
 timerSyncer.Subscribe(x =>
{
    subject.OnNext("from timer");
});
subject.AsObservable().Subscribe(msg =>
{
    // actual syncing stuff
});

我对这种方法有两个问题:1. 这是……处理这个案子的正确方法是什么?我不确定你是否打算使用一个主题将可观察对象联系在一起,或者是否有更好的方法。

  1. 作为这个问题的扩展,我希望同步方法的下一个运行不要开始,直到当前的一个已经完成。而且,当需要同步时,启动同步的多个调用只考虑一次。因此,如果我们从计时器运行同步操作,而不耐烦的管理员按下"同步"按钮5次,当计时器操作完成后,我们只在此之后再运行一次。有什么建议吗?

第一部分看起来是可以解决的。忽略价值观,投射你的信息。这里我们使用的模式是将一个被忽略的参数命名为"_"(这是一个有效的c#变量名)。

//TODO: Rename syncMessageObserver. It is not an Observer, it is an Observable.
Observable.Merge(
    syncMessageObserver.Select(_ => "from message"),
    Observable.Timer(TimeSpan.FromSeconds(0), TimeSpan.FromSeconds(60000)).Timestamp().Select( _=> "from timer")
)
.Subscribe(msg =>
{
    // actual syncing stuff
});

下一个问题有点难,但很常见。通常这些需求是出于不同的原因,但在这里你仍然可以使用相同的解决方案,那就是ObserveLatestOn算子。这来自银行,我们只需要显示最新的价格。如果在报出价格的时间内,收到了许多其他的价格,我们应该只显示最新的价格。

你可以在MSDN Rx论坛,JamesWorld的博客和YouTube视频中看到它的讨论,讨论这是如何产生的。

您只需更新您的查询,使其具有新的操作符并传入适当的调度器。

//TODO: Rename syncMessageObserver. It is not an Observer, it is an Observable.
Observable.Merge(
    syncMessageObserver.Select(_=>"from message"),
    Observable.Timer(TimeSpan.FromSeconds(0), TimeSpan.FromSeconds(60000)).Timestamp().Select(_=>"from timer")
)
.ObserveLatestOn(Scheduler.CurrentThread)
.Subscribe(msg =>
{
    // actual syncing stuff
});

有关Scheduler是什么以及如何为您的场景选择合适的Scheduler的更多信息,请参阅此处。如果测试对您很重要,那么您还需要在这里查找有关如何以快速和确定的方式对调度/并发代码进行单元测试的内容。

相关内容

  • 没有找到相关文章

最新更新