我有一个rxjs观察者(实际上是一个主题),它永远尾随文件,就像tail -f一样。 例如,它非常适合监视日志文件。
这种"永远"的行为对我的应用程序非常有用,但对于测试来说很糟糕。 目前我的应用程序有效,但我的测试永远挂起。
我想强制观察器更改提前完成,因为我的测试代码知道文件中应该有多少行。 我该怎么做?
我尝试在我返回的主题句柄上调用 onComplete,但此时它基本上被转换为观察者,您无法强制关闭它,错误是:
对象 # 没有方法 'onComplete'
以下是源代码:
function ObserveTail(filename) {
source = new Rx.Subject();
if (fs.existsSync(filename) == false) {
console.error("file doesn't exist: " + filename);
}
var lineSep = /[r]{0,1}n/;
tail = new Tail(filename, lineSep, {}, true);
tail.on("line", function(line) {
source.onNext(line);
});
tail.on('close', function(data) {
console.log("tail closed");
source.onCompleted();
});
tail.on('error', function(error) {
console.error(error);
});
this.source = source;
}
这是无法弄清楚如何强制永久结束的测试代码(磁带样式测试)。请注意"非法"行:
test('tailing a file works correctly', function(tid) {
var lines = 8;
var i = 0;
var filename = 'tape/tail.json';
var handle = new ObserveTail(filename);
touch(filename);
handle.source
.filter(function (x) {
try {
JSON.parse(x);
return true;
} catch (error) {
tid.pass("correctly caught illegal JSON");
return false;
}
})
.map(function(x) { return JSON.parse(x) })
.map(function(j) { return j.name })
.timeout(10000, "observer timed out")
.subscribe (
function(name) {
tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
i++;
if (i >= lines) {
handle.onCompleted(); // XXX ILLEGAL
}
},
function(err) {
console.error(err)
tid.fail("err leaked through to subscriber");
},
function() {
tid.end();
console.log("Completed");
}
);
})
听起来你解决了你的问题,但对于你原来的问题
我想强制观察器更改提前完成,因为我的测试代码知道文件中应该有多少行。我该怎么做?
一般来说,当你有更好的选择时,不鼓励使用Subject
s,因为它们往往是人们使用他们熟悉的编程风格的拐杖。与其尝试使用Subject
,我建议您考虑每个事件在可观察生命周期中的含义。
包装事件发射器
EventEmitter#on/off
模式已经存在Observable.fromEvent
形式的包装器。它仅在有侦听器时才处理清理并保持订阅处于活动状态。因此ObserveTail
可以重构为
function ObserveTail(filename) {
return Rx.Observable.create(function(observer) {
var lineSep = /[r]{0,1}n/;
tail = new Tail(filename, lineSep, {}, true);
var line = Rx.Observable.fromEvent(tail, "line");
var close = Rx.Observable.fromEvent(tail, "close");
var error = Rx.Observable.fromEvent(tail, "error")
.flatMap(function(err) { return Rx.Observable.throw(err); });
//Only take events until close occurs and wrap in the error for good measure
//The latter two are terminal events in this case.
return line.takeUntil(close).merge(error).subscribe(observer);
});
}
与香草使用相比,这有几个好处 Subjects
,一,您现在实际上会看到下游的错误,二,这将在您完成事件时处理事件的清理。
避免*同步方法
然后,这可以滚动到您的文件存在检查中,而无需使用readSync
//If it doesn't exist then we are done here
//You could also throw from the filter if you want an error tracked
var source = Rx.Observable.fromNodeCallback(fs.exists)(filename)
.filter(function(exists) { return exists; })
.flatMap(ObserveTail(filename));
接下来,您可以通过改用flatMap
来简化过滤器/地图/地图序列。
var result = source.flatMap(function(x) {
try {
return Rx.Observable.just(JSON.parse(x));
} catch (e) {
return Rx.Observable.empty();
}
},
//This allows you to map the result of the parsed value
function(x, json) {
return json.name;
})
.timeout(10000, "observer timed out");
不发信号,取消订阅
当流只沿一个方向传播时,如何停止"信号"停止。我们实际上很少希望观察者直接与可观察对象通信,因此更好的模式是实际上不是"发出停止信号",而是简单地取消订阅Observable
,并将其留给可观察对象的行为来确定它应该从那里做什么。
从本质上讲,你的Observer
真的不应该关心你的Observable
,而是说"我在这里完成了"。
为此,您需要声明停止时要达到的条件。
在这种情况下,由于您只是在测试用例中的设定数字后停止,因此您可以使用take
取消订阅。因此,最终的订阅块如下所示:
result
//After lines is reached this will complete.
.take(lines)
.subscribe (
function(name) {
tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
},
function(err) {
console.error(err)
tid.fail("err leaked through to subscriber");
},
function() {
tid.end();
console.log("Completed");
}
);
编辑 1
正如评论中指出的那样,在这个特定 API 的情况下,没有真正的"关闭"事件,因为 Tail 本质上是一个无限操作。从这个意义上说,它与鼠标事件处理程序没有什么不同,当人们停止收听时,我们将停止发送事件。所以你的块最终可能看起来像:
function ObserveTail(filename) {
return Rx.Observable.create(function(observer) {
var lineSep = /[r]{0,1}n/;
tail = new Tail(filename, lineSep, {}, true);
var line = Rx.Observable.fromEvent(tail, "line");
var error = Rx.Observable.fromEvent(tail, "error")
.flatMap(function(err) { return Rx.Observable.throw(err); });
//Only take events until close occurs and wrap in the error for good measure
//The latter two are terminal events in this case.
return line
.finally(function() { tail.unwatch(); })
.merge(error).subscribe(observer);
}).share();
}
添加finally
和share
运算符会创建一个对象,当新订阅者到达时,该对象将附加到尾部,并且只要至少有一个订阅者仍在侦听,该对象就会保持连接状态。但是,一旦所有订阅者都完成了,我们就可以安全地unwatch
尾巴。