处理可观察计时器



我正在使用Reactive . net扩展,我想知道如何处理它。我知道在某些情况下这样处理它是好的:.TakeUntil(Observable.Timer(TimeSpan.FromMinutes(x)))。我

第一例在本例中,我有一个计时器,在x秒后触发,然后它完成并应该被处理。

public void ScheduleOrderCancellationIfNotFilled(string pair, long orderId, int waitSecondsBeforeCancel)
{
Observable.Timer(TimeSpan.FromSeconds(waitSecondsBeforeCancel))
.Do(e =>
{
var result = _client.Spot.Order.GetOrder(pair, orderId);
if (result.Success)
{
if (result.Data?.Status != OrderStatus.Filled)
{
_client.Spot.Order.CancelOrder(pair, orderId);
}
}
})
.Subscribe();
}

第二种情况在本例中,计时器在第一秒运行,然后每29分钟重复一次。它应该一直存在,直到它的定义类被销毁。我认为这个问题应该用IDisposable实现来解决。如何?

var keepAliveListenKey = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(29))
.Do(async e =>
{
await KeepAliveListenKeyAsync().ConfigureAwait(false);
})
.Subscribe();

编辑

我还希望它使用Subject<T>,这使得它更容易处置和重置订阅。

例如:重置和处置可观察订阅者,响应式扩展(@Enigmativity)

public class UploadDicomSet : ImportBaseSet
{
IDisposable subscription;
Subject<IObservable<long>> subject = new Subject<IObservable<long>>();
public UploadDicomSet()
{
subscription = subject.Switch().Subscribe(s => CheckUploadSetList(s));
subject.OnNext(Observable.Interval(TimeSpan.FromMinutes(2)));
}
void CheckUploadSetList(long interval)
{
subject.OnNext(Observable.Never<long>());
// Do other things
}
public void AddDicomFile(SharedLib.DicomFile dicomFile)
{
subject.OnNext(Observable.Interval(TimeSpan.FromMinutes(2)));
// Reset the subscription to go off in 2 minutes from now
// Do other things
}
}

在第一种情况下它会被自动处理。实际上,这是实现自动订阅管理的一种常用方法,而且这绝对是处理rx的一种很好的、优雅的方法。

在第二种情况下,您已经过度设计了。Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1))本身足以生成随时间递增的long序列。由于这个流本质上是无止境的,因此需要对订阅进行明确的管理。所以有:

就足够了var sub = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)).Subscribe()

…和sub.Dispose()它以后。

注:请注意,在您的代码中.Doasync/await。很可能那不是你想要的。您希望SelectMany确保async操作被正确等待,异常被处理。


在评论区回答你的问题:

用Subject代替处理怎么样?

嗯,没什么特别的。IObserver<>IObservable<>都是由这个类实现的,它类似于经典的。net事件(对某些事件调用的回调列表)。对于你的问题和用例,它在任何意义上都没有区别。

你能给一个关于。do异常处理的例子吗?

确定。这个想法是,您希望将您的async/await封装成一些Task<T>IObservable<T>,这样就保留了取消和错误信号。为此,必须使用.SelectMany方法(就像LINQ中的SelectMany一样,同样的想法)。所以把你的.Do改成.SelectMany

Observable
.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1))
.SelectMany(_ => Observable.FromAsync(() => /* that's the point where your Task<> becomes Observable */ myTask))

我又糊涂了。我需要IObservable(Select)或IObservable (SelectMany)

很可能,你不需要开关。为什么?因为创建它主要是为了避免IO竞争条件,这样每当发出新事件时,当前事件(由于自然并行性或异步工作流可能正在进行中)都保证被取消(即取消订阅)。否则,竞争条件可能(也必将)破坏你的状态。

SelectMany则相反,它将确保所有事件都是按顺序发生的,它们确实以某种总顺序到达。没有什么会被取消。您将完成(等待,如果您愿意)当前回调,然后触发下一个回调。当然,这样的行为可以通过适当的IScheduler来改变,但那是另一回事。

响应式可观察订阅处理(@Enigmativity)

订阅扩展方法返回的一次性对象只允许你在可观察对象自然结束之前手动取消订阅。

如果可观察对象以OnCompleted或OnError完成,则订阅已经为你处置。

有一件重要的事情需要注意:垃圾收集器永远不会对可观察对象订阅调用。dispose(),所以如果订阅在你的订阅超出作用域之前还没有(或可能没有)自然结束,你必须处理掉它们。

第一例看起来我不需要在第一种情况下手动. dispose()订阅,因为它自然结束。

结束时正在触发Dispose

var xs = Observable.Create<long>(o =>
{
var d = Observable.Timer(TimeSpan.FromSeconds(5))
.Do(e =>
{
Console.WriteLine("5 seconds elapsed.");
})
.Subscribe(o);
return Disposable.Create(() =>
{
Console.WriteLine("Disposed!");
d.Dispose();
});
});
var subscription = xs.Subscribe(x => Console.WriteLine(x));

第二种情况但在第二种情况下,它没有"自然"结束,我应该处理它。

除非手动处置,否则不会触发处置。

var xs = Observable.Create<long>(o =>
{
var d = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1))
.Do(e =>
{
Console.WriteLine("Test.");
})
.Subscribe(o);
return Disposable.Create(() =>
{
Console.WriteLine("Disposed!");
d.Dispose();
});
});
var subscription = xs.Subscribe(x => Console.WriteLine(x));
结论

他举了一个很好的例子,如果你也在问自己同样的问题,值得一看。

相关内容

  • 没有找到相关文章

最新更新