我想知道在芹菜任务中使用mysql池的正确方法是什么。目前,我的任务模块(相关部分)是这样的:
from start import celery
import PySQLPool as pool
dbcfg = config.get_config('inputdb')
input_db = pool.getNewConnection(username=dbcfg['user'], password=dbcfg['passwd'], host=dbcfg['host'], port=dbcfg['port'], db=dbcfg['db'], charset='utf8')
dbcfg = config.get_config('outputdb')
output_db = pool.getNewConnection(username=dbcfg['user'], password=dbcfg['passwd'], host=dbcfg['host'], port=dbcfg['port'], db=dbcfg['db'], charset='utf8')
@celery.task
def fetch():
ic = pool.getNewQuery(input_db)
oc = pool.getNewQuery(output_db)
count = 1
for e in get_new_stuff():
# do stuff with new stuff
# read the db with ic
# write to db using oc
# commit from time to time
if count % 1000:
pool.commitPool()
# commit whatever's left
pool.commitPool()
在一台机器上,最多可以同时运行4个fetch()任务(每个内核1个)。然而,我注意到,有时任务会挂起,我怀疑这是由于mysql造成的。关于如何使用mysql和芹菜有什么建议吗?
谢谢!
我也在使用芹菜和PySQLPool。
maria = PySQLPool.getNewConnection(username=app.config["MYSQL_USER"],
password=app.config["MYSQL_PASSWORD"],
host=app.config["MYSQL_HOST"],
db='configuration')
def myfunc(self, param1, param2):
query = PySQLPool.getNewQuery(maria, True)
try:
sSql = """
SELECT * FROM table
WHERE col1= %s AND col2
"""
tDatas = ( var1, var2)
query.Query(sSql, tDatas)
return query.record
except Exception, e:
logger.info(e)
return False
@celery.task
def fetch():
myfunc('hello', 'world')