如何将芹菜整合到烧瓶 + redis 中?



我最近下载了一个docker应用程序并开始修改它。我正在尝试实现一些线程函数,但它们似乎阻碍了服务器的初始化。我四处询问,似乎芹菜可能是我的最佳选择,因为我可以设置一个队列并让它在不中断应用程序本身的情况下执行我的功能。

我修改了 bootstrap/Firehouse.py 以运行 celery 函数,因为它是在服务器初始化调用的。所以现在 Firehouse.py 看起来像这样

import datetime
import json
from flask.json import jsonify
from sqlalchemy import desc
import cache
from shared import app, db
from model.Board import Board
from model.BoardListCatalog import BoardCatalog
from model.Post import render_for_catalog
from model.Thread import Thread
from model.ThreadPosts import _datetime_handler

class Firehose:
def get(self):
return jsonify(self._get_threads())
def get_impl(self):
threads = self._get_threads()
for thread in threads:
board_id = thread["board"]
board = db.session.query(Board).get(board_id)
thread["board"] = board.name
render_for_catalog(threads)
return threads
def _get_threads(self):
firehose_cache_key = "firehose-threads"
cache_connection = cache.Cache()
cached_threads = cache_connection.get(firehose_cache_key)
if cached_threads:
deserialized_threads = json.loads(cached_threads)
for thread in deserialized_threads:
thread["last_updated"] = datetime.datetime.utcfromtimestamp(thread["last_updated"])
return deserialized_threads
firehose_limit = app.config["FIREHOSE_LENGTH"]
raw_threads = db.session.query(Thread).order_by(desc(Thread.last_updated)).limit(firehose_limit).all()
threads = BoardCatalog()._to_json(raw_threads)
for thread in threads:
db_thread = db.session.query(Thread).get(thread["id"])
thread["board"] = db_thread.board
cache_friendly = json.dumps(threads, default=_datetime_handler)
cache_connection.set(firehose_cache_key, cache_friendly)
return threads
from bootstrap import runMonitor
runMonitor.delay()

shared.py 也修改为指向以设置芹菜网址 所以现在 shared.py 看起来像这样

import os
import random
import ipaddress
from customjsonencoder import CustomJSONEncoder
from flask import Flask
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
from flask_restful import Api
from werkzeug.datastructures import ImmutableDict
from celery import Celery
import jinja_cache

class ManiwaniApp(Flask):
jinja_options = ImmutableDict(extensions=["jinja2.ext.autoescape", "jinja2.ext.with_"],
bytecode_cache=jinja_cache.KeystoreCache())

app = ManiwaniApp(__name__, static_url_path='')
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///test.db"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["UPLOAD_FOLDER"] = "~/maniwani/uploads"
app.config["THUMB_FOLDER"] = os.path.join(app.config["UPLOAD_FOLDER"], "thumbs")
app.config["SERVE_STATIC"] = True
app.config["SERVE_REST"] = True
app.config["USE_RECAPTCHA"] = False
app.config["FIREHOSE_LENGTH"] = 10
app.config['CELERY_BROKER_URL'] = 'redis://redis:6397/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://redis:6397/0'
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

if os.getenv("MANIWANI_CFG"):
app.config.from_envvar("MANIWANI_CFG")
app.url_map.strict_slashes = False
app.json_encoder = CustomJSONEncoder
db = SQLAlchemy(app)
migrate = Migrate(app, db)
rest_api = Api(app)
SECRET_FILE = "./deploy-configs/secret"
def get_secret():
return open(SECRET_FILE).read()
if os.path.exists(SECRET_FILE):
app.secret_key = get_secret()

def gen_poster_id():
return '%04X' % random.randint(0, 0xffff)

def ip_to_int(ip_str):
# The old version of ip_to_int had a logical bug where it would always shift the
# final result to the left by 8. This is preserved with the `<< 8`.
return int.from_bytes(
ipaddress.ip_address(ip_str).packed,
byteorder="little"
) << 8

最后 bootstrap.py 修改并包括我的自定义编写脚本,脚本的代码并不重要,但我认为相关的是定义的 celery.task,我将在下面包含:

@celery.task
def runMonitor():
# script that does stuff

但是问题是我无法连接到 redis 服务器。它在 docker 中被定义为具有"redis"主机,这应该允许它指向在服务器初始化时为 docker 生成的任何 IP 地址。

谁能帮我弄清楚: a( 在哪里最好调用 runMonitor.delay(( b( 如何连接到 Redis 服务器(当前连接被拒绝,但服务器在线且端口打开(

我得到的错误是:

maniwani_1    | Traceback (most recent call last):
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 559, in connect
maniwani_1    |     sock = self._connect()
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 615, in _connect
maniwani_1    |     raise err
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 603, in _connect
maniwani_1    |     sock.connect(socket_address)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/gevent/_socket3.py", line 400, in connect
maniwani_1    |     raise error(err, strerror(err))
maniwani_1    | ConnectionRefusedError: [Errno 111] Connection refused
maniwani_1    |
maniwani_1    | During handling of the above exception, another exception occurred:
maniwani_1    |
maniwani_1    | Traceback (most recent call last):
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 127, in reconnect_on_error
maniwani_1    |     yield
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 177, in _consume_from
maniwani_1    |     self._pubsub.subscribe(key)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/client.py", line 3580, in subscribe
maniwani_1    |     ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/client.py", line 3466, in execute_command
maniwani_1    |     self.connection = self.connection_pool.get_connection(
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 1192, in get_connection
maniwani_1    |     connection.connect()
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 563, in connect
maniwani_1    |     raise ConnectionError(self._error_message(e))
maniwani_1    | redis.exceptions.ConnectionError: Error 111 connecting to redis:6397. Connection refused.
maniwani_1    |
maniwani_1    | During handling of the above exception, another exception occurred:
maniwani_1    |
maniwani_1    | Traceback (most recent call last):
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 559, in connect
maniwani_1    |     sock = self._connect()
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 615, in _connect
maniwani_1    |     raise err
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 603, in _connect
maniwani_1    |     sock.connect(socket_address)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/gevent/_socket3.py", line 400, in connect
maniwani_1    |     raise error(err, strerror(err))
maniwani_1    | ConnectionRefusedError: [Errno 111] Connection refused
maniwani_1    |
maniwani_1    | During handling of the above exception, another exception occurred:
maniwani_1    |
maniwani_1    | Traceback (most recent call last):
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/kombu/connection.py", line 451, in _reraise_as_library_errors
maniwani_1    |     yield
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/app/base.py", line 779, in send_task
maniwani_1    |     self.backend.on_task_call(P, task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 344, in on_task_call
maniwani_1    |     self.result_consumer.consume_from(task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 169, in consume_from
maniwani_1    |     return self.start(task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 147, in start
maniwani_1    |     self._consume_from(initial_task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 177, in _consume_from
maniwani_1    |     self._pubsub.subscribe(key)
maniwani_1    |   File "/usr/lib/python3.8/contextlib.py", line 131, in __exit__
maniwani_1    |     self.gen.throw(type, value, traceback)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 130, in reconnect_on_error
maniwani_1    |     self._ensure(self._reconnect_pubsub, ())
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 355, in ensure
maniwani_1    |     return retry_over_time(
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/kombu/utils/functional.py", line 344, in retry_over_time
maniwani_1    |     return fun(*args, **kwargs)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 115, in _reconnect_pubsub
maniwani_1    |     metas = self.backend.client.mget(self.subscribed_to)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/client.py", line 1671, in mget
maniwani_1    |     return self.execute_command('MGET', *args, **options)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/client.py", line 898, in execute_command
maniwani_1    |     conn = self.connection or pool.get_connection(command_name, **options)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 1192, in get_connection
maniwani_1    |     connection.connect()
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 563, in connect
maniwani_1    |     raise ConnectionError(self._error_message(e))
maniwani_1    | redis.exceptions.ConnectionError: Error 111 connecting to redis:6397. Connection refused.
maniwani_1    |
maniwani_1    | During handling of the above exception, another exception occurred:
maniwani_1    |
maniwani_1    | Traceback (most recent call last):
maniwani_1    |   File "app.py", line 9, in <module>
maniwani_1    |     from blueprints.main import main_blueprint
maniwani_1    |   File "./blueprints/main.py", line 9, in <module>
maniwani_1    |     from model.Firehose import Firehose
maniwani_1    |   File "./model/Firehose.py", line 50, in <module>
maniwani_1    |     runMonitor.delay()
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/app/task.py", line 425, in delay
maniwani_1    |     return self.apply_async(args, kwargs)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/app/task.py", line 565, in apply_async
maniwani_1    |     return app.send_task(
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/app/base.py", line 780, in send_task
maniwani_1    |     amqp.send_task_message(P, name, message, **options)
maniwani_1    |   File "/usr/lib/python3.8/contextlib.py", line 131, in __exit__
maniwani_1    |     self.gen.throw(type, value, traceback)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/kombu/connection.py", line 455, in _reraise_as_library_errors
maniwani_1    |     reraise(ConnectionError, ConnectionError(text_t(exc)),
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/vine/five.py", line 194, in reraise
maniwani_1    |     raise value.with_traceback(tb)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/kombu/connection.py", line 451, in _reraise_as_library_errors
maniwani_1    |     yield
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/app/base.py", line 779, in send_task
maniwani_1    |     self.backend.on_task_call(P, task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 344, in on_task_call
maniwani_1    |     self.result_consumer.consume_from(task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 169, in consume_from
maniwani_1    |     return self.start(task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 147, in start
maniwani_1    |     self._consume_from(initial_task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 177, in _consume_from
maniwani_1    |     self._pubsub.subscribe(key)
maniwani_1    |   File "/usr/lib/python3.8/contextlib.py", line 131, in __exit__
maniwani_1    |     self.gen.throw(type, value, traceback)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 130, in reconnect_on_error
maniwani_1    |     self._ensure(self._reconnect_pubsub, ())
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 355, in ensure
maniwani_1    |     return retry_over_time(
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/kombu/utils/functional.py", line 344, in retry_over_time
maniwani_1    |     return fun(*args, **kwargs)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 115, in _reconnect_pubsub
maniwani_1    |     metas = self.backend.client.mget(self.subscribed_to)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/client.py", line 1671, in mget
maniwani_1    |     return self.execute_command('MGET', *args, **options)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/client.py", line 898, in execute_command
maniwani_1    |     conn = self.connection or pool.get_connection(command_name, **options)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 1192, in get_connection
maniwani_1    |     connection.connect()
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 563, in connect
maniwani_1    |     raise ConnectionError(self._error_message(e))
maniwani_1    | kombu.exceptions.OperationalError: Error 111 connecting to redis:6397. Connection refused.
maniwani_1    | unable to load app 0 (mountpoint='') (callable not found or import error)
maniwani_1    | *** no app loaded. going in full dynamic mode ***
maniwani_1    | *** uWSGI is running in multiple interpreter mode ***

Redis 已启动并运行:

sudo docker container ls
CONTAINER ID        IMAGE                 COMMAND                  CREATED             STATUS              PORTS                NAMES
d4614c9b1013        nginx                 "/docker-entrypoint.…"   25 seconds ago      Up 16 seconds       0.0.0.0:80->80/tcp   maniwani_nginx_1
95503d1e7a28        minio/minio           "/usr/bin/docker-ent…"   28 seconds ago      Up 23 seconds       9000/tcp             maniwani_minio_1
0da8e7a9a573        postgres              "docker-entrypoint.s…"   30 seconds ago      Up 25 seconds       5432/tcp             maniwani_postgres_1
143064a00cb8        maniwani_maniwani     "sh ./docker-entrypo…"   30 seconds ago      Up 24 seconds       3031/tcp, 5000/tcp   maniwani_maniwani_1
5b51b76c4471        redis                 "docker-entrypoint.s…"   30 seconds ago      Up 26 seconds       6379/tcp             maniwani_redis_1
54d637f6afe1        maniwani_captchouli   "sh ./entrypoint.sh"     30 seconds ago      Up 23 seconds       8512/tcp             maniwani_captchouli_1

您的 redis 似乎在端口6379/tcp上运行

您应该更新配置信息以设置正确的端口

app.config['CELERY_BROKER_URL'] = 'redis://redis:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://redis:6379/0'

最新更新