我想使用RQ在单独的工人上运行任务,以从测量仪器中收集数据。任务的结束将通过用户按下仪表板应用程序上的按钮发出信号。问题是任务本身不知道何时终止,因为它无法访问Dash应用程序的上下文。
我已经使用meta
将信息从工人传递回呼叫者,但是我可以将信息从呼叫者传递给工人吗?
示例任务:
from rq import get_current_job
from time import time
def mock_measurement():
job = get_current_job()
t_start = time()
# Run the measurement
t = []
i = []
job.meta['should_stop'] = False # I want to use this tag to tell the job to stop
while not job.meta['should_stop']:
t.append(time() - t_start)
i.append(np.random.random())
job.meta['data'] = (t, i)
job.save_meta()
sleep(5)
print("Job Finished")
从控制台,我可以开始工作
queue = rq.Queue('test-app', connection=Redis('localhost', 6379))
job = queue.enqueue('tasks.mock_measurement')
我希望能够从控制台上执行此操作,以表示可以停止运行的工人:
job.meta['should_stop'] = True
job.save_meta()
job.refresh
但是,虽然上述命令返回而没有错误,但它们实际上并未更新meta
字典。
因为您没有获取更新的元数据。但是,不要这样做!调用save_meta和刷新呼叫者和工人会丢失数据。
而是使用job.connection.set(job + ':should_stop', 1, ex=300)
设置标志,然后使用job.connection.get(job + ':should_stop')
检查是否设置了标志。