每次任务传递给RabbitMQ时,Celery都会断开与RabbitMQ的连接,但是任务最终会成功:
我的问题是:
- 如何解决这个问题? 你对我的芹菜/rabbitmq配置有什么改进建议吗?
芹菜版本:5.1.2RabbitMQ版本:3.9.0Erlang版本:24.0.4
RabbitMQ错误(抱歉日志长度:
** Generic server <0.11908.0> terminating
** Last message in was {'$gen_cast',
{method,{'basic.ack',1,false},none,noflow}}
** When Server state == {ch,
{conf,running,rabbit_framing_amqp_0_9_1,1,
<0.11899.0>,<0.11906.0>,<0.11899.0>,
<<"someIPAddress:45610 -> someIPAddress:5672">>,
undefined,
{user,<<"someadmin">>,
[administrator],
[{rabbit_auth_backend_internal,none}]},
<<"backoffice">>,<<"celery">>,<0.11900.0>,
[{<<"consumer_cancel_notify">>,bool,true},
{<<"connection.blocked">>,bool,true},
{<<"authentication_failure_close">>,bool,true}],
none,0,134217728,1800000,#{},1000000000},
{lstate,<0.11907.0>,true},
none,2,
{1,
{[{pending_ack,1,<<"None4">>,1627738474140,
{resource,<<"backoffice">>,queue,<<"celery">>},
2097}],
[]}},
{state,#{},erlang},
#{<<"None4">> =>
{{amqqueue,
{resource,<<"backoffice">>,queue,<<"celery">>},
true,false,none,[],<0.471.0>,[],[],[],undefined,
undefined,[],[],live,0,[],<<"backoffice">>,
#{user => <<"someadmin">>},
rabbit_classic_queue,#{}},
{false,0,false,[]}}},
#{{resource,<<"backoffice">>,queue,<<"celery">>} =>
{1,{<<"None4">>,nil,nil}}},
{state,none,5000,undefined},
false,1,
{rabbit_confirms,undefined,#{}},
[],[],none,flow,[],
{rabbit_queue_type,
#{{resource,<<"backoffice">>,queue,<<"celery">>} =>
{ctx,rabbit_classic_queue,
{resource,<<"backoffice">>,queue,<<"celery">>},
{rabbit_classic_queue,<0.471.0>,
{resource,<<"backoffice">>,queue,<<"celery">>},
#{}}}},
#{<0.471.0> =>
{resource,<<"backoffice">>,queue,<<"celery">>}}},
#Ref<0.4203289403.2328100865.106387>,false}
** Reason for termination ==
** {function_clause,
[{rabbit_channel,'-notify_limiter/2-fun-0-',
[{pending_ack,1,<<"None4">>,1627738474140,
{resource,<<"backoffice">>,queue,<<"celery">>},
2097},
0],
[{file,"src/rabbit_channel.erl"},{line,2124}]},
{lists,foldl,3,[{file,"lists.erl"},{line,1267}]},
{rabbit_channel,notify_limiter,2,
[{file,"src/rabbit_channel.erl"},{line,2124}]},
{rabbit_channel,ack,2,[{file,"src/rabbit_channel.erl"},{line,2057}]},
{rabbit_channel,handle_method,3,
[{file,"src/rabbit_channel.erl"},{line,1343}]},
{rabbit_channel,handle_cast,2,
[{file,"src/rabbit_channel.erl"},{line,644}]},
{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1067}]},
{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,236}]}]}
crasher:
initial call: rabbit_channel:init/1
pid: <0.11908.0>
registered_name: []
exception exit: {function_clause,
[{rabbit_channel,'-notify_limiter/2-fun-0-',
[{pending_ack,1,<<"None4">>,1627738474140,
{resource,<<"backoffice">>,queue,
<<"celery">>},
2097},
0],
[{file,"src/rabbit_channel.erl"},{line,2124}]},
{lists,foldl,3,[{file,"lists.erl"},{line,1267}]},
{rabbit_channel,notify_limiter,2,
[{file,"src/rabbit_channel.erl"},{line,2124}]},
{rabbit_channel,ack,2,
[{file,"src/rabbit_channel.erl"},{line,2057}]},
{rabbit_channel,handle_method,3,
[{file,"src/rabbit_channel.erl"},{line,1343}]},
{rabbit_channel,handle_cast,2,
[{file,"src/rabbit_channel.erl"},{line,644}]},
{gen_server2,handle_msg,2,
[{file,"src/gen_server2.erl"},{line,1067}]},
{proc_lib,wake_up,3,
[{file,"proc_lib.erl"},{line,236}]}]}
in function gen_server2:terminate/3 (src/gen_server2.erl, line 1183)
ancestors: [<0.11905.0>,<0.11903.0>,<0.11898.0>,<0.11897.0>,<0.508.0>,
<0.507.0>,<0.506.0>,<0.504.0>,<0.503.0>,rabbit_sup,
<0.224.0>]
message_queue_len: 0
messages: []
links: [<0.11905.0>]
dictionary: [{channel_operation_timeout,15000},
{process_name,
{rabbit_channel,
{<<"someIPAddress:45610 -> someIPAddress:5672">>,
1}}},
{rand_seed,
{#{jump => #Fun<rand.3.92093067>,
max => 288230376151711743,
next => #Fun<rand.5.92093067>,type => exsplus},
[262257290895536220|242201045588130196]}},
{{xtype_to_module,direct},rabbit_exchange_type_direct},
{permission_cache_can_expire,false},
{msg_size_for_gc,115}]
trap_exit: true
status: running
heap_size: 28690
stack_size: 29
reductions: 67935
neighbours:
Error on AMQP connection <0.11899.0> (someIPAddress:45610 -> someIPAddress:5672, vhost: 'backoffice', user: 'someadmin', state: running), channel 1:
{function_clause,
[{rabbit_channel,'-notify_limiter/2-fun-0-',
[{pending_ack,1,<<"None4">>,1627738474140,
{resource,<<"backoffice">>,queue,<<"celery">>},
2097},
0],
[{file,"src/rabbit_channel.erl"},{line,2124}]},
{lists,foldl,3,[{file,"lists.erl"},{line,1267}]},
{rabbit_channel,notify_limiter,2,
[{file,"src/rabbit_channel.erl"},{line,2124}]},
{rabbit_channel,ack,2,[{file,"src/rabbit_channel.erl"},{line,2057}]},
{rabbit_channel,handle_method,3,
[{file,"src/rabbit_channel.erl"},{line,1343}]},
{rabbit_channel,handle_cast,2,
[{file,"src/rabbit_channel.erl"},{line,644}]},
{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1067}]},
{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,236}]}]}
supervisor: {<0.11905.0>,rabbit_channel_sup}
errorContext: child_terminated
reason: {function_clause,
[{rabbit_channel,'-notify_limiter/2-fun-0-',
[{pending_ack,1,<<"None4">>,1627738474140,
{resource,<<"backoffice">>,queue,<<"celery">>},
2097},
0],
[{file,"src/rabbit_channel.erl"},{line,2124}]},
{lists,foldl,3,[{file,"lists.erl"},{line,1267}]},
{rabbit_channel,notify_limiter,2,
[{file,"src/rabbit_channel.erl"},{line,2124}]},
{rabbit_channel,ack,2,
[{file,"src/rabbit_channel.erl"},{line,2057}]},
{rabbit_channel,handle_method,3,
[{file,"src/rabbit_channel.erl"},{line,1343}]},
{rabbit_channel,handle_cast,2,
[{file,"src/rabbit_channel.erl"},{line,644}]},
{gen_server2,handle_msg,2,
[{file,"src/gen_server2.erl"},{line,1067}]},
{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,236}]}]}
offender: [{pid,<0.11908.0>},
{id,channel},
{mfargs,
{rabbit_channel,start_link,
[1,<0.11899.0>,<0.11906.0>,<0.11899.0>,
<<"someIPAddress:45610 -> someIPAddress:5672">>,
rabbit_framing_amqp_0_9_1,
{user,<<"someadmin">>,
[administrator],
[{rabbit_auth_backend_internal,none}]},
<<"backoffice">>,
[{<<"consumer_cancel_notify">>,bool,true},
{<<"connection.blocked">>,bool,true},
{<<"authentication_failure_close">>,bool,true}],
<0.11900.0>,<0.11907.0>]}},
{restart_type,intrinsic},
{shutdown,70000},
{child_type,worker}]
Non-AMQP exit reason '{function_clause,
[{rabbit_channel,'-notify_limiter/2-fun-0-',
[{pending_ack,1,<<"None4">>,1627738474140,
{resource,<<"backoffice">>,queue,<<"celery">>},
2097},
0],
[{file,"src/rabbit_channel.erl"},{line,2124}]},
{lists,foldl,3,[{file,"lists.erl"},{line,1267}]},
{rabbit_channel,notify_limiter,2,
[{file,"src/rabbit_channel.erl"},{line,2124}]},
{rabbit_channel,ack,2,
[{file,"src/rabbit_channel.erl"},{line,2057}]},
{rabbit_channel,handle_method,3,
[{file,"src/rabbit_channel.erl"},{line,1343}]},
{rabbit_channel,handle_cast,2,
[{file,"src/rabbit_channel.erl"},{line,644}]},
{gen_server2,handle_msg,2,
[{file,"src/gen_server2.erl"},{line,1067}]},
{proc_lib,wake_up,3,
[{file,"proc_lib.erl"},{line,236}]}]}'
supervisor: {<0.11905.0>,rabbit_channel_sup}
errorContext: shutdown
reason: reached_max_restart_intensity
offender: [{pid,<0.11908.0>},
{id,channel},
{mfargs,
{rabbit_channel,start_link,
[1,<0.11899.0>,<0.11906.0>,<0.11899.0>,
<<"someIPAddress:45610 -> someIPAddress:5672">>,
rabbit_framing_amqp_0_9_1,
{user,<<"someadmin">>,
[administrator],
[{rabbit_auth_backend_internal,none}]},
<<"backoffice">>,
[{<<"consumer_cancel_notify">>,bool,true},
{<<"connection.blocked">>,bool,true},
{<<"authentication_failure_close">>,bool,true}],
<0.11900.0>,<0.11907.0>]}},
{restart_type,intrinsic},
{shutdown,70000},
{child_type,worker}]
closing AMQP connection <0.11899.0> (someIPAddress:45610 -> someIPAddress:5672, vhost: 'backoffice', user: 'someadmin')
accepting AMQP connection <0.14133.0> (someIPAddress:57452 -> someIPAddress:5672)
connection <0.14133.0> (someIPAddress:57452 -> someIPAddress:5672): user 'someadmin' authenticated and granted access to vhost 'backoffice'
芹菜日志:
INFO/MainProcess] Task subscribe_task[aae43c55-3396-45f3-8bea-d01a66983835] received
DEBUG/MainProcess] TaskPool: Apply <function fast_trace_task at 0x7fbd03f32af0> (args:('subscribe_task', 'aae43c55-3396-45f3-8bea-d01a66983835', {'lang': 'py', 'task': 'subscribe_task', 'id': 'aae43c55-3396-45f3-8bea-d01a66983835', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'aae43c55-3396-45f3-8bea-d01a66983835', 'parent_id': None, 'argsrepr': "(140, 'Subscribe Confirm Email Address')", 'kwargsrepr': '{}', 'origin': 'gen20577@webserver', 'ignore_result': True, 'reply_to': '91cf548f-4e42-3870-b27f-2cc1fd3f7074', 'correlation_id': 'aae43c55-3396-45f3-8bea-d01a66983835', 'hostname': 'worker@webserver', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': False}, 'args': [140, 'Subscribe Confirm Email Address'], 'kwargs': {}}, '[[140, "Subscribe Confirm Email Address"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs:{})
DEBUG/MainProcess] Closed channel #1
DEBUG/MainProcess] Closed channel #2
WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "/opt/backoffice/venv/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 326, in start
blueprint.start(self)
File "/opt/backoffice/venv/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/opt/backoffice/venv/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 618, in start
c.loop(*c.loop_args())
File "/opt/backoffice/venv/lib/python3.8/site-packages/celery/worker/loops.py", line 81, in asynloop
next(loop)
File "/opt/backoffice/venv/lib/python3.8/site-packages/kombu/asynchronous/hub.py", line 361, in create_loop
cb(*cbargs)
File "/opt/backoffice/venv/lib/python3.8/site-packages/kombu/transport/base.py", line 235, in on_readable
reader(loop)
File "/opt/backoffice/venv/lib/python3.8/site-packages/kombu/transport/base.py", line 217, in _read
drain_events(timeout=0)
File "/opt/backoffice/venv/lib/python3.8/site-packages/amqp/connection.py", line 523, in drain_events
while not self.blocking_read(timeout):
File "/opt/backoffice/venv/lib/python3.8/site-packages/amqp/connection.py", line 529, in blocking_read
return self.on_inbound_frame(frame)
File "/opt/backoffice/venv/lib/python3.8/site-packages/amqp/method_framing.py", line 53, in on_frame
callback(channel, method_sig, buf, None)
File "/opt/backoffice/venv/lib/python3.8/site-packages/amqp/connection.py", line 535, in on_inbound_method
return self.channels[channel_id].dispatch_method(
File "/opt/backoffice/venv/lib/python3.8/site-packages/amqp/abstract_channel.py", line 143, in dispatch_method
listener(*args)
File "/opt/backoffice/venv/lib/python3.8/site-packages/amqp/connection.py", line 665, in _on_close
raise error_for_code(reply_code, reply_text,
amqp.exceptions.InternalError: (0, 0): (541) INTERNAL_ERROR
DEBUG/MainProcess] | Consumer: Restarting event loop...
DEBUG/MainProcess] | Consumer: Restarting Control...
DEBUG/MainProcess] | Consumer: Restarting Tasks...
DEBUG/MainProcess] Canceling task consumer...
DEBUG/MainProcess] | Consumer: Restarting Connection...
DEBUG/MainProcess] | Consumer: Starting Connection
DEBUG/MainProcess] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@webserver', 'copyright': 'Copyright (c) 2007-2021 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 24.0.4', 'product': 'RabbitMQ', 'version': '3.9.0'}, mechanisms: [b'AMQPLAIN', b'PLAIN'], locales: ['en_US']
INFO/MainProcess] Connected to amqp://someAdmin:**@webserver:5672/backoffice
芹菜服务配置:
[Unit]
Description=Celery Service
After=network.target
[Service]
Type=forking
User=DJANGO_USER
Group=DJANGO_USER
EnvironmentFile=/etc/conf.d/celery
WorkingDirectory=/opt/backoffice
ExecStart=/bin/sh -c '${CELERY_BIN} -A $CELERY_APP multi start $CELERYD_NODES
--pidfile=${CELERYD_PID_FILE} --logfile=${CELERYD_LOG_FILE}
--loglevel="${CELERYD_LOG_LEVEL}" $CELERYD_OPTS'
ExecStop=/bin/sh -c '${CELERY_BIN} multi stopwait $CELERYD_NODES
--pidfile=${CELERYD_PID_FILE} --loglevel="${CELERYD_LOG_LEVEL}"'
ExecReload=/bin/sh -c '${CELERY_BIN} -A $CELERY_APP multi restart $CELERYD_NODES
--pidfile=${CELERYD_PID_FILE} --logfile=${CELERYD_LOG_FILE}
--loglevel="${CELERYD_LOG_LEVEL}" $CELERYD_OPTS'
Restart=always
[Install]
WantedBy=multi-user.target
芹菜conf.d:
CELERYD_NODES="worker"
CELERY_BIN="/opt/backoffice/venv/bin/celery"
CELERY_APP="backoffice"
CELERYD_CHDIR="/opt/backoffice/"
CELERYD_MULTI="multi"
CELERYD_OPTS="--time-limit=300 --without-heartbeat --without-gossip --without-mingle"
CELERYD_PID_FILE="/var/run/celery/%n.pid"
CELERYD_LOG_FILE="/var/log/celery/%n%I.log"
CELERYD_LOG_LEVEL="DEBUG"
CELERYBEAT_PID_FILE="/var/run/celery/beat.pid"
CELERYBEAT_LOG_FILE="/var/log/celery/beat.log"
CELERYBEAT_DB_FILE="/var/cache/backoffice/celerybeat/celerybeat-schedule.db"
Django celery_config.py:
from django.conf import settings
broker_url = settings.RABBITMQ_BROKER
worker_send_task_event = False
task_ignore_result = True
task_time_limit = 60
task_soft_time_limit = 50
task_acks_late = True
worker_prefetch_multiplier = 10
worker_cancel_long_running_tasks_on_connection_loss = True
同样的问题。尝试了不同的设置,但没有解决方案。
解决方案:将RabbitMQ降级到3.8。降级后,不再有连接错误。所以,我认为这一定与v3.9的不同行为有关。
谢谢Xentux,你让我走上了正确的道路。在寻找降级时,我看到有一个兔子补丁(3.9.1)。Rabbit再次正常工作,没有错误,至少目前是这样。
我在amqp==5.0.8
上有这个错误,通过升级到amqp==5.0.9
来修复:
pip install -U amqp==5.0.9