如何修复 Publish() 的不一致.RefCount() 行为



最近我偶然发现了Enigmatity关于PublishRefCount运算符的有趣声明:

您正在使用危险的.发布()。RefCount() 运算符对,用于创建完成后无法订阅的序列。

这种说法似乎与李·坎贝尔对这些运营商的评估相反。引用他的书Intro to Rx:

Publish/RefCount 对对于获取冷可观察量并将其共享为热可观察序列以供后续观察者使用非常有用。

起初我不相信Enigmativity的说法是正确的,所以我试图反驳它。我的实验表明,Publish().RefCount()可以是 确实不一致。再次订阅已发布序列可能会导致对源序列的新订阅,具体取决于源序列在连接时是否完成。如果已完成,则不会重新订阅。如果未完成,则将重新订阅。下面是此行为的演示:

var observable = Observable
.Create<int>(o =>
{
o.OnNext(13);
o.OnCompleted(); // Commenting this line alters the observed behavior
return Disposable.Empty;
})
.Do(x => Console.WriteLine($"Producer generated: {x}"))
.Finally(() => Console.WriteLine($"Producer finished"))
.Publish()
.RefCount()
.Do(x => Console.WriteLine($"Consumer received #{x}"))
.Finally(() => Console.WriteLine($"Consumer finished"));
observable.Subscribe().Dispose();
observable.Subscribe().Dispose();

在此示例中,observable由三个部分组成。首先是生成单个值然后完成的生产部分。然后遵循发布机制(Publish+RefCount)。最后是观察生产者发出的价值的消费部分。observable被订阅两次。预期的行为是每个订阅将收到一个值。但事实并非如此!这是输出:

Producer generated: 13
Consumer received #13
Producer finished
Consumer finished
Consumer finished

(在小提琴上试试)

如果我们注释o.OnCompleted();行,这是输出。这种微妙的变化会导致预期和期望的行为:

Producer generated: 13
Consumer received #13
Producer finished
Consumer finished
Producer generated: 13
Consumer received #13
Producer finished
Consumer finished

在第一种情况下,生产者(Publish().RefCount()之前的部分)只被订阅一次。第一个使用者收到发出的值,但第二个使用者未收到任何内容(OnCompleted通知除外)。在第二种情况下,生产者被认购了两次。每次它产生一个值,每个消费者得到一个值。

我的问题是:我们如何解决这个问题?我们如何修改Publish运算符或RefCount,或两者兼而有之,以使它们始终一致且理想地运行?以下是所需行为的规范:

  1. 已发布的序列应将直接来自源序列的所有通知传播到其订阅者,而不是其他任何通知。
  2. 当发布的序列的当前订阅者数从 0 增加到 1 时,应订阅源序列。
  3. 只要已发布的序列至少有一个订阅者,它就应保持与源的连接。
  4. 当已发布序列的当前订阅者数变为零时,应从源取消订阅。

我要求提供上述功能的自定义PublishRefCount运算符,或者使用内置运算符实现所需功能的方法。

顺便说一句,存在类似的问题,询问为什么会发生这种情况。我的问题是如何解决它。

>更新:回想起来,上述规范会导致不稳定的行为,使竞争条件不可避免。不能保证对已发布序列的两个订阅将生成对源序列的单个订阅。源序列可能在两个订阅之间完成,导致第一个订阅者的取消订阅,导致RefCount运营商取消订阅,从而导致下一个订阅者对源的新订阅。内置.Publish().RefCount()的行为可防止这种情况发生。

道德教训是,.Publish().RefCount()序列没有被破坏,但它是不可重用的。它不能可靠地用于多个连接/断开连接会话。如果需要第二个会话,则应创建新的.Publish().RefCount()序列。

Lee 在解释IConnectableObservable方面做得很好,但Publish解释得不是那么好。这是一种非常简单的动物,只是很难解释。我假设你明白IConnectableObservable

如果我们简单而懒惰地重新实现零参数Publish函数,它将看起来像这样:

//  For illustrative purposes only: don't use this code
public class PublishObservable<T> : IConnectableObservable<T>
{
private readonly IObservable<T> _source;
private readonly Subject<T> _proxy = new Subject<T>();
private IDisposable _connection;

public PublishObservable(IObservable<T> source)
{
_source = source;
}

public IDisposable Connect()
{
if(_connection == null)
_connection = _source.Subscribe(_proxy);
var disposable = Disposable.Create(() =>
{
_connection.Dispose();
_connection = null;
});
return _connection;
}
public IDisposable Subscribe(IObserver<T> observer)
{
var _subscription = _proxy.Subscribe(observer);
return _subscription;
}
}
public static class X
{
public static IConnectableObservable<T> Publish<T>(this IObservable<T> source)
{
return new PublishObservable<T>(source);
}
}

Publish创建一个订阅源可观察对象的代理Subject。代理可以根据连接订阅/取消订阅源:调用Connect,代理订阅源。在一次性连接上调用Dispose,代理从源取消订阅。从中得出的重要想法是,有一个单一的Subject可以代理与源的任何连接。不能保证只有一个源订阅,但可以保证一个代理和一个并发连接。可以通过连接/断开连接来拥有多个订阅。

RefCount处理调用Connect部分内容: 下面是一个简单的重新实现:

//  For illustrative purposes only: don't use this code
public class RefCountObservable<T> : IObservable<T>
{
private readonly IConnectableObservable<T> _source;
private IDisposable _connection;
private int _refCount = 0;
public RefCountObservable(IConnectableObservable<T> source)
{
_source = source;
}
public IDisposable Subscribe(IObserver<T> observer)
{
var subscription = _source.Subscribe(observer);
var disposable = Disposable.Create(() =>
{
subscription.Dispose();
DecrementCount();
});
if(++_refCount == 1)
_connection = _source.Connect();

return disposable;
}
private void DecrementCount()
{
if(--_refCount == 0)
_connection.Dispose();
}
}
public static class X
{
public static IObservable<T> RefCount<T>(this IConnectableObservable<T> source)
{
return new RefCountObservable<T>(source);
}
}

更多的代码,但仍然很简单:如果 refcount 上升到 1,则在ConnectableObservable上调用Connect,如果它下降到 0,则断开连接。

将两者放在一起,您将获得一对,该订阅保证只有一个可观察源的并发订阅,通过一个持久Subject代理。当有>0 个下游订阅时,Subject将仅订阅源。


鉴于这个介绍,你的问题中有很多误解,所以我会一一介绍:

。发布()。RefCount() 确实可能不一致。订阅第二个 发布序列的时间可能会导致对 源序列是否,取决于源序列是否 连接时完成。如果它已经完成,那么它就不会是 重新订阅。如果未完成,则将重新订阅。

.Publish().RefCount()只会在一个条件下重新订阅源代码:当它从零订阅者变为 1 时。如果订阅者的数量出于任何原因从0 到 1 到 0 到 1,那么您最终将重新订阅。源可观察完成将导致RefCount发出OnCompleted,并且其所有观察者取消订阅。因此,对RefCount的后续订阅将触发重新订阅源的尝试。当然,如果源正确观察可观察合约,它将立即发布OnCompleted,仅此而已。

[请参阅使用OnComplete可观察的示例...]可观察量被订阅两次。这 预期行为是每个订阅将收到一个 价值。

不。预期的行为是,发出OnCompleted后代理Subject将重新发出对任何后续订阅尝试的OnCompleted。由于源可观察量在第一个订阅结束时同步完成,因此第二个订阅将尝试订阅已发出OnCompletedSubject。这应该导致OnCompleted,否则可观察合约将被破坏。

[请参阅在没有 OnDone 的情况下可观察的示例作为第二种情况...]在 第一种情况 冷生产者(之前的部分 发布()。RefCount()) 只订阅了一次。第一个消费者 收到发射值,但第二个使用者未收到任何内容 (已完成通知除外)。在第二种情况下, 制片人被订阅了两次。每次生成一个值时,并且 每个消费者都有一个价值。

这是正确的。由于代理Subject从未完成,因此后续对源的重新订阅将导致冷可观察的重新运行。

我的问题是:我们如何解决这个问题?[..]

  1. 已发布的序列应将直接来自源序列的所有通知传播到其订阅者,而不执行任何通知 还。
  2. 当发布的序列的当前订阅者数从 0 增加到 1 时,应订阅源序列。
  3. 只要已发布的序列至少有一个订阅者,它就应保持与源的连接。
  4. 已发布的序列应在其当前订阅者数变为零时取消订阅源。

上述所有情况目前都发生在.Publish.RefCount,只要您不完成/错误。我不建议实现一个改变这一点的运算符,从而破坏可观察合约。

>编辑

我认为与 Rx 混淆的 #1 来源是热/冷可观察量。由于Publish可以"预热"冷可观察量,因此它会导致混淆的边缘情况也就不足为奇了。

首先,关于可观察合同的一句话。更简洁的可观察合约是,OnNext永远不能遵循OnCompleted/OnError,并且应该只有一个OnCompletedOnError通知。这确实留下了尝试订阅终止的可观察量的极端情况: 尝试订阅已终止的可观察量会导致立即收到终止消息。这会违反合同吗?也许吧,但据我所知,这是图书馆里唯一的合同作弊。另一种选择是订阅死气。这对任何人都没有帮助。

这如何与热/冷可观察量联系起来?不幸的是,令人困惑。订阅冰冷的可观察量会触发整个可观察量管道的重建。这意味着订阅已终止规则仅适用于热可观察量。冷可观察量总是重新开始。

请考虑以下代码,其中o是冷可观察对象。

var o = Observable.Interval(TimeSpan.FromMilliseconds(100))
.Take(5);
var s1 = o.Subscribe(i => Console.WriteLine(i.ToString()));
await Task.Delay(TimeSpan.FromMilliseconds(600));
var s2 = o.Subscribe(i => Console.WriteLine(i.ToString()));

就合同而言,s1背后的可观察量和s2背后的可观察量是完全不同的。因此,即使它们之间存在延迟,并且您最终会在OnCompleted之后看到OnNext,这不是问题,因为它们是完全不同的可观察量。

它的粘性是预热的Publish版本。如果您要在上面的代码中将.Publish().RefCount()添加到o末尾......

  • 如果不更改任何其他内容,s2将立即终止打印任何内容。
  • 将延迟更改为 400 左右,s2将打印最后两个数字。
  • s1更改为仅.Take(2)s2将重新开始打印 0 到 4。

使这种肮脏变得更糟的是施罗德丁格猫效应:如果你在o上设置一个观察者来观察整个过程中会发生什么,这会改变引用计数,影响功能!看着它,改变了行为。调试噩梦。

这是试图"预热"冷可观察物的危险。它只是不能很好地工作,尤其是对于Publish/RefCount.

我的建议是:

  1. 不要试图预热冷的可观察量。
  2. 如果您需要共享订阅,无论是冷的还是热的可观察量,请坚持@Enigmativity严格使用选择器Publish版本的一般规则
  3. 如果必须,请在可观察Publish/RefCount上进行虚拟订阅。这至少提供了一致的 Refcount>= 1,从而降低了量子活动效应。

正如Shlomo指出的那样,这个问题与Publish运算符有关。RefCount工作正常。所以这是需要修复的PublishPublish无非是使用标准Subject<T>作为参数调用Multicast运算符。这是它的源代码:

public IConnectableObservable<TSource> Publish<TSource>(IObservable<TSource> source)
{
return source.Multicast(new Subject<TSource>());
}

因此,Publish运算符继承了Subject类的行为。出于非常充分的理由,此类保持其完成状态。因此,如果您通过调用subject.OnCompleted()来表示其完成,该主题的任何未来订阅者都将立即收到OnCompleted通知。此功能很好地服务于独立主题及其订阅者,但当Subject用作源序列和该序列的订阅者之间的中间传播器时,就会成为有问题的工件。这是因为源序列已经保持了自己的状态,并且在主题中复制此状态会带来两种状态不同步的风险。这正是将PublishRefCount运算符结合使用时发生的情况。题主记得源头已经完成,而源头,作为一个冰冷的序列,已经失去了对前世的记忆,愿意重新开始新的生活。

因此,解决方案是向Multicast运算符提供无状态主体。不幸的是,我找不到一种基于内置Subject<T>来编写它的方法(继承不是一种选择,因为该类是密封的)。幸运的是,从头开始实施它并不是很困难。下面的实现使用ImmutableArray作为主体观察者的存储,并使用互锁操作来确保其线程安全(非常类似于内置的Subject<T>实现)。

public class StatelessSubject<T> : ISubject<T>
{
private IImmutableList<IObserver<T>> _observers
= ImmutableArray<IObserver<T>>.Empty;
public void OnNext(T value)
{
foreach (var observer in Volatile.Read(ref _observers))
observer.OnNext(value);
}
public void OnError(Exception error)
{
foreach (var observer in Volatile.Read(ref _observers))
observer.OnError(error);
}
public void OnCompleted()
{
foreach (var observer in Volatile.Read(ref _observers))
observer.OnCompleted();
}
public IDisposable Subscribe(IObserver<T> observer)
{
ImmutableInterlocked.Update(ref _observers, x => x.Add(observer));
return Disposable.Create(() =>
{
ImmutableInterlocked.Update(ref _observers, x => x.Remove(observer));
});
}
}

现在可以通过将其替换为以下内容来修复Publish().RefCount()

.Multicast(new StatelessSubject<SomeType>()).RefCount()

此更改会导致所需的行为。已发布的序列最初是冷的,在首次订阅时变为热,在其最后一个订阅者取消订阅时再次变为冷。这个圈子继续,对过去的事件没有记忆。

对于源序列完成的另一种正常情况,完成将传播到所有订阅者,导致所有订阅者自动取消订阅,从而导致发布的序列变冷。最终结果是两个序列,源和已发布,始终是同步的。它们要么都是热的,要么都是冷的。

这是一个StatelessPublish运算符,使类的消耗更容易一些。

/// <summary>
/// Returns a connectable observable sequence that shares a single subscription to
/// the underlying sequence, without maintaining its state.
/// </summary>
public static IConnectableObservable<TSource> StatelessPublish<TSource>(
this IObservable<TSource> source)
{
return source.Multicast(new StatelessSubject<TSource>());
}

使用示例:

.StatelessPublish().RefCount()

相关内容

  • 没有找到相关文章

最新更新