请原谅我的简单问题,但我对Spark/Hadoop相对较新。
我试图加载一堆小的CSV文件到Apache Spark。它们目前存储在S3中,但如果简化的话,我可以将它们下载到本地。我的目标是尽可能高效地完成这项工作。让一些单线程主机下载和解析一堆CSV文件,而我的几十个Spark工作人员却无所事事,这似乎是一种耻辱。我希望有一种习惯的方式来分发这个作品。
CSV文件按如下目录结构排列:
2014/01-01/fileabcd.csv
2014/01-01/filedefg.csv
...
我有两年的数据,每天都有目录,每个目录中都有几百个csv。所有这些CSV都应该有一个相同的模式,但当然也有可能有一个CSV出错了,如果有几个有问题的文件,我不希望整个工作崩溃。只要在某个地方的日志中通知我发生了这种情况,这些文件就可以跳过。
似乎我脑子里的每个Spark项目都是同样的形式,我不知道如何解决它。(例如,试图读取一堆以制表符分隔的天气数据,或读取一堆日志文件来查看这些)
What I've try
我已经尝试了SparkR和Scala库。我真的不在乎我需要使用哪种语言;我对正确的习语/工具更感兴趣。
纯Scala我最初的想法是枚举和parallelize
所有year/mm-dd
组合的列表,这样我就可以让我的Spark工作人员每天都独立处理(下载并解析所有CSV文件,然后将它们堆叠在一起(unionAll()
)以减少它们)。不幸的是,使用Spark - CSV库下载和解析CSV文件只能在"父"/主作业中完成,而不能从每个子作业中完成,因为Spark不允许作业嵌套。因此,只要我想使用Spark库进行导入/解析,这就行不通了。
当然,您可以使用该语言的本地CSV解析来读取每个文件,然后将它们"上传到"Spark。在R中,这是将一些包从S3中取出文件的组合,然后是read.csv
,最后是将数据放入Spark的createDataFrame()
。不幸的是,这真的很慢,而且似乎也违背了我希望Spark工作的方式。如果我所有的数据在进入Spark之前都是通过R传递的,为什么还要用Spark呢?
蜂巢/Sqoop/凤凰/猪/水槽/水槽Ng/s3distcp
我已经开始研究这些量身定制的工具,很快就不知所措了。我的理解是,许多/所有这些工具都可以用来将我的CSV文件从S3获取到HDFS。
当然,从HDFS读取我的CSV文件比从S3读取要快,所以这解决了部分问题。但是我仍然有成千上万的csv需要解析,而且我不知道在Spark中有一种分布式的方式来做这件事。
所以现在(Spark 1.4) SparkR支持json
或parquet
文件结构。Csv文件可以被解析,但随后spark上下文需要用一个额外的jar启动(需要下载并放在适当的文件夹中,我自己从来没有这样做过,但我的同事做过)。
sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)
文档中有更多信息。我希望更新的spark版本会对此提供更多支持。
如果你不这样做,你需要使用不同的文件结构或使用python将所有文件从.csv
转换为.parquet
。下面是最近一次python演讲的一个片段,它做到了这一点。
data = sc.textFile(s3_paths, 1200).cache()
def caster(x):
return Row(colname1 = x[0], colname2 = x[1])
df_rdd = data
.map(lambda x: x.split(','))
.map(caster)
ddf = sqlContext.inferSchema(df_rdd).cache()
ddf.write.save('s3n://<bucket>/<filename>.parquet')
还有,你的数据集有多大?您甚至可能不需要火花进行分析。请注意,现在也是;
- SparkR只支持DataFrame
- 还没有分布式机器学习。 如果你想使用像
- 如果你的数据集不超过几gb,那么学习spark的额外麻烦可能还不值得
- 它现在是适度的,但你可以期待从未来更多
ggplot2
这样的库,你需要将分布式数据框转换回正常的数据框。我以前遇到过这个问题(但是我阅读了大量的Parquet文件),我的建议是避免使用数据帧并使用rdd。
通常使用的习语是:
- 读取文件列表,每个文件为一行(在驱动程序中)。这里的预期输出是字符串列表
- 并行化字符串列表,并使用客户csv阅读器对它们进行映射。返回一个case类的列表。
如果在一天结束时你想要一个像List[weather_data]这样的数据结构,可以重写为parquet或数据库,你也可以使用flatMap。