使用 Rx 执行延迟采样的最佳方法是什么?



我正在开发一个Xamarin应用程序,其中我扩展了连接插件以使用Rx而不是事件。

我的目标是在重新建立连接时引入轻微延迟,以允许网络适配器有时间连接到 Internet(UWP 的解决方法(。如果在此延迟内出现任何值,则只需要保留最后一个值,因为只有当前连接状态很重要。

这工作正常,但感觉有点笨拙:

internal static class ConnectivityExtensions
{
public static IObservable<bool> ToObservable(this IConnectivity @this)
{
var connectivity = Observable
.FromEventPattern<ConnectivityChangedEventHandler, ConnectivityChangedEventArgs>(
handler => @this.ConnectivityChanged += handler,
handler => @this.ConnectivityChanged -= handler)
.Select(args => args.EventArgs.IsConnected);
var sampling = connectivity
.Timestamp()
.Select(ts => new
{
ts.Value,
ts.Timestamp,
// If reconnection, delay subscriber notification for 250ms
DelayUntil = ts.Value ? (DateTimeOffset?)ts.Timestamp.Add(TimeSpan.FromMilliseconds(250)) : null
})
.Scan((acc, current) => new
{
current.Value,
current.Timestamp,
// If current notification is during reconnection notification delay period, delay the current notification too
DelayUntil = current.Timestamp < acc.DelayUntil ? acc.DelayUntil : current.DelayUntil
})
// Perform reconnection delay
.Delay(x => x.DelayUntil.HasValue
? Observable.Return(x.DelayUntil.Value).Delay(x.DelayUntil.Value)
: Observable.Empty<DateTimeOffset>())
// All delayed notifications are delayed until the same time, so we only need one notification to trigger sampling after delay
.DistinctUntilChanged()
.Select(_ => Unit.Default);
return connectivity
.Sample(sampling)
.StartWith(@this.IsConnected)
.DistinctUntilChanged()
.Replay(1)
.RefCount();
}
}

示例输出:

通过这个例子,你希望看到我正在做的事情的好处。延迟会阻止订阅者在"连接"更改但尚未建立互联网连接 (UWP( 时处理数据。此外,这还可以保护订阅者免受任何快速"开/关"通知的影响。

               // StartsWith: True
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = true });  // Delay 250ms
Thread.Sleep(TimeSpan.FromMilliseconds(250));                                        // Sample: True, Ignored due to DistinctUntilChanged
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = false }); // Sample: False
Thread.Sleep(TimeSpan.FromMilliseconds(250));
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = true });  // Delay 250ms, Discarded due to Sample
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = false }); // Delayed by previous, Discarded due to Sample
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = true });  // Delayed by previous, Discarded due to Sample
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = false }); // Delayed by previous
Thread.Sleep(TimeSpan.FromMilliseconds(250));                                        // Sample: False, Ignored due to DistinctUntilChanged
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = true });  // Delay 250ms, Discarded due to Sample
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = false }); // Delayed by previous
Thread.Sleep(TimeSpan.FromMilliseconds(250));                                        // Sample: False, Ignored due to DistinctUntilChanged
// Final Output:
// True
// False

有没有更优化的方法来实现这种类型的延迟采样?

如果我理解正确的话,这是事情将如何发展的大理石图:

T (millis)           : 0----250--500--1000-1250-1500-1750-2000
Connectivity         : ---F-------T---F----T-------F--T-------
ScanValueDelayUntil  : ---null----800-null-1500----null2100---
Sampling             : -------------x-----------x-----------x-
ResultSampled        : T------------T-----------T-----------T-
ResultSampledDistinct: T--------------------------------------

这似乎没有多大意义。你能描述一下你希望结果代表什么,或者纠正我错的地方吗?

大约一年后,我重新审视这个问题时,发现了一个未记录的Throttle过载,这大大简化了我想要做的事情。我还创建了自己的响应式IConnectivity抽象,以减少对特定库的依赖:

internal sealed class ReactiveCrossConnectivity : IConnectivity
{
public IObservable<bool> IsConnected { get; }
public ReactiveCrossConnectivity(
Plugin.Connectivity.Abstractions.IConnectivity connectivity, 
ISchedulerProvider scheduler)
{
IsConnected = Observable
.FromEventPattern<ConnectivityChangedEventHandler, ConnectivityChangedEventArgs>(
handler => connectivity.ConnectivityChanged += handler,
handler => connectivity.ConnectivityChanged -= handler,
scheduler.Defaults.ConstantTimeOperations)
.Select(args => args.EventArgs.IsConnected)
.Throttle(isConnected => isConnected
? Observable.Timer(TimeSpan.FromMilliseconds(250),
scheduler.Defaults.TimeBasedOperations)
: Observable.Return<long>(0))
.StartWith(scheduler.Defaults.ConstantTimeOperations, connectivity.IsConnected)
.DistinctUntilChanged()
.Replay(1, scheduler.Defaults.Iteration)
.RefCount();
}
}

以下是确认行为的 NUnit/NSubstitute 测试:

public sealed class ReactiveCrossConnectivityTest : ReactiveTest
{
[Test]
public void IsConnected_ThrottlesOnConnect()
{
var connectivity = Substitute.For<Plugin.Connectivity.Abstractions.IConnectivity>();
connectivity.IsConnected.Returns(true);
var testScheduler = new TestScheduler();
var sut = new ReactiveCrossConnectivity(
connectivity, new SingleSchedulerProvider(testScheduler));
var isConnectedObserver = testScheduler.CreateObserver<bool>();
sut.IsConnected.Subscribe(isConnectedObserver);
void Change(bool isConnected) => connectivity.ConnectivityChanged +=
Raise.Event<ConnectivityChangedEventHandler>(connectivity,
new ConnectivityChangedEventArgs { IsConnected = isConnected });
testScheduler.Schedule(TimeSpan.FromTicks(3), () => Change(true));
testScheduler.Schedule(TimeSpan.FromMilliseconds(251), () => Change(false));
testScheduler.Schedule(TimeSpan.FromMilliseconds(501), () => Change(true));
testScheduler.Schedule(TimeSpan.FromMilliseconds(751), () => Change(true));
testScheduler.Schedule(TimeSpan.FromMilliseconds(752), () => Change(false));
testScheduler.Schedule(TimeSpan.FromMilliseconds(753), () => Change(true));
testScheduler.Schedule(TimeSpan.FromMilliseconds(754), () => Change(false));
testScheduler.Schedule(TimeSpan.FromMilliseconds(1001), () => Change(true));
testScheduler.Schedule(TimeSpan.FromMilliseconds(1002), () => Change(false));
testScheduler.Start();
isConnectedObserver.Messages.AssertEqual(
OnNext(2, true),
OnNext(TimeSpan.FromMilliseconds(251).Ticks + 1, false));
}
}

相关内容

  • 没有找到相关文章

最新更新