Rx.NET "gate"运算符



[注意:如果这很重要,我正在使用3.1。另外,我已经在代码审查中问过这个问题,但到目前为止还没有回应。

我需要一个运算符来允许布尔流充当另一个流的门(当门流为真时让值通过,当值为假时删除它们(。我通常会为此使用 Switch,但如果源流很冷,它会继续重新创建它,这是我不想要的。

我还想自己清理,以便在源或门完成时结果完成。

public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
{
var s = source.Publish().RefCount();
var g = gate.Publish().RefCount();
var sourceCompleted = s.TakeLast(1).DefaultIfEmpty().Select(_ => Unit.Default);
var gateCompleted = g.TakeLast(1).DefaultIfEmpty().Select(_ => Unit.Default);
var anyCompleted = Observable.Amb(sourceCompleted, gateCompleted);
var flag = false;
g.TakeUntil(anyCompleted).Subscribe(value => flag = value);
return s.Where(_ => flag).TakeUntil(anyCompleted);
}

除了整体冗长之外,我不喜欢我订阅门,即使结果从未订阅过(在这种情况下,这个运算符应该是无操作的(。有没有办法摆脱订阅?

我也尝试过这种实现,但在清理自身时更糟:

return Observable.Create<T>(
o =>
{
var flag = false;
gate.Subscribe(value => flag = value);
return source.Subscribe(
value =>
{
if (flag) o.OnNext(value);
});
});

这些是我用来检查实现的测试:

[TestMethod]
public void TestMethod1()
{
var output = new List<int>();
var source = new Subject<int>();
var gate = new Subject<bool>();
var result = source.When(gate);
result.Subscribe(output.Add, () => output.Add(-1));
// the gate starts with false, so the source events are ignored
source.OnNext(1);
source.OnNext(2);
source.OnNext(3);
CollectionAssert.AreEqual(new int[0], output);
// setting the gate to true will let the source events pass
gate.OnNext(true);
source.OnNext(4);
CollectionAssert.AreEqual(new[] { 4 }, output);
source.OnNext(5);
CollectionAssert.AreEqual(new[] { 4, 5 }, output);
// setting the gate to false stops source events from propagating again
gate.OnNext(false);
source.OnNext(6);
source.OnNext(7);
CollectionAssert.AreEqual(new[] { 4, 5 }, output);
// completing the source also completes the result
source.OnCompleted();
CollectionAssert.AreEqual(new[] { 4, 5, -1 }, output);
}
[TestMethod]
public void TestMethod2()
{
// completing the gate also completes the result
var output = new List<int>();
var source = new Subject<int>();
var gate = new Subject<bool>();
var result = source.When(gate);
result.Subscribe(output.Add, () => output.Add(-1));
gate.OnCompleted();
CollectionAssert.AreEqual(new[] { -1 }, output);
}

更新:当门终止时,这也终止。我在复制/粘贴中错过了TestMethod2

return gate.Publish(_gate => source
.WithLatestFrom(_gate.StartWith(false), (value, b) => (value, b))
.Where(t => t.b)
.Select(t => t.value)
.TakeUntil(_gate.IgnoreElements().Materialize()
));

这通过了您的测试TestMethod1,当门可观察时它不会终止。

public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
{
return source
.WithLatestFrom(gate.StartWith(false), (value, b) => (value, b))
.Where(t => t.b)
.Select(t => t.value);
}

这有效:

public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
{
return
source.Publish(ss => gate.Publish(gs =>
gs
.Select(g => g ? ss : ss.IgnoreElements())
.Switch()
.TakeUntil(Observable.Amb(
ss.Select(s => true).Materialize().LastAsync(),
gs.Materialize().LastAsync()))));
}

这通过了两项测试。

你和Observable.Create走在正确的轨道上。应从可观察量上的两个订阅调用 onError 和 onComplete,以便在需要时正确完成或出错。此外,通过在Create委托中返回两个IDisposable,可以确保在sourcegate完成之前释放 When 订阅时,可以正确清理两个订阅。

public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
{
return Observable.Create<T>(
o =>
{
var flag = false;
var gs = gate.Subscribe(
value => flag = value,
e => o.OnError(e),
() => o.OnCompleted());
var ss = source.Subscribe(
value =>
{
if (flag) o.OnNext(value);
},
e => o.OnError(e), 
() => o.OnCompleted());
return new CompositeDisposable(gs, ss);
});
}

一个更短,但更难阅读的版本,仅使用 Rx 运算符。对于冷可观察量,它可能需要源的发布/引用计数。

public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
{
return gate
.Select(g => g ? source : source.IgnoreElements())
.Switch()
.TakeUntil(source.Materialize()
.Where(s => s.Kind == NotificationKind.OnCompleted));
}

相关内容

  • 没有找到相关文章

最新更新