System.Reactive - 缓冲区/组可观察量的立即可用值



假设你有一个IObservable,它可以立即提供一些值,并且有些值被连续推送:

    var immediate_values = new [] { "curerntly", "available", "values" }.ToObservable();
    var future_values = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1)).Select(x => "new value!");
    IObservable<string> input = immediate_values.Concat(future_values);

有没有办法将输入转换为 IObservable<string[]>,其中推送的第一个数组包含所有立即可用的值,每个后续数组仅包含 1 个值(此后推送每个值(?上面只是示例数据,这需要在不知道各个输入流的情况下处理任何IObservable>T>。

    IObservable<string[]> buffered = input.BufferSomehow();
    // should push values:
    // First value: string[] = ["currently", "available", "values"]
    // Second value: string[] = ["new value!"]
    // Third value: string[] = ["new value!"]
    // .....

我想到了.当然是 Buffer(( 函数,但我真的不想通过任何特定的 TimeSpan 进行缓冲,并且想不出任何方法来生成带有缓冲区窗口关闭信号的可观察量。

谁能想到一个合理的方法来实现这一目标,或者这根本不可能?

谢谢!

没有直接的方法可以区分可观察值的启动时值和后续值。我的建议是推断它:

var autoBufferedInput1 = input.Publish(_input => _input
   .Buffer(_input.Throttle(TimeSpan.FromSeconds(.1)))
   .Select(l => l.ToArray())
);

这会将缓冲区边界设置为 .1 秒的滚动扩展窗口:每次输入值时,它将窗口从值传入的时间扩展到 0.1 秒,并将值添加到缓冲区。如果 .1 秒过去了,没有值,则缓冲区将被刷新。

这将产生副作用,即如果您有几乎同时的"热"值(彼此相距 0.1 秒以内(,那么这些值将被缓冲在一起。如果这是不需要的,你可以Switch出来,尽管这会让事情变得更加复杂:

var autoBufferedInput2 = input.Publish(_input =>
    _input.Throttle(TimeSpan.FromSeconds(.1)).Publish(_boundary => _boundary
        .Take(1)
        .Select(_ => _input.Select(s => new[] { s }))
        .StartWith(_input
            .Buffer(_boundary)
            .Select(l => l.ToArray())
        )
        .Switch()
    )
);

autoBufferedInput2使用 .1 秒推理方法,直到第一个缓冲列表,然后切换到简单地选择输出并将值包装在数组中。


编辑:如果您还想要绝对的 1 秒门,那么片段将如下所示:

var autoBufferedInput1 = input.Publish(_input => _input
    .Buffer(
        Observable.Merge(
            Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => Unit.Default),
            _input.Throttle(TimeSpan.FromSeconds(.1)).Select(_ => Unit.Default)
        )
    )
    .Select(l => l.ToArray())
);
var autoBufferedInput2 = input.Publish(_input =>
    Observable.Merge(
        _input.Throttle(TimeSpan.FromSeconds(.1)).Select(_ => Unit.Default),
        Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => Unit.Default)
    )
    .Publish(_boundary => _boundary
        .Take(1)
        .Select(_ => _input.Select(s => new[] { s }))
        .StartWith(_input
            .Buffer(_boundary)
            .Select(l => l.ToArray())
        )
        .Switch()
    )
);

对于任何IObservable<T>,您需要执行以下操作:

var sequence = ongoingSequence.StartWith(initialSequence);

您可以利用在订阅期间同步传播立即可用的值这一事实,并在Subscribe方法返回后切换一些开关。下面的实现基于这个想法。在订阅期间,将缓冲所有传入消息,在订阅之后将发出缓冲区,之后将立即逐个发出所有将来的传入消息。

public static IObservable<T[]> BufferImmediatelyAvailable<T>(
    this IObservable<T> source)
{
    return Observable.Create<T[]>(observer =>
    {
        var buffer = new List<T>();
        var subscription = source.Subscribe(x =>
        {
            if (buffer != null)
                buffer.Add(x);
            else
                observer.OnNext(new[] { x });
        }, ex =>
        {
            buffer = null;
            observer.OnError(ex);
        }, () =>
        {
            if (buffer != null)
            {
                var output = buffer.ToArray();
                buffer = null;
                observer.OnNext(output);
            }
            observer.OnCompleted();
        });
        if (buffer != null)
        {
            var output = buffer.ToArray();
            buffer = null;
            observer.OnNext(output);
        }
        return subscription;
    });
}

相关内容

  • 没有找到相关文章

最新更新