我在EC2上运行一个带有1个主的Spark独立群集和2个奴隶。集群正在工作。我有一个Python应用程序,该应用程序加载来自S3的数据。代码如下:
spark = SparkSession.builder.appName("Example").getOrCreate()
df = spark.read.csv("s3n://bucket-name/file-name.csv", header=True, mode="DROPMALFORMED")
然后,我在DF上应用.foreach(func)
在DF的每一行上进行一些工作:
def test_func(row):
row = modify(row)
row.save() # just an example
df.foreach(test_func)
我已经阅读了文档,他们说.foreach()
已针对分布式/并行处理进行了优化。但是,test_func仅在1个节点上运行,请参阅下面的日志:(任务3是.foreach(test_func)
)
INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, 1xx.xxx.xxx.xx2, partition 0, PROCESS_LOCAL, 17460 bytes)
INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 3 on executor id: 0 hostname: 1xx.xxx.xxx.xx2
无论如何是否有将此test_func
分发给集群中多个节点/工人的?帮助非常感谢。预先感谢您。
******更新******
我已经凸起了数据,但是仍然只有1个任务分配给1个工作人员,并且运行该功能需要大量时间。这就是我运行应用程序
的方式./bin/spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 --master spark://ip-xxx-xxx-xxx-xxx.us-west-2.compute.internal:7077 examples/src/main/python/test.py --executor-memory 5G --deploy-mode cluster
另一件事是我设置了--executor-memory 5G
,但工人只使用1GB RAM。有人可以帮我吗?我已经陷入了几天。非常感谢您。
来自@lostinoverflow:
如果确实如此,此代码不会解释单个任务。 可能没有足够的数据来实现更多数据。
这是正确的。将数据增加到几千记录后,任务将被拆分并分配给所有执行者。