RxJs - 获取两个可观察量的重复项



我需要获取两个流的重复项目。我想我几乎设法做到了,但前提是那些与第二个流重复的项目按顺序进行。例如:

这有效:

first = Observable.of(1, 2, 3)
second = Observable.of(2, 3, 1) 

但这不会:

first = Observable.of(1, 4, 3)
second = Observable.of(1, 2, 3)

当我的循环到达 4 时,它中断了:

EmptyError {name: "EmptyError", stack: "EmptyError: no element in 序列↵ 在新 Emp...e (http://localhost:4200/vendor.bundle.js:161:22)",消息:"否 元素按顺序排列"}

我的整个代码都在一个函数中,您可以复制/粘贴并测试它:

findDublicates() {
  let match = 0; // setting it to 0, so later could assign other number
  let keys = []; // list of maching keys 
  let elementAt = 0; // index of item of first observable          
  let allKeys$;
  let validKeys$;
  // counting the length of both observables, so this will be the number of loops
  // that checks for dublicates
  let allKeysLength;
  let validKeysLength;
  let allKeysLength$ = Observable.of(2, 1, 4, 5, 7).count()
    allKeysLength$.subscribe(val => allKeysLength = val)
  let validKeysLength$ = Observable.of(1, 2, 3, 8, 5).count()
    validKeysLength$.subscribe(val => validKeysLength = val)   
  let cycles = Math.min(allKeysLength,validKeysLength); // length of the shorter observable               
  // wrapping it in a function so when called variables will take new values
  function defineObs() {
    allKeys$ = Observable.of(2, 1, 4, 5, 7)
      .elementAt(elementAt).take(1);
    validKeys$ = Observable.of(1, 2, 3, 8, 5)
      .filter((x) => (x === match)).first(); 
   }
  for (var i=0; i<=cycles; i++) {
    defineObs();
    allKeys$.subscribe(
      function (val) { match = val },
      function (err) { console.log(err) },
      function () { console.log('Done filter')}
    );
    validKeys$.subscribe(
      function (val) { keys.push(val) },
      function (err) { console.log(err) },
      function () { console.log('Done push')}
    );
    elementAt += 1;
    cycles -= 1;
  } 
  return console.log(keys);
}

感谢您的任何帮助。

如果您不关心哪个流发出一组重复项的第一个值,您可以合并它们并视为在单个流上查找重复值:

first.merge(second)
     .scan(([ dupes, uniques ], next) =>
       [ uniques.has(next) ? dupes.add(next) : dupes, uniques.add(next) ], 
       [ new Set(), new Set() ]
     )
     .map(([ dupes ]) => dupes)

注意:上述集合是不可变的,以避免scan中未定义的行为。

我会检查可观察序列上的Observable.combineLatestscan方法。

这就是我的想法,使用 combineLatest 将两个可观察量组合在一起,并在其上应用 scan 运算符。您甚至可以使用Set来确保唯一性,甚至是mapfilter

最新更新