下面的可观察序列将每个元素添加到ReplaySubject中,这样我以后就可以访问任何元素,甚至可以等待ReplaySubject。它在到达一个时间跨度后完成"重新获取主题"。
ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
.TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1)))
.Subscribe(
onNext: result =>
{
string nextTag = BitConverter.ToString(result.Data);
nextTag = nextTag.Replace("-", "");
rfidPlayer.OnNext(nextTag);
},
onCompleted: () =>
{
rfidPlayer.OnCompleted();
});
我希望序列一直运行到上次"OnNext"调用后的给定时间,然后完成。这将在各种蓝牙通信场景中非常有用,在这些场景中,蓝牙设备将向我提供一系列数据,然后在没有任何完成消息或事件的情况下停止。在这些场景中,我需要启发式地确定序列何时完成,然后自己完成。因此,如果距离上次蓝牙通知"太长",我想完成ReplaySubject。
我可以通过创建一个计时器,在接收到每个元素时重置它,然后在计时器达到"太长"时完成ReplaySubject来实现这一点,但我听说创建一个对象并从可观察的订阅中操作它是不安全的。
关于如何在"太长"的间隔后完成一个序列,有什么建议吗?
根据我所听到的,这是一个不安全的版本,但应该按预期工作:
bool reading = true;
System.Timers.Timer timer = new System.Timers.Timer(1000);
timer.Elapsed += (sender, e) =>
{
reading = false;
};
ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
.TakeWhile(x => reading)
.Subscribe(
onNext: result =>
{
string nextTag = BitConverter.ToString(result.Data);
nextTag = nextTag.Replace("-", "");
timer.Stop();
timer.Start();
rfidPlayer.OnNext(nextTag);
},
onCompleted: () =>
{
rfidPlayer.OnCompleted();
});
根据Simonare的第一个答案:,这似乎令人满意
characteristic.WhenNotificationReceived()
.Timeout(TimeSpan.FromSeconds(1))
.Subscribe(
onNext: result =>
{
string nextTag = BitConverter.ToString(result.Data);
nextTag = nextTag.Replace("-", "");
rfidPlayer.OnNext(nextTag);
},
onError: error =>
{
rfidPlayer.OnCompleted();
});
您可以考虑使用超时运算符。这样做的唯一缺点是它以错误信号结束。您可能需要处理错误
如果Observable在指定的时间段内未能发出任何项目,则Timeout操作符允许您通过onError终止中止Observable。
如果你使用下面的方法,你可以超过错误
.Timeout(200, Promise.resolve(42));
另一个变体允许您指示超时切换到您指定的备份Observable,而不是在触发超时条件时以错误终止。
characteristic.WhenNotificationReceived()
.Timeout(TimeSpan.FromSeconds(1))
.Subscribe(
onNext: result =>
{
....
rfidPlayer.OnNext(....);
},
onError: error =>
{
rfidPlayer.OnCompleted();
});
由于出现异常,我发现使用Timeout
很麻烦。
我更喜欢在序列中注入一个值,可以用来终止序列。例如,如果我的序列产生非负数,那么如果我注入-1
,我知道结束序列。
这里有一个例子:
从这个可观测值开始,它生成从1开始的2的幂,并且它还将生成每个值延迟挂起值的毫秒数。
Observable
.Generate(1, x => true, x => 2 * x, x => x, x => TimeSpan.FromMilliseconds(x))
所以1、2、4、8等等,越来越慢。
现在我想停止这个序列,如果3.0
秒没有值,那么我可以这样做:
.Select(x => Observable.Timer(TimeSpan.FromSeconds(3.0)).Select(y => -1).StartWith(x))
.Switch()
.TakeWhile(x => x >= 0)
如果我运行这个序列,我会得到这个输出:
12.4.8.16326412825651210242048
序列即将产生4096
,但它首先等待4096
毫秒来产生该值,同时Observable.Timer(TimeSpan.FromSeconds(3.0))
触发并输出-1
,从而停止序列。
这个查询的关键部分是Switch
的使用。它获取一个IObservable<IObservable<T>>
,并通过只订阅最新的外部可观测值和取消订阅前一个来生成IObservable<T>
。
因此,在我的查询中,序列产生的每个新值都会停止并重新启动Timer
。
在你的情况下,你的可观察到的是这样的:
characteristic
.WhenNotificationReceived()
.Select(result => BitConverter.ToString(result.Data).Replace("-", ""))
.Select(x => Observable.Timer(TimeSpan.FromSeconds(1.0)).Select(y => (string)null).StartWith(x))
.Switch()
.TakeWhile(x => x != null)
.Subscribe(rfidPlayer);
这里有一个您可以使用的自定义运算符TakeUntilTimeout
,它是内置Timeout
运算符之上的一个薄层。
/// <summary>
/// Applies a timeout policy for each element in the observable sequence.
/// If the next element isn't received within the specified timeout duration
/// starting from its predecessor, the sequence terminates.
/// </summary>
public static IObservable<T> TakeUntilTimeout<T>(
this IObservable<T> source,
TimeSpan timeout)
{
return source.Timeout(timeout, Observable.Empty<T>());
}