通过跨多个源的连接激发ETL



我需要使用spark数据帧将50多个文件连接到一组3个键上。我有一个司机表,每天有100000条记录。我将此表与其他53个使用数据帧的文件连接起来,如下所示。

val df1 = spark.read.parquet(<driver file>)
val df2 = spark.read.parquet(<right side file1>)
.
.
val df52 = spark.read.parquet(<right side file 52>)
//join
val refinedDF1 = df1.join(df2,Seq("key1","key2","key3"),"leftouter")).select(<some from left table>, <some from right table>)
val refinedDF2 = refinedDF1.join(df3,Seq("key1","key2","key3"),"leftouter")).select(<some from left table>, <some from right table>)
.
.
so on for all 50 odd files
refinedFinalDF.write.parquet(<s3 location>)

执行失败,出现错误

容器以非零退出代码52 退出

这基本上是内存外的例外。我有一个相当大的集群,用于100000条记录的数据集。我有一个EMR,有12个执行器,每个执行器16G,驱动器内存20G。

我尝试过用df.repartition(200)以循环方式手动将数据帧划分为200个分区,但这根本没有帮助。在联接键中,只有key1对于所有记录都是不同的,key2和key3对于所有记录是相同的值。是否可以进行任何优化以使其发挥作用?我试图保存的最后一个数据帧中有140多列。如果驱动程序表有n条记录,那么在每个左外部之后,我只得到n条记录。

更新:我已经尝试过用限制(100)从驱动程序表中创建一个较小的数据帧,但我仍然得到内存不足的异常。

您的表是1-1还是1-many?如果它们是一对多的,那么您的联接将导致比您可能想要的更多的行。如果是这种情况,一种选择是首先在你要加入的每个表上做一个groupBy

val df1 = Seq(1, 2).toDF("id")
val df2 = Seq(
(1, "a", true),
(1, "b", false),
(2, "c", true)
).toDF("id", "C2", "B2")
val df3 = Seq(
(1, "x", false),
(1, "y", true),
(2, "z", false)
).toDF("id", "C3", "B3")
// Left outer join without accounting for 1-Many relationship.  Results in cartesian
// joining on each ID value!
df1.
join(df2, Seq("id"), "left_outer").
join(df3, Seq("id"), "left_outer").show()
+---+---+-----+---+-----+
| id| C2|   B2| C3|   B3|
+---+---+-----+---+-----+
|  1|  b|false|  y| true|
|  1|  b|false|  x|false|
|  1|  a| true|  y| true|
|  1|  a| true|  x|false|
|  2|  c| true|  z|false|
+---+---+-----+---+-----+

或者,如果你在联接之前对行进行分组,这样你的关系总是1-1,你就不会得到添加的记录

val df2Grouped = df2.groupBy("id").agg(collect_list(struct($"C2", $"B2")) as "df2")
val df3Grouped = df3.groupBy("id").agg(collect_list(struct($"C3", $"B3")) as "df3")
val result = df1.
join(df2Grouped, Seq("id"), "left_outer").
join(df3Grouped, Seq("id"), "left_outer")
result.printSchema
result.show(10, false)
scala> result.printSchema
root
|-- id: integer (nullable = false)
|-- df2: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- C2: string (nullable = true)
|    |    |-- B2: boolean (nullable = false)
|-- df3: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- C3: string (nullable = true)
|    |    |-- B3: boolean (nullable = false)

scala> result.show(10, false)
+---+-----------------------+-----------------------+
|id |df2                    |df3                    |
+---+-----------------------+-----------------------+
|1  |[[a, true], [b, false]]|[[x, false], [y, true]]|
|2  |[[c, true]]            |[[z, false]]           |
+---+-----------------------+-----------------------+

碰巧,我用来创建数据帧的s3 bucket中的底层数据中有多个文件夹,作为过滤器的一部分,我正在筛选特定的文件夹。示例:spark.read.parquet(s3 bucket).filter('folder_name="val")。看起来spark正在将s3 bucket中的所有数据加载到executor内存中,然后运行筛选器。这就是为什么它轰炸了与在hive外部表上运行的hive查询相同的逻辑运行的地方,该表指向s3位置,文件夹作为分区列。我不得不删除过滤器并阅读特定的文件夹来解决问题。。spark.read.parquet(s3桶/文件夹=值)。。

我遇到了类似的情况,我有多个联接,最后我必须将最终数据帧写入HDFS/Hive表(Parquet格式)。

Spark使用Lazy Execution机制,这意味着,当你的53’rd数据帧被操作(保存/写入为Parquet)时,Spark会返回到所有连接并执行它们,这会导致数据的巨大混乱,最终你的作业容器会失败并抛出内存不足错误。

建议:你可以先把每个加入的数据帧写到HDFS上,我的意思是,一旦你加入了2个(可以超过2个,但保持有限)数据帧,就把加入的数据框写到HDFS/Hive中,并使用select * 'hive parquet table

val refinedDF1 = df1.join(df2 ,condition,'join_type')
refinedDF1.write.parquet("location") or refinedDF1.write.mode("overwrite").saveAsTable("dbname.refine1")
val refinedDF1 = hc.sql("select * from dbname.refine1")
val refinedDF2 = refinedDF1.join(df3)
refinedDF2.write.parquet("location") or refinedDF1.write.mode("overwrite").saveAsTable("dbname.refine2")
val refinedDF2 = hc.sql("select * from dbname.refine2")

现在,您经常将联接写入hdfs,这意味着当您调用最终联接时,spark不必执行它们,它只需要使用以表形式保存的52'nd联接输出。

通过使用这种方法,我的脚本从22小时(包括容器内存错误)减少到了15到30分钟(没有内存异常/错误)。

几个提示:

1) 排除联接key为空的记录,spark与具有null = null条件的联接相比性能不佳,因此在联接数据帧之前将其删除

2) 当左边的数据帧是多行,而右边的数据帧则是查找或几行时,请使用广播联接。

3) 脚本执行后,您必须清理保存在Hive/Hdfs中的中间数据帧。

相关内容

  • 没有找到相关文章

最新更新