Python 多进程池与进程



我是Python多处理的新手。我不太明白池和进程之间的区别。有人可以建议我应该使用哪一个来满足我的需求吗?

我有数千个http GET请求要发送。发送每个并得到响应后,我想存储到(共享(字典的响应(一个简单的 int(。我的最终目标是将字典中的所有数据写入文件。

这根本不是 CPU 密集型的。我所有的目标都是加快发送http GET请求的速度,因为太多了。这些请求都是隔离的,不相互依赖。

在这种情况下,我应该使用池还是进程?

谢谢!

----下面的代码是在 8/28 添加的---

我用多处理编程。我面临的主要挑战是:

1( GET 请求有时会失败。我必须设置 3 次重试,以尽量减少重新运行我的代码/所有请求的需要。我只想重试失败的。我可以在不使用池的情况下通过异步 http 请求实现这一点吗?

2(我想检查每个请求的响应值,并进行异常处理

下面的代码是从我的实际代码简化而来的。它工作正常,但我想知道这是否是最有效的做事方式。谁能给出任何建议?多谢!

def get_data(endpoint, get_params):
response = requests.get(endpoint, params = get_params)
if response.status_code != 200:
raise Exception("bad response for " + str(get_params))
return response.json()
def get_currency_data(endpoint, currency, date):
get_params = {'currency': currency,
'date' : date
}
for attempt in range(3):
try:
output = get_data(endpoint, get_params)
# additional return value check
# ......
return output['value']
except:
time.sleep(1) # I found that sleeping for 1s almost always make the retry successfully
return 'error'
def get_all_data(currencies, dates):
# I have many dates, but not too many currencies
for currency in currencies:
results = []
pool = Pool(processes=20)
for date in dates:
results.append(pool.apply_async(get_currency_data, args=(endpoint, date)))
output = [p.get() for p in results]
pool.close()
pool.join()
time.sleep(10) # Unfortunately I have to give the server some time to rest. I found it helps to reduce failures. I didn't write the server. This is not something that I can control

都不是。使用异步编程。考虑以下直接从该文章中提取的代码(归功于Paweł Miech(

#!/usr/local/bin/python3.5
import asyncio
from aiohttp import ClientSession
async def fetch(url, session):
async with session.get(url) as response:
return await response.read()
async def run(r):
url = "http://localhost:8080/{}"
tasks = []
# Fetch all responses within one Client session,
# keep connection alive for all requests.
async with ClientSession() as session:
for i in range(r):
task = asyncio.ensure_future(fetch(url.format(i), session))
tasks.append(task)
responses = await asyncio.gather(*tasks)
# you now have all response bodies in this variable
print(responses)
def print_responses(result):
print(result)
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run(4))
loop.run_until_complete(future)

也许可以创建一个 URL 的数组,而不是给定的代码,循环该数组并将每个数组发布给fetch


编辑:使用requests_futures

根据下面的@roganjosh评论,requests_futures是实现这一目标的一种超级简单的方法。

from requests_futures.sessions import FuturesSession
sess = FuturesSession()
urls = ['http://google.com', 'https://stackoverflow.com']
responses = {url: sess.get(url) for url in urls}
contents = {url: future.result().content 
for url, future in responses.items()
if future.result().status_code == 200}

编辑:使用grequests支持Python 2.7

你也可以使用 grequests,它支持 Python 2.7 来执行异步 URL 调用。

import grequests
urls = ['http://google.com', 'http://stackoverflow.com']
responses = grequests.map(grequests.get(u) for u in urls)
print([len(r.content) for r in rs])
# [10475, 250785]

编辑:使用多处理

如果要使用多处理来执行此操作,则可以。免责声明:这样做将产生大量的开销,而且它不会像异步编程那样高效......但这是可能的。

这实际上非常简单,您通过http GET函数映射URL:

import requests
urls = ['http://google.com', 'http://stackoverflow.com']
from multiprocessing import Pool
pool = Pool(8)
responses = pool.map(requests.get, urls)

池的大小将是同时发出的 GET 请求的数量。调整它的大小应该可以提高您的网络效率,但它会增加本地机器上的通信和分叉开销。

同样,我不建议这样做,但这肯定是可能的,如果你有足够的内核,它可能比同步调用更快。

相关内容

  • 没有找到相关文章