数据丰富与Spark作业失败与大型数据集(数十亿行)



我们有一个Pipeline,我们在其中进行数据充实。我们的目标是处理至少10tb的数据。目前,我们只处理了不到2%的数据(180 GB),但我们正面临内存问题。

集群组件

• 150 cores, 570 GB RAM (10 nodes: 16 CPU, 64 GB RAM, 256 GB Disk) 
• A s3 server to store processed data in parquet format

我们的SPARK配置

deploy.mode = "client"
master = "spark://192.168.x.x:7077"
network.timeout = 864001
default.parallelism = 450 
sql.shuffle.partitions = 450
sql.files.maxPartitionBytes = "128MB"
# executor configs
executor.memory = 19g 
executor.memoryOverhead = 3g
executor.instances = 30 
executor.cores = 5 #Number of concurrent tasks an executor can run.
// executor.heartbeatInterval = 864000000
# driver configs
driver.cores = 15
driver.memory = 19g
driver.memoryOverhead = 6g

用例

我们目前有一部分数据(10tb的180gb)以parquet格式存储在S3存储上。我们把这个数据命名为dataDDD.我们有各种其他文件来源:CSV, TXT, XLSX等。我们称它们为F1 F2 F3…FnFn文件的大小从mb到50gb不等。

浓缩让我们像这样恢复数据模式。(实际上它有超过30列)

name: String
phone_number: ArrayType
hobbies:ArrayType
job_title: String

为了丰富数据,我们需要按具有相同phone_number的所有列分组。

流程下面是我们充实DDD的过程F1与

  • 读取DDDdf_d和F1df_1spark.read
  • 将df_d和df_1合并为df_union
  • 在df_union中,爆破phone_number_exploded中的phone_number列
  • 然后首先对phone_number_exploded的单个值进行分组,然后对phone_number进行分组,以重建所有行,同时在hobbiesjob列上应用collect_set,如果不为空。
  • 然后我们再次在S3中保存df_union

问题

这很好,但随着数据的增长,我们开始面临面向对象的问题。任务开始失败,出现以下错误:

错误1

ExecutorLostFailure (executor 6 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

Error2

java.lang.OutOfMemoryError: Java heap space at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:492)

我们还启用了AQE。它运行得很好,但当我们达到每df 10亿条记录时,我们开始面临数据倾斜的问题。由于保存的数据在最后一次拍摄时挂起,有时会持续数小时。

我们的问题

  • 我们做错了什么吗?
  • 有没有更好的方法来处理我们的工作流程?也许使用连接或任何其他技术?
  • 当AQE达到一定数量后似乎下降时,我们如何处理歪斜数据?

我建议调整执行器配置,这些值对于你的集群大小来说似乎很低。

# executor configs
executor.memory = 60g
executor.memoryOverhead = 3g
executor.instances = 10
executor.cores = 32

最新更新