我如何在芹菜任务中启动的Asyncio共同公路上利用Django模型



我已经重构了一些Django代码来进行一些Web刮擦。我为每个用户启动一个单独的芹菜任务,我正在为数据刮擦。在每个芹菜任务中,我都使用Asyncio和AioHTTP为给定用户进行刮擦。

我可以访问我的所有Django模型类和方法,但是一旦我做了触发实际数据库查询的事情,我就会收到一个错误:

...
[2019-02-16 18:04:38,126] WARNING log /home/chrisadmin/anaconda3/lib/python3.6/site-packages/celery/app/trace.py:561: RuntimeWarning: Exception raised outside body: OperationalError('SSL SYSCALL error: Bad file descriptorn',):
Traceback (most recent call last):
  File "/home/chrisadmin/anaconda3/lib/python3.6/site-packages/django/db/backends/utils.py", line 85, in _execute
    return self.cursor.execute(sql, params)
psycopg2.OperationalError: SSL SYSCALL error: Socket operation on non-socket
...

在芹菜任务中,我可以做一些导致Django与数据库进行交互的事情,只要它们不涉及Asyncio即可。同样,只要这些异步任务依次从芹菜任务中启动。

,我就可以在异步任务中成功与数据库进行互动。

如果我设置了CELERY_TASK_ALWAYS_EAGER=True,我不会得到例外,但是在这种情况下,芹菜任务当然不会同时运行。

用于刮擦单个用户,Asyncio/aiohttp足够了。但是我想使用芹菜能够在过程/计算机上扩展并并行刮擦多个用户。以前,我曾经尝试过专门使用芹菜,但我试图与Asyncio/aiohttp进行重构,以减少没有必要的地方的开销。

我希望能够使用芹菜并行启动为多个用户启动刮擦,然后在每个芹菜任务中,我希望能够刮擦各个用户,包括通过Django模型/方法保存其刮擦数据。/p>

在进行进一步的研究之后,在组合异步和芹菜时,数据库连接和可能的线程安全似乎存在问题。

似乎到目前为止工作的解决方案是创建一个新功能:

from django.db import connections
from django.conf import settings

def reset_db_connections():
    if not settings.CELERY_TASK_ALWAYS_EAGER:
        connections.close_all()

我将此功能称为任何芹菜任务的第一行:

@shared_task(bind=True)
def my_celery_task(self, args):
    reset_db_connections()
    # do stuff
    # call stuff that uses asyncio

到目前为止,这似乎允许我的代码工作,无论我对CELERY_TASK_ALWAYS_EAGER的设置有什么设置。

最初而不是connections.close_all()我尝试过:

 for conn in db.connections.all():
     if conn.connection.closed != 0:
         conn.connection.close()

但这导致了我没有关闭连接的连接和/或我正在关闭已经关闭的连接的错误。

作为上述解决方案的替代方法,我发现更改设置:

DATABASES = {"default": dj_database_url.config(conn_max_age=600)}

to

DATABASES = {"default": dj_database_url.config(conn_max_age=0)}

也解决了问题。但是,据我了解,设置conn_max_age=0将对每个DB操作都使用新连接,这似乎不是一个好主意。使用上面的reset_db_connections()方法,我可以离开conn_max_age=600

相关内容

  • 没有找到相关文章

最新更新