用例:在Django框架(1.6版)中使用celeni来调度本质上写入数据库的任务。我只有一个自定义队列,芹菜节拍调度程序将任务放入其中。创建了一个芹菜工作程序,它侦听这个并发性为8 的队列
问题:8个单独的工作者中的每一个都继续创建从未回收的线程(我猜)。这会导致线程过多(我已经看到计数上升到2万个线程)。在4-5小时内,线程数达到10k!
我看到的错误:无法启动新线程。
关于谁在启动新线程的Python回溯告诉我:调用django-save会创建一个新线程。"adgroup"这里是一个django模型对象
[2015-12-03 18:40:17,133: WARNING/Worker-3] adgroup.save(update_fields=['bids_today', 'impressions_today', 'spent_today', 'last_metric_update_time'])
[2015-12-03 18:40:17,887: WARNING/Worker-3] File "/home/ec2-user/venv/local/lib/python2.7/dist-packages/django/db/models/base.py", line 545, in save
[2015-12-03 18:40:17,887: WARNING/Worker-3] force_update=force_update, update_fields=update_fields)
[2015-12-03 18:40:18,715: WARNING/Worker-3] File "/home/ec2-user/venv/local/lib/python2.7/dist-packages/django/db/models/base.py", line 582, in save_base
[2015-12-03 18:40:18,716: WARNING/Worker-3] update_fields=update_fields, raw=raw, using=using)
[2015-12-03 18:40:18,716: WARNING/Worker-3] File "/home/ec2-user/venv/local/lib/python2.7/dist-packages/django/dispatch/dispatcher.py", line 185, in send
[2015-12-03 18:40:18,716: WARNING/Worker-3] response = receiver(signal=self, sender=sender, **named)
[2015-12-03 18:40:19,300: INFO/MainProcess] Task ExtendTV.celery_tasks.stats_collector.collectAdGroupMetricsTask[2ae52b3d-77b9-46d3-93ac-d7fad9b96382] succeeded in 26.486441362s: None
[2015-12-03 18:40:19,395: WARNING/Worker-3] File "/home/ec2-user/venv/local/lib/python2.7/dist-packages/haystack/signals.py", line 48, in handle_save
[2015-12-03 18:40:19,593: WARNING/Worker-3] index.update_object(instance, using=using)
[2015-12-03 18:40:19,593: WARNING/Worker-3] File "/home/ec2-user/venv/local/lib/python2.7/dist-packages/haystack/indexes.py", line 274, in update_object
[2015-12-03 18:40:19,593: WARNING/Worker-3] backend.update(self, [instance])
[2015-12-03 18:40:19,593: WARNING/Worker-3] File "/home/ec2-user/venv/local/lib/python2.7/dist-packages/haystack/backends/whoosh_backend.py", line 208, in update
[2015-12-03 18:40:20,515: WARNING/Worker-3] writer.commit()
[2015-12-03 18:40:20,516: WARNING/Worker-3] File "/home/ec2-user/venv/local/lib/python2.7/dist-packages/whoosh/writing.py", line 1043, in commit
[2015-12-03 18:40:21,318: WARNING/Worker-3] self.start()
[2015-12-03 18:40:21,642: WARNING/Worker-3] File "/usr/lib64/python2.7/threading.py", line 748, in start
[2015-12-03 18:40:22,340: WARNING/Worker-3] _start_new_thread(self.__bootstrap, ())
[2015-12-03 18:40:22,340: WARNING/Worker-3] error: can't start new thread
其他信息:正如你从图片中看到的,记忆力完全在正常范围内。这个"线程问题"在以前版本的芹菜3.0.x中没有出现。但是,这里的内存变得相当高
我用来创建工作者的Celery命令:
celery -A ProjectName worker -l DEBUG -Q ExampleQueueName
我使用的芹菜设置:
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES=60*60*24
CELERYD_PREFETCH_MULTIPLIER = 128
其他相关设置:使用rabbitmq3.5.4作为消息代理
更新:
def collectAdGroupMetricsTask(*args, **kwargs):
try:
adgroup = AdGroup.objects.get(id=kwargs.get("adgroupID"))
collectAdGroupMetrics(adgroup)
except Exception as e:
logger.error("Could not retreive AdGroup for collectAdGroupMetrics. " + str(e))
return
def collectAdGroupMetrics(adgroup, currDate=None):
Value1=function1_making_another_db_call()
Value2=function2_making_another_db_call()
adgroup.fieldname1 = Value1
adgroup.fieldname2 = Value2
adgroup.save(update_fields=['fieldname1', 'fieldname2'])
具有大量线程的工作进程的示例。
whoosh(python包)出现问题,它试图获取写锁并一直等待,导致创建了这么多线程。因此,将whoosh从django中已安装的应用程序列表中删除。此外,在芹菜中使用maxtasksperchild配置来防止内存持续增长。
- 首先,在Python虚拟环境中安装了gevent包
- Next对运行芹菜的命令进行了一些更改
- 最后,我添加了参数
--pool gevent
。默认情况下,celeb使用池"prefork",该池应该有一些错误 - 选择gevent后,celeb的进程数降低到子进程数(并发)