在Spark中有效地聚合多个csv



请原谅我的简单问题,但我对Spark/Hadoop相对较新。

我试图加载一堆小的CSV文件到Apache Spark。它们目前存储在S3中,但如果简化的话,我可以将它们下载到本地。我的目标是尽可能高效地完成这项工作。让一些单线程主机下载和解析一堆CSV文件,而我的几十个Spark工作人员却无所事事,这似乎是一种耻辱。我希望有一种习惯的方式来分发这个作品。

CSV文件按如下目录结构排列:

2014/01-01/fileabcd.csv
2014/01-01/filedefg.csv
...

我有两年的数据,每天都有目录,每个目录中都有几百个csv。所有这些CSV都应该有一个相同的模式,但当然也有可能有一个CSV出错了,如果有几个有问题的文件,我不希望整个工作崩溃。只要在某个地方的日志中通知我发生了这种情况,这些文件就可以跳过。

似乎我脑子里的每个Spark项目都是同样的形式,我不知道如何解决它。(例如,试图读取一堆以制表符分隔的天气数据,或读取一堆日志文件来查看这些)

What I've try

我已经尝试了SparkR和Scala库。我真的不在乎我需要使用哪种语言;我对正确的习语/工具更感兴趣。

纯Scala

我最初的想法是枚举和parallelize所有year/mm-dd组合的列表,这样我就可以让我的Spark工作人员每天都独立处理(下载并解析所有CSV文件,然后将它们堆叠在一起(unionAll())以减少它们)。不幸的是,使用Spark - CSV库下载和解析CSV文件只能在"父"/主作业中完成,而不能从每个子作业中完成,因为Spark不允许作业嵌套。因此,只要我想使用Spark库进行导入/解析,这就行不通了。

混合语言

当然,您可以使用该语言的本地CSV解析来读取每个文件,然后将它们"上传到"Spark。在R中,这是将一些包从S3中取出文件的组合,然后是read.csv,最后是将数据放入Spark的createDataFrame()。不幸的是,这真的很慢,而且似乎也违背了我希望Spark工作的方式。如果我所有的数据在进入Spark之前都是通过R传递的,为什么还要用Spark呢?

蜂巢/Sqoop/凤凰/猪/水槽/水槽Ng/s3distcp

我已经开始研究这些量身定制的工具,很快就不知所措了。我的理解是,许多/所有这些工具都可以用来将我的CSV文件从S3获取到HDFS。

当然,从HDFS读取我的CSV文件比从S3读取要快,所以这解决了部分问题。但是我仍然有成千上万的csv需要解析,而且我不知道在Spark中有一种分布式的方式来做这件事。

所以现在(Spark 1.4) SparkR支持jsonparquet文件结构。Csv文件可以被解析,但随后spark上下文需要用一个额外的jar启动(需要下载并放在适当的文件夹中,我自己从来没有这样做过,但我的同事做过)。

sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)

文档中有更多信息。我希望更新的spark版本会对此提供更多支持。

如果你不这样做,你需要使用不同的文件结构或使用python将所有文件从.csv转换为.parquet。下面是最近一次python演讲的一个片段,它做到了这一点。

data = sc.textFile(s3_paths, 1200).cache()
def caster(x):
    return Row(colname1 = x[0], colname2 = x[1])
df_rdd = data
    .map(lambda x: x.split(','))
    .map(caster)
ddf = sqlContext.inferSchema(df_rdd).cache()
ddf.write.save('s3n://<bucket>/<filename>.parquet')

还有,你的数据集有多大?您甚至可能不需要火花进行分析。请注意,现在也是;

  • SparkR只支持DataFrame
  • 还没有分布式机器学习。
  • 如果你想使用像ggplot2这样的库,你需要将分布式数据框转换回正常的数据框。
  • 如果你的数据集不超过几gb,那么学习spark的额外麻烦可能还不值得
  • 它现在是适度的,但你可以期待从未来更多

我以前遇到过这个问题(但是我阅读了大量的Parquet文件),我的建议是避免使用数据帧并使用rdd。

通常使用的习语是:

  1. 读取文件列表,每个文件为一行(在驱动程序中)。这里的预期输出是字符串列表
  2. 并行化字符串列表,并使用客户csv阅读器对它们进行映射。返回一个case类的列表。

如果在一天结束时你想要一个像List[weather_data]这样的数据结构,可以重写为parquet或数据库,你也可以使用flatMap。

最新更新