我目前正在开发一个应用程序,它必须处理几个长时间运行的任务。我使用的是python 3
、flask
、celery
、redis
。
我在localhost上有一个有效的解决方案,但在heroku上有很多错误,每次执行应用程序都会触发不同的错误集。我知道这不可能是随机的,所以我想知道从哪里开始寻找。
我觉得redis一定有问题,我正在努力了解客户是什么以及他们来自哪里,但我找不到关于这个主题的官方文件或解释。
问题:
如果redis服务器启动(甚至在localhost上),许多客户端都已连接,尽管我什么都没做。在heroku上(我使用的是heroku-redis),我总是有6个客户端,在localhost上有11个客户端。
我做了一些研究,我能够用显示它们
if 'DYNO' in os.environ:
redis_db = redis.StrictRedis(host='HOST', port=15249, password='REDISDBPW')
else:
redis_db = redis.StrictRedis()
# see what keys are in Redis
all_keys = redis_db.keys()
print (all_keys)
all_clients = redis_db.client_list()
print (all_clients)
我看到了所有这些客户,但那里的信息对我没有任何帮助。它们是什么?他们为什么在那里?他们从哪里来?
所有的heroku redis插件都有客户端限制,所以我需要了解并优化这一点。起初我以为是clientsnumber == tasknumber
,但不是这样。
我总共定义了12个任务,但我现在正在测试2个任务(两个任务都在不到30秒内完成)
当我在localhost上执行任务时,客户端会从11个增加到16个。如果我从16岁到18岁再执行一次,然后他们总是停留在18岁,无论我执行任务的频率如何。
那么这里发生了什么?我有两项任务,为什么客户从11个增加到16个,然后从16个增加到18个?为什么任务完成后它们没有关闭?
我已经为整个问题挣扎了几天了(尽管它在localhost上总是很好地工作),所以欢迎任何帮助或想法。我需要开始寻找某个地方,所以目前我正在努力了解客户。
编辑:
我安装了flower,并尝试在localhost上监视这两个任务,一切看起来都很好。它处理两项任务,两项任务都在几秒钟内成功。返回值是正确的(但它在localhost上总是很有效)。
但问题是,在我开始花后,客户数量猛增至30个。我仍然不知道:什么是客户?鉴于我生成的客户端数量,我需要一个100美元的附加组件来处理两个任务,这需要几秒钟的时间才能完成,这不可能是真的,我仍然认为redis有问题,即使在localhost上也是如此。
我的redis设置非常简单:
if 'DYNO' in os.environ:
app.config['CELERY_BROKER_URL'] = 'redis://[the full URL from the redis add-on]'
app.config['CELERY_RESULT_BACKEND'] = 'redis://[the full URL from the redis add-on]'
else:
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost'
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])
以下是一个任务示例:
@celery.task(bind=True)
def get_users_deregistrations_task(self, g_start_date, g_end_date):
start_date = datetime.strptime(g_start_date, '%d-%m-%Y')
end_date = datetime.strptime(g_end_date, '%d-%m-%Y')
a1 = db_session.query(func.sum(UsersTransactionsVK.amount)).filter(UsersTransactionsVK.date_added >= start_date, UsersTransactionsVK.date_added <= end_date, UsersTransactionsVK.payed == 'Yes').scalar()
a2 = db_session.query(func.sum(UsersTransactionsStripe.amount)).filter(UsersTransactionsStripe.date_added >= start_date, UsersTransactionsStripe.date_added <= end_date, UsersTransactionsStripe.payed == 'Yes').scalar()
a3 = db_session.query(func.sum(UsersTransactions.amount)).filter(UsersTransactions.date_added >= start_date, UsersTransactions.date_added <= end_date, UsersTransactions.on_hold == 'No').scalar()
if a1 is None:
a1 = 0
if a2 is None:
a2 = 0
if a3 is None:
a3 = 0
amount = a1 + a2 + a3
return {'some_value' : amount}
# Selects user deregistrations between selected dates
@app.route('/get-users-deregistration', methods=["POST"])
@basic_auth.required
@check_verified
def get_users_deregistrations():
if request.method == "POST":
# init task
task = get_users_deregistrations_task.apply_async([session['g_start_date'], session['g_end_date']])
return json.dumps({}), 202, {'Location': url_for('taskstatus_get_users_deregistrations', task_id=task.id)}
@app.route('/status/<task_id>')
def taskstatus_get_users_deregistrations(task_id):
task = get_users_deregistrations_task.AsyncResult(task_id)
if task.state == 'PENDING':
response = {
'state': task.state,
'current': 0,
'total': 1,
'status': 'Pending...'
}
elif task.state != 'FAILURE':
response = {
'state': task.state,
'current': task.info['current'],
'total': task.info['total'],
'status': 'Finished',
'statistic': task.info['statistic'],
'final_dataset': task.info
}
if 'result' in task.info:
response['result'] = task.info['result']
else:
print ('in else')
# something went wrong in the background job
response = {
'state': task.state,
'current': 1,
'total': 1,
'status': str(task.info), # this is the exception raised
}
return json.dumps(response)
编辑:
这是我为heroku编写的procfile:
web: gunicorn stats_main:app
worker: celery worker -A stats_main.celery --loglevel=info
编辑
我认为问题可能是连接池(在redis端),我没有正确使用它。
我还为芹菜找到了一些配置,并添加了它们:
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'], redis_max_connections=20, BROKER_TRANSPORT_OPTIONS = {
'max_connections': 20,
}, broker_pool_limit=None)
我用这些配置再次将所有内容上传到heroku。我仍然只测试两个任务,这两个任务都很快。
我已经在heroku上连续执行了10次任务,其中7次成功。3次看起来他们完成得太早了:返回的结果是错误的(正确的结果是f.e.30000,它返回了3次18000)。
客户端很快跳到了20,但从未超过20,因此至少解决了最大客户端错误和与redis的连接丢失错误。
现在的大问题是任务可能过早完成,返回的结果是否正确非常重要,性能根本不重要。
编辑
无论如何,什么都没有解决,一切似乎都是随机的。我在其中一个任务中添加了两个print()
以进一步调试并上传到heroku。在执行了2次之后,我再次看到与redis的连接丢失,达到了客户端的最大数量(尽管我的redmonitor插件显示客户端从未超过20)
编辑
大量的客户端可能是由空闲客户端造成的,这些客户端由于某种原因从未关闭(在heroku的博客文章中发现):
默认情况下,Redis永远不会关闭空闲连接,这意味着如果您不显式关闭Redis连接,您将锁定把自己排除在你的例子之外。
为了确保这种情况不会发生,Heroku Redis设置了一个默认连接超时300秒。此超时不适用于非发布/订阅客户端以及其他阻塞操作。
我现在在每一项任务之前都为空闲客户端添加了一个终止功能:
def kill_idle_clients():
if 'DYNO' in os.environ:
redis_db = redis.StrictRedis(host='HOST', port=15249, password='REDISDBPW')
else:
redis_db = redis.StrictRedis()
all_clients = redis_db.client_list()
counter = 0
for client in all_clients:
if int(client['idle']) >= 15:
redis_db.client_kill(client['addr'])
counter += 1
print ('killing idle clients:', counter)
在一个任务启动之前,它会关闭所有空闲超过15秒的客户端。它在localhost上再次工作(但毫不奇怪,它总是在本地主机上工作)。我的客户减少了,但在heroku上,它现在只工作了10次中的2次。8次任务又过早完成。也许那些无所事事的客户并不是真的无所事事,我完全不知道。
它也几乎不可能进行测试,因为每次执行任务都会有不同的结果(失去与redis的连接,达到客户端限制,过早完成,工作完美)。
编辑
芹菜的设置似乎一直被忽视。我一直对此持怀疑态度,并决定通过添加一些随机参数和将值更改为无意义来测试它。我重新启动了C的芹菜工人。
我本以为会看到一些错误,但它就像什么都没发生一样。
使用这些无感配置,一切都像以前一样工作:
celery = Celery(app.name, broker=app.config['REDIS_URL'], backend=app.config['REDIS_URL'], redis_max_connections='pups', BROKER_TRANSPORT_OPTIONS = {
'max_connections': 20,
}, broker_pool_limit=None, broker_connection_timeout='pups', pups="pups")
celery.conf.broker_transport_options = {'visibility_timeout': 'pups'}
编辑
我改变了加载芹菜配置的方式(从一个单独的配置文件)。现在看来是可行的,但问题依然存在。
celery_task = Celery(broker=app.config['REDIS_URL'], backend=app.config['REDIS_URL'])
celery_task.config_from_object('celeryconfig')
编辑
通过这些配置,我成功地将localhost上所有任务的客户端数量限制在18个(我尝试了所有12个任务)。然而,在heroku上,它"不知何故"起了作用。客户减少了,但有一次金额达到了20,尽管我认为我不能超过18。(我在heroku上测试了4项任务)。
在heroku上测试所有12个任务会触发许多不同的SQL错误。我现在比以前更困惑了。似乎同一个任务被执行了多次,但我只看到了12个任务URL。
我认为这是因为SQL错误是f.e.:
sqlalchemy.exc.InternalError: (pymysql.err.InternalError) Packet sequence number wrong - got 117 expected 1
或
sqlalchemy.exc.InterfaceError: (pymysql.err.InterfaceError) (0, '')
或
Multiple rows were found for one()
我在heroku上测试了几次4个任务,有时任务结果会返回,但结果非常奇怪。
这一次任务没有过早完成,但返回了增加的值,看起来任务A已经返回了2次值并将其相加。
示例:任务A必须返回10k,但它返回了20k,因此该任务已执行了两次,结果已相加。
这是我目前的配置。我仍然不能100%理解数学,但我认为它(针对客户数量):
max-conncurency * CELERYD_MAX_TASKS_PER_CHILD
在localhost上,我发现了一个新的CLI命令来检查worker统计信息,并且我有max-conncurecy=3
和CELERYD_MAX_TASKS_PER_CHILD=6
CLI命令:
celery -A stats_main.celery_task inspect stats
我当前的配置:
工人启动:
celery worker -A stats_main.celery_task --loglevel=info --autoscale=10,3
配置:
CELERY_REDIS_MAX_CONNECTIONS=20
BROKER_POOL_LIMIT=None
CELERYD_WORKER_LOST_WAIT=20
CELERYD_MAX_TASKS_PER_CHILD=6
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 18000} # 5 hours
CELERY_RESULT_DB_SHORT_LIVED_SESSIONS = True #useful if: For example, intermittent errors like (OperationalError) (2006, ‘MySQL server has gone away’)
编辑
看到所有这些SQL错误,我决定研究一个完全不同的方向。我的新理论是,这可能是一个MySQL
问题。
我调整了与MySQL服务器的连接,如这个问题的答案所述。
我还发现pymsql有threadsafety=1
,我还不知道这是否是一个问题,但MySQL似乎与连接和连接池有关。
目前,我还可以说内存可能不是问题,因为如果包太大,它就不应该在localhost上工作,这意味着我将max_allowed_packet
保留在默认值,大约4MB。
我还创建了3个虚拟任务,它们在不连接到外部MySQL数据库的情况下进行一些简单的计算。我现在已经在heroku上执行了5次,没有错误,结果总是正确的,所以我认为问题不是芹菜、redis,而是MySQL,尽管我不知道为什么它会在localhost上工作。也许是这三者的结合,导致了heroku的问题。
编辑
我调整了我的JS文件。现在,每个任务都被一个接一个地调用,这意味着它们不是异步的(我仍然使用芹菜的apply_async
,因为apply
不起作用)
所以这是一个很难解决的办法。我只是为每个任务创建了一个var
,例如var task_1_rdy = false;
我还创建了一个函数,它每2秒运行一次,检查一个任务是否准备就绪,如果准备就绪,它将启动下一个任务。我想很容易理解我在这里做了什么。
在heroku上测试了这一点,即使有多个任务,也没有任何错误,所以这个问题可能已经解决了。我需要做更多的测试,但看起来很有希望。Ofc。我没有使用异步功能,一个接一个地运行任务可能会有最差的性能,但嘿,它现在可以工作了。我将对性能差异进行基准测试,并在周一更新问题。
编辑
我今天做了很多测试。完成任务所需的时间是一样的(同步与异步)我不知道为什么,但它是一样的。
在heroku上处理所有12个任务并选择一个巨大的时间范围(巨大的时间区间=任务需要更长的时间,因为需要处理更多的数据):
同样,任务结果并不精确,返回的值是错误的,只是略有错误,但错误,因此不可靠,例如任务A必须返回20k,在heroku上返回19500。我不知道数据丢失/任务过早返回是怎么可能的,但两周后我会放弃,尝试使用一个完全不同的系统。
听起来像是使用celener-worker redis作为消息队列的rest api。这是chk列表:
1在您的客户端中,您是否在逻辑完成后关闭了连接
2芹菜将成为新工人,工人可能会引起麻烦,试着用花监控芹菜
3确保你的客户端完成了任务,试着用打印的东西进行调试,有时临时和本地有网络问题,这会阻止你结束芹菜任务
4如果你正在使用redis来处理芹菜消息队列,试着监控队列的数量,也许它们会自动扩大?
现在我60%确信是您的任务花费了太长时间,服务器无法在默认的web请求返回时间内做出响应。70%/30%适用于网络速度非常快的本地机器。在云平台上,延迟是个问题,有时它会影响你的程序。在此之前,如果celery worker
失败,由于gunicon
和celery
,它将自动创建另一个工作进程来完成未完成的作业,这会导致连接增加。
因此,解决方案是:
-
选项1使您的任务更快地完成
-
选项2首先返回一个确认,在后台进行计算,然后再调用一个api以发回结果