我有一个分布式计算框架,它使用Celery + RABBITMQ + supervisor。我的工作人员的任务包括从数据库中读取,计算一些值并在该过程完成后更新数据库。但是,当我尝试以分布式方式运行多个工作线程时,我不断遇到错误:-
(2014,"命令不同步;您现在无法运行此命令")
谁能建议我一种设置互斥锁或类似锁文件机制的方法,以便工作人员可以同时访问数据库。
任何帮助将不胜感激,谢谢阿米特
编辑:-
con = mdb.connect(parameters...)
def reset_table(table_name,con):
with con:
cur = con.cursor(mdb.cursors.DictCursor)
cur.execute("UPDATE " + table_name + " SET active_status = 0 where last_access < (NOW() - INTERVAL 15 MINUTE)")
con.commit()
堆栈跟踪 :-
File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 238, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 416, in __protected_call__
return self.run(*args, **kwargs)
File "/home/elasticsearch/celery_test/tasks.py", line 183, in download_data
auth = get_auth(con)
File "/home/elasticsearch/celery_test/tasks.py", line 94, in get_auth
reset_table("auths",con)
File "/usr/lib/python2.7/dist-packages/MySQLdb/connections.py", line 249, in __exit__
self.rollback()
ProgrammingError: (2014, "Commands out of sync; you can't run this command now")
我不会急于应用锁定机制,而是尝试查看如何修复客户端使用数据库的方式,请参阅 mysql 文档以了解不同步的命令
如果命令不同步;现在无法在客户端代码中运行此命令,则调用客户端函数的顺序错误。
如果您决定实现锁定(同样,我不建议这样做),一种好方法是:
import redis
have_lock = False
my_lock = redis.Redis().lock("my_key")
try:
have_lock = my_lock.acquire(blocking=True)
if have_lock:
print("Got lock.")
else:
print("Did not acquire lock.")
finally:
if have_lock:
my_lock.release()
有关详细说明,请参阅 http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html