我正在尝试在python3.6中异步执行许多阻塞任务。所有阻塞任务都将数据存储在SQLITE3(peewee orm(中。虽然执行这些任务偶尔会给我一个 sqlite3 的数据库块错误。我正在使用sanic,Peewee和python3.6。
代码中的任何工作或改进,以阻止此DBBlock错误。
#sqlite3_store_func is a blocking sqlite3 store function, which insert #data into the database
async def function_a(config, loop, executor):
ins = SomesyncClass(path, config)
##this gives a list of data
_, purchases = await ins.parse()
#Blocking functions should not be called directly.
#For example, if a function blocks for 1 second, other tasks are
# delayed by 1 second which can have an important impact on
# reactivity.
# An executor can be used to run a task in a different thread or
#even in a different process, to not block the thread of the
#event loop. See the AbstractEventLoop.run_in_executor() method.
await asyncio.wait(
fs=[loop.run_in_executor(executor,
functools.partial(sqlite3_store_func, **purchase)) for purchase in purchases],
return_when=asyncio.ALL_COMPLETED)
return
async def parse(config, loop, executor):
await asyncio.gather(*[
function_a(config, loop, executor),
function_b(config, loop, executor),
])
logger.info('Periodic task has finished execution')
@SANIC_BLUEPRINT.post('parse')
async def parse(request):
"""
To get all the assets created by the requester
"""
request.app.config.VALIDATE_FIELDS(["path"], request.json)
#loop = asyncio.get_event_loop()
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
##task which is non blocking
request.app.add_task(parse_takeout(request.app.config, request.app.loop, executor))
return response.json({"error": True, "success": False})
有几件事需要注意。
Sqlite 嵌入到 python 进程中。它不是您通过套接字与之通信的单独服务器,因此您已经无法利用事件循环来异步查询 sqlite db。
这样您就可以使用线程池了。众所周知,sqlite 使用全局写锁,因此在任何给定时间只有一个连接可以写入数据库。这意味着,如果您使用的是线程池,则需要在写入周围放置互斥锁,对所有写入使用专用线程,或者正常处理获取锁的失败。