在芹菜任务中使用Scrapy解析函数(有时可能需要10分钟)时,我得到了这个。
我用:——Django = = 1.6.5——django-celery = = 3.1.16——芹菜= = 3.1.16- psycopg2==2.5.5(我也用了psycopg2==2.5.4)
<>之前[2015-07-19 11:27:49 . 488: CRITICAL/MainProcess]任务myapp。parse_items[63fc40eb-c0d6-46f4-a64e-acce8301d29a] INTERNAL ERROR: InterfaceError('连接已关闭',)回溯(最近一次调用):文件"/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/芹菜/app/trace.py",第284行,在trace_task .py中uuid,检索,成功,请求=task_request,文件"/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/celery/backends/base.py",第248行,在store_result .py中请求=请求,* * kwargs)文件"/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/dj芹菜/backends/database.py",第29行,在_store_result .py中回溯=回溯,孩子= self.current_task_children(请求),文件"/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/dj芹菜/managers.py",第42行,在_inner .py中返回fun(*args, **kwargs)文件"/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/dj芹菜/managers.py",第181行,在store_result .py中'meta': {'children': children}})文件"/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/dj芹菜/managers.py",第87行,在update_or_create .py中返回get_queryset(自我).update_or_create (* * kwargs)文件"/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/dj芹菜/managers.py",第70行,在update_or_create .py中Obj, created = self.get_or_create(**kwargs)文件"/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py",第376行,在get_or_create .py中返回self.get(**lookup), False文件"/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py",第304行Num = len(克隆)文件"/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py",第77行,__len__self._fetch_all ()文件"/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py",第857行,_fetch_all .py中自我。_result_cache = list(self.iterator())文件"/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py",第220行,在迭代器中对于compiler.results_iter()中的行:文件"/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/sql/compiler.py",第713行,在results_iter中对于self.execute_sql(MULTI)中的行:文件"/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/sql/compiler.py",第785行,在execute_sql .py中Cursor = self.connection.cursor()文件"/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/backends/__init__.py",第160行,在光标中Cursor = self.make_debug_cursor(self._cursor())文件"/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/backends/__init__.py",第134行,在_cursor .py中返回self.create_cursor ()文件"/home/mo/Work/python/pp -env/local/lib/python2.7/site-packages/django/db/utils.py",第99行,__exit__ .py六。Reraise (dj_exc_type, dj_exc_value, traceback)文件"/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/backends/__init__.py",第134行,在_cursor .py中返回self.create_cursor ()文件"/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/backends/postgresql_psycopg2/base.py",第137行,在create_cursor .py中Cursor = self.connection.cursor()接口错误:连接已关闭编辑:不幸的是这是django + psycopg2 +芹菜组合的问题。这是一个老问题,没有解决。
看一看这个线程来理解:https://github.com/celery/django-celery/issues/121
基本上,当芹菜启动一个worker时,它会分叉一个数据库连接从django.db框架。如果这个连接因为某种原因中断了,它不会创建一个新的。芹菜与这个问题无关一旦无法检测数据库连接何时被删除使用django.db库。Django不会在它发生时发出通知,因为它只是启动一个连接,它接收一个wsgi调用(没有连接池)。我在一个大制作中遇到了同样的问题环境中有很多机器工人,有时,这些机器与postgres服务器失去连接。
我解决了这个问题,把每个芹菜主进程在linux下管理处理程序和监视程序,并实现了一个decorator处理psycopg2。接口错误,当它发生时,这个函数调度一个系统调用来强制管理程序优雅地重新启动SIGINT芹菜的过程。
找到了更好的解决方案。我实现了一个像这样的芹菜任务基类:
from django.db import connection
import celery
class FaultTolerantTask(celery.Task):
""" Implements after return hook to close the invalid connection.
This way, django is forced to serve a new connection for the next
task.
"""
abstract = True
def after_return(self, *args, **kwargs):
connection.close()
@celery.task(base=FaultTolerantTask)
def my_task():
# my database dependent code here
我相信它也会解决你的问题。 男人和emanuelcds
我有同样的问题,现在我已经更新了我的代码,并为芹菜创建了一个新的加载器:
from djcelery.loaders import DjangoLoader
from django import db
class CustomDjangoLoader(DjangoLoader):
def on_task_init(self, task_id, task):
"""Called before every task."""
for conn in db.connections.all():
conn.close_if_unusable_or_obsolete()
super(CustomDjangoLoader, self).on_task_init(task_id, task)
当然,如果你使用的是dj芹菜,它也需要这样的设置:
CELERY_LOADER = 'myproject.loaders.CustomDjangoLoader'
os.environ['CELERY_LOADER'] = CELERY_LOADER
我还需要测试,我会更新。
如果您在运行测试时遇到这种情况,那么您可以将测试更改为TransactionTestCase类而不是TestCase,或者添加标记pytest.mark.django_db(transaction=True)
。这使我的db连接从创建pytest-芹菜fixture到数据库调用都是活跃的。
Github issue - https://github.com/Koed00/django-q/issues/167
对于上下文,我在测试中使用pytest-芹菜与celery_app
和celery_worker
作为fixture。我还试图在这些测试中引用的任务中命中测试数据库。
如果有人能解释切换到transaction=True可以保持开放,那就太好了!