我对NodeJS流有点陌生,我对它了解得越多,我越相信它不是一个特别简单稳定的东西。我正试着用csv/csv-parse(显然是最流行的csv模块使用管道API,其中包括由同一作者使用流转换。
我在这里所经历的部分实际上是可重复的,而不实际使用解析器,所以我注释了这些部分,使示例更简单(对于那些更喜欢JavaScript而不是CoffeeScript的人,还有一个JS版本):
#-------------------------------------------------------------------------------
fs = require 'fs'
transform_stream = require 'stream-transform'
log = console.log
as_transformer = ( method ) -> transform_stream method, parallel: 11
# _new_csv_parser = require 'csv-parse'
# new_csv_parser = -> _new_csv_parser delimiter: ','
#-------------------------------------------------------------------------------
$count = ( input_stream, title ) ->
count = 0
#.............................................................................
input_stream.on 'end', ->
log ( title ? 'Count' ) + ':', count
#.............................................................................
return as_transformer ( record, handler ) =>
count += 1
handler null, record
#-------------------------------------------------------------------------------
read_trips = ( route, handler ) ->
# parser = new_csv_parser()
input = fs.createReadStream route
#.............................................................................
input.on 'end', ->
log 'ok: trips'
return handler null
input.setMaxListeners 100 # <<<<<<
#.............................................................................
# input.pipe parser
input.pipe $count input, 'trips A'
.pipe $count input, 'trips B'
.pipe $count input, 'trips C'
.pipe $count input, 'trips D'
# ... and so on ...
.pipe $count input, 'trips Z'
#.............................................................................
return null
route = '/Volumes/Storage/cnd/node_modules/timetable-data/germany-berlin-2014/trips.txt'
read_trips route, ( error ) ->
throw error if error?
log 'ok'
输入文件包含204865行GTFS数据;我在这里没有解析它,只是原始地阅读它,所以我猜我用上面的代码计算的是块的数据。
我将流从一个计数器输送到另一个计数器,并期望尽可能频繁地击中最后一个计数器第一个;然而,这是我得到的:
trips A: 157
trips B: 157
trips C: 157
...
trips U: 157
trips V: 144
trips W: 112
trips X: 80
trips Y: 48
trips Z: 16
在前面的设置中,我确实解析了数据,我得到了这个:
trips A: 204865
trips B: 204865
trips C: 204865
...
trips T: 204865
trips U: 180224
trips V: 147456
trips W: 114688
trips X: 81920
trips Y: 49152
trips Z: 16384
所以看起来小溪不知何故沿路干涸了。
我的怀疑是输入流的end
事件不是一个可靠的信号来听的时候试图确定所有的处理是否已经完成——毕竟,假设处理已经完成是合乎逻辑的只能在流被完全消耗后一段时间完成。
,所以我寻找另一个事件来收听(没有找到),并延迟调用回调(与setTimeout
, process.nextTick
和setImmediate
),但没有效果。
如果有人能指出来就太好了
- (1)
setTimeout
,process.nextTick
和setImmediate
在这种情况下的关键区别是什么,以及 - (2)如何可靠地确定最后一个字节是否已被管道的最后一个成员处理。
更新我现在认为问题在于流变换,它有一个问题,有人报告了一个非常类似的问题,几乎相同的数字(他有234841条记录,最后是16390,我有204865,最后是16384)。不是证据,但太接近了,不可能是偶然。
i放弃流转换,使用事件流。地图相反;然后测试运行正常。
几天后,我想我可以说stream-transform在处理大文件时存在问题。
我已经切换到事件流,这在我看来是一个更好的解决方案,因为它是完全通用的(即,它一般是关于流,而不是关于CSV-data-as-streams)。我已经在文档中概述了一些关于NodeJS中流库的想法,我的初始pipdreams模块提供了许多常用的流操作。