我想确保我的 Rx 通知在使用我的消费者Do
委托处理时不会丢失。我有一个生成器,它生成我需要处理的消息,如果失败,请重试处理。
所需的大理石图:
Producer 1 2 3 4 5
Consumer 1 X 2 3 X X 4 5
Downstream 1 2 3 4 5
标准Retry
在这里无济于事,因为它会在出错后重新订阅生产者。这将丢失处理失败的通知,并继续处理下一个通知。
Retry
大理石图:
Producer 1 2 3 4 5
Consumer 1 X 3 X 5
到目前为止,我有这段代码,但对我来说似乎不对:
static void RetryWithBacklog()
{
var data = Enumerable.Range(1, 100).ToObservable();
var backlog = new Subject<long>();
backlog
.Merge(data)
.Retry()
.Do(l =>
{
try
{
ProcessNotification(l);
}
catch (Exception)
{
backlog.OnNext(l);
}
})
.Subscribe();
Console.ReadKey();
}
(完整代码示例)
背景
Do
操作将根据生产者的通知执行网络请求。这些可能会失败,因此我需要重试它而不会丢失要传输的信息。当然,我可以盲目地while try/catch
网络请求,但只要已知网络连接断开,请求就不应该发生(请参阅此问题)。
溶液
static void RetryBeforePassingDownstream()
{
var data = Enumerable.Range(1, 100).ToObservable();
data
.Replay(l => l.SelectMany(ProcessNotification).Retry(), 1)
.Subscribe(Downstream);
Console.ReadKey();
}
static IObservable<int> ProcessNotification(int notification)
{
Console.WriteLine("process: {0}", i);
// either:
// throw new Exception("error");
// or:
return Observable.Return(notification);
}
static void Downstream(int i)
{
Console.WriteLine("downstream: {0}", i);
}
Retry
运算符的要点是它通过重新订阅来处理冷可观察量,因此每次"重试"时都会导致订阅副作用。例如,您的可观察对象可能会在观察者每次订阅时发送 Web 请求。在您的示例中,Retry
运算符没有任何意义,特别是考虑到Interval
从不调用OnError
。
Retry
语义在技术上可以分别存在于可观察量和观察者中。在您的情况下,您不能简单地调用ProcessNotification
直到它在Do
运算符中成功吗?
或者您的问题仅仅是下游观察者在抛出Do
时不会观察到通知?我想这不是你的意思,因为在这种情况下,你可以简单地吞下例外。
或者您的问题是,如果要重播通知,通知可能会以某种方式更改或被上游操作员过滤掉?在这种情况下,您可能只需要使用Do
后跟Catch
和递归来实现相同的目标,而无需Subject
,但我无法弄清楚您为什么真的想这样做。
更新
Do
运算符不是异步的。请改用SelectMany
,使其成为查询的一部分。这样,取消订阅也会取消请求。如果请求方法返回Task<T>
而不是IObservable<T>
,请考虑使用接受CancellationToken
的重载。
有一种更简单的方法可以使冷可观察,该冷可观察对象重播最后一个通知,只需使用 Retry
运算符即可重试,如下所示。
(未经测试)
data.Replay(r => r.YourStateMachine.SelectMany(SendRequestAsync).Retry(), 1);
詹姆斯在这里的回答可以填满你的状态机器。