我有无限的对象流。我的要求是,具有相同键的可观察流中的每个项目都应该同步处理,并且具有不同键的所有其他项目可能/应该并行处理。最简单的方法(如大多数地方所述(是使用GroupByUntil
运算符:
var results = observableStream
.GroupByUntil(item => item.Id, group =>
group.Throttle(TimeSpan.FromSeconds(30), scheduler))
.SelectMany(group =>
group
.ObserveOn(scheduler)
.Select(item => ProcessItem(item)));
var disposable = results.Subscribe(result => SaveResults(result));
代码运行良好,直到我可以保证执行ProcessItem(item)
的时间少于 30 秒。否则group.Throttle(TimeSpan.FromSeconds(30), scheduler)
将关闭组的流,并且新项目到达并开始在新线程上处理的可能性非常高。
所以基本上我需要以某种方式知道我的线程已经完成了对具有特定键的所有项目的处理,并且我需要在durationSelector
内通知GroupByUntil
运算符参数。
关于如何实现这一目标的任何想法?提前谢谢。
这与这个问题非常相似:一种以偶数间隔推送缓冲事件的方法。
形成这个问题的答案,有一个运算符Drain
:
public static class ObservableDrainExtensions
{
public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source,
Func<TSource, IObservable<TOut>> selector)
{
return Observable.Defer(() =>
{
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
return source
.Zip(queue, (v, q) => v)
.SelectMany(v => selector(v)
.Do(_ => { }, () => queue.OnNext(new Unit()))
);
});
}
}
给定该运算符,您的问题变得非常简单:
var results = observableStream
.GroupBy(item => item.Id)
.SelectMany(group =>
group
.ObserveOn(scheduler)
.Drain(item => ProcessItem(item)));
var disposable = results.Subscribe(result => SaveResults(result));
给定一个看起来像 A1、A2、B1、A3、B2、C1、B3、C2 的流,GroupBy
按 ID 分隔流:
A: A1, A2, A3
B: B1, B2, B3
C: C1, C2
。Drain
确保对于给定子流中的项目,它们是串行运行的,而不是并行运行的。
似乎您需要 RxJSexhaustMap
运算符的变体:
将每个源值投影到一个可观察量,仅当上一个投影的可观察量已完成时,该可观察量才会合并到输出可观察量中。
这个运算符(ExhaustMap
(的Rx实现可以在这里找到。在您的情况下,您只需要对可观察序列的每个分组子序列应用相同的逻辑:
/// <summary>Projects each element to an observable sequence, which is merged
/// in the output observable sequence only if the previous projected observable
/// sequence that has the same key has completed.</summary>
public static IObservable<TResult> ExhaustMapPerKey<TSource, TKey, TResult>(
this IObservable<TSource> source,
Func<TSource, TKey> keySelector,
Func<TSource, TKey, IObservable<TResult>> function,
IEqualityComparer<TKey> keyComparer = default)
{
// Arguments validation omitted
keyComparer ??= EqualityComparer<TKey>.Default;
return source
.GroupBy(keySelector, keyComparer)
.SelectMany(group =>
{
int localMutex = 0; // 0: not acquired, 1: acquired
return group.SelectMany(item =>
{
if (Interlocked.CompareExchange(ref localMutex, 1, 0) == 0)
return function(item, group.Key)
.Finally(() => Volatile.Write(ref localMutex, 0));
return Observable.Empty<TResult>();
});
});
}
使用示例:
var results = observableStream
.ExhaustMapPerKey(item => item.Id, (item, key) =>
Observable.Start(() => ProcessItem(item), scheduler));