How can I parallelize the same function with different arguments?



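I have to fetch JSON from URLs, each of which returns child URLs and a reward (an integer value). The goal is to traverse the whole URL tree and compute the sum of the reward values. My code works, but I am trying to parallelize it. I found multiprocessing, but how would I use it to run my custom fetch() function on several different URLs at once?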

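import requests

# next_url and reward_list are module-level lists shared by every call
def fetch(url):
    json_data = requests.get(url).json()
    try:
        children = list(json_data['children'])  # No duplicate children
        for i in children:
            next_url.append(i)
    except KeyError:
        # a node without a 'children' key is a leaf
        print('Tree end')
    reward = json_data['reward']
    reward_list.append(reward)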

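If I understand the question, you want to pass a URL to the system and have newly discovered URLs fed back into it dynamically. You can do this with a task queue and a list of threads. Put one URL on the queue, and the worker threads put the URLs they discover back on the queue for further processing.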

import threading
import queue
import requests
def fetch_worker(url_q, reward_list):
    while True:
        try:
            url = url_q.get()
            # controller requests exit
            if url is None:
                return
            # get url data
            json_data = requests.get(url).json()
            # queue more url_q tasks
            for child in json_data.get('children', []):  # No duplicate children
                url_q.put(child)
            # add found reward to list
            reward_list.append(json_data['reward'])
        finally:
            url_q.task_done()

def fetch(url):
    NUM_WORKERS = 10  # just a guess
    reward_list = []
    url_q = queue.Queue()
    threads = [threading.Thread(target=fetch_worker, args=(url_q, reward_list))
        for _ in range(NUM_WORKERS)]
    for t in threads:
        t.start()
    url_q.put(url)
    # wait for url and all subordinate urls to process
    url_q.join()
    # kill the workers
    for _ in range(NUM_WORKERS):
        url_q.put(None)
    for t in threads:
        t.join()
    return reward_list
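
To get the number the question asks for, sum the list that fetch() returns. The sketch below exercises the same queue-and-workers pattern offline so it can be run without a server. The in-memory dict FAKE_TREE stands in for the HTTP responses, and get_json, worker and total_reward are illustrative names, not part of the answer above.

```python
import queue
import threading

# Hypothetical stand-in for the remote JSON documents (assumption for the demo).
FAKE_TREE = {
    "root": {"children": ["a", "b"], "reward": 1},
    "a": {"children": ["c"], "reward": 2},
    "b": {"reward": 3},
    "c": {"reward": 4},
}

def get_json(url):
    # For real URLs, replace this lookup with requests.get(url).json().
    return FAKE_TREE[url]

def worker(url_q, rewards):
    while True:
        url = url_q.get()
        try:
            if url is None:  # sentinel: the controller requests exit
                return
            data = get_json(url)
            for child in data.get("children", []):
                url_q.put(child)  # feed discovered URLs back into the queue
            rewards.append(data["reward"])  # list.append is thread-safe in CPython
        finally:
            url_q.task_done()

def total_reward(root, num_workers=4):
    url_q = queue.Queue()
    rewards = []
    threads = [threading.Thread(target=worker, args=(url_q, rewards))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    url_q.put(root)
    url_q.join()  # returns once every queued URL has been processed
    for _ in threads:
        url_q.put(None)  # one sentinel per worker
    for t in threads:
        t.join()
    return sum(rewards)

print(total_reward("root"))  # 1 + 2 + 3 + 4 = 10
```

Children are queued before the parent's task_done() call, so the unfinished-task count cannot drop to zero while part of the tree is still undiscovered.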

Suppose you have a list of URLs and want to fetch the reward for each one.
I would split the list into buckets and add a loop inside fetch:

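import multiprocessing as mp
import requests

def fetch(urls, reward_lst, next_url):
    for url in urls:
        json_data = requests.get(url).json()
        try:
            children = list(json_data['children'])
            for i in children:
                next_url.append(i)
        except KeyError:
            print('Tree end')
        reward = json_data['reward']
        reward_lst.append(reward)

def run(urls):
    core_num = mp.cpu_count()
    bucket_size = (len(urls) // core_num) + 1
    manager = mp.Manager()
    reward_lst = manager.list()  # shared between the worker processes
    next_url = manager.list()    # child URLs found at this level
    jobs = []
    for i in range(core_num):
        # slice with ':' (the original used ',', which raises TypeError)
        url_bucket = urls[i * bucket_size:(i + 1) * bucket_size]
        p = mp.Process(target=fetch, args=(url_bucket, reward_lst, next_url))
        p.start()
        jobs.append(p)
    for p in jobs:
        p.join()
    return list(reward_lst), list(next_url)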
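
run() handles a single list of URLs, so covering the whole tree means calling it again on the children it discovers until none are left. Here is a minimal sketch of that level-by-level loop. Note that it swaps in concurrent.futures.ThreadPoolExecutor for the manual bucketing and processes, and uses a hypothetical in-memory FAKE_TREE in place of HTTP so it runs offline. fetch_one and sum_tree are illustrative names.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the remote JSON documents (assumption for the demo).
FAKE_TREE = {
    "root": {"children": ["a", "b"], "reward": 1},
    "a": {"children": ["c"], "reward": 2},
    "b": {"reward": 3},
    "c": {"reward": 4},
}

def fetch_one(url):
    # For real URLs, replace this lookup with requests.get(url).json().
    data = FAKE_TREE[url]
    return data["reward"], list(data.get("children", []))

def sum_tree(root, max_workers=4):
    total = 0
    level = [root]  # URLs at the current depth of the tree
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while level:
            next_level = []
            # map() runs fetch_one on every URL of this level concurrently
            for reward, children in pool.map(fetch_one, level):
                total += reward
                next_level.extend(children)
            level = next_level
    return total

print(sum_tree("root"))  # 1 + 2 + 3 + 4 = 10
```

Each level has to finish before the next one starts, so workers can sit idle on an unbalanced tree. The queue-based approach has no such barrier, at the cost of a little more bookkeeping.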
