How do I write a Spark program that initializes an accumulator to 0 and increments it by 1 for every line it reads, while the program is reading a folder containing 100 files? This is what I tried:
val myaccumulator = sc.accumulator(0)
val inputRDD= sc.wholeTextFiles("/path/to/100Files")
inputRDD.foreach(f => myaccumulator + f.count)
<console>:29: error: value count is not a member of (String, String)
inputRDD.foreach(f => myaccumulator + f.count)
^
The error occurs because wholeTextFiles returns an RDD of (filename, content) pairs, so f is a tuple of two strings and has no count method. If you just want to count the lines in the files, you don't need anything fancy. This does it:
sc.textFile("path/to/dir/containing/the/files").count
If you absolutely want to use an accumulator, you can do it like this:
val myaccumulator = sc.accumulator(0)
sc.textFile("path/to/dir/containing/the/files").foreach(_ => myaccumulator += 1)
If you absolutely want to use wholeTextFiles (which puts the entire content of each file into a single string), either of the following will count the lines:
sc.wholeTextFiles("path/to/dir/containing/the/files")
.map(_._2.split("\n").size)
.reduce(_+_)
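A minor variation (not in the original answer) that flattens the file contents into individual lines and counts them, using the same placeholder path:

sc.wholeTextFiles("path/to/dir/containing/the/files")
  .flatMap(_._2.split("\n"))
  .count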
Or with an accumulator:
val myaccumulator = sc.accumulator(0)
sc.wholeTextFiles("path/to/dir/containing/the/files")
.foreach(x => myaccumulator += x._2.split("\n").size)
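For completeness, here is a minimal self-contained sketch of the accumulator version as a standalone application rather than shell snippets (assuming Spark 2.x; the app name, master setting, and path are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object LineCount {
  def main(args: Array[String]): Unit = {
    // Local SparkContext; in a real deployment the master comes from spark-submit
    val sc = new SparkContext(new SparkConf().setAppName("LineCount").setMaster("local[*]"))

    // One increment per line across all files in the folder
    val lines = sc.longAccumulator("lines")
    sc.textFile("path/to/dir/containing/the/files").foreach(_ => lines.add(1))
    println(s"Total lines: ${lines.value}")

    sc.stop()
  }
}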