系统/依赖项详细信息:
CPU --> 4
requirements --> celery==4.3.0, twisted==19.7.0 , python3.7
下面是我有的芹菜设置
from threading import Thread
from celery import Celery
from twisted.internet import threads, reactor, defer
from twisted.web.error import Error
from celery import signals
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@signals.worker_process_init.connect
def configure_infrastructure(**kwargs):
Thread(target=reactor.run, name="reactor.run", args=(False,)).start()
print('started new thread')
@signals.worker_process_shutdown.connect()
def shutdown_reactor(**kwargs):
"""
This is invoked when the individual workers shut down. It just stops the twisted reactor
@param kwargs:
@return:
"""
reactor.callFromThread(reactor.stop)
print('REACTOR SHUTDOWN')
def getPage(inp):
print(inp)
return inp
def inThread():
print('inside inthread method')
try:
result = threads.blockingCallFromThread(
reactor, getPage, "http://twistedmatrix.com/")
except Exception as exc:
print(exc)
else:
print(result)
@app.task
def add(x, y):
print('inside add method')
inThread()
return x + y
像下面这样运行芹菜工人:
celery -A run worker --loglevel=info
芹菜启动时的日志:
(2_env) ubuntu@gpy:~/app/env/src$ celery -A run worker --loglevel=info
[tasks]
. run.add
[2020-04-09 07:25:29,357: WARNING/Worker-1] started new thread
[2020-04-09 07:25:29,362: WARNING/Worker-4] started new thread
[2020-04-09 07:25:29,362: WARNING/Worker-3] started new thread
[2020-04-09 07:25:29,364: WARNING/Worker-2] started new thread
[2020-04-09 07:25:29,367: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
调用方法如下:
>>> run.add.delay(1,2)
<AsyncResult: d41680fd-7cc1-4e75-81be-6496bad0cc16>
>>>
有时我能看到它工作得很好
[2020-04-09 07:27:17,998: INFO/MainProcess] Received task: run.add[00934769-48c4-48b8-852c-8b746bdd5e03]
[2020-04-09 07:27:17,999: WARNING/Worker-4] inside add method
[2020-04-09 07:27:17,999: WARNING/Worker-4] inside inthread method
[2020-04-09 07:27:18,000: WARNING/Worker-4] http://twistedmatrix.com/
[2020-04-09 07:27:18,000: WARNING/Worker-4] http://twistedmatrix.com/
[2020-04-09 07:27:18,000: INFO/MainProcess] Task run.add[00934769-48c4-48b8-852c-8b746bdd5e03] succeeded in 0.00144551398989s: 3
有时我会看到它无法调用getPage
方法,并像下面的日志一样挂起
[2020-04-09 07:27:22,318: INFO/MainProcess] Received task: run.add[d41680fd-7cc1-4e75-81be-6496bad0cc16]
[2020-04-09 07:27:22,319: WARNING/Worker-2] inside add method
[2020-04-09 07:27:22,319: WARNING/Worker-2] inside inthread method
在Thread
中使用reactor.run
有什么问题吗?
更新
我把印刷品放入*twisted.internet.threads.blockingCallFromThread*
中。
def blockingCallFromThread(reactor, f, *a, **kw):
queue = Queue.Queue()
def _callFromThread():
print('inside _callFromThread')
result = defer.maybeDeferred(f, *a, **kw)
result.addBoth(queue.put)
print('before calling _callFromThread')
reactor.callFromThread(_callFromThread)
print('after calling _callFromThread')
result = queue.get()
if isinstance(result, failure.Failure):
result.raiseException()
return result
我可以看到,只有当reactor.callFromThread(_callFromThread)
中没有调用_callFromThread
方法时,芹菜工作者才会挂起,但当我用CTRL + c
手动停止工作者时,我可以调用它。
每次我在挂起作业的地方停下工人,它就会开始处理作业。
更新:2020年4月27日
如果我用钩针来运行扭曲的反应堆,问题就解决了。我更新了以下功能。
@signals.worker_process_init.connect
def configure_infrastructure(**kwargs):
from crochet import setup
setup()
print('started new thread')
您似乎已经非常小心了,您可以在一个线程中运行Twisted reactor。然而,您将无法在多个线程中运行它,我想这就是当您将它与Celery一起使用时所发生的情况。它同时具有实例和全局状态,如果它在多个线程中运行,就会被践踏。
相反,可以尝试使用钩针来协调对在单个非主线程中运行的反应器的调用,这些调用来自任意多个其他线程。