我在EMR上使用了spark shell-spark版本2.2.0/2.1.0。在尝试广播简单对象(我的CSV文件只包含1列,不到2MB)时,我注意到它并没有保存在每个执行器内存中,而是只保存在驱动程序内存中,尽管它应该按照文档中的建议https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-TorrentBroadcast.html
广播前(即sc.broadcast(arr_collected))和广播后附上的印刷屏幕显示了我的结论。此外,我检查了工人的机器内存使用情况,与Spark UI中的情况相同,广播后没有更改。
1-广播前打印屏幕
2-广播后的打印屏幕
在添加"log4j.logger.org.apache.spark.storage.BlockManager=TRACE"后,附加了广播过程的日志,如这里建议的那样-https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-blockmanager.html
3-打印屏幕广播日志
下面是代码-
val input = "s3://bucketName/pathToFile.csv"
val df = spark.read.format("com.databricks.spark.csv").option("header", "true").option("delimiter", ",").load(input)
val df_2 = df_read_for_bc.withColumn("is_exist",lit("true").cast("Boolean"))
val arr_collected = df_2.collect()
val broadcast_map_fraud_locations4 = sc.broadcast(arr_collected)
有什么想法吗?
您可以使用广播变量来加入数据或进行某种操作吗。它可能很懒,所以不使用任何内存