使用 Rx 限制来自非异步调用的回调



我的问题与这个问题相似,但一如既往地略有不同。

我目前正在从事一个项目,该项目使用简单的模式异步接收基础数据更改的通知。

Class Foo 负责订阅这些数据更改,并为类提供了一种方法,通过注册实现给定接口的类的实例来注册它们对这些更改的兴趣:

public Guid SubscribeToChanges(ISomeCallbackInterface callback)

类栏实现此回调并注册自身:

public class Bar : ISomeCallbackInterface
{
    ...
    //initialise instance of Foo class and subscribe to updates
    _foo.SubscribeToChanges(this);
    ...
    public void CallbackMethod(string id, IEnumerable<Tuple<string, int, object>> data)
    {
        ...
    }
}

理想情况下,我们希望限制这些回调,例如,我们可以获得一个回调,该回调将特定数据片段的值从 x 更改为 y,然后在一秒钟内更改回 x。查看 Rx 文档,通过使用 DistinctUntilChanged 运算符,这将是微不足道的。

但是,问题是如何创建一个 IObservable 集合,然后我可以将运算符应用于给定上面的回调实现。这些文档非常清楚地说明了如何从标准 .Net 事件或开始.../结束创建 IObservable...方法对。

更新:正如Richard Szalay在他的评论中指出的那样,我需要将DistinctUntilChanged与Throttle结合使用来实现我需要的东西。

再次,感谢理查德,我也应该提到我需要取消订阅回调的能力。在当前的模型中,我只是在Foo的实例上调用Unscubscribe(Guid subscriptionToken)。

我想到的唯一方法是使用 ISomeCallbackInterfaceObservable.Create 的自定义实现。尽管如此,它仍然感觉被硬塞进了解决方案中:

public static IObservable<Tuple<string,IEnumerable<Tuple<string, int, object>>> 
    FromCustomCallbackPattern(ISomeCallbackPublisher publisher)
{
    return Observable.CreateWithDisposable<T>(observer =>
    {
        var subject = new Subject<
            Tuple<string,IEnumerable<Tuple<string, int, object>>>();
        var callback = new ObservableSomeCallback(subject);
        Guid subscription = publisher.SubscribeToChanges(callback);
        return new CompositeDisposable(
            subject.Subscribe(observer),
            Disposable.Create(() => publisher.UnsubscribeFromChanges(subscription))
        );
    });
}
private class ObservableSomeCallback : ISomeCallbackInterface
{
    private IObserver<Tuple<string,IEnumerable<Tuple<string, int, object>>> observer;
    public ObservableSomeCallback(
        IObserver<Tuple<string,IEnumerable<Tuple<string, int, object>>> observer)
    {
        this.observer = observer;
    }
    public void CallbackMethod(string id, IEnumerable<Tuple<string, int, object>> data)
    {
        this.observer.OnNext(new Tuple<
            string,IEnumerable<Tuple<string, int, object>>(id, data));
    }
}

相关内容

  • 没有找到相关文章

最新更新