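I have a stream containing letters (A-Z) and digits (1-9). I want to join the letters that arrive within a timeout (which may change), and always emit digits immediately. Can you suggest which operators are best suited for this?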
Sample working code (I'm not sure whether it is correct and/or a good solution):
private BehaviorSubject<TimeSpan> sTimeouts = new BehaviorSubject<TimeSpan>(0.ms());
private IObservable<string> lettersJoined(IObservable<char> ob)
{
    return Observable.Create<string>(observer =>
    {
        var letters = new List<char>();
        var lettersFlush = new SerialDisposable();
        return ob.Subscribe(c =>
        {
            if (char.IsUpper(c))
            {
                // `await` is not allowed inside this non-async lambda;
                // BehaviorSubject exposes its latest value synchronously.
                if (sTimeouts.Value.Ticks > 0)
                {
                    letters.Add(c);
                    // Assigning a new subscription disposes the previous one,
                    // so every letter restarts the flush timer.
                    lettersFlush.Disposable =
                        VariableTimeout(sTimeouts)
                        .Subscribe(x => {
                            observer.OnNext(String.Concat(letters));
                            letters.Clear();
                        });
                }
                else
                    // Zero timeout: emit the letter on its own, immediately.
                    observer.OnNext(c.ToString());
            }
            else if (char.IsDigit(c))
                observer.OnNext(c.ToString());
        });
    });
}
private IObservable<long> VariableTimeout(IObservable<TimeSpan> timeouts)
{
    return Observable.Create<long>(obs =>
    {
        var sd = new SerialDisposable();
        var first = DateTime.Now;
        return timeouts
            .Subscribe(timeout =>
            {
                if (timeout.Ticks == 0 || first + timeout < DateTime.Now)
                {
                    sd.Disposable = null;
                    obs.OnNext(timeout.Ticks);
                    obs.OnCompleted();
                }
                else
                {
                    timeout -= DateTime.Now - first;
                    sd.Disposable =
                        Observable
                        .Timer(timeout)
                        .Subscribe(t => {
                            obs.OnNext(t);
                            obs.OnCompleted();
                        });
                }
            });
    });
}
private void ChangeTimeout(int timeout)
{
    sTimeouts.OnNext(timeout.ms());
}
// I use the following extension method
public static class TickExtensions
{
    public static TimeSpan ms(this int ms)
    {
        return TimeSpan.FromMilliseconds(ms);
    }
}
To change the timeout I could simply modify a private timeout variable, but a Subject for it would probably be fine if that is needed or better.
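One possible alternative, sketched here as an assumption rather than as the approach from the answers below, keeps the timeout in a BehaviorSubject<TimeSpan> and uses Rx.NET's Throttle overload that takes a per-element duration selector. Each letter's quiet period ends at its arrival time plus the current timeout, that deadline is recomputed whenever the timeout changes, and Buffer closes on the resulting signal. Unlike VariableTimeout, it reads time from an injected IScheduler so it can run on a TestScheduler. LetterJoiner, JoinLetters and Demo are hypothetical names, the scenario data is copied from the test in the update below, and the code assumes the System.Reactive and Microsoft.Reactive.Testing NuGet packages.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

public static class LetterJoiner
{
    // Hypothetical helper. `timeouts` must replay its latest value to new subscribers
    // (e.g. a BehaviorSubject), because it is subscribed once per letter.
    public static IObservable<string> JoinLetters(
        IObservable<char> input, IObservable<TimeSpan> timeouts, IScheduler scheduler)
    {
        return input.Publish(src =>
        {
            var letters = src.Where(c => char.IsLetter(c));

            // Fires once no letter has arrived for the current timeout.
            // The deadline (arrival + timeout) is recomputed on every timeout change;
            // a deadline that is already in the past fires right away.
            var flush = letters.Throttle(_ =>
            {
                var arrived = scheduler.Now;
                return timeouts
                    .Select(t =>
                    {
                        var due = arrived + t - scheduler.Now;
                        return Observable.Timer(due < TimeSpan.Zero ? TimeSpan.Zero : due, scheduler);
                    })
                    .Switch()
                    .Take(1);
            });

            var joined = letters
                .Buffer(flush)
                .Where(b => b.Count > 0)
                .Select(b => new string(b.ToArray()));

            var digits = src.Where(c => char.IsDigit(c)).Select(c => c.ToString());

            return joined.Merge(digits);
        });
    }

    // Hypothetical demo: replays the update's scenario on a TestScheduler and
    // returns (milliseconds, value) for every OnNext.
    public static List<(long Ms, string Value)> Demo()
    {
        var scheduler = new TestScheduler();
        var timeouts = new BehaviorSubject<TimeSpan>(TimeSpan.FromMilliseconds(2000));
        scheduler.Schedule(TimeSpan.FromMilliseconds(4300),
            () => timeouts.OnNext(TimeSpan.FromMilliseconds(1000)));

        var events = new (int Ms, char C)[]
        {
            (100, '1'), (1600, '2'), (1900, 'A'), (2100, 'B'), (4500, 'C'), (5100, 'A'),
            (5500, '5'), (6000, 'B'), (7200, '1'), (7500, 'B'), (7700, 'A'), (8400, 'A'),
        };
        var input = scheduler.CreateColdObservable(events
            .Select(e => ReactiveTest.OnNext(TimeSpan.FromMilliseconds(e.Ms).Ticks, e.C))
            .ToArray());

        var observer = scheduler.CreateObserver<string>();
        JoinLetters(input, timeouts, scheduler).Subscribe(observer);
        scheduler.Start();

        return observer.Messages
            .Where(m => m.Value.Kind == NotificationKind.OnNext)
            .Select(m => (m.Time / TimeSpan.TicksPerMillisecond, m.Value.Value))
            .ToList();
    }
}
```

Demo() is intended to reproduce the expected (ms, value) pairs from the update's test. Switch is what makes a timeout change take effect for letters that are already buffered: the old timer is dropped and a new one is started with the remaining time.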
Update
var scheduler = new TestScheduler();
var timeout = scheduler.CreateColdObservable<int>(
ReactiveTest.OnNext(0000.Ms(), 2000),
ReactiveTest.OnNext(4300.Ms(), 1000));
var input = scheduler.CreateColdObservable<char>(
ReactiveTest.OnNext(0100.Ms(), '1'),
ReactiveTest.OnNext(1600.Ms(), '2'),
ReactiveTest.OnNext(1900.Ms(), 'A'),
ReactiveTest.OnNext(2100.Ms(), 'B'),
ReactiveTest.OnNext(4500.Ms(), 'C'),
ReactiveTest.OnNext(5100.Ms(), 'A'),
ReactiveTest.OnNext(5500.Ms(), '5'),
ReactiveTest.OnNext(6000.Ms(), 'B'),
ReactiveTest.OnNext(7200.Ms(), '1'),
ReactiveTest.OnNext(7500.Ms(), 'B'),
ReactiveTest.OnNext(7700.Ms(), 'A'),
ReactiveTest.OnNext(8400.Ms(), 'A'));
var expected = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(0100.Ms(), "1"),
ReactiveTest.OnNext(1600.Ms(), "2"),
ReactiveTest.OnNext(4100.Ms(), "AB"),
ReactiveTest.OnNext(5500.Ms(), "5"),
ReactiveTest.OnNext(7000.Ms(), "CAB"),
ReactiveTest.OnNext(7200.Ms(), "1"),
ReactiveTest.OnNext(9400.Ms(), "BAA"));
// if ReactiveTest.OnNext(3800.Ms(), 1000)
// then expected is ReactiveTest.OnNext(3800.Ms(), "AB")
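The expected values above follow one consistent rule, which can be written down as a small deterministic reference model in plain C# (no Rx): digits are emitted on arrival; letters are buffered and flushed at the last letter's time plus the current timeout; a timeout change recomputes that deadline and flushes immediately if it has already passed; a zero timeout emits letters at once, as in the question's code. This rule is my reading of the expected values rather than something stated explicitly, and JoinSpec/ExpectedOutput are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class JoinSpec
{
    // Hypothetical reference model for the expected values (plain C#, no Rx).
    public static List<(int Ms, string Value)> ExpectedOutput(
        IEnumerable<(int Ms, char C)> input,
        IEnumerable<(int Ms, int TimeoutMs)> timeouts)
    {
        // Merge both sources by time; on a tie the timeout change (Kind 0) goes first.
        var events = timeouts.Select(t => (t.Ms, Kind: 0, C: '\0', t.TimeoutMs))
            .Concat(input.Select(i => (i.Ms, Kind: 1, i.C, TimeoutMs: 0)))
            .OrderBy(e => e.Ms).ThenBy(e => e.Kind)
            .ToList();

        var output = new List<(int Ms, string Value)>();
        var buffer = new List<char>();
        int timeout = 0, lastLetter = 0;

        void Flush(int at)
        {
            output.Add((at, new string(buffer.ToArray())));
            buffer.Clear();
        }

        foreach (var e in events)
        {
            // A deadline that passed before this event fires at the deadline itself.
            if (buffer.Count > 0 && lastLetter + timeout <= e.Ms)
                Flush(lastLetter + timeout);

            if (e.Kind == 0)
            {
                timeout = e.TimeoutMs;
                // The new timeout may move the deadline into the past: flush now.
                if (buffer.Count > 0 && lastLetter + timeout <= e.Ms)
                    Flush(e.Ms);
            }
            else if (char.IsDigit(e.C) || timeout == 0)
            {
                output.Add((e.Ms, e.C.ToString())); // digits, or letters with a zero timeout
            }
            else
            {
                buffer.Add(e.C);
                lastLetter = e.Ms;
            }
        }

        if (buffer.Count > 0)
            Flush(lastLetter + timeout); // the last pending group fires after the input ends
        return output;
    }
}
```

Fed the input and timeout messages above, it yields 100 "1", 1600 "2", 4100 "AB", 5500 "5", 7000 "CAB", 7200 "1", 9400 "BAA", and with the timeout change moved to 3800 ms it emits "AB" at 3800 ms, matching the trailing comment. A model like this can generate expected messages for a TestScheduler test instead of writing them by hand.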
Update #2
An improved solution that correctly supports changing the timeout while letters are being buffered.
A few things here that may help.
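First, the marble diagram helps visualize the problem nicely, but when it comes to proving whether something works, let's specify it and unit test it with ITestableObservable<T> instances.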
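Second, I'm not sure what your solution is supposed to produce. Looking at your marble diagram, I see some discrepancies. Here I've added a timeline to help visualize it.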
                 111111111122222222223
Time:   123456789012345678901234567890
Input:  1---2--A-B----C--A-B-1--B-A--A
Output: 1---2----AB-------CAB-1-----BAA
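Here I see the "AB" output published at unit 10. Then I see "CAB" published at unit 19, and "BAA" published at unit 29. But you suggest these should happen after a constant timeout interval. So I thought that perhaps the gaps between values matter, but that doesn't seem to hold either. This brings me back to my point above: please provide a unit test that can pass or fail.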
Third, regarding your implementation, you could make it slightly better by using the SerialDisposable type for lettersFlush.
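For reference, SerialDisposable (System.Reactive.Disposables) disposes whatever it currently holds when a new IDisposable is assigned, which is what lets each new letter restart the flush timer. A minimal sketch on a TestScheduler, assuming the System.Reactive and Microsoft.Reactive.Testing packages; RestartDemo is a hypothetical name:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class RestartDemo
{
    // Hypothetical demo: returns the virtual times (ms) at which the flush timer fired.
    public static List<long> Run()
    {
        var scheduler = new TestScheduler();
        var flush = new SerialDisposable();
        var fired = new List<long>();

        IDisposable StartTimer() =>
            Observable.Timer(TimeSpan.FromMilliseconds(1000), scheduler)
                .Subscribe(_ => fired.Add(scheduler.Clock / TimeSpan.TicksPerMillisecond));

        flush.Disposable = StartTimer();                           // "letter" at 0 ms: due at 1000 ms
        scheduler.AdvanceTo(TimeSpan.FromMilliseconds(500).Ticks);
        flush.Disposable = StartTimer();                           // "letter" at 500 ms: first timer disposed, new one due at 1500 ms
        scheduler.Start();

        return fired;
    }
}
```

Only the second timer fires, at 1500 ms; the first is cancelled by the reassignment rather than by any explicit bookkeeping.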
To help set up the unit test, I created the following block of code:
var scheduler = new TestScheduler();
var input = scheduler.CreateColdObservable<char>(
ReactiveTest.OnNext(0100.Ms(), '1'),
ReactiveTest.OnNext(0500.Ms(), '2'),
ReactiveTest.OnNext(0800.Ms(), 'A'),
ReactiveTest.OnNext(1000.Ms(), 'B'),
ReactiveTest.OnNext(1500.Ms(), 'C'),
ReactiveTest.OnNext(1800.Ms(), 'A'),
ReactiveTest.OnNext(2000.Ms(), 'B'),
ReactiveTest.OnNext(2200.Ms(), '1'),
ReactiveTest.OnNext(2500.Ms(), 'B'),
ReactiveTest.OnNext(2700.Ms(), 'A'),
ReactiveTest.OnNext(3000.Ms(), 'A'));
var expected = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(0100.Ms(), "1"),
ReactiveTest.OnNext(0500.Ms(), "2"),
ReactiveTest.OnNext(1000.Ms(), "AB"),
ReactiveTest.OnNext(2000.Ms(), "CAB"),
ReactiveTest.OnNext(2200.Ms(), "1"),
ReactiveTest.OnNext(3000.Ms(), "BAA"));
I've taken some liberties and changed some of the values to what I think you meant by your marble diagram.
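If I use the very good answer provided by @Shlomo above, the vague marble diagram reveals more problems. Because a buffer boundary has to occur after the last value it should include, these windows need to close one tick after those values.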
// LINQPad "C# Program" query. Needs the System.Reactive and Microsoft.Reactive.Testing packages,
// with namespaces System.Reactive, System.Reactive.Linq and Microsoft.Reactive.Testing.
void Main()
{
    var scheduler = new TestScheduler();
    var input = scheduler.CreateColdObservable<char>(
        ReactiveTest.OnNext(0100.Ms(), '1'),
        ReactiveTest.OnNext(0500.Ms(), '2'),
        ReactiveTest.OnNext(0800.Ms(), 'A'),
        ReactiveTest.OnNext(1000.Ms(), 'B'),
        ReactiveTest.OnNext(1500.Ms(), 'C'),
        ReactiveTest.OnNext(1800.Ms(), 'A'),
        ReactiveTest.OnNext(2000.Ms(), 'B'),
        ReactiveTest.OnNext(2200.Ms(), '1'),
        ReactiveTest.OnNext(2500.Ms(), 'B'),
        ReactiveTest.OnNext(2700.Ms(), 'A'),
        ReactiveTest.OnNext(3000.Ms(), 'A'));
    var expected = scheduler.CreateColdObservable<string>(
        ReactiveTest.OnNext(0100.Ms(), "1"),
        ReactiveTest.OnNext(0500.Ms(), "2"),
        ReactiveTest.OnNext(1000.Ms()+1, "AB"),
        ReactiveTest.OnNext(2000.Ms()+1, "CAB"),
        ReactiveTest.OnNext(2200.Ms(), "1"),
        ReactiveTest.OnNext(3000.Ms()+1, "BAA"));
    /*
                     111111111122222222223
    Time:   123456789012345678901234567890
    Input:  1---2--A-B----C--A-B-1--B-A--A
    Output: 1---2----AB-------CAB-1-----BAA
    */
    var bufferBoundaries = //Observable.Timer(TimeSpan.FromSeconds(1), scheduler);
        //Move to a hot test sequence to force the windows to close just after the values are produced
        scheduler.CreateHotObservable<Unit>(
            ReactiveTest.OnNext(1000.Ms()+1, Unit.Default),
            ReactiveTest.OnNext(2000.Ms()+1, Unit.Default),
            ReactiveTest.OnNext(3000.Ms()+1, Unit.Default),
            ReactiveTest.OnNext(4000.Ms()+1, Unit.Default));
    var publishedFinal = input
        .Publish(i => i
            .Where(c => char.IsLetter(c))
            .Buffer(bufferBoundaries)
            .Where(l => l.Any())
            .Select(lc => new string(lc.ToArray()))
            .Merge(i
                .Where(c => char.IsNumber(c))
                .Select(c => c.ToString())
            )
        );
    var observer = scheduler.CreateObserver<string>();
    publishedFinal.Subscribe(observer);
    scheduler.Start();
    //This test passes with the "+1" values hacked in.
    ReactiveAssert.AreElementsEqual(
        expected.Messages,
        observer.Messages);
}
// Define other methods and classes here
public static class TickExtensions
{
    public static long Ms(this int ms)
    {
        return TimeSpan.FromMilliseconds(ms).Ticks;
    }
}
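I suppose my point is that Rx is deterministic, so we can create deterministic tests. So while your question is good, and I believe @Shlomo has provided a solid final answer, we can do better than fuzzy marble diagrams and using Random in our examples/tests. Being precise here should help prevent silly race conditions in production and help readers understand these solutions better.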
Assume sampleInput is the sample input:
var charStream = "12ABCAB1BAA".ToObservable();
var random = new Random();
var randomMilliTimings = Enumerable.Range(0, 12)
.Select(i => random.Next(2000))
.ToList();
var sampleInput = charStream
.Zip(randomMilliTimings, (c, ts) => Tuple.Create(c, TimeSpan.FromMilliseconds(ts)))
.Select(t => Observable.Return(t.Item1).Delay(t.Item2))
.Concat();
First, rather than mutating a variable, generate a stream that represents the buffer windows:
Input:  1---2--A-B----C--A-B-1--B-A--A
Window: ---------*--------*---------*--
Output: 1---2----AB-------CAB-1-----BAA
I generated a stream whose values are separated by increasing TimeSpans (1 s, 2 s, 3 s, ...) and called it bufferBoundaries, as follows:
var bufferBoundaries = Observable.Range(1, 20)
.Select(t => Observable.Return(t).Delay(TimeSpan.FromSeconds(t)))
.Concat();
It looks like this:
Seconds: 0--1--2--3--4--5--6--7--8--9--10
BB:      ---1-----2--------3-----------4-
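Because Concat only subscribes to the next Return(t).Delay(...) after the previous one completes, each delay is measured from the previous boundary, so the boundaries land at the cumulative sums 1, 3, 6 and 10 seconds shown in the diagram. A sketch that checks this on a TestScheduler, assuming the same packages as the test above and passing the scheduler to Delay (the original uses the default scheduler, i.e. real time); BoundaryTimes is a hypothetical name:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class BoundaryTimes
{
    // Hypothetical check: virtual time (whole seconds) of each of the first `count` boundaries.
    public static List<long> Run(int count)
    {
        var scheduler = new TestScheduler();
        var bufferBoundaries = Observable.Range(1, count)
            .Select(t => Observable.Return(t).Delay(TimeSpan.FromSeconds(t), scheduler))
            .Concat();   // the next delay starts only when the previous inner sequence completes

        var observer = scheduler.CreateObserver<int>();
        bufferBoundaries.Subscribe(observer);
        scheduler.Start();

        return observer.Messages
            .Where(m => m.Value.Kind == NotificationKind.OnNext)
            .Select(m => m.Time / TimeSpan.TicksPerSecond)
            .ToList();
    }
}
```

BoundaryTimes.Run(4) should give 1, 3, 6, 10; with Merge instead of Concat the delays would all start together and the boundaries would land at 1, 2, 3, 4 seconds.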
Next, split sampleInput into separate letter and number streams and handle each one accordingly:
var letters = sampleInput
.Where(c => char.IsLetter(c))
.Buffer(bufferBoundaries)
.Where(l => l.Any())
.Select(lc => new string(lc.ToArray()));
var numbers = sampleInput
.Where(c => char.IsNumber(c))
.Select(c => c.ToString());
Then merge the two streams:
var finalOutput = letters.Merge(numbers);
Finally, subscribing twice to the same input (sampleInput in our case) is usually not a good idea if you can help it. So in our case we should replace letters, numbers and finalOutput with the following:
var publishedFinal = sampleInput
.Publish(_si => _si
.Where(c => char.IsLetter(c))
.Buffer(bufferBoundaries)
.Where(l => l.Any())
.Select(lc => new string(lc.ToArray()))
.Merge( _si
.Where(c => char.IsNumber(c))
.Select(c => c.ToString())
)
);
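Counting how many times the source is subscribed shows what Publish changes here. A minimal sketch, assuming System.Reactive; the Defer-based counter and the SubscriptionCount class are hypothetical additions for illustration:

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public static class SubscriptionCount
{
    // Hypothetical demo: subscribes letters/numbers separately or through Publish and
    // reports how many times the underlying source was subscribed, plus the item count.
    public static (int Subscriptions, int Items) Run(bool usePublish)
    {
        var subscriptions = 0;
        var source = Observable.Defer(() =>
        {
            subscriptions++; // runs once per subscription to `source`
            return "12AB".ToObservable();
        });

        IObservable<string> final = usePublish
            ? source.Publish(s => s
                .Where(c => char.IsLetter(c)).Select(c => c.ToString())
                .Merge(s.Where(c => char.IsNumber(c)).Select(c => c.ToString())))
            : source.Where(c => char.IsLetter(c)).Select(c => c.ToString())
                .Merge(source.Where(c => char.IsNumber(c)).Select(c => c.ToString()));

        var items = final.ToList().Wait().Count;
        return (subscriptions, items);
    }
}
```

Both variants produce the same four items, but without Publish the source is subscribed twice; with a time-based source like sampleInput that means two independent timelines.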