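I am using Python, RabbitMQ and Celery to distribute tasks to workers. Each task takes about 15 minutes and is 99% CPU bound. My system has 24 cores, and whenever my workers execute this task I get connection errors to the broker.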
[2019-10-12 08:49:57,695: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
[...]
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
I found several other posts about this problem, but none of them fixed it. Any idea how I can resolve this, especially under heavy CPU load?
Windows 10 (worker)
macOS 10.14 (RabbitMQ server)
Python 3.7
Celery 4.3.0 (rhubarb)
RabbitMQ 3.7.16 (Erlang 22.0.7)
My configuration makes the worker consume only one task at a time, and the worker process is even restarted after every job. Still no luck:
CELERYD_MAX_TASKS_PER_CHILD = 1,
CELERYD_CONCURRENCY = 1,
CELERY_TASK_RESULT_EXPIRES=3600,
CELERYD_PREFETCH_MULTIPLIER = 1,
CELERY_MAX_CACHED_RESULTS = 1,
CELERY_ACKS_LATE = True,
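For reference, Celery 4.x also accepts lowercase setting names, and the uppercase names above have lowercase equivalents. The sketch below maps the six settings from this configuration to those equivalents. The helper to_new_style and the variable names are hypothetical, used only for illustration, and renaming the settings is not expected to change the connection behaviour by itself.

```python
# Uppercase setting names used above, mapped to the lowercase names
# that Celery 4.x documents as their replacements.
OLD_TO_NEW = {
    "CELERYD_MAX_TASKS_PER_CHILD": "worker_max_tasks_per_child",
    "CELERYD_CONCURRENCY": "worker_concurrency",
    "CELERY_TASK_RESULT_EXPIRES": "result_expires",
    "CELERYD_PREFETCH_MULTIPLIER": "worker_prefetch_multiplier",
    "CELERY_MAX_CACHED_RESULTS": "result_cache_max",
    "CELERY_ACKS_LATE": "task_acks_late",
}


def to_new_style(old_settings):
    """Return a copy of old_settings keyed by the lowercase names (hypothetical helper)."""
    return {OLD_TO_NEW[name]: value for name, value in old_settings.items()}


# The values are exactly those from the configuration above.
old_settings = {
    "CELERYD_MAX_TASKS_PER_CHILD": 1,
    "CELERYD_CONCURRENCY": 1,
    "CELERY_TASK_RESULT_EXPIRES": 3600,
    "CELERYD_PREFETCH_MULTIPLIER": 1,
    "CELERY_MAX_CACHED_RESULTS": 1,
    "CELERY_ACKS_LATE": True,
}

new_settings = to_new_style(old_settings)
# With a Celery app object, this dict could be applied via
# app.conf.update(new_settings).
```

Old and new names should not be mixed in the same configuration, so it is safer to convert all of them at once.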
Here is the full call stack:
[2019-10-12 08:49:57,695: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "C:\Python37\lib\site-packages\celery\worker\consumer\consumer.py", line 318, in start
    blueprint.start(self)
  File "C:\Python37\lib\site-packages\celery\bootsteps.py", line 119, in start
    step.start(parent)
  File "C:\Python37\lib\site-packages\celery\worker\consumer\consumer.py", line 596, in start
    c.loop(*c.loop_args())
  File "C:\Python37\lib\site-packages\celery\worker\loops.py", line 118, in synloop
    qos.update()
  File "C:\Python37\lib\site-packages\kombu\common.py", line 442, in update
    return self.set(self.value)
  File "C:\Python37\lib\site-packages\kombu\common.py", line 435, in set
    self.callback(prefetch_count=new_value)
  File "C:\Python37\lib\site-packages\celery\worker\consumer\tasks.py", line 47, in set_prefetch_count
    apply_global=qos_global,
  File "C:\Python37\lib\site-packages\kombu\messaging.py", line 558, in qos
    apply_global)
  File "C:\Python37\lib\site-packages\amqp\channel.py", line 1853, in basic_qos
    wait=spec.Basic.QosOk,
  File "C:\Python37\lib\site-packages\amqp\abstract_channel.py", line 68, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
  File "C:\Python37\lib\site-packages\amqp\abstract_channel.py", line 88, in wait
    self.connection.drain_events(timeout=timeout)
  File "C:\Python37\lib\site-packages\amqp\connection.py", line 504, in drain_events
    while not self.blocking_read(timeout):
  File "C:\Python37\lib\site-packages\amqp\connection.py", line 509, in blocking_read
    frame = self.transport.read_frame()
  File "C:\Python37\lib\site-packages\amqp\transport.py", line 252, in read_frame
    frame_header = read(7, True)
  File "C:\Python37\lib\site-packages\amqp\transport.py", line 438, in _read
    s = recv(n - len(rbuf))
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
I found a solution to this problem. I think the issue lies with the Celery backend. In my case, I am using Redis.
Here is my configuration:
Broker - rabbitmq
Backend - redis
Python - 3.7
OS - Windows 10
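On the Celery client side, I tried polling the worker for the task status every 60 seconds from the client. With this in place, I did not run into the connection reset issue.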
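from time import sleep

# Poll every 60 seconds until the task has finished, then fetch its result
while not doors_res.ready():
    sleep(60)
result = doors_res.get()
where doors_res is the AsyncResult of the task submitted through the Celery instance (app).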
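The polling loop above can be wrapped in a small helper that also gives up after a deadline. This is only a sketch: wait_for_result and FakeResult are hypothetical names, and FakeResult merely imitates the ready() and get() methods of a Celery AsyncResult so that the example runs without a broker.

```python
import time


def wait_for_result(async_result, poll_interval=60.0, timeout=None, sleep=time.sleep):
    """Poll async_result.ready() until it is True, then return async_result.get().

    Raises TimeoutError once `timeout` seconds have passed (None waits forever).
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while not async_result.ready():
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError("task did not finish within %s seconds" % timeout)
        sleep(poll_interval)
    return async_result.get()


class FakeResult:
    """Hypothetical stand-in for a Celery AsyncResult, used only for this demo."""

    def __init__(self, polls_until_ready, value):
        self._remaining = polls_until_ready
        self._value = value

    def ready(self):
        # Report "not ready" for the first few polls, then "ready".
        if self._remaining > 0:
            self._remaining -= 1
            return False
        return True

    def get(self):
        return self._value


if __name__ == "__main__":
    demo = FakeResult(polls_until_ready=3, value="done")
    print(wait_for_result(demo, poll_interval=0.01))
```

With a real task, the object returned when the task is submitted would take the place of FakeResult, and poll_interval=60 matches the interval used above.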
On the Celery worker side:
celery worker -A <celery_file_name> -l info -P gevent
My task ran for about 2 hours and I did not encounter the connection reset error.