在RX中,如何组合不同类型的两个源



设置:

  • 第一个IObservable产生A类型的值
  • 第二个IObservable产生B类型的值
  • 它们以不同的速度产生价值(相当快,每10毫秒一次)

我正在努力实现的目标:

N一次(N相当慢,大约500ms),必须对服务进行调用,并提供FirstSecondIObservable的最新值。

问题:我想知道如何使用RX。

当前解决方案(非工作)

var stateObs = from drag in dragObs.MostRecent(0).ToObservable()
                from roll in rollObs.MostRecent(0).ToObservable()
                select new ClientState
                            {
                                FileDragPerc = drag,
                                PhoneRoll = roll,
                                PendingFileType = FileType.Image,
                                TransferState = TransferState.SelectiveTransfer
                            };
stateObs.Sample(TimeSpan.FromMilliseconds(300))
        .Subscribe(x => _lsService.SetClientStateAsync(x),
                    x => Debug.WriteLine("Error in observable "),
                    () => Debug.WriteLine("Error observable finished! "));

你是对的。CombineLatest操作员就是这么做的:

A: 1...2...3...4...5...
B: a.....b.........c...

保留任一序列的最后一个值以生成选择器作用的对。输出流将是(1,a) (2,a) (2,b) (3,b),依此类推

如果您必须从任一流中构造正确的对,请使用"Zip"运算符,它将为您提供(1,a) (2,b) (3,c)

p.S.

我建议尝试更好地理解编译器如何重写查询理解。它将解决你的大部分困惑。

from a in oA
from b in oB
select ...

是有效的SelectMany(oA, oB)

我想我自己找到了答案

CombineLatest()操作可以满足我的需要,这就是我得到的:

var stateObs = dragObs.CombineLatest(rollObs, (d, r) => new ClientState
                                                       {
                                                           FileDragPerc = d,
                                                           PhoneRoll = r,
                                                           TransferState = TransferState.SelectiveTransfer,
                                                           PendingFileType = FileType.Image
                                                       });
    stateObs.Sample(TimeSpan.FromMilliseconds(300))
            .Subscribe(x => _lsService.SetClientStateAsync(x),
                        x => Debug.WriteLine("Error in observable "),
                        () => Debug.WriteLine("Error observable finished! "));

相关内容

  • 没有找到相关文章

最新更新