Rx有很好的Observable.Buffer功能。但在现实生活中它有一个问题。
场景:应用程序向数据库发送一个事件流。一个接一个地插入事件是昂贵的,所以我们需要批量处理。我想使用Observable.Buffer。但插入数据库失败的可能性很小(死锁、超时、停机等(。
我可以在批处理函数本身中添加一些重试逻辑,但这将违背Rx的可组合性思想。Observable.Retry不会剪切它,因为它会重新订阅"热"源,这意味着失败的批将丢失。
是否存在我可以编写以达到预期效果的函数,或者我需要实现自己的扩展?我想要这样的东西:
_inputBuffer = new BufferBlock<int>();
_inputBuffer.AsObservable().
Buffer(TimeSpan.FromSeconds(10), 1000).
Do(batch => SqlSaveBatch(batch)).
{Retry???}.
Subscribe()
为了使其完美,我希望能够控制调用OnComplete时的情况,而重试缓冲区有不完整的批,并能够执行一些操作(发送错误电子邮件、将数据保存到本地文件系统等(
当保存到数据库失败并需要重试时,实际上不是流或事件出错,而是对事件采取的操作。
我会把你的代码结构得更像这样:
IDisposable subscription =
_inputBuffer.AsObservable().
Buffer(TimeSpan.FromSeconds(10), 1000).
Subscribe(
batch => SqlSaveBatchWithRetryLogic(batch),
() => YourOnCompleteAction);
- 您可以在
SqlSaveBatchWithRetryLogic()
内部提供重试逻辑 - 处理
YourOnCompleteAction()
内部事件的OnComplete - 如果未能保存批次,则可以选择从
SqlSaveBatchWithRetryLogic()
中处置订阅 - 这也消除了
Do
的副作用
不过,我会小心这种方法——你需要注意重试逻辑。您没有背压(降低输入速度的方法(。因此,如果您有任何类型的回退/重试,您将冒着队列备份和填充内存的风险。如果你开始看到批次一直处于计数限制,你可能会遇到麻烦!您可能想要实现一个计数器来监视未完成的项目。