我正试图使用django-dramatiq
来运行一个由几个阶段组成的管道,每个阶段都定义为戏剧性的Actor
s,使用pipeline(<stages>).run()
方法,但它只运行第一个阶段/Actor
,而不尝试其他阶段。
我定义了一些精简的假演员来说明这个问题:
import dramatiq
@dramatiq.actor
def fake_extract(process_pk, *args, **kwargs):
print(f"fake_extract: Process PK= {process_pk} Running extract on {kwargs['fits_file']}")
@dramatiq.actor
def fake_astromfit(process_pk, *args, **kwargs):
print(f"fake_astromfit: Process PK= {process_pk} Astrometric fit on {kwargs['ldac_catalog']}, updating {kwargs['fits_file']}")
@dramatiq.actor
def fake_zeropoint(process_pk, *args, **kwargs):
print(f"fake_zeropoint: Process PK= {process_pk} ZP determination on {kwargs['ldac_catalog']} with {kwargs['desired_catalog']} ref catalog")
然后我定义了阶段并构建了一个管道:
import os
from dramatiq import pipeline
from test_dramatiq.dramatiq_tests import fake_extract, fake_astromfit, fake_zeropoint
fits_filepath = '/foo/bar.fits'
fits_file = os.path.basename(fits_filepath)
steps = [{
'name' : 'proc-extract',
'runner' : fake_extract,
'inputs' : {'fits_file':fits_filepath,
'datadir': os.path.join(dataroot, temp_dir)}
},
{
'name' : 'proc-astromfit',
'runner' : fake_astromfit,
'inputs' : {'fits_file' : fits_filepath,
'ldac_catalog' : os.path.join(dataroot, temp_dir, fits_file.replace('e91.fits', 'e91_ldac.fits')),
'datadir' : os.path.join(dataroot, temp_dir)
}
},
{
'name' : 'proc-zeropoint',
'runner' : fake_zeropoint,
'inputs' : {'ldac_catalog' : os.path.join(dataroot, temp_dir, fits_file.replace('e91.fits', 'e92_ldac.fits')),
'datadir' : os.path.join(dataroot, temp_dir),
'desired_catalog' : 'PS1'
}
}]
pipes = []
for step_num, step in enumerate(steps):
inputs = step['inputs']
print(f" Performing pipeline step {step['name']}")
pk = 1234+step_num
pipes.append(step['runner'].message_with_options(args=[pk,], kwargs=inputs, pipe_ignore=True))
pipeline(pipes).run()
使用常规dramatiq
在ipython
内部运行,这些似乎运行良好,并且所有阶段都运行:
fake_extract: Process PK= 1234 Running extract on /foo/bar.fits
fake_astromfit: Process PK= 1235 Astrometric fit on /foo/Temp_cvc2/bar.fits, updating /foo/bar.fits
fake_zeropoint: Process PK= 1236 ZP determination on /foo/Temp_cvc2/bar.fits with PS1 ref catalog
然而,在django-dramatiq
通过Django项目的settings.py
文件导入的模块中定义它们,并在python manage.py shell
和python manage.py rundramatiq
运行程序中如上所述定义管道,只运行第一阶段/Actor
:
fake_extract: Process PK= 1234 Running extract on /foo/bar.fits
它从不执行其他阶段。。。关于这里发生了什么以及为什么多级管道不能在django-dramatiq
下工作,有什么想法吗?
因此,事实证明,问题是由于Django项目的settings.py
中缺少中间件,导致它无法正常工作。我有:
DRAMATIQ_BROKER = {
'BROKER': 'dramatiq.brokers.redis.RedisBroker',
'OPTIONS': {
'url': f'redis://{REDIS_HOSTNAME}:6379',
},
'MIDDLEWARE': [
'dramatiq.middleware.AgeLimit',
'dramatiq.middleware.TimeLimit',
'dramatiq.middleware.Callbacks',
'dramatiq.middleware.Retries',
'django_dramatiq.middleware.DbConnectionsMiddleware',
]
}
但CCD_ 14也缺失了一个CCD_。将其添加到runserver
和rundramatiq
管理命令中并重新启动会导致上面精简的测试示例和原始的全功能版本都开始工作。