将任务添加到运行在单独线程中的asyncio循环中



我试图从我的主线程运行一个单独的线程asyncio循环。当线程和循环运行时,我想向它添加新的任务。我有以下代码:

class Craft:
# [...]
async def exec_subscription(self, session, subscription: str, variable_values: dict, callback: Callable) -> None:
"""Execute a subscription on the GraphQL API."""
async for response in session.subscribe(gql(subscription), variable_values=variable_values):
callback(response)
def subscribe_job(self, job_id: int, callback: Callable) -> Union[bool, None]:
"""Subscribe to a job, receive the job information every time it is updated and pass it to a callback function."""
async def _schedule_subscription_task(subscribe_job: str, variable_values: dict, callback: Callable) -> None:
"""Schedule subscription task in asyncio loop using existing websocket connection."""
async with self._ws_client as session:
task = asyncio.create_task(self.exec_subscription(session, subscribe_job, variable_values, callback))
await asyncio.gather(task)

def _run_subscription_loop(subscribe_job: str, variable_values: dict, callback: Callable) -> None:
"""Run asyncio loop."""
asyncio.run(_schedule_subscription_task(subscribe_job, variable_values, callback))
# Build GraphQL subscription
subscribe_job = """
subscription jobSubscription($jobId: Int!) {
jobSubscriptionById(id: $jobId) {
job {...}
}
}
"""
# Build variables dictionary
variable_values = {
'jobId': job_id
}
# Check if subscription thread is running
thread_name = 'CraftSubscriptionThread'
for thread in threading.enumerate():
# If thread is found, add subscription to existing asyncio loop
if thread.name == thread_name:
# Add task to asyncio loop
loop = asyncio.get_event_loop()
asyncio.run_coroutine_threadsafe(_schedule_subscription_task(subscribe_job, variable_values, callback), loop)
return True
# Else create new event loop and new thread
thread = threading.Thread(name=thread_name, daemon=True, target=_run_subscription_loop, args=(subscribe_job, variable_values, callback))
thread.start()
return True

在Python终端中,我使用以下回调函数运行主方法subscribe_job:

from craft import Craft
global updated_job
updated_job = "0"
def print_job(job):
global updated_job
updated_job = job
print(updated_job)
craft.subscribe_job(1561, print_job)

这工作得很好,当订阅接收到消息时,它在终端中打印作业:

>>> {'jobSubscriptionById': {'job': {'id': 1561, 'spaceId': 1, 'applicationId': 47, 'configurationId': 139, 'title': 'Job 1631357928', 'description': None, 'status': 'failed', 'step': 1, 'progress': '1.00'}}}

然而,当我触发另一个订阅以向循环添加新任务时,似乎什么都没有发生。我简单地触发了一个不同作业的新订阅,如下所示,它应该调用run_coroutine_threadsafe:

craft.subscribe_job(1561, print_job)

我看到你的代码的主要问题是行loop = asyncio.get_event_loop()。此方法获取当前线程中当前正在运行的事件循环。如果一个不存在,它就会创造一个。问题是你第二次调用subscribe,你从主线程调用它,你只在CraftSubscriptionThread线程中启动了一个事件循环,因此get_event_loop创建了一个新的事件循环,甚至没有运行(在你调用get_event_loop()后打印loop.is_running()-它将返回False),解释为什么什么都没有发生。

还要注意,每次调用asyncio.run时都会创建一个新的事件循环,根据您的描述,这听起来不像您想要的。我会避免循环查看CraftSubscriptionThread线程是否正在运行并试图获得事件循环。相反,创建一个带有一个显式事件循环的线程,然后用run_forever启动它。然后,当提交到线程事件循环时,可以像以前一样调用run_coroutine_threadsafe。下面是代码的简化版本,它应该说明如何解决这个问题:

class ThreadedEventLoop(threading.Thread):
def __init__(self):
super().__init__()
self._loop = asyncio.new_event_loop()
self.daemon = True
async def exec_subscription(self, callback: Callable) -> None:
while True: #Simulate the callback firing every two seconds.
await asyncio.sleep(2)
callback('hello!')
async def schedule_subscription_task(self, callback):
await self.exec_subscription(callback)
def submit(self, callback):
asyncio.run_coroutine_threadsafe(self.schedule_subscription_task(callback), self._loop)
def run(self) -> None:
self._loop.run_forever()

def print_job(job):
print('job1: updated_job')

def print_job2(job):
print('job2: updated_job')

threaded_loop = ThreadedEventLoop()
threaded_loop.start()
threaded_loop.submit(print_job)
threaded_loop.submit(print_job2)
threaded_loop.join()

从您提供的代码中我可以看出,subscribe_job()是在主线程中运行的。因此,当您执行这一行时:

loop = asyncio.get_event_loop()

正在检索的循环是主线程中的事件循环。这不是你想要的;你想在次级线程中循环。由于您在下一行中说调用asyncio.run_coroutine_threadsafe时没有发生任何事情,因此我冒昧地猜测您的程序实际上并没有在主线程中启动事件循环。否则,您将看到函数实际运行-但它将在主线程中执行。

需要一些工作来修复这个问题。您需要检索辅助线程的事件循环并将其存储在一个变量中。我猜你的程序的其余部分是如何编写的,但一种方法是在类的构造函数中创建一个成员变量:

def __init__(self):
self.secondary_loop = None

现在在您的嵌套函数async def schedule_subscription_task(...)中添加以下行:

async def schedule_subscription_task(...):
self.secondary_loop = asyncio.get_running_loop()

回到subscribe_job的主要部分,将对run_coroutine_threadsafe的调用更改为:

asyncio.run_coroutine_threadsafe(..., self.secondary_loop)

不可否认这里有一个竞争条件。变量self。Secondary_loop不会在启动辅助线程时立即设置。如果快速连续调用subscribe_job两次,即使实际上已经启动了辅助线程,该变量仍然可能为None。您可能需要在线程创建后添加一个小的时间延迟,它只会运行一次,不应该明显影响程序的性能。

最新更新