>我遇到了一个问题,我想在谓词为真时订阅可观察的流,并在谓词为假时停止订阅。当将来某个时间点的谓词再次为 true 时,它应该重新订阅可观察流。
用例:
我将我的可观察流作为输入(IObservable<IList<LogEntity>> items
(,如果我无法将日志实体插入数据库,它应该取消订阅流,当数据库备份运行时,它应该自动订阅流(基于属性IsSubscribed
(并开始插入数据。
我自己的尝试:
我已经尝试了以下不起作用的方法:
var groups = from dataItem in items.SelectMany(o => o.GroupBy(i => i.EntityType))
where dataItem.Any()
select new {Type = dataItem.Key, List = dataItem.Select(o => o)};
groups
.TakeWhile(o => IsSubscribed)
.SubscribeOn(_scheduler)
.Repeat()
.Subscribe(o => Insert(o.Type, o.List));
根据属性IsSubscribed
,我想流式传输订阅和取消订阅。当TakeWhile
为真时,OnCompleted
会被调用,当Subscribe
之后不起作用时。旁注:这是一个冷的可观测流
问题:
如何创建一个可观察的流,我可以在其中根据需要多次订阅和取消订阅(有点像 C# 中的事件处理程序(
提前感谢您的帮助
看起来像一个重复的问题。
但是,从冷 IObservable 上的暂停和恢复订阅中提取代码,可以将其调整为
var subscription = Observable.Create<IObservable<YourType>>(o =>
{
var current = groups.Replay();
var connection = new SerialDisposable();
connection.Disposable = current.Connect();
return IsSubscribed
.DistinctUntilChanged()
.Select(isRunning =>
{
if (isRunning)
{
//Return the current replayed values.
return current;
}
else
{
//Disconnect and replace current.
current = source.Replay();
connection.Disposable = current.Connect();
//yield silence until the next time we resume.
return Observable.Never<YourType>();
}
})
.Subscribe(o);
})
.Switch()
.Subscribe(o => Insert(o.Type, o.List));
你可以看到马特·巴雷特(和我(在这里谈论它。我建议观看整个视频(可能以 2 倍的速度(以获得完整的上下文。
你想要的是添加 组 .延迟(群.SelectMany(WaitForDatabaseUp((
public async Task WaitForDatabaseUp()
{
//If IsSubscribed continue execution
if(IsSubscribed) return;
//Else wait until IsSubscribed == true
await this.ObservableForProperty(x => x.IsSubscribed, skipInitial: false)
.Value()
.Where(isSubscribed => isSubscribed)
.Take(1);
}
使用您喜欢的框架将 INPC 转换为可观察量,您可以在其中看到 ObserveProperty((
基本上,我们内联了一个仅在IsSubscribed == true
时返回的任务,然后将该任务转换为可观察量,以便与Rx兼容。