假设你有一个IObservable
var immediate_values = new [] { "curerntly", "available", "values" }.ToObservable();
var future_values = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1)).Select(x => "new value!");
IObservable<string> input = immediate_values.Concat(future_values);
有没有办法将输入转换为 IObservable<string[]>,其中推送的第一个数组包含所有立即可用的值,每个后续数组仅包含 1 个值(此后推送每个值(?上面只是示例数据,这需要在不知道各个输入流的情况下处理任何IObservable>T>。
IObservable<string[]> buffered = input.BufferSomehow();
// should push values:
// First value: string[] = ["currently", "available", "values"]
// Second value: string[] = ["new value!"]
// Third value: string[] = ["new value!"]
// .....
我想到了.当然是 Buffer(( 函数,但我真的不想通过任何特定的 TimeSpan 进行缓冲,并且想不出任何方法来生成带有缓冲区窗口关闭信号的可观察量。
谁能想到一个合理的方法来实现这一目标,或者这根本不可能?
谢谢!
没有直接的方法可以区分可观察值的启动时值和后续值。我的建议是推断它:
var autoBufferedInput1 = input.Publish(_input => _input
.Buffer(_input.Throttle(TimeSpan.FromSeconds(.1)))
.Select(l => l.ToArray())
);
这会将缓冲区边界设置为 .1 秒的滚动扩展窗口:每次输入值时,它将窗口从值传入的时间扩展到 0.1 秒,并将值添加到缓冲区。如果 .1 秒过去了,没有值,则缓冲区将被刷新。
这将产生副作用,即如果您有几乎同时的"热"值(彼此相距 0.1 秒以内(,那么这些值将被缓冲在一起。如果这是不需要的,你可以Switch
出来,尽管这会让事情变得更加复杂:
var autoBufferedInput2 = input.Publish(_input =>
_input.Throttle(TimeSpan.FromSeconds(.1)).Publish(_boundary => _boundary
.Take(1)
.Select(_ => _input.Select(s => new[] { s }))
.StartWith(_input
.Buffer(_boundary)
.Select(l => l.ToArray())
)
.Switch()
)
);
autoBufferedInput2
使用 .1 秒推理方法,直到第一个缓冲列表,然后切换到简单地选择输出并将值包装在数组中。
编辑:如果您还想要绝对的 1 秒门,那么片段将如下所示:
var autoBufferedInput1 = input.Publish(_input => _input
.Buffer(
Observable.Merge(
Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => Unit.Default),
_input.Throttle(TimeSpan.FromSeconds(.1)).Select(_ => Unit.Default)
)
)
.Select(l => l.ToArray())
);
var autoBufferedInput2 = input.Publish(_input =>
Observable.Merge(
_input.Throttle(TimeSpan.FromSeconds(.1)).Select(_ => Unit.Default),
Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => Unit.Default)
)
.Publish(_boundary => _boundary
.Take(1)
.Select(_ => _input.Select(s => new[] { s }))
.StartWith(_input
.Buffer(_boundary)
.Select(l => l.ToArray())
)
.Switch()
)
);
对于任何IObservable<T>
,您需要执行以下操作:
var sequence = ongoingSequence.StartWith(initialSequence);
您可以利用在订阅期间同步传播立即可用的值这一事实,并在Subscribe
方法返回后切换一些开关。下面的实现基于这个想法。在订阅期间,将缓冲所有传入消息,在订阅之后将发出缓冲区,之后将立即逐个发出所有将来的传入消息。
public static IObservable<T[]> BufferImmediatelyAvailable<T>(
this IObservable<T> source)
{
return Observable.Create<T[]>(observer =>
{
var buffer = new List<T>();
var subscription = source.Subscribe(x =>
{
if (buffer != null)
buffer.Add(x);
else
observer.OnNext(new[] { x });
}, ex =>
{
buffer = null;
observer.OnError(ex);
}, () =>
{
if (buffer != null)
{
var output = buffer.ToArray();
buffer = null;
observer.OnNext(output);
}
observer.OnCompleted();
});
if (buffer != null)
{
var output = buffer.ToArray();
buffer = null;
observer.OnNext(output);
}
return subscription;
});
}