反应式扩展非重叠,串行 GroupBy(或 WindowUntilChange)



我试图为 Rx.Net 制作一个 SerialGroupBy 运算符。运算符的要点是像 GroupBy 一样工作,但每次创建新组时,前者就完成了。因此,一次打开的组永远不会超过一个。

我目前的"最佳"实现是这样的:

public static IObservable<IGroupedObservable<TKey, TElement>> SerialGroupBy<TKey, TSource, TElement>(
this IObservable<TSource> stream, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector) =>
stream.Publish(shared => 
shared.GroupByUntil(keySelector, elementSelector, group => 
shared.DistinctUntilChanged(keySelector)));

我希望在下一组开始之前关闭小组,就像这里测试的那样:

[Fact]
public void SerialGroupBy()
{
var scheduler = new TestScheduler();
var stream = scheduler.CreateHotObservable(
OnNext(201, "First group"),
OnNext(202, "Second group"),
OnNext(203, "Second group"));
var observer = scheduler.CreateObserver<string>();
stream.SerialGroupBy(x => x.Length, x => x)
.Select(x => x.Subscribe(observer))
.Subscribe();
scheduler.Start();
observer.Messages.ShouldBeLike(
OnNext(201, "First group"),
OnCompleted<string>(202),
OnNext(202, "Second group"),
OnNext(203, "Second group"));
}

但是第一组的完成来得太晚了,比如:

OnNext(201, "First group"),
OnNext(202, "Second group"),
OnCompleted<string>(202),
OnNext(203, "Second group"));

我可以理解为什么(根据 GroupByUntill 的实现,在结束观察者之前通知开场观察者(,但我如何实现它以使组不重叠?

我已经尝试了几种不同的方法,但它总是以同一问题的变体结束。

这是我最后想出的:

public static class SerialGroupByOperator
{
public static IObservable<IGroupedObservable<TKey, TElement>> SerialGroupBy<TKey, TSource, TElement>(
this IObservable<TSource> stream, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector) =>
Observable.Create<IGroupedObservable<TKey, TElement>>(observer => stream
.Scan<TSource, GroupedObservable<TKey, TElement>>(null, (current, next) =>
{
var key = keySelector(next);
if (current == null)
{
var nextGroup = new GroupedObservable<TKey, TElement>(key);
observer.OnNext(nextGroup);
nextGroup.Group.OnNext(elementSelector(next));
return nextGroup;
}
if (!Equals(key, current.Key))
{
current.Group.OnCompleted();
var nextGroup = new GroupedObservable<TKey, TElement>(key);
observer.OnNext(nextGroup);
nextGroup.Group.OnNext(elementSelector(next));
return nextGroup;
}
current.Group.OnNext(elementSelector(next));
return current;
})
.LastOrDefaultAsync()
.Where(x => x != null)
.Subscribe(x => x.Group.OnCompleted(), observer.OnError, observer.OnCompleted));
}
internal class GroupedObservable<TKey, TElement> : IGroupedObservable<TKey, TElement>
{
public GroupedObservable(TKey key)
{
Key = key;
Group = new Subject<TElement>();
}
public TKey Key { get; }
public ISubject<TElement> Group { get; }
public IDisposable Subscribe(IObserver<TElement> observer) => Group.Subscribe(observer);
}

相关内容

  • 没有找到相关文章

最新更新