我对芹菜组有一些问题。当任务数量更多时,一些线程被卡住,永远不会返回。
@celery.task(bind=True, ignore_result=False, soft_time_limit=TASK_TIME_OUT*15)
def schedule(self, data):
try:
job = group(process.s(i[0], i[1]) for i in data)
result = job.apply_async()
result_data = None
with allow_join_result():
result_data = result.get()
callback.delay(result_data)
except SoftTimeLimitExceeded as ex:
LOGGER.error("Timeout schedule Failed. " + str(ex))
然后我有另一个方法,它需要大约1分钟来处理一些数据——还有一些I/O调用,然后返回结果。
@celery.task(bind=True, ignore_result=False, soft_time_limit=TASK_TIME_OUT)
def process(self, param1, param2):
.......
.......
return result
使用eventlet 的Celery worker命令
celery -A proj worker --loglevel=debug -P eventlet --concurrency=100 -n worker@%h
我在kubernetes上运行两个工作程序,每个pod的CPU=1和MEM=512MB。
我来解释一下流程:
有一个api调用,它触发了一个芹菜任务"调度"。在"时间表"方法中,我创建了一个小组(我也尝试过和弦,但没有成功(,并根据输入创建任务,如果任务少于400,那么它就可以完美地工作。
如果任务超过500,那么我有一些线程被卡住了,永远不会返回,因此接下来的步骤永远不会在"schedule"方法中运行。
- 我做错了什么
- 为什么芹菜运行线程卡住了?它说它是活动的,但它根本没有运行,因此我的cpu使用率一直很高。以下是检查统计结果
worker@service-worker-lbv46: OK { "broker": { "alternates": [], "connect_timeout": 4, "failover_strategy": "round-robin", "heartbeat": 120.0, "hostname": "redis-service-stg", "insist": false, "login_method": null, "port": 6379, "ssl": false, "transport": "redis", "transport_options": {}, "uri_prefix": null, "userid": null, "virtual_host": "/" }, "clock": "1243", "pid": 7, "pool": { "free-threads": 98, "max-concurrency": 100, "running-threads": 2 }, "prefetch_count": 400, "rusage": { "idrss": 0, "inblock": 0, "isrss": 0, "ixrss": 0, "majflt": 0, "maxrss": 376584, "minflt": 4313949, "msgrcv": 0, "msgsnd": 0, "nivcsw": 12441, "nsignals": 0, "nswap": 0, "nvcsw": 1214, "oublock": 1840, "stime": 18.567744, "utime": 585.98883 }, "total": { "worker.tasks.process": 402, "worker.tasks.schedule": 1 } } -> worker@service-worker-g9kh7: OK { "broker": { "alternates": [], "connect_timeout": 4, "failover_strategy": "round-robin", "heartbeat": 120.0, "hostname": "redis-service-stg", "insist": false, "login_method": null, "port": 6379, "ssl": false, "transport": "redis", "transport_options": {}, "uri_prefix": null, "userid": null, "virtual_host": "/" }, "clock": "1243", "pid": 7, "pool": { "free-threads": 99, "max-concurrency": 100, "running-threads": 1 }, "prefetch_count": 400, "rusage": { "idrss": 0, "inblock": 0, "isrss": 0, "ixrss": 0, "majflt": 0, "maxrss": 348324, "minflt": 3903269, "msgrcv": 0, "msgsnd": 0, "nivcsw": 14085, "nsignals": 0, "nswap": 0, "nvcsw": 28288, "oublock": 1840, "stime": 11.887338, "utime": 291.123721 }, "total": { "worker.tasks.process": 382 } }
在这里,你可以看到worker1中有2个正在运行的线程,worker2中有1个正在运行,它们就像从很长时间开始就被卡住了。
我知道一项任务不应该等待另一项任务,建议我是否有更好的方法来完成它。如果架构发生了变化。
==========================编辑==============================
也尝试过使用chord,但线程仍然在执行"进程"任务时卡住了,芹菜检查统计数据显示1个正在运行的线程,它什么都没做。问题是它不会在这个线程返回时执行回调。
另一个问题是它也没有超时。
eventlet==0.23.0kombu==4.1.0台球===3.5.0.2芹菜==4.0.2redis==2.10.5
提前谢谢。
您正在启动任务,并从另一个任务中获取结果。众所周知,这会导致死锁——一旦你的工人库耗尽,你的第一个任务就是等待其他由于缺乏可用工人而无法执行的任务。
解决方案是使用回调或更复杂的工作流。在你的情况下,你确实应该使用和弦,这是正确的解决方案,所以再试一次,如果你仍然有问题,就发布关于这个问题的帖子。