我有一个使用ReactiveUI的WPF应用程序,它通过定期从外部进程获取状态来工作。
给定如下可观察到的提取:
var latestState =
Observable.Interval(TimeSpan.FromSeconds(.5))
.SelectMany(async _ =>
{
try
{
var state = await _robotsClient.GetRobotsStateAsync(new GetRobotsStateRequest());
return state;
}
catch (Exception)
{
return null;
}
})
.Publish();
如果数据获取失败,我需要能够中断它。
我希望能够做的事情是:
var latestState =
Observable.Interval(TimeSpan.FromSeconds(.5))
.SelectMany(async _ =>
{
try
{
var state = await _robotsClient.GetRobotsStateAsync(new GetRobotsStateRequest());
return state;
}
catch (Exception)
{
// Show and await the dialog dismissal
// instructions for starting the external process provided etc etc
await dialogs.ShowErrorMessageAsync("Failed to fetch info", "Failed to get the latest state");
/* MISSING:
* Some magical gubbins that will produce the state on a steady interval, but also still support
* displaying the dialog and halting
*/
return null;
}
})
.Publish();
显然,这是不可行的,因为你最终会遇到鸡和蛋的问题。
我尝试对其进行切片的每一种方式(例如,使用Subject<bool>
来跟踪成功/失败(最终都导致了这样一个事实,即失败案例仍然需要能够发出一个可观察的消息,该消息在间隔上获取,和尊重失败处理,但这在处理程序内部是不可能的。
我几乎可以肯定,这是一个概念化错误信号/检索数据/恢复间隔的方法的问题。
基于意见反馈的部分解决方案/实施:
var stateTimer = Observable.Interval(TimeSpan.FromSeconds(10));
var stateFetcher =
Observable.FromAsync(async () =>
await _robotsClient.GetRobotsStateAsync(new GetRobotsStateRequest()));
IObservable<GetRobotsStateReply> DisplayStateError(Exception causingException)
=> Observable.FromAsync(async () =>
{
await dialogs.ShowErrorMessageAsync(
"Failed to get robot info",
"Something went wrong");
return new GetRobotsStateReply { };
});
var stateStream =
stateTimer
.SelectMany(stateFetcher)
.Catch((Exception ex) => DisplayStateError(ex))
.Publish();
stateStream.Connect();
这个实现为我提供了所需的行为,并且在显示错误对话框时不会触发计时器;然而,它在关闭对话框后不会触发(我相信是因为流已经终止(——我将在注释中使用建议来解决这个问题,然后添加一个答案。
工作解决方案(如果重新打开,可以添加为答案(。
var fetchTimer = Observable.Timer(TimeSpan.FromSeconds(5));
var stateFetcher = Observable.FromAsync(async () =>
await _robotsClient.GetRobotsStateAsync(new GetRobotsStateRequest()));
var timerFetch = Observable.SelectMany(fetchTimer, stateFetcher);
IObservable<GetRobotsStateReply> GetErrorHandler(Exception ex) =>
Observable.FromAsync(async () =>
{
await dialogs.ShowErrorMessageAsync(
"TEST",
"TEST");
return (GetRobotsStateReply)null;
});
IObservable<GetRobotsStateReply> GetStateFetchCycleObservable(
IObservable<GetRobotsStateReply> source) =>
source
.Catch((Exception ex) => GetErrorHandler(ex))
.SelectMany(state =>
state != null
? GetStateFetchCycleObservable(timerFetch)
: GetStateFetchCycleObservable(stateFetcher));
var latestState =
GetStateFetchCycleObservable(timerFetch)
.Publish();
多亏了西奥多的建议,我终于找到了解决方案。
我犯了一个错误,没有从热/冷可观察性的角度进行思考,也没有正确使用内置的错误处理机制。
我最初使用的是Observable.Interval
,但这带来了不希望的后果,即在前一个远程请求仍在运行时激发并启动了一个新的远程请求(我想我本可以抑制(。
该解决方案的工作原理是使用Observable.Timer
设置初始延迟,然后进行远程请求;然后观察该流,出现错误时显示对话框,然后绑定回delay+fetch流。
由于delay+fetch流是冷的,因此延迟会按预期再次工作,并且所有内容都会以一个良好的循环流回。
这一点已经得到了进一步的解决,因为定时器的两次触发(当使用Retry
时(或第二次在对话框取消后什么都不做都存在问题。
我意识到这是因为内部可观测到的没有外部可观测到投影回产生可观测值的值。
新的解决方案解决了这一问题,甚至解决了在用户关闭对话框时立即重新获取状态的问题,或者在成功的情况下填充一个时间间隔的问题。
这是我的建议:
var observable = Observable
.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(500))
.Select(x => Observable.FromAsync(async () =>
{
return await _robotsClient.GetRobotsStateAsync(new GetRobotsStateRequest());
}))
.Concat()
.Catch((Exception ex) => Observable.FromAsync<GetRobotsStateReply>(async () =>
{
await dialogs.ShowErrorMessageAsync("Failed to fetch info",
"Failed to get the latest state");
throw ex;
}))
.Retry();
Timer
+Select
+Concat
运算符确保执行GetRobotsStateAsync
时不会重叠。如果出现异常,计时器将被丢弃,Catch
操作员将启动,并且在关闭对话框后将重新显示原始错误,以触发Retry
操作员。然后,一切都将再次重复,使用一个全新的计时器。循环将一直旋转,直到对可观察对象的订阅被处理为止。
该解决方案假设CCD_ 11的执行将不会定期超过定时器的500毫秒间隔。否则,计时器产生的滴答声将开始堆积(在Concat
的内部队列中(,使系统面临内存压力。对于避免这个问题的更复杂(但也更复杂(的周期机制,看看这个答案。