在各种情况下,我希望有一个 RxReplay
运算符来缓冲传入的通知,在第一次订阅时同步重播其缓冲区,然后停止缓冲。这个轻量级Replay
运营商应该只能为一个订户提供服务。可以在此处找到此类运算符的一个用例,其中在第一次订阅后继续缓冲只是浪费资源。出于演示目的,我将在这里展示一个人为的例子,说明我希望我能避免的问题行为:
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(500))
.SelectMany(x => Enumerable.Range((int)x * 100_000 + 1, 100_000))
.Take(800_000)
.Do(x =>
{
if (x % 100_000 == 0) Console.WriteLine(
$"{DateTime.Now:HH:mm:ss.fff} > " +
$"Produced: {x:#,0}, TotalMemory: {GC.GetTotalMemory(true):#,0} bytes");
})
.Replay()
.AutoConnect(0);
await Task.Delay(2200);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Subscribing...");
// First subscription
await observable.Do(x =>
{
if (x % 100_000 == 0)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Emitted: {x:#,0}");
});
// Second subscription
Console.WriteLine($"Count: {await observable.Count():#,0}");
可观察量总共生成 800,000 个值。Replay
机制立即连接到源,并在完成前中途订阅。
输出:
16:54:19.893 > Produced: 100,000, TotalMemory: 635,784 bytes
16:54:20.341 > Produced: 200,000, TotalMemory: 1,164,376 bytes
16:54:20.840 > Produced: 300,000, TotalMemory: 2,212,992 bytes
16:54:21.354 > Produced: 400,000, TotalMemory: 2,212,992 bytes
16:54:21.543 > Subscribing...
16:54:21.616 > Emitted: 100,000
16:54:21.624 > Emitted: 200,000
16:54:21.633 > Emitted: 300,000
16:54:21.641 > Emitted: 400,000
16:54:21.895 > Produced: 500,000, TotalMemory: 4,313,344 bytes
16:54:21.897 > Emitted: 500,000
16:54:22.380 > Produced: 600,000, TotalMemory: 6,411,208 bytes
16:54:22.381 > Emitted: 600,000
16:54:22.868 > Produced: 700,000, TotalMemory: 6,411,600 bytes
16:54:22.869 > Emitted: 700,000
16:54:23.375 > Produced: 800,000, TotalMemory: 6,413,400 bytes
16:54:23.376 > Emitted: 800,000
Count: 800,000
订阅后,内存使用量持续增长。这是意料之中的,因为所有值都已缓冲,并在重播的可观察量的整个生存期内保持缓冲状态。理想的行为是在订阅后内存使用量急剧下降。缓冲区应在传播缓冲值后丢弃,因为在订阅后没有用处。此外,第二个订阅(await observable.Count()
)应该失败并显示InvalidOperationException
。我不希望在可观察量失去其Replay
功能后能够再次订阅它。
这是我尝试实现的自定义ReplayOnce
运算符的存根。有没有人知道如何实现它?
public static IConnectableObservable<T> ReplayOnce<T>(this IObservable<T> source)
{
return source.Replay(); // TODO: enforce the subscribe-once policy
}
顺便说一句,这里有一个相关的问题,关于如何使Replay
运算符具有可以根据需要偶尔清空的缓冲区。我的问题是不同的,因为我希望在订阅后完全禁用缓冲区,并且不再开始增长。
我想出了ReplayOnce
运算符的实现,它基于多播自定义ReplayOnceSubject<T>
。此主题最初由ReplaySubject<T>
支持,该在第一次(也是唯一允许的)订阅期间以正常Subject<T>
进行切换:
public static IConnectableObservable<T> ReplayOnce<T>(
this IObservable<T> source)
{
return source.Multicast(new ReplayOnceSubject<T>());
}
private class ReplayOnceSubject<T> : ISubject<T>
{
private readonly object _locker = new object();
private ISubject<T> _subject = new ReplaySubject<T>();
public void OnNext(T value)
{
lock (_locker) _subject.OnNext(value);
}
public void OnError(Exception error)
{
lock (_locker) _subject.OnError(error);
}
public void OnCompleted()
{
lock (_locker) _subject.OnCompleted();
}
public IDisposable Subscribe(IObserver<T> observer)
{
lock (_locker)
{
if (_subject is ReplaySubject<T> replaySubject)
{
var subject = new Subject<T>();
var subscription = subject.Subscribe(observer);
// Now replay the buffered notifications
replaySubject.Subscribe(subject).Dispose();
_subject = subject;
return subscription;
}
else
throw new InvalidOperationException("Already subscribed.");
}
}
}
行replaySubject.Subscribe(subject)
确保不仅缓冲的值将传播到观察器,而且还将传播任何可能的OnError
/OnCompleted
通知。订阅后,不再引用ReplaySubject
,并且符合垃圾回收条件。