在异步 python 中运行 sqlite 插入操作



我正在尝试在python3.6中异步执行许多阻塞任务。所有阻塞任务都将数据存储在SQLITE3(peewee orm(中。虽然执行这些任务偶尔会给我一个 sqlite3 的数据库块错误。我正在使用sanic,Peewee和python3.6。

代码中的任何工作或改进,以阻止此DBBlock错误。

#sqlite3_store_func is a blocking sqlite3 store function, which insert #data into the database
async def function_a(config, loop, executor):
    ins = SomesyncClass(path, config)
    ##this gives a list of data
    _, purchases = await ins.parse()
    #Blocking functions should not be called directly. 
    #For example, if a function blocks for 1 second, other tasks are
    # delayed by 1 second which can have an important impact on
    # reactivity.
    # An executor can be used to run a task in a different thread or 
    #even in a different process, to not block the thread of the 
    #event loop. See the AbstractEventLoop.run_in_executor() method. 
    await asyncio.wait(
            fs=[loop.run_in_executor(executor,  
                functools.partial(sqlite3_store_func, **purchase)) for purchase in purchases],
            return_when=asyncio.ALL_COMPLETED)
    return

async def parse(config, loop, executor):
    await asyncio.gather(*[
                function_a(config, loop, executor), 
                function_b(config, loop, executor),
                ])
    logger.info('Periodic task has finished execution')
@SANIC_BLUEPRINT.post('parse')
async def parse(request):
    """
    To get all the assets created by the requester
    """
    request.app.config.VALIDATE_FIELDS(["path"], request.json)
    #loop = asyncio.get_event_loop()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    ##task which is non blocking
    request.app.add_task(parse_takeout(request.app.config, request.app.loop, executor))
    return response.json({"error": True, "success": False})

有几件事需要注意。

Sqlite 嵌入到 python 进程中。它不是您通过套接字与之通信的单独服务器,因此您已经无法利用事件循环来异步查询 sqlite db。

这样您就可以使用线程池了。众所周知,sqlite 使用全局写锁,因此在任何给定时间只有一个连接可以写入数据库。这意味着,如果您使用的是线程池,则需要在写入周围放置互斥锁,对所有写入使用专用线程,或者正常处理获取锁的失败。

最新更新