我一直推迟使用反应式扩展这么久,我认为这将是一个很好的用途。很简单,我有一个方法,可以出于各种原因在各种代码路径上调用
private async Task GetProductAsync(string blah) {...}
我需要能够限制此方法。也就是说,我想停止调用流,直到不再进行调用(在指定的时间段内(。或者更清楚的是,如果在某个时间段内对此方法进行了 10 次调用,我想在最后一次调用时将其限制(限制(为仅 1 次调用(一段时间后(。
我可以看到一个使用带有IEnumerable
的方法的示例,这种方式是有道理的
static IEnumerable<int> GenerateAlternatingFastAndSlowEvents()
{ ... }
...
var observable = GenerateAlternatingFastAndSlowEvents().ToObservable().Timestamp();
var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));
using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
但是,(这一直是我对 Rx 的主要问题,永远(,我如何从简单的async
方法创建Observable
。
更新
我已经设法找到了使用反应属性的替代方法
Barcode = new ReactiveProperty<string>();
Barcode.Select(text => Observable.FromAsync(async () => await GetProductAsync(text)))
.Throttle(TimeSpan.FromMilliseconds(1000))
.Switch()
.ToReactiveProperty();
前提是我在 text 属性Barcode
上捕获它,但它有自己的缺点,因为ReactiveProperty
负责通知,并且我无法默默更新支持字段作为它已经管理。
总而言之,如何将async
方法调用转换为可观察,以便我可以使用 Throttle 方法?
与您的问题无关,但可能会有所帮助:Rx 的Throttle
运算符实际上是一个去抖运算符。最接近节流运算符的是Sample
.这是区别(假设您想限制或反跳到一个项目/3 秒(:
items : --1-23----4-56-7----8----9-
throttle: --1--3-----4--6--7--8-----9
debounce: --1-------4--6------8----9-
Sample
/throttle 将聚合在敏感时间内到达的项目,并在下一个采样时钟周期发出最后一个项目。Debounce 会丢弃在敏感时间内到达的项目,然后重新启动时钟:项目发出的唯一方法是在它之前有静音的时间范围。
RX.Net的Throttle
运算符执行上述debounce
描述的操作。Sample
做了上面throttle
描述的。
如果您想要不同的东西,请描述您想要如何限制。
将任务转换为可观察量有两种关键方法,它们之间有一个重要区别。
Observable.FromAsync(()=>GetProductAsync("test"));
和
GetProductAsync("test").ToObservable();
第一个任务在您订阅任务之前不会启动它。 第二个将创建(并启动(任务,结果将立即或稍后出现在可观察量中,具体取决于任务的速度。
不过,从总体上看您的问题,您似乎想停止通话。您不希望限制结果流,这会导致不必要的计算和损失。
如果这是你的目标,你的 GetProductAsync 可以被视为调用事件的观察者,并且 GetProductAsync 应该限制这些调用。实现这一目标的一种方法是宣布
public event Action<string> GetProduct;
和使用
var callStream= Observable.FromEvent<string>(
handler => GetProduct+= handler ,
handler => GetProduct-= handler);
然后,问题就变成了如何返回结果,以及当"调用方"的呼叫受到限制并被丢弃时会发生什么。
一种方法是声明一个类型"GetProductCall",它将输入字符串和输出结果作为属性。
然后,您可以进行如下设置:
var callStream= Observable.FromEvent<GetProductCall>(
handler => GetProduct+= handler ,
handler => GetProduct-= handler)
.Throttle(...)
.Select(r=>async r.Result= await GetProductCall(r.Input).ToObservable().FirstAsync());
(代码未经测试,仅供参考(
另一种方法可能包括限制并发可观察量最大数量的 Merge(N( 重载。