I have roughly 20,000 articles to translate, each about 100 characters long on average. I'm using the multiprocessing library to speed up my API calls, as shown below:
import os
import multiprocessing as mp
from time import sleep

from google.cloud.translate_v2 import Client
from tqdm.notebook import tqdm

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = cred_file
translate_client = Client()

def trans(text, MAX_TRIES=5):
    res = None
    sleep_time = 1
    for i in range(MAX_TRIES):
        try:
            res = translate_client.translate(text, target_language="en", model="nmt")
        except Exception:
            pass
        if res is None:
            sleep(sleep_time)  # wait before trying to fetch the data again
            sleep_time *= 2
        else:
            break
    return res["translatedText"]

src_text = ...  # e.g. ["this is a sentence"] * 20000

with mp.Pool(mp.cpu_count()) as pool:
    translated = list(tqdm(pool.imap(trans, src_text), total=len(src_text)))
Unfortunately, the code above fails reliably around iteration 2828 ± 5 with HTTP Error 503: Service Unavailable. I had hoped the growing sleep time would let it recover and carry on. The strange part is that if I restart the loop immediately after it fails, it starts up again without issue, even though nowhere near the 2^4-second backoff has elapsed since the code stopped executing. So the questions are:

- Am I doing the try/except wrong?
- Is the multiprocessing somehow affecting the API?
- General thoughts?

I need the multiprocessing; otherwise the whole job would take about 3 hours to finish.
Some thoughts: Google APIs I've tried before can only handle a certain number of concurrent requests, and if the limit is reached the service returns the error HTTP 503 "Service Unavailable" (or HTTP 403 if the "Daily limit is Exceeded" or "User Rate Limit" is hit).

Try implementing retries with exponential backoff: retry the operation with exponentially increasing wait times, up to a maximum retry count. It improves bandwidth usage and maximizes request throughput in concurrent environments.

Also review the Quotas and Limits page.

- Exponential Backoff
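The retry-with-exponential-backoff pattern described above can be sketched like this (a minimal illustration, not Google's client code; the helper name, base delay, cap, and jitter are my own choices):

```python
import random
import time

def with_backoff(call, max_tries=5, base=1.0, cap=32.0):
    """Retry `call` with exponentially growing waits plus jitter.

    Waits roughly base, 2*base, 4*base, ... seconds (capped at `cap`)
    between attempts; re-raises the last error if all tries fail.
    """
    for attempt in range(max_tries):
        try:
            return call()
        except Exception:
            if attempt == max_tries - 1:
                raise  # out of retries
            # Jitter avoids many workers retrying in lockstep.
            delay = min(cap, base * 2 ** attempt)
            time.sleep(delay + random.uniform(0, delay / 2))

# usage sketch:
# with_backoff(lambda: translate_client.translate(text, target_language="en"))
```

The jitter matters in a multiprocessing setup: without it, all workers that failed together retry together and hit the limit again at the same instant.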
A 503 error means the problem is on Google's end, which leads me to believe you may be getting rate limited. As Raphael mentioned, is there a Retry-After header in the response? I'd recommend looking at the response headers, since they may tell you more specifically what's happening and possibly give you information on how to fix it.
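If you do drop down to raw HTTP requests, honoring Retry-After might look roughly like this (a sketch; the helper name and the fallback default are illustrative, and the HTTP-date form of the header is deliberately left unparsed):

```python
def retry_delay(response, default=1.0):
    """Pick a wait time from the server's Retry-After header, if any.

    Retry-After is usually an integer number of seconds; fall back to
    `default` when the header is missing or uses the HTTP-date form.
    """
    value = response.headers.get("Retry-After")
    if value is None:
        return default
    try:
        return float(value)
    except ValueError:
        return default  # HTTP-date variant; a fuller client would parse it
```

Any response object that exposes a `headers` mapping (requests, aiohttp) works with this helper.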
The Google API is very good at hiding the complexity of performing a Google translation. Unfortunately, if you step into the Google API code, it uses standard HTTP requests. This means that when you run 20,000+ requests, there is a huge bottleneck regardless of the thread pool.

Consider creating the HTTP requests with aiohttp (you'll need to install it from pip) and asyncio. This lets you run asynchronous HTTP requests. (It also means you no longer need google.cloud.translate_v2, multiprocessing, or tqdm.notebook.)

Simply call an await method inside asyncio.run(); that method can build a list of coroutines that each call session.get(), then await asyncio.gather() to collect all the results.
In the example below I'm using an API key from https://console.cloud.google.com/apis/credentials (rather than Google Application Credentials / a service account).

Running the example with asyncio and aiohttp, it completed in 30 seconds without any errors. (You may want to extend the timeout on the session, though.)

It's worth pointing out that Google has a limit of 6 million characters per minute. Your test performs 360,000, so you would hit the limit if you ran the test 17 times within a minute!

Also, the speed is determined mainly by the machine, not by the Google API. (I ran my tests on a PC with a 3 GHz CPU, 8 cores, and 16 GB of RAM.)
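As a rough sanity check on those quota numbers (assuming the 18-character test sentence from the question):

```python
chars_per_sentence = len("this is a sentence")   # 18 characters
chars_per_run = chars_per_sentence * 20_000      # 360,000 characters per run
quota_per_minute = 6_000_000                     # stated per-minute character quota

full_runs = quota_per_minute // chars_per_run
print(full_runs)  # 16 full runs fit in a minute; the 17th would cross the quota
```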
import asyncio
import aiohttp
from collections import namedtuple
import json
from urllib.parse import quote

TranslateReponseModel = namedtuple('TranslateReponseModel', ['sourceText', 'translatedText', 'detectedSourceLanguage'])  # model to store results.

def Logger(json_message):
    print(json.dumps(json_message))  # Note: logging json is just my personal preference.

async def DownloadString(session, url, index):
    while True:  # If client error - this will retry. You may want to limit the amount of attempts
        try:
            r = await session.get(url)
            text = await r.text()
            # Logger({"data": text, "status": r.status})
            r.raise_for_status()  # This will error if the API returns a 4xx or 5xx status.
            return text
        except aiohttp.ClientConnectionError as e:
            Logger({'Exception': f"Index {index} - connection was dropped before we finished", 'Details': str(e), 'Url': url})
        except aiohttp.ClientError as e:
            Logger({'Exception': f"Index {index} - something went wrong. Not a connection error, that was handled", 'Details': str(e), 'Url': url})

def FormatResponse(sourceText, responseText):
    jsonResponse = json.loads(responseText)
    return TranslateReponseModel(sourceText, jsonResponse["data"]["translations"][0]["translatedText"], jsonResponse["data"]["translations"][0]["detectedSourceLanguage"])

def TranslatorUriBuilder(targetLanguage, sourceText):
    apiKey = 'ABCDED1234'  # TODO This is a 41-character API key. You'll need to generate one (it's not part of the json certificate)
    return f"https://translation.googleapis.com/language/translate/v2?key={apiKey}&q={quote(sourceText)}&target={targetLanguage}"

async def Process(session, sourceText, lineNumber):
    translateUri = TranslatorUriBuilder('en', sourceText)  # Target language is set to en (English)
    translatedResponseText = await DownloadString(session, translateUri, lineNumber)
    response = FormatResponse(sourceText, translatedResponseText)
    return response

async def main():
    statements = ["this is another sentence"] * 20000
    Logger({'Message': f'Start running Google Translate API for {len(statements)}'})
    results = []
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*[Process(session, val, idx) for idx, val in enumerate(statements)])
    Logger({'Message': f'Results are: {", ".join(map(str, [x.translatedText for x in results]))}'})
    Logger({'Message': f'Finished running Google Translate API for {str(len(statements))} and got {str(len(results))} results'})

if __name__ == '__main__':
    asyncio.run(main())
Additional test

The initial test ran the same translation repeatedly, so I created a test to check that the results aren't being cached on Google's side. I manually copied an ebook into a text file. Then, in Python, the code opens the file, groups the text into an array of 100-character chunks, takes the first 20,000 items from the array, and translates each line. Interestingly, it still took under 30 seconds.
import asyncio
import aiohttp
from collections import namedtuple
import json
from urllib.parse import quote

TranslateReponseModel = namedtuple('TranslateReponseModel', ['sourceText', 'translatedText', 'detectedSourceLanguage'])  # model to store results.

def Logger(json_message):
    print(json.dumps(json_message))  # Note: logging json is just my personal preference.

async def DownloadString(session, url, index):
    while True:  # If client error - this will retry. You may want to limit the amount of attempts
        try:
            r = await session.get(url)
            text = await r.text()
            # Logger({"data": text, "status": r.status})
            r.raise_for_status()  # This will error if the API returns a 4xx or 5xx status.
            return text
        except aiohttp.ClientConnectionError as e:
            Logger({'Exception': f"Index {index} - connection was dropped before we finished", 'Details': str(e), 'Url': url})
        except aiohttp.ClientError as e:
            Logger({'Exception': f"Index {index} - something went wrong. Not a connection error, that was handled", 'Details': str(e), 'Url': url})

def FormatResponse(sourceText, responseText):
    jsonResponse = json.loads(responseText)
    return TranslateReponseModel(sourceText, jsonResponse["data"]["translations"][0]["translatedText"], jsonResponse["data"]["translations"][0]["detectedSourceLanguage"])

def TranslatorUriBuilder(targetLanguage, sourceText):
    apiKey = 'ABCDED1234'  # TODO This is a 41-character API key. You'll need to generate one (it's not part of the json certificate)
    return f"https://translation.googleapis.com/language/translate/v2?key={apiKey}&q={quote(sourceText)}&target={targetLanguage}"

async def Process(session, sourceText, lineNumber):
    translateUri = TranslatorUriBuilder('en', sourceText)  # Target language is set to en (English)
    translatedResponseText = await DownloadString(session, translateUri, lineNumber)
    response = FormatResponse(sourceText, translatedResponseText)
    return response

def readEbook():
    # This is a simple test to make sure the response is not cached.
    # I grabbed a random online pdf (http://sd.blackball.lv/library/Beginning_Software_Engineering_(2015).pdf) and copied text into notepad.
    with open(r"C:\Dev\ebook.txt", "r", encoding="utf8") as f:
        return f.read()

def chunkText(text):
    chunk_size = 100
    total_length = len(text)
    chunk_array = [text[i:i + chunk_size] for i in range(0, total_length, chunk_size)]
    formatResults = [x for x in chunk_array if len(x) > 10]
    return formatResults[:20000]

async def main():
    data = readEbook()
    chunk_data = chunkText(data)
    Logger({'Message': f'Start running Google Translate API for {len(chunk_data)}'})
    results = []
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*[Process(session, val, idx) for idx, val in enumerate(chunk_data)])
    Logger({'Message': f'Results are: {", ".join(map(str, [x.translatedText for x in results]))}'})
    Logger({'Message': f'Finished running Google Translate API for {str(len(chunk_data))} and got {str(len(results))} results'})

if __name__ == '__main__':
    asyncio.run(main())
Finally, you can find more information about the Google Translate API HTTP request at https://cloud.google.com/translate/docs/reference/rest/v2/translate, and you can run the request through Postman.
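For experimenting outside Postman, a standard-library-only sketch can build the same GET request that the answer's code issues (the key value is a placeholder; `urlencode` handles escaping of the `q` parameter, which is easy to get wrong when concatenating the URL by hand):

```python
from urllib.parse import urlencode

def build_translate_url(api_key, text, target="en"):
    """Build a GET URL for the v2 translate endpoint."""
    query = urlencode({"key": api_key, "q": text, "target": target})
    return f"https://translation.googleapis.com/language/translate/v2?{query}"

# Fetching this URL (e.g. with urllib.request.urlopen) returns JSON shaped like
# {"data": {"translations": [{"translatedText": ..., "detectedSourceLanguage": ...}]}}
```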