如何有条件地合并单个可观察流中的对象



流包含以下对象

const data = [
  { type: 'gps',   id: 1, val: 1 },
  { type: 'gps',   id: 2, val: 2 },
  { type: 'speed', id: 2, val: 3 },
  { type: 'gps',   id: 3, val: 4 },
  { type: 'speed', id: 4, val: 5 },
  { type: 'gps',   id: 4, val: 6 },
  { type: 'gps',   id: 5, val: 7 }
]

如果 id 相同,则合并对象。如果没有匹配的 id,则忽略该对象:

[
   [{type: 'gps', id:2, val:2}, { type: 'speed', id: 2, val: 3 }],
   [{ type: 'speed', id: 4, val: 5 },{ type: 'gps',   id: 4, val: 6 }]
]

我的想法是将具有相同类型的对象分组,最终得到两个新流

Rx.Observable.from(data)
  .groupBy((x) => x.type)
  .flatMap((g) => ...)
  ....

然后在id相等的情况下再次合并/压缩它们。

我不确定如何在 Rx 中指定它,我也不确定这是否是一个好方法。

无需拆分流并再次合并。您可以使用scan收集对象并filter出不符合条件的对象

const data = [
  { type: 'gps', id: 1, val: 1 },
  { type: 'gps', id: 2, val: 2 },
  { type: 'speed', id: 2, val: 3 },
  { type: 'gps', id: 3, val: 4 },
  { type: 'speed', id: 4, val: 5 },
  { type: 'gps', id: 4, val: 6 },
  { type: 'gps', id: 5, val: 7 }
]
const generator$ = Rx.Observable.from(data)
generator$
  .scan((acc, x) => {
    if (R.contains(x.id, R.pluck('id', acc))) {
      acc.push(x);
    } else {
      acc = [x]
    }
    return acc
  }, [])
  .filter(x => x.length > 1)
  .subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/ramda/0.23.0/ramda.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

最新更新