为什么是主题<T>。Observers 在 SubscribeOn() 之后不能保证为真吗?



主题。在未确定数量的tick数量的示例代码中,hasobservers并非立即正确。如果我删除订阅(),hasobservers始终是正确的,所以我知道这与Ischeduler初始化有关。

这在我们的生产软件中引起了一个问题,尽管在允许允许使用innext()的线程之前对Idisposable订阅变量进行了初始化,但对OnNext()的前几个调用都无处可去。这是RX中的错误?

使用系统的其他方法是什么。反应性类别保证使用调整器设置订阅而不进行轮询?

我尝试了object.synchronize(),但这没有区别。

static void Main(string[] args)
{
    for (int i = 0; i < 100; i++)
    {
        var source = new Subject<long>();
        IDisposable subscription = source
            .SubscribeOn(ThreadPoolScheduler.Instance)
            .Subscribe(Console.WriteLine);
        // 0 and 668,000 ticks for subscription setup, but rarely 0.
        int iterations = 0;
        while (!source.HasObservers)
        {
            iterations++;
            Thread.SpinWait(1);
        }
        // Next line would rarely output to Console without while loop
        source.OnNext(iterations);
        subscription.Dispose();
        source.Dispose();
    }
}

我期望主题。hasobservers在不进行轮询的情况下是正确的。

据我了解,问题是您的订阅是异步:呼叫没有阻止,因此实际订阅将在以后在其他线程上进行。

我没有找到知道订阅是否真正降落的确切方法(甚至根本不可能)。如果您的问题是第一个OnNext和订阅之间的竞赛,那么您可能需要使用Replay() Connect()转换为可观察的可连接的可连接。这样,您将确保每个订户都获得完全相同的顺序。

using (var source = new Subject<long>())
{
    var connectableSource = source.Replay();
    connectableSource.Connect();
    using (var subscription = connectableSource
                    .SubscribeOn(ThreadPoolScheduler.Instance)
                    .Subscribe(Console.WriteLine))
    {
        source.OnNext(42); // outputs 42 always
        Console.ReadKey(false);
    }
}

在我的代码中,我仍然需要Console.ReadKey,因为在另一个线程和未取消标准上进行的订阅之间的比赛。

我现在提出的解决方案,我希望有人可以改进:

public class SubscribedSubject<T> : ISubject<T>, IDisposable
{
    private readonly Subject<T> _subject = new Subject<T>();
    private readonly ManualResetEventSlim _subscribed = new ManualResetEventSlim();
    public bool HasObservers => _subject.HasObservers;
    public void Dispose() => _subject.Dispose();
    public void OnCompleted() => Wait().OnCompleted();
    public void OnError(Exception error) => Wait().OnError(error);
    public void OnNext(T value) => Wait().OnNext(value);
    public IDisposable Subscribe(IObserver<T> observer)
    {
        IDisposable disposable = _subject.Subscribe(observer);
        _subscribed.Set();
        return disposable;
    }
    private Subject<T> Wait()
    {
        _subscribed.Wait();
        return _subject;
    }
}

示例使用:

using (var source = new SubscribedSubject<long>())
{
    using (source
        .SubscribeOn(ThreadPoolScheduler.Instance)
        .Subscribe(Console.WriteLine))
    {
        source.OnNext(42);
        Console.ReadKey();
    }
}

相关内容

  • 没有找到相关文章

最新更新