强制完成 RXJS 观察器



我有一个rxjs观察者(实际上是一个主题),它永远尾随文件,就像tail -f一样。 例如,它非常适合监视日志文件。

这种"永远"的行为对我的应用程序非常有用,但对于测试来说很糟糕。 目前我的应用程序有效,但我的测试永远挂起。

我想强制观察器更改提前完成,因为我的测试代码知道文件中应该有多少行。 我该怎么做?

尝试在我返回的主题句柄上调用 onComplete,但此时它基本上被转换为观察者,您无法强制关闭它,错误是:

对象 # 没有方法 'onComplete'

以下是源代码:

function ObserveTail(filename) {
source = new Rx.Subject();
if (fs.existsSync(filename) == false) {
    console.error("file doesn't exist: " + filename);
}
var lineSep = /[r]{0,1}n/;
tail = new Tail(filename, lineSep, {}, true);
tail.on("line", function(line) {
        source.onNext(line);
});
tail.on('close', function(data) {
    console.log("tail closed");
    source.onCompleted();
});     
tail.on('error', function(error) {
    console.error(error);
});     
this.source = source;
}           

这是无法弄清楚如何强制永久结束的测试代码(磁带样式测试)。请注意"非法"行:

test('tailing a file works correctly', function(tid) {
var lines = 8;
var i = 0;
var filename = 'tape/tail.json';
var handle = new ObserveTail(filename);
touch(filename);
handle.source
.filter(function (x) {
    try {
        JSON.parse(x);
        return true;
    } catch (error) {
        tid.pass("correctly caught illegal JSON");
        return false;
    }
})
.map(function(x) { return JSON.parse(x) })
.map(function(j) { return j.name })
.timeout(10000, "observer timed out")
.subscribe (
    function(name) {
        tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
        i++;
        if (i >= lines) {
            handle.onCompleted();   // XXX ILLEGAL
        }
    },
    function(err) {  
        console.error(err)
        tid.fail("err leaked through to subscriber");
    },
    function() {
        tid.end();
        console.log("Completed");
    }
);
})

听起来你解决了你的问题,但对于你原来的问题

我想强制观察器更改提前完成,因为我的测试代码知道文件中应该有多少行。我该怎么做?

一般来说,当你有更好的选择时,不鼓励使用Subject s,因为它们往往是人们使用他们熟悉的编程风格的拐杖。与其尝试使用Subject,我建议您考虑每个事件在可观察生命周期中的含义。

包装事件发射器

EventEmitter#on/off模式已经存在Observable.fromEvent形式的包装器。它仅在有侦听器时才处理清理并保持订阅处于活动状态。因此ObserveTail可以重构为

function ObserveTail(filename) {
  return Rx.Observable.create(function(observer) {
    var lineSep = /[r]{0,1}n/;
    tail = new Tail(filename, lineSep, {}, true);
    var line = Rx.Observable.fromEvent(tail, "line");
    var close = Rx.Observable.fromEvent(tail, "close");
    var error = Rx.Observable.fromEvent(tail, "error")
                  .flatMap(function(err) { return Rx.Observable.throw(err); });
    //Only take events until close occurs and wrap in the error for good measure
    //The latter two are terminal events in this case.
    return line.takeUntil(close).merge(error).subscribe(observer);
  });
} 

与香草使用相比,这有几个好处 Subjects ,一,您现在实际上会看到下游的错误,二,这将在您完成事件时处理事件的清理。

避免*同步方法

然后,这可以滚动到您的文件存在检查中,而无需使用readSync

//If it doesn't exist then we are done here
//You could also throw from the filter if you want an error tracked
var source = Rx.Observable.fromNodeCallback(fs.exists)(filename)
    .filter(function(exists) { return exists; })
    .flatMap(ObserveTail(filename));

接下来,您可以通过改用flatMap来简化过滤器/地图/地图序列。

var result = source.flatMap(function(x) {
  try {
    return Rx.Observable.just(JSON.parse(x));
  } catch (e) {
    return Rx.Observable.empty();
  }
}, 
//This allows you to map the result of the parsed value
function(x, json) {
  return json.name;
})
.timeout(10000, "observer timed out");

不发信号,取消订阅

当流只沿一个方向传播时,如何停止"信号"停止。我们实际上很少希望观察者直接与可观察对象通信,因此更好的模式是实际上不是"发出停止信号",而是简单地取消订阅Observable,并将其留给可观察对象的行为来确定它应该从那里做什么。

从本质上讲,你的Observer真的不应该关心你的Observable,而是说"我在这里完成了"。

为此,您需要声明停止时要达到的条件。

在这种情况下,由于您只是在测试用例中的设定数字后停止,因此您可以使用take取消订阅。因此,最终的订阅块如下所示:

result
 //After lines is reached this will complete.
 .take(lines)
 .subscribe (
    function(name) {
        tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
    },
    function(err) {  
        console.error(err)
        tid.fail("err leaked through to subscriber");
    },
    function() {
        tid.end();
        console.log("Completed");
    }
);

编辑 1

正如评论中指出的那样,在这个特定 API 的情况下,没有真正的"关闭"事件,因为 Tail 本质上是一个无限操作。从这个意义上说,它与鼠标事件处理程序没有什么不同,当人们停止收听时,我们将停止发送事件。所以你的块最终可能看起来像:

function ObserveTail(filename) {
  return Rx.Observable.create(function(observer) {
    var lineSep = /[r]{0,1}n/;
    tail = new Tail(filename, lineSep, {}, true);
    var line = Rx.Observable.fromEvent(tail, "line");
    var error = Rx.Observable.fromEvent(tail, "error")
                  .flatMap(function(err) { return Rx.Observable.throw(err); });
    //Only take events until close occurs and wrap in the error for good measure
    //The latter two are terminal events in this case.
    return line
            .finally(function() {  tail.unwatch(); })
            .merge(error).subscribe(observer);
  }).share();
} 

添加finallyshare运算符会创建一个对象,当新订阅者到达时,该对象将附加到尾部,并且只要至少有一个订阅者仍在侦听,该对象就会保持连接状态。但是,一旦所有订阅者都完成了,我们就可以安全地unwatch尾巴。

相关内容

  • 没有找到相关文章

最新更新