无法将芹菜服务器连接到本地主机上的 RabbitMQ



我使用Celery和RabbitMQ作为消息队列,每个消息队列都封装在自己的Docker映像中。当它们使用Docker中的--link参数连接时,一切都很好。这个设置我已经用了一段时间了。我想将它们分开,以便它们在不同的主机上运行,因此我不能再为此使用--link参数。当我尝试使用AMQP连接时,我得到了一个gaierror: [Errno -2] Name or service not known,但不明白为什么。

服务器只是在DockerHub:上使用rabbitmq容器

docker run --rm --name=qrabbit -p 5672:5672 rabbitmq

我可以成功地telnet到此:

$ telnet 192.168.99.100 5672
Trying 192.168.99.100...
Connected to 192.168.99.100.
Escape character is '^]'.
abc
^D
AMQP    Connection closed by foreign host.
$

所以我知道服务器正在运行。

我的客户是这样的:

import os
from logging import getLogger, StreamHandler, DEBUG
from serverlib import QueueServer, CeleryMonitor
from celery import Celery
from argparse import ArgumentParser
log = getLogger('server')
log.addHandler(StreamHandler())
log.setLevel(DEBUG)
broker_service_host = os.environ.get('MESSAGE_QUEUE_SERVICE_SERVICE_HOST')
broker = 'amqp://{0}'.format(broker_service_host)
host = ''
port = 8000
retry = 5
if __name__ == '__main__':
    log.info('connecting to {0}, {1}:{2}, retry={3}'.format(broker, host, port, retry))
    app = Celery(broker=broker)
    monitor = CeleryMonitor(app, retry=retry)
    server = QueueServer((host, port), app)
    monitor.start()
    try:
        log.info('listening on {0}:{1}'.format(host, port))
        server.serve_forever()
    except KeyboardInterrupt:
        log.info('shutdown requested')
    except BaseException as e:
        log.error(e)
    finally:
        monitor.shutdown()

我有点确定外部模块(QueueServer和CeleryMonitor)不是问题的一部分,因为当我执行以下操作时,它可以正常运行:

 $ docker run --rm --name=qmaster -e "MESSAGE_QUEUE_SERVICE_SERVICE_HOST=localhost" --link qrabbit:rabbit -p 80:8000 render-task-master
connecting to amqp://localhost, :8000, retry=5
listening on :8000
^Cshutdown requested
$

但如果我执行以下操作(没有--link参数),则不会:

$  docker run --rm --name=qmaster -e "MESSAGE_QUEUE_SERVICE_SERVICE_HOST=localhost" -p 80:8000 render-task-master
connecting to amqp://localhost, :8000, retry=5
listening on :8000
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/home/celery/serverlib/celerymonitor.py", line 68, in run
    '*': self.__state.event
  File "/usr/local/lib/python2.7/site-packages/celery/events/__init__.py", line 287, in __init__
    self.channel = maybe_channel(channel)
  File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 1054, in maybe_channel
    return channel.default_channel
  File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 756, in default_channel
    self.connection
  File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 741, in connection
    self._connection = self._establish_connection()
  File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 696, in _establish_connection
    conn = self.transport.establish_connection()
  File "/usr/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 116, in establish_connection
    conn = self.Connection(**opts)
  File "/usr/local/lib/python2.7/site-packages/amqp/connection.py", line 165, in __init__
    self.transport = self.Transport(host, connect_timeout, ssl)
  File "/usr/local/lib/python2.7/site-packages/amqp/connection.py", line 186, in Transport
    return create_transport(host, connect_timeout, ssl)
  File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 299, in create_transport
    return TCPTransport(host, connect_timeout)
  File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 75, in __init__
    socket.SOCK_STREAM, SOL_TCP):
gaierror: [Errno -2] Name or service not known
^Cshutdown requested
$

使用和不使用可能导致此错误的--link参数有什么区别?

更新:

我已经把它缩小到我创建的监视器类中的一个错误:

recv = self.app.events.Receiver(connection, handlers={
    'task-received': self.registerTask,
    'task-failed': self.retryTask,
    'task-succeeded': self.deregisterTask,
    # should process all events to have state up to date
    '*': self.__state.event 
})

当调用它时,它会停留几秒钟(超时?),然后抛出一个异常。知道为什么这不喜欢指定为amqp://localhost的amqp URL,但当我使用--link参数时,一切都正常吗?

以下是调用的整个方法,用于其他上下文:

def run(self):
    log.info('run')
    self.__state = self.app.events.State()
    with self.app.connection() as connection:
        log.info('got a connection')
        recv = self.app.events.Receiver(connection, handlers={
            'task-received': self.registerTask,
            'task-failed': self.retryTask,
            'task-succeeded': self.deregisterTask,
            # should process all events to have state up to date
            '*': self.__state.event 
        })
        log.info('received receiver')
        # Capture until shutdown requested
        while not self.__shutdown:
            log.info('main run loop')
            try:
                recv.capture(limit=None, timeout=1, wakeup=True)
            except timeout:
                # timeout exception is fired when nothing occurs
                # during timeout. Just ignore it.
                pass

我发现了问题:我在容器运行的Docker环境中设置了CELERY_BROKER_URL,这导致后端尝试连接到不存在的主机名。一旦我取消设置变量,所有的东西都会在我的环境中正常连接。

$ docker inspect server
<... removed ...>
        "Env": [
        "MESSAGE_QUEUE_SERVICE_SERVICE_HOST=192.168.99.100",
        "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
        "LANG=C.UTF-8",
        "PYTHON_VERSION=2.7.10",
        "PYTHON_PIP_VERSION=7.1.2",
        "CELERY_VERSION=3.1.18",
        "CELERY_BROKER_URL=amqp://guest@rabbit"
    ],
<... removed ...>

相关内容

  • 没有找到相关文章

最新更新