使用Asyncio创建新的Python流程



我正在设置一个函数,以异步启动一个新的过程以运行非常cpu的重型功能。大多数文档都无法彻底涵盖这一点,而且我将其拼凑在一起似乎并没有异步工作。

i具有一个功能procManager,该函数 CC_1具有一个函数,args传递到函数中,而基本记录的对象名称。

async def procManager(f,a,o):
    print(f"{o} started at {time.strftime('%X')}")
    p = Process(target=f, args=(a,))
    p_parent = os.getppid()   # parent process
    p_curr = os.getpid()     # current process
    print("parent process:", p_parent)
    print("current process:", p_curr)
    p.start()
    p.join()
    print(f"{o} finished at {time.strftime('%X')}")
    print("=========")

我具有此CPU重型功能,该功能在网络图上运行了Louvain的社区检测

def community(cg):
    start = timer()
    partition = c.best_partition(cg) #default louvain community detection
    v = {} #create dict to group nodes by community
    for key, value in sorted(partition.items()):
        v.setdefault(value, []).append(key)
    stop = timer()
    print(stop-start)

主函数看起来如此。我正在初始化2个图A和B的3000和1000个节点,平均度为5。我使用的是Jupyter笔记本来运行此功能,因此我使用await main()而不是asyncio.run

A = nx.barabasi_albert_graph(3000,5)  
B = nx.barabasi_albert_graph(1000,5)  
async def main():
    task1 = asyncio.create_task(
        procManager(community, A, "A"))
    task2 = asyncio.create_task(
        procManager(community, B, "B"))
    print("async start")
await main()

我要做的是将A和B处理异步(即同时开始(,但要在不同的过程中进行处理。当前的输出看起来像这样,其中A和B在新过程中处理但正在阻止。我需要以异步的方式对A和B社区进行计算,因为它们会被兔子流触发,并且响应需要非阻止。

async done
A started at 06:03:48
parent process: 5783
current process: 12121
11.424800566000158
A finished at 06:03:59
=========
B started at 06:03:59
parent process: 5783
current process: 12121
0.037437027999885686
B finished at 06:03:59
=========

希望你们能提供帮助!

在您的情况下,问题是join()方法。它会阻止该过程完成。另外,您甚至不需要asyncio。看看这个快速示例:

import time
from multiprocessing import Process
def procManager(f,a,o):
    print(f"{o} started at {time.strftime('%X')}")
    p = Process(target=f, args=(a,))
    p.start()
    # p.join()
    print(f"{o} finished at {time.strftime('%X')}") # This will occur immediately
    print("=========")
def community(cg):
    for i in range(10):
        print("%s - %s" %(cg, i))
        time.sleep(1)
procManager(community, "This is A", "A")
procManager(community, "This is B", "B")

这应该让您了解如何解决问题。我希望它有帮助!

processPool中运行asyncio的考试之一:

loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound_function, args)
        print('custom process pool', result)

run_in_exacutor((工作正常,如何使用Asyncio

的最佳方法

在Asyncio中,您需要使用asyncio.create_task方法。该方法的诀窍是您只能指定您声明为异步的funciton。为了运行它们,您应该使用await asyncio.gather

示例是:

import asyncio
async def print_hello(name):
    print("Hello! {}".format(name))
name_list = ["billy", "bob", "buffalo bob"]
for item in name_list:
    await asyncio.gather(print_hello(item))

使用Asyncio创建和运行子流程的最简单形式是此处概述的Create_task方法:Asyncio Docs

希望这会有所帮助!

相关内容

  • 没有找到相关文章

最新更新