共享异步.与另一个线程或进程一起排队



我最近将我的旧模板匹配程序转换为 asyncio,我遇到一种情况,我的一个协程依赖于阻塞方法 ( processing_frame )。

每当调用该方法analyze_frame)的协程从共享asyncio.Queue()获取项目时,我想在单独的线程或进程中运行该方法

我不确定这是否可能或值得性能明智,因为我在线程和多处理方面的经验很少

import cv2
import datetime
import argparse
import os
import asyncio
#   Making CLI
if not os.path.exists("frames"):
    os.makedirs("frames")
t0 = datetime.datetime.now()
ap = argparse.ArgumentParser()
ap.add_argument("-v", "--video", required=True,
                help="path to our file")
args = vars(ap.parse_args())
threshold = .2
death_count = 0
was_found = False
template = cv2.imread('youdied.png')
vidcap = cv2.VideoCapture(args["video"])
loop = asyncio.get_event_loop()
frames_to_analyze = asyncio.Queue()

def main():
    length = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
    tasks = []
    for _ in range(int(length / 50)):
        tasks.append(loop.create_task(read_frame(50, frames_to_analyze)))
        tasks.append(loop.create_task(analyze_frame(threshold, template, frames_to_analyze)))
    final_task = asyncio.gather(*tasks)
    loop.run_until_complete(final_task)
    dt = datetime.datetime.now() - t0
    print("App exiting, total time: {:,.2f} sec.".format(dt.total_seconds()))
    print(f"Deaths registered: {death_count}")

async def read_frame(frames, frames_to_analyze):
    global vidcap
    for _ in range(frames-1):
        vidcap.grab()
    else:
        current_frame = vidcap.read()[1]
    print("Read 50 frames")
    await frames_to_analyze.put(current_frame)

async def analyze_frame(threshold, template, frames_to_analyze):
    global vidcap
    global was_found
    global death_count
    frame = await frames_to_analyze.get()
    is_found = processing_frame(frame)
    if was_found and not is_found:
        death_count += 1
        await writing_to_file(death_count, frame)
    was_found = is_found

def processing_frame(frame):
    res = cv2.matchTemplate(frame, template, cv2.TM_CCOEFF_NORMED)
    max_val = cv2.minMaxLoc(res)[1]
    is_found = max_val >= threshold
    print(is_found)
    return is_found

async def writing_to_file(death_count, frame):
    cv2.imwrite(f"frames/frame{death_count}.jpg", frame)
if __name__ == '__main__':
    main()

我尝试使用不同步,但没有取得多大成功
我会得到类似的东西

与self._rlock:
权限错误: [WinError 5] 访问被拒绝

如果processing_frame是一个阻塞函数,你应该用await loop.run_in_executor(None, processing_frame, frame)调用它。这会将函数提交到线程池,并允许事件循环继续执行其他操作,直到调用函数完成。

呼叫也是如此,例如 cv2.imwrite .如前所述,writing_to_file并不是真正的异步,尽管用async def定义。这是因为它不等待任何东西,所以一旦它的执行开始,它就会继续到最后,而不会暂停。在这种情况下,人们也可以首先让它成为一个正常的功能,以明确正在发生的事情。

最新更新