pySpark 添加文件选项,在执行器中的工作线程上会发生什么



很明显,为了分发小查找数据,最好使用广播变量。

假设我们在 yarn 客户端模式下从主节点运行 pySpark 代码(火花提交)。因此,应用程序驱动程序将始终在主节点上创建。我们从主节点上的本地路径读取文件。

with open('/tmp/myfile.txt', 'r') as f:
lookup = {}
for line in f.readlines():
line = parse(line) # Method parse uses re and return dict
lookup[line['name']] = line['age']

然后我们创建广播 var并使用它:

lookupBC = sc.broadcast(lookup)
output = sc.textFile('/path/to/hdfs/')
.map(lambda e: (lookupBC.value.get(e, e), 1))
.collect()

在我们的例子中,这个 bc var 是在驱动程序(主节点)上创建的,spark 在集群中的所有数据节点之间复制这个 var,其中创建了执行程序,将其保存在这些节点上的内存中。 因此,文件将被读取一次,然后分发给执行程序。

如果我们使用添加文件选项会发生什么?

sc.addFile('/tmp/myfile.txt')
with open(SparkFiles.get('/tmp/myfile.txt')) as f:
lookup = {}
for line in f.readlines():
line = parse(line) # Method parse uses re and return dict
lookup[line['name']] = line['age']
output = sc.textFile('/path/to/hdfs/')
.map(lambda e: (lookup.get(e, e), 1))
.collect()

Spark 会将文件'/tmp/myfile.txt'复制到将在其中创建执行程序的每个节点。然后:

  1. 文件将被读取多少次?特定节点上的每个执行程序一次?还是每个任务一次?
  2. 步骤是什么,如何在执行器上处理代码?
  3. 什么使用更好的addFile或bc var?
  4. Spark 会基于 pyspark 代码进行任何优化并创建隐式 bc var 吗?

在执行器日志中,我看到有关 bc vars 的信息,但我在代码中没有使用任何变量:

18/03/21 15:36:27 INFO util.Utils: Fetching spark://172.25.235.201:36478/files/myfile.txt to /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/spark-f3d19076-0642-4db8-961d-99daae0dfaff/fetchFileTemp230224632617642846.tmp
18/03/21 15:36:27 INFO util.Utils: Copying /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/spark-f3d19076-0642-4db8-961d-99daae0dfaff/-17884647971521635771454_cache to /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/container_1520754626920_6227_01_000002/./myfile.txt
18/03/21 15:36:28 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 1
18/03/21 15:36:28 INFO client.TransportClientFactory: Successfully created connection to strt01we.ebb.er.com/172.25.235.216:43791 after 4 ms (0 ms spent in bootstraps)
18/03/21 15:36:28 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 6.3 KB, free 366.3 MB)
18/03/21 15:36:28 INFO broadcast.TorrentBroadcast: Reading broadcast variable 1 took 551 ms

广播变量似乎已加载到内存中,直到它们被显式销毁。 相比之下,sc.addFile似乎正在创建一个到磁盘的副本(为每个执行程序)。所以我会猜测SparkFiles.get()每次调用文件都会将文件加载到内存中。

  • 因此,在上面的示例中,它将加载一次。
  • 但是,如果您在.map()中调用SparkFiles.get(),它会尝试为 RDD 中的每个条目重新加载文件。

最后,回答你的问题,

文件将被读取多少次?特定节点上的每个执行程序一次?还是每个任务一次?

取决于,.get在哪里调用,如上所述。

步骤是什么,如何在执行器上处理代码?

我不明白这部分。

什么使用更好的addFile或bc var?

这些是不同的用例。例如,考虑一个案例,我们有一个 1GB sqliteDB 转储。Spark 可以通过 JDBC 连接到此数据库对象。它实际上不需要在内存中加载整个对象。

Spark 会基于 pyspark 代码进行任何优化并创建隐式 bc var 吗?

不确定,但我不这么认为。

最新更新