如何重试热观察



Rx有很好的Observable.Buffer功能。但在现实生活中它有一个问题。

场景:应用程序向数据库发送一个事件流。一个接一个地插入事件是昂贵的,所以我们需要批量处理。我想使用Observable.Buffer。但插入数据库失败的可能性很小(死锁、超时、停机等(。

我可以在批处理函数本身中添加一些重试逻辑,但这将违背Rx的可组合性思想。Observable.Retry不会剪切它,因为它会重新订阅"热"源,这意味着失败的批将丢失。

是否存在我可以编写以达到预期效果的函数,或者我需要实现自己的扩展?我想要这样的东西:

_inputBuffer = new BufferBlock<int>();
_inputBuffer.AsObservable().
    Buffer(TimeSpan.FromSeconds(10), 1000).
    Do(batch => SqlSaveBatch(batch)).
    {Retry???}.
    Subscribe()

为了使其完美,我希望能够控制调用OnComplete时的情况,而重试缓冲区有不完整的批,并能够执行一些操作(发送错误电子邮件、将数据保存到本地文件系统等(

当保存到数据库失败并需要重试时,实际上不是流或事件出错,而是对事件采取的操作。

我会把你的代码结构得更像这样:

IDisposable subscription =
    _inputBuffer.AsObservable().
    Buffer(TimeSpan.FromSeconds(10), 1000).
    Subscribe(
        batch => SqlSaveBatchWithRetryLogic(batch),
        () => YourOnCompleteAction);
  • 您可以在SqlSaveBatchWithRetryLogic()内部提供重试逻辑
  • 处理YourOnCompleteAction()内部事件的OnComplete
  • 如果未能保存批次,则可以选择从SqlSaveBatchWithRetryLogic()中处置订阅
  • 这也消除了Do的副作用

不过,我会小心这种方法——你需要注意重试逻辑。您没有背压(降低输入速度的方法(。因此,如果您有任何类型的回退/重试,您将冒着队列备份和填充内存的风险。如果你开始看到批次一直处于计数限制,你可能会遇到麻烦!您可能想要实现一个计数器来监视未完成的项目。

相关内容

  • 没有找到相关文章