使用asyncio在Python中进行a/b测试



假设有一些API已经在生产中运行,并且您创建了另一个API,您有点想使用正在访问生产API的传入请求对其进行A/B测试。现在我想知道,有没有可能做这样的事情,(我知道有人通过保留两个不同的API版本进行A/B测试等来进行流量拆分(

一旦收到生产api的传入请求,您就向新的api发出异步请求,然后继续执行生产api的其余代码,然后,在将最终响应返回给调用方之前,检查是否已计算出之前创建的异步任务的结果。如果它可用,那么您将返回它,而不是当前的API。

我想知道,做这样的事情最好的方法是什么?我们是想为这个或其他东西写一个装饰器吗?我有点担心,如果我们在这里使用async,可能会发生很多边缘情况。有人对改进代码或整个方法有什么建议吗?

谢谢你抽出时间!


上述方法的一些伪代码,

import asyncio
def call_old_api():
pass
async def call_new_api():
pass
async def main():
task = asyncio.Task(call_new_api())
oldResp = call_old_api()
resp = await task
if task.done():
return resp
else:
task.cancel() # maybe
return oldResp
asyncio.run(main())

您不能只在asyncio的协程中执行call_old_api()。这里有详细的解释。请确保您理解它,因为根据您的服务器的工作方式,您可能无法执行您想要的操作(例如,在同步服务器上运行异步API,以保留编写异步代码的要点(。

如果你知道你在做什么,并且你有一个异步服务器,你可以在线程中调用旧的同步API,并使用一个任务来运行新的API:

task = asyncio.Task(call_new_api())
oldResp = await in_thread(call_old_api())
if task.done():
return task.result()  # here you should keep in mind that task.result() may raise exception if the new api request failed, but that's probably ok for you
else:
task.cancel() # yes, but you should take care of the cancelling, see - https://stackoverflow.com/a/43810272/1113207
return oldResp

我认为你可以更进一步,而不是总是等待旧的API完成,你可以同时运行两个API并返回第一个完成的API(以防新的API比旧的API工作得更快(。有了上面的所有检查和建议,它应该看起来像这样:

import asyncio
import random
import time
from contextlib import suppress

def call_old_api():
time.sleep(random.randint(0, 2))
return "OLD"

async def call_new_api():
await asyncio.sleep(random.randint(0, 2))
return "NEW"

async def in_thread(func):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, func)

async def ensure_cancelled(task):
task.cancel()
with suppress(asyncio.CancelledError):
await task

async def main():
old_api_task = asyncio.Task(in_thread(call_old_api))
new_api_task = asyncio.Task(call_new_api())
done, pending = await asyncio.wait(
[old_api_task, new_api_task], return_when=asyncio.FIRST_COMPLETED
)
if pending:
for task in pending:
await ensure_cancelled(task)
finished_task = done.pop()
res = finished_task.result()
print(res)

asyncio.run(main())

最新更新