在 RX 中,如何创建缓冲区的速度不快于处理速度



使用 RX 的缓冲区运算符允许在出现一定数量的结果后或指定时间(以较早者为准)创建批处理。当将结果管道传输到另一台计算机上的数据库时,这非常有用,其中人们希望降低延迟,但避免发送大量请求(每个结果一个)。

我还有一个额外的要求,那就是将结果的顺序保留到数据库中(有些是更新,必须在相应的添加之后)。这意味着传出请求不能重叠,以防它们出现故障。

理想情况下,如果以前的数据库请求尚未返回,即使通常发出每个缓冲区也应继续填满,因为这将最大限度地减少延迟和进入数据库的请求数。

如何修改以下代码以使其正常工作?

source
.Buffer(TimeSpan.FromSeconds(1), 25)
.Subscribe(async batch => await SendToDatabase(batch));

为了强制传出请求等到前一个请求返回后再进行处理,有一个 RX 技巧可以将每个结果转换为一个可观察量,该结果仅在完成处理后完成。通过将它们与 concat 相结合,下一个将直到前一个完成才会启动。

source
.Buffer(TimeSpan.FromSeconds(1), 25)
.Select(batch => 
Observable.FromAsync(async () => 
await SendToDatabase(batch)
)
)
.Concat()
.Subscribe(async batch => await SendToDatabase(batch));

但是,这仍然会在等待时产生批次,因此不是一个完美的解决方案。

我写了一个新的可观察扩展BufferAndAct来做到这一点。

总之,它需要一个时间间隔、一个(项目)和一个要应用于每个批次的操作。当时间间隔到期或达到项目数时,它会尝试对批处理执行操作,但在前一个批处理完成之前,它永远不会开始对新批处理执行操作,因此对批处理的潜在大小没有限制。可以进行修改以使其与Buffer的其他一些重载保持一致。

它使用进一步的扩展Split它的作用类似于Buffer的重载之一,将源项的可观察量转换为源项的可观察量,当从输入可观察量接收到信号时将它们分开。

BufferAndAct使用Split创建一个可观察量,当在源可观察量上发出正常的定时缓冲区时,它会给出一个时钟周期,并在释放实际缓冲区时重置。这可能是以后,因为还有另一个可观察量,当当前没有正在进行的请求时,它会滴答作响。通过将这两个刻度压缩在一起,Buffer可用于在满足两个条件时立即发出批处理。

用法如下:

source
.BufferAndAct(TimeSpan.FromSeconds(1), 25, async batch =>
await SendToDatabase(batch)
)
.Subscribe(r => {})

以及两个扩展的来源:

public static IObservable<TDest> BufferAndAct<TSource, TDest>(
this IObservable<TSource> source,
TimeSpan timeSpan,
int count,
Func<IList<TSource>, Task<TDest>> action
)
{
return new AnonymousObservable<TDest>(observer =>
{
var actionStartedObserver = new Subject<Unit>();
var actionCompleteObserver = new Subject<Unit>();
var published = source.Publish();
var batchReady = published.Select(i => Unit.Default).Split(actionStartedObserver).Select(s => s.Buffer(timeSpan, count).Select(u => Unit.Default).Take(1)).Concat();
var disposable = published.Buffer(Observable.Zip(actionCompleteObserver.StartWith(Unit.Default), batchReady)).SelectMany(async list =>
{
actionStartedObserver.OnNext(Unit.Default);
try
{
return await action(list);
}
finally
{
actionCompleteObserver.OnNext(Unit.Default);
}
}).Finally(() => {}).Subscribe(observer);
published.Connect();
return Disposable.Create(() =>
{
disposable.Dispose();
actionCompleteObserver.Dispose();
});
});
}
public static IObservable<Unit> BufferAndAct<TSource>(
this IObservable<TSource> source,
TimeSpan timeSpan,
int count,
Func<IList<TSource>, Task> action
)
{
return BufferAndAct(source, timeSpan, count, s =>
{
action(s);
return Task.FromResult(Unit.Default);
});
}
public static IObservable<IObservable<TSource>> Split<TSource>(
this IObservable<TSource> source,
IObservable<Unit> boundaries
)
{
return Observable.Create<IObservable<TSource>>(observer =>
{
var tuple = Split(observer);
var d1 = boundaries.Subscribe(tuple.Item2);
var d2 = source.Subscribe(tuple.Item1);
return Disposable.Create(() =>
{
d2.Dispose();
d1.Dispose();
});
});
}
private static Tuple<IObserver<TSource>, IObserver<Unit>> Split<TSource>(this IObserver<IObservable<TSource>> output)
{
ReplaySubject<TSource> obs = null;
var completed = 0; // int not bool to use in interlocked
Action newObservable = () =>
{
obs?.OnCompleted();
obs = new ReplaySubject<TSource>();
output.OnNext(obs);
};
Action completeOutput = () =>
{
if (Interlocked.CompareExchange(ref completed, 0, 1) == 1)
{
output.OnCompleted();
}
};
newObservable();
return new Tuple<IObserver<TSource>, IObserver<Unit>>(Observer.Create<TSource>(obs.OnNext, output.OnError, () =>
{
obs.OnCompleted();
completeOutput();
}), Observer.Create<Unit>(s => newObservable(), output.OnError, () => completeOutput()));
}

相关内容

  • 没有找到相关文章

最新更新