我已经重构了一些Django代码来进行一些Web刮擦。我为每个用户启动一个单独的芹菜任务,我正在为数据刮擦。在每个芹菜任务中,我都使用Asyncio和AioHTTP为给定用户进行刮擦。
我可以访问我的所有Django模型类和方法,但是一旦我做了触发实际数据库查询的事情,我就会收到一个错误:
...
[2019-02-16 18:04:38,126] WARNING log /home/chrisadmin/anaconda3/lib/python3.6/site-packages/celery/app/trace.py:561: RuntimeWarning: Exception raised outside body: OperationalError('SSL SYSCALL error: Bad file descriptorn',):
Traceback (most recent call last):
File "/home/chrisadmin/anaconda3/lib/python3.6/site-packages/django/db/backends/utils.py", line 85, in _execute
return self.cursor.execute(sql, params)
psycopg2.OperationalError: SSL SYSCALL error: Socket operation on non-socket
...
在芹菜任务中,我可以做一些导致Django与数据库进行交互的事情,只要它们不涉及Asyncio即可。同样,只要这些异步任务依次从芹菜任务中启动。
,我就可以在异步任务中成功与数据库进行互动。如果我设置了CELERY_TASK_ALWAYS_EAGER=True
,我不会得到例外,但是在这种情况下,芹菜任务当然不会同时运行。
用于刮擦单个用户,Asyncio/aiohttp足够了。但是我想使用芹菜能够在过程/计算机上扩展并并行刮擦多个用户。以前,我曾经尝试过专门使用芹菜,但我试图与Asyncio/aiohttp进行重构,以减少没有必要的地方的开销。
我希望能够使用芹菜并行启动为多个用户启动刮擦,然后在每个芹菜任务中,我希望能够刮擦各个用户,包括通过Django模型/方法保存其刮擦数据。/p>
在进行进一步的研究之后,在组合异步和芹菜时,数据库连接和可能的线程安全似乎存在问题。
似乎到目前为止工作的解决方案是创建一个新功能:
from django.db import connections
from django.conf import settings
def reset_db_connections():
if not settings.CELERY_TASK_ALWAYS_EAGER:
connections.close_all()
我将此功能称为任何芹菜任务的第一行:
@shared_task(bind=True)
def my_celery_task(self, args):
reset_db_connections()
# do stuff
# call stuff that uses asyncio
到目前为止,这似乎允许我的代码工作,无论我对CELERY_TASK_ALWAYS_EAGER
的设置有什么设置。
最初而不是connections.close_all()
我尝试过:
for conn in db.connections.all():
if conn.connection.closed != 0:
conn.connection.close()
但这导致了我没有关闭连接的连接和/或我正在关闭已经关闭的连接的错误。
作为上述解决方案的替代方法,我发现更改设置:
DATABASES = {"default": dj_database_url.config(conn_max_age=600)}
to
DATABASES = {"default": dj_database_url.config(conn_max_age=0)}
也解决了问题。但是,据我了解,设置conn_max_age=0
将对每个DB操作都使用新连接,这似乎不是一个好主意。使用上面的reset_db_connections()
方法,我可以离开conn_max_age=600
。