在Hadoop中使用PySpark进行多处理



我当前的python脚本在下面,并在本地运行,具有6个CPU内核。

我可以将其转换为 PySpark 并在 Hadoop 集群中运行它,在那里我可以指定要500cores吗?

我有一千个文件要下载,所以我想尽可能多地启动并行进程。

另请注意,我的函数正在尝试连接到数据源并下载数据文件。

pool = Pool(cores=5)
pool.map(myfunction, arg_list)
pool.close()
def myfunction(args)
(url,account,password,output_file_name) = args
page = requests.get(url, auth=HTTPBasicAuth(account,password), timeout=None)
tables = lh.document_fromstring(page.content)
rows = tables.xpath('//table[@id="Report1_Grid1"]/tr')
body = [[td.text for td in rows[rowindex]] for rowindex in range(1, len(rows))]           
tbl = pd.DataFrame(body)
pd.DataFrame.to_csv(output_file_name)

只是在Hadoop集群上运行它不会真正改变任何东西。我在这里假设您当前运行的 PC 和集群位于同一网络中,而您从中下载的服务器不是。然后,您可能会花费大部分时间等待字节通过网络物理到达,并且投入更多的计算能力不会改变任何事情。

如果你的下行链路已经饱和,你就无法真正改变任何东西。如果您有多个位置(可能有多个数据中心),则可以在每个位置运行一个下载器,进行处理,然后在单个位置进行聚合。

如果服务器的上行链路是问题所在(或者服务器只是不想给你更多的字节/秒),并且你想从多个服务器下载(或者如果你有多个连接,服务器被"欺骗"给你更多的字节/秒),你可以简单地运行更多的请求,但同样你真的不需要集群。只需增加池大小,在此 IO 情况下,一个请求不会使一个 CPU 内核饱和。如果进程开销降低了性能,您可以尝试类似 grequest 的东西。

还可能发生的是,您的网卡或您插入其中的以太网电缆已饱和,但您的网络实际上可以为您提供更多的下行链路。在这种情况下,在Hadoop集群上运行实际上会提高速度,但是占用Hadoop集群而不实际使用计算能力并不是一件好事。只需找到一个(或 2 个或 3 个)连接良好的服务器并从那里下载,可能带有 grequests 或更大的池。

相关内容

  • 没有找到相关文章