Spark:并行化多个数据范围的创建



我目前正在基于ID列表生成数据范围 - 基于一个ID的每个查询都使A 非常大的postgresql表的可管理子集。然后,我根据需要写出的文件结构对输出进行分区。问题在于我达到了一个速度限制,并且主要利用了我的执行者资源。

我不确定这是否是重新思考我的体系结构的问题,还是有一些简单的方法可以解决这个问题,但基本上我想获得更多的任务并行化,但是我无法让所有16位执行者忙碌忙碌在尝试尽快完成此ETL工作时。

所以...这是我认为我可以做的事情:

  1. 并行化列表。
  2. 然后列表中的每个元素,在执行者上,通过JDBC选择一个(相对较小的)数据框。
  3. 然后foreachPartition(其中一定很少),我需要采取一些行动(包括每个分区中的数据写作),这些分区操作也可以分支到工人节点/执行者。

当前代码看起来像这样,但当然会抛出" py4j.py4jexception:method getNewargs ([]),因为Spark Sessign上下文无法传递到foreach closure中这将使这可以留在执行者上:

spark = SparkSession 
    .builder 
    .appName
    ... etc
# the list, distributed to workers
idsAndRegionsToProcess = sc.parallelize(idList)
# the final thing that needs to be done with the data
# (each partition written to a file and sent somewhere)
def transformAndLoad(iterator, someField, someOtherField):
    for row in iterator:
        ...do stuff
    ...write a file to S3
# !! The issue is here (well, at least with my current approach)!!
# In theory these are the first operations that really need to be
# running on various nodes.
def dataMove(idAndRegion, spark):
        # now pull dataFrames from Postgres foreach id
        postgresDF = spark.read 
            .format("jdbc") 
            .option("url" …
        .option("dbtable", "(select id, someField, someOtherField from table_region_“ + idAndRegion[1] + ” where id = ‘“ + idAndRegion[0] + ”') as history") 
        … more setup        
    postgresDF.repartition('someOtherField')
    postgresDF.persist(StorageLevel.MEMORY_AND_DISK)
    postgresDF.foreachPartition(lambda iterator: transformAndLoad(iterator, someField, someOtherField))
# invoking the problematic code on the parallelized list
idsAndRegionsToProcess.foreach(lambda idAndRegion: dataMove(idAndRegion, spark))

我知道这不是完全可能的,但是也许我错过了一个使这成为可能的微妙之处?这似乎比选择1TB数据然后对其进行处理要高得多,但是也许有一些我不知道的基本分页。

我的工作代码非常相似,否则几乎使用此确切的代码在收集的列表上运行的常规循环,但这速度很慢,并且不接近使用执行者。

对于一些额外的上下文,我在EMR和Yarn上,我的Spark-Submit(来自主节点)看起来像这样:spark-submit -packages org.postgresql:postgresql:9.4.1207.jre7- deploy-mode cluster -num-executors 16 - executor-memory 3G-Master yarn dataMove.py.py

另外,选择这些数据框并非有问题,因为结果是数据的一小部分,并且数据库正确地索引了,但是选择整个表似乎绝对是不可能的在其中一些。另外,重新分配将其除以需要写入每个(个人和特别命名)的文件。

我将对任何建议开放,即使这只是意味着使用我的工作代码,并以某种方式将其开始尽可能多的工作,而其他事情仍然从上次开始运行。但是首先,我的方法可以工作吗?

您可以考虑将数据工作负载作为火花集群上的单独作业/应用程序运行:

https://spark.apache.org/docs/latest/submitting-applications.html

但是,您对将数据存储在多个分区中的评论也应大大有助于减少对其进行处理所需的内存。您可以避免以这种方式将其分成单独的工作。

Spark UI AT:

http://localhost:4040

是您的朋友,弄清楚您的工作在Spark内部创建的步骤以及它消耗的资源。根据这些见解,您可以优化它并减少所需的内存量或提高处理速度。

相关内容

  • 没有找到相关文章

最新更新