Celery运行线程在运行任务中的组时会卡住



我对芹菜组有一些问题。当任务数量更多时,一些线程被卡住,永远不会返回。

@celery.task(bind=True, ignore_result=False, soft_time_limit=TASK_TIME_OUT*15)
def schedule(self, data):
try:
job = group(process.s(i[0], i[1]) for i in data)
result = job.apply_async()
result_data = None
with allow_join_result():
result_data = result.get()
callback.delay(result_data)
except SoftTimeLimitExceeded as ex:
LOGGER.error("Timeout schedule Failed. " + str(ex))

然后我有另一个方法,它需要大约1分钟来处理一些数据——还有一些I/O调用,然后返回结果。

@celery.task(bind=True, ignore_result=False, soft_time_limit=TASK_TIME_OUT)
def process(self, param1, param2):
.......
.......
return result

使用eventlet 的Celery worker命令

celery -A proj worker --loglevel=debug -P eventlet --concurrency=100 -n worker@%h

我在kubernetes上运行两个工作程序,每个pod的CPU=1和MEM=512MB。

我来解释一下流程:

有一个api调用,它触发了一个芹菜任务"调度"。在"时间表"方法中,我创建了一个小组(我也尝试过和弦,但没有成功(,并根据输入创建任务,如果任务少于400,那么它就可以完美地工作。

如果任务超过500,那么我有一些线程被卡住了,永远不会返回,因此接下来的步骤永远不会在"schedule"方法中运行。

  1. 我做错了什么
  2. 为什么芹菜运行线程卡住了?它说它是活动的,但它根本没有运行,因此我的cpu使用率一直很高。以下是检查统计结果
worker@service-worker-lbv46: OK
{
"broker": {
"alternates": [],
"connect_timeout": 4,
"failover_strategy": "round-robin",
"heartbeat": 120.0,
"hostname": "redis-service-stg",
"insist": false,
"login_method": null,
"port": 6379,
"ssl": false,
"transport": "redis",
"transport_options": {},
"uri_prefix": null,
"userid": null,
"virtual_host": "/"
},
"clock": "1243",
"pid": 7,
"pool": {
"free-threads": 98,
"max-concurrency": 100,
"running-threads": 2
},
"prefetch_count": 400,
"rusage": {
"idrss": 0,
"inblock": 0,
"isrss": 0,
"ixrss": 0,
"majflt": 0,
"maxrss": 376584,
"minflt": 4313949,
"msgrcv": 0,
"msgsnd": 0,
"nivcsw": 12441,
"nsignals": 0,
"nswap": 0,
"nvcsw": 1214,
"oublock": 1840,
"stime": 18.567744,
"utime": 585.98883
},
"total": {
"worker.tasks.process": 402,
"worker.tasks.schedule": 1
}
}
-> worker@service-worker-g9kh7: OK
{
"broker": {
"alternates": [],
"connect_timeout": 4,
"failover_strategy": "round-robin",
"heartbeat": 120.0,
"hostname": "redis-service-stg",
"insist": false,
"login_method": null,
"port": 6379,
"ssl": false,
"transport": "redis",
"transport_options": {},
"uri_prefix": null,
"userid": null,
"virtual_host": "/"
},
"clock": "1243",
"pid": 7,
"pool": {
"free-threads": 99,
"max-concurrency": 100,
"running-threads": 1
},
"prefetch_count": 400,
"rusage": {
"idrss": 0,
"inblock": 0,
"isrss": 0,
"ixrss": 0,
"majflt": 0,
"maxrss": 348324,
"minflt": 3903269,
"msgrcv": 0,
"msgsnd": 0,
"nivcsw": 14085,
"nsignals": 0,
"nswap": 0,
"nvcsw": 28288,
"oublock": 1840,
"stime": 11.887338,
"utime": 291.123721
},
"total": {
"worker.tasks.process": 382
}
}

在这里,你可以看到worker1中有2个正在运行的线程,worker2中有1个正在运行,它们就像从很长时间开始就被卡住了。

我知道一项任务不应该等待另一项任务,建议我是否有更好的方法来完成它。如果架构发生了变化。

==========================编辑==============================

也尝试过使用chord,但线程仍然在执行"进程"任务时卡住了,芹菜检查统计数据显示1个正在运行的线程,它什么都没做。问题是它不会在这个线程返回时执行回调。

另一个问题是它也没有超时。

eventlet==0.23.0kombu==4.1.0台球===3.5.0.2芹菜==4.0.2redis==2.10.5

提前谢谢。

您正在启动任务,并从另一个任务中获取结果。众所周知,这会导致死锁——一旦你的工人库耗尽,你的第一个任务就是等待其他由于缺乏可用工人而无法执行的任务。

解决方案是使用回调或更复杂的工作流。在你的情况下,你确实应该使用和弦,这是正确的解决方案,所以再试一次,如果你仍然有问题,就发布关于这个问题的帖子。

相关内容

  • 没有找到相关文章

最新更新