>我有一个异步WCF服务,它接受"URI"并返回图像(作为流(。
我想做的是:
- 确保有效的 WCF 通道存在(如果没有创建(
- 进行异步服务调用
- 成功后将图像保存到成员变量
- 如果我遇到异常,请关闭频道
- 无论失败还是成功,请等待 200 毫秒,然后重新开始(永远循环或直到取消(
到目前为止,我已经想出了这个怪物:
private void PollImage(string imageUri)
{
const int pollingHertz = 1;
const int millisecondsTimeout = 1000 / pollingHertz;
Thread.Sleep(millisecondsTimeout);
if (_channel == null)
{
_channel = _channelFactory.CreateChannel();
}
var getImageFunc = Observable.FromAsyncPattern<string, Stream>
(_channel.BeginGetImage, _channel.EndGetImage);
getImageFunc(imageUri)
.Finally(() => PollImage(imageUri))
.Subscribe(
stream => UpdateImageStream(imageUri, stream),
ex =>
{
Trace.TraceError(ex.ToString());
((ICommunicationObject) _channel).CloseOrAbort();
_channel = null;
});
}
我真的很想学习Rx,但每次尝试时,我都会挠头。
有人愿意给我一些指示吗?谢谢
我有一个解决方案给你,但我建议改变你的PollImage
方法,使其更像Rx。
签名应如下所示:
IObservable<Image> PollImage(string imageUri, TimeSpan gapInterval)
您应该PollImage
视为可观察的工厂,在您订阅返回的可观察量之前,它实际上不会轮询图像。这种方法的优点是它可以取消订阅 - 您的最后一个项目符号点需要这样做 - 并且它干净地分离了轮询图像的代码和更新局部变量的代码。
因此,对PollImage
的调用如下所示:
PollImage(imageUri, TimeSpan.FromMilliseconds(200.0))
.Subscribe(image =>
{
/* do save/update images here */
});
实现如下所示:
private IObservable<Image> PollImage(string imageUri, TimeSpan gapInterval)
{
Func<Stream, Image> getImageFromStream = st =>
{
/* read image from stream here */
};
return Observable.Create<Image>(o =>
{
if (_channel == null)
{
_channel = _channelFactory.CreateChannel();
}
var getImageFunc =
Observable
.FromAsyncPattern<string, Stream>(
_channel.BeginGetImage,
_channel.EndGetImage);
var query =
from ts in Observable.Timer(gapInterval)
from stream in getImageFunc(imageUri)
from img in Observable.Using(
() => stream,
st => Observable.Start(
() => getImageFromStream(st)))
select img;
return query.Do(img => { }, ex =>
{
Trace.TraceError(ex.ToString());
((ICommunicationObject)_channel).CloseOrAbort();
_channel = null;
}).Repeat().Retry().Subscribe(o);
});
}
query
可观察量等到gapInterval
完成,然后调用 WCF 函数返回流,然后将流转换为图像。
内部return
语句做了很多事情。
首先,它使用 Do
运算符来捕获发生的任何异常,并像以前一样执行跟踪和通道重置。
接下来,它调用.Repeat()
以有效地重新运行查询,使其在再次调用 Web 服务之前等待gapInterval
。我本可以在query
中使用Observable.Interval
而不是Observable.Timer
并放弃对.Repeat()
的调用,但这意味着对 Web 服务的调用每gapInterval
启动一次,而不是在上次完成后等待那么长时间。
接下来,它调用.Retry()
如果遇到异常,它会有效地重新启动可观察量,以便订阅者永远不会看到异常。Do
运算符捕获错误,因此这没问题。
最后,它订阅观察器并返回IDisposable
,允许调用代码取消订阅。
除了实现getImageFromStream
功能之外,仅此而已。
现在提醒一下。很多人误解了订阅可观察量的工作原理,这可能导致难以发现错误。
以这个为例:
var xs = Observable.Interval(TimeSpan.FromSeconds(1.0));
var s1 = xs.Subscribe(x => { });
var s2 = xs.Subscribe(x => { });
s1
和s2
都订阅xs
,但不是共享一个计时器,而是各自创建一个计时器。您创建了Observable.Interval
的内部工作的两个实例,而不是一个。
现在,这是可观察量的正确行为。如果一个失败,那么另一个不会,因为它们不共享任何内部结构 - 它们彼此隔离。
但是,在您的代码(以及我的代码(中,您有一个潜在的线程问题,因为您在多个调用PollImage
之间共享_channel
。如果一个呼叫失败,它将重置通道,这可能会导致并发呼叫失败。
我的建议是为每个调用创建一个新通道,以防止并发问题。
我希望这有所帮助。
这就是我想出的(有一些帮助!...仍然不"完美",但似乎有效。
正如@Enigma所说,我现在摆脱了共享_channel,并将其替换为捕获的本地var。它有效,但我不明白 Rx enuf 知道这是否是糟糕/有缺陷的方法。我怀疑至少有一种更清洁的方法。
除此之外,我的主要反对意见是我调用 EnsureChannel 的 Do((。好像有点臭。但。。。它有效...
哦,我必须在 SelectMany 中有 _(下划线(,否则不会再次调用 GetImage。
private IDisposable PollImage(string imageUri)
{
ICameraServiceAsync channel = _channelFactory.CreateChannel();
return Observable
.Timer(TimeSpan.FromSeconds(0.2))
.Do(_ => { channel = EnsureChannel(channel); })
.SelectMany(_ =>
Observable
.FromAsyncPattern<string, Stream>(channel.BeginGetImage, channel.EndGetImage)(imageUri))
.Retry()
.Repeat()
.Subscribe(stream => UpdateImageStream(imageUri, stream));
}
private ICameraServiceAsync EnsureChannel(ICameraServiceAsync channel)
{
var icc = channel as ICommunicationObject;
if (icc != null)
{
var communicationState = icc.State; // Copy local for debug inspection
if (communicationState == CommunicationState.Faulted)
{
icc.CloseOrAbort();
channel = null;
}
}
return channel ?? _channelFactory.CreateChannel();
}
如果您真的想使用,那么人们已经回答了您的问题,如果您正在寻找替代方法,那么我建议您看看 TPL(任务等对象(,它允许您从异步方法模式(您的 Web 服务调用(创建任务对象,然后使用取消令牌启动任务,以便在一段时间后如果任务未完成,您可以通过调用来取消它令牌取消方法。