RX-Zip输出意外结果



请帮助我理解一个现象:

为什么X不等于Observable项目中的索引

构建块,例如:

public class EcgSample
{               
public EcgSample(int y)
{
Y = y;   
} 
public int X { get; set; }
public int Y { get; set; }  
}
private void Print(Tuple<EcgSample, int> s)
{
Debug.WriteLine("X : {0} , Y : {1} , Index : {2}", s.Item1.X, s.Item1.Y, s.Item2);
}
private List<EcgSample> CreateSamples()
{
var testSamples = new List<EcgSample>();
for (short i = 0; i < 1400; i++)
{
testSamples.Add(new EcgSample(i));   
}
return testSamples;
}

可观察的示例:(输出预期结果)

// (1) Create From Collection .
IObservable<EcgSample> sampleObservable = CreateSamples().ToObservable(new EventLoopScheduler());
// (2) Repeat 
IObservable<EcgSample> repeated = sampleObservable.Repeat();
// (3) Indexed 
IObservable<Tuple<EcgSample,int>> indexed = repeated.Select((item, index) =>
{
item.X = index;
return new Tuple<EcgSample, int>(item, index);
}); 
// (4) Buffered 
IObservable<IList<Tuple<EcgSample, int>>> buffered = indexed.Buffer(250); 
// (5) SelectMany and Print .
_disposable = buffered.SelectMany(buf => buf).Subscribe(Print);

OUTPUT:这是Observable序列的预期输出。

[8384] X : 0 , Y : 0 , Index : 0 
[8384] X : 1 , Y : 1 , Index : 1 
[8384] X : 2 , Y : 2 , Index : 2 
[8384] X : 3 , Y : 3 , Index : 3 
[8384] X : 4 , Y : 4 , Index : 4 

修改:(不输出UN预期结果)

现在我不希望每个缓冲区在每个间隔都被占用:

// (5) Create an Observable from a Timer. 
IObservable<ElapsedEventArgs> timerObservable = Observable.Create<ElapsedEventArgs>(
observer =>
{
var timer = new Timer();
timer.Interval = 250;
timer.Elapsed += (s, e) => observer.OnNext(e);
timer.Start();
return Disposable.Create(() =>
{
timer.Stop();
});
});
// (6) Zip with the buffer observable 
IObservable<IList<Tuple<EcgSample, int>>> zipped = timerObservable.Zip(buffered, (t, b) => b);
// (7) SelectMany and Print .
_disposable = zipped.SelectMany(buf => buf).Subscribe(Print);

OUTPUT:这会输出一个意外的结果:请注意,X不等于index

[9708] X : 187600 , Y : 0 , Index : 0 
[9708] X : 187601 , Y : 1 , Index : 1 
[9708] X : 187602 , Y : 2 , Index : 2 
[9708] X : 187603 , Y : 3 , Index : 3 

你知道为什么X从187600开始吗(不用说,每次运行程序时这个值都不一样)。。?

编辑:

我通过简单地在最后投影来解决问题,但我仍然想知道为什么会出现第一个问题。

List<EcgSample> list = CreateSamples();     
var loop = new EventLoopScheduler();
var sampleObservable = list.ToObservable(loop);
IObservable<EcgSample> reapeted = sampleObservable.Repeat();
IObservable<IList<EcgSample>> buffered = reapeted.Buffer(250);
IObservable<ElapsedEventArgs> timerObservable = Observable.Create<ElapsedEventArgs>(
observer =>
{
var timer = new Timer();
timer.Interval = 250;
timer.Elapsed += (s, e) => observer.OnNext(e);
timer.Start();
return Disposable.Create(() =>
{
timer.Stop();
});
});
IObservable<IList<EcgSample>> zipped = timerObservable.Zip(buffered, (t, b) => b);
_disposable = zipped.SelectMany(buf => buf).Select((item, index) =>
{
item.X = index;
return new Tuple<EcgSample, int>(item, index);
}).Subscribe(Print);

你的答案显示了一件事,你可以改变它来获得你想要的行为,但这并不是它没有按照你预期的方式工作的真正原因。

如果您想将Observable中的每个条目与一个数字相关联,那么实际上应该将其与一个编号相关联。按照您的做法,流中的每个元素和数字之间没有实际的连接。您的修复只确保在下一个项目通过之前处理好每个项目,因此数字恰好处于正确的值。但这是一个非常不稳定的情况。

如果你只想对流中的项目进行连续计数,请查看Select的过载,它会为你提供索引:

stream.Select((item, index) => new { item, index })
.Subscribe(data => Debug.WriteLine("Item at index {0} is {1}", data.index, data.item))

或者,如果你想要一些不同于流上项目计数的东西,你可以做一些类似的事情:

stream.Select(item => new { item, index = <some value you calculate> })
...

通过这种方式,您的对象及其索引被绑定在一起。您可以在将来的任何时候使用项目的索引,并且仍然知道它的索引是什么。而您的代码依赖于在处理下一个项目之前到达每个项目。

解决问题中的编辑

首先,看看Observable.Interval。它可以做你想用计时器做的事情,但要容易得多。

其次,看看下面的例子,它再现了你在问题中所做的事情。运行此代码会产生正确的输出:

var items = Enumerable.Range(65, 26)
.Select(i => (char)i)
.Repeat();
var observableItems = items.ToObservable()
.Select((c, i) => new { Char = c, Index = i });
var interval = Observable.Interval(TimeSpan.FromSeconds(0.25));
var buffered = observableItems.Buffer(10);
var zipped = buffered.Zip(interval, (buffer, _) => buffer);
zipped.SelectMany(buffer => buffer).Dump();

您可以在LinqPad中运行该代码,这是一个非常有用的工具,用于探索Rx(以及.Net的其他部分)

最后,我想这是一个简单的练习,试图弄清楚你的情况。看起来你可能正在试图处理传感器数据,这些数据会推送比你想要处理的更多的更新。将Zip与间隔一起使用不会有多大帮助。你会减慢数据的到达速度,但它只会建立一个越来越大的数据队列,等待通过Zip。

如果希望每250毫秒获取一个数据点,请查看Sample。如果你想一次获得250毫秒的读数,可以看看Buffer的过载,它需要一个时间跨度而不是计数。

最新更新