在 Rx 中,如何确保不会因异常而丢失通知



我想确保我的 Rx 通知在使用我的消费者Do委托处理时不会丢失。我有一个生成器,它生成我需要处理的消息,如果失败,请重试处理。

所需的大理石图:

Producer   1 2 3 4 5
Consumer   1 X 2 3 X X 4 5
Downstream 1   2 3     4 5

标准Retry在这里无济于事,因为它会在出错后重新订阅生产者。这将丢失处理失败的通知,并继续处理下一个通知。

Retry大理石图:

Producer  1 2 3 4 5
Consumer  1 X 3 X 5

到目前为止,我有这段代码,但对我来说似乎不对:

static void RetryWithBacklog()
{
  var data = Enumerable.Range(1, 100).ToObservable();
  var backlog = new Subject<long>();
  backlog
    .Merge(data)
    .Retry()
    .Do(l =>
    {
      try
      {
        ProcessNotification(l);
      }
      catch (Exception)
      {
        backlog.OnNext(l);
      }
    })
    .Subscribe();
  Console.ReadKey();
}

(完整代码示例)

背景

Do操作将根据生产者的通知执行网络请求。这些可能会失败,因此我需要重试它而不会丢失要传输的信息。当然,我可以盲目地while try/catch网络请求,但只要已知网络连接断开,请求就不应该发生(请参阅此问题)。

溶液

static void RetryBeforePassingDownstream()
{
  var data = Enumerable.Range(1, 100).ToObservable();
  data
    .Replay(l => l.SelectMany(ProcessNotification).Retry(), 1)
    .Subscribe(Downstream);
  Console.ReadKey();
}
static IObservable<int> ProcessNotification(int notification)
{
  Console.WriteLine("process: {0}", i);
  // either:
  // throw new Exception("error");
  // or:
  return Observable.Return(notification);
}
static void Downstream(int i)
{
  Console.WriteLine("downstream: {0}", i);
}

Retry运算符的要点是它通过重新订阅来处理可观察量,因此每次"重试"时都会导致订阅副作用。例如,您的可观察对象可能会在观察者每次订阅时发送 Web 请求。在您的示例中,Retry运算符没有任何意义,特别是考虑到Interval从不调用OnError

Retry语义在技术上可以分别存在于可观察量和观察者中。在您的情况下,您不能简单地调用ProcessNotification直到它在Do运算符中成功吗?

或者您的问题仅仅是下游观察者在抛出Do时不会观察到通知?我想这不是你的意思,因为在这种情况下,你可以简单地吞下例外。

或者您的问题是,如果要重播通知,通知可能会以某种方式更改或被上游操作员过滤掉?在这种情况下,您可能只需要使用Do后跟Catch和递归来实现相同的目标,而无需Subject,但我无法弄清楚您为什么真的想这样做。

更新

Do运算符不是异步的。请改用SelectMany,使其成为查询的一部分。这样,取消订阅也会取消请求。如果请求方法返回Task<T>而不是IObservable<T>,请考虑使用接受CancellationToken的重载。

有一种更简单的方法可以使冷可观察,该可观察对象重播最后一个通知,只需使用 Retry 运算符即可重试,如下所示。

(未经测试)

data.Replay(r => r.YourStateMachine.SelectMany(SendRequestAsync).Retry(), 1);

詹姆斯在这里的回答可以填满你的状态机器

相关内容

  • 没有找到相关文章

最新更新