我刚开始做戏剧和实验几天,我没有让一个工人在一个(简单的)脚本中工作。
test.py
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results.backends import RedisBackend
from dramatiq.results import Results
from dramatiq.worker import Worker
redis_broker = RedisBroker(host="127.0.0.1", port=6379)
results_backend = RedisBackend(url="redis://127.0.0.1:6379")
redis_broker.add_middleware(Results(backend=results_backend))
dramatiq.set_broker(redis_broker)
worker = Worker(
broker=redis_broker
)
worker.start()
@dramatiq.actor(queue_name="default", max_retries=1, store_results=True)
def print_hello_world():
print("Hello World!")
print_hello_world.send()
结果(复述):
127.0.0.1:6379> keys *
1) "dramatiq:default.msgs"
2) "dramatiq:default"
3) "dramatiq:__heartbeats__"
但是当使用test.py:
从文件夹启动一个戏剧性的进程时$ dramatiq test
结果如我所料
结果(复述)
127.0.0.1:6379> keys *
1) "3f11649148820d957b2945da46b3c2b7"
2) "dramatiq:__heartbeats__"
似乎工人没有以这种方式接收消息。在互联网上很难找到从脚本中设置worker的例子。
有人能帮我得到这份工作吗?
您可以参考此链接https://gitlab.com/bersace/flask-dramatiq/-/blob/master/flask_dramatiq.py#L311
"""
Refer from https://gitlab.com/bersace/flask-dramatiq/-/blob/master/flask_dramatiq.py#L311
"""
import os
import sys
from dramatiq.cli import (
CPUS,
HAS_WATCHDOG,
main as dramatiq_worker,
make_argument_parser as dramatiq_argument_parser,
)
from app.core.config import get_settings
from app.tasks import rabbitmq_broker
from app.tasks.manual_selenium import demo
from app.utils.logger import logger
def list_managed_actors(broker, queues):
queues = set(queues)
all_actors = broker.actors.values()
if not queues:
return all_actors
else:
return [a for a in all_actors if a.queue_name in queues]
def format_actor(actor):
return "%s@%s" % (actor.actor_name, actor.queue_name)
def guess_code_directory(broker):
actor = next(iter(broker.actors.values()))
modname, *_ = actor.fn.__module__.partition('.')
mod = sys.modules[modname]
return os.path.dirname(mod.__file__)
def worker(verbose=0, processes=CPUS, threads=8, queues=None, broker=rabbitmq_broker):
"""Run dramatiq workers.
Setup Dramatiq with broker and task modules from Flask app.
b
examples:
# Run dramatiq with 1 thread per process.
$ flask worker --threads 1
b
# Listen only to the "foo" and "bar" queues.
$ flask worker -Q foo,bar
b
# Consuming from a specific broker
$ flask worker mybroker
"""
# Plugin for flask.commands entrypoint.
#
# Wraps dramatiq worker CLI in a Flask command. This is private API of
# dramatiq.
# TODO Plugin for fastapi
parser = dramatiq_argument_parser()
command = [
"--processes", str(processes),
"--threads", str(threads),
# This module does not have broker local. Thus dramatiq fallbacks to
# global broker.
__name__,
]
if get_settings().DEBUG:
verbose = max(1, verbose)
if HAS_WATCHDOG:
command += ["--watch", guess_code_directory(broker)]
queues = queues.split(",") if queues else []
if queues:
command += ["--queues"] + queues
command += verbose * ['-v']
args = parser.parse_args(command)
logger.info("Able to execute the following actors:")
for actor in list_managed_actors(broker, queues):
logger.info("t%s." % format_actor(actor))
dramatiq_worker(args)
if '__main__' == __name__:
worker()