如何使用 Rx 通过异步 WCF 服务轮询图像



>我有一个异步WCF服务,它接受"URI"并返回图像(作为流(。

我想做的是:

  • 确保有效的 WCF 通道存在(如果没有创建(
  • 进行异步服务调用
  • 成功后将图像保存到成员变量
  • 如果我遇到异常,请关闭频道
  • 无论失败还是成功,请等待 200 毫秒,然后重新开始(永远循环或直到取消(

到目前为止,我已经想出了这个怪物:

    private void PollImage(string imageUri)
    {
        const int pollingHertz = 1;
        const int millisecondsTimeout = 1000 / pollingHertz;
        Thread.Sleep(millisecondsTimeout);
        if (_channel == null)
        {
            _channel = _channelFactory.CreateChannel();
        }
        var getImageFunc = Observable.FromAsyncPattern<string, Stream>
                                  (_channel.BeginGetImage, _channel.EndGetImage);
        getImageFunc(imageUri)
            .Finally(() => PollImage(imageUri))
            .Subscribe(
                stream => UpdateImageStream(imageUri, stream),
                ex =>
                    {
                        Trace.TraceError(ex.ToString());
                        ((ICommunicationObject) _channel).CloseOrAbort();
                        _channel = null;
                    });
    }

真的很想学习Rx,但每次尝试时,我都会挠头。

有人愿意给我一些指示吗?谢谢

我有一个解决方案给你,但我建议改变你的PollImage方法,使其更像Rx。

签名应如下所示:

IObservable<Image> PollImage(string imageUri, TimeSpan gapInterval)

您应该PollImage视为可观察的工厂,在您订阅返回的可观察量之前,它实际上不会轮询图像。这种方法的优点是它可以取消订阅 - 您的最后一个项目符号点需要这样做 - 并且它干净地分离了轮询图像的代码和更新局部变量的代码。

因此,对PollImage的调用如下所示:

PollImage(imageUri, TimeSpan.FromMilliseconds(200.0))
    .Subscribe(image =>
    {
        /* do save/update images here */
    });

实现如下所示:

private IObservable<Image> PollImage(string imageUri, TimeSpan gapInterval)
{
    Func<Stream, Image> getImageFromStream = st =>
    {
        /* read image from stream here */
    };
    return Observable.Create<Image>(o =>
    {
        if (_channel == null)
        {
            _channel = _channelFactory.CreateChannel();
        }
        var getImageFunc =
            Observable
                .FromAsyncPattern<string, Stream>(
                    _channel.BeginGetImage,
                    _channel.EndGetImage);
        var query =
            from ts in Observable.Timer(gapInterval)
            from stream in getImageFunc(imageUri)
            from img in Observable.Using(
                () => stream,
                st => Observable.Start(
                    () => getImageFromStream(st)))
            select img;
        return query.Do(img => { }, ex =>
        {
            Trace.TraceError(ex.ToString());
            ((ICommunicationObject)_channel).CloseOrAbort();
            _channel = null;
        }).Repeat().Retry().Subscribe(o);                   
    });
}

query可观察量等到gapInterval完成,然后调用 WCF 函数返回流,然后将流转换为图像。

内部return语句做了很多事情。

首先,它使用 Do 运算符来捕获发生的任何异常,并像以前一样执行跟踪和通道重置。

接下来,它调用.Repeat()以有效地重新运行查询,使其在再次调用 Web 服务之前等待gapInterval。我本可以在query中使用Observable.Interval而不是Observable.Timer并放弃对.Repeat()的调用,但这意味着对 Web 服务的调用每gapInterval启动一次,而不是在上次完成后等待那么长时间。

接下来,它调用.Retry()如果遇到异常,它会有效地重新启动可观察量,以便订阅者永远不会看到异常。Do运算符捕获错误,因此这没问题。

最后,它订阅观察器并返回IDisposable,允许调用代码取消订阅。

除了实现getImageFromStream功能之外,仅此而已。

现在提醒一下。很多人误解了订阅可观察量的工作原理,这可能导致难以发现错误。

以这个为例:

var xs = Observable.Interval(TimeSpan.FromSeconds(1.0));
var s1 = xs.Subscribe(x => { });
var s2 = xs.Subscribe(x => { });

s1s2都订阅xs,但不是共享一个计时器,而是各自创建一个计时器。您创建了Observable.Interval的内部工作的两个实例,而不是一个。

现在,这是可观察量的正确行为。如果一个失败,那么另一个不会,因为它们不共享任何内部结构 - 它们彼此隔离。

但是,在您的代码(以及我的代码(中,您有一个潜在的线程问题,因为您在多个调用PollImage之间共享_channel。如果一个呼叫失败,它将重置通道,这可能会导致并发呼叫失败。

我的建议是为每个调用创建一个新通道,以防止并发问题。

我希望这有所帮助。

这就是我想出的(有一些帮助!...仍然不"完美",但似乎有效。

正如@Enigma所说,我现在摆脱了共享_channel,并将其替换为捕获的本地var。它有效,但我不明白 Rx enuf 知道这是否是糟糕/有缺陷的方法。我怀疑至少有一种更清洁的方法。

除此之外,我的主要反对意见是我调用 EnsureChannel 的 Do((。好像有点臭。但。。。它有效...

哦,我必须在 SelectMany 中有 _(下划线(,否则不会再次调用 GetImage。

    private IDisposable PollImage(string imageUri)
    {
        ICameraServiceAsync channel = _channelFactory.CreateChannel();
        return Observable
            .Timer(TimeSpan.FromSeconds(0.2))
            .Do(_ => { channel = EnsureChannel(channel); })
            .SelectMany(_ =>
                Observable
                .FromAsyncPattern<string, Stream>(channel.BeginGetImage, channel.EndGetImage)(imageUri))
            .Retry()
            .Repeat()
            .Subscribe(stream => UpdateImageStream(imageUri, stream));
    }
    private ICameraServiceAsync EnsureChannel(ICameraServiceAsync channel)
    {
        var icc = channel as ICommunicationObject;
        if (icc != null)
        {
            var communicationState = icc.State; // Copy local for debug inspection
            if (communicationState == CommunicationState.Faulted)
            {
                icc.CloseOrAbort();
                channel = null;
            }
        }
        return channel ?? _channelFactory.CreateChannel();
    }

如果您真的想使用,那么人们已经回答了您的问题,如果您正在寻找替代方法,那么我建议您看看 TPL(任务等对象(,它允许您从异步方法模式(您的 Web 服务调用(创建任务对象,然后使用取消令牌启动任务,以便在一段时间后如果任务未完成,您可以通过调用来取消它令牌取消方法。

相关内容

  • 没有找到相关文章

最新更新