我在AWS S3上有一个CSV文件和一个JSON文件(每个文件有500万行/记录)。它们包含完全相同的数据,只是格式不同。
在一个有10个任务节点的EMR集群上,我启动了Spark(10个执行器,40个执行器核心),并创建了两个DataFrames:一个针对CSV,一个针对JSON。
对基于JSON文件构建的DF的查询运行速度是对基于CSV构建的DF类似查询的2-3倍。我没有发现任何关于存储文件格式之间性能差异的信息。
有人知道为什么针对JSON上的DF的查询比CSV上的DF运行得更快吗
下方的数据帧创建代码
根据JSON文件创建DF:
val hc_json = new org.apache.spark.sql.hive.HiveContext(sc)
val path_json = "s3://<mybucket>/<myjsonfile>.json"
val df_json = hc_json.read.json(path_json)
df_json.registerTempTable("table_json")
hc_json.sql("Select count(*) from table_json").collect()
根据CSV文件创建DF:
(我在启动spark时导入spark-csv包,并使用以下参数:--packages.com.databricks:spark-csv_2.11.2.0)
val hc_csv = new org.apache.spark.sql.hive.HiveContext(sc)
val path_csv = "s3://<mybucket>/<mycsvfile>.csv"
val df_csv = hc_csv.load("com.databricks.spark.csv", Map("path" -> path,"header"->"false"))
df_csv.registerTempTable("table_csv")
hc_csv.sql("Select count(*) from table_csv").collect()
许多因素都会影响查询性能,包括:
- 数据格式
- DataFrame的DataSource的实现细节
- 查询本身
- 数据结构
对于大多数查询来说,使用CSV可能会更快,因为文件大小小于JSON,并且需要从磁盘读取的数据更少。使用镶木地板文件格式可能会更快,因为文件大小更小,解码时间更快。
像(select count(*) from table_csv
)这样的查询在某些格式(例如parquet)下可以运行得更快,因为Spark足够聪明,可以在没有请求列的情况下跳过读取数据。
@zero323建议先将数据加载到内存中,这很可能会提高查询执行速度,但如果集群中没有足够的RAM来容纳整个数据集,则无法运行。
使用不同数据源创建的DataFrames之间不应有性能差异,包括JSON或csv。
问题是,当您在上面的代码段中调用hc_json.sql
时,您不仅要执行查询,而且每次执行查询时都要从磁盘加载数据。这意味着您测量的不是查询时间,而是磁盘访问+解析+查询。JSON和csv的第一个*和最后一个应该或多或少相同,但解析因源而异。
如果您只想测量实际的查询时间,那么应该缓存数据并执行一个操作以确保数据已经实际加载。例如
df_csv.registerTempTable("table_csv")
sqlContext.cacheTable("table_csv")
hc_csv.sql("SELECT count(*) FROM table_csv").collect()
现在应该加载数据,您可以期待类似的查询时间。
编辑实际上这里还有一个区别。从JSON源创建的DataFrame将获得正确的数据类型,而从csv创建的DataFrame,在不提供模式或设置inferSchema
选项的情况下,将所有内容读取为字符串。
*正如kostya所指出的,JSON文件通常较小。另一方面,与csv不同,JSON可以很好地处理稀疏日期。