我想做什么的声明如下所示:
// Checks input source for timeouts, based on the number of elements received
// from clock since the last one received from source.
// The two selectors are used to generate output elements.
public static IObservable<R> TimeoutDetector<T1,T2,R>(
this IObservable<T1> source,
IObservable<T2> clock,
int countForTimeout,
Func<R> timedOutSelector,
Func<T1, R> okSelector)
弹珠图在ascii中很困难,但这里是:
source --o---o--o-o----o-------------------o---
clock ----x---x---x---x---x---x---x---x---x---
output --^---^--^-^----^-----------!-------^---
我尝试寻找现有的Observable
函数,这些函数可以以我可以使用的方式组合source
和clock
,但大多数组合函数依赖于接收"每个中的一个"(And
,Zip
),或者它们从"缺失的"值(CombineLatest
)中重新返回"上一个"值,或者它们离我需要的太远了(Amb
, GroupJoin
、Join
、Merge
、SelectMany
、Timeout
)。 Sample
看起来很接近,但我不想将源吞吐量限制为时钟速率。
所以现在我试图填补这里的大空白:
return new AnonymousObservable<R>(observer =>
{
//One observer, two observables??
});
抱歉,这里的"你尝试过什么"部分有点弱:假设我已经尝试过思考它!我不要求完全实现,只是:
- 是否有内置功能可以帮助我错过?
- 如何构建订阅两个可观察量的基于 lambda 的观察器?
你没有要求完整的实现,但我认为这是一个解决方案:
public static IObservable<TR> TimeoutDetector<T1, T2, TR>(
this IObservable<T1> source,
IObservable<T2> clock,
int countForTimeout,
Func<TR> timedOutSelector,
Func<T1, TR> okSelector)
{
return source
.Select(i => clock.Take(countForTimeout).LastAsync())
.Switch().Select(_ => timedOutSelector())
.Merge(source.Select(okSelector));
}
它的工作原理如下 - 我注意到您的输出是由 okSelector 投影的源,与超时事件合并。因此,我专注于生成超时事件,因为其余的都很容易。
这个想法是每次源发射时创建一个倒计时,并在每个时钟脉冲上减少这个倒计时。如果源发出,我们将中止倒计时,否则当倒计时达到 0 时,我们将生成 timedOut 事件。
分解一下:
- 将每个源元素投影到一个接受前
countForTimeout
元素的流中 - 请注意,时钟流必须是"热"可观察的,因为我们在每个倒计时事件中订阅它。时钟流很热是很正常的。如果这发生事件,我们将超时。 -
Switch
将抛弃除最新倒计时流之外的所有内容。 - 使用
Select
投影到超时事件。 - 现在只需合并源事件。
这是我使用的单元测试,旨在与您的大理石图非常相似(nuget rx-testing和nunit用于编译所需的库):
[Test]
public void AKindOfTimeoutTest()
{
var scheduler = new TestScheduler();
var clockStream = scheduler.CreateHotObservable(
OnNext(100, Unit.Default),
OnNext(200, Unit.Default),
OnNext(300, Unit.Default),
OnNext(400, Unit.Default),
OnNext(500, Unit.Default),
OnNext(600, Unit.Default),
OnNext(750, Unit.Default), /* make clock funky! */
OnNext(800, Unit.Default),
OnNext(900, Unit.Default));
var sourceStream = scheduler.CreateColdObservable(
OnNext(50, 1),
OnNext(150, 2),
OnNext(250, 3),
OnNext(275, 4),
OnNext(400, 5),
OnNext(900, 6));
Func<int> timedOutSelector = () => 0;
Func<int, int> okSelector = i => i;
var results = scheduler.CreateObserver<int>();
sourceStream.TimeoutDetector(clockStream, 3, timedOutSelector, okSelector)
.Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(50, 1),
OnNext(150, 2),
OnNext(250, 3),
OnNext(275, 4),
OnNext(400, 5),
OnNext(750, 0),
OnNext(900, 6));
}
}
要尝试回答您的具体问题,请执行以下操作:
- 问。是否有内置功能可以帮助我错过?答:扫描可能是关键。
- 问。如何构建订阅两个可观察量的基于 lambda 的观察器?一个。不太确定你的意思...有很多方法可以组合流,您提到了其中的大多数。
这是我提到的 Observable.Create 方法(相同的测试有效):
public static IObservable<TR> TimeoutDetector<T1, T2, TR>(
this IObservable<T1> source,
IObservable<T2> clock,
int countForTimeout,
Func<TR> timedOutSelector,
Func<T1, TR> okSelector)
{
return Observable.Create<TR>(observer =>
{
var counter = countForTimeout;
var timeoutSub = clock.Subscribe(_ =>
{
var count = Interlocked.Decrement(ref counter);
if (count == 0)
{
observer.OnNext(timedOutSelector());
}
},
observer.OnError,
observer.OnCompleted);
var sourceSub = source.Subscribe(
i =>
{
Interlocked.Exchange(ref counter, countForTimeout);
observer.OnNext(okSelector(i));
},
observer.OnError,
observer.OnCompleted);
return new CompositeDisposable(sourceSub, timeoutSub);
});
}
请注意,Observable.Create 对于确保使用正确的 Rx 语法非常有帮助(即流发出 OnNext* (OnError |已完成)?- 这意味着我可以稍微放松一次发送OnError或OnComplete。
出了这个,这比詹姆斯的答案更不漂亮。
public static IObservable<R> TimeoutDetector2<T1, T2, R>(
this IObservable<T1> source,
IObservable<T2> clock, int maxDiff,
Func<R> timedOutSelector, Func<T1, R> okSelector)
{
return new AnonymousObservable<R>(observer =>
{
int counter = 0;
object gate = new object();
bool error = false;
bool completed = false;
bool timedOut = false;
var sourceSubscription = source.Subscribe(
x =>
{
lock(gate)
{
if(!error && !completed) observer.OnNext(okSelector(x));
counter = 0;
timedOut = false;
}
},
ex =>
{
lock(gate)
{
error = true;
if(!completed) observer.OnError(ex);
}
},
() =>
{
lock(gate)
{
completed = true;
if(!error) observer.OnCompleted();
}
});
var clockSubscription = clock.Subscribe(
x =>
{
lock(gate)
{
counter = counter + 1;
if(!error && !completed && counter > maxDiff && !timedOut)
{
timedOut = true;
observer.OnNext(timedOutSelector());
}
}
},
ex =>
{
lock(gate)
{
error = true;
if(!completed) observer.OnError(ex);
}
},
() =>
{
lock(gate)
{
completed = true;
if(!error) observer.OnCompleted();
}
});
//need to return a subscription
return new CompositeDisposable(sourceSubscription, clockSubscription);
}).Publish().RefCount(); // prevent subscribers provoking more than one subscription to source and clock
}
当然这是一个老问题。我一直在寻找比timeoutWith
更高级的东西,以便我可以取消超时逻辑。
我想知道超时是否几乎等同于:
race(throwError('timedout').pipe(delay(10000)), yourObs$)
那么这里显示的"throwError"当然是可以取消的。
如果你想知道为什么 - 我有一些由可观察链控制的"步骤",我有一个超时。但是,如果其中一个步骤包括打开对话框,那么我希望取消超时!