我正在尝试处理下面代码中的多个avro文件。这个想法是首先在列表中获得一系列avro文件。然后打开每个avro文件并生成一组元组(string,int)。然后最后按键对元组流进行分组,并对int求和。
object AvroCopyUtil {
def main(args: Array[String]) : Unit = {
val conf = new SparkConf().setAppName("Leads Data Analysis").setMaster("local[*]")
val sc = new SparkContext(conf)
val fs = FileSystem.get(new Configuration())
val avroList = GetAvroList(fs, args(0))
avroList.flatMap(av =>
sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](av)
.map(r => (r._1.datum.get("field").toString, 1)))
.reduceByKey(_ + _)
.foreach(println)
}
def GetAvroList(fs: FileSystem, input: String) : List[String] = {
// get all children
val masterList : List[FileStatus] = fs.listStatus(new Path(input)).toList
val (allFiles, allDirs) = masterList.partition(x => x.isDirectory == false)
allFiles.map(_.getPath.toString) ::: allDirs.map(_.getPath.toString).flatMap(x => GetAvroList(fs, x))
}
}
我得到的编译错误是
[error] found : org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)]
[error] required: TraversableOnce[?]
[error] avroRdd.flatMap(av => sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](av))
[error] ^
[error] one error found
编辑:根据下面的建议,我尝试了
val rdd = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
AvroKeyInputFormat[GenericRecord]](avroList.mkString(","))
但是我得到了错误
Exception in thread "main" java.lang.IllegalArgumentException: java.net.URISyntaxException: Illegal character in scheme name at index 0: 2015-10-
15-00-1576041136-flumetracker.foo.com-FooAvroEvent.1444867200044.avro,hdfs:
您的函数是不必要的。您还试图在一个没有实际意义的转换中创建一个RDD。转换(在本例中为flatMap
)在RDD上运行,RDD中的记录将是转换后的记录。在flatMap
的情况下,匿名函数的预期输出是TraversableOnce
对象,然后通过转换将其展平为多个记录。不过,看看您的代码,您并不真的需要做flatMap
,因为简单的map
就足够了。还要记住,由于RDD的不变性,您必须始终将转换重新分配为新值。
试试类似的东西:
val avroRDD = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](filePath)
val countsRDD = avroRDD.map(av => (av._1.datum.get("field1").toString, 1)).reduceByKey(_ + _)
您似乎需要花一些时间来掌握Spark的一些基本框架细微差别。我建议您全面阅读《Spark编程指南》。最后,如果你想使用Avro,也请查看spark Avro,因为使用Avro的大部分锅炉板都在那里处理(DataFrames可能更直观,更容易用于你的用例)。
(编辑:)
您似乎误解了如何加载要在Spark中处理的数据。parallelize()
方法用于在RDD中分发集合,而不是在文件中分发数据。要执行后者,实际上只需要向newAPIHadoopFile()
加载程序提供一个逗号分隔的输入文件列表。因此,假设您的GetAvroList()
功能有效,您可以执行以下操作:
val avroList = GetAvroList(fs, args(0))
val avroRDD = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](avroList.mkString(","))
val countsRDD = avroRDD.map(av => (av._1.datum.get("field1").toString, 1)).reduceByKey(_ + _)
flatMappedRDD.foreach(println)