有一个代码:
someObservable.Select(x => getY(x));
Y getY(X x)
{
if (x.Value == X.ABC)
return new Y(1);
else
return new Y(2);
}
在某种情况下,我需要在一段时间后仔细检查X.value。最简单和不良的解决方案是使用螺纹:
Y getY(X x)
{
if (x.Value == X.ABC)
return new Y(1);
else
if (x.SomethingElse == true)
{
Thread.Sleep(timeout);
if (x.Value == X.ABC)
return new Y(1);
else
return new Y(2);
}
}
这里的正确代码是什么?我需要以相同的方式订购活动。这意味着如果我有延迟,并且我得到了一个新值,就必须等待处理。
解决方案(来自https://rsdn.org/forum/dotnet/6629370.1)是返回iObservable,而不是在gety中返回y,并与concat.delay进行concat。
IObservable<Y> getY(X x)
{
if (x.Value == X.ABC)
return Observable.Return(new Y(1));
else
if (x.SomethingElse == true)
{
return Observable.Delay(Observable.Return(x), timeout).Select(xx => xx.Value == X.ABC ? new Y(1) : new Y(2));
}
}
或
IObservable<Y> getY(X x)
{
return Observable.Create<Y>(async (obs, token) =>
{
if (x.Value == X.ABC)
obs.OnNext(new Y(1));
else
if (x.SomethingElse == true)
{
await Task.Delay(timeout, token);
if (x.Value == X.ABC)
obs.OnNext(new Y(1));
else
obs.OnNext(new Y(2));
}
}
}
and:someObservable.Select(x => gety(x))。concat();
如果这是一个一般问题,建议您实现自定义操作员。该延迟方法的过载可以接受另一个可观察到的以控制延迟时间。在示例中,即使数字延迟时,均匀的数字也会立即推动;
Observable.Interval(TimeSpan.FromSeconds(1))
.Delay(i => (i % 2 == 0) ? Observable.Return(0L) : Observable.Timer(TimeSpan.FromSeconds(0.9)))
.Subscribe(Console.WriteLine);
编辑:这是一个更简单的订单保留操作员以延迟:
static IObservable<T> DelayOrdered<T>(this IObservable<T> observable, Func<T, TimeSpan> delaySelector, IScheduler scheduler = default(IScheduler))
{
scheduler = scheduler ?? DefaultScheduler.Instance;
return Observable.Create<T>(observer =>
{
var now = scheduler.Now;
return observable
.Subscribe(value =>
{
now = now.Add(delaySelector(value));
scheduler.Schedule(now, () => observer.OnNext(value));
});
});
}
用法:
Observable.Interval(TimeSpan.FromSeconds(0.2))
.DelayOrdered(i => (i % 2 == 0) ? TimeSpan.Zero : TimeSpan.FromSeconds(1))
.Subscribe(Console.WriteLine);
订单将被保留,因为绝对计划的时间只能增加。