我用芹菜节拍设置了一个定期任务。任务运行,我可以在控制台中看到结果。 我想要一个 python 脚本来回忆任务抛出的结果。
我可以这样做:
#client.py
from cfg_celery import app
task_id = '337fef7e-68a6-47b3-a16f-1015be50b0bc'
try:
x = app.AsyncResult(id)
print(x.get())
except:
print('some error')
无论如何,正如你所看到的,对于这个测试,我不得不复制扔在芹菜节拍控制台上的task_id
(可以这么说)并在我的脚本中硬编码它。显然,这在实际生产中行不通。
我破解了它在芹菜配置文件上设置task_id
:
#cfg_celery.py
app = Celery('celery_config',
broker='redis://localhost:6379/0',
include=['taskos'],
backend = 'redis'
)
app.conf.beat_schedule = {
'something': {
'task': 'tasks.add',
'schedule': 10.0,
'args': (16, 54),
'options' : {'task_id':"my_custom_id"},
}
}
这样我就可以这样读:
#client.py
from cfg_celery import app
task_id = 'my_custom_id'
try:
x = app.AsyncResult(id)
print(x.get())
except:
print('some error')
这种方法的问题在于我丢失了之前的结果(在调用client.py
之前)。
有什么方法可以阅读芹菜后端的task_id
列表吗? 如果我有多个定期任务,是否可以从每个定期任务中获取task_id
的列表? 我可以使用app.tasks.key()
来实现这一点吗?
PD:不是英语母语,加上芹菜新手,如果我用错了一些术语,那就太好了。
好的。我不确定是没有人回答这个问题,因为很难,还是因为我的问题太愚蠢了。 无论如何,我想做的是从另一个 python 进程中获取我的"芹菜节拍"任务的结果。 在相同的过程中,我没有问题,我可以访问任务ID,从那里开始一切都很容易。但是从其他过程中,我没有找到检索已完成任务列表的方法。
我尝试了python-RQ(这很好),但是当我看到使用RQ我也无法做到这一点时,我开始明白我必须手动使用redis存储功能。所以我得到了我想要的,这样做:
. 使用"bind=True"能够从任务函数内部进行内省。 . 一旦我得到了函数的结果,我就用 redis 将其写在一个列表中(我做了一些技巧来限制这个列表的大小) . 现在我可以从一个独立的进程连接到同一个 redis 服务器并检索存储在此类列表中的结果。
我的文件最终是这样的:
cfg_celery.py:在这里,我定义了调用任务的方式。
#cfg_celery.py
from celery import Celery
appo = Celery('celery_config',
broker='redis://localhost:6379/0',
include=['taskos'],
backend = 'redis'
)
'''
urlea se decoro como periodic_task. no hay necesidad de darla de alta aqi.
pero como add necesita args, la doy de alta manualmente p pasarselos
'''
appo.conf.beat_schedule = {
'q_loco': {
'task': 'taskos.add',
'schedule': 10.0,
'args': (16, 54),
# 'options' : {'task_id':"lcura"},
}
}
taskos.py :这些是任务。
#taskos.py
from cfg_celery import appo
from celery.decorators import periodic_task
from redis import Redis
from datetime import timedelta
import requests, time
rds = Redis()
@appo.task(bind=True)
def add(self,a, b):
#result of operation. very dummy.
result = a + b
#storing in redis
r= (self.request.id,time.time(),result)
rds.lpush('my_results',r)
# for this test i want to have at most 5 results stored in redis
long = rds.llen('my_results')
while long > 5:
x = rds.rpop('my_results')
print('popping out',x)
long = rds.llen('my_results')
time.sleep(1)
return a + b
@periodic_task(run_every=20)
def urlea(url='https://www.fullstackpython.com/'):
inicio = time.time()
R = dict()
try:
resp = requests.get(url)
R['vato'] = url+" = " + str(resp.status_code*10)
R['num palabras'] = len(resp.text.split())
except:
R['vato'] = None
R['num palabras'] = 0
print('u {} : {}'.format(url,time.time()-inicio))
time.sleep(0.8) # truco pq se vea mas claramente la dif.
return R
consumer.py:可以得到结果的独立过程。
#consumer.py
from redis import Redis
nombre_lista = 'my_results'
rds = Redis()
tamaño = rds.llen(nombre_lista)
ultimos_resultados = list()
for i in range(tamaño):
ultimos_resultados.append(rds.rpop(nombre_lista))
print(ultimos_resultados)
我对编程比较陌生,我希望这个答案可以帮助像我这样的菜鸟。如果我弄错了什么,请随时根据需要进行更正。