我有以下SPARK SQL代码,该代码检查大表格中没有某些日期(几十亿行):
spark = SparkSession.builder
.master("yarn")
.appName("minimal_example")
.config('spark.submit.deployMode', 'client')
.getOrCreate()
SQL = '''
select distinct
substr(entrydate, 1, 10) as datum,
1 as in_table
from {table}
where entrydate >= '{datum}'
'''
print("RUN1")
df1 = spark.sql(SQL.format(datum='2017-01-01', table='table1'))
c1 = df1.count()
print("count1: ", c1)
print("RUN2")
df2 = spark.sql(SQL.format(datum='2017-01-01', table='table2'))
c2 = df2.count()
print("count2: ", c2)
本质上,该函数只是从表列获得了不同的日期。
现在我无法缠绕我的头:
- 每次呼叫
count()
自行运行 - 当我将每个呼叫作为单独的
spark-submit
作业运行时,它可以正常工作 - 但是,如果像上面的连续运行,第二次运行会产生以下错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o150.sql.
: java.util.concurrent.ExecutionException: java.io.IOException: com.google.protobuf.ServiceException: java.lang.OutOfMemoryError: GC overhead limit exceeded
我的解释是,第二次运行中的垃圾收集是在第二次运行中开始的。
我尝试的是:
- 在每次迭代开始时呼叫spark.clearcache()
- 在每次迭代开始时呼叫
spark._jvm.SparkSession.clearDefaultSession()
,spark._jvm.SparkSession.clearActiveSession()
- 查看Spark Web UI,并尝试从DAG和存储选项卡中获得感知(后一个没有显示任何内容),无用
- 更改两个
count
s的顺序。这会导致不同的错误:java.io.IOException: Connection reset by peer
(有关类似错误,请参见此处)
最后一个观察:第一个呼叫旋转> 100个火花/纱线执行者,也许Spark的动态分配机制不喜欢第二个呼叫实际上是对执行者有不同要求的新工作?
任何帮助都非常感谢!
环境:Cloudera CDH 6.1群集的Spark 2.3
编辑:更多详细信息
- 表格在HDFS中将表持续为Parquet文件:
+--------+------------+-------+--------+--------------+
| table | # rows |# cols |# files | raw size |
+--------+------------+-------+--------+--------------+
| table1 | 5660970439 | 46 | 49167 | 228876171398 |
| table2 | 5656000217 | 52 | 80000 | 518996700170 |
+--------+------------+-------+--------+--------------+
- 内存设置:带有动态分配的纱线上的火花,min执行器内存为1GB,最大为72 GB,总群集存储器为〜300GB。
- 第一个
count()
旋转约150个执行者,充分利用当前可用的内存资源
让问题沉入几天后,我只是尝试增加驱动程序内存:
spark2-submit --master yarn --deploy-mode client --driver-memory 4G minimal_example.py
也许决定因素是我的应用程序以client
模式启动。显然,即使驾驶员本身仅收到简单的df.count()
的结果。