关于RX我有一个小问题。我有一股来自键盘的符号流,我需要将它们分成几组。当流中出现";"符号时,应启动一个新组。简单来说,我需要一个类似于 Buffer 的运算符,但它在某个条件为真时触发,而不是在一段时间延迟或事件计数之后触发。有没有办法使用 RX 中已经存在的运算符构建它,或者我应该注册自己的运算符?
这是一个来源。
var source = new[] { 'a', 'b', ';', 'c', 'd', 'e', ';' }.ToObservable();
以下是您的要求:
var groups = source
// Group the elements by some constant (0)
// and end the group when we see a semicolon
.GroupByUntil(x => 0, group => group.Where(x => x == ';'))
以下是使用它的一种方法:
groups
// Log that we're on the next group now.
.Do(x => Console.WriteLine("Group: "))
// Merge / Concat all the groups together
// {{a..b..;}..{c..d..e..;}} => {a..b..;..c..d..e..;}
.Merge()
// Ignore the semicolons? This is optional, I suppose.
.Where(x => x != ';')
// Log the characters!
.Do(x => Console.WriteLine(" {0}", x))
// Make it so, Number One!
.Subscribe();
输出:
Group:
a
b
Group:
c
d
e
我们可以将缓冲区覆盖与边界可观察一起使用,其中边界可观察是过滤为仅分号条目的初始流。
//this is our incoming stream
IObservable<char> keyboardStream;
//if the observable is cold we need to do this
//in case of it being hot (as I would expect a keyboard stream to be) we do not need to do it
var persistedStream = keyboardStream.Publish().RefCount();
var bufferedBySemicolon = persistedStream.Buffer(persistedStream .Where(c=>c==';'));
这是尼古拉答案的非 RefCount 版本。这提供了更明确的订阅和处置同步,并且应该删除在与使用者订阅的线程不同的线程上观察到源时发生的争用条件(处理 UI 时经常出现这种情况)。
var groups = Observable.Create(o => {
var publishedSource = source.Publish();
return new CompositeDisposable(
publishedSource.Buffer(publishedSource.Where(c => c == ';')).Subscribe(o),
publishedSource.Connect()
);
});