如何组合两个按时间戳排序然后分组的流



我有两个对象流,每个对象流都有一个Timestamp值。两个流都是按顺序排列的,因此例如,一个流中的时间戳可能是Ta1,3,6,6,7,而另一个流的时间戳则可能是T=1,2,5,5,6,8。两个流中的对象属于相同的类型。

我希望能够做的是按照时间戳的顺序将这些事件中的每一个放在总线上,即,将A1,然后将B1,B2、A3等等。此外,由于一些流具有多个具有相同时间戳的(顺序)元素,我希望将这些元素分组,以便每个新事件都是一个数组。因此,我们将[A3]放在总线上,然后是[A15,A25],依此类推

我尝试通过制作两个ConcurrentQueue结构来实现这一点,将每个事件放在队列的后面,然后查看队列的每个前面,首先选择较早的事件,然后遍历队列,使具有此时间戳的所有事件都存在。

然而,我遇到了两个问题:

  • 如果我让这些队列不受限制,我会很快耗尽内存,因为读取操作比接收事件的处理程序快得多。(我有几GB的数据)
  • 有时我会遇到这样一种情况:在a25到达之前,我处理事件,比如a15。不知怎么的,我需要小心

我认为Rx可以在这方面有所帮助,但我没有看到一个明显的组合子使这成为可能。因此,我们非常感谢任何建议。

Rx确实很适合这个问题IMO.

由于明显的原因(您必须首先观察整个流以保证正确的输出顺序),IObservables不能"OrderBy",因此我在下面的回答假设(您已经说过)您的2个源事件流是有序的。

这最终是一个有趣的问题。标准Rx运算符缺少一个GroupByUntilChanged,只要它在观测到下一组的第一个元素时调用上一组上可观测到的OnComplete,就可以很容易地解决这个问题。然而,从DistinctUntilChanged的实现来看,它不遵循这种模式,只在源observable完成时调用OnComplete(即使它知道在第一个非不同元素之后不会有更多元素…奇怪??)。无论如何,出于这些原因,我决定不使用GroupByUntilChanged方法(不违反Rx约定),而是使用ToEnumerableUntilChanged

免责声明:这是我的第一个Rx扩展,所以非常感谢对我所做选择的反馈。此外,我的一个主要关注点是持有distinctElements列表的匿名观察者。

首先,您的应用程序代码非常简单:

    public class Event
    {
        public DateTime Timestamp { get; set; }
    }
    private IObservable<Event> eventStream1;
    private IObservable<Event> eventStream2; 
    public IObservable<IEnumerable<Event>> CombineAndGroup()
    {
        return eventStream1.CombineLatest(eventStream2, (e1, e2) => e1.Timestamp < e2.Timestamp ? e1 : e2)
            .ToEnumerableUntilChanged(e => e.Timestamp);
    }

现在针对ToEnumerableUntilChanged实现(代码警告墙):

    public static IObservable<IEnumerable<TSource>> ToEnumerableUntilChanged<TSource,TKey>(this IObservable<TSource> source, Func<TSource,TKey> keySelector)
    {
        // TODO: Follow Rx conventions and create a superset overload that takes the IComparer as a parameter
        var comparer = EqualityComparer<TKey>.Default;
        return Observable.Create<IEnumerable<TSource>>(observer =>
        {
            var currentKey = default(TKey);
            var hasCurrentKey = false;
            var distinctElements = new List<TSource>();
            return source.Subscribe((value =>
            {
                TKey elementKey;
                try
                {
                    elementKey = keySelector(value);
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                    return;
                }
                if (!hasCurrentKey)
                {
                    hasCurrentKey = true;
                    currentKey = elementKey;
                    distinctElements.Add(value);
                    return;
                }
                bool keysMatch;
                try
                {
                    keysMatch = comparer.Equals(currentKey, elementKey);
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                    return;
                }
                if (keysMatch)
                {
                    distinctElements.Add(value);
                    return;
                }
                observer.OnNext( distinctElements);
                distinctElements.Clear();
                distinctElements.Add(value);
                currentKey = elementKey;
            }), observer.OnError, () =>
            {
                if (distinctElements.Count > 0)
                    observer.OnNext(distinctElements);
                observer.OnCompleted();
            });
        });
    }

最新更新