我有以下 Rx 扩展方法来对IEnumerable<T>
进行分区并延迟每个分区值的生成。它使用IEnumerable<T>
扩展对数据进行分区,这也与单元测试一起显示。
有没有比使用Observable.Timer().Wait()
方法调用更好的方法来执行"延迟"?
public static class RxExtensions
{
public static IObservable<IEnumerable<T>> PartitionWithInterval<T>(
this IObservable<IEnumerable<T>> source, int size, TimeSpan interval,
IScheduler scheduler = null)
{
if (scheduler == null)
{
scheduler = TaskPoolScheduler.Default;
}
var intervalEnabled = false;
return source.SelectMany(x => x.Partition(size).ToObservable())
.Window(1)
.SelectMany(x =>
{
if (!intervalEnabled)
{
intervalEnabled = true;
}
else
{
Observable.Timer(interval, TaskPoolScheduler.Default).Wait();
}
return x;
})
.ObserveOn(scheduler);
}
}
public static class EnumerableExtensions
{
public static IEnumerable<IEnumerable<T>> Partition<T>(
this IEnumerable<T> source, int size)
{
using (var enumerator = source.GetEnumerator())
{
var items = new List<T>();
while (enumerator.MoveNext())
{
items.Add(enumerator.Current);
if (items.Count == size)
{
yield return items.ToArray();
items.Clear();
}
}
if (items.Any())
{
yield return items.ToArray();
}
}
}
}
Rx 扩展方法的测试如下所示:
static void Main(string[] args)
{
try
{
var data = Enumerable.Range(0, 10);
var interval = TimeSpan.FromSeconds(1);
Observable.Return(data)
.PartitionWithInterval(2, interval)
.Timestamp()
.Subscribe(x =>
{
var message = $"{x.Timestamp} - count = {x.Value.Count()}" +
$", values - {x.Value.First()}, {x.Value.Last()}";
Console.WriteLine(message);
});
Console.ReadLine();
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
这应该可以做到:
public static IObservable<IEnumerable<T>> PartitionWithInterval<T>(this IObservable<IEnumerable<T>> source, int size, TimeSpan interval, IScheduler scheduler = null)
{
if (scheduler == null)
{
scheduler = TaskPoolScheduler.Default;
}
return source
//don't need the .ToObservable() call, since Zip can work on IEnumerable + IObservable.
.SelectMany(x => x.Partition(size))
.Zip(Observable.Interval(interval, scheduler).StartWith(0), (x, _) => x)
.ObserveOn(scheduler);
}
有趣的是PartitionWithInterval
实际上如何称呼Partition
和Interval
.
StartWith
就在那里,所以你会立即删除一个分区:类似于你拥有intervalEnabled
标志的方式。
下面是PartitionWithInterval
运算符的实现,它针对内存效率进行了优化。IObservable<IEnumerable<T>>
发出的枚举项是延迟枚举的,刚好足以生成下一个或两个分区。然后,它们的枚举将暂停,直到下一个间隔。为了实现这种惰性,实现使用IAsyncEnumerable
s 而不是IObservable
s,并使用包 System.Linq.Async 和 System.Interactive.Async 中的运算符。
public static IObservable<IList<T>> PartitionWithInterval<T>(
this IObservable<IEnumerable<T>> source, int size,
TimeSpan interval, IScheduler scheduler = null)
{
scheduler ??= Scheduler.Default;
return Observable.Defer(() =>
{
Task delayTask = Task.CompletedTask;
return source
.ToAsyncEnumerable()
.SelectMany(x => x.ToAsyncEnumerable()).Buffer(size) /* Behavior A */
//.SelectMany(x => x.ToAsyncEnumerable().Buffer(size)) /* Behavior B */
.Do(async (_, cancellationToken) =>
{
await delayTask;
var timer = Observable.Timer(interval, scheduler);
delayTask = timer.ToTask(cancellationToken);
})
.ToObservable();
});
}
下面是一个弹珠图,显示了配置了size: 2
的PartitionWithInterval
运算符的行为:
Source: +----[1,2,3,4,5]--------------------[6,7,8,9]---|
Output: +----[1,2]-------[3,4]--------------[5,6]-------[7,8]-------[9]|
如图所示,输出分区可能包含来自多个枚举项(上图中的分区[5,6]
)的值。如果这是不可取的,只需注释"行为 A"行并取消注释"行为 B"行。下面的大理石图显示了此更改的效果:
Source: +----[1,2,3,4,5]--------------------[6,7,8,9]---|
Output: +----[1,2]-------[3,4]-------[5]-------[6,7]-------[8,9]|
注意:对于延迟枚举可观察源发出的枚举的意图,上述解决方案并不绝对令人满意。理想的情况是准确地在应该发出分区的时间生成每个分区。相反,上面的实现在发出前一个分区后立即收集下一个分区的元素。另一种方法是在发出每个分区(包括最后一个分区)后强制实施延迟。这会将结果IObservable
的完成推迟等于interval
的时间跨度,这也不理想(此行为由本答案的修订版 3 实现)。理想的行为可以通过重新实现运算符ToAsyncEnumerable
、SelectMany
、Buffer
和Do
来实现,以便它们传达当前发出的元素的状态IsLast
。即使这是可能的,也需要付出很多努力才能实现如此微不足道的改进。
感觉你必须使用运算符缓冲区。试试这个:
data.ToObservable()
.Buffer(2)
.Zip(Observable.Interval(interval), (x, _) => x)
.Timestamp()
.Subscribe(x =>
{
var message = $"buffer {x.Timestamp} - count = {x.Value.Count()}, values - {x.Value.First()}, {x.Value.Last()}";
Console.WriteLine(message);
});