我的服务器端向我发送了成批的消息。批处理中的消息数和频率是任意的。有时,我每隔 1 分钟收到一次消息,有时一个小时都没有消息。从 1 到 10 条消息的任何位置。
我当前的实现使用 Observable.Buffer(TimeSpan.FromSeconds(5))
对消息进行分组并将消息发送给订阅者。
有没有办法将 Observable 配置为在两条消息之间存在 x 秒延迟时将缓冲消息发送给订阅者,而不是每 5 秒检查一次。
如何避免不必要的计时器每 5 秒滴答作响?(我愿意接受其他建议来优化批处理。
使用 bufferClosesingSelector 工厂方法
decPL 建议使用接受bufferClosingSelector
的 Buffer
重载 - 在新缓冲区打开时调用的工厂函数。它生成一个流,其第一个OnNext()
或OnCompleted()
信号刷新当前缓冲区。decPLs代码如下所示:
observable.Buffer(() => observable.Throttle(TimeSpan.FromSeconds(5)))
这在解决方案方面取得了相当大的进展,但它有几个问题:
- 在消息在限制持续时间内一致发布的活动期间,服务器不会发送消息。这可能会导致大型、不经常发布的列表。
- 源有多个订阅;如果天气寒冷,这可能会产生意想不到的副作用。
bufferClosingSelector
工厂在每次缓冲区关闭后调用,因此,如果源是冷的,它将从初始事件而不是最新事件进行限制。
防止无限期限制
我们需要使用附加机制来限制缓冲区长度并防止无限期限制。 Buffer
有一个重载,允许您指定最大长度,但不幸的是,您无法将其与关闭选择器结合使用。
我们将所需的缓冲区长度限制称为 n。回想一下,关闭选择器的第一OnNext
足以关闭缓冲区,因此我们需要做的就是使用计数流Merge
限制,该流在来自源的 n 个事件后发送OnNext
。我们可以使用.Take(n).LastAsync()
来做到这一点;取前 n 个事件,但忽略除最后一个事件之外的所有事件。这是 Rx 中非常有用的模式。
让源"热"
为了解决bufferClosingSelector
工厂重新订阅源的问题,我们需要使用源上的常见.Publish().RefCount()
模式来为我们提供一个流,该流只会将最新事件发送给订阅者。这也是一个非常有用的模式,需要记住。
溶液
下面是重新设计的代码,其中限制持续时间与计数器合并:
var throttleDuration = TimeSpan.FromSeconds(5);
var bufferSize = 3;
// single subscription to source
var sourcePub = source.Publish().RefCount();
var output = sourcePub.Buffer(
() => sourcePub.Throttle(throttleDuration)
.Merge(sourcePub.Take(bufferSize).LastAsync()));
生产就绪代码和测试
这是一个带有测试的生产就绪实现(使用 nuget 包 rx-testing & nunit)。请注意调度程序的参数化以支持测试。
public static partial class ObservableExtensions
{
public static IObservable<IList<TSource>> BufferNearEvents<TSource>(
this IObservable<TSource> source,
TimeSpan maxInterval,
int maxBufferSize,
IScheduler scheduler)
{
if (scheduler == null) scheduler = ThreadPoolScheduler.Instance;
if (maxBufferSize <= 0)
throw new ArgumentOutOfRangeException(
"maxBufferSize", "maxBufferSize must be positive");
var publishedSource = source.Publish().RefCount();
return publishedSource.Buffer(
() => publishedSource
.Throttle(maxInterval, scheduler)
.Merge(publishedSource.Take(maxBufferSize).LastAsync()));
}
}
public class BufferNearEventsTests : ReactiveTest
{
[Test]
public void CloseEventsAreBuffered()
{
TimeSpan maxInterval = TimeSpan.FromTicks(200);
const int maxBufferSize = 1000;
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3));
IList<int> expectedBuffer = new [] {1, 2, 3};
var expectedTime = maxInterval.Ticks + 300;
var results = scheduler.CreateObserver<IList<int>>();
source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
.Subscribe(results);
scheduler.AdvanceTo(1000);
results.Messages.AssertEqual(
OnNext<IList<int>>(expectedTime, buffer => CheckBuffer(expectedBuffer, buffer)));
}
[Test]
public void FarEventsAreUnbuffered()
{
TimeSpan maxInterval = TimeSpan.FromTicks(200);
const int maxBufferSize = 1000;
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(1000, 1),
OnNext(2000, 2),
OnNext(3000, 3));
IList<int>[] expectedBuffers =
{
new[] {1},
new[] {2},
new[] {3}
};
var expectedTimes = new[]
{
maxInterval.Ticks + 1000,
maxInterval.Ticks + 2000,
maxInterval.Ticks + 3000
};
var results = scheduler.CreateObserver<IList<int>>();
source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
.Subscribe(results);
scheduler.AdvanceTo(10000);
results.Messages.AssertEqual(
OnNext<IList<int>>(expectedTimes[0], buffer => CheckBuffer(expectedBuffers[0], buffer)),
OnNext<IList<int>>(expectedTimes[1], buffer => CheckBuffer(expectedBuffers[1], buffer)),
OnNext<IList<int>>(expectedTimes[2], buffer => CheckBuffer(expectedBuffers[2], buffer)));
}
[Test]
public void UpToMaxEventsAreBuffered()
{
TimeSpan maxInterval = TimeSpan.FromTicks(200);
const int maxBufferSize = 2;
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3));
IList<int>[] expectedBuffers =
{
new[] {1,2},
new[] {3}
};
var expectedTimes = new[]
{
200, /* Buffer cap reached */
maxInterval.Ticks + 300
};
var results = scheduler.CreateObserver<IList<int>>();
source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
.Subscribe(results);
scheduler.AdvanceTo(10000);
results.Messages.AssertEqual(
OnNext<IList<int>>(expectedTimes[0], buffer => CheckBuffer(expectedBuffers[0], buffer)),
OnNext<IList<int>>(expectedTimes[1], buffer => CheckBuffer(expectedBuffers[1], buffer)));
}
private static bool CheckBuffer<T>(IEnumerable<T> expected, IEnumerable<T> actual)
{
CollectionAssert.AreEquivalent(expected, actual);
return true;
}
}
如果我正确理解了您的描述,Observable.Buffer
仍然是您的朋友,只是使用导致可观察事件的重载来指示何时应发送缓冲项目。 内容如下:
observable.Buffer(() => observable.Throttle(TimeSpan.FromSeconds(5)))
这是一个老问题,但它似乎与我最近的问题有关。谜语找到了一种很好的方法来做我认为你想要实现的事情,所以我想我会分享。我将解决方案包装在扩展方法中:
public static class ObservableExtensions
{
public static IObservable<T[]> Batch<T>(this IObservable<T> observable, TimeSpan timespan)
{
return observable.GroupByUntil(x => 1, g => Observable.Timer(timespan))
.Select(x => x.ToArray())
.Switch();
}
}
它可以像这样使用:
observableSource.Batch(TimeSpan.FromSeconds(5));