所以我有两个webscraper,它们从两个不同的来源收集数据。我同时运行它们,以收集特定的数据(例如新冠肺炎数字(。当其中一个函数找到数据时,我希望在不等待另一个函数完成的情况下使用该数据。
到目前为止,我尝试了多处理池模块,并用get((返回结果,但根据定义,我必须等待两个get((都完成,然后才能继续我的代码。我的目标是使代码尽可能简单和简短。
我的webscraper函数可以使用参数运行,并在找到时返回结果。也可以修改它们。
到目前为止,我有等待两个get((完成的代码。
from multiprocessing import Pool
from scraper1 import main_1
from scraper2 import main_2
from twitter import post_tweet
if __name__ == '__main__':
with Pool(processes=2) as pool:
r1 = pool.apply_async(main_1, ('www.website1.com','June'))
r2 = pool.apply_async(main_2, ())
data = r1.get()
data2 = r2.get()
post_tweet("New data is {}".format(data))
post_tweet("New data is {}".format(data2))
从这里我已经看到线程可能是一个更好的选择,因为网络抓取需要大量的等待和很少的解析,但我不确定我将如何实现这一点。
我认为这个解决方案相当简单,但我一整天都在寻找和尝试不同的东西,但没有取得多大成功,所以我想我只想在这里问一下。(我两个月前才开始编程(
一如既往,有很多方法可以完成这项任务。
您已经提到使用Queue
:
from multiprocessing import Process, Queue
from scraper1 import main_1
from scraper2 import main_2
def simple_worker(target, args, ret_q):
ret_q.put(target(*args)) # mp.Queue has it's own mutex so we don't need to worry about concurrent read/write
if __name__ == "__main__":
q = Queue()
p1 = Process(target=simple_worker, args=(main_1, ('www.website1.com','June'), q))
p2 = Process(target=simple_worker, args=(main_2, ('www.website2.com','July'), q))
p1.start()
p2.start()
first_result = q.get()
do_stuff(first_result)
#don't forget to get() the second result before you quit. It's not a good idea to
#leave things in a Queue and just assume it will be properly cleaned up at exit.
second_result = q.get()
p1.join()
p2.join()
您也可以通过使用imap_unordered
并只获取第一个结果来使用Pool
:
from multiprocessing import Pool
from scraper1 import main_1
from scraper2 import main_2
def simple_worker2(args):
target, arglist = args #unpack args
return target(*arglist)
if __name__ == "__main__":
tasks = ((main_1, ('www.website1.com','June')),
(main_2, ('www.website2.com','July')))
with Pool() as p: #Pool context manager handles worker cleanup (your target function may however be interrupted at any point if the pool exits before a task is complete
for result in p.imap_unordered(simple_worker2, tasks, chunksize=1):
do_stuff(result)
break #don't bother with further results
我见过人们在这种情况下使用队列:创建一个队列并将其传递给两个解析器,以便他们将结果放入队列中,而不是返回结果。然后在队列上执行阻塞弹出以检索第一个可用结果。
我已经看到线程可能是更好的选择
几乎正确,但不完全正确。当我们讨论具有大量阻塞I/O的代码时,我想说异步和基于异步的库比线程和多处理要好得多。如果它适用于您的情况,我建议以异步方式重写两个解析器。