如何处理IDisposableValue的IOobservable



底漆;在我的整个代码库中,我经常需要使用池内存块。这样做是出于性能原因,以减少垃圾收集(制作实时视频游戏引擎组件(。我通过将类型公开为IDisposableValue来处理此问题,在IDisposable Value中,您只能访问T值,直到包装器被释放为止。您可以处置包装器,以便将值返回到池中以便重用。

我建立了数据处理流,使用这些封装的值来响应随时间发生的事件。这通常是Observables/Reactive Extensions的完美候选者,除了必须处理包装器本质上是一种可变性,这是你在进行反应时不想要的。如果一个订阅者在完成包装时处理了它,但另一个观察者仍在处理它,那么包装器将抛出异常。

预期目标:让每个订阅者在原始的实际包装值上获得一个单独的包装。只有在每个订阅者处理完各自的包装器后,才会处理基础值(想想RefCountDisposable(。因此,每个订阅者都可以使用价值工作,只要他们需要,他们就表示他们是通过处置完成的。当它们全部完成时,值将释放回池。

唯一的问题是我不知道如何在RX中正确地实现这一点。这是处理我的情况的合适方法吗?如果是的话,还有关于如何实际实现它的建议吗?

使用I主题编辑1-脏解决方案:

我试着使用Observable.Select/Create/Defer的各种组合来实现它,但无法实现上面的预期目标。相反,我不得不转向使用主题,我知道这是被回避的。这是我当前的代码。

public class SharedDisposableValueSubject<T> : AbstractDisposable, ISubject<IDisposableValue<T>>
{
    private readonly Subject<SharedDisposable> subject;
    private readonly SubscriptionCounter<SharedDisposable> counter;
    private readonly IObservable<IDisposableValue<T>> observable;
    public SharedDisposableValueSubject()
    {
        this.subject = new Subject<SharedDisposable>();
        this.counter = new SubscriptionCounter<SharedDisposable>(this.subject);
        this.observable = this.counter.Source.Select(value => value.GetValue());
    }
    /// <inheritdoc />
    public void OnCompleted() => this.subject.OnCompleted();
    /// <inheritdoc />
    public void OnError(Exception error) => this.subject.OnError(error);
    /// <inheritdoc />
    public void OnNext(IDisposableValue<T> value) =>
        this.subject.OnNext(new SharedDisposable(value, this.counter.Count));
    /// <inheritdoc />
    public IDisposable Subscribe(IObserver<IDisposableValue<T>> observer) => this.observable.Subscribe(observer);
    /// <inheritdoc />
    protected override void ManagedDisposal() => this.subject.Dispose();
    private class SharedDisposable
    {
        private readonly IDisposableValue<T> value;
        private readonly AtomicInt count;
        public SharedDisposable(IDisposableValue<T> value, int count)
        {
            Contracts.Requires.That(count >= 0);
            this.value = value;
            this.count = new AtomicInt(count);
            if (count == 0)
            {
                this.value?.Dispose();
            }
        }
        public IDisposableValue<T> GetValue() => new ValuePin(this);
        private class ValuePin : AbstractDisposable, IDisposableValue<T>
        {
            private readonly SharedDisposable parent;
            public ValuePin(SharedDisposable parent)
            {
                Contracts.Requires.That(parent != null);
                this.parent = parent;
            }
            /// <inheritdoc />
            public T Value => this.parent.value != null ? this.parent.value.Value : default(T);
            /// <inheritdoc />
            protected override void ManagedDisposal()
            {
                if (this.parent.count.Decrement() == 0)
                {
                    this.parent.value?.Dispose();
                }
            }
        }
    }
}
public class SubscriptionCounter<T>
{
    private readonly AtomicInt count = new AtomicInt(0);
    public SubscriptionCounter(IObservable<T> source)
    {
        Contracts.Requires.That(source != null);
        this.Source = Observable.Create<T>(observer =>
        {
            this.count.Increment();
            return new Subscription(source.Subscribe(observer), this.count);
        });
    }
    public int Count => this.count.Read();
    public IObservable<T> Source { get; }
    private class Subscription : AbstractDisposable
    {
        private readonly IDisposable subscription;
        private readonly AtomicInt count;
        public Subscription(IDisposable subscription, AtomicInt count)
        {
            Contracts.Requires.That(subscription != null);
            Contracts.Requires.That(count != null);
            this.subscription = subscription;
            this.count = count;
        }
        /// <inheritdoc />
        protected override void ManagedDisposal()
        {
            this.subscription.Dispose();
            this.count.Decrement();
        }
    }
}
public interface IDisposableValue<out T> : IDisposable
{
    bool IsDisposed { get; }
    T Value { get; }
}

AbstractDisposable只是不包含非托管类型的类型的可丢弃模式的基类实现。它确保ManagedDisposal((只在第一次调用Dispose((时被调用一次。AtomicInt是int上Interlocked的包装器,用于为int提供线程安全的原子更新。

我的测试代码显示了预期如何使用SharedDisposableValueSubject;

public static class SharedDisposableValueSubjectTests
{
    [Fact]
    public static void NoSubcribersValueAutoDisposes()
    {
        using (var subject = new SharedDisposableValueSubject<int>())
        {
            var sourceValue = new DisposableWrapper<int>(0);
            sourceValue.IsDisposed.Should().BeFalse();
            subject.OnNext(sourceValue);
            sourceValue.IsDisposed.Should().BeTrue();
            subject.OnCompleted();
        }
    }
    [Fact]
    public static void SingleSurcriber()
    {
        using (var subject = new SharedDisposableValueSubject<int>())
        {
            var testNumber = 1;
            var sourceValue = new DisposableWrapper<int>(testNumber);
            sourceValue.IsDisposed.Should().BeFalse();
            IDisposableValue<int> retrieved = null;
            subject.Subscribe(value => retrieved = value);
            // value retrieved from sequence but not disposed yet
            subject.OnNext(sourceValue);
            retrieved.Should().NotBeNull();
            retrieved.Value.Should().Be(testNumber);
            retrieved.IsDisposed.Should().BeFalse();
            sourceValue.IsDisposed.Should().BeFalse();
            // disposing retrieved disposes the source value
            retrieved.Dispose();
            retrieved.IsDisposed.Should().BeTrue();
            sourceValue.IsDisposed.Should().BeTrue();
            subject.OnCompleted();
        }
    }
    [Fact]
    public static void ManySubcribers()
    {
        using (var subject = new SharedDisposableValueSubject<int>())
        {
            var testNumber = 1;
            var sourceValue = new DisposableWrapper<int>(testNumber);
            sourceValue.IsDisposed.Should().BeFalse();
            IDisposableValue<int> retrieved1 = null;
            subject.Subscribe(value => retrieved1 = value);
            IDisposableValue<int> retrieved2 = null;
            subject.Subscribe(value => retrieved2 = value);
            // value retrieved from sequence but not disposed yet
            subject.OnNext(sourceValue);
            retrieved1.Should().NotBeNull();
            retrieved1.Value.Should().Be(testNumber);
            retrieved1.IsDisposed.Should().BeFalse();
            retrieved2.Should().NotBeNull();
            retrieved2.Value.Should().Be(testNumber);
            retrieved2.IsDisposed.Should().BeFalse();
            sourceValue.IsDisposed.Should().BeFalse();
            // disposing only 1 retrieved value does not yet dispose the source value
            retrieved1.Dispose();
            retrieved1.IsDisposed.Should().BeTrue();
            retrieved2.IsDisposed.Should().BeFalse();
            retrieved2.Value.Should().Be(testNumber);
            sourceValue.IsDisposed.Should().BeFalse();
            // disposing both retrieved values disposes the source value
            retrieved2.Dispose();
            retrieved2.IsDisposed.Should().BeTrue();
            sourceValue.IsDisposed.Should().BeTrue();
            subject.OnCompleted();
        }
    }
    [Fact]
    public static void DisposingManyTimesStillRequiresEachSubscriberToDispose()
    {
        using (var subject = new SharedDisposableValueSubject<int>())
        {
            var testNumber = 1;
            var sourceValue = new DisposableWrapper<int>(testNumber);
            sourceValue.IsDisposed.Should().BeFalse();
            IDisposableValue<int> retrieved1 = null;
            subject.Subscribe(value => retrieved1 = value);
            IDisposableValue<int> retrieved2 = null;
            subject.Subscribe(value => retrieved2 = value);
            subject.OnNext(sourceValue);
            // disposing only 1 retrieved value does not yet dispose the source value
            // even though the retrieved value is disposed many times
            retrieved1.Dispose();
            retrieved1.Dispose();
            retrieved1.Dispose();
            retrieved1.IsDisposed.Should().BeTrue();
            retrieved2.IsDisposed.Should().BeFalse();
            sourceValue.IsDisposed.Should().BeFalse();
            // disposing both retrieved values disposes the source value
            retrieved2.Dispose();
            retrieved2.IsDisposed.Should().BeTrue();
            sourceValue.IsDisposed.Should().BeTrue();
            subject.OnCompleted();
        }
    }
    [Fact]
    public static void SingleSubcriberUnsubcribes()
    {
        using (var subject = new SharedDisposableValueSubject<int>())
        {
            var testNumber = 1;
            var sourceValue = new DisposableWrapper<int>(testNumber);
            sourceValue.IsDisposed.Should().BeFalse();
            var subscription = subject.Subscribe(value => { });
            subscription.Dispose();
            // source value auto disposes because no subscribers
            subject.OnNext(sourceValue);
            sourceValue.IsDisposed.Should().BeTrue();
            subject.OnCompleted();
        }
    }
    [Fact]
    public static void SubcriberUnsubcribes()
    {
        using (var subject = new SharedDisposableValueSubject<int>())
        {
            var testNumber = 1;
            var sourceValue = new DisposableWrapper<int>(testNumber);
            sourceValue.IsDisposed.Should().BeFalse();
            IDisposableValue<int> retrieved = null;
            subject.Subscribe(value => retrieved = value);
            var subscription = subject.Subscribe(value => { });
            subscription.Dispose();
            // value retrieved from sequence but not disposed yet
            subject.OnNext(sourceValue);
            retrieved.Should().NotBeNull();
            retrieved.Value.Should().Be(testNumber);
            retrieved.IsDisposed.Should().BeFalse();
            sourceValue.IsDisposed.Should().BeFalse();
            // disposing retrieved causes source to be disposed
            retrieved.Dispose();
            retrieved.IsDisposed.Should().BeTrue();
            sourceValue.IsDisposed.Should().BeTrue();
            subject.OnCompleted();
        }
    }
    [Fact]
    public static async Task DelayedSubcriberAsync()
    {
        using (var subject = new SharedDisposableValueSubject<int>())
        {
            var testNumber = 1;
            var sourceValue = new DisposableWrapper<int>(testNumber);
            sourceValue.IsDisposed.Should().BeFalse();
            // delay countdown event used just to ensure that the value isn't disposed until assertions checked
            var delay = new AsyncCountdownEvent(1);
            var disposed = new AsyncCountdownEvent(2);
            subject.Delay(TimeSpan.FromSeconds(1)).Subscribe(async value =>
            {
                await delay.WaitAsync().DontMarshallContext();
                value.Dispose();
                disposed.Signal(1);
            });
            subject.Subscribe(value =>
            {
                value.Dispose();
                disposed.Signal(1);
            });
            // value is not yet disposed
            subject.OnNext(sourceValue);
            sourceValue.IsDisposed.Should().BeFalse();
            // wait for value to be disposed
            delay.Signal(1);
            await disposed.WaitAsync().DontMarshallContext();
            sourceValue.IsDisposed.Should().BeTrue();
            subject.OnCompleted();
        }
    }
    [Fact]
    public static void MultipleObservedValues()
    {
        using (var subject = new SharedDisposableValueSubject<int>())
        {
            var testNumber1 = 1;
            var sourceValue1 = new DisposableWrapper<int>(testNumber1);
            sourceValue1.IsDisposed.Should().BeFalse();
            var testNumber2 = 2;
            var sourceValue2 = new DisposableWrapper<int>(testNumber2);
            sourceValue2.IsDisposed.Should().BeFalse();
            IDisposableValue<int> retrieved = null;
            subject.Subscribe(value => retrieved = value);
            // first test value
            // value retrieved from sequence but not disposed yet
            subject.OnNext(sourceValue1);
            retrieved.Should().NotBeNull();
            retrieved.Value.Should().Be(testNumber1);
            retrieved.IsDisposed.Should().BeFalse();
            sourceValue1.IsDisposed.Should().BeFalse();
            // disposing retrieved disposes the source value
            retrieved.Dispose();
            retrieved.IsDisposed.Should().BeTrue();
            sourceValue1.IsDisposed.Should().BeTrue();
            // second test value
            // value retrieved from sequence but not disposed yet
            subject.OnNext(sourceValue2);
            retrieved.Should().NotBeNull();
            retrieved.Value.Should().Be(testNumber2);
            retrieved.IsDisposed.Should().BeFalse();
            sourceValue2.IsDisposed.Should().BeFalse();
            // disposing retrieved disposes the source value
            retrieved.Dispose();
            retrieved.IsDisposed.Should().BeTrue();
            sourceValue2.IsDisposed.Should().BeTrue();
            subject.OnCompleted();
        }
    }
}

所有这些都过去了,但我意识到你可以用一个可观察的对象做很多事情,所以可能有一些我没有考虑过的用例会破坏这个实现。如果你知道有什么问题,请告诉我。也可能只是我试图让Rx做一些本质上不应该做的事情。

编辑2-使用发布的解决方案:

我使用Publish来包装SharedDisposable中原始可观察到的可丢弃值,确保每个原始值只包装一次。然后,对发布的可观察对象进行订阅者计数,每个订阅者都会获得一个单独的ValuePin,该ValuePin在处理时会递减SharedDisposable上的计数。当SharedDisposable计数达到0时,它将处理原始值。

我试着不进行订阅计数,而是进行计数,这样每次发出ValuePin时,它都会增加计数,但我找不到一种方法来保证它会在允许订阅者处理之前为每个订阅者创建ValuePin。这导致订阅者1获得了他们的引脚,计数从0变为1,然后在订阅者2获得引脚之前处理该引脚,计数由1变为0,触发了要处理的原始值,然后订阅者2应该接收到一个引脚,但现在已经太晚了。

public static IObservable<IDisposableValue<T>> ShareDisposable<T>(this IObservable<IDisposableValue<T>> source)
{
    Contracts.Requires.That(source != null);
    var published = source.Select(value => new SharedDisposable<T>(value)).Publish();
    var counter = new SubscriptionCounter<SharedDisposable<T>>(published);
    published.Connect();
    return counter.CountedSource.Select(value => value.GetValue(counter.Count));
}
private class SharedDisposable<T>
{
    private const int Uninitialized = -1;
    private readonly IDisposableValue<T> value;
    private readonly AtomicInt count;
    public SharedDisposable(IDisposableValue<T> value)
    {
        this.value = value;
        this.count = new AtomicInt(Uninitialized);
    }
    public IDisposableValue<T> GetValue(int subscriberCount)
    {
        Contracts.Requires.That(subscriberCount >= 0);
        this.count.CompareExchange(subscriberCount, Uninitialized);
        return new ValuePin(this);
    }
    private class ValuePin : AbstractDisposable, IDisposableValue<T>
    {
        private readonly SharedDisposable<T> parent;
        public ValuePin(SharedDisposable<T> parent)
        {
            Contracts.Requires.That(parent != null);
            this.parent = parent;
        }
        /// <inheritdoc />
        public T Value => this.parent.value != null ? this.parent.value.Value : default(T);
        /// <inheritdoc />
        protected override void ManagedDisposal()
        {
            if (this.parent.count.Decrement() == 0)
            {
                this.parent.value?.Dispose();
            }
        }
    }
}

这似乎更好,因为我不必以任何方式使用主题,尽管订户计数感觉很脏。特别是因为在给出第一个ValuePin之前,我需要取消初始化计数。需要明确的是,我试图处理将由0共享给许多订阅者的可观察对象产生的值的处理,而不是处理与可观察对象本身的连接的处理,这就是为什么我不使用RefCount而不是Connect。

我想你可以重新计算一次性的。这将要求发布者启动引用计数,然后每个订阅者递增和递减计数器。您可以使用RefCountDisposable来完成此操作。我只会考虑对私有/内部代码这样做,否则你可能会让一个泄漏的消费者破坏你的系统。Rx的另一种解决方案可能是查看Disruptor模式。

相关内容

  • 没有找到相关文章

最新更新