对返回RDD的函数执行平面映射



我正在尝试处理下面代码中的多个avro文件。这个想法是首先在列表中获得一系列avro文件。然后打开每个avro文件并生成一组元组(string,int)。然后最后按键对元组流进行分组,并对int求和。

object AvroCopyUtil {
  def main(args: Array[String]) : Unit = {
    val conf = new SparkConf().setAppName("Leads Data Analysis").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val fs = FileSystem.get(new Configuration())
    val avroList = GetAvroList(fs, args(0))
    avroList.flatMap(av =>
      sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](av)
        .map(r => (r._1.datum.get("field").toString, 1)))
      .reduceByKey(_ + _)
      .foreach(println)
  }

  def GetAvroList(fs: FileSystem, input: String) : List[String] = {
    // get all children
    val masterList : List[FileStatus] = fs.listStatus(new Path(input)).toList
    val (allFiles, allDirs) = masterList.partition(x => x.isDirectory == false)
    allFiles.map(_.getPath.toString) ::: allDirs.map(_.getPath.toString).flatMap(x => GetAvroList(fs, x))
  }
}

我得到的编译错误是

[error]  found   : org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)]
[error]  required: TraversableOnce[?]
[error]     avroRdd.flatMap(av => sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](av))
[error]                                                                                                                       ^
[error] one error found

编辑:根据下面的建议,我尝试了

val rdd = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, 
AvroKeyInputFormat[GenericRecord]](avroList.mkString(","))

但是我得到了错误

Exception in thread "main" java.lang.IllegalArgumentException: java.net.URISyntaxException: Illegal character in scheme name at index 0: 2015-10-
15-00-1576041136-flumetracker.foo.com-FooAvroEvent.1444867200044.avro,hdfs:

您的函数是不必要的。您还试图在一个没有实际意义的转换中创建一个RDD。转换(在本例中为flatMap)在RDD上运行,RDD中的记录将是转换后的记录。在flatMap的情况下,匿名函数的预期输出是TraversableOnce对象,然后通过转换将其展平为多个记录。不过,看看您的代码,您并不真的需要做flatMap,因为简单的map就足够了。还要记住,由于RDD的不变性,您必须始终将转换重新分配为新值。

试试类似的东西:

val avroRDD = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](filePath)
val countsRDD = avroRDD.map(av => (av._1.datum.get("field1").toString, 1)).reduceByKey(_ + _)

您似乎需要花一些时间来掌握Spark的一些基本框架细微差别。我建议您全面阅读《Spark编程指南》。最后,如果你想使用Avro,也请查看spark Avro,因为使用Avro的大部分锅炉板都在那里处理(DataFrames可能更直观,更容易用于你的用例)。

(编辑:)

您似乎误解了如何加载要在Spark中处理的数据。parallelize()方法用于在RDD中分发集合,而不是在文件中分发数据。要执行后者,实际上只需要向newAPIHadoopFile()加载程序提供一个逗号分隔的输入文件列表。因此,假设您的GetAvroList()功能有效,您可以执行以下操作:

val avroList = GetAvroList(fs, args(0))
val avroRDD = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](avroList.mkString(","))
val countsRDD = avroRDD.map(av => (av._1.datum.get("field1").toString, 1)).reduceByKey(_ + _)
flatMappedRDD.foreach(println)

最新更新