是否可以在 Rx 中的不同线程上调用订阅者的 OnNext?



我是Rx的新手。我想知道是否有可能向不同的订阅者发送消息,使他们在不同的线程上运行?一个观察者如何控制它?据我所知,普通的Subject实现在一个线程上一个接一个地调用订阅者。


public class Subsciber : IObserver<int>
{
    public void OnNext(int a)
    {
        // Do something
    }
    public void OnError(Exception e)
    {
        // Do something
    }
    public void OnCompeleted()
    {
    }
} 
public static class Program
{
   public void static Main()
   {
       var observable = new <....SomeClass....>();
       var sub1 = new Subscriber();
       var sub2 = new Subscriber();
       observable.Subscribe(sub1);
       observable.Subscribe(sub2);
       // some waiting function 
   }
}

如果我使用Subject作为"SomeClass",那么在sub1的OnNext((完成之前,不会调用sub2的OnNext。如果sub1需要很多时间,我不希望它延迟sub2的接收。有人能告诉我Rx是如何允许SomeClass实现这种功能的吗。

您编写的代码几乎可以并行运行observable。如果你这样写你的观察者:

public class Subscriber : IObserver<int>
{
    public void OnNext(int a)
    {
        Console.WriteLine("{0} on {1} at {2}",
            a,
            Thread.CurrentThread.ManagedThreadId,
            DateTime.Now.ToString());
    }
    public void OnError(Exception e)
    { }
    public void OnCompleted()
    { }
} 

然后运行这个代码:

var observable =
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Select(x => (int)x)
        .Take(5)
        .ObserveOn(Scheduler.ThreadPool);
var sub1 = new Subscriber();
var sub2 = new Subscriber();
observable.Subscribe(sub1);
observable.Subscribe(sub2);
Thread.Sleep(10000);

将产生以下内容:

0 on 28 at 2011/10/20 00:13:49
0 on 16 at 2011/10/20 00:13:49
1 on 29 at 2011/10/20 00:13:50
1 on 22 at 2011/10/20 00:13:50
2 on 27 at 2011/10/20 00:13:51
2 on 29 at 2011/10/20 00:13:51
3 on 27 at 2011/10/20 00:13:52
3 on 19 at 2011/10/20 00:13:52
4 on 27 at 2011/10/20 00:13:53
4 on 27 at 2011/10/20 00:13:53

它已经在不同的线程上并行运行订阅了。

我使用的最重要的东西是.ObserveOn扩展方法——这就是它的工作原理。

您应该记住,观察者通常不会共享相同的可观察实例。订阅一个可观测项有效地连接了从可观测项的来源到观测者的一条独特的可观测算子"链"。这与在枚举表上调用GetEnumerator两次非常相似,您不会共享同一个枚举器实例,您将获得两个唯一的实例。

现在,我想描述一下我所说的链条是什么意思。我要给反射器。NET从CCD_ 3&CCD_ 4来说明这一点。

以这个代码为例:

var xs = Observable.Generate(0, x => x < 10, x => x + 1, x => x);
var ys = xs.Where(x => x % 2 == 0);
ys.Subscribe(y => { /* produces 0, 2, 4, 6, 8 */ });

在发动机罩下,Generate&CCD_ 6各自创建内部Rx类CCD_。AnonymousObservable<T>的构造函数接受Func<IObserver<T>, IDisposable>委托,每当它接收到对Subscribe的调用时都会使用该委托。

Reflector为Observable.Generate<T>(...)稍微清理过的代码。NET是:

public static IObservable<TResult> Generate<TState, TResult>(
    TState initialState,
    Func<TState, bool> condition,
    Func<TState, TState> iterate,
    Func<TState, TResult> resultSelector,
    IScheduler scheduler)
{
    return new AnonymousObservable<TResult>((IObserver<TResult> observer) =>
    {
        TState state = initialState;
        bool first = true;
        return scheduler.Schedule((Action self) =>
        {
            bool flag = false;
            TResult local = default(TResult);
            try
            {
                if (first)
                {
                    first = false;
                }
                else
                {
                    state = iterate(state);
                }
                flag = condition(state);
                if (flag)
                {
                    local = resultSelector(state);
                }
            }
            catch (Exception exception)
            {
                observer.OnError(exception);
                return;
            }
            if (flag)
            {
                observer.OnNext(local);
                self();
            }
            else
            {
                observer.OnCompleted();
            }
        });
    });
}

Action self参数是一个迭代输出值的递归调用。您会注意到,在这段代码中,observer没有被存储,或者值被粘贴到多个观察者。对于每个新的观察者,此代码运行一次。

Reflector为Observable.Where<T>(...)稍微清理过的代码。NET是:

public static IObservable<TSource> Where<TSource>(
    this IObservable<TSource> source,
    Func<TSource, bool> predicate)
{
    return new AnonymousObservable<TSource>(observer =>
        source.Subscribe(x =>
        {
            bool flag;
            try
            {
                flag = predicate(x);
            }
            catch (Exception exception)
            {
                observer.OnError(exception);
                return;
            }
            if (flag)
            {
                observer.OnNext(x);
            }
        }, ex => observer.OnError(ex), () => observer.OnCompleted));
}

同样,此代码不跟踪多个观察者。它调用Subscribe,有效地将其自己的代码作为观察器传递给底层的source observable。

您应该看到,在我上面的示例代码中,订阅Where创建了对Generate的订阅,因此这是一个可观察的链。事实上,它在一系列AnonymousObservable对象上链接订阅调用。

如果你有两个订阅,你就有两个链。如果你有1000个订阅,你就有1000个链。

现在,只是附带说明一下——即使有IObservable<T>IObserver<T>接口——您应该很少在自己的类中实际实现这些接口。内置类和运算符处理99.99%的所有情况。它有点像IEnumerable<T>——您需要多久自己实现一次这个接口?

如果这有帮助,如果你需要进一步的解释,请告诉我。

如果您有IObservable,并且需要强制订阅在其他线程上运行,则可以使用ObserveOn函数。

如果运行以下代码,它将强制数字生成器在不同的线程上下文中运行。您还可以使用EventLoopScheduler并指定系统。要使用的线程、设置优先级、设置名称等…

void Main()
{
    var numbers = Observable.Interval(TimeSpan.FromMilliseconds(100));
    var disposable = new CompositeDisposable()
    {
       numbers.ObserveOn(Scheduler.TaskPool).Subscribe(x=> Console.WriteLine("TaskPool: "+ Thread.CurrentThread.ManagedThreadId)),
       numbers.ObserveOn(Scheduler.ThreadPool).Subscribe(x=> Console.WriteLine("ThreadPool: "+ Thread.CurrentThread.ManagedThreadId)),
       numbers.ObserveOn(Scheduler.Immediate).Subscribe(x=> Console.WriteLine("Immediate: "+ Thread.CurrentThread.ManagedThreadId))
    };
    Thread.Sleep(1000);
    disposable.Dispose();
}

输出

Immediate: 10
ThreadPool: 4
TaskPool: 20
TaskPool: 4
ThreadPool: 24
Immediate: 27
Immediate: 10
TaskPool: 24
ThreadPool: 27
Immediate: 24
TaskPool: 26
ThreadPool: 20
Immediate: 26
ThreadPool: 24
TaskPool: 27
Immediate: 28
ThreadPool: 27
TaskPool: 26
Immediate: 10

请注意我是如何使用CompositeDisposable在最后处理所有订阅的。例如,如果您不在LinqPad中执行此操作。可观察。Interval将继续在内存中运行,直到您终止进程为止

相关内容

  • 没有找到相关文章