"long"间隔后的完整可观察序列



下面的可观察序列将每个元素添加到ReplaySubject中,这样我以后就可以访问任何元素,甚至可以等待ReplaySubject。它在到达一个时间跨度后完成"重新获取主题"。

ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
.TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1)))
.Subscribe(
onNext: result =>
{
string nextTag = BitConverter.ToString(result.Data);
nextTag = nextTag.Replace("-", "");
rfidPlayer.OnNext(nextTag);
},
onCompleted: () =>
{
rfidPlayer.OnCompleted();
});

我希望序列一直运行到上次"OnNext"调用后的给定时间,然后完成。这将在各种蓝牙通信场景中非常有用,在这些场景中,蓝牙设备将向我提供一系列数据,然后在没有任何完成消息或事件的情况下停止。在这些场景中,我需要启发式地确定序列何时完成,然后自己完成。因此,如果距离上次蓝牙通知"太长",我想完成ReplaySubject。

我可以通过创建一个计时器,在接收到每个元素时重置它,然后在计时器达到"太长"时完成ReplaySubject来实现这一点,但我听说创建一个对象并从可观察的订阅中操作它是不安全的。

关于如何在"太长"的间隔后完成一个序列,有什么建议吗?

根据我所听到的,这是一个不安全的版本,但应该按预期工作:

bool reading = true;
System.Timers.Timer timer = new System.Timers.Timer(1000);
timer.Elapsed += (sender, e) =>
{
reading = false;
};
ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
.TakeWhile(x => reading)
.Subscribe(
onNext: result =>
{
string nextTag = BitConverter.ToString(result.Data);
nextTag = nextTag.Replace("-", "");
timer.Stop();
timer.Start();
rfidPlayer.OnNext(nextTag);
},
onCompleted: () =>
{
rfidPlayer.OnCompleted();
});

根据Simonare的第一个答案:,这似乎令人满意

characteristic.WhenNotificationReceived()
.Timeout(TimeSpan.FromSeconds(1))
.Subscribe(
onNext: result =>
{
string nextTag = BitConverter.ToString(result.Data);
nextTag = nextTag.Replace("-", "");
rfidPlayer.OnNext(nextTag);
},
onError: error =>
{
rfidPlayer.OnCompleted();
});

您可以考虑使用超时运算符。这样做的唯一缺点是它以错误信号结束。您可能需要处理错误

如果Observable在指定的时间段内未能发出任何项目,则Timeout操作符允许您通过onError终止中止Observable。

如果你使用下面的方法,你可以超过错误

.Timeout(200, Promise.resolve(42));

另一个变体允许您指示超时切换到您指定的备份Observable,而不是在触发超时条件时以错误终止。


characteristic.WhenNotificationReceived()
.Timeout(TimeSpan.FromSeconds(1))
.Subscribe(
onNext: result =>
{
....
rfidPlayer.OnNext(....);
},
onError: error =>
{
rfidPlayer.OnCompleted();
});

由于出现异常,我发现使用Timeout很麻烦。

我更喜欢在序列中注入一个值,可以用来终止序列。例如,如果我的序列产生非负数,那么如果我注入-1,我知道结束序列。

这里有一个例子:

从这个可观测值开始,它生成从1开始的2的幂,并且它还将生成每个值延迟挂起值的毫秒数。

Observable
.Generate(1, x => true, x => 2 * x, x => x, x => TimeSpan.FromMilliseconds(x))

所以1、2、4、8等等,越来越慢。

现在我想停止这个序列,如果3.0秒没有值,那么我可以这样做:

.Select(x => Observable.Timer(TimeSpan.FromSeconds(3.0)).Select(y => -1).StartWith(x))
.Switch()
.TakeWhile(x => x >= 0)

如果我运行这个序列,我会得到这个输出:

12.4.8.16326412825651210242048

序列即将产生4096,但它首先等待4096毫秒来产生该值,同时Observable.Timer(TimeSpan.FromSeconds(3.0))触发并输出-1,从而停止序列。

这个查询的关键部分是Switch的使用。它获取一个IObservable<IObservable<T>>,并通过只订阅最新的外部可观测值和取消订阅前一个来生成IObservable<T>

因此,在我的查询中,序列产生的每个新值都会停止并重新启动Timer

在你的情况下,你的可观察到的是这样的:

characteristic
.WhenNotificationReceived()
.Select(result => BitConverter.ToString(result.Data).Replace("-", ""))
.Select(x => Observable.Timer(TimeSpan.FromSeconds(1.0)).Select(y => (string)null).StartWith(x))
.Switch()
.TakeWhile(x => x != null)
.Subscribe(rfidPlayer);

这里有一个您可以使用的自定义运算符TakeUntilTimeout,它是内置Timeout运算符之上的一个薄层。

/// <summary>
/// Applies a timeout policy for each element in the observable sequence.
/// If the next element isn't received within the specified timeout duration
/// starting from its predecessor, the sequence terminates.
/// </summary>
public static IObservable<T> TakeUntilTimeout<T>(
this IObservable<T> source,
TimeSpan timeout)
{
return source.Timeout(timeout, Observable.Empty<T>());
}

相关内容

  • 没有找到相关文章

最新更新