通过文件独立分发火花处理



我的测量值超过10年,来自10 000个传感器。这可以作为HDFS中存储的ASCII文件(待改进,而不是此请求的主题(可用:

  • 每个传感器一个文件
  • 每个样品一行
  • 两个列(时间,值(
  • 一线标头

作为概念证明,我使用SPARK使用以下Scala代码定位 SPARK v1.6.1

来计算传感器的平均值。
// Read file as text
val lines = sc.textFile("/data/sensor_1.dat")
// Drop header
val header = lines.first
val lines_clean = lines.filter(line => line != header)
// Compute mean
val values = lines_clean.map(_.split("t").last.toDouble)
val mean = values.sum / values.count

现在,我想将其应用于100000个文件,为每个传感器获得一个平均值。我应该如何继续?我应该实现循环吗?我可以在文件级别上处理RDD,而不是文件行级别吗?有更好的想法吗?

谢谢!

您可以尝试使用wholetextfiles((方法(此处(,它读取整个目录,并返回带有(文件名,内容(对的一对rdd。

然后,文件名是传感器,并且内容可以与之前的方式相似。

数据存储在 hdfs中,这意味着它是在群集上分发的,而spark可以实现数据并行性,但是您编写代码,因为它是一个目录,您需要担心分区。如果可能的话,也将数据转换为镶木。

我强烈建议使用datasets,因为spark将能够优化计算。

导入org.apache.spark.sql.functions ._

case class Sensor(time: java.sql.Timestamp, value: Double)
val ds = spark.read
  .option("header", "true")
  .option("delimiter", "t")
  .csv(s"hdfs://${master}:9000/data.tsv")
.as[Sensor] 
 // tune by partition number   .partition(100)
val mean = ds.select(avg(col("value")).as("mean"))

,因为每个文件中的数据似乎都不包括您可能想使用的wholetextfiles选项的传感器ID,该选项将加载 每个文件中的一个PAIRDD,其中键是文件名。这将意味着更多解析,因为您需要解析以获取传感器名称并将整个文件与值分开以获取样品 - 但至少您可以区分哪个数据属于哪个传感器。p>您应该注意,您传递给wholetextfiles的路径(或此事的textfile(可以是路径的列表,包括通配符,如sc.wholeTextFiles("/dir/to/first/sensor,/dir/to/second/sensor,/sensor[0-10]*,/etc")

您可以像这样读取整个文件夹:

import org.apache.spark.sql.functions.input_file_name
val inputPath: String = "/data/"
val lines = sqlContext.read.text(inputPath)
  .select(input_file_name.alias("filename"), $"value")
  .rdd

然后,您可以处理文件名的String,并且值与问题相同的方式相同:

val linesClean = lines.filter(l => l.getString(1) != header).map(l => (l.getString(0), l.getString(1)))
val meanForEachFile = linesClean.groupByKey().map{
    case (name, linesInFile) => 
    val values = linesInFile.map(v => v.split("t").last.toDouble)
    val mean = values.sum / values.count
    mean
}

最新更新