pyspark- for each函数的数据框架对多个工人/并行化不起作用



我在EC2上运行一个带有1个主的Spark独立群集和2个奴隶。集群正在工作。我有一个Python应用程序,该应用程序加载来自S3的数据。代码如下:

spark = SparkSession.builder.appName("Example").getOrCreate()
df = spark.read.csv("s3n://bucket-name/file-name.csv", header=True, mode="DROPMALFORMED")

然后,我在DF上应用.foreach(func)在DF的每一行上进行一些工作:

def test_func(row):
    row = modify(row)
    row.save() # just an example
df.foreach(test_func)

我已经阅读了文档,他们说.foreach()已针对分布式/并行处理进行了优化。但是,test_func仅在1个节点上运行,请参阅下面的日志:(任务3是.foreach(test_func)

INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, 1xx.xxx.xxx.xx2, partition 0, PROCESS_LOCAL, 17460 bytes)
INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 3 on executor id: 0 hostname: 1xx.xxx.xxx.xx2

无论如何是否有将此test_func分发给集群中多个节点/工人的?帮助非常感谢。预先感谢您。

******更新******

我已经凸起了数据,但是仍然只有1个任务分配给1个工作人员,并且运行该功能需要大量时间。这就是我运行应用程序

的方式
./bin/spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 --master spark://ip-xxx-xxx-xxx-xxx.us-west-2.compute.internal:7077 examples/src/main/python/test.py --executor-memory 5G --deploy-mode cluster

另一件事是我设置了--executor-memory 5G,但工人只使用1GB RAM。有人可以帮我吗?我已经陷入了几天。非常感谢您。

来自@lostinoverflow:

如果确实如此,此代码不会解释单个任务。 可能没有足够的数据来实现更多数据。

这是正确的。将数据增加到几千记录后,任务将被拆分并分配给所有执行者。

相关内容

  • 没有找到相关文章