'./manage.py runserver' 在运行芹菜地图/减少任务时重新启动;有时会引发错误inner_run



我在我的 django 项目中有一个视图,它触发了一个芹菜任务。芹菜任务本身通过子进程/结构触发一些map/reduce作业,Hadoop作业的结果存储在磁盘上---数据库中实际上没有存储任何东西。在 hadoop 作业完成后,芹菜任务会发送一个 django 信号,表明它已完成,如下所示:

# tasks.py
from models import MyModel
import signals
from fabric.operations import local
from celery.task import Task
class Hadoopification(Task):
    def run(self, my_model_id, other_args):
        my_model = MyModel.objects.get(pk=my_model_id)
        self.hadoopify_function(my_model, other_args)
        signals.complete_signal.send(
            sender=self,
            my_model_id=my_model_id,
            complete=True,
        )
    def hadoopify_function(self, my_model, other_args):
        local("""hadoop jar /usr/lib/hadoop/hadoop-streaming.jar -D mapred.reduce.tasks=0 -file hadoopify.py -mapper "parse_mapper.py 0 0" -input /user/me/input.csv -output /user/me/output.csv""")

真正让我困惑的是,当芹菜任务运行时,django runserver 正在重新加载,就好像我在 django 项目中的某个地方更改了一些代码(我没有,我可以向你保证!有时,这甚至会导致 runserver 命令中出现错误,在 runserver 命令重新加载并且再次正常之前,我会看到如下所示的输出(注意:此错误消息与此处描述的问题非常相似)。

Unhandled exception in thread started by <function inner_run at 0xa18cd14>
Error in sys.excepthook:
Traceback (most recent call last):
  File "/usr/lib/python2.6/dist-packages/apport_python_hook.py", line 48, in apport_excepthook
    if not enabled():
TypeError: 'NoneType' object is not callable
Original exception was:
Traceback (most recent call last):
  File "/home/rdm/Biz/Projects/Daegis/Server_Development/tar/env/lib/python2.6/site-packages/django/core/management/commands/runserver.py", line 60, in inner_run
    run(addr, int(port), handler)
  File "/home/rdm/Biz/Projects/Daegis/Server_Development/tar/env/lib/python2.6/site-packages/django/core/servers/basehttp.py", line 721, in run
    httpd.serve_forever()
  File "/usr/lib/python2.6/SocketServer.py", line 224, in serve_forever
    r, w, e = select.select([self], [], [], poll_interval)
AttributeError: 'NoneType' object has no attribute 'select'

我已经通过将local("""hadoop ...""")替换为local("ls")将问题缩小到调用 hadoop 时,这不会在重新加载 django runserver 时造成任何问题。Hadoop代码中没有错误---当它不被芹菜调用时,它可以自行运行良好。

知道可能导致这种情况的原因吗?

因此,在深入研究了 fabric 源代码之后,我了解到 django 正在重新加载,因为我在 fabric.operations.local 命令中运行的芹菜任务失败了(这在 Hadoop输出 puke-fest 中很难检测到)。当 fabric.operations.local 命令失败时,fabric 会发出 sys.exit,导致芹菜死亡,django 尝试重新加载。可以通过在Hadoop任务中捕获SystemExit来检测此错误,如下所示:

class Hadoopification(Task):
    def run(self, my_model_id, other_args):
        my_model = MyModel.objects.get(pk=my_model_id)
        self.hadoopify_function(my_model, other_args)
        signals.complete_signal.send(
            sender=self,
            my_model_id=my_model_id,
            complete=True,
        )
    def hadoopify_function(self, my_model, other_args):
        try:
            local("""hadoop jar /usr/lib/hadoop/hadoop-streaming.jar -D mapred.reduce.tasks=0 -file hadoopify.py -mapper "parse_mapper.py 0 0" -input /user/me/input.csv -output /user/me/output.csv""")
        except SystemExit, e:
            # print some useful debugging information about exception e here!
            raise

在 fabric github 页面上有一些关于这个问题的讨论 这里, 这里 和 这里.引发错误的另一种选择是使用设置上下文管理器:

from fabric.api import settings
class Hadoopification(Task):
    ...
    def hadoopify_function(self, my_model, other_args):
        with settings(warn_only=True):
            result = local(...)
        if result.failed:
            # access result.return_code, result.stdout, result.stderr
            raise UsefulException(...)

这样做的优点是允许访问返回代码和结果上的所有其他属性。

我的猜测是,芹菜和织物中的任务名称都存在一些冲突。我建议使用更像这样的东西:

import celery
class Hadoopification(celery.task.Task):
    ...

并尽量避免任何进一步的碰撞,如果这种预感是好的。

但实际上,织物的本地是相当不错的,本质上只是一个子过程。Popen,你可以尝试调用 raw 来分离除 python stdlib 之外的任何内容。

最新更新