当我不知道如何使用 LINQ 生成一些可枚举项时,我只需创建自己的扩展方法并使用 yield
关键字。这给了我一个闭包,我可以在其中存储计数器或其他一些聚合值等内容。
在IObservable
我不知道是否有内置的方法可以做到这一点。我最近想生成一个可观察量,给定另一个IObservable<string>
,忽略所有内容,直到源中出现起始值(例如"开始"(,然后开始忽略所有内容,直到源产生结束值(例如"结束"(。
因此,例如,如果我的来源是{"1", "start", "2", "3", "end", "4", "start", "5", "end}
,则新的可观察量应该是{"2", "3" "5"}
。
可能有一种方法可以使用 IObservable 的内置 int 方法来做到这一点,但如果它是一个IEnumerable
,使用 yield 关键字会很简单。所以我想知道是否有类似的简单方法可以为 IObservable 做到这一点。
我想出了这个基本上可以完成这项工作的小类:
public class ClosureSelectMany<TSource, TResult> : IObservable<TResult>
{
private readonly IObservable<TSource> _Source;
private readonly Func<Func<TSource, IObservable<TResult>>> _Selector;
public ClosureSelectMany(IObservable<TSource> source, Func<Func<TSource, IObservable<TResult>>> selector)
{
_Source = source;
_Selector = selector;
}
public IDisposable Subscribe(IObserver<TResult> observer)
{
var selector = _Selector();
return _Source.SelectMany(selector).Subscribe(observer);
}
}
public static class ObservableHelpers
{
public static IObservable<TResult> ClosureSelectMany<TSource, TResult>(this IObservable<TSource> source, Func<Func<TSource, IObservable<TResult>>> selector)
{
return new ClosureSelectMany<TSource, TResult>(source, selector);
}
}
然后我可以像这样使用它:
test = input.ClosureSelectMany<string, string>(() =>
{
bool running = false;
return val =>
{
if (val == "end")
running = false;
var result = running ? Observable.Return(val) : Observable.Empty<string>();
if (val == "start")
running = true;
return result;
};
});
这看起来像如果我在 IEnumerable 中使用 yield 关键字我会做什么。
所以我想知道我是否正在重新发明轮子,并且已经有一些内置功能可以做到这一点,如果不是为什么。也许这种方法可能会导致我现在没有看到的任何其他问题。
Observable.Create
而不是将现有运算符组合在一起,尽管尽可能组合通常是一个更好的主意。
请注意,Create
还具有接受 Task
-return 函数的重载,允许您定义异步迭代器 - 一个使用 await 而不是 yield 的协程,尽管这不是您在这里特别需要的。
但是,您的问题可以通过组合Scan
、Where
和Select
来解决:
xs.Scan(
new { Running = false, Value = default(string) },
(previous, current) => new
{
Running = current == "start" || (previous.Running && current != "end"),
Value = current
})
.Where(state => state.Running && state.Value != "start")
.Select(state => state.Value);
对于可观察量,没有yield return
等价物。它肯定是可以烘焙到编译器中的东西,但它目前不可用。
但是,使用可用的运算符,您可以轻松完成所需的操作。
这对我有用:
var results =
source
.Publish(ss =>
ss
.Window(
ss.Where(s0 => s0 == "start"),
s => ss.Where(s1 => s1 == "end"))
.Select(xs => xs.Skip(1).SkipLast(1))
.Merge());
鉴于此来源:
var source = new []
{
"1", "start", "2", "3", "end", "4", "start", "5", "end"
}.ToObservable();
我得到这个输出:
2
3
5
首先,我想澄清一下,这里的重点是收益率回报的替代方案。
其次,这个特定的示例问题可以这样解决:
bool running = false;
Observable.ToObservable(new [] {"1", "start", "2", "3", "end", "4", "start", "5", "end"})
.Where(s => {
if (s == "start") running = true;
if (s == "end") running = false;
return (running && s != "start");
})
.Dump();
现在,答案是:
您需要记住,收益回报只是语法糖,使您不必自己实现 IEnumerable 和 IEnumerator。
您可以将其视为 IEnumerator.MoveNext(( 和 IEnumerator.Current 的组合,而无需额外的类来保存枚举器的状态,如 MSDN 文档 (https://msdn.microsoft.com/en-us/library/9k7k7cf0.aspx( 中所述。
您可能还读过IObservable是异步/推送">双"到同步/拉取IEnumerable(http://reactivex.io/intro.html(。
考虑到所有这些,你可以认为 IObservable 等价于 yield return(IEnumerator.MoveNext(( + IEnumerator.Current( 是 IObserver.OnNext(T value( 方法。
这样想,您可以轻松地实现此示例中所需的内容(在 LINQPad 中测试(:
void Main()
{
var q = FilteredSource.Generate();
q.Dump();
}
public class FilteredSource
{
public static IObservable<string> Generate()
{
var q = from s in OriginalSource.Generate()
select s;
return Observable.Create<string>(
async observer =>
{
bool produce = false;
try
{
await q.ForEachAsync(s => {
if (s == "start")
produce = true;
if (s == "end")
produce = false;
if (produce && s != "start")
observer.OnNext(s);
});
}
catch (Exception ex)
{
observer.OnError(ex);
}
observer.OnCompleted();
return Disposable.Empty;
});
}
}
public class OriginalSource
{
public static IObservable<string> Generate()
{
return Observable.Create<string>(
observer =>
{
try
{
string[] list = {"1", "start", "2", "3", "end", "4", "start", "5", "end"};
foreach (string s in list)
observer.OnNext(s);
}
catch (Exception ex)
{
observer.OnError(ex);
}
observer.OnCompleted();
return Disposable.Empty;
});
}
}